Created
January 21, 2026 17:51
-
-
Save HarryR/93c07f679370849e5e32fe7e12aa7610 to your computer and use it in GitHub Desktop.
Use the Google Identity Key and pairing date to monitor your FMDN device
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| BLE Advertisement Monitor for FMDN Device Debugging | |
| Monitors all BLE advertisements and tracks devices broadcasting FMDN/Eddystone | |
| service data. Helps identify what data is being broadcast and timing patterns. | |
| Usage: | |
| python ble_monitor.py [--identity-key KEY] [--pair-date TIMESTAMP] [--mac MAC] [--duration SECONDS] | |
| Requirements: | |
| pip install bleak (EID matching additionally needs eid_generator.py and the cryptography package) | |
| """ | |
| import argparse | |
| import asyncio | |
| import struct | |
| import sys | |
| from collections import defaultdict | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from typing import Optional | |
| try: | |
| from bleak import BleakScanner | |
| from bleak.backends.device import BLEDevice | |
| from bleak.backends.scanner import AdvertisementData | |
| except ImportError: | |
| print("Error: bleak not installed. Run: pip install bleak") | |
| sys.exit(1) | |
# Service-data UUIDs the monitor looks for, written in full 128-bit form.
# Both share the same tail; only the 16-bit short UUID (0xFCAF / 0xFEAA) differs.
_SERVICE_UUID_TAIL = "-0000-1000-8000-00805f9b34fb"
FMDN_SERVICE_UUID = "0000fcaf" + _SERVICE_UUID_TAIL
EDDYSTONE_SERVICE_UUID = "0000feaa" + _SERVICE_UUID_TAIL
| # ANSI colors for terminal output | |
class Colors:
    """ANSI escape sequences used to colour the monitor's terminal output."""

    # Text attributes
    RESET = "\033[0m"
    BOLD = "\033[1m"

    # Bright foreground colours
    GRAY = "\033[90m"
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN = "\033[96m"
@dataclass
class DeviceStats:
    """Running counters kept for one device address while scanning."""

    # Timestamps of the first and most recent advertisement; 0.0 means "not seen yet".
    first_seen: float = 0.0
    last_seen: float = 0.0
    # Number of advertisements recorded for this device.
    advert_count: int = 0
    # One RSSI reading per recorded advertisement.
    rssi_samples: list[int] = field(default_factory=list)
    # Seconds elapsed between consecutive advertisements.
    intervals: list[float] = field(default_factory=list)
    # Frame-type byte -> number of times it was seen.
    frame_types: dict[int, int] = field(default_factory=lambda: defaultdict(int))
    # Every service-data UUID the device has advertised.
    service_uuids: set[str] = field(default_factory=set)
    # (previous_seen, now, interval) for each interval longer than 5 seconds.
    gaps: list[tuple[float, float, float]] = field(default_factory=list)
class BleMonitor:
    """Track BLE advertisements per device and print FMDN/Eddystone frames as they arrive.

    Statistics are kept per address in ``devices``. Addresses that have sent
    FMDN or Eddystone service data are also collected in ``fmdn_devices`` and
    are the ones reported by ``print_summary``.

    When both an identity key and a pair date are supplied, the expected EIDs
    are computed once at construction time and are not refreshed while the
    scan runs.
    """

    # Frame-type byte -> label shown in the live output and the summary.
    _FRAME_TYPE_NAMES = {
        0x00: "Near Owner",
        0x40: "Standard EID",
        0x41: "Unwinder EID",
        0x10: "Eddystone-UID",
        0x20: "Eddystone-URL",
        0x30: "Eddystone-TLM",
    }
    # Intervals longer than this many seconds are recorded and flagged as gaps.
    _GAP_THRESHOLD_SECS = 5.0
    # EID rotation windows to precompute on each side of "now" (7 ~= 2 hours).
    _EID_WINDOWS = 7

    def __init__(
        self,
        identity_key: Optional[str] = None,
        pair_date: Optional[int] = None,
        target_mac: Optional[str] = None,
    ):
        """Create a monitor.

        Args:
            identity_key: Hex-encoded identity key used for EID matching.
            pair_date: Unix timestamp of device pairing used for EID matching.
            target_mac: Address whose non-FMDN advertisements should also be shown.
        """
        self.identity_key = identity_key
        self.pair_date = pair_date
        self.target_mac = target_mac.upper() if target_mac else None
        self.devices: dict[str, DeviceStats] = defaultdict(DeviceStats)
        self.fmdn_devices: set[str] = set()  # MACs that have sent FMDN/Eddystone
        self.start_time = datetime.now()
        self.total_adverts = 0
        # EID matching setup
        self.expected_eids: set[bytes] = set()
        if identity_key and pair_date:
            self._setup_eid_matching()

    def _setup_eid_matching(self):
        """Compute expected EIDs if identity key and pair date provided."""
        try:
            from eid_generator import EidGenerator

            key_bytes = bytes.fromhex(self.identity_key)
            # A single call derives every window from one clock reading, so the
            # set cannot skip a window if a rotation boundary passes mid-loop.
            self.expected_eids.update(
                eid
                for eid, _offset in EidGenerator.get_eids_with_offsets(
                    key_bytes, self.pair_date, windows=self._EID_WINDOWS
                )
            )
            print(f"{Colors.GREEN}Loaded {len(self.expected_eids)} expected EIDs for matching{Colors.RESET}")
        except ImportError as e:
            # eid_generator imports cryptography itself, so report the real
            # cause instead of always claiming the file is missing.
            print(f"{Colors.YELLOW}Note: could not import eid_generator ({e}), EID matching disabled{Colors.RESET}")
        except Exception as e:
            print(f"{Colors.RED}Error setting up EID matching: {e}{Colors.RESET}")

    @staticmethod
    def _select_payload(
        fmdn_data: Optional[bytes], eddystone_data: Optional[bytes]
    ) -> tuple[str, bytes]:
        """Pick the service payload to display and name the service it came from.

        A non-empty FMDN payload wins, then any Eddystone payload. An empty
        FMDN payload is used only when there is no Eddystone payload, so the
        result is never ``None`` as long as one of the inputs is not ``None``.
        """
        if fmdn_data:
            return "FMDN", fmdn_data
        if eddystone_data is not None:
            return "Eddystone", eddystone_data
        return "FMDN", fmdn_data

    def _parse_frame_type(self, service_data: bytes) -> tuple[int, Optional[bytes]]:
        """Parse FMDN/Eddystone frame type and extract EID if present.

        Returns:
            ``(frame_type, eid)``. ``frame_type`` is the first payload byte, or
            -1 for an empty payload. ``eid`` is bytes 1..20 when the payload is
            at least 21 bytes long, otherwise ``None``.
        """
        if not service_data:
            return -1, None
        eid = service_data[1:21] if len(service_data) >= 21 else None
        return service_data[0], eid

    def _format_bytes(self, data: bytes) -> str:
        """Format bytes as hex string."""
        return data.hex()

    def _format_rssi_bar(self, rssi: int) -> str:
        """Render RSSI as a ten-cell bar, one cell per 10 dB above -100 dBm."""
        normalized = max(0, min(100, rssi + 100))
        bars = normalized // 10
        return f"[{'█' * bars}{'░' * (10 - bars)}]"

    def _get_frame_type_name(self, frame_type: int) -> str:
        """Human-readable frame type name."""
        try:
            return self._FRAME_TYPE_NAMES[frame_type]
        except KeyError:
            return f"Unknown (0x{frame_type:02x})"

    def _check_eid_match(self, eid: bytes) -> bool:
        """Check if EID matches expected values."""
        return bool(self.expected_eids) and eid in self.expected_eids

    def _record_advert(self, stats, now: float, rssi: int, service_data: dict) -> None:
        """Fold one advertisement into a device's running statistics."""
        if stats.first_seen == 0:
            stats.first_seen = now
        if stats.last_seen > 0:
            interval = now - stats.last_seen
            stats.intervals.append(interval)
            if interval > self._GAP_THRESHOLD_SECS:
                stats.gaps.append((stats.last_seen, now, interval))
        stats.last_seen = now
        stats.advert_count += 1
        stats.rssi_samples.append(rssi)
        stats.service_uuids.update(service_data)

    # The bleak types are quoted so the annotations are not evaluated when the
    # method is defined.
    def detection_callback(self, device: "BLEDevice", advertisement_data: "AdvertisementData"):
        """Called for each BLE advertisement detected."""
        now = asyncio.get_event_loop().time()
        mac = device.address.upper()
        self.total_adverts += 1

        service_data = advertisement_data.service_data or {}
        fmdn_data = service_data.get(FMDN_SERVICE_UUID)
        eddystone_data = service_data.get(EDDYSTONE_SERVICE_UUID)
        is_fmdn = fmdn_data is not None or eddystone_data is not None
        is_target = self.target_mac is not None and mac == self.target_mac

        # With a target MAC set, only the target and FMDN/Eddystone senders are tracked.
        if self.target_mac and not is_target and not is_fmdn:
            return

        stats = self.devices[mac]
        self._record_advert(stats, now, advertisement_data.rssi, service_data)

        if is_fmdn:
            self.fmdn_devices.add(mac)
            service_type, data = self._select_payload(fmdn_data, eddystone_data)
            frame_type, eid = self._parse_frame_type(data)
            stats.frame_types[frame_type] += 1
            self._print_fmdn_advert(
                mac, stats, advertisement_data, service_data,
                service_type, data, frame_type, eid, is_target,
            )
        elif is_target:
            self._print_target_advert(mac, advertisement_data, service_data)

    def _print_fmdn_advert(
        self, mac, stats, advertisement_data, service_data,
        service_type, data, frame_type, eid, is_target,
    ) -> None:
        """Print the detailed breakdown of one FMDN/Eddystone advertisement."""
        eid_match = eid is not None and self._check_eid_match(eid)
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        rssi_bar = self._format_rssi_bar(advertisement_data.rssi)
        frame_name = self._get_frame_type_name(frame_type)

        # Colour by match status: EID match, then target device, then anything else.
        match_indicator = ""
        if eid_match:
            color = Colors.GREEN
            match_indicator = " ✓ MATCH"
        elif is_target:
            color = Colors.CYAN
        else:
            color = Colors.YELLOW

        print(f"{color}{timestamp} {mac} {rssi_bar} {advertisement_data.rssi:4d}dBm | "
              f"{service_type:9s} {frame_name:14s}{match_indicator}{Colors.RESET}")
        print(f" {Colors.GRAY}Raw: {self._format_bytes(data)}{Colors.RESET}")
        if eid:
            print(f" {Colors.GRAY}EID: {self._format_bytes(eid)}{Colors.RESET}")
        # Anything after the frame-type byte and the 20-byte EID
        if len(data) > 21:
            extra = data[21:]
            print(f" {Colors.GRAY}Extra ({len(extra)} bytes): {self._format_bytes(extra)}{Colors.RESET}")
        if advertisement_data.manufacturer_data:
            for company_id, mfg_data in advertisement_data.manufacturer_data.items():
                print(f" {Colors.GRAY}Manufacturer 0x{company_id:04x}: {self._format_bytes(mfg_data)}{Colors.RESET}")
        other_services = [
            uuid for uuid in service_data
            if uuid not in (FMDN_SERVICE_UUID, EDDYSTONE_SERVICE_UUID)
        ]
        if other_services:
            print(f" {Colors.GRAY}Other services: {other_services}{Colors.RESET}")
        if stats.intervals and stats.intervals[-1] > self._GAP_THRESHOLD_SECS:
            gap_secs = stats.intervals[-1]
            print(f" {Colors.RED}⚠ GAP: {gap_secs:.1f}s since last advertisement{Colors.RESET}")
        print()  # Blank line between entries

    def _print_target_advert(self, mac, advertisement_data, service_data) -> None:
        """Print a non-FMDN advertisement received from the target device."""
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        print(f"{Colors.MAGENTA}{timestamp} {mac} (non-FMDN){Colors.RESET}")
        print(f" {Colors.GRAY}RSSI: {advertisement_data.rssi} dBm{Colors.RESET}")
        print(f" {Colors.GRAY}Name: {advertisement_data.local_name}{Colors.RESET}")
        for uuid, payload in service_data.items():
            print(f" {Colors.GRAY}Service {uuid}: {self._format_bytes(payload)}{Colors.RESET}")
        if advertisement_data.manufacturer_data:
            for company_id, payload in advertisement_data.manufacturer_data.items():
                print(f" {Colors.GRAY}Manufacturer 0x{company_id:04x}: {self._format_bytes(payload)}{Colors.RESET}")
        print()

    def print_summary(self):
        """Print summary statistics."""
        print(f"\n{Colors.BOLD}{'=' * 70}")
        print(f"SUMMARY")
        print(f"{'=' * 70}{Colors.RESET}")
        duration = (datetime.now() - self.start_time).total_seconds()
        print(f"Duration: {duration:.1f}s | Total advertisements: {self.total_adverts}")
        print(f"FMDN/Eddystone devices found: {len(self.fmdn_devices)}")
        # Sorted so repeated runs list devices in a stable order.
        for mac in sorted(self.fmdn_devices):
            stats = self.devices[mac]
            avg_interval = sum(stats.intervals) / len(stats.intervals) if stats.intervals else 0
            avg_rssi = sum(stats.rssi_samples) / len(stats.rssi_samples) if stats.rssi_samples else 0
            print(f"\n{Colors.CYAN}Device: {mac}{Colors.RESET}")
            print(f" Advertisements: {stats.advert_count}")
            print(f" Avg interval: {avg_interval*1000:.0f}ms")
            print(f" Avg RSSI: {avg_rssi:.0f} dBm")
            print(f" RSSI range: {min(stats.rssi_samples)} to {max(stats.rssi_samples)} dBm")
            print(f" Frame types:")
            for ft, count in sorted(stats.frame_types.items()):
                print(f" {self._get_frame_type_name(ft)}: {count}")
            if stats.gaps:
                print(f" {Colors.RED}Gaps (>5s): {len(stats.gaps)}{Colors.RESET}")
                for _start, _end, gap_secs in stats.gaps[-5:]:  # Show last 5 gaps
                    print(f" {Colors.RED}{gap_secs:.1f}s gap{Colors.RESET}")
async def main():
    """Parse the command line, scan for advertisements and print a summary on exit.

    The scan runs for ``--duration`` seconds, or until interrupted when the
    duration is 0. The summary is printed in either case.
    """
    parser = argparse.ArgumentParser(description="BLE Advertisement Monitor for FMDN Debugging")
    parser.add_argument("--identity-key", "-k", help="64-char hex identity key for EID matching")
    parser.add_argument("--pair-date", "-p", type=int, help="Unix timestamp of device pairing")
    parser.add_argument("--mac", "-m", help="Target MAC address to track (auto-detects FMDN devices)")
    parser.add_argument("--duration", "-d", type=int, default=0, help="Scan duration in seconds (0=infinite)")
    args = parser.parse_args()

    print(f"{Colors.BOLD}BLE Advertisement Monitor{Colors.RESET}")
    print(f"Scanning for FMDN (0xFCAF) and Eddystone (0xFEAA) service data...")
    if args.mac:
        print(f"Filtering for MAC: {args.mac.upper()}")
    # BleMonitor only sets up matching when both values are supplied, so only
    # announce it in that case and say why it is off otherwise.
    if args.identity_key and args.pair_date:
        print(f"EID matching enabled")
    elif args.identity_key or args.pair_date:
        print(f"{Colors.YELLOW}EID matching disabled: --identity-key and --pair-date "
              f"must be given together{Colors.RESET}")
    print(f"Press Ctrl+C to stop and show summary\n")

    monitor = BleMonitor(
        identity_key=args.identity_key,
        pair_date=args.pair_date,
        target_mac=args.mac,
    )
    try:
        async with BleakScanner(detection_callback=monitor.detection_callback):
            if args.duration > 0:
                await asyncio.sleep(args.duration)
            else:
                # Run forever until Ctrl+C. Short sleeps keep the loop waking
                # up regularly rather than blocking indefinitely.
                while True:
                    await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run(), Ctrl+C reaches this coroutine as a cancellation
        # rather than as KeyboardInterrupt; either way just fall through to
        # the summary.
        pass
    finally:
        monitor.print_summary()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() may re-raise the interrupt after main() has unwound.
        # The summary has already been printed by then, so exit without a
        # traceback.
        pass
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| FMDN Ephemeral Identifier (EID) Generator | |
| Generates EIDs from an identity key and pair date for FMDN device detection. | |
| Port of the Kotlin implementation for use with the BLE monitor. | |
| EID Algorithm (from Google FMDN spec): | |
| 1. Mask timestamp (zero lower 10 bits) | |
| 2. Build 32-byte data block | |
| 3. AES-256-ECB encrypt with identity key | |
| 4. Convert to BigInteger, mod by SECP160r1 curve order | |
| 5. Multiply by generator point: R = r * G | |
| 6. EID = x-coordinate of R (20 bytes) | |
| """ | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Optional | |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
| from cryptography.hazmat.backends import default_backend | |
# secp160r1 domain parameters
# Field prime, written out as the expression it is defined by
P = 2**160 - 2**31 - 1
# Order of the generator point
N = 0x0100000000000000000001F4C8F927AED3CA752257
# Affine coordinates of the generator point
GX = 0x4A96B5688EF573284664698968C38BB913CBFC82
GY = 0x23A628553168947D59DCC912042351377AC5FB32

# Number of low timestamp bits zeroed before the EID is derived
K = 10
# Seconds between EID rotations (~17 minutes)
ROTATION_PERIOD = 1024
@dataclass
class JacobianPoint:
    """Curve point in Jacobian projective form.

    The triple (X, Y, Z) stands for the affine point (X / Z^2, Y / Z^3).
    A zero Z coordinate marks the point at infinity.
    """

    X: int
    Y: int
    Z: int

    def is_infinity(self) -> bool:
        """Return True when this is the point at infinity (Z is zero)."""
        return self.Z == 0
# Identity element of the group: any point with Z == 0 is "at infinity"
INFINITY = JacobianPoint(X=1, Y=1, Z=0)
# The curve generator lifted to Jacobian form (Z = 1 leaves x and y unchanged)
G = JacobianPoint(X=GX, Y=GY, Z=1)
def jacobian_double(point: JacobianPoint) -> JacobianPoint:
    """
    Double a point given in Jacobian coordinates (curve with a = -3).

    Uses the dbl-2001-b formulas from hyperelliptic.org. The point at
    infinity is returned unchanged.
    """
    if point.is_infinity():
        return point

    x, y, z = point.X, point.Y, point.Z
    z_sq = (z * z) % P           # delta
    y_sq = (y * y) % P           # gamma
    x_y_sq = (x * y_sq) % P      # beta

    # alpha = 3 * (x - delta) * (x + delta); this factorisation relies on a = -3
    alpha = (3 * ((x - z_sq) % P) * ((x + z_sq) % P)) % P

    new_x = ((alpha * alpha) % P - (8 * x_y_sq) % P) % P
    new_y = (alpha * ((4 * x_y_sq) % P - new_x) - 8 * ((y_sq * y_sq) % P)) % P
    y_plus_z = (y + z) % P
    new_z = (y_plus_z * y_plus_z - y_sq - z_sq) % P
    return JacobianPoint(new_x, new_y, new_z)
def jacobian_add(p1: JacobianPoint, p2: JacobianPoint) -> JacobianPoint:
    """
    Add two points given in Jacobian coordinates.

    Uses the add-2007-bl formulas from hyperelliptic.org. Equal inputs are
    handed to jacobian_double and opposite inputs give the point at infinity.
    """
    # The point at infinity is the identity element
    if p1.is_infinity():
        return p2
    if p2.is_infinity():
        return p1

    z1_sq = (p1.Z * p1.Z) % P
    z2_sq = (p2.Z * p2.Z) % P
    u1 = (p1.X * z2_sq) % P
    u2 = (p2.X * z1_sq) % P
    s1 = (p1.Y * p2.Z * z2_sq) % P
    s2 = (p2.Y * p1.Z * z1_sq) % P
    h = (u2 - u1) % P
    s_diff = (s2 - s1) % P

    if h == 0:
        # Same affine x: either the same point (double it) or its negation
        return jacobian_double(p1) if s_diff == 0 else INFINITY

    i = (4 * h * h) % P
    j = (h * i) % P
    r = (2 * s_diff) % P
    v = (u1 * i) % P

    new_x = ((r * r) % P - j - 2 * v) % P
    new_y = (r * (v - new_x) - 2 * s1 * j) % P
    z_sum = (p1.Z + p2.Z) % P
    new_z = ((z_sum * z_sum - z1_sq - z2_sq) * h) % P
    return JacobianPoint(new_x, new_y, new_z)
def scalar_multiply(k: int, point: JacobianPoint) -> JacobianPoint:
    """Compute k * point by double-and-add, consuming the scalar bits low to high.

    The scalar is reduced modulo the curve order N first. A zero scalar or a
    point at infinity yields the point at infinity.
    """
    if k == 0 or point.is_infinity():
        return INFINITY
    remaining = k % N
    if remaining == 0:
        return INFINITY

    total = INFINITY
    power = point  # point * 2^i for the bit currently being examined
    while remaining > 0:
        if remaining & 1:
            total = jacobian_add(total, power)
        power = jacobian_double(power)
        remaining >>= 1
    return total
def to_affine_x(point: JacobianPoint) -> int:
    """Return the affine x-coordinate X / Z^2 (mod P) of a Jacobian point.

    Raises:
        ValueError: If the point is the point at infinity.
    """
    if point.is_infinity():
        raise ValueError("Point at infinity has no affine coordinates")
    z_squared = (point.Z * point.Z) % P
    # P is prime, so raising to P - 2 gives the modular inverse (Fermat)
    inverse = pow(z_squared, P - 2, P)
    return (point.X * inverse) % P
def multiply_g(k: int) -> int:
    """Return the affine x-coordinate of k * G, where G is the curve generator."""
    return to_affine_x(scalar_multiply(k, G))
class EidGenerator:
    """Derives FMDN Ephemeral Identifiers from an identity key and a time counter."""

    ROTATION_PERIOD = ROTATION_PERIOD

    @staticmethod
    def generate_eid(identity_key: bytes, beacon_time_counter: int) -> bytes:
        """
        Derive the EID for one value of the beacon time counter.

        Args:
            identity_key: 32-byte identity key from Google Find My Device
            beacon_time_counter: Seconds since device pairing

        Returns:
            20-byte EID

        Raises:
            ValueError: If the identity key is not exactly 32 bytes long.
        """
        key_length = len(identity_key)
        if key_length != 32:
            raise ValueError(f"Identity key must be 32 bytes, got {key_length}")

        # Clear the low K bits and keep the value within 32 bits
        masked_counter = beacon_time_counter & ~((1 << K) - 1) & 0xFFFFFFFF

        # 32-byte block: eleven 0xFF bytes, K, counter, eleven 0x00 bytes, K, counter
        tail = bytes([K]) + masked_counter.to_bytes(4, 'big')
        block = b'\xFF' * 11 + tail + b'\x00' * 11 + tail

        # AES-256 in ECB mode, keyed with the identity key
        encryptor = Cipher(
            algorithms.AES(identity_key), modes.ECB(), backend=default_backend()
        ).encryptor()
        encrypted = encryptor.update(block) + encryptor.finalize()

        # Reduce the ciphertext modulo the curve order, multiply the generator
        # by it, and keep the x-coordinate as 20 big-endian bytes
        scalar = int.from_bytes(encrypted, 'big') % N
        return multiply_g(scalar).to_bytes(20, 'big')

    @staticmethod
    def get_beacon_time_counter(pair_date: int) -> int:
        """Return the whole seconds elapsed between pair_date and the current time."""
        return int(time.time()) - pair_date

    @staticmethod
    def get_current_eid(identity_key: bytes, pair_date: int) -> bytes:
        """Return the EID for the present moment."""
        counter = EidGenerator.get_beacon_time_counter(pair_date)
        return EidGenerator.generate_eid(identity_key, counter)

    @staticmethod
    def get_eids_with_offsets(identity_key: bytes, pair_date: int, windows: int = 7) -> list[tuple[bytes, int]]:
        """
        Produce the EIDs of the current window and its neighbours, to tolerate clock skew.

        Args:
            identity_key: 32-byte identity key
            pair_date: Unix timestamp when device was paired
            windows: Number of windows to check in each direction (default 7 = ~2 hours)

        Returns:
            List of (EID, window_offset) tuples, ordered from -windows to +windows
        """
        # The clock is read once so every window is relative to the same instant
        counter = EidGenerator.get_beacon_time_counter(pair_date)
        return [
            (EidGenerator.generate_eid(identity_key, counter + step * ROTATION_PERIOD), step)
            for step in range(-windows, windows + 1)
        ]

    @staticmethod
    def compute_eid(identity_key: bytes, pair_date: int, offset: int = 0) -> bytes:
        """Return the EID of the window `offset` rotation periods away from now."""
        counter = EidGenerator.get_beacon_time_counter(pair_date)
        return EidGenerator.generate_eid(identity_key, counter + offset * ROTATION_PERIOD)

    @staticmethod
    def find_eid_offset(
        identity_key: bytes,
        pair_date: int,
        advertised_eid: bytes,
        max_hours: int = 24
    ) -> Optional[int]:
        """
        Look for the window whose EID equals an advertised one.

        Args:
            identity_key: 32-byte identity key
            pair_date: Unix timestamp when device was paired
            advertised_eid: The EID to search for
            max_hours: Maximum hours to search in each direction

        Returns:
            Window offset if found, None otherwise (also None when the
            advertised EID is not 20 bytes long)
        """
        if len(advertised_eid) != 20:
            return None

        counter = EidGenerator.get_beacon_time_counter(pair_date)
        span = (max_hours * 3600) // ROTATION_PERIOD
        # Windows are tried from the earliest to the latest; the first hit wins
        candidates = (
            step
            for step in range(-span, span + 1)
            if EidGenerator.generate_eid(identity_key, counter + step * ROTATION_PERIOD) == advertised_eid
        )
        return next(candidates, None)
def main():
    """Command-line check of EID generation.

    Expects two arguments: the identity key as hex and the pair date as a Unix
    timestamp. Prints the current EID and the EIDs of the neighbouring
    windows. Exits with status 1 on missing or invalid arguments.
    """
    import sys

    if len(sys.argv) < 3:
        print("Usage: python eid_generator.py <identity_key_hex> <pair_date>")
        print(" identity_key_hex: 64-character hex string (32 bytes)")
        print(" pair_date: Unix timestamp when device was paired")
        sys.exit(1)

    identity_key_hex = sys.argv[1]
    # Report a bad timestamp the same way as a bad key instead of letting the
    # ValueError surface as a traceback.
    try:
        pair_date = int(sys.argv[2])
    except ValueError:
        print(f"Error: pair_date must be an integer Unix timestamp, got {sys.argv[2]!r}")
        sys.exit(1)

    # Clean and validate identity key: any non-hex characters are dropped
    # before the length check
    identity_key_hex = ''.join(c for c in identity_key_hex.lower() if c in '0123456789abcdef')
    if len(identity_key_hex) != 64:
        print(f"Error: Identity key must be 64 hex chars, got {len(identity_key_hex)}")
        sys.exit(1)
    identity_key = bytes.fromhex(identity_key_hex)

    print(f"Identity key: {identity_key_hex[:16]}...{identity_key_hex[-16:]}")
    print(f"Pair date: {pair_date}")
    print()

    # Generate current EID
    current_eid = EidGenerator.get_current_eid(identity_key, pair_date)
    print(f"Current EID: {current_eid.hex()}")
    print()

    # Generate EIDs with clock skew tolerance
    print("EIDs with clock skew tolerance (±7 windows = ~2 hours):")
    for eid, offset in EidGenerator.get_eids_with_offsets(identity_key, pair_date):
        drift_secs = offset * ROTATION_PERIOD
        marker = " <-- current" if offset == 0 else ""
        print(f" offset {offset:+3d} ({drift_secs:+6d}s): {eid.hex()}{marker}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment