Skip to content

Instantly share code, notes, and snippets.

@HarryR
Created January 21, 2026 17:51
Show Gist options
  • Select an option

  • Save HarryR/93c07f679370849e5e32fe7e12aa7610 to your computer and use it in GitHub Desktop.

Select an option

Save HarryR/93c07f679370849e5e32fe7e12aa7610 to your computer and use it in GitHub Desktop.
Use the Google Identity Key and pairing date to monitor your FMDN device
#!/usr/bin/env python3
"""
BLE Advertisement Monitor for FMDN Device Debugging
Monitors all BLE advertisements and tracks devices broadcasting FMDN/Eddystone
service data. Helps identify what data is being broadcast and timing patterns.
Usage:
python ble_monitor.py [--identity-key KEY] [--pair-date TIMESTAMP] [--mac MAC]
Requirements:
pip install bleak
"""
import argparse
import asyncio
import struct
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
try:
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
except ImportError:
print("Error: bleak not installed. Run: pip install bleak")
sys.exit(1)
# Service UUIDs
FMDN_SERVICE_UUID = "0000fcaf-0000-1000-8000-00805f9b34fb"
EDDYSTONE_SERVICE_UUID = "0000feaa-0000-1000-8000-00805f9b34fb"
# ANSI colors for terminal output
class Colors:
RESET = "\033[0m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
GRAY = "\033[90m"
BOLD = "\033[1m"
@dataclass
class DeviceStats:
"""Statistics for a tracked device."""
first_seen: float = 0.0
last_seen: float = 0.0
advert_count: int = 0
rssi_samples: list = field(default_factory=list)
intervals: list = field(default_factory=list)
frame_types: dict = field(default_factory=lambda: defaultdict(int))
service_uuids: set = field(default_factory=set)
gaps: list = field(default_factory=list) # Gaps > 5 seconds
class BleMonitor:
def __init__(
self,
identity_key: Optional[str] = None,
pair_date: Optional[int] = None,
target_mac: Optional[str] = None,
):
self.identity_key = identity_key
self.pair_date = pair_date
self.target_mac = target_mac.upper() if target_mac else None
self.devices: dict[str, DeviceStats] = defaultdict(DeviceStats)
self.fmdn_devices: set[str] = set() # MACs that have sent FMDN/Eddystone
self.start_time = datetime.now()
self.total_adverts = 0
# EID matching setup
self.expected_eids: set[bytes] = set()
if identity_key and pair_date:
self._setup_eid_matching()
def _setup_eid_matching(self):
"""Compute expected EIDs if identity key and pair date provided."""
try:
from eid_generator import EidGenerator
key_bytes = bytes.fromhex(self.identity_key)
# Generate EIDs for ±7 windows (±2 hours)
for offset in range(-7, 8):
eid = EidGenerator.compute_eid(key_bytes, self.pair_date, offset)
self.expected_eids.add(eid)
print(f"{Colors.GREEN}Loaded {len(self.expected_eids)} expected EIDs for matching{Colors.RESET}")
except ImportError:
print(f"{Colors.YELLOW}Note: eid_generator.py not found, EID matching disabled{Colors.RESET}")
except Exception as e:
print(f"{Colors.RED}Error setting up EID matching: {e}{Colors.RESET}")
def _parse_frame_type(self, service_data: bytes) -> tuple[int, Optional[bytes]]:
"""Parse FMDN/Eddystone frame type and extract EID if present."""
if not service_data:
return -1, None
frame_type = service_data[0]
eid = None
if len(service_data) >= 21:
eid = service_data[1:21]
return frame_type, eid
def _format_bytes(self, data: bytes) -> str:
"""Format bytes as hex string."""
return data.hex()
def _format_rssi_bar(self, rssi: int) -> str:
"""Visual RSSI bar."""
# RSSI typically ranges from -100 (weak) to -30 (strong)
normalized = max(0, min(100, (rssi + 100)))
bars = normalized // 10
return f"[{'█' * bars}{'░' * (10 - bars)}]"
def _get_frame_type_name(self, frame_type: int) -> str:
"""Human-readable frame type name."""
if frame_type == 0x00:
return "Near Owner"
elif frame_type == 0x40:
return "Standard EID"
elif frame_type == 0x41:
return "Unwinder EID"
elif frame_type == 0x10:
return "Eddystone-UID"
elif frame_type == 0x20:
return "Eddystone-URL"
elif frame_type == 0x30:
return "Eddystone-TLM"
else:
return f"Unknown (0x{frame_type:02x})"
def _check_eid_match(self, eid: bytes) -> bool:
"""Check if EID matches expected values."""
if not self.expected_eids:
return False
return eid in self.expected_eids
def detection_callback(self, device: BLEDevice, advertisement_data: AdvertisementData):
"""Called for each BLE advertisement detected."""
now = asyncio.get_event_loop().time()
mac = device.address.upper()
self.total_adverts += 1
# Get service data
service_data = advertisement_data.service_data or {}
fmdn_data = service_data.get(FMDN_SERVICE_UUID)
eddystone_data = service_data.get(EDDYSTONE_SERVICE_UUID)
is_fmdn = fmdn_data is not None or eddystone_data is not None
# Filter by target MAC if specified
if self.target_mac and mac != self.target_mac and not is_fmdn:
return
# Update device stats
stats = self.devices[mac]
if stats.first_seen == 0:
stats.first_seen = now
# Track intervals and gaps
if stats.last_seen > 0:
interval = now - stats.last_seen
stats.intervals.append(interval)
if interval > 5.0: # Gap > 5 seconds
stats.gaps.append((stats.last_seen, now, interval))
stats.last_seen = now
stats.advert_count += 1
stats.rssi_samples.append(advertisement_data.rssi)
# Track service UUIDs
for uuid in service_data.keys():
stats.service_uuids.add(uuid)
# Process FMDN/Eddystone data
if is_fmdn:
self.fmdn_devices.add(mac)
data = fmdn_data or eddystone_data
service_type = "FMDN" if fmdn_data else "Eddystone"
frame_type, eid = self._parse_frame_type(data)
stats.frame_types[frame_type] += 1
# Check EID match
eid_match = False
if eid and self.expected_eids:
eid_match = self._check_eid_match(eid)
# Print detailed output
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
rssi_bar = self._format_rssi_bar(advertisement_data.rssi)
frame_name = self._get_frame_type_name(frame_type)
# Color based on match status
if eid_match:
color = Colors.GREEN
match_indicator = " ✓ MATCH"
elif self.target_mac and mac == self.target_mac:
color = Colors.CYAN
match_indicator = ""
else:
color = Colors.YELLOW
match_indicator = ""
print(f"{color}{timestamp} {mac} {rssi_bar} {advertisement_data.rssi:4d}dBm | "
f"{service_type:9s} {frame_name:14s}{match_indicator}{Colors.RESET}")
# Show full data breakdown
print(f" {Colors.GRAY}Raw: {self._format_bytes(data)}{Colors.RESET}")
if eid:
print(f" {Colors.GRAY}EID: {self._format_bytes(eid)}{Colors.RESET}")
# Show any additional bytes after EID
if len(data) > 21:
extra = data[21:]
print(f" {Colors.GRAY}Extra ({len(extra)} bytes): {self._format_bytes(extra)}{Colors.RESET}")
# Show manufacturer data if present
if advertisement_data.manufacturer_data:
for company_id, mfg_data in advertisement_data.manufacturer_data.items():
print(f" {Colors.GRAY}Manufacturer 0x{company_id:04x}: {self._format_bytes(mfg_data)}{Colors.RESET}")
# Show all service UUIDs
if len(service_data) > 1:
other_services = [k for k in service_data.keys()
if k not in (FMDN_SERVICE_UUID, EDDYSTONE_SERVICE_UUID)]
if other_services:
print(f" {Colors.GRAY}Other services: {other_services}{Colors.RESET}")
# Show gap warning
if stats.intervals and stats.intervals[-1] > 5.0:
gap_secs = stats.intervals[-1]
print(f" {Colors.RED}⚠ GAP: {gap_secs:.1f}s since last advertisement{Colors.RESET}")
print() # Blank line between entries
elif self.target_mac and mac == self.target_mac:
# Non-FMDN advertisement from target device
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"{Colors.MAGENTA}{timestamp} {mac} (non-FMDN){Colors.RESET}")
print(f" {Colors.GRAY}RSSI: {advertisement_data.rssi} dBm{Colors.RESET}")
print(f" {Colors.GRAY}Name: {advertisement_data.local_name}{Colors.RESET}")
if service_data:
for uuid, data in service_data.items():
print(f" {Colors.GRAY}Service {uuid}: {self._format_bytes(data)}{Colors.RESET}")
if advertisement_data.manufacturer_data:
for company_id, data in advertisement_data.manufacturer_data.items():
print(f" {Colors.GRAY}Manufacturer 0x{company_id:04x}: {self._format_bytes(data)}{Colors.RESET}")
print()
def print_summary(self):
"""Print summary statistics."""
print(f"\n{Colors.BOLD}{'=' * 70}")
print(f"SUMMARY")
print(f"{'=' * 70}{Colors.RESET}")
duration = (datetime.now() - self.start_time).total_seconds()
print(f"Duration: {duration:.1f}s | Total advertisements: {self.total_adverts}")
print(f"FMDN/Eddystone devices found: {len(self.fmdn_devices)}")
for mac in self.fmdn_devices:
stats = self.devices[mac]
avg_interval = sum(stats.intervals) / len(stats.intervals) if stats.intervals else 0
avg_rssi = sum(stats.rssi_samples) / len(stats.rssi_samples) if stats.rssi_samples else 0
print(f"\n{Colors.CYAN}Device: {mac}{Colors.RESET}")
print(f" Advertisements: {stats.advert_count}")
print(f" Avg interval: {avg_interval*1000:.0f}ms")
print(f" Avg RSSI: {avg_rssi:.0f} dBm")
print(f" RSSI range: {min(stats.rssi_samples)} to {max(stats.rssi_samples)} dBm")
print(f" Frame types:")
for ft, count in sorted(stats.frame_types.items()):
print(f" {self._get_frame_type_name(ft)}: {count}")
if stats.gaps:
print(f" {Colors.RED}Gaps (>5s): {len(stats.gaps)}{Colors.RESET}")
for start, end, duration in stats.gaps[-5:]: # Show last 5 gaps
print(f" {Colors.RED}{duration:.1f}s gap{Colors.RESET}")
async def main():
parser = argparse.ArgumentParser(description="BLE Advertisement Monitor for FMDN Debugging")
parser.add_argument("--identity-key", "-k", help="64-char hex identity key for EID matching")
parser.add_argument("--pair-date", "-p", type=int, help="Unix timestamp of device pairing")
parser.add_argument("--mac", "-m", help="Target MAC address to track (auto-detects FMDN devices)")
parser.add_argument("--duration", "-d", type=int, default=0, help="Scan duration in seconds (0=infinite)")
args = parser.parse_args()
print(f"{Colors.BOLD}BLE Advertisement Monitor{Colors.RESET}")
print(f"Scanning for FMDN (0xFCAF) and Eddystone (0xFEAA) service data...")
if args.mac:
print(f"Filtering for MAC: {args.mac.upper()}")
if args.identity_key:
print(f"EID matching enabled")
print(f"Press Ctrl+C to stop and show summary\n")
monitor = BleMonitor(
identity_key=args.identity_key,
pair_date=args.pair_date,
target_mac=args.mac,
)
try:
async with BleakScanner(detection_callback=monitor.detection_callback):
if args.duration > 0:
await asyncio.sleep(args.duration)
else:
# Run forever until Ctrl+C
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
pass
finally:
monitor.print_summary()
if __name__ == "__main__":
asyncio.run(main())
#!/usr/bin/env python3
"""
FMDN Ephemeral Identifier (EID) Generator
Generates EIDs from an identity key and pair date for FMDN device detection.
Port of the Kotlin implementation for use with the BLE monitor.
EID Algorithm (from Google FMDN spec):
1. Mask timestamp (zero lower 10 bits)
2. Build 32-byte data block
3. AES-256-ECB encrypt with identity key
4. Convert to BigInteger, mod by SECP160r1 curve order
5. Multiply by generator point: R = r * G
6. EID = x-coordinate of R (20 bytes)
"""
import time
from dataclasses import dataclass
from typing import Optional
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# secp160r1 curve parameters
# p = 2^160 - 2^31 - 1
P = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF7FFFFFFF
# Curve order
N = 0x0100000000000000000001F4C8F927AED3CA752257
# Generator point
GX = 0x4A96B5688EF573284664698968C38BB913CBFC82
GY = 0x23A628553168947D59DCC912042351377AC5FB32
K = 10 # Lower bits to mask in timestamp
ROTATION_PERIOD = 1024 # seconds (~17 minutes)
@dataclass
class JacobianPoint:
"""Point in Jacobian coordinates (X:Y:Z) where affine x = X/Z^2, y = Y/Z^3."""
X: int
Y: int
Z: int
def is_infinity(self) -> bool:
return self.Z == 0
# Point at infinity
INFINITY = JacobianPoint(1, 1, 0)
# Generator point in Jacobian coordinates
G = JacobianPoint(GX, GY, 1)
def jacobian_double(point: JacobianPoint) -> JacobianPoint:
"""
Point doubling in Jacobian coordinates, optimized for a = -3.
Formula: dbl-2001-b from hyperelliptic.org
"""
if point.is_infinity():
return point
X1, Y1, Z1 = point.X, point.Y, point.Z
delta = (Z1 * Z1) % P
gamma = (Y1 * Y1) % P
beta = (X1 * gamma) % P
# alpha = 3 * (X1 - delta) * (X1 + delta) -- a = -3 optimization
x_minus_delta = (X1 - delta) % P
x_plus_delta = (X1 + delta) % P
alpha = (3 * x_minus_delta * x_plus_delta) % P
# X3 = alpha^2 - 8*beta
alpha2 = (alpha * alpha) % P
beta8 = (8 * beta) % P
X3 = (alpha2 - beta8) % P
# Y3 = alpha * (4*beta - X3) - 8*gamma^2
beta4 = (4 * beta) % P
gamma2 = (gamma * gamma) % P
Y3 = (alpha * (beta4 - X3) - 8 * gamma2) % P
# Z3 = (Y1 + Z1)^2 - gamma - delta
y1_plus_z1 = (Y1 + Z1) % P
Z3 = (y1_plus_z1 * y1_plus_z1 - gamma - delta) % P
return JacobianPoint(X3, Y3, Z3)
def jacobian_add(p1: JacobianPoint, p2: JacobianPoint) -> JacobianPoint:
"""
Point addition in Jacobian coordinates.
Formula: add-2007-bl from hyperelliptic.org
"""
if p1.is_infinity():
return p2
if p2.is_infinity():
return p1
X1, Y1, Z1 = p1.X, p1.Y, p1.Z
X2, Y2, Z2 = p2.X, p2.Y, p2.Z
Z1Z1 = (Z1 * Z1) % P
Z2Z2 = (Z2 * Z2) % P
U1 = (X1 * Z2Z2) % P
U2 = (X2 * Z1Z1) % P
S1 = (Y1 * Z2 * Z2Z2) % P
S2 = (Y2 * Z1 * Z1Z1) % P
H = (U2 - U1) % P
S2_minus_S1 = (S2 - S1) % P
# Special cases
if H == 0:
if S2_minus_S1 == 0:
return jacobian_double(p1) # P == Q
else:
return INFINITY # P == -Q
I = (4 * H * H) % P
J = (H * I) % P
r = (2 * S2_minus_S1) % P
V = (U1 * I) % P
# X3 = r^2 - J - 2V
r2 = (r * r) % P
X3 = (r2 - J - 2 * V) % P
# Y3 = r * (V - X3) - 2 * S1 * J
Y3 = (r * (V - X3) - 2 * S1 * J) % P
# Z3 = ((Z1 + Z2)^2 - Z1Z1 - Z2Z2) * H
z1_plus_z2 = (Z1 + Z2) % P
Z3 = ((z1_plus_z2 * z1_plus_z2 - Z1Z1 - Z2Z2) * H) % P
return JacobianPoint(X3, Y3, Z3)
def scalar_multiply(k: int, point: JacobianPoint) -> JacobianPoint:
"""Scalar multiplication using double-and-add."""
if k == 0 or point.is_infinity():
return INFINITY
k = k % N
if k == 0:
return INFINITY
result = INFINITY
addend = point
while k > 0:
if k & 1:
result = jacobian_add(result, addend)
addend = jacobian_double(addend)
k >>= 1
return result
def to_affine_x(point: JacobianPoint) -> int:
"""Convert Jacobian point to affine x-coordinate: x = X / Z^2."""
if point.is_infinity():
raise ValueError("Point at infinity has no affine coordinates")
Z2 = (point.Z * point.Z) % P
Z2_inv = pow(Z2, P - 2, P) # Modular inverse using Fermat's little theorem
return (point.X * Z2_inv) % P
def multiply_g(k: int) -> int:
"""Multiply generator point by scalar and return affine x-coordinate."""
R = scalar_multiply(k, G)
return to_affine_x(R)
class EidGenerator:
"""Generates FMDN Ephemeral Identifiers."""
ROTATION_PERIOD = ROTATION_PERIOD
@staticmethod
def generate_eid(identity_key: bytes, beacon_time_counter: int) -> bytes:
"""
Generate an EID for the given identity key and beacon time counter.
Args:
identity_key: 32-byte identity key from Google Find My Device
beacon_time_counter: Seconds since device pairing
Returns:
20-byte EID
"""
if len(identity_key) != 32:
raise ValueError(f"Identity key must be 32 bytes, got {len(identity_key)}")
# Step 1: Mask timestamp (zero lower K bits)
ts_masked = (beacon_time_counter & ((-1) << K)) & 0xFFFFFFFF
# Step 2: Build 32-byte data block
# Format: 0xFF*11 + K + ts(4) + 0x00*11 + K + ts(4)
ts_bytes = ts_masked.to_bytes(4, 'big')
data = (
b'\xFF' * 11 +
bytes([K]) +
ts_bytes +
b'\x00' * 11 +
bytes([K]) +
ts_bytes
)
# Step 3: AES-256-ECB encrypt
cipher = Cipher(algorithms.AES(identity_key), modes.ECB(), backend=default_backend())
encryptor = cipher.encryptor()
r_dash = encryptor.update(data) + encryptor.finalize()
# Step 4: r = r' mod n (curve order)
r_dash_int = int.from_bytes(r_dash, 'big')
r = r_dash_int % N
# Step 5: R = r * G (generator point multiplication)
# Step 6: Return x-coordinate as 20 bytes (big-endian, zero-padded)
x = multiply_g(r)
return x.to_bytes(20, 'big')
@staticmethod
def get_beacon_time_counter(pair_date: int) -> int:
"""Calculate beacon time counter from current time and pair date."""
now = int(time.time())
return now - pair_date
@staticmethod
def get_current_eid(identity_key: bytes, pair_date: int) -> bytes:
"""Generate the current EID for the given identity key and pair date."""
beacon_time_counter = EidGenerator.get_beacon_time_counter(pair_date)
return EidGenerator.generate_eid(identity_key, beacon_time_counter)
@staticmethod
def get_eids_with_offsets(identity_key: bytes, pair_date: int, windows: int = 7) -> list[tuple[bytes, int]]:
"""
Generate EIDs for current and adjacent time windows to handle clock skew.
Args:
identity_key: 32-byte identity key
pair_date: Unix timestamp when device was paired
windows: Number of windows to check in each direction (default 7 = ~2 hours)
Returns:
List of (EID, window_offset) tuples
"""
beacon_time_counter = EidGenerator.get_beacon_time_counter(pair_date)
results = []
for offset in range(-windows, windows + 1):
eid = EidGenerator.generate_eid(
identity_key,
beacon_time_counter + offset * ROTATION_PERIOD
)
results.append((eid, offset))
return results
@staticmethod
def compute_eid(identity_key: bytes, pair_date: int, offset: int = 0) -> bytes:
"""Compute EID for a specific window offset."""
beacon_time_counter = EidGenerator.get_beacon_time_counter(pair_date)
return EidGenerator.generate_eid(
identity_key,
beacon_time_counter + offset * ROTATION_PERIOD
)
@staticmethod
def find_eid_offset(
identity_key: bytes,
pair_date: int,
advertised_eid: bytes,
max_hours: int = 24
) -> Optional[int]:
"""
Search for matching EID across a wide time range.
Args:
identity_key: 32-byte identity key
pair_date: Unix timestamp when device was paired
advertised_eid: The EID to search for
max_hours: Maximum hours to search in each direction
Returns:
Window offset if found, None otherwise
"""
if len(advertised_eid) != 20:
return None
beacon_time_counter = EidGenerator.get_beacon_time_counter(pair_date)
windows_to_check = (max_hours * 3600) // ROTATION_PERIOD
for offset in range(-windows_to_check, windows_to_check + 1):
eid = EidGenerator.generate_eid(
identity_key,
beacon_time_counter + offset * ROTATION_PERIOD
)
if eid == advertised_eid:
return offset
return None
def main():
"""Test EID generation."""
import sys
if len(sys.argv) < 3:
print("Usage: python eid_generator.py <identity_key_hex> <pair_date>")
print(" identity_key_hex: 64-character hex string (32 bytes)")
print(" pair_date: Unix timestamp when device was paired")
sys.exit(1)
identity_key_hex = sys.argv[1]
pair_date = int(sys.argv[2])
# Clean and validate identity key
identity_key_hex = ''.join(c for c in identity_key_hex.lower() if c in '0123456789abcdef')
if len(identity_key_hex) != 64:
print(f"Error: Identity key must be 64 hex chars, got {len(identity_key_hex)}")
sys.exit(1)
identity_key = bytes.fromhex(identity_key_hex)
print(f"Identity key: {identity_key_hex[:16]}...{identity_key_hex[-16:]}")
print(f"Pair date: {pair_date}")
print()
# Generate current EID
current_eid = EidGenerator.get_current_eid(identity_key, pair_date)
print(f"Current EID: {current_eid.hex()}")
print()
# Generate EIDs with clock skew tolerance
print("EIDs with clock skew tolerance (±7 windows = ~2 hours):")
for eid, offset in EidGenerator.get_eids_with_offsets(identity_key, pair_date):
drift_secs = offset * ROTATION_PERIOD
marker = " <-- current" if offset == 0 else ""
print(f" offset {offset:+3d} ({drift_secs:+6d}s): {eid.hex()}{marker}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment