Created
January 19, 2026 15:31
-
-
Save fyxme/1dc1662fddfa231f5fa4d2bf519ec93d to your computer and use it in GitHub Desktop.
A python script that compares scapy, pyshark, and dpkt for parsing wardriving pcap files.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| PCAP Parser Comparison Script | |
| Compares scapy, pyshark, and dpkt for parsing wardriving pcap files. | |
| Measures: parsing speed, memory usage, and extracted data quality. | |
| Install dependencies: | |
| pip install scapy pyshark dpkt psutil | |
| Usage: | |
| python compare_pcap_parsers.py <pcap_file> | |
| python compare_pcap_parsers.py --generate-sample # Creates a sample pcap for testing | |
| """ | |
| import argparse | |
| import multiprocessing | |
| import os | |
| import sys | |
| import time | |
| import traceback | |
| from dataclasses import dataclass, field | |
| from typing import Callable | |
| try: | |
| import psutil | |
| except ImportError: | |
| psutil = None | |
| print("Warning: psutil not installed. Memory tracking disabled.") | |
| print("Install with: pip install psutil") | |
@dataclass
class BenchmarkResult:
    """Outcome of one library's pass over a pcap file.

    Carries the timing and memory figures together with the frame counts and
    the distinct SSIDs/BSSIDs the parser managed to extract.
    """

    # Which library produced this result, and whether the run completed.
    library: str
    success: bool

    # Performance figures (seconds / megabytes).
    parse_time: float = 0.0
    memory_mb: float = 0.0

    # Frame counters.
    packet_count: int = 0
    beacon_count: int = 0
    probe_request_count: int = 0
    probe_response_count: int = 0

    # Distinct identifiers seen; each instance gets its own fresh set.
    unique_ssids: set[str] = field(default_factory=set)
    unique_bssids: set[str] = field(default_factory=set)

    # Human-readable failure reason; empty when the run succeeded.
    error: str = ""
def get_memory_mb() -> float:
    """Return this process's resident set size in MB, or 0.0 without psutil."""
    if not psutil:
        return 0.0
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / 1024 / 1024
def _parser_worker(target_name: str, parser_func: Callable[[str], "BenchmarkResult"], target_pcap: str,
                   out_conn: "multiprocessing.connection.Connection"):
    """Child-process entry point: run one parser and ship its result over a pipe.

    Any exception raised by ``parser_func`` is converted into a failed
    ``BenchmarkResult`` so that something is always sent. The connection is
    closed after the send attempt, whether or not the send succeeded.
    """
    try:
        outcome = parser_func(target_pcap)
    except Exception as exc:  # safety net to always return a result
        outcome = BenchmarkResult(
            library=target_name,
            success=False,
            error=f"{type(exc).__name__}: {exc}",
        )

    try:
        out_conn.send(outcome)
    finally:
        out_conn.close()
def get_process_tree_rss_bytes(proc: "psutil.Process") -> int:
    """Sum the RSS (bytes) of *proc* and all of its descendants.

    Descendants that disappear while being sampled are skipped; if *proc*
    itself is gone the function returns 0.
    """
    try:
        rss_total = proc.memory_info().rss
        descendants = proc.children(recursive=True)
        for descendant in descendants:
            try:
                rss_total += descendant.memory_info().rss
            except psutil.NoSuchProcess:
                # Short-lived child exited between listing and sampling.
                pass
    except psutil.NoSuchProcess:
        return 0
    return rss_total
def run_parser_in_subprocess(name: str, parser_func: Callable[[str], "BenchmarkResult"], pcap_path: str,
                             sample_interval: float = 0.05) -> "BenchmarkResult":
    """Run a parser in an isolated process and record its peak RSS.

    The worker is started with the ``spawn`` context and its process tree
    (including children such as tshark) is sampled every ``sample_interval``
    seconds. When psutil is unavailable the parser runs in-process instead and
    no peak measurement is taken.

    Args:
        name: Library name, used to label a failure result.
        parser_func: Picklable callable taking the pcap path and returning a
            ``BenchmarkResult``.
        pcap_path: Path of the capture file to parse.
        sample_interval: Seconds between memory samples.

    Returns:
        The worker's ``BenchmarkResult`` with ``memory_mb`` replaced by the
        observed peak, or a failed result if the worker died without sending
        one.
    """
    if not psutil:
        # psutil missing; run directly without peak measurement
        result = parser_func(pcap_path)
        result.memory_mb = max(result.memory_mb, 0.0)
        return result

    ctx = multiprocessing.get_context("spawn")
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=_parser_worker, args=(name, parser_func, pcap_path, child_conn))
    proc.start()
    # Drop the parent's handle on the write end. Otherwise the pipe never
    # reports EOF and a worker that dies before sending would leave the
    # parent blocked forever.
    child_conn.close()

    result = None
    peak_rss_bytes = 0
    try:
        try:
            ps_proc = psutil.Process(proc.pid)
        except psutil.NoSuchProcess:
            ps_proc = None  # worker already gone; nothing to sample

        while True:
            if ps_proc is not None:
                peak_rss_bytes = max(peak_rss_bytes, get_process_tree_rss_bytes(ps_proc))
            try:
                # poll() doubles as the sampling delay. Receiving while the
                # worker is still alive avoids a deadlock when the pickled
                # result is larger than the pipe buffer. poll() also returns
                # True at EOF, in which case recv() raises EOFError.
                if parent_conn.poll(sample_interval):
                    result = parent_conn.recv()
                    break
            except Exception:
                # EOF, broken pipe, or an unpicklable payload: no usable result.
                break

        # One last sample once the result is in (captures late spikes from short-lived children)
        if ps_proc is not None:
            peak_rss_bytes = max(peak_rss_bytes, get_process_tree_rss_bytes(ps_proc))
    finally:
        parent_conn.close()
        proc.join()

    if result is None:
        result = BenchmarkResult(library=name, success=False, error="Worker failed without returning a result")

    # Override memory measurement with peak RSS observed across the process tree
    if peak_rss_bytes:
        result.memory_mb = peak_rss_bytes / 1024 / 1024
    return result
| # ============================================================================= | |
| # SCAPY PARSER | |
| # ============================================================================= | |
def parse_with_scapy(pcap_path: str) -> BenchmarkResult:
    """Parse a pcap with scapy and tally 802.11 management frames.

    Counts beacons, probe requests and probe responses, records the
    transmitter address (``addr2``) of beacons and probe responses as BSSID,
    and collects the first SSID element of each beacon.

    Args:
        pcap_path: Path of the capture file to read.

    Returns:
        A ``BenchmarkResult``; ``success`` is False and ``error`` is set when
        scapy is missing or parsing raised.
    """
    result = BenchmarkResult(library="scapy", success=False)
    try:
        from scapy.all import rdpcap, Dot11, Dot11Beacon, Dot11ProbeReq, Dot11ProbeResp, Dot11Elt

        mem_before = get_memory_mb()
        start_time = time.perf_counter()
        packets = rdpcap(pcap_path)
        for pkt in packets:
            result.packet_count += 1
            if not pkt.haslayer(Dot11):
                continue
            dot11 = pkt.getlayer(Dot11)

            if pkt.haslayer(Dot11Beacon):
                result.beacon_count += 1
                bssid = dot11.addr2
                if bssid:
                    result.unique_bssids.add(bssid.upper())
                # Walk the information elements until the SSID element (ID 0).
                elt = pkt.getlayer(Dot11Elt)
                while elt:
                    if elt.ID == 0:  # SSID element
                        try:
                            ssid = elt.info.decode('utf-8', errors='ignore')
                        except Exception:
                            # Best effort: an odd element must not abort the run,
                            # but KeyboardInterrupt/SystemExit still propagate.
                            ssid = ""
                        if ssid:
                            result.unique_ssids.add(ssid)
                        break
                    elt = elt.payload.getlayer(Dot11Elt)
            elif pkt.haslayer(Dot11ProbeReq):
                result.probe_request_count += 1
            elif pkt.haslayer(Dot11ProbeResp):
                result.probe_response_count += 1
                bssid = dot11.addr2
                if bssid:
                    result.unique_bssids.add(bssid.upper())

        result.parse_time = time.perf_counter() - start_time
        result.memory_mb = get_memory_mb() - mem_before
        result.success = True
    except ImportError:
        result.error = "scapy not installed. Install with: pip install scapy"
    except Exception as e:
        result.error = f"{type(e).__name__}: {e}"
    return result
| # ============================================================================= | |
| # PYSHARK PARSER | |
| # ============================================================================= | |
def parse_with_pyshark(pcap_path: str) -> BenchmarkResult:
    """Parse a pcap with pyshark (requires tshark/Wireshark installed).

    Counts beacons, probe requests and probe responses. BSSIDs are recorded
    for beacons and probe responses; SSIDs are recorded for beacons only, as
    in the other parsers. The capture is always closed, even when iteration
    fails, so the tshark child process is not leaked.

    Args:
        pcap_path: Path of the capture file to read.

    Returns:
        A ``BenchmarkResult``; ``success`` is False and ``error`` is set when
        pyshark is missing or parsing raised.
    """
    result = BenchmarkResult(library="pyshark", success=False)
    try:
        import pyshark

        mem_before = get_memory_mb()
        start_time = time.perf_counter()
        # Use FileCapture for reading pcap files
        cap = pyshark.FileCapture(pcap_path, keep_packets=False)
        try:
            for pkt in cap:
                result.packet_count += 1
                # Check for 802.11 wireless layer
                if not hasattr(pkt, 'wlan'):
                    continue
                wlan = pkt.wlan
                frame_subtype = _pyshark_frame_subtype(wlan)
                if frame_subtype is None:
                    continue

                if frame_subtype == 8:  # Beacon
                    result.beacon_count += 1
                    bssid = _pyshark_bssid(wlan)
                    if bssid:
                        result.unique_bssids.add(bssid.upper())
                    ssid = _pyshark_ssid(pkt, wlan)
                    if ssid:
                        result.unique_ssids.add(ssid)
                elif frame_subtype == 4:  # Probe Request
                    result.probe_request_count += 1
                elif frame_subtype == 5:  # Probe Response
                    result.probe_response_count += 1
                    bssid = _pyshark_bssid(wlan)
                    if bssid:
                        result.unique_bssids.add(bssid.upper())
        finally:
            cap.close()

        result.parse_time = time.perf_counter() - start_time
        result.memory_mb = get_memory_mb() - mem_before
        result.success = True
    except ImportError:
        result.error = "pyshark not installed. Install with: pip install pyshark (also requires Wireshark/tshark)"
    except Exception as e:
        result.error = f"{type(e).__name__}: {e}"
    return result


def _pyshark_frame_subtype(wlan):
    """Return the frame subtype as an int, or None when it cannot be read."""
    try:
        # Try different attribute names used by different tshark versions
        subtype_raw = getattr(wlan, 'fc_type_subtype', None)
        if subtype_raw is None:
            subtype_raw = getattr(wlan, 'subtype', None)
        if subtype_raw is None:
            return None
        # Handle hex strings like "0x0008" or decimal strings like "8"
        subtype_str = str(subtype_raw)
        if subtype_str.startswith('0x'):
            return int(subtype_str, 16)
        return int(subtype_str)
    except (AttributeError, ValueError):
        return None


def _pyshark_bssid(wlan):
    """Return the first non-empty of wlan.bssid / wlan.ta / wlan.addr2, else None."""
    for attr in ('bssid', 'ta', 'addr2'):
        value = getattr(wlan, attr, None)
        if value:
            return value
    return None


def _pyshark_decode_ssid(raw):
    """Decode a colon-separated hex SSID; return *raw* unchanged if it is not hex."""
    if ':' in raw and len(raw) > 2:
        try:
            return bytes.fromhex(raw.replace(':', '')).decode('utf-8', errors='ignore')
        except Exception:
            # Not hex after all (e.g. a plain-text SSID containing ':').
            return raw
    return raw


def _pyshark_ssid_from_layer(layer):
    """Return the SSID from the first populated ssid-like attribute of *layer*."""
    for attr in ('ssid', 'tag_ssid', 'wlan_ssid'):
        value = getattr(layer, attr, None)
        if value:
            return _pyshark_decode_ssid(value)
    return None


def _pyshark_ssid(pkt, wlan):
    """Extract the SSID of *pkt*, trying the layouts of several tshark versions."""
    ssid = None
    # Method 1: wlan.mgt layer (modern tshark) - SSID is hex-encoded,
    # e.g. '54:65:73:74' for 'Test'.
    try:
        hex_ssid = getattr(pkt['wlan.mgt'], 'wlan_ssid', None)
        if hex_ssid:
            ssid = bytes.fromhex(hex_ssid.replace(':', '')).decode('utf-8', errors='ignore')
    except (KeyError, AttributeError, ValueError):
        pass
    # Method 2: direct attributes on the wlan layer
    if not ssid:
        ssid = _pyshark_ssid_from_layer(wlan)
    # Method 3: wlan_mgt layer (older tshark - underscore not dot)
    if not ssid and hasattr(pkt, 'wlan_mgt'):
        ssid = _pyshark_ssid_from_layer(pkt.wlan_mgt)
    return ssid
| # ============================================================================= | |
| # DPKT PARSER | |
| # ============================================================================= | |
def parse_with_dpkt(pcap_path: str) -> BenchmarkResult:
    """Parse a pcap or pcapng file with dpkt and tally 802.11 management frames.

    dpkt is used only as the container reader; the radiotap and 802.11
    headers are decoded by hand. Frames that cannot be decoded are counted as
    packets but otherwise skipped.

    Args:
        pcap_path: Path of the capture file to read.

    Returns:
        A ``BenchmarkResult``; ``success`` is False and ``error`` is set when
        dpkt is missing or reading the file raised.
    """
    result = BenchmarkResult(library="dpkt", success=False)
    try:
        import dpkt

        mem_before = get_memory_mb()
        start_time = time.perf_counter()
        with open(pcap_path, 'rb') as f:
            # Try pcap format first, then pcapng
            try:
                pcap = dpkt.pcap.Reader(f)
            except ValueError:
                f.seek(0)
                pcap = dpkt.pcapng.Reader(f)
            for _timestamp, buf in pcap:
                result.packet_count += 1
                try:
                    _dpkt_tally_frame(buf, result)
                except Exception:
                    # Skip malformed packets
                    continue
        result.parse_time = time.perf_counter() - start_time
        result.memory_mb = get_memory_mb() - mem_before
        result.success = True
    except ImportError:
        result.error = "dpkt not installed. Install with: pip install dpkt"
    except Exception as e:
        result.error = f"{type(e).__name__}: {e}"
    return result


def _dpkt_strip_radiotap(buf):
    """Return the 802.11 frame inside *buf*, or None if it is too short or malformed."""
    if len(buf) < 4:
        return None
    # Heuristic: a radiotap header starts with version 0 and a zero pad byte.
    if buf[0:2] == b'\x00\x00':
        radiotap_len = int.from_bytes(buf[2:4], 'little')
        # The fixed radiotap header (version, pad, length, present flags) is
        # 8 bytes, so a smaller length is malformed; slicing at it would
        # misread the header itself as an 802.11 frame.
        if radiotap_len < 8 or radiotap_len >= len(buf):
            return None
        frame = buf[radiotap_len:]
    else:
        frame = buf
    if len(frame) < 2:
        return None
    return frame


def _dpkt_record_bssid(frame, result):
    """Add the BSSID (bytes 16-21 of the management header) to *result*."""
    if len(frame) >= 24:
        result.unique_bssids.add(':'.join(f'{b:02X}' for b in frame[16:22]))


def _dpkt_tally_frame(buf, result):
    """Update the counters and sets in *result* from a single captured frame."""
    frame = _dpkt_strip_radiotap(buf)
    if frame is None:
        return
    # Frame control, first byte: bits 2-3 are the type, bits 4-7 the subtype.
    fc_type = (frame[0] >> 2) & 0x03
    fc_subtype = (frame[0] >> 4) & 0x0f
    if fc_type != 0:  # only management frames are of interest
        return
    if fc_subtype == 8:  # Beacon
        result.beacon_count += 1
        _dpkt_record_bssid(frame, result)
        # Tagged parameters follow the 24-byte header and 12 bytes of fixed fields.
        if len(frame) > 36:
            ssid = extract_ssid_from_tagged(frame[36:])
            if ssid:
                result.unique_ssids.add(ssid)
    elif fc_subtype == 4:  # Probe Request
        result.probe_request_count += 1
    elif fc_subtype == 5:  # Probe Response
        result.probe_response_count += 1
        _dpkt_record_bssid(frame, result)
def extract_ssid_from_tagged(tagged: bytes) -> str:
    """Extract the SSID from a run of 802.11 tagged parameters.

    Walks the tag/length/value triplets and decodes the first element with
    tag ID 0 as UTF-8, dropping undecodable bytes.

    Args:
        tagged: Bytes-like buffer positioned at the first tagged parameter.

    Returns:
        The decoded SSID, or "" when there is no SSID element, the SSID is
        empty, or an element runs past the end of the buffer.
    """
    total = len(tagged)
    offset = 0
    while offset + 2 <= total:
        tag_id = tagged[offset]
        end = offset + 2 + tagged[offset + 1]
        if end > total:
            # Truncated element: stop rather than read past the buffer.
            break
        if tag_id == 0:  # SSID
            # errors='ignore' means decoding cannot fail; bytes() lets any
            # bytes-like buffer through instead of silently yielding "".
            return bytes(tagged[offset + 2:end]).decode('utf-8', errors='ignore')
        offset = end
    return ""
| # ============================================================================= | |
| # SAMPLE PCAP GENERATION | |
| # ============================================================================= | |
def generate_sample_pcap(output_path: str, num_packets: int = 1000) -> bool:
    """Generate a sample pcap of 802.11 management frames for testing.

    Out of every ten packets, six are beacons, two are probe requests and two
    are probe responses. SSIDs and BSSIDs cycle through small fixed pools.

    Args:
        output_path: Where to write the pcap file.
        num_packets: Number of frames to generate.

    Returns:
        True when the file was written; False when scapy is missing or
        writing failed (the reason is printed).
    """
    try:
        from scapy.all import (
            wrpcap, RadioTap, Dot11, Dot11Beacon, Dot11Elt,
            Dot11ProbeReq, Dot11ProbeResp
        )

        packets = []
        used_ssids = set()
        ssids = [
            "TestNetwork", "CoffeeShop_WiFi", "HomeNetwork", "GuestWiFi",
            "SecureNet_5G", "OpenWiFi", "OfficeNet", "IoT_Network"
        ]
        for i in range(num_packets):
            ssid = ssids[i % len(ssids)]
            used_ssids.add(ssid)
            bssid = f"00:11:22:33:44:{i % 256:02X}"
            # Vary packet types
            pkt_type = i % 10
            if pkt_type < 6:  # 60% beacons
                pkt = (
                    RadioTap() /
                    Dot11(type=0, subtype=8, addr1="ff:ff:ff:ff:ff:ff", addr2=bssid, addr3=bssid) /
                    Dot11Beacon(cap="ESS+privacy") /
                    Dot11Elt(ID=0, info=ssid.encode()) /
                    Dot11Elt(ID=1, info=b"\x82\x84\x8b\x96\x0c\x12\x18\x24") /
                    Dot11Elt(ID=3, info=bytes([i % 11 + 1]))
                )
            elif pkt_type < 8:  # 20% probe requests
                pkt = (
                    RadioTap() /
                    Dot11(type=0, subtype=4, addr1="ff:ff:ff:ff:ff:ff",
                          addr2=f"AA:BB:CC:DD:EE:{i % 256:02X}", addr3="ff:ff:ff:ff:ff:ff") /
                    Dot11ProbeReq() /
                    Dot11Elt(ID=0, info=ssid.encode())
                )
            else:  # 20% probe responses
                pkt = (
                    RadioTap() /
                    Dot11(type=0, subtype=5, addr1=f"AA:BB:CC:DD:EE:{i % 256:02X}",
                          addr2=bssid, addr3=bssid) /
                    Dot11ProbeResp(cap="ESS+privacy") /
                    Dot11Elt(ID=0, info=ssid.encode())
                )
            packets.append(pkt)

        wrpcap(output_path, packets)
        print(f"Generated sample pcap: {output_path}")
        print(f" Packets: {num_packets}")
        # Report the SSIDs actually written, which is fewer than the pool
        # when num_packets is smaller than the pool size.
        print(f" Unique SSIDs: {len(used_ssids)}")
        return True
    except ImportError:
        print("Error: scapy is required to generate sample pcaps")
        print("Install with: pip install scapy")
        return False
    except Exception as e:
        print(f"Error generating sample pcap: {e}")
        return False
| # ============================================================================= | |
| # RESULTS DISPLAY | |
| # ============================================================================= | |
def print_results(results: "list[BenchmarkResult]"):
    """Write the benchmark comparison to stdout as plain-text tables.

    Sections, in order: performance metrics, extracted-data counts, up to ten
    sample SSIDs from the first successful parser that found any, errors from
    failed parsers, and a summary of the best parser(s) per category.
    """
    banner = "=" * 80
    print("\n" + banner)
    print(" PCAP PARSER COMPARISON RESULTS")
    print(banner)

    # --- Performance table ---
    print("\n### Performance Metrics\n")
    print(f"{'Library':<12} {'Status':<10} {'Time (s)':<12} {'Memory (MB)':<14} {'Packets':<10}")
    print("-" * 60)
    for entry in results:
        if entry.success:
            status, elapsed, memory, packets = (
                "✓ OK",
                f"{entry.parse_time:.3f}",
                f"{entry.memory_mb:.1f}",
                str(entry.packet_count),
            )
        else:
            status, elapsed, memory, packets = "✗ FAIL", "-", "-", "-"
        print(f"{entry.library:<12} {status:<10} {elapsed:<12} {memory:<14} {packets:<10}")

    # --- Extraction table ---
    print("\n### Extracted Data\n")
    print(f"{'Library':<12} {'Beacons':<10} {'ProbeReq':<10} {'ProbeResp':<10} {'SSIDs':<10} {'BSSIDs':<10}")
    print("-" * 65)
    for entry in results:
        if entry.success:
            columns = (
                entry.beacon_count,
                entry.probe_request_count,
                entry.probe_response_count,
                len(entry.unique_ssids),
                len(entry.unique_bssids),
            )
        else:
            columns = ("-",) * 5
        print(f"{entry.library:<12} " + " ".join(f"{col:<10}" for col in columns))

    # --- Sample SSIDs from the first successful parser that found any ---
    showcase = next((e for e in results if e.success and e.unique_ssids), None)
    if showcase is not None:
        print(f"\n### Sample SSIDs (from {showcase.library})\n")
        ordered = sorted(showcase.unique_ssids)
        for ssid in ordered[:10]:
            print(f" - {ssid or '[Hidden]'}")
        overflow = len(ordered) - 10
        if overflow > 0:
            print(f" ... and {overflow} more")

    # --- Errors ---
    failures = [e for e in results if not e.success]
    if failures:
        print("\n### Errors\n")
        for entry in failures:
            print(f" {entry.library}: {entry.error}")

    # --- Summary ---
    print("\n### Summary\n")
    passed = [e for e in results if e.success]
    if passed:
        def describe(metric, pick, template: str) -> str:
            # Name every parser tied for the best value of this metric.
            best = pick(metric(e) for e in passed)
            names = ", ".join(e.library for e in passed if metric(e) == best)
            return f"{names} ({template.format(best)})"

        fastest = describe(lambda e: e.parse_time, min, '{:.3f}s')
        leanest = describe(lambda e: e.memory_mb, min, '{:.1f} MB')
        richest = describe(lambda e: len(e.unique_bssids), max, '{} found')
        print(f" Fastest: {fastest}")
        print(f" Lowest Memory: {leanest}")
        print(f" Most BSSIDs: {richest}")
    print("\n" + banner)
| # ============================================================================= | |
| # DEBUG | |
| # ============================================================================= | |
def debug_pyshark_fields(pcap_path: str):
    """Print the pyshark field names exposed by the first beacon in a pcap.

    Dumps the non-callable public attributes of the ``wlan`` layer (and of
    ``wlan_mgt`` when present), then every attribute on any layer whose name
    contains "ssid". The capture is closed on every exit path so the tshark
    child process is not leaked.

    Args:
        pcap_path: Path of the capture file to inspect.
    """
    try:
        import pyshark
    except ImportError:
        print("pyshark not installed")
        return

    print("\n### Debugging pyshark fields ###\n")
    cap = pyshark.FileCapture(pcap_path)
    try:
        for pkt in cap:
            if not hasattr(pkt, 'wlan'):
                continue
            wlan = pkt.wlan
            # Check if this is a beacon
            try:
                subtype_raw = getattr(wlan, 'fc_type_subtype', None)
                if subtype_raw is None:
                    continue
                subtype_str = str(subtype_raw)
                if subtype_str.startswith('0x'):
                    frame_subtype = int(subtype_str, 16)
                else:
                    frame_subtype = int(subtype_str)
            except Exception:
                continue
            if frame_subtype != 8:
                continue

            print("Found beacon packet!")
            print(f"\nPacket layers: {[layer.layer_name for layer in pkt.layers]}")
            print("\n--- wlan layer attributes ---")
            _print_layer_attributes("wlan", wlan)
            # Check for wlan_mgt layer
            if hasattr(pkt, 'wlan_mgt'):
                print("\n--- wlan_mgt layer attributes ---")
                _print_layer_attributes("wlan_mgt", pkt.wlan_mgt)
            # Try to find anything with 'ssid' in it
            print("\n--- Fields containing 'ssid' ---")
            for layer in pkt.layers:
                for attr in dir(layer):
                    if 'ssid' in attr.lower() and not attr.startswith('_'):
                        try:
                            val = getattr(layer, attr)
                            if not callable(val):
                                print(f" {layer.layer_name}.{attr} = {repr(val)}")
                        except Exception:
                            # Best effort: skip attributes that fail to resolve.
                            pass
            return
    finally:
        cap.close()
    print("No beacon packets found")


def _print_layer_attributes(label, layer):
    """Print each public, non-callable attribute of *layer*, truncated to 80 chars."""
    for attr in sorted(dir(layer)):
        if attr.startswith('_'):
            continue
        try:
            val = getattr(layer, attr)
            if not callable(val):
                print(f" {label}.{attr} = {repr(val)[:80]}")
        except Exception:
            # Best effort: skip attributes that fail to resolve or repr.
            pass
# Library name -> parser entry point. main() runs the parsers in this
# (insertion) order, handing each to the subprocess runner.
PARSERS: dict[str, Callable[[str], BenchmarkResult]] = {
    "scapy": parse_with_scapy,
    "pyshark": parse_with_pyshark,
    "dpkt": parse_with_dpkt,
}
| # ============================================================================= | |
| # MAIN | |
| # ============================================================================= | |
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the comparison script."""
    cli = argparse.ArgumentParser(
        description="Compare Python pcap parsing libraries for wardriving data"
    )
    cli.add_argument("pcap_file", nargs="?", help="Path to pcap file to parse")
    cli.add_argument(
        "--generate-sample",
        action="store_true",
        help="Generate a sample pcap file for testing",
    )
    cli.add_argument(
        "--sample-packets",
        type=int,
        default=1000,
        help="Number of packets to generate in sample (default: 1000)",
    )
    cli.add_argument(
        "--output",
        "-o",
        default="sample_wardriving.pcap",
        help="Output path for generated sample pcap",
    )
    cli.add_argument(
        "--debug-pyshark",
        action="store_true",
        help="Debug pyshark field names on first beacon packet",
    )
    return cli


def main():
    """Command-line entry point: optionally generate a sample, then benchmark every parser."""
    parser = _build_arg_parser()
    args = parser.parse_args()

    if args.generate_sample:
        if not generate_sample_pcap(args.output, args.sample_packets):
            sys.exit(1)
        # With no explicit input, benchmark the file that was just generated.
        if not args.pcap_file:
            args.pcap_file = args.output

    if not args.pcap_file:
        parser.print_help()
        print("\nError: Please provide a pcap file or use --generate-sample")
        sys.exit(1)

    if not os.path.exists(args.pcap_file):
        print(f"Error: File not found: {args.pcap_file}")
        sys.exit(1)

    file_size_mb = os.path.getsize(args.pcap_file) / 1024 / 1024
    print(f"\nParsing: {args.pcap_file} ({file_size_mb:.2f} MB)")
    print("-" * 40)

    # Debug pyshark fields if requested
    if args.debug_pyshark:
        debug_pyshark_fields(args.pcap_file)
        sys.exit(0)

    # Run all parsers
    results = []
    for name, parser_func in PARSERS.items():
        print(f"Testing {name}...", end=" ", flush=True)
        outcome = run_parser_in_subprocess(name, parser_func, args.pcap_file)
        print("✓" if outcome.success else "✗")
        results.append(outcome)

    print_results(results)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment