forked from aquasecurity/kube-hunter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__main__.py
executable file
·143 lines (121 loc) · 4.48 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python3
# flake8: noqa: E402
from functools import partial
import logging
import threading
from kube_hunter.conf import Config, set_config
from kube_hunter.conf.parser import parse_args
from kube_hunter.conf.logging import setup_logger
from kube_hunter.plugins import initialize_plugin_manager
# Module-level bootstrap: everything below runs at import time, in this order.
# The plugin manager is created first because its hook is passed to the
# argument parser on the next statement.
pm = initialize_plugin_manager()
# Using a plugin hook for adding arguments before parsing
args = parse_args(add_args_hook=pm.hook.parser_add_arguments)
# Build the Config object field by field from the parsed arguments.
config = Config(
active=args.active,
cidr=args.cidr,
include_patched_versions=args.include_patched_versions,
interface=args.interface,
log_file=args.log_file,
mapping=args.mapping,
network_timeout=args.network_timeout,
num_worker_threads=args.num_worker_threads,
pod=args.pod,
quick=args.quick,
remote=args.remote,
statistics=args.statistics,
k8s_auto_discover_nodes=args.k8s_auto_discover_nodes,
service_account_token=args.service_account_token,
kubeconfig=args.kubeconfig,
enable_cve_hunting=args.enable_cve_hunting,
custom=args.custom,
)
# Logging is set up from args.log and args.log_file.
setup_logger(args.log, args.log_file)
# NOTE(review): presumably makes this config available to the rest of the
# package - confirm in kube_hunter.conf.
set_config(config)
# Running all other registered plugins before execution
pm.hook.load_plugin(args=args)
# These imports come after the config has been set; the file-level
# "flake8: noqa: E402" pragma permits imports that are not at the top.
# NOTE(review): presumably these modules rely on the config at import time -
# confirm before moving them.
from kube_hunter.core.events.event_handler import handler
from kube_hunter.core.events.types import HuntFinished, HuntStarted
from kube_hunter.modules.discovery.hosts import RunningAsPodEvent, HostScanEvent
from kube_hunter.modules.report import get_reporter, get_dispatcher
logger = logging.getLogger(__name__)
# Attach the dispatcher and reporter selected by args.dispatch / args.report.
config.dispatcher = get_dispatcher(args.dispatch)
config.reporter = get_reporter(args.report)
def interactive_set_config():
    """Ask the user which kind of scan to run and record it on ``config``.

    Prints a numbered menu, reads the choice from stdin and, depending on the
    answer, sets ``config.remote``, ``config.interface`` or ``config.cidr``.

    Returns:
        True when one of the offered options was chosen, False otherwise.
    """
    menu = (
        ("Remote scanning", "scans one or more specific IPs or DNS names"),
        ("Interface scanning", "scans subnets on all local network interfaces"),
        ("IP range scanning", "scans a given IP range"),
    )

    print("Choose one of the options below:")
    for number, (title, description) in enumerate(menu, start=1):
        print("{}. {} ({})".format(number, title.ljust(20), description))

    selected = input("Your choice: ")
    # Anything other than an exact menu number is rejected without prompting further.
    if selected not in ("1", "2", "3"):
        return False

    if selected == "1":
        answer = input("Remotes (separated by a ','): ")
        # Spaces are dropped before splitting on commas.
        config.remote = answer.replace(" ", "").split(",")
    elif selected == "2":
        config.interface = True
    else:
        answer = input("CIDR separated by a ',' (example - 192.168.0.0/16,!192.168.0.8/32,!192.168.1.0/24): ")
        config.cidr = answer.replace(" ", "").split(",")
    return True
def list_hunters(class_names=False):
    """Print the hunters registered on the event handler.

    Passive hunters are always listed; active hunters are listed only when
    ``config.active`` is truthy.

    Args:
        class_names: when true, show each hunter's class ``__name__`` instead
            of the name returned by its ``parse_docs``.
    """

    def show(title, hunters):
        # One section: a heading followed by one entry per hunter.
        print(title)
        for hunter_cls, raw_docs in hunters.items():
            parsed_name, description = hunter_cls.parse_docs(raw_docs)
            label = hunter_cls.__name__ if class_names else parsed_name
            print(f"* {label}\n {description}\n")

    show("\nPassive Hunters:\n----------------", handler.passive_hunters)
    if config.active:
        show("\n\nActive Hunters:\n---------------", handler.active_hunters)
# Lock guarding the hunt_started flag below; main() holds it both when setting
# the flag and when reading it during cleanup.
hunt_started_lock = threading.Lock()
# Set to True by main() just before HuntStarted is published; main()'s cleanup
# only publishes HuntFinished when this is True.
hunt_started = False
def main():
    """Run kube-hunter from the command line.

    When ``args.list`` is set, the registered hunters are printed and the
    function returns. Otherwise a hunt is started: if none of the scan options
    on ``config`` is set, the user is prompted interactively (and the function
    returns if no valid option is chosen); then ``HuntStarted`` is published,
    followed by ``RunningAsPodEvent`` when ``config.pod`` is set or
    ``HostScanEvent`` otherwise, and the call blocks on ``handler.join()``.

    ``KeyboardInterrupt`` and ``EOFError`` are logged instead of propagated.
    If the hunt was started, ``HuntFinished`` is published and the handler is
    joined and freed before returning, whichever way the ``try`` body exits.
    """
    global hunt_started
    scan_options = [config.pod, config.cidr, config.remote, config.interface, config.k8s_auto_discover_nodes]
    try:
        if args.list:
            list_hunters(class_names=bool(args.raw_hunter_names))
            return

        # No scan target was configured: fall back to the interactive prompt.
        if not any(scan_options) and not interactive_set_config():
            return

        with hunt_started_lock:
            hunt_started = True
        handler.publish_event(HuntStarted())
        if config.pod:
            handler.publish_event(RunningAsPodEvent())
        else:
            handler.publish_event(HostScanEvent())

        # Blocking to see discovery output
        handler.join()
    except KeyboardInterrupt:
        logger.debug("Kube-Hunter stopped by user")
    # happens when running a container without interactive option
    except EOFError:
        logger.error("\033[0;31mPlease run again with -it\033[0m")
    finally:
        # Read the flag under the lock, then release it before the blocking
        # cleanup so the lock is never held across handler.join().
        with hunt_started_lock:
            started = hunt_started
        if started:
            handler.publish_event(HuntFinished())
            handler.join()
            handler.free()
            logger.debug("Cleaned Queue")


# Only start a hunt when this file is executed as a script.
if __name__ == "__main__":
    main()