|
|
#!/usr/bin/env python3 from scapy.all import * import ipaddress from threading import Thread, Event from time import sleep import os
# Module-level counters. ``pcount`` numbers the lines printed by print_short();
# ``recv_count`` is not referenced by any code visible in this file.
recv_count: int = 0
pcount: int = 0

# TCP port identifying debug traffic: the sniffer records packets whose
# destination port equals it, and replies are sent with it as source port.
server_port: int = 50508
class Sniffer(Thread):
    """Daemon thread that sniffs IP traffic and remembers the debug peer.

    Every TCP packet addressed to ``server_port`` updates ``debug_src``,
    ``debug_sport``, ``last_ack`` and ``last_seq`` so that a reply can later
    be addressed to the same connection.
    """

    def __init__(self, iface="ens18"):
        """Prepare the thread; nothing is opened until ``start()`` is called.

        Args:
            iface: Name of the network interface to listen on.
        """
        super().__init__()
        self.daemon = True
        self.recv_count = 0  # matching packets seen so far
        self.socket = None  # L2 listen socket, created in run()
        self.iface = iface
        self.stop_sniffer = Event()  # set by join() to end the capture

        # Source address/port of the most recent matching packet.
        self.debug_src = ""
        self.debug_sport = 0

        # TCP ack/seq numbers of the most recent matching packet.
        self.last_ack = 0
        self.last_seq = 0

    def run(self):
        """Capture packets until ``stop_sniffer`` is set."""
        self.socket = conf.L2listen(
            type=ETH_P_ALL,
            iface=self.iface,
            filter="ip",
        )
        try:
            sniff(
                opened_socket=self.socket,
                prn=self.handle_packet,
                # Without a stop_filter the stop event is never consulted and
                # the capture could not be ended by join().
                stop_filter=self.should_stop_sniffer,
            )
        finally:
            # sniff() was handed an already-open socket, so close it here.
            self.socket.close()

    def join(self, timeout=None):
        """Request the capture to stop, then wait for the thread.

        The stop request is only noticed when the next packet is processed,
        so pass a ``timeout`` if the interface may be idle.
        """
        self.stop_sniffer.set()
        super().join(timeout)

    def should_stop_sniffer(self, packet):
        """Return True once a stop has been requested (``stop_filter`` hook)."""
        return self.stop_sniffer.is_set()

    def handle_debug_packet(self, payload):
        """Re-parse a TCP payload as an IP packet and print its summary."""
        some_packet = IP(bytes(payload))
        print_short(some_packet)

    def handle_packet(self, packet):
        """Record connection state for TCP packets sent to ``server_port``."""
        if TCP not in packet:
            return

        tcp_layer = packet[TCP]
        if tcp_layer.dport != server_port:
            return

        # The first four matching packets are only recorded, not decoded
        # (presumably connection setup -- TODO confirm).
        if self.recv_count > 3:
            self.handle_debug_packet(tcp_layer.payload)

        self.debug_src = packet[IP].src
        self.debug_sport = tcp_layer.sport
        self.last_ack = tcp_layer.ack
        self.last_seq = tcp_layer.seq
        self.recv_count += 1
def print_short(pkt):
    """Print a numbered one-line summary of an IP packet.

    Increments the module-level ``pcount`` and prints the protocol
    ("tcp", "icmp", "udp" or "other"), TCP flags when present, the source
    and destination address:port pairs, and the packet length.

    Args:
        pkt: A scapy packet; it must contain an ``IP`` layer, since the
            source and destination addresses are read from ``pkt[IP]``.
    """
    global pcount
    pcount += 1

    sport = 0
    dport = 0
    ptype = "other"
    flags = ""

    # Independent checks, kept in this order: a later match overrides the
    # protocol label (and ports) chosen by an earlier one.
    if TCP in pkt:
        dport = pkt[TCP].dport
        sport = pkt[TCP].sport
        flags = " " + str(pkt[TCP].flags)
        ptype = "tcp"
    if ICMP in pkt:
        ptype = "icmp"
    if UDP in pkt:
        dport = pkt[UDP].dport
        sport = pkt[UDP].sport
        ptype = "udp"

    # The separator used to be " -> + ": a stray "+" inside the literal.
    print(
        "~~ {}. NetworkGenie {}{} pkt from {}:{} -> {}:{} with length: {}".format(
            pcount,
            ptype,
            flags,
            pkt[IP].src,
            sport,
            pkt[IP].dst,
            dport,
            len(pkt),
        )
    )
# TODO: extend this function to also allow choosing the protocol and
# attaching an application payload.
def craft_send_payload(dip, dest_port, src_ip="10.1.10.4", src_port=40555,
                       flags="S", window=8000):
    """Build a custom TCP packet and return its serialized bytes.

    The packet is dumped with ``show()`` and its raw bytes are printed for
    debugging before being returned.

    Args:
        dip: Destination IP address of the crafted packet.
        dest_port: Destination TCP port of the crafted packet.
        src_ip: Source IP address (default: the previously hard-coded value).
        src_port: Source TCP port (default: the previously hard-coded value).
        flags: TCP flags string; the default "S" yields a SYN.
        window: TCP window size.

    Returns:
        The crafted IP/TCP packet as ``bytes``.
    """
    send_pkt = IP(dst=dip, src=src_ip) / TCP(
        dport=dest_port,
        sport=src_port,
        flags=flags,
        window=window,
    )
    payload = bytes(send_pkt)
    send_pkt.show()

    print("debug send payload: " + str(payload))

    return payload
def get_send_payload():
    """Return the serialized bytes of a fixed test packet.

    The packet is a TCP SYN from 10.0.0.161:40404 to 9.9.9.9:80 carrying
    the payload "AAAAAAAA". Its raw bytes are printed for debugging and
    returned.

    Returns:
        The test packet as ``bytes``.
    """
    # A 100-byte "B" placeholder used to be assigned here and was overwritten
    # before any use; it has been dropped.
    send_pkt = (
        IP(dst="9.9.9.9", src="10.0.0.161")
        / TCP(dport=80, sport=40404, flags="S")
        / "AAAAAAAA"
    )
    payload = bytes(send_pkt)

    print("debug send payload: " + str(payload))

    return payload
def send_debug_packet(sniffer, is_custom):
    """Send a PSH/ACK packet back to the last peer the sniffer recorded.

    The reply goes to ``sniffer.debug_src:sniffer.debug_sport`` from
    ``server_port``. Its TCP payload is the raw bytes of another packet:
    one crafted from user input when ``is_custom`` is true, otherwise the
    fixed test packet from ``get_send_payload()``.

    Args:
        sniffer: Object exposing ``debug_src``, ``debug_sport``, ``last_ack``,
            ``last_seq`` and ``iface`` (a ``Sniffer``).
        is_custom: When true, prompt for a destination IP and port and embed
            a custom packet instead of the fixed test packet.

    Returns:
        None. Returns early, without sending, when no peer has been recorded
        or the entered port is not a valid TCP port.
    """
    debug_port = sniffer.debug_sport
    debug_ip = sniffer.debug_src

    # Snapshot taken before prompting: reply seq is the peer's last ack.
    # NOTE(review): ack is the peer's last seq unchanged; a TCP stack would
    # normally acknowledge seq + received payload length -- confirm intent.
    send_seq = sniffer.last_ack
    send_ack = sniffer.last_seq

    if debug_port == 0 or debug_ip == '':
        print("There was no debug source connection to send to")
        return

    print("sending debug packet to " + str(debug_ip) + ":" + str(debug_port))

    if is_custom:
        print("\n\nEnter dest ip for packet..")
        dip = input("Enter IP: ")
        print("\n\nEnter dest port for packet..")
        dport = input("Enter port: ")
        # A typo must not crash the whole interactive session.
        try:
            dest_port = int(dport)
        except ValueError:
            print("Invalid port: " + repr(dport))
            return
        if not 0 <= dest_port <= 65535:
            print("Invalid port: " + repr(dport))
            return
        send_payload = craft_send_payload(dip, dest_port)
    else:
        send_payload = get_send_payload()

    packet = IP(dst=debug_ip) / TCP(
        dport=debug_port,
        sport=server_port,
        flags='PA',
        seq=send_seq,
        ack=send_ack,
    ) / send_payload
    # Send on the interface the sniffer listens on rather than a fixed name.
    send(packet, iface=sniffer.iface)
    print("sent debug packet: ")
    packet.show()
def main():
    """Start the sniffer thread and run the interactive menu until quit.

    Menu actions: 1 keeps sniffing for five more seconds, 2 sends the fixed
    test packet back to the recorded peer, 3 sends a user-crafted packet,
    4 leaves the loop. Any other answer just redisplays the menu.
    """
    sniffer = Sniffer(iface='ens18')
    print('starting sniffer..')
    sniffer.start()
    # Use the explicitly imported ``sleep``; ``time.sleep`` only resolved if
    # the scapy wildcard import happened to expose the ``time`` module.
    sleep(3)

    done = False

    while not done:
        print("Enter action to take..")
        print("1. Keep sniffing")
        print("2. Send test packet back")
        print("3. Craft custom packet to send from genie")
        print("4. Quit")

        answer = input("Enter answer: ")

        if answer == "1":
            print("sleeping for sniffer..")
            sleep(5)
        elif answer == "2":
            send_debug_packet(sniffer, False)
        elif answer == "3":
            send_debug_packet(sniffer, True)
        elif answer == "4":
            print("ending the sniffer")
            done = True

        print("user answer was: " + str(answer))
        print("\n")
        sleep(1)

    print("\n\nFinished sniffing for debug packets")
# Run the interactive menu only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|