Skip to content

Instantly share code, notes, and snippets.

@dzid26
Created February 24, 2026 19:37
Show Gist options
  • Select an option

  • Save dzid26/ee2efbb26c7c39f6e408bcbe76eaef56 to your computer and use it in GitHub Desktop.

Select an option

Save dzid26/ee2efbb26c7c39f6e408bcbe76eaef56 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Analyze Tesla DI_uiSpeed hysteresis (+/- half-width) and bias against DI_vehicleSpeed on route logs.
Features:
- Route-level caching similar to torque_lat_accel.py
- Robust DI_uiSpeedUnits detection:
- DBC DI_uiSpeedUnits decode
- raw bit 32 and raw bit 33
- data-inferred units from DI_uiSpeed vs DI_vehicleSpeed fit
The best source is selected automatically per route.
- Metrics printed in source units, metric (kph), imperial (mph), and m/s.
- "Max" metrics include raw max and p99 (skip-outliers view).
"""
from __future__ import annotations
import argparse
import os
import pickle
import sys
from dataclasses import dataclass
from functools import partial
import numpy as np
from opendbc.can.dbc import DBC
from opendbc.can.parser import get_raw_value
from opendbc.car.common.conversions import Conversions as CV
from opendbc.car.structs import CarParams
from openpilot.tools.lib.file_sources import comma_api_source, comma_car_segments_source, internal_source, openpilotci_source
from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.route import FileName, SegmentRange
# Location of the per-route scan cache (see route_cache_filename / load_or_scan_route).
CACHE_DIR = os.path.expanduser("~/.cache/openpilot_tesla_ui_speed")
CACHE_VERSION = 8  # bump to invalidate previously cached route scans
# Consecutive DI_speed samples farther apart than this are not counted as a ui-speed transition.
MAX_TRANSITION_DT_S = 0.5
ROBUST_MAX_PERCENTILE = 99.0  # percentile used for the outlier-skipping "max" metrics
PARTY_BUS = 0  # CAN bus index the DI_speed message is read from
MIN_SPEED_FOR_UNIT_SCORING_KPH = 5.0  # below this speed, kph-vs-mph inference is ambiguous
# Trim bounds for the "avg_filtered" robust means.
ROBUST_AVG_LOW_PERCENTILE = 10.0
ROBUST_AVG_HIGH_PERCENTILE = 90.0
# Trim bounds for the trimmed-median robustness check in median_confidence().
MEDIAN_TRIM_LOW_PERCENTILE = 10.0
MEDIAN_TRIM_HIGH_PERCENTILE = 90.0
# Bootstrap settings for the median 95% confidence interval.
MEDIAN_BOOTSTRAP_ITERS = 300
MEDIAN_BOOTSTRAP_MAX_POINTS = 5000
CONSISTENCY_MARGIN_GAIN = 0.005 # unitless gain diff (0.5%)
CONSISTENCY_MARGIN_SPEED_SOURCE = 0.05 # source units (kph or mph)
# Routes analyzed when none are passed on the command line.
DEFAULT_ROUTES = [
    "2a251bf8a265ff32/000002c9--46bf64b2d2",
    "2a251bf8a265ff32/000002cd--f97069273f",
    "4b3cbd6038d07ca6/000001ee--f11ccf6421",
]
os.makedirs(CACHE_DIR, exist_ok=True)
# DI_speed message/signal handles resolved once from the Tesla party DBC at import time.
_TESLA_PARTY_DBC = DBC("tesla_model3_party")
_DI_SPEED_MSG = _TESLA_PARTY_DBC.name_to_msg["DI_speed"]
_DI_SPEED_ADDR = _DI_SPEED_MSG.address
_SIG_UI_UNITS = _DI_SPEED_MSG.sigs["DI_uiSpeedUnits"]
_SIG_UI_SPEED = _DI_SPEED_MSG.sigs["DI_uiSpeed"]
_SIG_VEH_SPEED = _DI_SPEED_MSG.sigs["DI_vehicleSpeed"]
# All DI_vehicleSpeed* variants in the DBC; used for validation and error hints.
_DI_VEHICLE_SPEED_SIGNALS = sorted([name for name in _DI_SPEED_MSG.sigs.keys() if name.startswith("DI_vehicleSpeed")])
DEFAULT_VEHICLE_SPEED_SIGNAL = "DI_vehicleSpeed" if "DI_vehicleSpeed" in _DI_SPEED_MSG.sigs else (_DI_VEHICLE_SPEED_SIGNALS[0] if len(_DI_VEHICLE_SPEED_SIGNALS) else "")
# DI_uiSpeedUnits encoding: 0 = mph, 1 = kph.
UNIT_MPH = 0
UNIT_KPH = 1
Ecu = CarParams.Ecu
# Output toggles; presumably flipped by CLI parsing elsewhere in the file — confirm.
USE_COLOR = False
SHOW_METADATA = False
@dataclass
class SegmentSamples:
    """Raw per-segment samples collected by scan_segment from one rlog."""
    t: list[float]             # log mono time [s] per accepted DI_speed frame
    ui_speed_raw: list[float]  # DI_uiSpeed in its source units (kph or mph, resolved later)
    unit_dbc: list[int]        # DI_uiSpeedUnits decoded via the DBC
    unit_bit32: list[int]      # raw payload bit 32 (candidate units bit)
    unit_bit33: list[int]      # raw payload bit 33 (candidate units bit)
    vehicle_kph: list[float]   # DI_vehicleSpeed, always kph
    gps_lat: list[float]       # GPS fixes collected for region classification
    gps_lon: list[float]
@dataclass
class RouteSamples:
    """Time-sorted, unit-resolved samples aggregated over a whole route."""
    route: str
    t: np.ndarray              # sorted mono times [s]
    ui_speed_raw: np.ndarray   # DI_uiSpeed in source units
    ui_units: np.ndarray       # resolved unit per sample (UNIT_MPH / UNIT_KPH)
    vehicle_kph: np.ndarray    # DI_vehicleSpeed [kph]
    vehicle_speed_signal: str  # DBC signal name used as vehicle speed
    unit_source: str           # human-readable description of the selected unit source
    eps_fw: str                # EPS firmware version text, "n/a" when unavailable
    region: str                # coarse market region from GPS (see region_from_lat_lon)
    region_center_lat: float | None  # median valid GPS fix; None when no fixes
    region_center_lon: float | None
@dataclass
class UnitStats:
    """Bias / gain / hysteresis metrics for samples shown in one display unit.

    Fields suffixed `_source` are expressed in the unit named by `unit_code`
    (kph or mph). None means the metric could not be computed.
    """
    unit_code: int
    sample_count: int
    # Bias: DI_uiSpeed - DI_vehicleSpeed, in source units.
    bias_mean_source: float | None
    bias_avg_filtered_source: float | None    # 10-90% trimmed mean
    bias_median_source: float | None
    bias_pos_max_source: float | None         # raw maximum
    bias_pos_max_robust_source: float | None  # p99 "skip outliers" maximum
    # Gain of bias-compensated ui speed vs vehicle speed (unitless).
    gain_avg_bias_comp: float | None
    gain_avg_filtered_bias_comp: float | None
    gain_sample_count: int
    # Hysteresis half-width (+/-) derived from up/down transition thresholds.
    hysteresis_mean_source: float | None
    hysteresis_avg_filtered_source: float | None
    hysteresis_median_source: float | None
    hysteresis_max_robust_source: float | None
    hysteresis_boundary_count: int  # boundaries observed with both up and down transitions
    transition_up_count: int
    transition_down_count: int
    # Median-confidence diagnostics for the bias median (see median_confidence()).
    bias_median_ci_low_source: float | None
    bias_median_ci_high_source: float | None
    bias_median_ci_width_source: float | None
    bias_median_mad_source: float | None
    bias_median_trimmed_source: float | None
    bias_median_trim_shift_source: float | None
    bias_median_conf_sample_count: int
    # Median-confidence diagnostics for the hysteresis median.
    hysteresis_median_ci_low_source: float | None
    hysteresis_median_ci_high_source: float | None
    hysteresis_median_ci_width_source: float | None
    hysteresis_median_mad_source: float | None
    hysteresis_median_trimmed_source: float | None
    hysteresis_median_trim_shift_source: float | None
    hysteresis_median_conf_sample_count: int
def unit_name(unit_code: int) -> str:
    """Human-readable name for a DI_uiSpeedUnits code."""
    known = {UNIT_KPH: "KPH", UNIT_MPH: "MPH"}
    return known.get(unit_code, f"UNKNOWN({unit_code})")
def encountered_units_text(units: set[int]) -> str:
    """Render a set of encountered unit codes as a short, space-separated label."""
    labels: list[str] = []
    if UNIT_KPH in units:
        labels.append("kph")
    if UNIT_MPH in units:
        labels.append("mph")
    # Any non-standard codes are appended after the known ones, in sorted order.
    labels.extend(unit_name(int(u)).lower() for u in sorted(units) if u not in (UNIT_KPH, UNIT_MPH))
    if not labels:
        return "no-units"
    return " ".join(labels)
def region_from_lat_lon(lat: float, lon: float) -> str:
    # Coarse automotive-market regions from lat/lon; intended for route-level grouping only.
    if not (np.isfinite(lat) and np.isfinite(lon)):
        return "UNKNOWN"
    if abs(lat) > 90.0 or abs(lon) > 180.0:
        return "UNKNOWN"
    # Ordered (label, lat_lo, lat_hi, lon_lo, lon_hi) boxes; the FIRST match wins,
    # so earlier boxes deliberately override later overlapping ones
    # (UK before EU, southwest-BC before the US boxes, US before the Canada box).
    boxes = (
        ("UK/IE", 49.0, 61.5, -11.5, 3.0),
        ("EU", 35.0, 72.5, -25.0, 45.0),
        ("CA", 48.2, 50.6, -126.5, -122.8),  # SW British Columbia incl. Vancouver Island/metro
        ("US", 24.0, 49.0, -125.0, -66.0),   # contiguous US
        ("US", 51.0, 72.0, -170.0, -129.0),  # Alaska
        ("US", 18.0, 23.0, -161.0, -154.0),  # Hawaii
        ("US", 17.0, 19.0, -68.0, -65.0),    # Puerto Rico
        ("CA", 41.5, 83.0, -141.0, -52.0),   # Canada (coarse box)
        ("NA", 15.0, 72.5, -170.0, -50.0),
        ("LATAM", -56.0, 15.0, -95.0, -30.0),
        ("ANZ", -48.0, -10.0, 110.0, 180.0),
        ("JP/KR", 20.0, 51.0, 122.0, 154.0),
        ("CN", 18.0, 54.0, 73.0, 135.0),
        ("IN", 5.0, 37.0, 67.0, 97.0),
        ("MEA", -10.0, 37.0, -20.0, 65.0),
        ("APAC", -40.0, 25.0, 95.0, 180.0),
    )
    for label, lat_lo, lat_hi, lon_lo, lon_hi in boxes:
        if lat_lo <= lat <= lat_hi and lon_lo <= lon <= lon_hi:
            return label
    return "OTHER"
def classify_route_region(gps_lat: np.ndarray, gps_lon: np.ndarray) -> tuple[str, float | None, float | None]:
    """Majority-vote region over valid GPS fixes.

    Returns (region, median_lat, median_lon); ("UNKNOWN", None, None) when
    there are no finite, in-range fixes.
    """
    finite = np.isfinite(gps_lat) & np.isfinite(gps_lon)
    lats = gps_lat[finite]
    lons = gps_lon[finite]
    in_range = (np.abs(lats) <= 90.0) & (np.abs(lons) <= 180.0)
    lats = lats[in_range]
    lons = lons[in_range]
    if lats.size == 0:
        return "UNKNOWN", None, None
    votes: dict[str, int] = {}
    for lat, lon in zip(lats, lons):
        name = region_from_lat_lon(float(lat), float(lon))
        votes[name] = votes.get(name, 0) + 1
    # Alphabetically-first region wins ties (matches np.unique + argmax behavior).
    region = max(sorted(votes), key=lambda name: votes[name])
    return region, float(np.median(lats)), float(np.median(lons))
def resolve_vehicle_speed_signal(signal_name: str) -> str:
    """Validate a DI_speed signal name, raising ValueError with the available options."""
    if signal_name not in _DI_SPEED_MSG.sigs:
        hint = ", ".join(_DI_VEHICLE_SPEED_SIGNALS) if len(_DI_VEHICLE_SPEED_SIGNALS) else "<none>"
        raise ValueError(f"Unknown DI_speed signal '{signal_name}'. Available DI_vehicleSpeed* signals: {hint}")
    return signal_name
def colorize(text: str, *, fg: int | None = None, bold: bool = False, dim: bool = False) -> str:
    """Wrap text in ANSI SGR codes when USE_COLOR is set; otherwise pass through."""
    if not USE_COLOR:
        return text
    # Attribute order matters for readability of the escape sequence: bold, dim, then color.
    codes = [code for enabled, code in ((bold, "1"), (dim, "2")) if enabled]
    if fg is not None:
        codes.append(str(fg))
    if not codes:
        return text
    return f"\033[{';'.join(codes)}m{text}\033[0m"
def gray_meta(text: str) -> str:
    # Metadata styling: gray (ANSI 90) foreground when color output is enabled.
    return colorize(text, fg=90)
def decode_signal_value(dat: bytes | bytearray, sig) -> float:
    """Decode one DBC signal from a raw CAN payload, applying sign extension and scaling."""
    raw = get_raw_value(dat, sig)
    # Two's-complement sign extension: subtract 2^size when the sign bit is set.
    if sig.is_signed and ((raw >> (sig.size - 1)) & 0x1):
        raw -= 1 << sig.size
    return float(raw) * sig.factor + sig.offset
def extract_raw_bit(dat: bytes | bytearray, bit_index: int) -> int:
byte_index = bit_index // 8
if byte_index >= len(dat):
return 0
return int((dat[byte_index] >> (bit_index % 8)) & 0x1)
def sanitize_units(arr: np.ndarray) -> np.ndarray:
    # Convert arbitrary numeric array into strict {0: MPH, 1: KPH} via a 0.5 threshold.
    values = np.asarray(arr, dtype=float)
    return np.where(values >= 0.5, UNIT_KPH, UNIT_MPH).astype(int)
def infer_units_from_speed_fit(ui_speed_raw: np.ndarray, vehicle_kph: np.ndarray) -> np.ndarray:
    """Infer per-sample units by whether DI_uiSpeed fits DI_vehicleSpeed better as kph or mph."""
    err_as_kph = np.abs(ui_speed_raw - vehicle_kph)
    err_as_mph = np.abs(ui_speed_raw * CV.MPH_TO_KPH - vehicle_kph)
    inferred = np.where(err_as_kph <= err_as_mph, UNIT_KPH, UNIT_MPH).astype(int)
    # Low-speed samples cannot distinguish the two units; overwrite them with
    # whichever unit dominates the high-speed samples.
    fast = np.isfinite(vehicle_kph) & (vehicle_kph >= MIN_SPEED_FOR_UNIT_SCORING_KPH)
    if np.any(fast):
        dominant = UNIT_KPH if np.mean(inferred[fast] == UNIT_KPH) >= 0.5 else UNIT_MPH
        inferred[~fast] = dominant
    return inferred
def unit_series_fit_score_kph(ui_speed_raw: np.ndarray, vehicle_kph: np.ndarray, units: np.ndarray) -> float:
    """Median |error| [kph] of the ui speed (converted per `units`) vs vehicle speed; lower is better."""
    units_bin = sanitize_units(units)
    ui_kph_est = np.where(units_bin == UNIT_KPH, ui_speed_raw, ui_speed_raw * CV.MPH_TO_KPH)
    err = np.abs(ui_kph_est - vehicle_kph)
    mask = np.isfinite(err) & np.isfinite(vehicle_kph) & (vehicle_kph >= MIN_SPEED_FOR_UNIT_SCORING_KPH)
    if np.sum(mask) < 50:
        # Not enough high-speed samples: fall back to every finite error.
        mask = np.isfinite(err)
    if not np.any(mask):
        return float("inf")
    return float(np.median(err[mask]))
def resolve_route_units(ui_speed_raw: np.ndarray,
                        vehicle_kph: np.ndarray,
                        unit_dbc: np.ndarray,
                        unit_bit32: np.ndarray,
                        unit_bit33: np.ndarray) -> tuple[np.ndarray, str]:
    """Select the per-sample unit series that best fits DI_vehicleSpeed.

    Candidates: data-inferred fit, raw bits 33/32, and the DBC-decoded
    DI_uiSpeedUnits signal. The lowest median-kph-error candidate wins.
    Returns the stabilized unit series and a descriptive source string.
    """
    candidates = {
        "inferred_fit": sanitize_units(infer_units_from_speed_fit(ui_speed_raw, vehicle_kph)),
        "raw_bit33": sanitize_units(unit_bit33),
        "raw_bit32": sanitize_units(unit_bit32),
        "dbc_signal": sanitize_units(unit_dbc),
    }
    scores = {name: unit_series_fit_score_kph(ui_speed_raw, vehicle_kph, series) for name, series in candidates.items()}
    # First minimal score wins ties, so candidate order expresses preference.
    best_name, best_score = min(scores.items(), key=lambda item: item[1])
    selected = stabilize_unit_series(candidates[best_name])
    source_labels = {
        "inferred_fit": "inferred_from_speed_fit",
        "raw_bit33": "raw_bit33",
        "raw_bit32": "raw_bit32",
        "dbc_signal": "dbc_di_uiSpeedUnits",
    }
    source_notes = {
        "inferred_fit": "units inferred by best fit of DI_uiSpeed to DI_vehicleSpeed",
        "raw_bit33": "units decoded from raw bit 33",
        "raw_bit32": "units decoded from raw bit 32",
        "dbc_signal": "units decoded from DBC DI_uiSpeedUnits signal",
    }
    ranked = sorted(((n, s) for n, s in scores.items() if np.isfinite(s)), key=lambda x: x[1])
    margin = ranked[1][1] - ranked[0][1] if len(ranked) >= 2 else None
    counts = np.bincount(selected, minlength=2)
    total = int(np.sum(counts))
    if total > 0 and counts[UNIT_KPH] > 0 and counts[UNIT_MPH] > 0:
        kph_pct = 100.0 * float(counts[UNIT_KPH]) / total
        mph_pct = 100.0 * float(counts[UNIT_MPH]) / total
        resolved_units = f"mixed(kph={kph_pct:.1f}%, mph={mph_pct:.1f}%)"
    elif counts[UNIT_MPH] > 0:
        resolved_units = "mph"
    else:
        resolved_units = "kph"
    score_table = ", ".join(f"{source_labels.get(n, n)}={s:.3f}" for n, s in ranked)
    label = source_labels.get(best_name, best_name)
    note = source_notes.get(best_name, "unit source selected by fit")
    if margin is not None:
        source_text = (
            f"{label} ({note}; resolved_units={resolved_units}; fit_err={best_score:.3f} kph, "
            f"margin_to_next={margin:.3f} kph; candidate_errs_kph=[{score_table}])"
        )
    else:
        source_text = (
            f"{label} ({note}; resolved_units={resolved_units}; fit_err={best_score:.3f} kph; "
            f"candidate_errs_kph=[{score_table}])"
        )
    return selected, source_text
def stabilize_unit_series(units: np.ndarray) -> np.ndarray:
    # Keep true mixed-unit logs, but collapse tiny minority blips from ambiguous samples.
    units = sanitize_units(units)
    if units.size == 0:
        return units
    counts = np.bincount(units, minlength=2)
    minority_count = int(np.min(counts))
    if minority_count <= 20 or minority_count / float(units.size) < 0.005:
        dominant = UNIT_KPH if counts[UNIT_KPH] >= counts[UNIT_MPH] else UNIT_MPH
        return np.full_like(units, dominant)
    return units
def _append_gps_fix(g, gps_lat: list[float], gps_lon: list[float]) -> None:
    """Record a GPS sample when it is a plausible fix (hasFix, finite, in range)."""
    if not bool(getattr(g, "hasFix", True)):
        return
    if np.isfinite(g.latitude) and np.isfinite(g.longitude) and abs(g.latitude) <= 90.0 and abs(g.longitude) <= 180.0:
        gps_lat.append(float(g.latitude))
        gps_lon.append(float(g.longitude))
def scan_segment(lr: LogReader, party_bus: int = PARTY_BUS, vehicle_speed_signal: str = DEFAULT_VEHICLE_SPEED_SIGNAL) -> SegmentSamples:
    """Scan one segment's messages for DI_speed frames and GPS fixes.

    Raises KeyError when vehicle_speed_signal is not a DI_speed signal.
    """
    if vehicle_speed_signal not in _DI_SPEED_MSG.sigs:
        raise KeyError(f"Unknown DI_speed signal '{vehicle_speed_signal}'")
    veh_speed_sig = _DI_SPEED_MSG.sigs[vehicle_speed_signal]
    t: list[float] = []
    ui_speed_raw: list[float] = []
    unit_dbc: list[int] = []
    unit_bit32: list[int] = []
    unit_bit33: list[int] = []
    vehicle_kph: list[float] = []
    gps_lat: list[float] = []
    gps_lon: list[float] = []
    for msg in lr:
        which = msg.which()
        # Both GPS event types expose the same latitude/longitude/hasFix fields,
        # so share one validation path instead of two duplicated branches.
        if which in ("gpsLocation", "gpsLocationExternal"):
            _append_gps_fix(getattr(msg, which), gps_lat, gps_lon)
            continue
        if which != "can":
            continue
        ts = msg.logMonoTime * 1e-9
        for can in msg.can:
            if can.src != party_bus or can.address != _DI_SPEED_ADDR or len(can.dat) < 8:
                continue
            ui_speed = decode_signal_value(can.dat, _SIG_UI_SPEED)
            # Some DI_vehicleSpeed variants use all-ones raw as SNA (e.g. 13-bit -> 8191 -> 615.28 kph).
            veh_raw = get_raw_value(can.dat, veh_speed_sig)
            if veh_raw == ((1 << veh_speed_sig.size) - 1):
                continue
            veh_speed_kph = decode_signal_value(can.dat, veh_speed_sig)
            units_dbc = int(round(decode_signal_value(can.dat, _SIG_UI_UNITS)))
            bit32 = extract_raw_bit(can.dat, 32)
            bit33 = extract_raw_bit(can.dat, 33)
            # Ignore explicit SNA/out-of-range points.
            if not np.isfinite(ui_speed) or not np.isfinite(veh_speed_kph):
                continue
            if ui_speed >= 255 or ui_speed < 0:
                continue
            if veh_speed_kph < -5.0 or veh_speed_kph > 350.0:
                continue
            t.append(ts)
            ui_speed_raw.append(ui_speed)
            unit_dbc.append(units_dbc)
            unit_bit32.append(bit32)
            unit_bit33.append(bit33)
            vehicle_kph.append(veh_speed_kph)
    return SegmentSamples(t, ui_speed_raw, unit_dbc, unit_bit32, unit_bit33, vehicle_kph, gps_lat, gps_lon)
def scan_segment_list(lr: LogReader,
                      party_bus: int = PARTY_BUS,
                      vehicle_speed_signal: str = DEFAULT_VEHICLE_SPEED_SIGNAL) -> list[SegmentSamples]:
    # Adapter: run_across_segments expects a callable that returns a list of results.
    return [scan_segment(lr, party_bus=party_bus, vehicle_speed_signal=vehicle_speed_signal)]
def normalize_route(route: str) -> str:
    """Decode URL-encoded '|' separators ("%7C"/"%7c") in a route name."""
    for encoded in ("%7C", "%7c"):
        route = route.replace(encoded, "|")
    return route
def resolve_available_rlogs(route: str) -> tuple[list[str], int, int]:
    """Gather rlog files for a route from the first source that provides each segment.

    Returns (files ordered by segment index, missing segment count, requested count).
    """
    sr = SegmentRange(route)
    requested = list(sr.seg_idxs)
    remaining = list(requested)
    found: dict[int, str] = {}
    for source in (internal_source, comma_api_source, openpilotci_source, comma_car_segments_source):
        if not remaining:
            break
        try:
            found.update(source(sr, remaining, FileName.RLOG))
        except Exception:
            # Best-effort: an unavailable source is simply skipped.
            continue
        remaining = [idx for idx in remaining if idx not in found]
    ordered = [found[idx] for idx in sorted(found)]
    return ordered, len(remaining), len(requested)
def _decode_fw_version_text(fw_version: bytes | bytearray | str) -> str:
if isinstance(fw_version, (bytes, bytearray)):
return bytes(fw_version).decode("utf-8", errors="replace").strip()
return str(fw_version).strip()
def get_eps_fw_from_logreader(lr: LogReader) -> str:
    """Extract EPS firmware version text from carParams, preferring non-logging entries."""
    cp = lr.first("carParams")
    if cp is None:
        return "n/a"
    eps_all: list[str] = []
    eps_non_logging: list[str] = []
    for fw in cp.carFw:
        if fw.ecu != Ecu.eps:
            continue
        fw_text = _decode_fw_version_text(fw.fwVersion)
        if not fw_text:
            continue
        eps_all.append(fw_text)
        if not fw.logging:
            eps_non_logging.append(fw_text)
    # Prefer the non-logging ECU versions; fall back to everything seen.
    chosen = eps_non_logging or eps_all
    if not chosen:
        return "n/a"
    return " | ".join(sorted(set(chosen)))
def route_cache_filename(route: str, vehicle_speed_signal: str = DEFAULT_VEHICLE_SPEED_SIGNAL) -> str:
    """Path of the cache file for a (route, vehicle-speed-signal) pair, keyed by CACHE_VERSION."""
    safe_route = route.replace("/", "_").replace("|", "_")
    safe_sig = vehicle_speed_signal.replace("/", "_")
    return os.path.join(CACHE_DIR, f"v{CACHE_VERSION}_{safe_route}_{safe_sig}.pkl")
def load_or_scan_route(route: str,
                       workers: int,
                       no_cache: bool,
                       vehicle_speed_signal: str = DEFAULT_VEHICLE_SPEED_SIGNAL) -> RouteSamples:
    """Return RouteSamples for `route`, via the on-disk cache when possible.

    On a cache miss the route's rlogs are scanned across `workers` processes,
    samples are time-sorted, units resolved, the region classified, and the
    result is cached (unless no_cache).
    Raises FileNotFoundError when no rlogs can be found for the route.
    """
    cache_file = route_cache_filename(route, vehicle_speed_signal)
    if not no_cache and os.path.exists(cache_file):
        with open(cache_file, "rb") as f:
            cached = pickle.load(f)
        # Only trust entries written with the current cache schema version.
        if isinstance(cached, dict) and cached.get("version") == CACHE_VERSION:
            data = cached["data"]
            return RouteSamples(
                route=route,
                t=np.asarray(data["t"], dtype=float),
                ui_speed_raw=np.asarray(data["ui_speed_raw"], dtype=float),
                ui_units=np.asarray(data["ui_units"], dtype=int),
                vehicle_kph=np.asarray(data["vehicle_kph"], dtype=float),
                vehicle_speed_signal=str(data.get("vehicle_speed_signal", vehicle_speed_signal)),
                unit_source=str(data.get("unit_source", "unknown")),
                eps_fw=str(data.get("eps_fw", "n/a")),
                region=str(data.get("region", "UNKNOWN")),
                region_center_lat=float(data["region_center_lat"]) if data.get("region_center_lat") is not None else None,
                region_center_lon=float(data["region_center_lon"]) if data.get("region_center_lon") is not None else None,
            )
    rlog_files, missing_count, total_count = resolve_available_rlogs(route)
    if len(rlog_files) == 0:
        raise FileNotFoundError(f"No rlogs found for {route}")
    if missing_count > 0:
        print(colorize(f"Using partial rlogs for {route}: found {len(rlog_files)}/{total_count} segments", fg=33))
    lr = LogReader(rlog_files, sort_by_time=True)
    eps_fw = get_eps_fw_from_logreader(lr)
    # Fan out the per-segment scan across worker processes.
    segment_results = lr.run_across_segments(
        workers,
        partial(scan_segment_list, party_bus=PARTY_BUS, vehicle_speed_signal=vehicle_speed_signal),
        disable_tqdm=True,
    )
    t: list[float] = []
    ui_speed_raw: list[float] = []
    unit_dbc: list[int] = []
    unit_bit32: list[int] = []
    unit_bit33: list[int] = []
    vehicle_kph: list[float] = []
    gps_lat: list[float] = []
    gps_lon: list[float] = []
    # Concatenate per-segment sample lists, skipping missing (None) results.
    for seg in segment_results:
        if seg is None:
            continue
        t.extend(seg.t)
        ui_speed_raw.extend(seg.ui_speed_raw)
        unit_dbc.extend(seg.unit_dbc)
        unit_bit32.extend(seg.unit_bit32)
        unit_bit33.extend(seg.unit_bit33)
        vehicle_kph.extend(seg.vehicle_kph)
        gps_lat.extend(seg.gps_lat)
        gps_lon.extend(seg.gps_lon)
    region, center_lat, center_lon = classify_route_region(np.asarray(gps_lat, dtype=float), np.asarray(gps_lon, dtype=float))
    if len(t) == 0:
        rs = RouteSamples(
            route=route,
            t=np.array([], dtype=float),
            ui_speed_raw=np.array([], dtype=float),
            ui_units=np.array([], dtype=int),
            vehicle_kph=np.array([], dtype=float),
            vehicle_speed_signal=vehicle_speed_signal,
            unit_source="none",
            eps_fw=eps_fw,
            region=region,
            region_center_lat=center_lat,
            region_center_lon=center_lon,
        )
    else:
        # Sort samples by time before unit resolution (transition detection needs order).
        order = np.argsort(np.asarray(t, dtype=float))
        t_np = np.asarray(t, dtype=float)[order]
        ui_np = np.asarray(ui_speed_raw, dtype=float)[order]
        dbc_np = np.asarray(unit_dbc, dtype=int)[order]
        b32_np = np.asarray(unit_bit32, dtype=int)[order]
        b33_np = np.asarray(unit_bit33, dtype=int)[order]
        veh_np = np.asarray(vehicle_kph, dtype=float)[order]
        units_np, unit_source = resolve_route_units(ui_np, veh_np, dbc_np, b32_np, b33_np)
        rs = RouteSamples(
            route=route,
            t=t_np,
            ui_speed_raw=ui_np,
            ui_units=units_np,
            vehicle_kph=veh_np,
            vehicle_speed_signal=vehicle_speed_signal,
            unit_source=unit_source,
            eps_fw=eps_fw,
            region=region,
            region_center_lat=center_lat,
            region_center_lon=center_lon,
        )
    if not no_cache:
        with open(cache_file, "wb") as f:
            pickle.dump(
                {
                    "version": CACHE_VERSION,
                    "route": route,
                    "data": {
                        "t": rs.t,
                        "ui_speed_raw": rs.ui_speed_raw,
                        "ui_units": rs.ui_units,
                        "vehicle_kph": rs.vehicle_kph,
                        "vehicle_speed_signal": rs.vehicle_speed_signal,
                        "unit_source": rs.unit_source,
                        "eps_fw": rs.eps_fw,
                        "region": rs.region,
                        "region_center_lat": rs.region_center_lat,
                        "region_center_lon": rs.region_center_lon,
                    },
                },
                f,
            )
    return rs
def robust_percentile(values: np.ndarray, percentile: float) -> float | None:
if values.size == 0:
return None
finite = values[np.isfinite(values)]
if finite.size == 0:
return None
if finite.size < 25:
if percentile >= 50:
return float(np.max(finite))
return float(np.min(finite))
return float(np.percentile(finite, percentile))
def robust_trimmed_mean(values: np.ndarray,
                        low_percentile: float = ROBUST_AVG_LOW_PERCENTILE,
                        high_percentile: float = ROBUST_AVG_HIGH_PERCENTILE) -> float | None:
    """Mean of the values within [low, high] percentiles; plain mean on tiny samples; None if empty."""
    finite = values[np.isfinite(values)]
    if finite.size == 0:
        return None
    if finite.size < 25:
        # Trimming a tiny sample would discard most of it.
        return float(np.mean(finite))
    lo = float(np.percentile(finite, low_percentile))
    hi = float(np.percentile(finite, high_percentile))
    kept = finite[(finite >= lo) & (finite <= hi)]
    if kept.size == 0:
        return float(np.mean(finite))
    return float(np.mean(kept))
def median_confidence(values: np.ndarray) -> tuple[float | None, float | None, float | None, float | None, float | None, float | None, int]:
    """Robustness diagnostics for the median of `values`.

    Returns (ci95_low, ci95_high, ci95_width, mad, trimmed_median, trim_shift, n),
    or all-None with n=0 when there are no finite samples. The CI comes from a
    fixed-seed bootstrap so results are reproducible run to run.
    """
    finite = values[np.isfinite(values)]
    if finite.size == 0:
        return None, None, None, None, None, None, 0
    median = float(np.median(finite))
    # Median absolute deviation: robust spread measure around the median.
    mad = float(np.median(np.abs(finite - median)))
    # Trimmed median (10-90% window): a large shift vs the full median flags outlier influence.
    lo = float(np.percentile(finite, MEDIAN_TRIM_LOW_PERCENTILE))
    hi = float(np.percentile(finite, MEDIAN_TRIM_HIGH_PERCENTILE))
    kept = finite[(finite >= lo) & (finite <= hi)]
    trim_median = float(np.median(kept)) if kept.size else median
    trim_shift = abs(trim_median - median)
    boot_arr = finite
    if boot_arr.size > MEDIAN_BOOTSTRAP_MAX_POINTS:
        # Use deterministic downsampling to keep runtime predictable on long routes.
        rng_ds = np.random.default_rng(0)
        idx = rng_ds.choice(boot_arr.size, size=MEDIAN_BOOTSTRAP_MAX_POINTS, replace=False)
        boot_arr = boot_arr[idx]
    # Degenerate default: zero-width interval at the median when too few points to bootstrap.
    ci_low = median
    ci_high = median
    if boot_arr.size >= 3:
        # Seeded bootstrap: resample with replacement, take 2.5/97.5 percentiles of the medians.
        rng_bs = np.random.default_rng(1)
        sample_idx = rng_bs.integers(0, boot_arr.size, size=(MEDIAN_BOOTSTRAP_ITERS, boot_arr.size))
        boot_medians = np.median(boot_arr[sample_idx], axis=1)
        ci_low = float(np.percentile(boot_medians, 2.5))
        ci_high = float(np.percentile(boot_medians, 97.5))
    return ci_low, ci_high, float(ci_high - ci_low), mad, trim_median, float(trim_shift), int(finite.size)
def convert_source_to_all_units(value_source: float | None, source_unit: int) -> tuple[float | None, float | None, float | None, float | None]:
    """Return (source, kph, mph, m/s) for a value in `source_unit`; all None for None input."""
    if value_source is None:
        return None, None, None, None
    if source_unit == UNIT_MPH:
        return value_source, value_source * CV.MPH_TO_KPH, value_source, value_source * CV.MPH_TO_MS
    # UNIT_KPH and any unknown unit code are both treated as kph.
    return value_source, value_source, value_source * CV.KPH_TO_MPH, value_source * CV.KPH_TO_MS
def fmt(v: float | None, digits: int = 3) -> str:
if v is None:
return "n/a"
return f"{v:.{digits}f}"
def fmt_gain_percent(gain: float | None, digits: int = 2) -> str:
if gain is None:
return "n/a"
return f"{gain * 100.0:.{digits}f}%"
def format_all_units(label: str, value_source: float | None, source_unit: int) -> str:
    """Labeled one-line rendering of a metric in source units plus kph/mph/m-s."""
    src, kph, mph, ms = convert_source_to_all_units(value_source, source_unit)
    head = f"{label}: source({unit_name(source_unit)})={fmt(src)}, "
    tail = f"metric={fmt(kph)} kph, imperial={fmt(mph)} mph, m/s={fmt(ms)}"
    return head + tail
def format_unit_values(value_source: float | None, source_unit: int) -> str:
    """Compact 'src | kph | mph | m/s' rendering of a metric."""
    source_val, kph_val, mph_val, ms_val = convert_source_to_all_units(value_source, source_unit)
    return f"src={fmt(source_val)} {unit_name(source_unit)} | {fmt(kph_val)} kph | {fmt(mph_val)} mph | {fmt(ms_val)} m/s"
def source_to_kph(value_source: float | None, source_unit: int) -> float | None:
    """Convert a source-unit value to kph (identity for kph and unknown unit codes)."""
    if value_source is None:
        return None
    scale = CV.MPH_TO_KPH if source_unit == UNIT_MPH else 1.0
    return float(value_source * scale)
def format_kph(value_source: float | None, source_unit: int, digits: int = 3) -> str:
    """Render a source-unit value converted to kph, with a ' kph' suffix."""
    kph_val = source_to_kph(value_source, source_unit)
    return f"{fmt(kph_val, digits)} kph"
def sum_if_both(a: float | None, b: float | None) -> float | None:
if a is None or b is None:
return None
if not np.isfinite(a) or not np.isfinite(b):
return None
return float(a + b)
def print_metric_line(name: str, value_source: float | None, source_unit: int, indent: str = " ") -> None:
    """Print one labeled metric row rendered in all units."""
    label = colorize(f"{name:<14}", fg=90)
    print(f"{indent}{label} {format_unit_values(value_source, source_unit)}")
def print_median_confidence_lines(ci_low: float | None,
                                  ci_high: float | None,
                                  ci_width: float | None,
                                  mad: float | None,
                                  trim_shift: float | None,
                                  sample_count: int,
                                  source_unit: int,
                                  indent: str = " ") -> None:
    """Print the median-confidence diagnostics block for one metric."""
    print(f"{indent}{colorize('median confidence', fg=90)}")
    print(f"{indent}{colorize('conf n'.ljust(14), fg=90)} {sample_count}")
    rows = (
        ("ci95 low", ci_low),
        ("ci95 high", ci_high),
        ("ci95 width", ci_width),
        ("mad", mad),
        ("trim shift", trim_shift),
    )
    for label, value in rows:
        print_metric_line(label, value, source_unit, indent=indent)
def print_median_confidence_legend() -> None:
    """Print an explanation of the median-confidence diagnostics."""
    print(colorize("Median Confidence Legend", fg=34, bold=True))
    legend = (
        ("applies to:", "normal median (full valid sample set)"),
        ("ci95 low/high:", "95% bootstrap interval of the median"),
        ("ci95 width:", "ci95_high - ci95_low (smaller means more stable median)"),
        ("mad:", "median absolute deviation around the median"),
        ("trim shift:", "abs(median(10-90% trimmed) - median(full)); near 0 means robust"),
    )
    for term, meaning in legend:
        print(f" {colorize(term, fg=90)} {meaning}")
    print()
def _empty_unit_stats(unit_code: int) -> UnitStats:
    """UnitStats for the no-valid-samples case: every metric unset, all counts zero."""
    return UnitStats(
        unit_code=unit_code,
        sample_count=0,
        bias_mean_source=None,
        bias_avg_filtered_source=None,
        bias_median_source=None,
        bias_pos_max_source=None,
        bias_pos_max_robust_source=None,
        gain_avg_bias_comp=None,
        gain_avg_filtered_bias_comp=None,
        gain_sample_count=0,
        hysteresis_mean_source=None,
        hysteresis_avg_filtered_source=None,
        hysteresis_median_source=None,
        hysteresis_max_robust_source=None,
        hysteresis_boundary_count=0,
        transition_up_count=0,
        transition_down_count=0,
        bias_median_ci_low_source=None,
        bias_median_ci_high_source=None,
        bias_median_ci_width_source=None,
        bias_median_mad_source=None,
        bias_median_trimmed_source=None,
        bias_median_trim_shift_source=None,
        bias_median_conf_sample_count=0,
        hysteresis_median_ci_low_source=None,
        hysteresis_median_ci_high_source=None,
        hysteresis_median_ci_width_source=None,
        hysteresis_median_mad_source=None,
        hysteresis_median_trimmed_source=None,
        hysteresis_median_trim_shift_source=None,
        hysteresis_median_conf_sample_count=0,
    )
def compute_unit_stats(t: np.ndarray,
                       ui_speed_raw: np.ndarray,
                       vehicle_kph: np.ndarray,
                       unit_code: int,
                       compute_median_confidence: bool = False) -> UnitStats:
    """Compute bias, gain, and hysteresis statistics for one display unit.

    `ui_speed_raw` is assumed to already be in the unit named by `unit_code`;
    `vehicle_kph` is converted to that unit before comparison. When
    `compute_median_confidence` is set, bootstrap diagnostics for the bias and
    hysteresis medians are filled in as well.
    """
    if ui_speed_raw.size == 0:
        return _empty_unit_stats(unit_code)
    # Work in the display ("source") unit; vehicle speed is always logged in kph.
    vehicle_source = vehicle_kph if unit_code == UNIT_KPH else vehicle_kph * CV.KPH_TO_MPH
    speed_valid = np.isfinite(vehicle_source) & (vehicle_source > 0.0)
    if not np.any(speed_valid):
        return _empty_unit_stats(unit_code)
    t = t[speed_valid]
    ui_speed_raw = ui_speed_raw[speed_valid]
    vehicle_source = vehicle_source[speed_valid]
    # Bias: displayed speed minus actual speed, in source units.
    bias_source = ui_speed_raw - vehicle_source
    finite_bias = bias_source[np.isfinite(bias_source)]
    bias_mean = float(np.mean(finite_bias)) if finite_bias.size else None
    bias_avg_filtered = robust_trimmed_mean(finite_bias)
    bias_median = float(np.median(finite_bias)) if finite_bias.size else None
    bias_pos_max = float(np.max(finite_bias)) if finite_bias.size else None
    bias_pos_max_robust = robust_percentile(finite_bias, ROBUST_MAX_PERCENTILE)
    # Gain: slope of (ui - median bias) vs vehicle speed above the scoring floor.
    gain_avg_bias_comp = None
    gain_avg_filtered_bias_comp = None
    gain_sample_count = 0
    if bias_median is not None:
        gain_min_speed_source = MIN_SPEED_FOR_UNIT_SCORING_KPH if unit_code == UNIT_KPH else (MIN_SPEED_FOR_UNIT_SCORING_KPH * CV.KPH_TO_MPH)
        gain_valid = np.isfinite(ui_speed_raw) & np.isfinite(vehicle_source) & (vehicle_source >= gain_min_speed_source)
        if np.any(gain_valid):
            x = vehicle_source[gain_valid]
            y = ui_speed_raw[gain_valid] - bias_median
            denom = float(np.dot(x, x))
            if denom > 1e-6:
                # Through-origin least squares: gain = <x, y> / <x, x>.
                gain_avg_bias_comp = float(np.dot(x, y) / denom)
            ratio_valid = np.isfinite(x) & np.isfinite(y) & (np.abs(x) > 1e-3)
            if np.any(ratio_valid):
                gain_ratios = y[ratio_valid] / x[ratio_valid]
                gain_avg_filtered_bias_comp = robust_trimmed_mean(gain_ratios)
            gain_sample_count = int(np.sum(gain_valid))
    # Hysteresis: pair up-transitions and down-transitions per display boundary
    # and measure the gap between their median trigger speeds.
    transition_up_count = 0
    transition_down_count = 0
    widths: list[float] = []
    if ui_speed_raw.size >= 2:
        dt = np.diff(t)
        prev_ui = ui_speed_raw[:-1]
        curr_ui = ui_speed_raw[1:]
        step = curr_ui - prev_ui
        veh_next = vehicle_source[1:]
        valid = np.isfinite(dt) & np.isfinite(step) & np.isfinite(veh_next)
        # Only adjacent samples close in time, stepping by exactly one display unit.
        valid &= (dt > 0.0) & (dt <= MAX_TRANSITION_DT_S)
        valid &= (np.abs(step) == 1.0)
        up_by_boundary: dict[int, list[float]] = {}
        down_by_boundary: dict[int, list[float]] = {}
        for i in np.where(valid)[0]:
            s = step[i]
            if s > 0:
                boundary = int(round(prev_ui[i]))
                up_by_boundary.setdefault(boundary, []).append(float(veh_next[i]))
            elif s < 0:
                boundary = int(round(curr_ui[i]))
                down_by_boundary.setdefault(boundary, []).append(float(veh_next[i]))
        transition_up_count = int(sum(len(v) for v in up_by_boundary.values()))
        transition_down_count = int(sum(len(v) for v in down_by_boundary.values()))
        # Only boundaries seen in both directions yield a width estimate.
        for boundary in sorted(set(up_by_boundary.keys()) & set(down_by_boundary.keys())):
            up_speeds = np.asarray(up_by_boundary[boundary], dtype=float)
            down_speeds = np.asarray(down_by_boundary[boundary], dtype=float)
            if up_speeds.size == 0 or down_speeds.size == 0:
                continue
            widths.append(float(np.median(up_speeds) - np.median(down_speeds)))
    widths_np = np.asarray(widths, dtype=float)
    # Report the +/- half-width of the hysteresis band.
    hyst_pm_np = widths_np * 0.5
    hysteresis_mean = float(np.mean(hyst_pm_np)) if hyst_pm_np.size else None
    hysteresis_avg_filtered = robust_trimmed_mean(hyst_pm_np)
    hysteresis_median = float(np.median(hyst_pm_np)) if hyst_pm_np.size else None
    hysteresis_max_robust = robust_percentile(hyst_pm_np, ROBUST_MAX_PERCENTILE)
    bias_ci_low = bias_ci_high = bias_ci_width = bias_mad = bias_trimmed = bias_trim_shift = None
    bias_conf_n = 0
    hyst_ci_low = hyst_ci_high = hyst_ci_width = hyst_mad = hyst_trimmed = hyst_trim_shift = None
    hyst_conf_n = 0
    if compute_median_confidence:
        (bias_ci_low, bias_ci_high, bias_ci_width,
         bias_mad, bias_trimmed, bias_trim_shift, bias_conf_n) = median_confidence(finite_bias)
        (hyst_ci_low, hyst_ci_high, hyst_ci_width,
         hyst_mad, hyst_trimmed, hyst_trim_shift, hyst_conf_n) = median_confidence(hyst_pm_np)
    return UnitStats(
        unit_code=unit_code,
        sample_count=int(ui_speed_raw.size),
        bias_mean_source=bias_mean,
        bias_avg_filtered_source=bias_avg_filtered,
        bias_median_source=bias_median,
        bias_pos_max_source=bias_pos_max,
        bias_pos_max_robust_source=bias_pos_max_robust,
        gain_avg_bias_comp=gain_avg_bias_comp,
        gain_avg_filtered_bias_comp=gain_avg_filtered_bias_comp,
        gain_sample_count=gain_sample_count,
        hysteresis_mean_source=hysteresis_mean,
        hysteresis_avg_filtered_source=hysteresis_avg_filtered,
        hysteresis_median_source=hysteresis_median,
        hysteresis_max_robust_source=hysteresis_max_robust,
        hysteresis_boundary_count=int(hyst_pm_np.size),
        transition_up_count=transition_up_count,
        transition_down_count=transition_down_count,
        bias_median_ci_low_source=bias_ci_low,
        bias_median_ci_high_source=bias_ci_high,
        bias_median_ci_width_source=bias_ci_width,
        bias_median_mad_source=bias_mad,
        bias_median_trimmed_source=bias_trimmed,
        bias_median_trim_shift_source=bias_trim_shift,
        bias_median_conf_sample_count=bias_conf_n,
        hysteresis_median_ci_low_source=hyst_ci_low,
        hysteresis_median_ci_high_source=hyst_ci_high,
        hysteresis_median_ci_width_source=hyst_ci_width,
        hysteresis_median_mad_source=hyst_mad,
        hysteresis_median_trimmed_source=hyst_trimmed,
        hysteresis_median_trim_shift_source=hyst_trim_shift,
        hysteresis_median_conf_sample_count=hyst_conf_n,
    )
def print_unit_summary(stats: UnitStats) -> None:
  """Pretty-print gain, bias, and hysteresis metrics for a single unit's stats."""
  code = stats.unit_code
  print(f" {colorize(f'[{unit_name(code)}]', fg=32, bold=True)} samples={stats.sample_count}")
  # Gain section: unitless ratio, reported raw and trimmed.
  print(f" {colorize('Gain (bias-compensated)', bold=True)}")
  label_avg = colorize("avg".ljust(14), fg=90)
  label_avg_filtered = colorize("avg_filtered".ljust(14), fg=90)
  print(f" {label_avg} {fmt(stats.gain_avg_bias_comp, 4)} (unitless, n={stats.gain_sample_count})")
  print(f" {label_avg_filtered} {fmt(stats.gain_avg_filtered_bias_comp, 4)} (trimmed, n={stats.gain_sample_count})")
  # Bias section in source units (plus converted lines via print_metric_line).
  print(f" {colorize('Bias (DI_uiSpeed - DI_vehicleSpeed)', bold=True)}")
  print_metric_line("mean", stats.bias_mean_source, code)
  print_metric_line("avg_filtered", stats.bias_avg_filtered_source, code)
  print_metric_line("median", stats.bias_median_source, code)
  print_metric_line("median 10-90", stats.bias_median_trimmed_source, code)
  if stats.bias_median_conf_sample_count > 0:
    print()
    print_median_confidence_lines(
      stats.bias_median_ci_low_source,
      stats.bias_median_ci_high_source,
      stats.bias_median_ci_width_source,
      stats.bias_median_mad_source,
      stats.bias_median_trim_shift_source,
      stats.bias_median_conf_sample_count,
      code,
    )
  # Hysteresis section: half-width between the up and down display thresholds.
  print(f" {colorize('Hysteresis +/- (0.5 * (up-threshold - down-threshold))', bold=True)}")
  label_levels = colorize("levels".ljust(14), fg=90)
  label_transitions = colorize("transitions".ljust(14), fg=90)
  print(f" {label_levels} {stats.hysteresis_boundary_count}")
  print(f" {label_transitions} up={stats.transition_up_count} down={stats.transition_down_count}")
  print_metric_line("mean", stats.hysteresis_mean_source, code)
  print_metric_line("avg_filtered", stats.hysteresis_avg_filtered_source, code)
  print_metric_line(f"p{ROBUST_MAX_PERCENTILE:g} max", stats.hysteresis_max_robust_source, code)
  print_metric_line("median", stats.hysteresis_median_source, code)
  print_metric_line("median 10-90", stats.hysteresis_median_trimmed_source, code)
  if stats.hysteresis_median_conf_sample_count > 0:
    print()
    print_median_confidence_lines(
      stats.hysteresis_median_ci_low_source,
      stats.hysteresis_median_ci_high_source,
      stats.hysteresis_median_ci_width_source,
      stats.hysteresis_median_mad_source,
      stats.hysteresis_median_trim_shift_source,
      stats.hysteresis_median_conf_sample_count,
      code,
    )
def print_route_stats(samples: RouteSamples) -> None:
  """Print the full per-route report: banner, metadata lines, and per-unit summaries."""
  header = f"{samples.route} [{samples.eps_fw}]"
  # Banner is printed in every case; the empty-route case adds only an error line.
  print("\n" + colorize("=" * 96, fg=36, bold=True))
  print(colorize(f"ROUTE: {header}", fg=36, bold=True))
  if samples.t.size == 0:
    print(colorize("no DI_speed samples found", fg=31, bold=True))
    return
  codes, code_counts = np.unique(samples.ui_units, return_counts=True)
  counts_text = ", ".join(f"{unit_name(int(c))}={int(n)}" for c, n in zip(codes, code_counts))
  print(f"{colorize('vehicle_speed_signal:', fg=90)} {samples.vehicle_speed_signal}")
  print(f"{colorize('unit_source:', fg=90)} {samples.unit_source}")
  location = samples.region
  if samples.region_center_lat is not None and samples.region_center_lon is not None:
    location += f" ({samples.region_center_lat:.3f}, {samples.region_center_lon:.3f})"
  print(f"{colorize('region:', fg=90)} {location}")
  print(f"{colorize('unit_counts:', fg=90)} {counts_text}")
  # One summary per observed DI_uiSpeedUnits code, with full confidence output.
  for code in codes:
    unit = int(code)
    sel = samples.ui_units == unit
    unit_stats = compute_unit_stats(
      samples.t[sel],
      samples.ui_speed_raw[sel],
      samples.vehicle_kph[sel],
      unit,
      compute_median_confidence=True,
    )
    print_unit_summary(unit_stats)
def finite_array(values: list[float | None]) -> np.ndarray:
vals = np.asarray([v for v in values if v is not None and np.isfinite(v)], dtype=float)
return vals
def summarize_values(values: list[float | None]) -> tuple[float | None, float | None, float | None, float | None, float | None, int]:
  """Return (mean, std, min, max, span, count) over the finite entries of *values*.

  All statistics are None (with count 0) when no finite entries remain.
  """
  arr = finite_array(values)
  if not arr.size:
    return None, None, None, None, None, 0
  lo = float(np.min(arr))
  hi = float(np.max(arr))
  # span is the max-min spread used by the consistency checks.
  return float(np.mean(arr)), float(np.std(arr)), lo, hi, hi - lo, int(arr.size)
def route_metric_stats_by_unit(per_route_samples: list[RouteSamples]) -> dict[int, list[tuple[str, UnitStats]]]:
  """Group per-route UnitStats by unit code: unit -> [(route, stats), ...].

  Routes without samples and stats with a zero sample count are skipped.
  """
  grouped: dict[int, list[tuple[str, UnitStats]]] = {}
  for samples in per_route_samples:
    if samples.t.size == 0:
      continue
    for unit_val in np.unique(samples.ui_units):
      code = int(unit_val)
      sel = samples.ui_units == code
      stats = compute_unit_stats(samples.t[sel], samples.ui_speed_raw[sel], samples.vehicle_kph[sel], code)
      if stats.sample_count > 0:
        grouped.setdefault(code, []).append((samples.route, stats))
  return grouped
def route_metric_stats_by_unit_and_fw(per_route_samples: list[RouteSamples]) -> dict[int, dict[str, list[tuple[str, UnitStats]]]]:
  """Group per-route UnitStats by unit code, then by EPS firmware id.

  An empty firmware string is bucketed under "n/a". Routes without samples and
  stats with a zero sample count are skipped.
  """
  grouped: dict[int, dict[str, list[tuple[str, UnitStats]]]] = {}
  for samples in per_route_samples:
    if samples.t.size == 0:
      continue
    fw_id = samples.eps_fw if len(samples.eps_fw) else "n/a"
    for unit_val in np.unique(samples.ui_units):
      code = int(unit_val)
      sel = samples.ui_units == code
      stats = compute_unit_stats(samples.t[sel], samples.ui_speed_raw[sel], samples.vehicle_kph[sel], code)
      if stats.sample_count > 0:
        grouped.setdefault(code, {}).setdefault(fw_id, []).append((samples.route, stats))
  return grouped
def route_metric_stats_by_unit_and_region(per_route_samples: list[RouteSamples]) -> dict[int, dict[str, list[tuple[str, UnitStats]]]]:
  """Group per-route UnitStats by unit code, then by region name.

  Routes without samples and stats with a zero sample count are skipped.
  """
  grouped: dict[int, dict[str, list[tuple[str, UnitStats]]]] = {}
  for samples in per_route_samples:
    if samples.t.size == 0:
      continue
    region_key = samples.region
    for unit_val in np.unique(samples.ui_units):
      code = int(unit_val)
      sel = samples.ui_units == code
      stats = compute_unit_stats(samples.t[sel], samples.ui_speed_raw[sel], samples.vehicle_kph[sel], code)
      if stats.sample_count > 0:
        grouped.setdefault(code, {}).setdefault(region_key, []).append((samples.route, stats))
  return grouped
def firmware_metric_lists_from_rows(rows: list[tuple[str, UnitStats]],
                                    unit: int,
                                    route_fw: dict[str, str]) -> tuple[list[float | None], list[float | None], list[float | None], list[float | None], list[str]]:
  """Collapse (route, stats) rows to per-firmware medians of each metric.

  Bias/hysteresis/display-comp are converted to kph before aggregation.
  Returns (gain_medians, bias_medians, hyst_medians, disp_medians, sorted_fw_ids),
  with None wherever a firmware has no finite values for a metric.
  """
  buckets: dict[str, dict[str, list[float | None]]] = {}
  for route, stats in rows:
    fw_id = route_fw.get(route, "n/a")
    bucket = buckets.setdefault(fw_id, {"gain": [], "bias": [], "hyst": [], "disp": []})
    bias_kph = source_to_kph(stats.bias_median_source, unit)
    hyst_kph = source_to_kph(stats.hysteresis_median_source, unit)
    bucket["gain"].append(stats.gain_avg_bias_comp)
    bucket["bias"].append(bias_kph)
    bucket["hyst"].append(hyst_kph)
    # display_comp = bias + hysteresis, defined only when both are present.
    bucket["disp"].append(sum_if_both(bias_kph, hyst_kph))
  fw_ids = sorted(buckets.keys())

  def med(vals: list[float | None]) -> float | None:
    # Median over finite values only; None when nothing finite remains.
    finite = finite_array(vals)
    return float(np.median(finite)) if finite.size else None

  gain_vals = [med(buckets[fw]["gain"]) for fw in fw_ids]
  bias_vals = [med(buckets[fw]["bias"]) for fw in fw_ids]
  hyst_vals = [med(buckets[fw]["hyst"]) for fw in fw_ids]
  disp_vals = [med(buckets[fw]["disp"]) for fw in fw_ids]
  return gain_vals, bias_vals, hyst_vals, disp_vals, fw_ids
def user_id_from_route(route: str) -> str:
  """Return the user/dongle id portion (before the first '/') of a normalized route name."""
  return normalize_route(route).partition("/")[0]
def route_context_text(route: str,
                       route_meta: dict[str, RouteSamples] | None = None,
                       *,
                       region: str | None = None,
                       fw: str | None = None,
                       include_region: bool = True) -> str:
  """Format a route label, optionally annotated with region/user/firmware metadata.

  Explicitly-passed region/fw win; missing ones are filled from route_meta when
  available. When the global SHOW_METADATA is off, only the bare route is returned.
  """
  meta = route_meta.get(route) if route_meta is not None else None
  if meta is not None:
    region = meta.region if region is None else region
    fw = meta.eps_fw if fw is None else fw
  if not SHOW_METADATA:
    return route
  pieces = [route]
  if include_region:
    pieces.append(f"region={region if region is not None else 'UNKNOWN'}")
  pieces.append(f"user={user_id_from_route(route)}")
  pieces.append(f"fw={fw if fw is not None else 'n/a'}")
  return ", ".join(pieces)
def print_consistency_summary(per_route_samples: list[RouteSamples]) -> None:
  """Compare per-route metric medians within each unit code (kph-normalized).

  For each DI_uiSpeedUnits code with data, prints the spread (span) of
  gain/bias/hysteresis/display-comp across routes, then each route's values.
  """
  by_unit = route_metric_stats_by_unit(per_route_samples)
  # Route-level metadata lookups used for labeling below.
  route_eps = {rs.route: rs.eps_fw for rs in per_route_samples}
  route_region = {rs.route: rs.region for rs in per_route_samples}
  print("\n" + colorize("Compare Routes (kph-normalized)", fg=35, bold=True))
  if len(by_unit) == 0:
    print(f" {colorize('no unit-grouped route data available', fg=31)}")
    return
  for unit in sorted(by_unit.keys()):
    rows = by_unit[unit]
    unit_routes = [route for route, _ in rows]
    # Distinct firmware/region/user counts behind this unit's rows.
    unit_firmwares = len({route_eps.get(route, "n/a") for route in unit_routes})
    unit_regions = len({route_region.get(route, "UNKNOWN") for route in unit_routes})
    unit_users = len({user_id_from_route(route) for route in unit_routes})
    if SHOW_METADATA:
      print(
        f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} "
        f"routes_with_data={len(rows)}, firmwares={unit_firmwares}, regions={unit_regions}, users={unit_users}"
      )
    else:
      print(
        f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} "
        f"routes_with_data={len(rows)}"
      )
    if len(rows) < 2:
      print(f" {colorize('not enough routes for consistency comparison', fg=33)}")
    else:
      # Per-route medians normalized to kph so routes in different source units compare.
      bias_medians = [source_to_kph(s.bias_median_source, unit) for _, s in rows]
      gain_avgs = [s.gain_avg_bias_comp for _, s in rows]
      hyst_medians = [source_to_kph(s.hysteresis_median_source, unit) for _, s in rows]
      disp_comp_medians = [sum_if_both(source_to_kph(s.bias_median_source, unit), source_to_kph(s.hysteresis_median_source, unit))
                           for _, s in rows]
      # Only the span (max - min) and finite-sample count are used from the summary tuple.
      _, _, _, _, bias_span, bias_n = summarize_values(bias_medians)
      _, _, _, _, gain_span, gain_n = summarize_values(gain_avgs)
      _, _, _, _, hyst_span, hyst_n = summarize_values(hyst_medians)
      _, _, _, _, disp_comp_span, disp_comp_n = summarize_values(disp_comp_medians)
      print(f" gain(avg) diff {gray_meta(f'(n={gain_n})')}: {fmt_gain_percent(gain_span, 3)}")
      print(f" bias_median span {gray_meta(f'(n={bias_n})')}: {fmt(bias_span)} kph")
      print(f" hysteresis(+/-)_median span {gray_meta(f'(n={hyst_n})')}: {fmt(hyst_span)} kph")
      print(f" display_comp span {gray_meta(f'(n={disp_comp_n})')}: {fmt(disp_comp_span)} kph")
    # Per-route values are listed even when only a single route is available.
    print(f" {colorize('route values:', fg=36, bold=True)}")
    for route, stats in rows:
      eps_fw = route_eps.get(route, "n/a")
      region = route_region.get(route, "UNKNOWN")
      route_label = colorize(route, fg=90)
      fw_label = colorize(f"[{eps_fw}, {region}]", bold=True)
      print(
        f" {route_label} {fw_label}:"
      )
      print(
        f" "
        f"gain_avg={fmt(stats.gain_avg_bias_comp, 4)}, "
        f"bias_median={format_kph(stats.bias_median_source, unit)}, "
        f"hysteresis(+/-)_median={format_kph(stats.hysteresis_median_source, unit)}, "
        f"display_comp={fmt(sum_if_both(source_to_kph(stats.bias_median_source, unit), source_to_kph(stats.hysteresis_median_source, unit)))} kph"
      )
def print_firmware_consistency_summary(per_route_samples: list[RouteSamples]) -> None:
  """Check metric consistency across firmware versions within each unit code.

  Route-level medians are collapsed to one median per firmware; the span of
  those firmware medians is then compared against the consistency margins
  (CONSISTENCY_MARGIN_GAIN / CONSISTENCY_MARGIN_SPEED_SOURCE).
  """
  by_unit_fw = route_metric_stats_by_unit_and_fw(per_route_samples)
  route_region = {rs.route: rs.region for rs in per_route_samples}
  print("\n" + colorize("Consistency Across Firmwares (kph-normalized)", fg=35, bold=True))
  if len(by_unit_fw) == 0:
    print(f" {colorize('no unit-grouped firmware data available', fg=31)}")
    return
  for unit in sorted(by_unit_fw.keys()):
    fw_rows = by_unit_fw[unit]
    firmwares = sorted(fw_rows.keys())
    all_rows = [row for fw in firmwares for row in fw_rows[fw]]
    unit_routes = [route for route, _ in all_rows]
    unit_regions = len({route_region.get(route, "UNKNOWN") for route in unit_routes})
    unit_users = len({user_id_from_route(route) for route in unit_routes})
    print(
      f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} "
      f"firmwares_with_data={len(firmwares)}, routes={len(all_rows)}, regions={unit_regions}, users={unit_users}"
    )
    if len(firmwares) < 2:
      print(f" {colorize('not enough firmwares for consistency comparison', fg=33)}")
    else:
      # One median value per firmware for each metric (None when no finite data).
      fw_gain_vals: list[float | None] = []
      fw_bias_vals: list[float | None] = []
      fw_hyst_vals: list[float | None] = []
      fw_disp_comp_vals: list[float | None] = []
      for fw in firmwares:
        rows = fw_rows[fw]
        gains = finite_array([s.gain_avg_bias_comp for _, s in rows])
        biases = finite_array([source_to_kph(s.bias_median_source, unit) for _, s in rows])
        hysts = finite_array([source_to_kph(s.hysteresis_median_source, unit) for _, s in rows])
        disp_comps = finite_array([sum_if_both(source_to_kph(s.bias_median_source, unit), source_to_kph(s.hysteresis_median_source, unit))
                                   for _, s in rows])
        fw_gain_vals.append(float(np.median(gains)) if gains.size else None)
        fw_bias_vals.append(float(np.median(biases)) if biases.size else None)
        fw_hyst_vals.append(float(np.median(hysts)) if hysts.size else None)
        fw_disp_comp_vals.append(float(np.median(disp_comps)) if disp_comps.size else None)
      _, _, _, _, gain_span, gain_n = summarize_values(fw_gain_vals)
      _, _, _, _, bias_span, bias_n = summarize_values(fw_bias_vals)
      _, _, _, _, hyst_span, hyst_n = summarize_values(fw_hyst_vals)
      _, _, _, _, disp_comp_span, disp_comp_n = summarize_values(fw_disp_comp_vals)
      # Gain margin is unitless; the speed margin is converted from source units to kph.
      speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit)
      gain_consistent = gain_n >= 2 and gain_span is not None and gain_span <= CONSISTENCY_MARGIN_GAIN
      bias_consistent = bias_n >= 2 and bias_span is not None and speed_margin_kph is not None and bias_span <= speed_margin_kph
      hyst_consistent = hyst_n >= 2 and hyst_span is not None and speed_margin_kph is not None and hyst_span <= speed_margin_kph
      disp_comp_consistent = disp_comp_n >= 2 and disp_comp_span is not None and speed_margin_kph is not None and disp_comp_span <= speed_margin_kph
      gain_label = colorize("consistent" if gain_consistent else "not consistent", fg=32 if gain_consistent else 31, bold=True)
      bias_label = colorize("consistent" if bias_consistent else "not consistent", fg=32 if bias_consistent else 31, bold=True)
      hyst_label = colorize("consistent" if hyst_consistent else "not consistent", fg=32 if hyst_consistent else 31, bold=True)
      disp_comp_label = colorize("consistent" if disp_comp_consistent else "not consistent", fg=32 if disp_comp_consistent else 31, bold=True)
      print(f" gain(avg) diff {gray_meta(f'(n={gain_n})')}: {fmt_gain_percent(gain_span, 3)} ({gain_label})")
      print(f" bias_median span {gray_meta(f'(n={bias_n})')}: {fmt(bias_span)} kph ({bias_label})")
      print(f" hysteresis(+/-)_median span {gray_meta(f'(n={hyst_n})')}: {fmt(hyst_span)} kph ({hyst_label})")
      print(f" display_comp span {gray_meta(f'(n={disp_comp_n})')}: {fmt(disp_comp_span)} kph ({disp_comp_label})")
    # Per-firmware breakdown (printed even for a single firmware).
    print(f" {colorize('firmware values:', fg=36, bold=True)}")
    for fw in firmwares:
      rows = fw_rows[fw]
      # Raw per-route metric lists for this firmware (may contain None entries).
      gains_raw = [s.gain_avg_bias_comp for _, s in rows]
      biases_raw = [source_to_kph(s.bias_median_source, unit) for _, s in rows]
      hysts_raw = [source_to_kph(s.hysteresis_median_source, unit) for _, s in rows]
      disp_comps_raw = [sum_if_both(source_to_kph(s.bias_median_source, unit), source_to_kph(s.hysteresis_median_source, unit))
                        for _, s in rows]
      gains = finite_array(gains_raw)
      biases = finite_array(biases_raw)
      hysts = finite_array(hysts_raw)
      disp_comps = finite_array(disp_comps_raw)
      gain_v = float(np.median(gains)) if gains.size else None
      bias_v = float(np.median(biases)) if biases.size else None
      hyst_v = float(np.median(hysts)) if hysts.size else None
      disp_comp_v = float(np.median(disp_comps)) if disp_comps.size else None
      _, _, _, _, gain_span_fw, gain_n_fw = summarize_values(gains_raw)
      _, _, _, _, bias_span_fw, bias_n_fw = summarize_values(biases_raw)
      _, _, _, _, hyst_span_fw, hyst_n_fw = summarize_values(hysts_raw)
      _, _, _, _, disp_comp_span_fw, disp_comp_n_fw = summarize_values(disp_comps_raw)
      speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit)
      # A firmware backed by a single route cannot be checked for internal consistency.
      if gain_n_fw < 2:
        gain_label_fw = colorize("single-route", fg=33, bold=True)
      else:
        gain_consistent_fw = gain_span_fw is not None and gain_span_fw <= CONSISTENCY_MARGIN_GAIN
        gain_label_fw = colorize("consistent" if gain_consistent_fw else "not consistent", fg=32 if gain_consistent_fw else 31, bold=True)
      if bias_n_fw < 2:
        bias_label_fw = colorize("single-route", fg=33, bold=True)
      else:
        bias_consistent_fw = bias_span_fw is not None and speed_margin_kph is not None and bias_span_fw <= speed_margin_kph
        bias_label_fw = colorize("consistent" if bias_consistent_fw else "not consistent", fg=32 if bias_consistent_fw else 31, bold=True)
      if hyst_n_fw < 2:
        hyst_label_fw = colorize("single-route", fg=33, bold=True)
      else:
        hyst_consistent_fw = hyst_span_fw is not None and speed_margin_kph is not None and hyst_span_fw <= speed_margin_kph
        hyst_label_fw = colorize("consistent" if hyst_consistent_fw else "not consistent", fg=32 if hyst_consistent_fw else 31, bold=True)
      if disp_comp_n_fw < 2:
        disp_comp_label_fw = colorize("single-route", fg=33, bold=True)
      else:
        disp_comp_consistent_fw = disp_comp_span_fw is not None and speed_margin_kph is not None and disp_comp_span_fw <= speed_margin_kph
        disp_comp_label_fw = colorize("consistent" if disp_comp_consistent_fw else "not consistent", fg=32 if disp_comp_consistent_fw else 31, bold=True)
      fw_label = colorize(f"[{fw}]", bold=True)
      region_count = len({route_region.get(route, "UNKNOWN") for route, _ in rows})
      user_count = len({user_id_from_route(route) for route, _ in rows})
      print(f" {fw_label} routes={len(rows)}: regions={region_count}, users={user_count}")
      print(
        f" gain_avg={fmt(gain_v, 4)} "
        f"({gray_meta(f'diff={fmt_gain_percent(gain_span_fw, 3)}, n={gain_n_fw}')}, {gain_label_fw})"
      )
      print(
        f" bias_median={fmt(bias_v)} kph "
        f"({gray_meta(f'span={fmt(bias_span_fw)} kph, n={bias_n_fw}')}, {bias_label_fw})"
      )
      print(
        f" hysteresis(+/-)_median={fmt(hyst_v)} kph "
        f"({gray_meta(f'span={fmt(hyst_span_fw)} kph, n={hyst_n_fw}')}, {hyst_label_fw})"
      )
      print(
        f" display_comp={fmt(disp_comp_v)} kph "
        f"({gray_meta(f'span={fmt(disp_comp_span_fw)} kph, n={disp_comp_n_fw}')}, {disp_comp_label_fw})"
      )
def print_region_consistency_summary(per_route_samples: list[RouteSamples]) -> None:
  """Check metric consistency across regions within each unit code.

  Route metrics are first collapsed to per-firmware medians inside each region
  (via firmware_metric_lists_from_rows) so multi-route firmwares don't dominate,
  then to a single region median; the span across regions is compared against
  the consistency margins.
  """
  by_unit_region = route_metric_stats_by_unit_and_region(per_route_samples)
  # Route -> firmware (empty firmware strings become "n/a") and full metadata lookups.
  route_fw = {rs.route: (rs.eps_fw if len(rs.eps_fw) else "n/a") for rs in per_route_samples}
  route_meta = {rs.route: rs for rs in per_route_samples}
  print("\n" + colorize("Consistency Across Firmwares in different Regions (kph-normalized)", fg=35, bold=True))
  if len(by_unit_region) == 0:
    print(f" {colorize('no unit-grouped region data available', fg=31)}")
    return
  for unit in sorted(by_unit_region.keys()):
    region_rows = by_unit_region[unit]
    regions = sorted(region_rows.keys())
    print(f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} regions_with_data={len(regions)}")
    if len(regions) < 2:
      print(f" {colorize('not enough regions for consistency comparison', fg=33)}")
    else:
      # One median per region for each metric, plus (region, value) pairs for extrema text.
      region_gain_vals: list[float | None] = []
      region_bias_vals: list[float | None] = []
      region_hyst_vals: list[float | None] = []
      region_disp_vals: list[float | None] = []
      region_gain_pairs: list[tuple[str, float | None]] = []
      region_bias_pairs: list[tuple[str, float | None]] = []
      region_hyst_pairs: list[tuple[str, float | None]] = []
      region_disp_pairs: list[tuple[str, float | None]] = []
      for region in regions:
        rows = region_rows[region]
        # Collapse route rows to per-firmware medians before the region median.
        gain_fw_vals, bias_fw_vals, hyst_fw_vals, disp_fw_vals, _ = firmware_metric_lists_from_rows(rows, unit, route_fw)
        gains = finite_array(gain_fw_vals)
        biases = finite_array(bias_fw_vals)
        hysts = finite_array(hyst_fw_vals)
        disp = finite_array(disp_fw_vals)
        region_gain_v = float(np.median(gains)) if gains.size else None
        region_bias_v = float(np.median(biases)) if biases.size else None
        region_hyst_v = float(np.median(hysts)) if hysts.size else None
        region_disp_v = float(np.median(disp)) if disp.size else None
        region_gain_vals.append(region_gain_v)
        region_bias_vals.append(region_bias_v)
        region_hyst_vals.append(region_hyst_v)
        region_disp_vals.append(region_disp_v)
        region_gain_pairs.append((region, region_gain_v))
        region_bias_pairs.append((region, region_bias_v))
        region_hyst_pairs.append((region, region_hyst_v))
        region_disp_pairs.append((region, region_disp_v))
      _, _, _, _, gain_span, gain_n = summarize_values(region_gain_vals)
      _, _, _, _, bias_span, bias_n = summarize_values(region_bias_vals)
      _, _, _, _, hyst_span, hyst_n = summarize_values(region_hyst_vals)
      _, _, _, _, disp_span, disp_n = summarize_values(region_disp_vals)
      speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit)
      gain_consistent = gain_n >= 2 and gain_span is not None and gain_span <= CONSISTENCY_MARGIN_GAIN
      bias_consistent = bias_n >= 2 and bias_span is not None and speed_margin_kph is not None and bias_span <= speed_margin_kph
      hyst_consistent = hyst_n >= 2 and hyst_span is not None and speed_margin_kph is not None and hyst_span <= speed_margin_kph
      disp_consistent = disp_n >= 2 and disp_span is not None and speed_margin_kph is not None and disp_span <= speed_margin_kph
      gain_label = colorize("consistent" if gain_consistent else "not consistent", fg=32 if gain_consistent else 31, bold=True)
      bias_label = colorize("consistent" if bias_consistent else "not consistent", fg=32 if bias_consistent else 31, bold=True)
      hyst_label = colorize("consistent" if hyst_consistent else "not consistent", fg=32 if hyst_consistent else 31, bold=True)
      disp_label = colorize("consistent" if disp_consistent else "not consistent", fg=32 if disp_consistent else 31, bold=True)
      def extrema_region_text(region_vals: list[tuple[str, float | None]], digits: int, suffix: str = "") -> str:
        """Format min/max region values, with an example route when metadata is shown."""
        finite_vals = [(region, float(v)) for region, v in region_vals if v is not None and np.isfinite(v)]
        if len(finite_vals) == 0:
          return ""
        min_region, min_val = min(finite_vals, key=lambda x: x[1])
        max_region, max_val = max(finite_vals, key=lambda x: x[1])
        if not SHOW_METADATA:
          return gray_meta(
            f"min={fmt(min_val, digits)}{suffix} @{min_region}, "
            f"max={fmt(max_val, digits)}{suffix} @{max_region}"
          )
        # Use the alphabetically-first route of each extreme region as its example.
        min_route = sorted(route for route, _ in region_rows[min_region])[0] if min_region in region_rows and len(region_rows[min_region]) else "n/a"
        max_route = sorted(route for route, _ in region_rows[max_region])[0] if max_region in region_rows and len(region_rows[max_region]) else "n/a"
        min_ctx = route_context_text(min_route, route_meta, region=min_region, fw=route_fw.get(min_route, "n/a"), include_region=False)
        max_ctx = route_context_text(max_route, route_meta, region=max_region, fw=route_fw.get(max_route, "n/a"), include_region=False)
        return gray_meta(
          f"min={fmt(min_val, digits)}{suffix} @{min_region}({min_ctx}), "
          f"max={fmt(max_val, digits)}{suffix} @{max_region}({max_ctx})"
        )
      gain_ext = extrema_region_text(region_gain_pairs, 4)
      bias_ext = extrema_region_text(region_bias_pairs, 3, " kph")
      hyst_ext = extrema_region_text(region_hyst_pairs, 3, " kph")
      disp_ext = extrema_region_text(region_disp_pairs, 3, " kph")
      # Extrema details are appended only when a metric is not consistent.
      print(
        f" gain(avg) diff {gray_meta(f'(n={gain_n})')}: {fmt_gain_percent(gain_span, 3)} ({gain_label})"
        f"{(' ' + gain_ext) if (not gain_consistent and gain_ext) else ''}"
      )
      print(
        f" bias_median span {gray_meta(f'(n={bias_n})')}: {fmt(bias_span)} kph ({bias_label})"
        f"{(' ' + bias_ext) if (not bias_consistent and bias_ext) else ''}"
      )
      print(
        f" hysteresis(+/-)_median span {gray_meta(f'(n={hyst_n})')}: {fmt(hyst_span)} kph ({hyst_label})"
        f"{(' ' + hyst_ext) if (not hyst_consistent and hyst_ext) else ''}"
      )
      print(
        f" display_comp span {gray_meta(f'(n={disp_n})')}: {fmt(disp_span)} kph ({disp_label})"
        f"{(' ' + disp_ext) if (not disp_consistent and disp_ext) else ''}"
      )
    # Per-region breakdown (printed even for a single region).
    print(f" {colorize('region values:', fg=36, bold=True)}")
    for region in regions:
      rows = region_rows[region]
      gains_raw, biases_raw, hysts_raw, disp_raw, fw_ids = firmware_metric_lists_from_rows(rows, unit, route_fw)
      gains = finite_array(gains_raw)
      biases = finite_array(biases_raw)
      hysts = finite_array(hysts_raw)
      disp = finite_array(disp_raw)
      gain_v = float(np.median(gains)) if gains.size else None
      bias_v = float(np.median(biases)) if biases.size else None
      hyst_v = float(np.median(hysts)) if hysts.size else None
      disp_v = float(np.median(disp)) if disp.size else None
      _, _, _, _, gain_span_rg, gain_n_rg = summarize_values(gains_raw)
      _, _, _, _, bias_span_rg, bias_n_rg = summarize_values(biases_raw)
      _, _, _, _, hyst_span_rg, hyst_n_rg = summarize_values(hysts_raw)
      _, _, _, _, disp_span_rg, disp_n_rg = summarize_values(disp_raw)
      speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit)
      # A region with a single firmware cannot be checked for internal consistency.
      if gain_n_rg < 2:
        gain_label_rg = colorize("single-firmware", fg=33, bold=True)
      else:
        gain_consistent_rg = gain_span_rg is not None and gain_span_rg <= CONSISTENCY_MARGIN_GAIN
        gain_label_rg = colorize("consistent" if gain_consistent_rg else "not consistent", fg=32 if gain_consistent_rg else 31, bold=True)
      if bias_n_rg < 2:
        bias_label_rg = colorize("single-firmware", fg=33, bold=True)
      else:
        bias_consistent_rg = bias_span_rg is not None and speed_margin_kph is not None and bias_span_rg <= speed_margin_kph
        bias_label_rg = colorize("consistent" if bias_consistent_rg else "not consistent", fg=32 if bias_consistent_rg else 31, bold=True)
      if hyst_n_rg < 2:
        hyst_label_rg = colorize("single-firmware", fg=33, bold=True)
      else:
        hyst_consistent_rg = hyst_span_rg is not None and speed_margin_kph is not None and hyst_span_rg <= speed_margin_kph
        hyst_label_rg = colorize("consistent" if hyst_consistent_rg else "not consistent", fg=32 if hyst_consistent_rg else 31, bold=True)
      if disp_n_rg < 2:
        disp_label_rg = colorize("single-firmware", fg=33, bold=True)
      else:
        disp_consistent_rg = disp_span_rg is not None and speed_margin_kph is not None and disp_span_rg <= speed_margin_kph
        disp_label_rg = colorize("consistent" if disp_consistent_rg else "not consistent", fg=32 if disp_consistent_rg else 31, bold=True)
      user_count = len({user_id_from_route(route) for route, _ in rows})
      units_text = encountered_units_text({unit})
      print(f" {colorize(f'[{region}]', bold=True)} routes={len(rows)}, firmwares={len(fw_ids)}, users: {user_count}, ({units_text})")
      print(
        f" gain_avg={fmt(gain_v, 4)} "
        f"({gray_meta(f'diff={fmt_gain_percent(gain_span_rg, 3)}, n={gain_n_rg}')}, {gain_label_rg})"
      )
      print(
        f" bias_median={fmt(bias_v)} kph "
        f"({gray_meta(f'span={fmt(bias_span_rg)} kph, n={bias_n_rg}')}, {bias_label_rg})"
      )
      print(
        f" hysteresis(+/-)_median={fmt(hyst_v)} kph "
        f"({gray_meta(f'span={fmt(hyst_span_rg)} kph, n={hyst_n_rg}')}, {hyst_label_rg})"
      )
      print(
        f" display_comp={fmt(disp_v)} kph "
        f"({gray_meta(f'span={fmt(disp_span_rg)} kph, n={disp_n_rg}')}, {disp_label_rg})"
      )
def print_final_consistency(per_route_samples: list[RouteSamples]) -> None:
  """Final verdict: per-unit cross-route consistency of all metrics against the margins.

  For each unit, prints consistent/not-consistent per metric — the median when
  consistent, or the span plus min/max extremes and the raw value array when not.
  """
  by_unit = route_metric_stats_by_unit(per_route_samples)
  route_eps = {rs.route: rs.eps_fw for rs in per_route_samples}
  route_region = {rs.route: rs.region for rs in per_route_samples}
  route_meta = {rs.route: rs for rs in per_route_samples}
  print("\n" + colorize("Final Consistency", fg=35, bold=True))
  # Show the margins in source units and converted to kph for both unit systems.
  print(
    f" {colorize('margins:', fg=90)} "
    f"gain_diff<={fmt_gain_percent(CONSISTENCY_MARGIN_GAIN, 3)}, "
    f"speed_span<={fmt(CONSISTENCY_MARGIN_SPEED_SOURCE, 3)} source units "
    f"(KPH={fmt(source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, UNIT_KPH), 3)} kph, "
    f"MPH={fmt(source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, UNIT_MPH), 3)} kph)"
  )
  if len(by_unit) == 0:
    print(f" {colorize('no unit-grouped route data available', fg=31)}")
    return
  for unit in sorted(by_unit.keys()):
    rows = by_unit[unit]
    unit_routes = [route for route, _ in rows]
    unit_firmwares = len({route_eps.get(route, "n/a") for route in unit_routes})
    unit_regions = len({route_region.get(route, "UNKNOWN") for route in unit_routes})
    unit_users = len({user_id_from_route(route) for route in unit_routes})
    if SHOW_METADATA:
      print(
        f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} "
        f"routes_with_data={len(rows)}, firmwares={unit_firmwares}, regions={unit_regions}, users={unit_users}"
      )
    else:
      print(
        f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} "
        f"routes_with_data={len(rows)}"
      )
    if len(rows) < 2:
      # With a single route there is nothing to compare; just dump its values.
      print(f" {colorize('not enough routes for consistency check', fg=33)}")
      for route, stats in rows:
        route_label = colorize(route, fg=90)
        print(f" {route_label}:")
        print(f" gain_avg={fmt(stats.gain_avg_bias_comp, 4)}")
        print(f" bias_median={format_kph(stats.bias_median_source, unit)}")
        print(f" hysteresis(+/-)_median={format_kph(stats.hysteresis_median_source, unit)}")
        print(f" display_comp={fmt(sum_if_both(source_to_kph(stats.bias_median_source, unit), source_to_kph(stats.hysteresis_median_source, unit)))} kph")
      continue
    # kph-normalized per-route values for each metric.
    gain_avgs = [s.gain_avg_bias_comp for _, s in rows]
    bias_medians = [source_to_kph(s.bias_median_source, unit) for _, s in rows]
    hyst_medians = [source_to_kph(s.hysteresis_median_source, unit) for _, s in rows]
    disp_comp_medians = [sum_if_both(source_to_kph(s.bias_median_source, unit), source_to_kph(s.hysteresis_median_source, unit))
                         for _, s in rows]
    # (route, value) pairs restricted to finite values, used for extrema reporting.
    gain_route_vals = [(route, float(s.gain_avg_bias_comp)) for route, s in rows
                       if s.gain_avg_bias_comp is not None and np.isfinite(s.gain_avg_bias_comp)]
    bias_route_vals = [(route, float(v)) for (route, _), v in zip(rows, bias_medians)
                       if v is not None and np.isfinite(v)]
    hyst_route_vals = [(route, float(v)) for (route, _), v in zip(rows, hyst_medians)
                       if v is not None and np.isfinite(v)]
    disp_comp_route_vals = [(route, float(v)) for (route, _), v in zip(rows, disp_comp_medians)
                            if v is not None and np.isfinite(v)]
    gain_vals = finite_array(gain_avgs)
    bias_vals = finite_array(bias_medians)
    hyst_vals = finite_array(hyst_medians)
    disp_comp_vals = finite_array(disp_comp_medians)
    _, _, _, _, gain_span, gain_n = summarize_values(gain_avgs)
    _, _, _, _, bias_span, bias_n = summarize_values(bias_medians)
    _, _, _, _, hyst_span, hyst_n = summarize_values(hyst_medians)
    _, _, _, _, disp_comp_span, disp_comp_n = summarize_values(disp_comp_medians)
    speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit)
    gain_consistent = gain_n >= 2 and gain_span is not None and gain_span <= CONSISTENCY_MARGIN_GAIN
    bias_consistent = bias_n >= 2 and bias_span is not None and speed_margin_kph is not None and bias_span <= speed_margin_kph
    hyst_consistent = hyst_n >= 2 and hyst_span is not None and speed_margin_kph is not None and hyst_span <= speed_margin_kph
    disp_comp_consistent = disp_comp_n >= 2 and disp_comp_span is not None and speed_margin_kph is not None and disp_comp_span <= speed_margin_kph
    gain_median_value = float(np.median(gain_vals)) if gain_vals.size else None
    bias_median_value = float(np.median(bias_vals)) if bias_vals.size else None
    hyst_median_value = float(np.median(hyst_vals)) if hyst_vals.size else None
    disp_comp_median_value = float(np.median(disp_comp_vals)) if disp_comp_vals.size else None
    gain_label = colorize("consistent" if gain_consistent else "not consistent", fg=32 if gain_consistent else 31, bold=True)
    bias_label = colorize("consistent" if bias_consistent else "not consistent", fg=32 if bias_consistent else 31, bold=True)
    hyst_label = colorize("consistent" if hyst_consistent else "not consistent", fg=32 if hyst_consistent else 31, bold=True)
    disp_comp_label = colorize("consistent" if disp_comp_consistent else "not consistent", fg=32 if disp_comp_consistent else 31, bold=True)
    def extrema_text(route_vals: list[tuple[str, float]], digits: int, suffix: str = "") -> str:
      """Format the min/max route values, with route context when metadata is shown."""
      if len(route_vals) == 0:
        return ""
      min_route, min_val = min(route_vals, key=lambda x: x[1])
      max_route, max_val = max(route_vals, key=lambda x: x[1])
      min_txt = f"{fmt(min_val, digits)}{suffix}"
      max_txt = f"{fmt(max_val, digits)}{suffix}"
      if SHOW_METADATA:
        min_ctx = route_context_text(min_route, route_meta)
        max_ctx = route_context_text(max_route, route_meta)
        return gray_meta(f"min={min_txt} @{min_ctx}, max={max_txt} @{max_ctx}")
      return gray_meta(f"min={min_txt}, max={max_txt}")
    def raw_array_text(values: list[float | None], digits: int) -> str:
      """Render the finite entries of *values* as a gray 'raw=[...]' list."""
      finite_vals = [float(v) for v in values if v is not None and np.isfinite(v)]
      arr_txt = "[" + ", ".join(fmt(v, digits) for v in finite_vals) + "]"
      return gray_meta(f"raw={arr_txt}")
    bias_raw = raw_array_text(bias_medians, 3)
    hyst_raw = raw_array_text(hyst_medians, 3)
    disp_raw = raw_array_text(disp_comp_medians, 3)
    # Consistent metrics show their median; inconsistent ones show span and extremes.
    if gain_consistent:
      print(f" gain_avg: {gain_label} ({gray_meta(f'n={gain_n}, median={fmt(gain_median_value, 4)}')})")
    else:
      gain_ext = extrema_text(gain_route_vals, 4)
      print(
        f" gain_avg: {gain_label} ({gray_meta(f'n={gain_n}, diff={fmt_gain_percent(gain_span, 3)}')})"
        f"{(' ' + gain_ext) if gain_ext else ''}"
      )
    if bias_consistent:
      print(f" bias_median: {bias_label} ({gray_meta(f'n={bias_n}, median={fmt(bias_median_value)} kph')}) {bias_raw}")
    else:
      bias_ext = extrema_text(bias_route_vals, 3, " kph")
      print(
        f" bias_median: {bias_label} ({gray_meta(f'n={bias_n}, span={fmt(bias_span)} kph')})"
        f"{(' ' + bias_ext) if bias_ext else ''} {bias_raw}"
      )
    if hyst_consistent:
      print(f" hysteresis(+/-)_median: {hyst_label} ({gray_meta(f'n={hyst_n}, median={fmt(hyst_median_value)} kph')}) {hyst_raw}")
    else:
      hyst_ext = extrema_text(hyst_route_vals, 3, " kph")
      print(
        f" hysteresis(+/-)_median: {hyst_label} ({gray_meta(f'n={hyst_n}, span={fmt(hyst_span)} kph')})"
        f"{(' ' + hyst_ext) if hyst_ext else ''} {hyst_raw}"
      )
    if disp_comp_consistent:
      print(f" display_comp: {disp_comp_label} ({gray_meta(f'n={disp_comp_n}, median={fmt(disp_comp_median_value)} kph')}) {disp_raw}")
    else:
      disp_ext = extrema_text(disp_comp_route_vals, 3, " kph")
      print(
        f" display_comp: {disp_comp_label} ({gray_meta(f'n={disp_comp_n}, span={fmt(disp_comp_span)} kph')})"
        f"{(' ' + disp_ext) if disp_ext else ''} {disp_raw}"
      )
def print_display_comp_extremes(per_route_samples: list[RouteSamples]) -> None:
    """Print per-unit min/max of display compensation (bias median + hysteresis median), in kph."""
    stats_by_unit = route_metric_stats_by_unit(per_route_samples)
    print("\n" + colorize("Display Comp Extremes", fg=35, bold=True))
    if not stats_by_unit:
        print(f" {colorize('no unit-grouped route data available', fg=31)}")
        return
    meta_by_route = {rs.route: rs for rs in per_route_samples}
    for unit in (UNIT_MPH, UNIT_KPH):
        per_route: list[tuple[str, float]] = []
        # Keep only routes where both components exist and both conversions yield finite values.
        for route, stats in stats_by_unit.get(unit, []):
            combined = sum_if_both(stats.bias_median_source, stats.hysteresis_median_source)
            if combined is None or not np.isfinite(combined):
                continue
            combined_kph = source_to_kph(float(combined), unit)
            if combined_kph is not None and np.isfinite(combined_kph):
                per_route.append((route, float(combined_kph)))
        print(f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} {gray_meta(f'routes_with_value={len(per_route)}')}")
        if not per_route:
            print(f" {gray_meta('no display compensation values available')}")
            continue
        lo_route, lo_kph = min(per_route, key=lambda rv: rv[1])
        hi_route, hi_kph = max(per_route, key=lambda rv: rv[1])
        joined = "[" + ", ".join(fmt(v, 3) for _, v in per_route) + "]"
        print(f" {colorize('display_comp:', bold=True)} {gray_meta(joined)}")
        if SHOW_METADATA:
            print(f" {colorize('min:', bold=True)} {fmt(lo_kph)} kph {gray_meta(f'({route_context_text(lo_route, meta_by_route)})')}")
            print(f" {colorize('max:', bold=True)} {fmt(hi_kph)} kph {gray_meta(f'({route_context_text(hi_route, meta_by_route)})')}")
        else:
            print(f" {colorize('min:', bold=True)} {fmt(lo_kph)} kph")
            print(f" {colorize('max:', bold=True)} {fmt(hi_kph)} kph")
def print_bias_hysteresis_extremes(per_route_samples: list[RouteSamples]) -> None:
    """Print per-unit min/max of per-route bias and hysteresis medians, converted to kph."""
    stats_by_unit = route_metric_stats_by_unit(per_route_samples)
    print("\n" + colorize("Bias & Hysteresis Extremes", fg=35, bold=True))
    if not stats_by_unit:
        print(f" {colorize('no unit-grouped route data available', fg=31)}")
        return
    meta_by_route = {rs.route: rs for rs in per_route_samples}

    def show_metric(metric_name: str, route_vals: list[tuple[str, float]]) -> None:
        # Emit one metric section: the raw per-route values, then annotated min/max rows.
        if not route_vals:
            print(f" {gray_meta(f'no {metric_name} values available')}")
            return
        lo_route, lo_kph = min(route_vals, key=lambda rv: rv[1])
        hi_route, hi_kph = max(route_vals, key=lambda rv: rv[1])
        joined = "[" + ", ".join(fmt(v, 3) for _, v in route_vals) + "]"
        print(f" {colorize(f'{metric_name}:', bold=True)} {gray_meta(joined)}")
        if SHOW_METADATA:
            print(f" {colorize('min:', bold=True)} {fmt(lo_kph)} kph {gray_meta(f'({route_context_text(lo_route, meta_by_route)})')}")
            print(f" {colorize('max:', bold=True)} {fmt(hi_kph)} kph {gray_meta(f'({route_context_text(hi_route, meta_by_route)})')}")
        else:
            print(f" {colorize('min:', bold=True)} {fmt(lo_kph)} kph")
            print(f" {colorize('max:', bold=True)} {fmt(hi_kph)} kph")

    for unit in (UNIT_MPH, UNIT_KPH):
        bias_by_route: list[tuple[str, float]] = []
        hyst_by_route: list[tuple[str, float]] = []
        for route, stats in stats_by_unit.get(unit, []):
            bias_kph = source_to_kph(stats.bias_median_source, unit)
            if bias_kph is not None and np.isfinite(bias_kph):
                bias_by_route.append((route, float(bias_kph)))
            hyst_kph = source_to_kph(stats.hysteresis_median_source, unit)
            if hyst_kph is not None and np.isfinite(hyst_kph):
                hyst_by_route.append((route, float(hyst_kph)))
        # A route counts as "having a value" if either metric survived conversion.
        covered = {route for route, _ in bias_by_route} | {route for route, _ in hyst_by_route}
        print(f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} {gray_meta(f'routes_with_value={len(covered)}')}")
        show_metric("bias_median", bias_by_route)
        show_metric("hysteresis(+/-)_median", hyst_by_route)
def compact_exception(exc: Exception) -> str:
    """Return a short single-line description of *exc* suitable for status output."""
    message = str(exc).strip()
    if not message:
        # No message text at all: fall back to the exception class name.
        return type(exc).__name__
    summary = message.splitlines()[0].strip()
    # "logs were not found, ..." errors carry long comma-separated detail; keep only the lead clause.
    if "logs were not found" in summary:
        summary = summary.split(",", 1)[0]
    return summary
def print_aggregate_stats(per_route_samples: list[RouteSamples]) -> None:
    """Print the cross-route (ALL_ROUTES) summary: per-unit sample counts plus
    all consistency and extremes sections.

    Silently returns when there is nothing to aggregate (no routes, or every
    processed route produced zero samples).
    """
    if len(per_route_samples) == 0:
        return
    # Collect the non-empty time arrays first: np.concatenate raises ValueError
    # on an empty sequence, which previously crashed this function whenever
    # every route ended up with zero samples.
    t_parts = [rs.t for rs in per_route_samples if rs.t.size]
    if len(t_parts) == 0:
        return
    t_all = np.concatenate(t_parts, axis=0)
    ui_all = np.concatenate([rs.ui_speed_raw for rs in per_route_samples if rs.ui_speed_raw.size], axis=0)
    units_all = np.concatenate([rs.ui_units for rs in per_route_samples if rs.ui_units.size], axis=0)
    veh_all = np.concatenate([rs.vehicle_kph for rs in per_route_samples if rs.vehicle_kph.size], axis=0)
    if t_all.size == 0:
        return
    # Sort every parallel array by time so downstream stats see one monotonic timeline.
    order = np.argsort(t_all)
    t_all = t_all[order]
    ui_all = ui_all[order]
    units_all = units_all[order]
    veh_all = veh_all[order]
    print("\n" + colorize("=" * 96, fg=35, bold=True))
    print(colorize("ALL_ROUTES", fg=35, bold=True))
    unit_vals, unit_counts = np.unique(units_all, return_counts=True)
    counts = ", ".join(f"{unit_name(int(u))}={int(c)}" for u, c in zip(unit_vals, unit_counts))
    print(f"{colorize('unit_counts:', fg=90)} {counts}")
    print(f"{colorize('sample summary:', fg=90)}")
    for u in unit_vals:
        unit = int(u)
        mask = units_all == unit
        stats = compute_unit_stats(t_all[mask], ui_all[mask], veh_all[mask], unit)
        print(f" [{unit_name(unit)}] samples={stats.sample_count}")
    print_consistency_summary(per_route_samples)
    print_firmware_consistency_summary(per_route_samples)
    print_region_consistency_summary(per_route_samples)
    print_final_consistency(per_route_samples)
    print_bias_hysteresis_extremes(per_route_samples)
    print_display_comp_extremes(per_route_samples)
def main() -> int:
    """CLI entry point: parse arguments, process each route (with caching), print aggregates.

    Returns 0 on success, 1 when no route could be processed.
    """
    parser = argparse.ArgumentParser(
        description="Measure Tesla DI_uiSpeed hysteresis (+/- half-width) and bias versus DI_vehicleSpeed",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("route", nargs="*", help="Route(s) in dongle/segment format")
    parser.add_argument("--workers", type=int, default=4, help="Workers for run_across_segments")
    parser.add_argument("--no-cache", action="store_true", help="Disable cache read/write")
    parser.add_argument("--no-color", action="store_true", help="Disable ANSI color output")
    parser.add_argument("--show-metadata", action="store_true", help="Show extra metadata (region/user/fw) in summaries")
    parser.add_argument(
        "--vehicle-speed-signal",
        default=DEFAULT_VEHICLE_SPEED_SIGNAL,
        help="DI_speed signal to use as vehicle speed reference (e.g. DI_vehicleSpeed or DI_vehicleSpeed_new)",
    )
    args = parser.parse_args()

    # Output-style globals consumed by the colorize()/metadata helpers.
    global USE_COLOR, SHOW_METADATA
    USE_COLOR = (not args.no_color) and sys.stdout.isatty() and os.environ.get("NO_COLOR") is None
    SHOW_METADATA = args.show_metadata

    try:
        vehicle_speed_signal = resolve_vehicle_speed_signal(args.vehicle_speed_signal)
    except ValueError as e:
        parser.error(str(e))  # exits the process
        return 2  # unreachable; keeps the int return type explicit

    requested = args.route if len(args.route) else DEFAULT_ROUTES
    normalized_routes = []
    for raw in requested:
        route = normalize_route(raw)
        if route != raw:
            print(f"Normalized route '{raw}' -> '{route}'")
        normalized_routes.append(route)

    processed: list[RouteSamples] = []
    total_routes = len(normalized_routes)
    print(colorize(f"Processing {total_routes} route(s)...", fg=34, bold=True))
    print_median_confidence_legend()
    for idx, route in enumerate(normalized_routes, start=1):
        if idx > 1:
            print()
        print(colorize(f"[{idx}/{total_routes}] {route}", fg=34))
        cache_file = route_cache_filename(route, vehicle_speed_signal)
        try:
            if not args.no_cache and os.path.exists(cache_file):
                print(colorize("Loading cache...", fg=90))
            else:
                print(colorize("Processing route data...", fg=90))
            samples = load_or_scan_route(route, workers=args.workers, no_cache=args.no_cache, vehicle_speed_signal=vehicle_speed_signal)
            print_route_stats(samples)
            processed.append(samples)
        except Exception as e:
            print(colorize(f"Skipping {route} due to error: {compact_exception(e)}", fg=31, bold=True))
            # Drop any (possibly partial/corrupt) cache so a rerun starts clean.
            if os.path.exists(cache_file):
                os.remove(cache_file)
            continue

    if len(processed) == 0:
        print(colorize("No valid routes were processed", fg=31, bold=True))
        return 1
    print_aggregate_stats(processed)
    return 0
# Script entry point: propagate main()'s status code to the shell.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment