Skip to content

Instantly share code, notes, and snippets.

@fyxme
Created January 19, 2026 15:31
Show Gist options
  • Select an option

  • Save fyxme/1dc1662fddfa231f5fa4d2bf519ec93d to your computer and use it in GitHub Desktop.

Select an option

Save fyxme/1dc1662fddfa231f5fa4d2bf519ec93d to your computer and use it in GitHub Desktop.
A Python script that compares scapy, pyshark, and dpkt for parsing wardriving pcap files.
#!/usr/bin/env python3
"""
PCAP Parser Comparison Script
Compares scapy, pyshark, and dpkt for parsing wardriving pcap files.
Measures: parsing speed, memory usage, and extracted data quality.
Install dependencies:
pip install scapy pyshark dpkt psutil
Usage:
python compare_pcap_parsers.py <pcap_file>
python compare_pcap_parsers.py --generate-sample # Creates a sample pcap for testing
"""
import argparse
import multiprocessing
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from typing import Callable
try:
import psutil
except ImportError:
psutil = None
print("Warning: psutil not installed. Memory tracking disabled.")
print("Install with: pip install psutil")
@dataclass
class BenchmarkResult:
    """Outcome of running one parsing library over a single pcap file.

    Parsers create an instance with ``success=False``, fill in the counters
    while iterating over packets, and flip ``success`` to True only after the
    whole file was processed. On failure ``error`` holds the message.
    """

    # --- identity / status ---
    library: str   # name of the library that produced this result
    success: bool  # True once the parser ran to completion

    # --- performance ---
    parse_time: float = 0.0  # elapsed seconds (time.perf_counter delta)
    memory_mb: float = 0.0   # memory figure in megabytes

    # --- frame counters ---
    packet_count: int = 0
    beacon_count: int = 0
    probe_request_count: int = 0
    probe_response_count: int = 0

    # --- extracted identifiers (a fresh set per instance) ---
    unique_ssids: set = field(default_factory=set)
    unique_bssids: set = field(default_factory=set)

    # --- failure details ---
    error: str = ""
def get_memory_mb() -> float:
    """Return the current process's resident set size in MB (0.0 without psutil)."""
    if not psutil:
        # psutil is optional; without it there is nothing to measure.
        return 0.0
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / 1024 / 1024
def _parser_worker(target_name: str, parser_func: Callable[[str], "BenchmarkResult"], target_pcap: str,
out_conn: "multiprocessing.connection.Connection"):
"""Run a parser function and send the result through a pipe."""
try:
res = parser_func(target_pcap)
except Exception as e: # safety net to always return a result
err = f"{type(e).__name__}: {e}"
res = BenchmarkResult(library=target_name, success=False, error=err)
try:
out_conn.send(res)
finally:
out_conn.close()
def get_process_tree_rss_bytes(proc: "psutil.Process") -> int:
    """Return the summed RSS, in bytes, of ``proc`` and all of its descendants.

    Handles short-lived processes: a child that disappears or cannot be
    inspected while being sampled is skipped. If ``proc`` itself is gone or
    not inspectable, 0 is returned.
    """
    # AccessDenied is handled next to NoSuchProcess so that one unreadable
    # process cannot abort the whole measurement.
    try:
        total = proc.memory_info().rss
        descendants = proc.children(recursive=True)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return 0

    for child in descendants:
        try:
            total += child.memory_info().rss
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return total
def run_parser_in_subprocess(name: str, parser_func: Callable[[str], "BenchmarkResult"], pcap_path: str,
                             sample_interval: float = 0.05) -> "BenchmarkResult":
    """
    Run a parser in an isolated process and measure peak RSS (including children like tshark).

    Falls back to in-process execution when psutil is unavailable.

    Args:
        name: Library label used for the fallback failure result.
        parser_func: Callable taking the pcap path and returning a BenchmarkResult.
        pcap_path: Path of the capture file handed to ``parser_func``.
        sample_interval: Seconds to wait between two memory samples.

    Returns:
        The worker's BenchmarkResult. ``memory_mb`` is replaced by the peak
        RSS of the worker's process tree when at least one sample succeeded.
        A failed result is returned if the worker ended without sending one.
    """
    if not psutil:
        # psutil missing; run directly without peak measurement
        result = parser_func(pcap_path)
        result.memory_mb = max(result.memory_mb, 0.0)
        return result

    ctx = multiprocessing.get_context("spawn")
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=_parser_worker, args=(name, parser_func, pcap_path, child_conn))
    proc.start()
    # Drop the parent's copy of the write end. While it stays open, recv()
    # can never see EOF and would block forever if the worker dies before
    # sending its result.
    child_conn.close()

    try:
        ps_proc = psutil.Process(proc.pid)
    except psutil.Error:
        ps_proc = None  # worker already gone or not inspectable: no sampling

    def sample_tree_rss() -> int:
        """Best-effort RSS of the worker's process tree; 0 when unavailable."""
        if ps_proc is None:
            return 0
        try:
            return get_process_tree_rss_bytes(ps_proc)
        except psutil.Error:
            return 0

    peak_rss_bytes = 0
    try:
        while True:
            peak_rss_bytes = max(peak_rss_bytes, sample_tree_rss())
            # poll() doubles as the sampling sleep and returns early once the
            # result (or EOF) is readable. Receiving while the worker is still
            # alive avoids a deadlock when the pickled result is larger than
            # the pipe buffer and the worker is blocked in send().
            if parent_conn.poll(sample_interval):
                result = parent_conn.recv()
                break
        # One last sample right after the result arrived.
        peak_rss_bytes = max(peak_rss_bytes, sample_tree_rss())
    except Exception:
        # EOFError (worker exited without sending) or an unreadable payload.
        result = BenchmarkResult(library=name, success=False, error="Worker failed without returning a result")
    finally:
        parent_conn.close()
    proc.join()

    # Override memory measurement with peak RSS observed across the process tree
    if peak_rss_bytes:
        result.memory_mb = peak_rss_bytes / 1024 / 1024
    return result
# =============================================================================
# SCAPY PARSER
# =============================================================================
def parse_with_scapy(pcap_path: str) -> BenchmarkResult:
    """Parse pcap using scapy.

    Reads all packets with ``rdpcap`` and counts 802.11 beacons, probe
    requests and probe responses. Beacon and probe-response transmitter
    addresses are collected as BSSIDs; beacon SSIDs come from the first
    information element with ID 0.

    Returns:
        A BenchmarkResult. ``success`` is False and ``error`` is set when
        scapy is missing or parsing raised.
    """
    result = BenchmarkResult(library="scapy", success=False)
    try:
        from scapy.all import rdpcap, Dot11, Dot11Beacon, Dot11ProbeReq, Dot11ProbeResp, Dot11Elt

        mem_before = get_memory_mb()
        start_time = time.perf_counter()
        packets = rdpcap(pcap_path)
        for pkt in packets:
            result.packet_count += 1
            if not pkt.haslayer(Dot11):
                continue
            dot11 = pkt.getlayer(Dot11)

            # Extract BSSID based on frame type
            if pkt.haslayer(Dot11Beacon):
                result.beacon_count += 1
                bssid = dot11.addr2
                if bssid:
                    result.unique_bssids.add(bssid.upper())

                # Extract SSID from information elements
                elt = pkt.getlayer(Dot11Elt)
                while elt:
                    if elt.ID == 0:  # SSID element
                        try:
                            ssid = elt.info.decode('utf-8', errors='ignore')
                        except (AttributeError, UnicodeError):
                            # Element payload was not decodable bytes; treat
                            # it as "no SSID" rather than aborting the file.
                            ssid = ""
                        if ssid:
                            result.unique_ssids.add(ssid)
                        break
                    elt = elt.payload.getlayer(Dot11Elt)
            elif pkt.haslayer(Dot11ProbeReq):
                result.probe_request_count += 1
            elif pkt.haslayer(Dot11ProbeResp):
                result.probe_response_count += 1
                bssid = dot11.addr2
                if bssid:
                    result.unique_bssids.add(bssid.upper())

        result.parse_time = time.perf_counter() - start_time
        result.memory_mb = get_memory_mb() - mem_before
        result.success = True
    except ImportError:
        result.error = "scapy not installed. Install with: pip install scapy"
    except Exception as e:
        result.error = f"{type(e).__name__}: {e}"
    return result
# =============================================================================
# PYSHARK PARSER
# =============================================================================
def _pyshark_frame_subtype(wlan):
    """Return the frame subtype of a pyshark wlan layer as an int, or None."""
    # Try different attribute names used by different tshark versions
    raw = getattr(wlan, 'fc_type_subtype', None)
    if raw is None:
        raw = getattr(wlan, 'subtype', None)
    if raw is None:
        return None
    # Handle hex strings like "0x0008" or decimal strings like "8"
    text = str(raw)
    try:
        return int(text, 16) if text.startswith('0x') else int(text)
    except ValueError:
        return None


def _pyshark_bssid(wlan):
    """Return the first non-empty of wlan.bssid / wlan.ta / wlan.addr2, else None."""
    for attr in ('bssid', 'ta', 'addr2'):
        value = getattr(wlan, attr, None)
        if value:
            return value
    return None


def _decode_pyshark_ssid(val):
    """Decode a colon-separated hex SSID; return ``val`` unchanged otherwise."""
    if ':' in val and len(val) > 2:
        try:
            return bytes.fromhex(val.replace(':', '')).decode('utf-8', errors='ignore')
        except ValueError:
            # Not hex after all (e.g. a plain SSID that contains a colon).
            return val
    return val


def _first_pyshark_ssid(layer):
    """Return the SSID from the first populated SSID attribute of ``layer``."""
    for attr in ('ssid', 'tag_ssid', 'wlan_ssid'):
        val = getattr(layer, attr, None)
        if val:
            return _decode_pyshark_ssid(val)
    return None


def _pyshark_ssid(pkt, wlan):
    """Extract the SSID of a pyshark packet, trying three field layouts in turn."""
    ssid = None
    # Method 1: Check wlan.mgt layer (modern tshark) - SSID is hex-encoded
    try:
        hex_ssid = getattr(pkt['wlan.mgt'], 'wlan_ssid', None)
        if hex_ssid:
            # Decode hex like '54:65:73:74:4e:65:74:77:6f:72:6b' to 'TestNetwork'
            ssid = bytes.fromhex(hex_ssid.replace(':', '')).decode('utf-8', errors='ignore')
    except (KeyError, AttributeError, ValueError):
        pass
    # Method 2: Try direct attributes on wlan layer
    if not ssid:
        ssid = _first_pyshark_ssid(wlan)
    # Method 3: Check wlan_mgt layer (older tshark - underscore not dot)
    if not ssid and hasattr(pkt, 'wlan_mgt'):
        ssid = _first_pyshark_ssid(pkt.wlan_mgt)
    return ssid


def parse_with_pyshark(pcap_path: str) -> BenchmarkResult:
    """Parse pcap using pyshark (requires tshark/Wireshark installed).

    Counts beacons (subtype 8), probe requests (4) and probe responses (5),
    collecting BSSIDs from beacons and probe responses and SSIDs from
    beacons. The capture is always closed, even when iteration fails.

    Returns:
        A BenchmarkResult. ``success`` is False and ``error`` is set when
        pyshark is missing or parsing raised.
    """
    result = BenchmarkResult(library="pyshark", success=False)
    try:
        import pyshark

        mem_before = get_memory_mb()
        start_time = time.perf_counter()
        # Use FileCapture for reading pcap files
        cap = pyshark.FileCapture(pcap_path, keep_packets=False)
        try:
            for pkt in cap:
                result.packet_count += 1
                # Check for 802.11 wireless layer
                if not hasattr(pkt, 'wlan'):
                    continue
                wlan = pkt.wlan
                frame_subtype = _pyshark_frame_subtype(wlan)
                if frame_subtype is None:
                    continue

                # Address/SSID lookups are only done for the frame types that
                # actually record them.
                if frame_subtype == 8:  # Beacon
                    result.beacon_count += 1
                    bssid = _pyshark_bssid(wlan)
                    if bssid:
                        result.unique_bssids.add(bssid.upper())
                    ssid = _pyshark_ssid(pkt, wlan)
                    if ssid:
                        result.unique_ssids.add(ssid)
                elif frame_subtype == 4:  # Probe Request
                    result.probe_request_count += 1
                elif frame_subtype == 5:  # Probe Response
                    result.probe_response_count += 1
                    bssid = _pyshark_bssid(wlan)
                    if bssid:
                        result.unique_bssids.add(bssid.upper())
        finally:
            # Close on every path so a failure mid-file does not leave the
            # capture open.
            cap.close()

        result.parse_time = time.perf_counter() - start_time
        result.memory_mb = get_memory_mb() - mem_before
        result.success = True
    except ImportError:
        result.error = "pyshark not installed. Install with: pip install pyshark (also requires Wireshark/tshark)"
    except Exception as e:
        result.error = f"{type(e).__name__}: {e}"
    return result
# =============================================================================
# DPKT PARSER
# =============================================================================
def _tally_dpkt_frame(buf: bytes, result: "BenchmarkResult") -> None:
    """Classify one captured frame and update the counters in ``result``."""
    if len(buf) < 4:
        return

    # A leading 0x00 0x00 is taken as a radiotap header, whose little-endian
    # length sits in bytes 2-3; the 802.11 frame starts right after it.
    frame = buf
    if buf[:2] == b'\x00\x00':
        header_len = int.from_bytes(buf[2:4], 'little')
        if header_len >= len(buf):
            return
        frame = buf[header_len:]
    if len(frame) < 2:
        return

    # Frame control, first octet: type in bits 2-3, subtype in bits 4-7.
    first_octet = frame[0]
    if ((first_octet >> 2) & 0x03) != 0:
        return  # only management frames (type 0) are of interest
    subtype = (first_octet >> 4) & 0x0f

    if subtype == 4:  # Probe Request
        result.probe_request_count += 1
        return
    if subtype not in (5, 8):
        return

    is_beacon = subtype == 8
    if is_beacon:
        result.beacon_count += 1
    else:  # Probe Response
        result.probe_response_count += 1

    if len(frame) < 24:
        return
    # BSSID is at offset 16 (6 bytes)
    result.unique_bssids.add(':'.join(f'{b:02X}' for b in frame[16:22]))

    # Tagged parameters follow the fixed parameters, i.e. start at offset 36.
    if is_beacon and len(frame) > 36:
        ssid = extract_ssid_from_tagged(frame[36:])
        if ssid:
            result.unique_ssids.add(ssid)


def parse_with_dpkt(pcap_path: str) -> BenchmarkResult:
    """Parse pcap using dpkt.

    Opens the file as classic pcap and retries as pcapng when the first
    reader rejects it with ValueError. Every record is counted; frames that
    raise while being classified are skipped.

    Returns:
        A BenchmarkResult. ``success`` is False and ``error`` is set when
        dpkt is missing or reading the file raised.
    """
    result = BenchmarkResult(library="dpkt", success=False)
    try:
        import dpkt

        mem_before = get_memory_mb()
        start_time = time.perf_counter()
        with open(pcap_path, 'rb') as f:
            # Try pcap format first, then pcapng
            try:
                pcap = dpkt.pcap.Reader(f)
            except ValueError:
                f.seek(0)
                pcap = dpkt.pcapng.Reader(f)

            for _timestamp, frame_bytes in pcap:
                result.packet_count += 1
                try:
                    _tally_dpkt_frame(frame_bytes, result)
                except Exception:
                    # Skip malformed packets
                    continue

        result.parse_time = time.perf_counter() - start_time
        result.memory_mb = get_memory_mb() - mem_before
        result.success = True
    except ImportError:
        result.error = "dpkt not installed. Install with: pip install dpkt"
    except Exception as e:
        result.error = f"{type(e).__name__}: {e}"
    return result
def extract_ssid_from_tagged(tagged: bytes) -> str:
    """Extract SSID from 802.11 tagged parameters.

    Walks the ``(id, length, value)`` elements in ``tagged`` and returns the
    value of the first element with ID 0, decoded as UTF-8 with undecodable
    bytes dropped.

    Args:
        tagged: Tagged-parameter bytes. Any bytes-like object is accepted
            (``bytes``, ``bytearray``, ``memoryview``).

    Returns:
        The decoded SSID, or "" when there is no SSID element or the data is
        truncated before one is found.
    """
    total = len(tagged)
    offset = 0
    while offset + 2 <= total:
        tag_id = tagged[offset]
        tag_len = tagged[offset + 1]
        value_end = offset + 2 + tag_len
        if value_end > total:
            break  # element claims more bytes than are present
        if tag_id == 0:  # SSID
            # bytes() lets bytes-like inputs without .decode() (memoryview)
            # work too; errors='ignore' means decoding itself cannot fail.
            return bytes(tagged[offset + 2:value_end]).decode('utf-8', errors='ignore')
        offset = value_end
    return ""
# =============================================================================
# SAMPLE PCAP GENERATION
# =============================================================================
def generate_sample_pcap(output_path: str, num_packets: int = 1000) -> bool:
    """Generate a sample pcap file with 802.11 management frames for testing.

    Frames cycle through a fixed SSID list in a 10-frame pattern: six
    beacons, two probe requests, two probe responses.

    Args:
        output_path: Where the pcap file is written.
        num_packets: Number of frames to generate.

    Returns:
        True when the file was written, False when scapy is missing or
        generation failed (the reason is printed).
    """
    try:
        from scapy.all import (
            wrpcap, RadioTap, Dot11, Dot11Beacon, Dot11Elt,
            Dot11ProbeReq, Dot11ProbeResp
        )

        ssids = [
            "TestNetwork", "CoffeeShop_WiFi", "HomeNetwork", "GuestWiFi",
            "SecureNet_5G", "OpenWiFi", "OfficeNet", "IoT_Network"
        ]
        broadcast = "ff:ff:ff:ff:ff:ff"
        packets = []
        used_ssids = set()  # SSIDs actually written, for an honest summary

        for i in range(num_packets):
            ssid = ssids[i % len(ssids)]
            used_ssids.add(ssid)
            bssid = f"00:11:22:33:44:{i % 256:02X}"
            station = f"AA:BB:CC:DD:EE:{i % 256:02X}"

            # Vary packet types
            pkt_type = i % 10
            if pkt_type < 6:  # 60% beacons
                pkt = (
                    RadioTap() /
                    Dot11(type=0, subtype=8, addr1=broadcast, addr2=bssid, addr3=bssid) /
                    Dot11Beacon(cap="ESS+privacy") /
                    Dot11Elt(ID=0, info=ssid.encode()) /
                    Dot11Elt(ID=1, info=b"\x82\x84\x8b\x96\x0c\x12\x18\x24") /
                    Dot11Elt(ID=3, info=bytes([i % 11 + 1]))
                )
            elif pkt_type < 8:  # 20% probe requests
                pkt = (
                    RadioTap() /
                    Dot11(type=0, subtype=4, addr1=broadcast, addr2=station, addr3=broadcast) /
                    Dot11ProbeReq() /
                    Dot11Elt(ID=0, info=ssid.encode())
                )
            else:  # 20% probe responses
                pkt = (
                    RadioTap() /
                    Dot11(type=0, subtype=5, addr1=station, addr2=bssid, addr3=bssid) /
                    Dot11ProbeResp(cap="ESS+privacy") /
                    Dot11Elt(ID=0, info=ssid.encode())
                )
            packets.append(pkt)

        wrpcap(output_path, packets)
        print(f"Generated sample pcap: {output_path}")
        # Report what was really written: small or non-positive num_packets
        # yield fewer packets/SSIDs than the nominal values.
        print(f" Packets: {len(packets)}")
        print(f" Unique SSIDs: {len(used_ssids)}")
        return True
    except ImportError:
        print("Error: scapy is required to generate sample pcaps")
        print("Install with: pip install scapy")
        return False
    except Exception as e:
        print(f"Error generating sample pcap: {e}")
        return False
# =============================================================================
# RESULTS DISPLAY
# =============================================================================
def print_results(results: list[BenchmarkResult]):
    """Print comparison results in a formatted table.

    Sections, in order: performance metrics, extracted-data counts, up to ten
    sample SSIDs from the first successful result that has any, error
    messages of failed results, and a summary naming the libraries that tie
    for fastest, lowest memory and most BSSIDs.
    """
    banner = "=" * 80
    succeeded = [res for res in results if res.success]
    failures = [res for res in results if not res.success]

    print("\n" + banner)
    print(" PCAP PARSER COMPARISON RESULTS")
    print(banner)

    # Performance table
    print("\n### Performance Metrics\n")
    print(f"{'Library':<12} {'Status':<10} {'Time (s)':<12} {'Memory (MB)':<14} {'Packets':<10}")
    print("-" * 60)
    for res in results:
        if res.success:
            status = "✓ OK"
            time_str = f"{res.parse_time:.3f}"
            mem_str = f"{res.memory_mb:.1f}"
            pkt_str = str(res.packet_count)
        else:
            status = "✗ FAIL"
            time_str = mem_str = pkt_str = "-"
        print(f"{res.library:<12} {status:<10} {time_str:<12} {mem_str:<14} {pkt_str:<10}")

    # Extraction results
    print("\n### Extracted Data\n")
    print(f"{'Library':<12} {'Beacons':<10} {'ProbeReq':<10} {'ProbeResp':<10} {'SSIDs':<10} {'BSSIDs':<10}")
    print("-" * 65)
    for res in results:
        if res.success:
            cells = [
                res.beacon_count,
                res.probe_request_count,
                res.probe_response_count,
                len(res.unique_ssids),
                len(res.unique_bssids),
            ]
        else:
            cells = ["-"] * 5
        print(f"{res.library:<12} " + " ".join(f"{cell:<10}" for cell in cells))

    # Show extracted SSIDs from first successful parser
    ssid_source = next((res for res in results if res.success and res.unique_ssids), None)
    if ssid_source is not None:
        print(f"\n### Sample SSIDs (from {ssid_source.library})\n")
        ordered = sorted(ssid_source.unique_ssids)
        for ssid in ordered[:10]:
            print(f" - {ssid or '[Hidden]'}")
        if len(ordered) > 10:
            print(f" ... and {len(ordered) - 10} more")

    # Errors
    if failures:
        print("\n### Errors\n")
        for res in failures:
            print(f" {res.library}: {res.error}")

    # Summary
    print("\n### Summary\n")
    if succeeded:
        def describe_best(metric, pick, value_fmt: str) -> str:
            # Every library whose metric equals the best value shares the win.
            best = pick(metric(res) for res in succeeded)
            tied = [res for res in succeeded if metric(res) == best]
            names = ", ".join(res.library for res in tied)
            return f"{names} ({value_fmt.format(metric(tied[0]))})"

        fastest = describe_best(lambda res: res.parse_time, min, '{:.3f}s')
        leanest = describe_best(lambda res: res.memory_mb, min, '{:.1f} MB')
        richest = describe_best(lambda res: len(res.unique_bssids), max, '{} found')
        print(f" Fastest: {fastest}")
        print(f" Lowest Memory: {leanest}")
        print(f" Most BSSIDs: {richest}")

    print("\n" + banner)
# =============================================================================
# DEBUG
# =============================================================================
def _is_pyshark_beacon(wlan) -> bool:
    """Return True when the wlan layer's fc_type_subtype field equals 8."""
    raw = getattr(wlan, 'fc_type_subtype', None)
    if raw is None:
        return False
    text = str(raw)
    try:
        subtype = int(text, 16) if text.startswith('0x') else int(text)
    except ValueError:
        return False
    return subtype == 8


def _dump_pyshark_layer(label: str, layer) -> None:
    """Print every public, non-callable attribute of ``layer`` (value cut to 80 chars)."""
    for attr in sorted(dir(layer)):
        if attr.startswith('_'):
            continue
        try:
            val = getattr(layer, attr)
            if not callable(val):
                print(f" {label}.{attr} = {repr(val)[:80]}")
        except Exception:
            # Best-effort dump: an attribute that cannot be read is skipped.
            continue


def debug_pyshark_fields(pcap_path: str):
    """Debug pyshark field names by inspecting the first beacon packet.

    Prints the layer names of the first beacon found, the attributes of its
    ``wlan`` layer (and ``wlan_mgt`` layer when present) and every field whose
    name contains "ssid". Prints a notice when pyshark is not installed or no
    beacon exists. The capture is closed on every exit path.
    """
    try:
        import pyshark
    except ImportError:
        print("pyshark not installed")
        return

    print("\n### Debugging pyshark fields ###\n")
    cap = pyshark.FileCapture(pcap_path)
    try:
        for pkt in cap:
            if not hasattr(pkt, 'wlan'):
                continue
            wlan = pkt.wlan
            # Check if this is a beacon
            if not _is_pyshark_beacon(wlan):
                continue

            print("Found beacon packet!")
            print(f"\nPacket layers: {[layer.layer_name for layer in pkt.layers]}")

            print("\n--- wlan layer attributes ---")
            _dump_pyshark_layer("wlan", wlan)

            # Check for wlan_mgt layer
            if hasattr(pkt, 'wlan_mgt'):
                print("\n--- wlan_mgt layer attributes ---")
                _dump_pyshark_layer("wlan_mgt", pkt.wlan_mgt)

            # Try to find anything with 'ssid' in it
            print("\n--- Fields containing 'ssid' ---")
            for layer in pkt.layers:
                for attr in dir(layer):
                    if 'ssid' not in attr.lower() or attr.startswith('_'):
                        continue
                    try:
                        val = getattr(layer, attr)
                        if not callable(val):
                            print(f" {layer.layer_name}.{attr} = {repr(val)}")
                    except Exception:
                        continue
            return

        print("No beacon packets found")
    finally:
        # Runs on return, on fall-through and on errors alike.
        cap.close()
# Mapping used by the subprocess runner
# Library name -> parser function. Insertion order is the order in which the
# libraries are benchmarked and listed in the report.
PARSERS: dict[str, Callable[[str], BenchmarkResult]] = dict(
    scapy=parse_with_scapy,
    pyshark=parse_with_pyshark,
    dpkt=parse_with_dpkt,
)
# =============================================================================
# MAIN
# =============================================================================
def main():
    """Command-line entry point.

    Optionally generates a sample pcap, validates the input path, then either
    dumps pyshark field names (``--debug-pyshark``) or runs every parser in
    ``PARSERS`` and prints the comparison. Exits with status 1 when sample
    generation fails, no pcap is given, or the file does not exist.
    """
    parser = argparse.ArgumentParser(description="Compare Python pcap parsing libraries for wardriving data")
    parser.add_argument("pcap_file", nargs="?", help="Path to pcap file to parse")
    parser.add_argument("--generate-sample", action="store_true",
                        help="Generate a sample pcap file for testing")
    parser.add_argument("--sample-packets", type=int, default=1000,
                        help="Number of packets to generate in sample (default: 1000)")
    parser.add_argument("--output", "-o", default="sample_wardriving.pcap",
                        help="Output path for generated sample pcap")
    parser.add_argument("--debug-pyshark", action="store_true",
                        help="Debug pyshark field names on first beacon packet")
    args = parser.parse_args()

    if args.generate_sample:
        if not generate_sample_pcap(args.output, args.sample_packets):
            sys.exit(1)
        # An explicitly given pcap wins; otherwise parse the fresh sample.
        if not args.pcap_file:
            args.pcap_file = args.output

    if not args.pcap_file:
        parser.print_help()
        print("\nError: Please provide a pcap file or use --generate-sample")
        sys.exit(1)

    pcap_path = args.pcap_file
    if not os.path.exists(pcap_path):
        print(f"Error: File not found: {pcap_path}")
        sys.exit(1)

    file_size_mb = os.path.getsize(pcap_path) / 1024 / 1024
    print(f"\nParsing: {pcap_path} ({file_size_mb:.2f} MB)")
    print("-" * 40)

    # Debug pyshark fields if requested
    if args.debug_pyshark:
        debug_pyshark_fields(pcap_path)
        sys.exit(0)

    # Run all parsers
    results = []
    for lib_name, parse in PARSERS.items():
        print(f"Testing {lib_name}...", end=" ", flush=True)
        outcome = run_parser_in_subprocess(lib_name, parse, pcap_path)
        print("✓" if outcome.success else "✗")
        results.append(outcome)

    print_results(results)
# Run the CLI only when executed as a script. The guard also keeps the
# benchmark from re-running when the "spawn"-started worker processes import
# this module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment