-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpacket.py
52 lines (40 loc) · 1.17 KB
/
packet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from scapy.all import *
from threading import Thread, Event
from time import sleep
class Sniffer(Thread):
    """Daemon thread that captures IP packets on one interface and prints them.

    The thread opens a layer-2 listening socket in ``run()`` and hands it to
    scapy's ``sniff()``. Calling ``join()`` sets the ``stop_sniffer`` event,
    which ``sniff()`` consults through ``should_stop_sniffer``. The socket is
    kept on ``self.socket`` so the owner can close it from outside.
    """

    def __init__(self, interface="ens33"):
        """Prepare the thread; the socket is opened later, in ``run()``.

        Args:
            interface: Name of the network interface to listen on.
        """
        super().__init__()

        # Daemon thread: it must not keep the interpreter alive on exit.
        self.daemon = True
        # Stays None until run() has opened the listening socket.
        self.socket = None
        self.interface = interface
        # Set by join() to ask the capture loop to stop.
        self.stop_sniffer = Event()

    def run(self):
        """Open the listening socket and sniff until asked to stop."""
        self.socket = conf.L2listen(
            type=ETH_P_ALL,
            iface=self.interface,
            filter="ip",
        )

        sniff(
            opened_socket=self.socket,
            prn=self.print_packet,
            stop_filter=self.should_stop_sniffer,
        )

    def join(self, timeout=None):
        """Request the capture to stop, then wait for the thread.

        Args:
            timeout: Maximum number of seconds to wait, or None to wait
                without limit (passed straight to ``Thread.join``).
        """
        self.stop_sniffer.set()
        super().join(timeout)

    def should_stop_sniffer(self, packet):
        """Return True once a stop has been requested.

        Used as the ``stop_filter`` callback of ``sniff()``; the ``packet``
        argument is part of that callback signature and is not inspected.
        """
        # is_set() is the supported spelling; isSet() is a deprecated alias.
        return self.stop_sniffer.is_set()

    def print_packet(self, packet):
        """Print the source and destination address of a captured packet.

        Packets without an IP layer are ignored instead of raising
        ``AttributeError`` inside the sniff callback.
        """
        ip_layer = packet.getlayer(IP)
        if ip_layer is None:
            return
        print("[!] New Packet: {src} -> {dst}".format(src=ip_layer.src, dst=ip_layer.dst))
# Run the capture in a background thread so the main thread stays free to
# receive Ctrl-C.
sniffer = Sniffer()

print("[*] Start sniffing...")
sniffer.start()

try:
    # Park the main thread; KeyboardInterrupt is the only way out of this loop.
    while True:
        sleep(100)
except KeyboardInterrupt:
    print("[*] Stop sniffing")
    # Sniffer.join() sets the stop flag before waiting, here for up to 2 seconds.
    sniffer.join(2.0)

    # Thread.isAlive() was removed in Python 3.9; is_alive() is the supported
    # name. If the thread is still running after the timeout (presumably
    # blocked waiting for a packet), close its socket. The socket is None
    # when the thread has not opened it yet, so guard against that as well.
    if sniffer.is_alive() and sniffer.socket is not None:
        sniffer.socket.close()