You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
226 lines
5.0 KiB
226 lines
5.0 KiB
#!/usr/bin/env python3
|
|
from scapy.all import *
|
|
import ipaddress
|
|
from threading import Thread, Event
|
|
from time import sleep
|
|
import os
|
|
|
|
|
|
# Running number of packets summarised by print_short(), which updates it
# through a `global` declaration.
pcount = 0

# Module-level receive tally. Nothing in the code visible here reads or
# updates it (Sniffer keeps its own per-instance recv_count).
recv_count = 0

# TCP port that identifies debug traffic: segments addressed to this port are
# tracked by the sniffer, and replies are sent with it as their source port.
server_port = 50508
|
|
|
|
|
|
|
|
class Sniffer(Thread):
    """Background thread that watches an interface for debug TCP traffic.

    Packets matching the capture filter ``ip`` are handed to
    :meth:`handle_packet`. For TCP segments addressed to ``server_port`` the
    sniffer records the sender's IP, source port and the segment's seq/ack
    numbers so a reply can later be sent on the same connection.

    Attributes:
        recv_count: Number of debug segments seen so far.
        socket: Layer-2 listening socket; ``None`` until :meth:`run` opens it.
        iface: Name of the interface being sniffed.
        stop_sniffer: Event set by :meth:`join` to request shutdown.
        debug_src: Source IP of the latest debug segment ("" if none yet).
        debug_sport: Source port of the latest debug segment (0 if none yet).
        last_ack: ACK number carried by the latest debug segment.
        last_seq: Sequence number carried by the latest debug segment.
    """

    def __init__(self, iface="ens18"):
        super().__init__()

        # Daemon thread, so it never keeps the interpreter alive at exit.
        self.daemon = True
        self.recv_count = 0
        self.socket = None
        self.iface = iface
        self.stop_sniffer = Event()

        self.debug_src = ""
        self.debug_sport = 0

        self.last_ack = 0
        self.last_seq = 0

    def run(self):
        """Open the capture socket and sniff until shutdown is requested."""
        self.socket = conf.L2listen(
            type=ETH_P_ALL,
            iface=self.iface,
            filter="ip",
        )

        try:
            sniff(
                opened_socket=self.socket,
                prn=self.handle_packet,
                # Without a stop_filter, sniff() has no way to notice that
                # join() asked the capture to end and would run forever.
                stop_filter=self.should_stop_sniffer,
            )
        finally:
            # The socket was opened here, so it is released here as well.
            self.socket.close()

    def should_stop_sniffer(self, packet):
        """Return True once :meth:`join` has requested shutdown.

        Passed to ``sniff`` as its ``stop_filter``; ``packet`` is ignored.
        """
        return self.stop_sniffer.is_set()

    def join(self, timeout=None):
        """Request the capture loop to stop, then wait for the thread.

        The stop flag is only evaluated when a packet is captured, so pass a
        ``timeout`` if the interface may be idle.
        """
        self.stop_sniffer.set()
        super().join(timeout)

    def handle_debug_packet(self, payload):
        """Decode a TCP payload as an embedded IP packet and summarise it."""
        raw = bytes(payload)
        if not raw:
            # Segments without data (e.g. bare ACKs) embed no packet, so
            # there is nothing meaningful to report.
            return

        print_short(IP(raw))

    def handle_packet(self, packet):
        """Update debug-connection state from one sniffed packet.

        Only TCP segments whose destination port equals ``server_port`` are
        considered; everything else is ignored.
        """
        # NOTE(review): the original indentation was lost in extraction. The
        # nesting below (state updates and the counter inside the port check,
        # but outside the recv_count > 3 check) is a reconstruction - confirm
        # against the upstream file.
        if TCP not in packet:
            return

        tcp = packet[TCP]
        if tcp.dport != server_port:
            return

        # The first four debug segments (recv_count 0..3) are tracked but
        # their payload is not decoded.
        if self.recv_count > 3:
            self.handle_debug_packet(tcp.payload)

        # Remember where the segment came from and its seq/ack numbers so a
        # reply can be addressed to the same connection.
        self.debug_src = packet[IP].src
        self.debug_sport = tcp.sport

        self.last_ack = tcp.ack
        self.last_seq = tcp.seq

        self.recv_count += 1
|
|
|
|
|
|
def print_short(pkt):
    """Print a one-line summary of ``pkt`` and bump the global packet counter.

    The line contains the running packet number, the protocol
    (``tcp``/``udp``/``icmp``/``other``), the TCP flags when present, the
    source and destination ``ip:port`` pairs and ``len(pkt)``.

    Args:
        pkt: A scapy packet. When it has no IP layer the addresses are
            printed as ``?`` instead of raising.
    """
    global pcount
    pcount += 1

    sport = 0
    dport = 0
    ptype = "other"
    flags = ""

    if TCP in pkt:
        tcp = pkt[TCP]
        sport, dport = tcp.sport, tcp.dport
        flags = " " + str(tcp.flags)
        ptype = "tcp"

    if ICMP in pkt:
        ptype = "icmp"

    if UDP in pkt:
        udp = pkt[UDP]
        sport, dport = udp.sport, udp.dport
        ptype = "udp"

    if IP in pkt:
        src, dst = str(pkt[IP].src), str(pkt[IP].dst)
    else:
        # No IP layer to read addresses from; keep the summary printable.
        src = dst = "?"

    # The original separator was " -> + "; the stray "+" was a leftover from
    # the concatenation chain and garbled the output.
    print(
        "~~ {num}. NetworkGenie {ptype}{flags} pkt from {src}:{sport}"
        " -> {dst}:{dport} with length: {length}".format(
            num=pcount,
            ptype=ptype,
            flags=flags,
            src=src,
            sport=sport,
            dst=dst,
            dport=dport,
            length=len(pkt),
        )
    )
|
|
|
|
|
|
# TODO: extend this further so the caller can also choose the protocol and
# attach an application payload to the crafted packet.
def craft_send_payload(dip, dest_port, src_ip="10.1.10.4", sport=40555,
                       flags="S", window=8000):
    """Build a raw IP/TCP packet to embed in a debug packet.

    The crafted packet is also dumped with ``show()`` and its serialised
    bytes are printed for debugging.

    Args:
        dip: Destination IP address of the crafted packet.
        dest_port: Destination TCP port (int).
        src_ip: Source IP address written into the IP header.
        sport: Source TCP port.
        flags: TCP flags passed to scapy's ``TCP`` layer (default ``"S"``).
        window: TCP window size.

    Returns:
        bytes: The serialised packet.
    """
    send_pkt = IP(dst=dip, src=src_ip) / TCP(
        dport=dest_port,
        sport=sport,
        flags=flags,
        window=window,
    )

    payload = bytes(send_pkt)
    send_pkt.show()

    print("debug send payload: " + str(payload))

    return payload
|
|
|
|
|
|
|
|
|
|
def get_send_payload():
    """Return the canned test packet as raw bytes.

    The packet is an IP/TCP segment with the SYN flag from 10.0.0.161:40404
    to 9.9.9.9:80 carrying the body ``AAAAAAAA``. The serialised bytes are
    also printed for debugging.

    Returns:
        bytes: The serialised packet.
    """
    send_pkt = (
        IP(dst="9.9.9.9", src="10.0.0.161")
        / TCP(dport=80, sport=40404, flags="S")
        / "AAAAAAAA"
    )
    payload = bytes(send_pkt)

    print("debug send payload: " + str(payload))

    return payload
|
|
|
|
|
|
|
|
|
|
def send_debug_packet(sniffer, is_custom):
    """Send a PSH/ACK debug segment to the last debug peer the sniffer saw.

    The segment is sent from ``server_port`` to the peer's address and port,
    using the sniffer's last seen ACK number as the sequence number and its
    last seen sequence number as the ACK number.

    Args:
        sniffer: The Sniffer supplying the peer address/port, the seq/ack
            numbers and the interface to send on.
        is_custom: When true, prompt for a destination IP and port and embed
            a packet built by ``craft_send_payload``; otherwise embed the
            canned packet from ``get_send_payload``.

    Returns:
        None. Nothing is sent when no debug peer has been seen yet or when
        the entered port is not a valid port number.
    """
    # Snapshot the connection state up front; the sniffer thread may update
    # these attributes while the user is being prompted.
    debug_port = sniffer.debug_sport
    debug_ip = sniffer.debug_src

    send_seq = sniffer.last_ack
    send_ack = sniffer.last_seq

    if debug_port == 0 or debug_ip == '':
        print("There was no debug source connection to send to")
        return

    print("sending debug packet to " + str(debug_ip) + ":" + str(debug_port))

    if is_custom:
        print("\n\nEnter dest ip for packet..")
        dip = input("Enter IP: ")
        print("\n\nEnter dest port for packet..")
        dport = input("Enter port: ")

        # A typo at the prompt must not take down the whole menu loop.
        try:
            dest_port = int(dport)
        except ValueError:
            print("Invalid port (not a number): " + repr(dport))
            return
        if not 0 <= dest_port <= 65535:
            print("Invalid port (must be 0-65535): " + str(dest_port))
            return

        send_payload = craft_send_payload(dip, dest_port)
    else:
        send_payload = get_send_payload()

    packet = (
        IP(dst=debug_ip)
        / TCP(
            dport=debug_port,
            sport=server_port,
            flags='PA',
            seq=send_seq,
            ack=send_ack,
        )
        / send_payload
    )
    # Send on the interface the sniffer is bound to rather than a
    # hard-coded name, so the two cannot drift apart.
    send(packet, iface=sniffer.iface)
    print("sent debug packet: ")
    packet.show()
|
|
|
|
|
|
|
|
def main():
    """Start the sniffer and run the interactive debug-packet menu.

    Menu options: keep sniffing (1), send the canned test packet (2), send a
    custom crafted packet (3), or quit (4). Any other answer is echoed and
    the menu is shown again.
    """
    sniffer = Sniffer(iface='ens18')
    print('starting sniffer..')
    sniffer.start()
    # Presumably gives the capture thread time to start before the first
    # prompt. `sleep` is the name this file imports; `time` itself is not.
    sleep(3)

    done = False

    while not done:
        print("Enter action to take..")
        print("1. Keep sniffing")
        print("2. Send test packet back")
        print("3. Craft custom packet to send from genie")
        print("4. Quit")

        answer = input("Enter answer: ")

        if answer == "1":
            print("sleeping for sniffer..")
            sleep(5)
        elif answer == "2":
            send_debug_packet(sniffer, False)
        elif answer == "3":
            send_debug_packet(sniffer, True)
        elif answer == "4":
            print("ending the sniffer")
            done = True

        print("user answer was: " + str(answer))
        print("\n")
        sleep(1)

    # Actually ask the sniffer to stop. The timeout keeps exit from hanging
    # if the capture thread does not finish promptly.
    sniffer.join(timeout=2.0)

    print("\n\nFinished sniffing for debug packets")
|
|
|
|
|
|
# Script entry point: launch the interactive tool only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    main()
|
|
|
|
|