Created
January 17, 2026 11:19
-
-
Save 0xDE57/f04534227576bdf2c07082510ab12d84 to your computer and use it in GitHub Desktop.
simple udp logger for unique messages. specify port.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import socket | |
| import datetime | |
| from collections import defaultdict | |
| collected_data: dict[str, dict[str, dict[str, object]]] = defaultdict(dict) | |
| def save_to_file(data_to_save: dict[str, dict[str, dict[str, object]]]) -> None: | |
| ts = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") | |
| filename = f"udp_log_{ts}.txt" | |
| try: | |
| with open(filename, "w", encoding="utf-8") as file: | |
| for address, payload_map in data_to_save.items(): | |
| file.write(f"{address} says:\n") | |
| for data, meta in payload_map.items(): | |
| first = meta["first_seen"].strftime("%Y-%m-%d %H:%M:%S") | |
| last = meta["last_seen"].strftime("%Y-%m-%d %H:%M:%S") | |
| file.write(f"{data}\ncount: {meta['count']} - first seen: {first} - last seen: {last}\n\n") | |
| file.write("\n") # blank line between devices | |
| print(f"[+] Snapshot written to {filename}") | |
| except Exception as exc: | |
| print(f"[!] Could not write snapshot to {filename}: {exc}") | |
| def start_udp_listener(host, port): | |
| # Create a UDP socket, and bind | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| server_address = (host, port) | |
| sock.bind(server_address) | |
| print(f"UDP Listener started on {host}:{port}\n") | |
| packets_since_last_save = 0 | |
| auto_save_count = 500 # sd card has low tolerance for frequent writes | |
| try: | |
| while True: | |
| # Receive data from the client | |
| data, address = sock.recvfrom(4096) # Buffer size is 4096 bytes | |
| hex_data = ' '.join(f"{byte:02x}" for byte in data) | |
| ip = address[0] | |
| now = datetime.datetime.now() | |
| #message = data.decode('latin-1') | |
| print(f"{now} - {address} says:\n{hex_data}\n") | |
| # Get the nested map for this IP; create if missing | |
| device_map = collected_data[ip] | |
| if hex_data in device_map: | |
| # Existing payload – bump counter and update last_seen | |
| meta = device_map[hex_data] | |
| meta["count"] += 1 | |
| meta["last_seen"] = now | |
| else: | |
| # New payload – initialise counters and timestamps | |
| device_map[hex_data] = { | |
| "count": 1, | |
| "first_seen": now, | |
| "last_seen": now, | |
| } | |
| packets_since_last_save += 1 | |
| if packets_since_last_save >= auto_save_count: | |
| save_to_file(collected_data) | |
| packets_since_last_save = 0 | |
| except KeyboardInterrupt: | |
| print("UDP Listener stopped.") | |
| except Exception as exc: | |
| print(f"\nError: {exc}") | |
| finally: | |
| sock.close() | |
| save_to_file(collected_data) | |
| if __name__ == "__main__": | |
| host = '0.0.0.0' # Listen on all available interfaces | |
| port = 57621 # Port to listen on | |
| start_udp_listener(host, port) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment