Created
February 24, 2026 19:37
-
-
Save dzid26/ee2efbb26c7c39f6e408bcbe76eaef56 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Analyze Tesla DI_uiSpeed hysteresis (+/- half-width) and bias against DI_vehicleSpeed on route logs. | |
| Features: | |
| - Route-level caching similar to torque_lat_accel.py | |
| - Robust DI_uiSpeedUnits detection: | |
| - DBC DI_uiSpeedUnits decode | |
| - raw bit 32 and raw bit 33 | |
| - data-inferred units from DI_uiSpeed vs DI_vehicleSpeed fit | |
| The best source is selected automatically per route. | |
| - Metrics printed in source units, metric (kph), imperial (mph), and m/s. | |
| - "Max" metrics include raw max and p99 (skip-outliers view). | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import os | |
| import pickle | |
| import sys | |
| from dataclasses import dataclass | |
| from functools import partial | |
| import numpy as np | |
| from opendbc.can.dbc import DBC | |
| from opendbc.can.parser import get_raw_value | |
| from opendbc.car.common.conversions import Conversions as CV | |
| from opendbc.car.structs import CarParams | |
| from openpilot.tools.lib.file_sources import comma_api_source, comma_car_segments_source, internal_source, openpilotci_source | |
| from openpilot.tools.lib.logreader import LogReader | |
| from openpilot.tools.lib.route import FileName, SegmentRange | |
| CACHE_DIR = os.path.expanduser("~/.cache/openpilot_tesla_ui_speed") | |
| CACHE_VERSION = 8 | |
| MAX_TRANSITION_DT_S = 0.5 | |
| ROBUST_MAX_PERCENTILE = 99.0 | |
| PARTY_BUS = 0 | |
| MIN_SPEED_FOR_UNIT_SCORING_KPH = 5.0 | |
| ROBUST_AVG_LOW_PERCENTILE = 10.0 | |
| ROBUST_AVG_HIGH_PERCENTILE = 90.0 | |
| MEDIAN_TRIM_LOW_PERCENTILE = 10.0 | |
| MEDIAN_TRIM_HIGH_PERCENTILE = 90.0 | |
| MEDIAN_BOOTSTRAP_ITERS = 300 | |
| MEDIAN_BOOTSTRAP_MAX_POINTS = 5000 | |
| CONSISTENCY_MARGIN_GAIN = 0.005 # unitless gain diff (0.5%) | |
| CONSISTENCY_MARGIN_SPEED_SOURCE = 0.05 # source units (kph or mph) | |
| DEFAULT_ROUTES = [ | |
| "2a251bf8a265ff32/000002c9--46bf64b2d2", | |
| "2a251bf8a265ff32/000002cd--f97069273f", | |
| "4b3cbd6038d07ca6/000001ee--f11ccf6421", | |
| ] | |
| os.makedirs(CACHE_DIR, exist_ok=True) | |
| _TESLA_PARTY_DBC = DBC("tesla_model3_party") | |
| _DI_SPEED_MSG = _TESLA_PARTY_DBC.name_to_msg["DI_speed"] | |
| _DI_SPEED_ADDR = _DI_SPEED_MSG.address | |
| _SIG_UI_UNITS = _DI_SPEED_MSG.sigs["DI_uiSpeedUnits"] | |
| _SIG_UI_SPEED = _DI_SPEED_MSG.sigs["DI_uiSpeed"] | |
| _SIG_VEH_SPEED = _DI_SPEED_MSG.sigs["DI_vehicleSpeed"] | |
| _DI_VEHICLE_SPEED_SIGNALS = sorted([name for name in _DI_SPEED_MSG.sigs.keys() if name.startswith("DI_vehicleSpeed")]) | |
| DEFAULT_VEHICLE_SPEED_SIGNAL = "DI_vehicleSpeed" if "DI_vehicleSpeed" in _DI_SPEED_MSG.sigs else (_DI_VEHICLE_SPEED_SIGNALS[0] if len(_DI_VEHICLE_SPEED_SIGNALS) else "") | |
| UNIT_MPH = 0 | |
| UNIT_KPH = 1 | |
| Ecu = CarParams.Ecu | |
| USE_COLOR = False | |
| SHOW_METADATA = False | |
| @dataclass | |
| class SegmentSamples: | |
| t: list[float] | |
| ui_speed_raw: list[float] | |
| unit_dbc: list[int] | |
| unit_bit32: list[int] | |
| unit_bit33: list[int] | |
| vehicle_kph: list[float] | |
| gps_lat: list[float] | |
| gps_lon: list[float] | |
| @dataclass | |
| class RouteSamples: | |
| route: str | |
| t: np.ndarray | |
| ui_speed_raw: np.ndarray | |
| ui_units: np.ndarray | |
| vehicle_kph: np.ndarray | |
| vehicle_speed_signal: str | |
| unit_source: str | |
| eps_fw: str | |
| region: str | |
| region_center_lat: float | None | |
| region_center_lon: float | None | |
| @dataclass | |
| class UnitStats: | |
| unit_code: int | |
| sample_count: int | |
| bias_mean_source: float | None | |
| bias_avg_filtered_source: float | None | |
| bias_median_source: float | None | |
| bias_pos_max_source: float | None | |
| bias_pos_max_robust_source: float | None | |
| gain_avg_bias_comp: float | None | |
| gain_avg_filtered_bias_comp: float | None | |
| gain_sample_count: int | |
| hysteresis_mean_source: float | None | |
| hysteresis_avg_filtered_source: float | None | |
| hysteresis_median_source: float | None | |
| hysteresis_max_robust_source: float | None | |
| hysteresis_boundary_count: int | |
| transition_up_count: int | |
| transition_down_count: int | |
| bias_median_ci_low_source: float | None | |
| bias_median_ci_high_source: float | None | |
| bias_median_ci_width_source: float | None | |
| bias_median_mad_source: float | None | |
| bias_median_trimmed_source: float | None | |
| bias_median_trim_shift_source: float | None | |
| bias_median_conf_sample_count: int | |
| hysteresis_median_ci_low_source: float | None | |
| hysteresis_median_ci_high_source: float | None | |
| hysteresis_median_ci_width_source: float | None | |
| hysteresis_median_mad_source: float | None | |
| hysteresis_median_trimmed_source: float | None | |
| hysteresis_median_trim_shift_source: float | None | |
| hysteresis_median_conf_sample_count: int | |
| def unit_name(unit_code: int) -> str: | |
| if unit_code == UNIT_KPH: | |
| return "KPH" | |
| if unit_code == UNIT_MPH: | |
| return "MPH" | |
| return f"UNKNOWN({unit_code})" | |
| def encountered_units_text(units: set[int]) -> str: | |
| labels: list[str] = [] | |
| if UNIT_KPH in units: | |
| labels.append("kph") | |
| if UNIT_MPH in units: | |
| labels.append("mph") | |
| for u in sorted(units): | |
| if u not in (UNIT_KPH, UNIT_MPH): | |
| labels.append(unit_name(int(u)).lower()) | |
| return " ".join(labels) if labels else "no-units" | |
| def region_from_lat_lon(lat: float, lon: float) -> str: | |
| # Coarse automotive-market regions from lat/lon; intended for route-level grouping only. | |
| if not np.isfinite(lat) or not np.isfinite(lon): | |
| return "UNKNOWN" | |
| if lat < -90.0 or lat > 90.0 or lon < -180.0 or lon > 180.0: | |
| return "UNKNOWN" | |
| if 49.0 <= lat <= 61.5 and -11.5 <= lon <= 3.0: | |
| return "UK/IE" | |
| if 35.0 <= lat <= 72.5 and -25.0 <= lon <= 45.0: | |
| return "EU" | |
| # Southwest BC override (incl. Vancouver Island / Vancouver metro) to avoid US box overlap. | |
| if 48.2 <= lat <= 50.6 and -126.5 <= lon <= -122.8: | |
| return "CA" | |
| # US first (incl. AK/HI/PR) to avoid overlap with CA box. | |
| if (24.0 <= lat <= 49.0 and -125.0 <= lon <= -66.0) or \ | |
| (51.0 <= lat <= 72.0 and -170.0 <= lon <= -129.0) or \ | |
| (18.0 <= lat <= 23.0 and -161.0 <= lon <= -154.0) or \ | |
| (17.0 <= lat <= 19.0 and -68.0 <= lon <= -65.0): | |
| return "US" | |
| # Canada (coarse box). | |
| if 41.5 <= lat <= 83.0 and -141.0 <= lon <= -52.0: | |
| return "CA" | |
| if 15.0 <= lat <= 72.5 and -170.0 <= lon <= -50.0: | |
| return "NA" | |
| if -56.0 <= lat <= 15.0 and -95.0 <= lon <= -30.0: | |
| return "LATAM" | |
| if -48.0 <= lat <= -10.0 and 110.0 <= lon <= 180.0: | |
| return "ANZ" | |
| if 20.0 <= lat <= 51.0 and 122.0 <= lon <= 154.0: | |
| return "JP/KR" | |
| if 18.0 <= lat <= 54.0 and 73.0 <= lon <= 135.0: | |
| return "CN" | |
| if 5.0 <= lat <= 37.0 and 67.0 <= lon <= 97.0: | |
| return "IN" | |
| if -10.0 <= lat <= 37.0 and -20.0 <= lon <= 65.0: | |
| return "MEA" | |
| if -40.0 <= lat <= 25.0 and 95.0 <= lon <= 180.0: | |
| return "APAC" | |
| return "OTHER" | |
| def classify_route_region(gps_lat: np.ndarray, gps_lon: np.ndarray) -> tuple[str, float | None, float | None]: | |
| valid = np.isfinite(gps_lat) & np.isfinite(gps_lon) | |
| if not np.any(valid): | |
| return "UNKNOWN", None, None | |
| lats = gps_lat[valid] | |
| lons = gps_lon[valid] | |
| in_range = (lats >= -90.0) & (lats <= 90.0) & (lons >= -180.0) & (lons <= 180.0) | |
| if not np.any(in_range): | |
| return "UNKNOWN", None, None | |
| lats = lats[in_range] | |
| lons = lons[in_range] | |
| regions = np.asarray([region_from_lat_lon(float(lat), float(lon)) for lat, lon in zip(lats, lons)], dtype=object) | |
| uniq, counts = np.unique(regions, return_counts=True) | |
| top_idx = int(np.argmax(counts)) | |
| region = str(uniq[top_idx]) | |
| center_lat = float(np.median(lats)) | |
| center_lon = float(np.median(lons)) | |
| return region, center_lat, center_lon | |
| def resolve_vehicle_speed_signal(signal_name: str) -> str: | |
| if signal_name in _DI_SPEED_MSG.sigs: | |
| return signal_name | |
| hint = ", ".join(_DI_VEHICLE_SPEED_SIGNALS) if len(_DI_VEHICLE_SPEED_SIGNALS) else "<none>" | |
| raise ValueError(f"Unknown DI_speed signal '{signal_name}'. Available DI_vehicleSpeed* signals: {hint}") | |
| def colorize(text: str, *, fg: int | None = None, bold: bool = False, dim: bool = False) -> str: | |
| if not USE_COLOR: | |
| return text | |
| codes: list[str] = [] | |
| if bold: | |
| codes.append("1") | |
| if dim: | |
| codes.append("2") | |
| if fg is not None: | |
| codes.append(str(fg)) | |
| if not codes: | |
| return text | |
| return f"\033[{';'.join(codes)}m{text}\033[0m" | |
| def gray_meta(text: str) -> str: | |
| return colorize(text, fg=90) | |
| def decode_signal_value(dat: bytes | bytearray, sig) -> float: | |
| raw = get_raw_value(dat, sig) | |
| if sig.is_signed: | |
| raw -= ((raw >> (sig.size - 1)) & 0x1) * (1 << sig.size) | |
| return float(raw) * sig.factor + sig.offset | |
| def extract_raw_bit(dat: bytes | bytearray, bit_index: int) -> int: | |
| byte_index = bit_index // 8 | |
| if byte_index >= len(dat): | |
| return 0 | |
| return int((dat[byte_index] >> (bit_index % 8)) & 0x1) | |
| def sanitize_units(arr: np.ndarray) -> np.ndarray: | |
| # Convert arbitrary numeric array into strict {0: MPH, 1: KPH} | |
| return np.where(np.asarray(arr, dtype=float) >= 0.5, UNIT_KPH, UNIT_MPH).astype(int) | |
| def infer_units_from_speed_fit(ui_speed_raw: np.ndarray, vehicle_kph: np.ndarray) -> np.ndarray: | |
| # Compare DI_uiSpeed interpreted as KPH vs MPH against DI_vehicleSpeed (always kph). | |
| ui_as_kph_err = np.abs(ui_speed_raw - vehicle_kph) | |
| ui_as_mph_err = np.abs(ui_speed_raw * CV.MPH_TO_KPH - vehicle_kph) | |
| inferred = np.where(ui_as_kph_err <= ui_as_mph_err, UNIT_KPH, UNIT_MPH).astype(int) | |
| # Low-speed samples are ambiguous; assign them to dominant high-speed inferred unit. | |
| high_speed = np.isfinite(vehicle_kph) & (vehicle_kph >= MIN_SPEED_FOR_UNIT_SCORING_KPH) | |
| if np.any(high_speed): | |
| dominant = UNIT_KPH if np.mean(inferred[high_speed] == UNIT_KPH) >= 0.5 else UNIT_MPH | |
| inferred[~high_speed] = dominant | |
| return inferred | |
| def unit_series_fit_score_kph(ui_speed_raw: np.ndarray, vehicle_kph: np.ndarray, units: np.ndarray) -> float: | |
| units_bin = sanitize_units(units) | |
| ui_kph_est = np.where(units_bin == UNIT_KPH, ui_speed_raw, ui_speed_raw * CV.MPH_TO_KPH) | |
| err = np.abs(ui_kph_est - vehicle_kph) | |
| valid = np.isfinite(err) & np.isfinite(vehicle_kph) & (vehicle_kph >= MIN_SPEED_FOR_UNIT_SCORING_KPH) | |
| if np.sum(valid) < 50: | |
| valid = np.isfinite(err) | |
| if np.sum(valid) == 0: | |
| return float("inf") | |
| return float(np.median(err[valid])) | |
| def resolve_route_units(ui_speed_raw: np.ndarray, | |
| vehicle_kph: np.ndarray, | |
| unit_dbc: np.ndarray, | |
| unit_bit32: np.ndarray, | |
| unit_bit33: np.ndarray) -> tuple[np.ndarray, str]: | |
| inferred = infer_units_from_speed_fit(ui_speed_raw, vehicle_kph) | |
| candidates = { | |
| "inferred_fit": sanitize_units(inferred), | |
| "raw_bit33": sanitize_units(unit_bit33), | |
| "raw_bit32": sanitize_units(unit_bit32), | |
| "dbc_signal": sanitize_units(unit_dbc), | |
| } | |
| best_name = "inferred_fit" | |
| best_score = float("inf") | |
| scores: dict[str, float] = {} | |
| for name, units in candidates.items(): | |
| score = unit_series_fit_score_kph(ui_speed_raw, vehicle_kph, units) | |
| scores[name] = score | |
| if score < best_score: | |
| best_score = score | |
| best_name = name | |
| selected = candidates[best_name] | |
| selected = stabilize_unit_series(selected) | |
| source_labels = { | |
| "inferred_fit": "inferred_from_speed_fit", | |
| "raw_bit33": "raw_bit33", | |
| "raw_bit32": "raw_bit32", | |
| "dbc_signal": "dbc_di_uiSpeedUnits", | |
| } | |
| source_notes = { | |
| "inferred_fit": "units inferred by best fit of DI_uiSpeed to DI_vehicleSpeed", | |
| "raw_bit33": "units decoded from raw bit 33", | |
| "raw_bit32": "units decoded from raw bit 32", | |
| "dbc_signal": "units decoded from DBC DI_uiSpeedUnits signal", | |
| } | |
| ranked = sorted(((n, s) for n, s in scores.items() if np.isfinite(s)), key=lambda x: x[1]) | |
| margin = None | |
| if len(ranked) >= 2: | |
| margin = ranked[1][1] - ranked[0][1] | |
| counts = np.bincount(selected, minlength=2) | |
| total = int(np.sum(counts)) | |
| if total > 0 and counts[UNIT_KPH] > 0 and counts[UNIT_MPH] > 0: | |
| kph_pct = 100.0 * float(counts[UNIT_KPH]) / total | |
| mph_pct = 100.0 * float(counts[UNIT_MPH]) / total | |
| resolved_units = f"mixed(kph={kph_pct:.1f}%, mph={mph_pct:.1f}%)" | |
| elif counts[UNIT_MPH] > 0: | |
| resolved_units = "mph" | |
| else: | |
| resolved_units = "kph" | |
| score_table = ", ".join(f"{source_labels.get(n, n)}={s:.3f}" for n, s in ranked) | |
| label = source_labels.get(best_name, best_name) | |
| note = source_notes.get(best_name, "unit source selected by fit") | |
| if margin is not None: | |
| source_text = ( | |
| f"{label} ({note}; resolved_units={resolved_units}; fit_err={best_score:.3f} kph, " | |
| f"margin_to_next={margin:.3f} kph; candidate_errs_kph=[{score_table}])" | |
| ) | |
| else: | |
| source_text = ( | |
| f"{label} ({note}; resolved_units={resolved_units}; fit_err={best_score:.3f} kph; " | |
| f"candidate_errs_kph=[{score_table}])" | |
| ) | |
| return selected, source_text | |
| def stabilize_unit_series(units: np.ndarray) -> np.ndarray: | |
| # Keep true mixed-unit logs, but collapse tiny minority blips from ambiguous samples. | |
| units = sanitize_units(units) | |
| if units.size == 0: | |
| return units | |
| counts = np.bincount(units, minlength=2) | |
| dominant = UNIT_KPH if counts[UNIT_KPH] >= counts[UNIT_MPH] else UNIT_MPH | |
| minority_count = int(np.min(counts)) | |
| minority_frac = minority_count / float(units.size) | |
| if minority_count <= 20 or minority_frac < 0.005: | |
| return np.full_like(units, dominant) | |
| return units | |
| def scan_segment(lr: LogReader, party_bus: int = PARTY_BUS, vehicle_speed_signal: str = DEFAULT_VEHICLE_SPEED_SIGNAL) -> SegmentSamples: | |
| if vehicle_speed_signal not in _DI_SPEED_MSG.sigs: | |
| raise KeyError(f"Unknown DI_speed signal '{vehicle_speed_signal}'") | |
| veh_speed_sig = _DI_SPEED_MSG.sigs[vehicle_speed_signal] | |
| t: list[float] = [] | |
| ui_speed_raw: list[float] = [] | |
| unit_dbc: list[int] = [] | |
| unit_bit32: list[int] = [] | |
| unit_bit33: list[int] = [] | |
| vehicle_kph: list[float] = [] | |
| gps_lat: list[float] = [] | |
| gps_lon: list[float] = [] | |
| for msg in lr: | |
| which = msg.which() | |
| if which == "gpsLocation": | |
| g = msg.gpsLocation | |
| has_fix = bool(getattr(g, "hasFix", True)) | |
| if has_fix and np.isfinite(g.latitude) and np.isfinite(g.longitude) and abs(g.latitude) <= 90.0 and abs(g.longitude) <= 180.0: | |
| gps_lat.append(float(g.latitude)) | |
| gps_lon.append(float(g.longitude)) | |
| continue | |
| if which == "gpsLocationExternal": | |
| g = msg.gpsLocationExternal | |
| has_fix = bool(getattr(g, "hasFix", True)) | |
| if has_fix and np.isfinite(g.latitude) and np.isfinite(g.longitude) and abs(g.latitude) <= 90.0 and abs(g.longitude) <= 180.0: | |
| gps_lat.append(float(g.latitude)) | |
| gps_lon.append(float(g.longitude)) | |
| continue | |
| if which != "can": | |
| continue | |
| ts = msg.logMonoTime * 1e-9 | |
| for can in msg.can: | |
| if can.src != party_bus or can.address != _DI_SPEED_ADDR or len(can.dat) < 8: | |
| continue | |
| ui_speed = decode_signal_value(can.dat, _SIG_UI_SPEED) | |
| # Some DI_vehicleSpeed variants use all-ones raw as SNA (e.g. 13-bit -> 8191 -> 615.28 kph). | |
| veh_raw = get_raw_value(can.dat, veh_speed_sig) | |
| if veh_raw == ((1 << veh_speed_sig.size) - 1): | |
| continue | |
| veh_speed_kph = decode_signal_value(can.dat, veh_speed_sig) | |
| units_dbc = int(round(decode_signal_value(can.dat, _SIG_UI_UNITS))) | |
| bit32 = extract_raw_bit(can.dat, 32) | |
| bit33 = extract_raw_bit(can.dat, 33) | |
| # Ignore explicit SNA/out-of-range points. | |
| if not np.isfinite(ui_speed) or not np.isfinite(veh_speed_kph): | |
| continue | |
| if ui_speed >= 255 or ui_speed < 0: | |
| continue | |
| if veh_speed_kph < -5.0 or veh_speed_kph > 350.0: | |
| continue | |
| t.append(ts) | |
| ui_speed_raw.append(ui_speed) | |
| unit_dbc.append(units_dbc) | |
| unit_bit32.append(bit32) | |
| unit_bit33.append(bit33) | |
| vehicle_kph.append(veh_speed_kph) | |
| return SegmentSamples(t, ui_speed_raw, unit_dbc, unit_bit32, unit_bit33, vehicle_kph, gps_lat, gps_lon) | |
| def scan_segment_list(lr: LogReader, | |
| party_bus: int = PARTY_BUS, | |
| vehicle_speed_signal: str = DEFAULT_VEHICLE_SPEED_SIGNAL) -> list[SegmentSamples]: | |
| return [scan_segment(lr, party_bus=party_bus, vehicle_speed_signal=vehicle_speed_signal)] | |
| def normalize_route(route: str) -> str: | |
| return route.replace("%7C", "|").replace("%7c", "|") | |
| def resolve_available_rlogs(route: str) -> tuple[list[str], int, int]: | |
| sr = SegmentRange(route) | |
| requested_seg_idxs = list(sr.seg_idxs) | |
| remaining_seg_idxs = list(requested_seg_idxs) | |
| found_files: dict[int, str] = {} | |
| sources = [internal_source, comma_api_source, openpilotci_source, comma_car_segments_source] | |
| for source in sources: | |
| if len(remaining_seg_idxs) == 0: | |
| break | |
| try: | |
| files = source(sr, remaining_seg_idxs, FileName.RLOG) | |
| found_files.update(files) | |
| remaining_seg_idxs = [idx for idx in remaining_seg_idxs if idx not in found_files] | |
| except Exception: | |
| continue | |
| ordered_files = [found_files[idx] for idx in sorted(found_files.keys())] | |
| return ordered_files, len(remaining_seg_idxs), len(requested_seg_idxs) | |
| def _decode_fw_version_text(fw_version: bytes | bytearray | str) -> str: | |
| if isinstance(fw_version, (bytes, bytearray)): | |
| return bytes(fw_version).decode("utf-8", errors="replace").strip() | |
| return str(fw_version).strip() | |
| def get_eps_fw_from_logreader(lr: LogReader) -> str: | |
| cp = lr.first("carParams") | |
| if cp is None: | |
| return "n/a" | |
| eps_all: list[str] = [] | |
| eps_non_logging: list[str] = [] | |
| for fw in cp.carFw: | |
| if fw.ecu != Ecu.eps: | |
| continue | |
| fw_text = _decode_fw_version_text(fw.fwVersion) | |
| if len(fw_text) == 0: | |
| continue | |
| eps_all.append(fw_text) | |
| if not fw.logging: | |
| eps_non_logging.append(fw_text) | |
| selected = eps_non_logging if len(eps_non_logging) else eps_all | |
| if len(selected) == 0: | |
| return "n/a" | |
| return " | ".join(sorted(set(selected))) | |
| def route_cache_filename(route: str, vehicle_speed_signal: str = DEFAULT_VEHICLE_SPEED_SIGNAL) -> str: | |
| safe = route.replace("/", "_").replace("|", "_") | |
| sig_safe = vehicle_speed_signal.replace("/", "_") | |
| return os.path.join(CACHE_DIR, f"v{CACHE_VERSION}_{safe}_{sig_safe}.pkl") | |
| def load_or_scan_route(route: str, | |
| workers: int, | |
| no_cache: bool, | |
| vehicle_speed_signal: str = DEFAULT_VEHICLE_SPEED_SIGNAL) -> RouteSamples: | |
| cache_file = route_cache_filename(route, vehicle_speed_signal) | |
| if not no_cache and os.path.exists(cache_file): | |
| with open(cache_file, "rb") as f: | |
| cached = pickle.load(f) | |
| if isinstance(cached, dict) and cached.get("version") == CACHE_VERSION: | |
| data = cached["data"] | |
| return RouteSamples( | |
| route=route, | |
| t=np.asarray(data["t"], dtype=float), | |
| ui_speed_raw=np.asarray(data["ui_speed_raw"], dtype=float), | |
| ui_units=np.asarray(data["ui_units"], dtype=int), | |
| vehicle_kph=np.asarray(data["vehicle_kph"], dtype=float), | |
| vehicle_speed_signal=str(data.get("vehicle_speed_signal", vehicle_speed_signal)), | |
| unit_source=str(data.get("unit_source", "unknown")), | |
| eps_fw=str(data.get("eps_fw", "n/a")), | |
| region=str(data.get("region", "UNKNOWN")), | |
| region_center_lat=float(data["region_center_lat"]) if data.get("region_center_lat") is not None else None, | |
| region_center_lon=float(data["region_center_lon"]) if data.get("region_center_lon") is not None else None, | |
| ) | |
| rlog_files, missing_count, total_count = resolve_available_rlogs(route) | |
| if len(rlog_files) == 0: | |
| raise FileNotFoundError(f"No rlogs found for {route}") | |
| if missing_count > 0: | |
| print(colorize(f"Using partial rlogs for {route}: found {len(rlog_files)}/{total_count} segments", fg=33)) | |
| lr = LogReader(rlog_files, sort_by_time=True) | |
| eps_fw = get_eps_fw_from_logreader(lr) | |
| segment_results = lr.run_across_segments( | |
| workers, | |
| partial(scan_segment_list, party_bus=PARTY_BUS, vehicle_speed_signal=vehicle_speed_signal), | |
| disable_tqdm=True, | |
| ) | |
| t: list[float] = [] | |
| ui_speed_raw: list[float] = [] | |
| unit_dbc: list[int] = [] | |
| unit_bit32: list[int] = [] | |
| unit_bit33: list[int] = [] | |
| vehicle_kph: list[float] = [] | |
| gps_lat: list[float] = [] | |
| gps_lon: list[float] = [] | |
| for seg in segment_results: | |
| if seg is None: | |
| continue | |
| t.extend(seg.t) | |
| ui_speed_raw.extend(seg.ui_speed_raw) | |
| unit_dbc.extend(seg.unit_dbc) | |
| unit_bit32.extend(seg.unit_bit32) | |
| unit_bit33.extend(seg.unit_bit33) | |
| vehicle_kph.extend(seg.vehicle_kph) | |
| gps_lat.extend(seg.gps_lat) | |
| gps_lon.extend(seg.gps_lon) | |
| region, center_lat, center_lon = classify_route_region(np.asarray(gps_lat, dtype=float), np.asarray(gps_lon, dtype=float)) | |
| if len(t) == 0: | |
| rs = RouteSamples( | |
| route=route, | |
| t=np.array([], dtype=float), | |
| ui_speed_raw=np.array([], dtype=float), | |
| ui_units=np.array([], dtype=int), | |
| vehicle_kph=np.array([], dtype=float), | |
| vehicle_speed_signal=vehicle_speed_signal, | |
| unit_source="none", | |
| eps_fw=eps_fw, | |
| region=region, | |
| region_center_lat=center_lat, | |
| region_center_lon=center_lon, | |
| ) | |
| else: | |
| order = np.argsort(np.asarray(t, dtype=float)) | |
| t_np = np.asarray(t, dtype=float)[order] | |
| ui_np = np.asarray(ui_speed_raw, dtype=float)[order] | |
| dbc_np = np.asarray(unit_dbc, dtype=int)[order] | |
| b32_np = np.asarray(unit_bit32, dtype=int)[order] | |
| b33_np = np.asarray(unit_bit33, dtype=int)[order] | |
| veh_np = np.asarray(vehicle_kph, dtype=float)[order] | |
| units_np, unit_source = resolve_route_units(ui_np, veh_np, dbc_np, b32_np, b33_np) | |
| rs = RouteSamples( | |
| route=route, | |
| t=t_np, | |
| ui_speed_raw=ui_np, | |
| ui_units=units_np, | |
| vehicle_kph=veh_np, | |
| vehicle_speed_signal=vehicle_speed_signal, | |
| unit_source=unit_source, | |
| eps_fw=eps_fw, | |
| region=region, | |
| region_center_lat=center_lat, | |
| region_center_lon=center_lon, | |
| ) | |
| if not no_cache: | |
| with open(cache_file, "wb") as f: | |
| pickle.dump( | |
| { | |
| "version": CACHE_VERSION, | |
| "route": route, | |
| "data": { | |
| "t": rs.t, | |
| "ui_speed_raw": rs.ui_speed_raw, | |
| "ui_units": rs.ui_units, | |
| "vehicle_kph": rs.vehicle_kph, | |
| "vehicle_speed_signal": rs.vehicle_speed_signal, | |
| "unit_source": rs.unit_source, | |
| "eps_fw": rs.eps_fw, | |
| "region": rs.region, | |
| "region_center_lat": rs.region_center_lat, | |
| "region_center_lon": rs.region_center_lon, | |
| }, | |
| }, | |
| f, | |
| ) | |
| return rs | |
| def robust_percentile(values: np.ndarray, percentile: float) -> float | None: | |
| if values.size == 0: | |
| return None | |
| finite = values[np.isfinite(values)] | |
| if finite.size == 0: | |
| return None | |
| if finite.size < 25: | |
| if percentile >= 50: | |
| return float(np.max(finite)) | |
| return float(np.min(finite)) | |
| return float(np.percentile(finite, percentile)) | |
| def robust_trimmed_mean(values: np.ndarray, | |
| low_percentile: float = ROBUST_AVG_LOW_PERCENTILE, | |
| high_percentile: float = ROBUST_AVG_HIGH_PERCENTILE) -> float | None: | |
| if values.size == 0: | |
| return None | |
| finite = values[np.isfinite(values)] | |
| if finite.size == 0: | |
| return None | |
| if finite.size < 25: | |
| return float(np.mean(finite)) | |
| lo = float(np.percentile(finite, low_percentile)) | |
| hi = float(np.percentile(finite, high_percentile)) | |
| kept = finite[(finite >= lo) & (finite <= hi)] | |
| if kept.size == 0: | |
| return float(np.mean(finite)) | |
| return float(np.mean(kept)) | |
| def median_confidence(values: np.ndarray) -> tuple[float | None, float | None, float | None, float | None, float | None, float | None, int]: | |
| finite = values[np.isfinite(values)] | |
| if finite.size == 0: | |
| return None, None, None, None, None, None, 0 | |
| median = float(np.median(finite)) | |
| mad = float(np.median(np.abs(finite - median))) | |
| lo = float(np.percentile(finite, MEDIAN_TRIM_LOW_PERCENTILE)) | |
| hi = float(np.percentile(finite, MEDIAN_TRIM_HIGH_PERCENTILE)) | |
| kept = finite[(finite >= lo) & (finite <= hi)] | |
| trim_median = float(np.median(kept)) if kept.size else median | |
| trim_shift = abs(trim_median - median) | |
| boot_arr = finite | |
| if boot_arr.size > MEDIAN_BOOTSTRAP_MAX_POINTS: | |
| # Use deterministic downsampling to keep runtime predictable on long routes. | |
| rng_ds = np.random.default_rng(0) | |
| idx = rng_ds.choice(boot_arr.size, size=MEDIAN_BOOTSTRAP_MAX_POINTS, replace=False) | |
| boot_arr = boot_arr[idx] | |
| ci_low = median | |
| ci_high = median | |
| if boot_arr.size >= 3: | |
| rng_bs = np.random.default_rng(1) | |
| sample_idx = rng_bs.integers(0, boot_arr.size, size=(MEDIAN_BOOTSTRAP_ITERS, boot_arr.size)) | |
| boot_medians = np.median(boot_arr[sample_idx], axis=1) | |
| ci_low = float(np.percentile(boot_medians, 2.5)) | |
| ci_high = float(np.percentile(boot_medians, 97.5)) | |
| return ci_low, ci_high, float(ci_high - ci_low), mad, trim_median, float(trim_shift), int(finite.size) | |
| def convert_source_to_all_units(value_source: float | None, source_unit: int) -> tuple[float | None, float | None, float | None, float | None]: | |
| if value_source is None: | |
| return None, None, None, None | |
| if source_unit == UNIT_KPH: | |
| value_kph = value_source | |
| value_mph = value_source * CV.KPH_TO_MPH | |
| value_ms = value_source * CV.KPH_TO_MS | |
| return value_source, value_kph, value_mph, value_ms | |
| if source_unit == UNIT_MPH: | |
| value_mph = value_source | |
| value_kph = value_source * CV.MPH_TO_KPH | |
| value_ms = value_source * CV.MPH_TO_MS | |
| return value_source, value_kph, value_mph, value_ms | |
| value_kph = value_source | |
| value_mph = value_source * CV.KPH_TO_MPH | |
| value_ms = value_source * CV.KPH_TO_MS | |
| return value_source, value_kph, value_mph, value_ms | |
| def fmt(v: float | None, digits: int = 3) -> str: | |
| if v is None: | |
| return "n/a" | |
| return f"{v:.{digits}f}" | |
| def fmt_gain_percent(gain: float | None, digits: int = 2) -> str: | |
| if gain is None: | |
| return "n/a" | |
| return f"{gain * 100.0:.{digits}f}%" | |
| def format_all_units(label: str, value_source: float | None, source_unit: int) -> str: | |
| src, kph, mph, ms = convert_source_to_all_units(value_source, source_unit) | |
| return ( | |
| f"{label}: source({unit_name(source_unit)})={fmt(src)}, " | |
| f"metric={fmt(kph)} kph, imperial={fmt(mph)} mph, m/s={fmt(ms)}" | |
| ) | |
| def format_unit_values(value_source: float | None, source_unit: int) -> str: | |
| src, kph, mph, ms = convert_source_to_all_units(value_source, source_unit) | |
| return f"src={fmt(src)} {unit_name(source_unit)} | {fmt(kph)} kph | {fmt(mph)} mph | {fmt(ms)} m/s" | |
| def source_to_kph(value_source: float | None, source_unit: int) -> float | None: | |
| if value_source is None: | |
| return None | |
| if source_unit == UNIT_MPH: | |
| return float(value_source * CV.MPH_TO_KPH) | |
| return float(value_source) | |
| def format_kph(value_source: float | None, source_unit: int, digits: int = 3) -> str: | |
| return f"{fmt(source_to_kph(value_source, source_unit), digits)} kph" | |
| def sum_if_both(a: float | None, b: float | None) -> float | None: | |
| if a is None or b is None: | |
| return None | |
| if not np.isfinite(a) or not np.isfinite(b): | |
| return None | |
| return float(a + b) | |
| def print_metric_line(name: str, value_source: float | None, source_unit: int, indent: str = " ") -> None: | |
| print(f"{indent}{colorize(f'{name:<14}', fg=90)} {format_unit_values(value_source, source_unit)}") | |
| def print_median_confidence_lines(ci_low: float | None, | |
| ci_high: float | None, | |
| ci_width: float | None, | |
| mad: float | None, | |
| trim_shift: float | None, | |
| sample_count: int, | |
| source_unit: int, | |
| indent: str = " ") -> None: | |
| print(f"{indent}{colorize('median confidence', fg=90)}") | |
| print(f"{indent}{colorize('conf n'.ljust(14), fg=90)} {sample_count}") | |
| print_metric_line("ci95 low", ci_low, source_unit, indent=indent) | |
| print_metric_line("ci95 high", ci_high, source_unit, indent=indent) | |
| print_metric_line("ci95 width", ci_width, source_unit, indent=indent) | |
| print_metric_line("mad", mad, source_unit, indent=indent) | |
| print_metric_line("trim shift", trim_shift, source_unit, indent=indent) | |
| def print_median_confidence_legend() -> None: | |
| print(colorize("Median Confidence Legend", fg=34, bold=True)) | |
| print(f" {colorize('applies to:', fg=90)} normal median (full valid sample set)") | |
| print(f" {colorize('ci95 low/high:', fg=90)} 95% bootstrap interval of the median") | |
| print(f" {colorize('ci95 width:', fg=90)} ci95_high - ci95_low (smaller means more stable median)") | |
| print(f" {colorize('mad:', fg=90)} median absolute deviation around the median") | |
| print(f" {colorize('trim shift:', fg=90)} abs(median(10-90% trimmed) - median(full)); near 0 means robust") | |
| print() | |
| def compute_unit_stats(t: np.ndarray, | |
| ui_speed_raw: np.ndarray, | |
| vehicle_kph: np.ndarray, | |
| unit_code: int, | |
| compute_median_confidence: bool = False) -> UnitStats: | |
| if ui_speed_raw.size == 0: | |
| return UnitStats( | |
| unit_code=unit_code, | |
| sample_count=0, | |
| bias_mean_source=None, | |
| bias_avg_filtered_source=None, | |
| bias_median_source=None, | |
| bias_pos_max_source=None, | |
| bias_pos_max_robust_source=None, | |
| gain_avg_bias_comp=None, | |
| gain_avg_filtered_bias_comp=None, | |
| gain_sample_count=0, | |
| hysteresis_mean_source=None, | |
| hysteresis_avg_filtered_source=None, | |
| hysteresis_median_source=None, | |
| hysteresis_max_robust_source=None, | |
| hysteresis_boundary_count=0, | |
| transition_up_count=0, | |
| transition_down_count=0, | |
| bias_median_ci_low_source=None, | |
| bias_median_ci_high_source=None, | |
| bias_median_ci_width_source=None, | |
| bias_median_mad_source=None, | |
| bias_median_trimmed_source=None, | |
| bias_median_trim_shift_source=None, | |
| bias_median_conf_sample_count=0, | |
| hysteresis_median_ci_low_source=None, | |
| hysteresis_median_ci_high_source=None, | |
| hysteresis_median_ci_width_source=None, | |
| hysteresis_median_mad_source=None, | |
| hysteresis_median_trimmed_source=None, | |
| hysteresis_median_trim_shift_source=None, | |
| hysteresis_median_conf_sample_count=0, | |
| ) | |
| if unit_code == UNIT_KPH: | |
| vehicle_source = vehicle_kph | |
| else: | |
| vehicle_source = vehicle_kph * CV.KPH_TO_MPH | |
| speed_valid = np.isfinite(vehicle_source) & (vehicle_source > 0.0) | |
| if not np.any(speed_valid): | |
| return UnitStats( | |
| unit_code=unit_code, | |
| sample_count=0, | |
| bias_mean_source=None, | |
| bias_avg_filtered_source=None, | |
| bias_median_source=None, | |
| bias_pos_max_source=None, | |
| bias_pos_max_robust_source=None, | |
| gain_avg_bias_comp=None, | |
| gain_avg_filtered_bias_comp=None, | |
| gain_sample_count=0, | |
| hysteresis_mean_source=None, | |
| hysteresis_avg_filtered_source=None, | |
| hysteresis_median_source=None, | |
| hysteresis_max_robust_source=None, | |
| hysteresis_boundary_count=0, | |
| transition_up_count=0, | |
| transition_down_count=0, | |
| bias_median_ci_low_source=None, | |
| bias_median_ci_high_source=None, | |
| bias_median_ci_width_source=None, | |
| bias_median_mad_source=None, | |
| bias_median_trimmed_source=None, | |
| bias_median_trim_shift_source=None, | |
| bias_median_conf_sample_count=0, | |
| hysteresis_median_ci_low_source=None, | |
| hysteresis_median_ci_high_source=None, | |
| hysteresis_median_ci_width_source=None, | |
| hysteresis_median_mad_source=None, | |
| hysteresis_median_trimmed_source=None, | |
| hysteresis_median_trim_shift_source=None, | |
| hysteresis_median_conf_sample_count=0, | |
| ) | |
| t = t[speed_valid] | |
| ui_speed_raw = ui_speed_raw[speed_valid] | |
| vehicle_source = vehicle_source[speed_valid] | |
| bias_source = ui_speed_raw - vehicle_source | |
| finite_bias = bias_source[np.isfinite(bias_source)] | |
| bias_mean = float(np.mean(finite_bias)) if finite_bias.size else None | |
| bias_avg_filtered = robust_trimmed_mean(finite_bias) | |
| bias_median = float(np.median(finite_bias)) if finite_bias.size else None | |
| bias_pos_max = float(np.max(finite_bias)) if finite_bias.size else None | |
| bias_pos_max_robust = robust_percentile(finite_bias, ROBUST_MAX_PERCENTILE) | |
| gain_avg_bias_comp = None | |
| gain_avg_filtered_bias_comp = None | |
| gain_sample_count = 0 | |
| if bias_median is not None: | |
| gain_min_speed_source = MIN_SPEED_FOR_UNIT_SCORING_KPH if unit_code == UNIT_KPH else (MIN_SPEED_FOR_UNIT_SCORING_KPH * CV.KPH_TO_MPH) | |
| gain_valid = np.isfinite(ui_speed_raw) & np.isfinite(vehicle_source) & (vehicle_source >= gain_min_speed_source) | |
| if np.any(gain_valid): | |
| x = vehicle_source[gain_valid] | |
| y = ui_speed_raw[gain_valid] - bias_median | |
| denom = float(np.dot(x, x)) | |
| if denom > 1e-6: | |
| gain_avg_bias_comp = float(np.dot(x, y) / denom) | |
| ratio_valid = np.isfinite(x) & np.isfinite(y) & (np.abs(x) > 1e-3) | |
| if np.any(ratio_valid): | |
| gain_ratios = y[ratio_valid] / x[ratio_valid] | |
| gain_avg_filtered_bias_comp = robust_trimmed_mean(gain_ratios) | |
| gain_sample_count = int(np.sum(gain_valid)) | |
| transition_up_count = 0 | |
| transition_down_count = 0 | |
| widths: list[float] = [] | |
| if ui_speed_raw.size >= 2: | |
| dt = np.diff(t) | |
| prev_ui = ui_speed_raw[:-1] | |
| curr_ui = ui_speed_raw[1:] | |
| step = curr_ui - prev_ui | |
| veh_next = vehicle_source[1:] | |
| valid = np.isfinite(dt) & np.isfinite(step) & np.isfinite(veh_next) | |
| valid &= (dt > 0.0) & (dt <= MAX_TRANSITION_DT_S) | |
| valid &= (np.abs(step) == 1.0) | |
| up_by_boundary: dict[int, list[float]] = {} | |
| down_by_boundary: dict[int, list[float]] = {} | |
| idxs = np.where(valid)[0] | |
| for i in idxs: | |
| s = step[i] | |
| if s > 0: | |
| boundary = int(round(prev_ui[i])) | |
| up_by_boundary.setdefault(boundary, []).append(float(veh_next[i])) | |
| elif s < 0: | |
| boundary = int(round(curr_ui[i])) | |
| down_by_boundary.setdefault(boundary, []).append(float(veh_next[i])) | |
| transition_up_count = int(sum(len(v) for v in up_by_boundary.values())) | |
| transition_down_count = int(sum(len(v) for v in down_by_boundary.values())) | |
| for boundary in sorted(set(up_by_boundary.keys()) & set(down_by_boundary.keys())): | |
| up_speeds = np.asarray(up_by_boundary[boundary], dtype=float) | |
| down_speeds = np.asarray(down_by_boundary[boundary], dtype=float) | |
| if up_speeds.size == 0 or down_speeds.size == 0: | |
| continue | |
| widths.append(float(np.median(up_speeds) - np.median(down_speeds))) | |
| widths_np = np.asarray(widths, dtype=float) | |
| hyst_pm_np = widths_np * 0.5 | |
| hysteresis_mean = float(np.mean(hyst_pm_np)) if hyst_pm_np.size else None | |
| hysteresis_avg_filtered = robust_trimmed_mean(hyst_pm_np) | |
| hysteresis_median = float(np.median(hyst_pm_np)) if hyst_pm_np.size else None | |
| hysteresis_max_robust = robust_percentile(hyst_pm_np, ROBUST_MAX_PERCENTILE) | |
| bias_ci_low = bias_ci_high = bias_ci_width = bias_mad = bias_trimmed = bias_trim_shift = None | |
| bias_conf_n = 0 | |
| hyst_ci_low = hyst_ci_high = hyst_ci_width = hyst_mad = hyst_trimmed = hyst_trim_shift = None | |
| hyst_conf_n = 0 | |
| if compute_median_confidence: | |
| (bias_ci_low, bias_ci_high, bias_ci_width, | |
| bias_mad, bias_trimmed, bias_trim_shift, bias_conf_n) = median_confidence(finite_bias) | |
| (hyst_ci_low, hyst_ci_high, hyst_ci_width, | |
| hyst_mad, hyst_trimmed, hyst_trim_shift, hyst_conf_n) = median_confidence(hyst_pm_np) | |
| return UnitStats( | |
| unit_code=unit_code, | |
| sample_count=int(ui_speed_raw.size), | |
| bias_mean_source=bias_mean, | |
| bias_avg_filtered_source=bias_avg_filtered, | |
| bias_median_source=bias_median, | |
| bias_pos_max_source=bias_pos_max, | |
| bias_pos_max_robust_source=bias_pos_max_robust, | |
| gain_avg_bias_comp=gain_avg_bias_comp, | |
| gain_avg_filtered_bias_comp=gain_avg_filtered_bias_comp, | |
| gain_sample_count=gain_sample_count, | |
| hysteresis_mean_source=hysteresis_mean, | |
| hysteresis_avg_filtered_source=hysteresis_avg_filtered, | |
| hysteresis_median_source=hysteresis_median, | |
| hysteresis_max_robust_source=hysteresis_max_robust, | |
| hysteresis_boundary_count=int(hyst_pm_np.size), | |
| transition_up_count=transition_up_count, | |
| transition_down_count=transition_down_count, | |
| bias_median_ci_low_source=bias_ci_low, | |
| bias_median_ci_high_source=bias_ci_high, | |
| bias_median_ci_width_source=bias_ci_width, | |
| bias_median_mad_source=bias_mad, | |
| bias_median_trimmed_source=bias_trimmed, | |
| bias_median_trim_shift_source=bias_trim_shift, | |
| bias_median_conf_sample_count=bias_conf_n, | |
| hysteresis_median_ci_low_source=hyst_ci_low, | |
| hysteresis_median_ci_high_source=hyst_ci_high, | |
| hysteresis_median_ci_width_source=hyst_ci_width, | |
| hysteresis_median_mad_source=hyst_mad, | |
| hysteresis_median_trimmed_source=hyst_trimmed, | |
| hysteresis_median_trim_shift_source=hyst_trim_shift, | |
| hysteresis_median_conf_sample_count=hyst_conf_n, | |
| ) | |
| def print_unit_summary(stats: UnitStats) -> None: | |
| unit = stats.unit_code | |
| print(f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} samples={stats.sample_count}") | |
| print(f" {colorize('Gain (bias-compensated)', bold=True)}") | |
| avg_label = colorize("avg".ljust(14), fg=90) | |
| avg_filtered_label = colorize("avg_filtered".ljust(14), fg=90) | |
| print(f" {avg_label} {fmt(stats.gain_avg_bias_comp, 4)} (unitless, n={stats.gain_sample_count})") | |
| print(f" {avg_filtered_label} {fmt(stats.gain_avg_filtered_bias_comp, 4)} (trimmed, n={stats.gain_sample_count})") | |
| print(f" {colorize('Bias (DI_uiSpeed - DI_vehicleSpeed)', bold=True)}") | |
| print_metric_line("mean", stats.bias_mean_source, unit) | |
| print_metric_line("avg_filtered", stats.bias_avg_filtered_source, unit) | |
| print_metric_line("median", stats.bias_median_source, unit) | |
| print_metric_line("median 10-90", stats.bias_median_trimmed_source, unit) | |
| if stats.bias_median_conf_sample_count > 0: | |
| print() | |
| print_median_confidence_lines( | |
| stats.bias_median_ci_low_source, | |
| stats.bias_median_ci_high_source, | |
| stats.bias_median_ci_width_source, | |
| stats.bias_median_mad_source, | |
| stats.bias_median_trim_shift_source, | |
| stats.bias_median_conf_sample_count, | |
| unit, | |
| ) | |
| print(f" {colorize('Hysteresis +/- (0.5 * (up-threshold - down-threshold))', bold=True)}") | |
| levels_label = colorize("levels".ljust(14), fg=90) | |
| transitions_label = colorize("transitions".ljust(14), fg=90) | |
| print(f" {levels_label} {stats.hysteresis_boundary_count}") | |
| print(f" {transitions_label} up={stats.transition_up_count} down={stats.transition_down_count}") | |
| print_metric_line("mean", stats.hysteresis_mean_source, unit) | |
| print_metric_line("avg_filtered", stats.hysteresis_avg_filtered_source, unit) | |
| print_metric_line(f"p{ROBUST_MAX_PERCENTILE:g} max", stats.hysteresis_max_robust_source, unit) | |
| print_metric_line("median", stats.hysteresis_median_source, unit) | |
| print_metric_line("median 10-90", stats.hysteresis_median_trimmed_source, unit) | |
| if stats.hysteresis_median_conf_sample_count > 0: | |
| print() | |
| print_median_confidence_lines( | |
| stats.hysteresis_median_ci_low_source, | |
| stats.hysteresis_median_ci_high_source, | |
| stats.hysteresis_median_ci_width_source, | |
| stats.hysteresis_median_mad_source, | |
| stats.hysteresis_median_trim_shift_source, | |
| stats.hysteresis_median_conf_sample_count, | |
| unit, | |
| ) | |
| def print_route_stats(samples: RouteSamples) -> None: | |
| route = samples.route | |
| route_with_fw = f"{route} [{samples.eps_fw}]" | |
| if samples.t.size == 0: | |
| print("\n" + colorize("=" * 96, fg=36, bold=True)) | |
| print(colorize(f"ROUTE: {route_with_fw}", fg=36, bold=True)) | |
| print(colorize("no DI_speed samples found", fg=31, bold=True)) | |
| return | |
| unit_vals, unit_counts = np.unique(samples.ui_units, return_counts=True) | |
| counts = ", ".join(f"{unit_name(int(u))}={int(c)}" for u, c in zip(unit_vals, unit_counts)) | |
| print("\n" + colorize("=" * 96, fg=36, bold=True)) | |
| print(colorize(f"ROUTE: {route_with_fw}", fg=36, bold=True)) | |
| print(f"{colorize('vehicle_speed_signal:', fg=90)} {samples.vehicle_speed_signal}") | |
| print(f"{colorize('unit_source:', fg=90)} {samples.unit_source}") | |
| region_text = samples.region | |
| if samples.region_center_lat is not None and samples.region_center_lon is not None: | |
| region_text += f" ({samples.region_center_lat:.3f}, {samples.region_center_lon:.3f})" | |
| print(f"{colorize('region:', fg=90)} {region_text}") | |
| print(f"{colorize('unit_counts:', fg=90)} {counts}") | |
| for u in unit_vals: | |
| unit = int(u) | |
| mask = samples.ui_units == unit | |
| stats = compute_unit_stats( | |
| samples.t[mask], | |
| samples.ui_speed_raw[mask], | |
| samples.vehicle_kph[mask], | |
| unit, | |
| compute_median_confidence=True, | |
| ) | |
| print_unit_summary(stats) | |
| def finite_array(values: list[float | None]) -> np.ndarray: | |
| vals = np.asarray([v for v in values if v is not None and np.isfinite(v)], dtype=float) | |
| return vals | |
| def summarize_values(values: list[float | None]) -> tuple[float | None, float | None, float | None, float | None, float | None, int]: | |
| arr = finite_array(values) | |
| if arr.size == 0: | |
| return None, None, None, None, None, 0 | |
| mean = float(np.mean(arr)) | |
| std = float(np.std(arr)) | |
| vmin = float(np.min(arr)) | |
| vmax = float(np.max(arr)) | |
| span = float(vmax - vmin) | |
| return mean, std, vmin, vmax, span, int(arr.size) | |
| def route_metric_stats_by_unit(per_route_samples: list[RouteSamples]) -> dict[int, list[tuple[str, UnitStats]]]: | |
| unit_stats: dict[int, list[tuple[str, UnitStats]]] = {} | |
| for rs in per_route_samples: | |
| if rs.t.size == 0: | |
| continue | |
| for u in np.unique(rs.ui_units): | |
| unit = int(u) | |
| mask = rs.ui_units == unit | |
| stats = compute_unit_stats(rs.t[mask], rs.ui_speed_raw[mask], rs.vehicle_kph[mask], unit) | |
| if stats.sample_count == 0: | |
| continue | |
| unit_stats.setdefault(unit, []).append((rs.route, stats)) | |
| return unit_stats | |
| def route_metric_stats_by_unit_and_fw(per_route_samples: list[RouteSamples]) -> dict[int, dict[str, list[tuple[str, UnitStats]]]]: | |
| by_unit_fw: dict[int, dict[str, list[tuple[str, UnitStats]]]] = {} | |
| for rs in per_route_samples: | |
| if rs.t.size == 0: | |
| continue | |
| fw = rs.eps_fw if len(rs.eps_fw) else "n/a" | |
| for u in np.unique(rs.ui_units): | |
| unit = int(u) | |
| mask = rs.ui_units == unit | |
| stats = compute_unit_stats(rs.t[mask], rs.ui_speed_raw[mask], rs.vehicle_kph[mask], unit) | |
| if stats.sample_count == 0: | |
| continue | |
| by_unit_fw.setdefault(unit, {}).setdefault(fw, []).append((rs.route, stats)) | |
| return by_unit_fw | |
| def route_metric_stats_by_unit_and_region(per_route_samples: list[RouteSamples]) -> dict[int, dict[str, list[tuple[str, UnitStats]]]]: | |
| by_unit_region: dict[int, dict[str, list[tuple[str, UnitStats]]]] = {} | |
| for rs in per_route_samples: | |
| if rs.t.size == 0: | |
| continue | |
| region_group = rs.region | |
| for u in np.unique(rs.ui_units): | |
| unit = int(u) | |
| mask = rs.ui_units == unit | |
| stats = compute_unit_stats(rs.t[mask], rs.ui_speed_raw[mask], rs.vehicle_kph[mask], unit) | |
| if stats.sample_count == 0: | |
| continue | |
| by_unit_region.setdefault(unit, {}).setdefault(region_group, []).append((rs.route, stats)) | |
| return by_unit_region | |
| def firmware_metric_lists_from_rows(rows: list[tuple[str, UnitStats]], | |
| unit: int, | |
| route_fw: dict[str, str]) -> tuple[list[float | None], list[float | None], list[float | None], list[float | None], list[str]]: | |
| by_fw: dict[str, dict[str, list[float | None]]] = {} | |
| for route, stats in rows: | |
| fw = route_fw.get(route, "n/a") | |
| bucket = by_fw.setdefault(fw, {"gain": [], "bias": [], "hyst": [], "disp": []}) | |
| bias_kph = source_to_kph(stats.bias_median_source, unit) | |
| hyst_kph = source_to_kph(stats.hysteresis_median_source, unit) | |
| bucket["gain"].append(stats.gain_avg_bias_comp) | |
| bucket["bias"].append(bias_kph) | |
| bucket["hyst"].append(hyst_kph) | |
| bucket["disp"].append(sum_if_both(bias_kph, hyst_kph)) | |
| fw_ids = sorted(by_fw.keys()) | |
| gain_vals: list[float | None] = [] | |
| bias_vals: list[float | None] = [] | |
| hyst_vals: list[float | None] = [] | |
| disp_vals: list[float | None] = [] | |
| for fw in fw_ids: | |
| g = finite_array(by_fw[fw]["gain"]) | |
| b = finite_array(by_fw[fw]["bias"]) | |
| h = finite_array(by_fw[fw]["hyst"]) | |
| d = finite_array(by_fw[fw]["disp"]) | |
| gain_vals.append(float(np.median(g)) if g.size else None) | |
| bias_vals.append(float(np.median(b)) if b.size else None) | |
| hyst_vals.append(float(np.median(h)) if h.size else None) | |
| disp_vals.append(float(np.median(d)) if d.size else None) | |
| return gain_vals, bias_vals, hyst_vals, disp_vals, fw_ids | |
| def user_id_from_route(route: str) -> str: | |
| normalized = normalize_route(route) | |
| return normalized.split("/", 1)[0] | |
| def route_context_text(route: str, | |
| route_meta: dict[str, RouteSamples] | None = None, | |
| *, | |
| region: str | None = None, | |
| fw: str | None = None, | |
| include_region: bool = True) -> str: | |
| if route_meta is not None: | |
| rs = route_meta.get(route) | |
| if rs is not None: | |
| if region is None: | |
| region = rs.region | |
| if fw is None: | |
| fw = rs.eps_fw | |
| if not SHOW_METADATA: | |
| return route | |
| parts = [route] | |
| if include_region: | |
| parts.append(f"region={region if region is not None else 'UNKNOWN'}") | |
| parts.append(f"user={user_id_from_route(route)}") | |
| parts.append(f"fw={fw if fw is not None else 'n/a'}") | |
| return ", ".join(parts) | |
| def print_consistency_summary(per_route_samples: list[RouteSamples]) -> None: | |
| by_unit = route_metric_stats_by_unit(per_route_samples) | |
| route_eps = {rs.route: rs.eps_fw for rs in per_route_samples} | |
| route_region = {rs.route: rs.region for rs in per_route_samples} | |
| print("\n" + colorize("Compare Routes (kph-normalized)", fg=35, bold=True)) | |
| if len(by_unit) == 0: | |
| print(f" {colorize('no unit-grouped route data available', fg=31)}") | |
| return | |
| for unit in sorted(by_unit.keys()): | |
| rows = by_unit[unit] | |
| unit_routes = [route for route, _ in rows] | |
| unit_firmwares = len({route_eps.get(route, "n/a") for route in unit_routes}) | |
| unit_regions = len({route_region.get(route, "UNKNOWN") for route in unit_routes}) | |
| unit_users = len({user_id_from_route(route) for route in unit_routes}) | |
| if SHOW_METADATA: | |
| print( | |
| f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} " | |
| f"routes_with_data={len(rows)}, firmwares={unit_firmwares}, regions={unit_regions}, users={unit_users}" | |
| ) | |
| else: | |
| print( | |
| f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} " | |
| f"routes_with_data={len(rows)}" | |
| ) | |
| if len(rows) < 2: | |
| print(f" {colorize('not enough routes for consistency comparison', fg=33)}") | |
| else: | |
| bias_medians = [source_to_kph(s.bias_median_source, unit) for _, s in rows] | |
| gain_avgs = [s.gain_avg_bias_comp for _, s in rows] | |
| hyst_medians = [source_to_kph(s.hysteresis_median_source, unit) for _, s in rows] | |
| disp_comp_medians = [sum_if_both(source_to_kph(s.bias_median_source, unit), source_to_kph(s.hysteresis_median_source, unit)) | |
| for _, s in rows] | |
| _, _, _, _, bias_span, bias_n = summarize_values(bias_medians) | |
| _, _, _, _, gain_span, gain_n = summarize_values(gain_avgs) | |
| _, _, _, _, hyst_span, hyst_n = summarize_values(hyst_medians) | |
| _, _, _, _, disp_comp_span, disp_comp_n = summarize_values(disp_comp_medians) | |
| print(f" gain(avg) diff {gray_meta(f'(n={gain_n})')}: {fmt_gain_percent(gain_span, 3)}") | |
| print(f" bias_median span {gray_meta(f'(n={bias_n})')}: {fmt(bias_span)} kph") | |
| print(f" hysteresis(+/-)_median span {gray_meta(f'(n={hyst_n})')}: {fmt(hyst_span)} kph") | |
| print(f" display_comp span {gray_meta(f'(n={disp_comp_n})')}: {fmt(disp_comp_span)} kph") | |
| print(f" {colorize('route values:', fg=36, bold=True)}") | |
| for route, stats in rows: | |
| eps_fw = route_eps.get(route, "n/a") | |
| region = route_region.get(route, "UNKNOWN") | |
| route_label = colorize(route, fg=90) | |
| fw_label = colorize(f"[{eps_fw}, {region}]", bold=True) | |
| print( | |
| f" {route_label} {fw_label}:" | |
| ) | |
| print( | |
| f" " | |
| f"gain_avg={fmt(stats.gain_avg_bias_comp, 4)}, " | |
| f"bias_median={format_kph(stats.bias_median_source, unit)}, " | |
| f"hysteresis(+/-)_median={format_kph(stats.hysteresis_median_source, unit)}, " | |
| f"display_comp={fmt(sum_if_both(source_to_kph(stats.bias_median_source, unit), source_to_kph(stats.hysteresis_median_source, unit)))} kph" | |
| ) | |
| def print_firmware_consistency_summary(per_route_samples: list[RouteSamples]) -> None: | |
| by_unit_fw = route_metric_stats_by_unit_and_fw(per_route_samples) | |
| route_region = {rs.route: rs.region for rs in per_route_samples} | |
| print("\n" + colorize("Consistency Across Firmwares (kph-normalized)", fg=35, bold=True)) | |
| if len(by_unit_fw) == 0: | |
| print(f" {colorize('no unit-grouped firmware data available', fg=31)}") | |
| return | |
| for unit in sorted(by_unit_fw.keys()): | |
| fw_rows = by_unit_fw[unit] | |
| firmwares = sorted(fw_rows.keys()) | |
| all_rows = [row for fw in firmwares for row in fw_rows[fw]] | |
| unit_routes = [route for route, _ in all_rows] | |
| unit_regions = len({route_region.get(route, "UNKNOWN") for route in unit_routes}) | |
| unit_users = len({user_id_from_route(route) for route in unit_routes}) | |
| print( | |
| f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} " | |
| f"firmwares_with_data={len(firmwares)}, routes={len(all_rows)}, regions={unit_regions}, users={unit_users}" | |
| ) | |
| if len(firmwares) < 2: | |
| print(f" {colorize('not enough firmwares for consistency comparison', fg=33)}") | |
| else: | |
| fw_gain_vals: list[float | None] = [] | |
| fw_bias_vals: list[float | None] = [] | |
| fw_hyst_vals: list[float | None] = [] | |
| fw_disp_comp_vals: list[float | None] = [] | |
| for fw in firmwares: | |
| rows = fw_rows[fw] | |
| gains = finite_array([s.gain_avg_bias_comp for _, s in rows]) | |
| biases = finite_array([source_to_kph(s.bias_median_source, unit) for _, s in rows]) | |
| hysts = finite_array([source_to_kph(s.hysteresis_median_source, unit) for _, s in rows]) | |
| disp_comps = finite_array([sum_if_both(source_to_kph(s.bias_median_source, unit), source_to_kph(s.hysteresis_median_source, unit)) | |
| for _, s in rows]) | |
| fw_gain_vals.append(float(np.median(gains)) if gains.size else None) | |
| fw_bias_vals.append(float(np.median(biases)) if biases.size else None) | |
| fw_hyst_vals.append(float(np.median(hysts)) if hysts.size else None) | |
| fw_disp_comp_vals.append(float(np.median(disp_comps)) if disp_comps.size else None) | |
| _, _, _, _, gain_span, gain_n = summarize_values(fw_gain_vals) | |
| _, _, _, _, bias_span, bias_n = summarize_values(fw_bias_vals) | |
| _, _, _, _, hyst_span, hyst_n = summarize_values(fw_hyst_vals) | |
| _, _, _, _, disp_comp_span, disp_comp_n = summarize_values(fw_disp_comp_vals) | |
| speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit) | |
| gain_consistent = gain_n >= 2 and gain_span is not None and gain_span <= CONSISTENCY_MARGIN_GAIN | |
| bias_consistent = bias_n >= 2 and bias_span is not None and speed_margin_kph is not None and bias_span <= speed_margin_kph | |
| hyst_consistent = hyst_n >= 2 and hyst_span is not None and speed_margin_kph is not None and hyst_span <= speed_margin_kph | |
| disp_comp_consistent = disp_comp_n >= 2 and disp_comp_span is not None and speed_margin_kph is not None and disp_comp_span <= speed_margin_kph | |
| gain_label = colorize("consistent" if gain_consistent else "not consistent", fg=32 if gain_consistent else 31, bold=True) | |
| bias_label = colorize("consistent" if bias_consistent else "not consistent", fg=32 if bias_consistent else 31, bold=True) | |
| hyst_label = colorize("consistent" if hyst_consistent else "not consistent", fg=32 if hyst_consistent else 31, bold=True) | |
| disp_comp_label = colorize("consistent" if disp_comp_consistent else "not consistent", fg=32 if disp_comp_consistent else 31, bold=True) | |
| print(f" gain(avg) diff {gray_meta(f'(n={gain_n})')}: {fmt_gain_percent(gain_span, 3)} ({gain_label})") | |
| print(f" bias_median span {gray_meta(f'(n={bias_n})')}: {fmt(bias_span)} kph ({bias_label})") | |
| print(f" hysteresis(+/-)_median span {gray_meta(f'(n={hyst_n})')}: {fmt(hyst_span)} kph ({hyst_label})") | |
| print(f" display_comp span {gray_meta(f'(n={disp_comp_n})')}: {fmt(disp_comp_span)} kph ({disp_comp_label})") | |
| print(f" {colorize('firmware values:', fg=36, bold=True)}") | |
| for fw in firmwares: | |
| rows = fw_rows[fw] | |
| gains_raw = [s.gain_avg_bias_comp for _, s in rows] | |
| biases_raw = [source_to_kph(s.bias_median_source, unit) for _, s in rows] | |
| hysts_raw = [source_to_kph(s.hysteresis_median_source, unit) for _, s in rows] | |
| disp_comps_raw = [sum_if_both(source_to_kph(s.bias_median_source, unit), source_to_kph(s.hysteresis_median_source, unit)) | |
| for _, s in rows] | |
| gains = finite_array(gains_raw) | |
| biases = finite_array(biases_raw) | |
| hysts = finite_array(hysts_raw) | |
| disp_comps = finite_array(disp_comps_raw) | |
| gain_v = float(np.median(gains)) if gains.size else None | |
| bias_v = float(np.median(biases)) if biases.size else None | |
| hyst_v = float(np.median(hysts)) if hysts.size else None | |
| disp_comp_v = float(np.median(disp_comps)) if disp_comps.size else None | |
| _, _, _, _, gain_span_fw, gain_n_fw = summarize_values(gains_raw) | |
| _, _, _, _, bias_span_fw, bias_n_fw = summarize_values(biases_raw) | |
| _, _, _, _, hyst_span_fw, hyst_n_fw = summarize_values(hysts_raw) | |
| _, _, _, _, disp_comp_span_fw, disp_comp_n_fw = summarize_values(disp_comps_raw) | |
| speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit) | |
| if gain_n_fw < 2: | |
| gain_label_fw = colorize("single-route", fg=33, bold=True) | |
| else: | |
| gain_consistent_fw = gain_span_fw is not None and gain_span_fw <= CONSISTENCY_MARGIN_GAIN | |
| gain_label_fw = colorize("consistent" if gain_consistent_fw else "not consistent", fg=32 if gain_consistent_fw else 31, bold=True) | |
| if bias_n_fw < 2: | |
| bias_label_fw = colorize("single-route", fg=33, bold=True) | |
| else: | |
| bias_consistent_fw = bias_span_fw is not None and speed_margin_kph is not None and bias_span_fw <= speed_margin_kph | |
| bias_label_fw = colorize("consistent" if bias_consistent_fw else "not consistent", fg=32 if bias_consistent_fw else 31, bold=True) | |
| if hyst_n_fw < 2: | |
| hyst_label_fw = colorize("single-route", fg=33, bold=True) | |
| else: | |
| hyst_consistent_fw = hyst_span_fw is not None and speed_margin_kph is not None and hyst_span_fw <= speed_margin_kph | |
| hyst_label_fw = colorize("consistent" if hyst_consistent_fw else "not consistent", fg=32 if hyst_consistent_fw else 31, bold=True) | |
| if disp_comp_n_fw < 2: | |
| disp_comp_label_fw = colorize("single-route", fg=33, bold=True) | |
| else: | |
| disp_comp_consistent_fw = disp_comp_span_fw is not None and speed_margin_kph is not None and disp_comp_span_fw <= speed_margin_kph | |
| disp_comp_label_fw = colorize("consistent" if disp_comp_consistent_fw else "not consistent", fg=32 if disp_comp_consistent_fw else 31, bold=True) | |
| fw_label = colorize(f"[{fw}]", bold=True) | |
| region_count = len({route_region.get(route, "UNKNOWN") for route, _ in rows}) | |
| user_count = len({user_id_from_route(route) for route, _ in rows}) | |
| print(f" {fw_label} routes={len(rows)}: regions={region_count}, users={user_count}") | |
| print( | |
| f" gain_avg={fmt(gain_v, 4)} " | |
| f"({gray_meta(f'diff={fmt_gain_percent(gain_span_fw, 3)}, n={gain_n_fw}')}, {gain_label_fw})" | |
| ) | |
| print( | |
| f" bias_median={fmt(bias_v)} kph " | |
| f"({gray_meta(f'span={fmt(bias_span_fw)} kph, n={bias_n_fw}')}, {bias_label_fw})" | |
| ) | |
| print( | |
| f" hysteresis(+/-)_median={fmt(hyst_v)} kph " | |
| f"({gray_meta(f'span={fmt(hyst_span_fw)} kph, n={hyst_n_fw}')}, {hyst_label_fw})" | |
| ) | |
| print( | |
| f" display_comp={fmt(disp_comp_v)} kph " | |
| f"({gray_meta(f'span={fmt(disp_comp_span_fw)} kph, n={disp_comp_n_fw}')}, {disp_comp_label_fw})" | |
| ) | |
| def print_region_consistency_summary(per_route_samples: list[RouteSamples]) -> None: | |
| by_unit_region = route_metric_stats_by_unit_and_region(per_route_samples) | |
| route_fw = {rs.route: (rs.eps_fw if len(rs.eps_fw) else "n/a") for rs in per_route_samples} | |
| route_meta = {rs.route: rs for rs in per_route_samples} | |
| print("\n" + colorize("Consistency Across Firmwares in different Regions (kph-normalized)", fg=35, bold=True)) | |
| if len(by_unit_region) == 0: | |
| print(f" {colorize('no unit-grouped region data available', fg=31)}") | |
| return | |
| for unit in sorted(by_unit_region.keys()): | |
| region_rows = by_unit_region[unit] | |
| regions = sorted(region_rows.keys()) | |
| print(f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} regions_with_data={len(regions)}") | |
| if len(regions) < 2: | |
| print(f" {colorize('not enough regions for consistency comparison', fg=33)}") | |
| else: | |
| region_gain_vals: list[float | None] = [] | |
| region_bias_vals: list[float | None] = [] | |
| region_hyst_vals: list[float | None] = [] | |
| region_disp_vals: list[float | None] = [] | |
| region_gain_pairs: list[tuple[str, float | None]] = [] | |
| region_bias_pairs: list[tuple[str, float | None]] = [] | |
| region_hyst_pairs: list[tuple[str, float | None]] = [] | |
| region_disp_pairs: list[tuple[str, float | None]] = [] | |
| for region in regions: | |
| rows = region_rows[region] | |
| gain_fw_vals, bias_fw_vals, hyst_fw_vals, disp_fw_vals, _ = firmware_metric_lists_from_rows(rows, unit, route_fw) | |
| gains = finite_array(gain_fw_vals) | |
| biases = finite_array(bias_fw_vals) | |
| hysts = finite_array(hyst_fw_vals) | |
| disp = finite_array(disp_fw_vals) | |
| region_gain_v = float(np.median(gains)) if gains.size else None | |
| region_bias_v = float(np.median(biases)) if biases.size else None | |
| region_hyst_v = float(np.median(hysts)) if hysts.size else None | |
| region_disp_v = float(np.median(disp)) if disp.size else None | |
| region_gain_vals.append(region_gain_v) | |
| region_bias_vals.append(region_bias_v) | |
| region_hyst_vals.append(region_hyst_v) | |
| region_disp_vals.append(region_disp_v) | |
| region_gain_pairs.append((region, region_gain_v)) | |
| region_bias_pairs.append((region, region_bias_v)) | |
| region_hyst_pairs.append((region, region_hyst_v)) | |
| region_disp_pairs.append((region, region_disp_v)) | |
| _, _, _, _, gain_span, gain_n = summarize_values(region_gain_vals) | |
| _, _, _, _, bias_span, bias_n = summarize_values(region_bias_vals) | |
| _, _, _, _, hyst_span, hyst_n = summarize_values(region_hyst_vals) | |
| _, _, _, _, disp_span, disp_n = summarize_values(region_disp_vals) | |
| speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit) | |
| gain_consistent = gain_n >= 2 and gain_span is not None and gain_span <= CONSISTENCY_MARGIN_GAIN | |
| bias_consistent = bias_n >= 2 and bias_span is not None and speed_margin_kph is not None and bias_span <= speed_margin_kph | |
| hyst_consistent = hyst_n >= 2 and hyst_span is not None and speed_margin_kph is not None and hyst_span <= speed_margin_kph | |
| disp_consistent = disp_n >= 2 and disp_span is not None and speed_margin_kph is not None and disp_span <= speed_margin_kph | |
| gain_label = colorize("consistent" if gain_consistent else "not consistent", fg=32 if gain_consistent else 31, bold=True) | |
| bias_label = colorize("consistent" if bias_consistent else "not consistent", fg=32 if bias_consistent else 31, bold=True) | |
| hyst_label = colorize("consistent" if hyst_consistent else "not consistent", fg=32 if hyst_consistent else 31, bold=True) | |
| disp_label = colorize("consistent" if disp_consistent else "not consistent", fg=32 if disp_consistent else 31, bold=True) | |
| def extrema_region_text(region_vals: list[tuple[str, float | None]], digits: int, suffix: str = "") -> str: | |
| finite_vals = [(region, float(v)) for region, v in region_vals if v is not None and np.isfinite(v)] | |
| if len(finite_vals) == 0: | |
| return "" | |
| min_region, min_val = min(finite_vals, key=lambda x: x[1]) | |
| max_region, max_val = max(finite_vals, key=lambda x: x[1]) | |
| if not SHOW_METADATA: | |
| return gray_meta( | |
| f"min={fmt(min_val, digits)}{suffix} @{min_region}, " | |
| f"max={fmt(max_val, digits)}{suffix} @{max_region}" | |
| ) | |
| min_route = sorted(route for route, _ in region_rows[min_region])[0] if min_region in region_rows and len(region_rows[min_region]) else "n/a" | |
| max_route = sorted(route for route, _ in region_rows[max_region])[0] if max_region in region_rows and len(region_rows[max_region]) else "n/a" | |
| min_ctx = route_context_text(min_route, route_meta, region=min_region, fw=route_fw.get(min_route, "n/a"), include_region=False) | |
| max_ctx = route_context_text(max_route, route_meta, region=max_region, fw=route_fw.get(max_route, "n/a"), include_region=False) | |
| return gray_meta( | |
| f"min={fmt(min_val, digits)}{suffix} @{min_region}({min_ctx}), " | |
| f"max={fmt(max_val, digits)}{suffix} @{max_region}({max_ctx})" | |
| ) | |
| gain_ext = extrema_region_text(region_gain_pairs, 4) | |
| bias_ext = extrema_region_text(region_bias_pairs, 3, " kph") | |
| hyst_ext = extrema_region_text(region_hyst_pairs, 3, " kph") | |
| disp_ext = extrema_region_text(region_disp_pairs, 3, " kph") | |
| print( | |
| f" gain(avg) diff {gray_meta(f'(n={gain_n})')}: {fmt_gain_percent(gain_span, 3)} ({gain_label})" | |
| f"{(' ' + gain_ext) if (not gain_consistent and gain_ext) else ''}" | |
| ) | |
| print( | |
| f" bias_median span {gray_meta(f'(n={bias_n})')}: {fmt(bias_span)} kph ({bias_label})" | |
| f"{(' ' + bias_ext) if (not bias_consistent and bias_ext) else ''}" | |
| ) | |
| print( | |
| f" hysteresis(+/-)_median span {gray_meta(f'(n={hyst_n})')}: {fmt(hyst_span)} kph ({hyst_label})" | |
| f"{(' ' + hyst_ext) if (not hyst_consistent and hyst_ext) else ''}" | |
| ) | |
| print( | |
| f" display_comp span {gray_meta(f'(n={disp_n})')}: {fmt(disp_span)} kph ({disp_label})" | |
| f"{(' ' + disp_ext) if (not disp_consistent and disp_ext) else ''}" | |
| ) | |
| print(f" {colorize('region values:', fg=36, bold=True)}") | |
| for region in regions: | |
| rows = region_rows[region] | |
| gains_raw, biases_raw, hysts_raw, disp_raw, fw_ids = firmware_metric_lists_from_rows(rows, unit, route_fw) | |
| gains = finite_array(gains_raw) | |
| biases = finite_array(biases_raw) | |
| hysts = finite_array(hysts_raw) | |
| disp = finite_array(disp_raw) | |
| gain_v = float(np.median(gains)) if gains.size else None | |
| bias_v = float(np.median(biases)) if biases.size else None | |
| hyst_v = float(np.median(hysts)) if hysts.size else None | |
| disp_v = float(np.median(disp)) if disp.size else None | |
| _, _, _, _, gain_span_rg, gain_n_rg = summarize_values(gains_raw) | |
| _, _, _, _, bias_span_rg, bias_n_rg = summarize_values(biases_raw) | |
| _, _, _, _, hyst_span_rg, hyst_n_rg = summarize_values(hysts_raw) | |
| _, _, _, _, disp_span_rg, disp_n_rg = summarize_values(disp_raw) | |
| speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit) | |
| if gain_n_rg < 2: | |
| gain_label_rg = colorize("single-firmware", fg=33, bold=True) | |
| else: | |
| gain_consistent_rg = gain_span_rg is not None and gain_span_rg <= CONSISTENCY_MARGIN_GAIN | |
| gain_label_rg = colorize("consistent" if gain_consistent_rg else "not consistent", fg=32 if gain_consistent_rg else 31, bold=True) | |
| if bias_n_rg < 2: | |
| bias_label_rg = colorize("single-firmware", fg=33, bold=True) | |
| else: | |
| bias_consistent_rg = bias_span_rg is not None and speed_margin_kph is not None and bias_span_rg <= speed_margin_kph | |
| bias_label_rg = colorize("consistent" if bias_consistent_rg else "not consistent", fg=32 if bias_consistent_rg else 31, bold=True) | |
| if hyst_n_rg < 2: | |
| hyst_label_rg = colorize("single-firmware", fg=33, bold=True) | |
| else: | |
| hyst_consistent_rg = hyst_span_rg is not None and speed_margin_kph is not None and hyst_span_rg <= speed_margin_kph | |
| hyst_label_rg = colorize("consistent" if hyst_consistent_rg else "not consistent", fg=32 if hyst_consistent_rg else 31, bold=True) | |
| if disp_n_rg < 2: | |
| disp_label_rg = colorize("single-firmware", fg=33, bold=True) | |
| else: | |
| disp_consistent_rg = disp_span_rg is not None and speed_margin_kph is not None and disp_span_rg <= speed_margin_kph | |
| disp_label_rg = colorize("consistent" if disp_consistent_rg else "not consistent", fg=32 if disp_consistent_rg else 31, bold=True) | |
| user_count = len({user_id_from_route(route) for route, _ in rows}) | |
| units_text = encountered_units_text({unit}) | |
| print(f" {colorize(f'[{region}]', bold=True)} routes={len(rows)}, firmwares={len(fw_ids)}, users: {user_count}, ({units_text})") | |
| print( | |
| f" gain_avg={fmt(gain_v, 4)} " | |
| f"({gray_meta(f'diff={fmt_gain_percent(gain_span_rg, 3)}, n={gain_n_rg}')}, {gain_label_rg})" | |
| ) | |
| print( | |
| f" bias_median={fmt(bias_v)} kph " | |
| f"({gray_meta(f'span={fmt(bias_span_rg)} kph, n={bias_n_rg}')}, {bias_label_rg})" | |
| ) | |
| print( | |
| f" hysteresis(+/-)_median={fmt(hyst_v)} kph " | |
| f"({gray_meta(f'span={fmt(hyst_span_rg)} kph, n={hyst_n_rg}')}, {hyst_label_rg})" | |
| ) | |
| print( | |
| f" display_comp={fmt(disp_v)} kph " | |
| f"({gray_meta(f'span={fmt(disp_span_rg)} kph, n={disp_n_rg}')}, {disp_label_rg})" | |
| ) | |
| def print_final_consistency(per_route_samples: list[RouteSamples]) -> None: | |
| by_unit = route_metric_stats_by_unit(per_route_samples) | |
| route_eps = {rs.route: rs.eps_fw for rs in per_route_samples} | |
| route_region = {rs.route: rs.region for rs in per_route_samples} | |
| route_meta = {rs.route: rs for rs in per_route_samples} | |
| print("\n" + colorize("Final Consistency", fg=35, bold=True)) | |
| print( | |
| f" {colorize('margins:', fg=90)} " | |
| f"gain_diff<={fmt_gain_percent(CONSISTENCY_MARGIN_GAIN, 3)}, " | |
| f"speed_span<={fmt(CONSISTENCY_MARGIN_SPEED_SOURCE, 3)} source units " | |
| f"(KPH={fmt(source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, UNIT_KPH), 3)} kph, " | |
| f"MPH={fmt(source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, UNIT_MPH), 3)} kph)" | |
| ) | |
| if len(by_unit) == 0: | |
| print(f" {colorize('no unit-grouped route data available', fg=31)}") | |
| return | |
| for unit in sorted(by_unit.keys()): | |
| rows = by_unit[unit] | |
| unit_routes = [route for route, _ in rows] | |
| unit_firmwares = len({route_eps.get(route, "n/a") for route in unit_routes}) | |
| unit_regions = len({route_region.get(route, "UNKNOWN") for route in unit_routes}) | |
| unit_users = len({user_id_from_route(route) for route in unit_routes}) | |
| if SHOW_METADATA: | |
| print( | |
| f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} " | |
| f"routes_with_data={len(rows)}, firmwares={unit_firmwares}, regions={unit_regions}, users={unit_users}" | |
| ) | |
| else: | |
| print( | |
| f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} " | |
| f"routes_with_data={len(rows)}" | |
| ) | |
| if len(rows) < 2: | |
| print(f" {colorize('not enough routes for consistency check', fg=33)}") | |
| for route, stats in rows: | |
| route_label = colorize(route, fg=90) | |
| print(f" {route_label}:") | |
| print(f" gain_avg={fmt(stats.gain_avg_bias_comp, 4)}") | |
| print(f" bias_median={format_kph(stats.bias_median_source, unit)}") | |
| print(f" hysteresis(+/-)_median={format_kph(stats.hysteresis_median_source, unit)}") | |
| print(f" display_comp={fmt(sum_if_both(source_to_kph(stats.bias_median_source, unit), source_to_kph(stats.hysteresis_median_source, unit)))} kph") | |
| continue | |
| gain_avgs = [s.gain_avg_bias_comp for _, s in rows] | |
| bias_medians = [source_to_kph(s.bias_median_source, unit) for _, s in rows] | |
| hyst_medians = [source_to_kph(s.hysteresis_median_source, unit) for _, s in rows] | |
| disp_comp_medians = [sum_if_both(source_to_kph(s.bias_median_source, unit), source_to_kph(s.hysteresis_median_source, unit)) | |
| for _, s in rows] | |
| gain_route_vals = [(route, float(s.gain_avg_bias_comp)) for route, s in rows | |
| if s.gain_avg_bias_comp is not None and np.isfinite(s.gain_avg_bias_comp)] | |
| bias_route_vals = [(route, float(v)) for (route, _), v in zip(rows, bias_medians) | |
| if v is not None and np.isfinite(v)] | |
| hyst_route_vals = [(route, float(v)) for (route, _), v in zip(rows, hyst_medians) | |
| if v is not None and np.isfinite(v)] | |
| disp_comp_route_vals = [(route, float(v)) for (route, _), v in zip(rows, disp_comp_medians) | |
| if v is not None and np.isfinite(v)] | |
| gain_vals = finite_array(gain_avgs) | |
| bias_vals = finite_array(bias_medians) | |
| hyst_vals = finite_array(hyst_medians) | |
| disp_comp_vals = finite_array(disp_comp_medians) | |
| _, _, _, _, gain_span, gain_n = summarize_values(gain_avgs) | |
| _, _, _, _, bias_span, bias_n = summarize_values(bias_medians) | |
| _, _, _, _, hyst_span, hyst_n = summarize_values(hyst_medians) | |
| _, _, _, _, disp_comp_span, disp_comp_n = summarize_values(disp_comp_medians) | |
| speed_margin_kph = source_to_kph(CONSISTENCY_MARGIN_SPEED_SOURCE, unit) | |
| gain_consistent = gain_n >= 2 and gain_span is not None and gain_span <= CONSISTENCY_MARGIN_GAIN | |
| bias_consistent = bias_n >= 2 and bias_span is not None and speed_margin_kph is not None and bias_span <= speed_margin_kph | |
| hyst_consistent = hyst_n >= 2 and hyst_span is not None and speed_margin_kph is not None and hyst_span <= speed_margin_kph | |
| disp_comp_consistent = disp_comp_n >= 2 and disp_comp_span is not None and speed_margin_kph is not None and disp_comp_span <= speed_margin_kph | |
| gain_median_value = float(np.median(gain_vals)) if gain_vals.size else None | |
| bias_median_value = float(np.median(bias_vals)) if bias_vals.size else None | |
| hyst_median_value = float(np.median(hyst_vals)) if hyst_vals.size else None | |
| disp_comp_median_value = float(np.median(disp_comp_vals)) if disp_comp_vals.size else None | |
| gain_label = colorize("consistent" if gain_consistent else "not consistent", fg=32 if gain_consistent else 31, bold=True) | |
| bias_label = colorize("consistent" if bias_consistent else "not consistent", fg=32 if bias_consistent else 31, bold=True) | |
| hyst_label = colorize("consistent" if hyst_consistent else "not consistent", fg=32 if hyst_consistent else 31, bold=True) | |
| disp_comp_label = colorize("consistent" if disp_comp_consistent else "not consistent", fg=32 if disp_comp_consistent else 31, bold=True) | |
| def extrema_text(route_vals: list[tuple[str, float]], digits: int, suffix: str = "") -> str: | |
| if len(route_vals) == 0: | |
| return "" | |
| min_route, min_val = min(route_vals, key=lambda x: x[1]) | |
| max_route, max_val = max(route_vals, key=lambda x: x[1]) | |
| min_txt = f"{fmt(min_val, digits)}{suffix}" | |
| max_txt = f"{fmt(max_val, digits)}{suffix}" | |
| if SHOW_METADATA: | |
| min_ctx = route_context_text(min_route, route_meta) | |
| max_ctx = route_context_text(max_route, route_meta) | |
| return gray_meta(f"min={min_txt} @{min_ctx}, max={max_txt} @{max_ctx}") | |
| return gray_meta(f"min={min_txt}, max={max_txt}") | |
| def raw_array_text(values: list[float | None], digits: int) -> str: | |
| finite_vals = [float(v) for v in values if v is not None and np.isfinite(v)] | |
| arr_txt = "[" + ", ".join(fmt(v, digits) for v in finite_vals) + "]" | |
| return gray_meta(f"raw={arr_txt}") | |
| bias_raw = raw_array_text(bias_medians, 3) | |
| hyst_raw = raw_array_text(hyst_medians, 3) | |
| disp_raw = raw_array_text(disp_comp_medians, 3) | |
| if gain_consistent: | |
| print(f" gain_avg: {gain_label} ({gray_meta(f'n={gain_n}, median={fmt(gain_median_value, 4)}')})") | |
| else: | |
| gain_ext = extrema_text(gain_route_vals, 4) | |
| print( | |
| f" gain_avg: {gain_label} ({gray_meta(f'n={gain_n}, diff={fmt_gain_percent(gain_span, 3)}')})" | |
| f"{(' ' + gain_ext) if gain_ext else ''}" | |
| ) | |
| if bias_consistent: | |
| print(f" bias_median: {bias_label} ({gray_meta(f'n={bias_n}, median={fmt(bias_median_value)} kph')}) {bias_raw}") | |
| else: | |
| bias_ext = extrema_text(bias_route_vals, 3, " kph") | |
| print( | |
| f" bias_median: {bias_label} ({gray_meta(f'n={bias_n}, span={fmt(bias_span)} kph')})" | |
| f"{(' ' + bias_ext) if bias_ext else ''} {bias_raw}" | |
| ) | |
| if hyst_consistent: | |
| print(f" hysteresis(+/-)_median: {hyst_label} ({gray_meta(f'n={hyst_n}, median={fmt(hyst_median_value)} kph')}) {hyst_raw}") | |
| else: | |
| hyst_ext = extrema_text(hyst_route_vals, 3, " kph") | |
| print( | |
| f" hysteresis(+/-)_median: {hyst_label} ({gray_meta(f'n={hyst_n}, span={fmt(hyst_span)} kph')})" | |
| f"{(' ' + hyst_ext) if hyst_ext else ''} {hyst_raw}" | |
| ) | |
| if disp_comp_consistent: | |
| print(f" display_comp: {disp_comp_label} ({gray_meta(f'n={disp_comp_n}, median={fmt(disp_comp_median_value)} kph')}) {disp_raw}") | |
| else: | |
| disp_ext = extrema_text(disp_comp_route_vals, 3, " kph") | |
| print( | |
| f" display_comp: {disp_comp_label} ({gray_meta(f'n={disp_comp_n}, span={fmt(disp_comp_span)} kph')})" | |
| f"{(' ' + disp_ext) if disp_ext else ''} {disp_raw}" | |
| ) | |
| def print_display_comp_extremes(per_route_samples: list[RouteSamples]) -> None: | |
| by_unit = route_metric_stats_by_unit(per_route_samples) | |
| print("\n" + colorize("Display Comp Extremes", fg=35, bold=True)) | |
| if len(by_unit) == 0: | |
| print(f" {colorize('no unit-grouped route data available', fg=31)}") | |
| return | |
| route_meta = {rs.route: rs for rs in per_route_samples} | |
| for unit in (UNIT_MPH, UNIT_KPH): | |
| rows = by_unit.get(unit, []) | |
| route_vals: list[tuple[str, float]] = [] | |
| for route, stats in rows: | |
| disp_comp = sum_if_both(stats.bias_median_source, stats.hysteresis_median_source) | |
| if disp_comp is None or not np.isfinite(disp_comp): | |
| continue | |
| disp_comp_kph = source_to_kph(float(disp_comp), unit) | |
| if disp_comp_kph is None or not np.isfinite(disp_comp_kph): | |
| continue | |
| route_vals.append((route, float(disp_comp_kph))) | |
| print(f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} {gray_meta(f'routes_with_value={len(route_vals)}')}") | |
| if len(route_vals) == 0: | |
| print(f" {gray_meta('no display compensation values available')}") | |
| continue | |
| min_route, min_val_kph = min(route_vals, key=lambda x: x[1]) | |
| max_route, max_val_kph = max(route_vals, key=lambda x: x[1]) | |
| values_txt = "[" + ", ".join(fmt(v, 3) for _, v in route_vals) + "]" | |
| print(f" {colorize('display_comp:', bold=True)} {gray_meta(values_txt)}") | |
| if SHOW_METADATA: | |
| min_info = f"({route_context_text(min_route, route_meta)})" | |
| max_info = f"({route_context_text(max_route, route_meta)})" | |
| print(f" {colorize('min:', bold=True)} {fmt(min_val_kph)} kph {gray_meta(min_info)}") | |
| print(f" {colorize('max:', bold=True)} {fmt(max_val_kph)} kph {gray_meta(max_info)}") | |
| else: | |
| print(f" {colorize('min:', bold=True)} {fmt(min_val_kph)} kph") | |
| print(f" {colorize('max:', bold=True)} {fmt(max_val_kph)} kph") | |
| def print_bias_hysteresis_extremes(per_route_samples: list[RouteSamples]) -> None: | |
| by_unit = route_metric_stats_by_unit(per_route_samples) | |
| print("\n" + colorize("Bias & Hysteresis Extremes", fg=35, bold=True)) | |
| if len(by_unit) == 0: | |
| print(f" {colorize('no unit-grouped route data available', fg=31)}") | |
| return | |
| route_meta = {rs.route: rs for rs in per_route_samples} | |
| def print_extrema_block(metric_name: str, route_vals: list[tuple[str, float]]) -> None: | |
| if len(route_vals) == 0: | |
| print(f" {gray_meta(f'no {metric_name} values available')}") | |
| return | |
| min_route, min_val_kph = min(route_vals, key=lambda x: x[1]) | |
| max_route, max_val_kph = max(route_vals, key=lambda x: x[1]) | |
| values_txt = "[" + ", ".join(fmt(v, 3) for _, v in route_vals) + "]" | |
| print(f" {colorize(f'{metric_name}:', bold=True)} {gray_meta(values_txt)}") | |
| if SHOW_METADATA: | |
| min_info = f"({route_context_text(min_route, route_meta)})" | |
| max_info = f"({route_context_text(max_route, route_meta)})" | |
| print(f" {colorize('min:', bold=True)} {fmt(min_val_kph)} kph {gray_meta(min_info)}") | |
| print(f" {colorize('max:', bold=True)} {fmt(max_val_kph)} kph {gray_meta(max_info)}") | |
| else: | |
| print(f" {colorize('min:', bold=True)} {fmt(min_val_kph)} kph") | |
| print(f" {colorize('max:', bold=True)} {fmt(max_val_kph)} kph") | |
| for unit in (UNIT_MPH, UNIT_KPH): | |
| rows = by_unit.get(unit, []) | |
| bias_vals: list[tuple[str, float]] = [] | |
| hyst_vals: list[tuple[str, float]] = [] | |
| for route, stats in rows: | |
| bias_kph = source_to_kph(stats.bias_median_source, unit) | |
| if bias_kph is not None and np.isfinite(bias_kph): | |
| bias_vals.append((route, float(bias_kph))) | |
| hyst_kph = source_to_kph(stats.hysteresis_median_source, unit) | |
| if hyst_kph is not None and np.isfinite(hyst_kph): | |
| hyst_vals.append((route, float(hyst_kph))) | |
| routes_with_any = len(set([r for r, _ in bias_vals] + [r for r, _ in hyst_vals])) | |
| print(f" {colorize(f'[{unit_name(unit)}]', fg=32, bold=True)} {gray_meta(f'routes_with_value={routes_with_any}')}") | |
| print_extrema_block("bias_median", bias_vals) | |
| print_extrema_block("hysteresis(+/-)_median", hyst_vals) | |
| def compact_exception(exc: Exception) -> str: | |
| text = str(exc).strip() | |
| if not text: | |
| return exc.__class__.__name__ | |
| first_line = text.splitlines()[0].strip() | |
| if "logs were not found" in first_line: | |
| return first_line.split(",", 1)[0] | |
| return first_line | |
| def print_aggregate_stats(per_route_samples: list[RouteSamples]) -> None: | |
| if len(per_route_samples) == 0: | |
| return | |
| t_all = np.concatenate([rs.t for rs in per_route_samples if rs.t.size], axis=0) | |
| ui_all = np.concatenate([rs.ui_speed_raw for rs in per_route_samples if rs.ui_speed_raw.size], axis=0) | |
| units_all = np.concatenate([rs.ui_units for rs in per_route_samples if rs.ui_units.size], axis=0) | |
| veh_all = np.concatenate([rs.vehicle_kph for rs in per_route_samples if rs.vehicle_kph.size], axis=0) | |
| if t_all.size == 0: | |
| return | |
| order = np.argsort(t_all) | |
| t_all = t_all[order] | |
| ui_all = ui_all[order] | |
| units_all = units_all[order] | |
| veh_all = veh_all[order] | |
| print("\n" + colorize("=" * 96, fg=35, bold=True)) | |
| print(colorize("ALL_ROUTES", fg=35, bold=True)) | |
| unit_vals, unit_counts = np.unique(units_all, return_counts=True) | |
| counts = ", ".join(f"{unit_name(int(u))}={int(c)}" for u, c in zip(unit_vals, unit_counts)) | |
| print(f"{colorize('unit_counts:', fg=90)} {counts}") | |
| print(f"{colorize('sample summary:', fg=90)}") | |
| for u in unit_vals: | |
| unit = int(u) | |
| mask = units_all == unit | |
| stats = compute_unit_stats(t_all[mask], ui_all[mask], veh_all[mask], unit) | |
| print(f" [{unit_name(unit)}] samples={stats.sample_count}") | |
| print_consistency_summary(per_route_samples) | |
| print_firmware_consistency_summary(per_route_samples) | |
| print_region_consistency_summary(per_route_samples) | |
| print_final_consistency(per_route_samples) | |
| print_bias_hysteresis_extremes(per_route_samples) | |
| print_display_comp_extremes(per_route_samples) | |
| def main() -> int: | |
| parser = argparse.ArgumentParser( | |
| description="Measure Tesla DI_uiSpeed hysteresis (+/- half-width) and bias versus DI_vehicleSpeed", | |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
| ) | |
| parser.add_argument("route", nargs="*", help="Route(s) in dongle/segment format") | |
| parser.add_argument("--workers", type=int, default=4, help="Workers for run_across_segments") | |
| parser.add_argument("--no-cache", action="store_true", help="Disable cache read/write") | |
| parser.add_argument("--no-color", action="store_true", help="Disable ANSI color output") | |
| parser.add_argument("--show-metadata", action="store_true", help="Show extra metadata (region/user/fw) in summaries") | |
| parser.add_argument( | |
| "--vehicle-speed-signal", | |
| default=DEFAULT_VEHICLE_SPEED_SIGNAL, | |
| help="DI_speed signal to use as vehicle speed reference (e.g. DI_vehicleSpeed or DI_vehicleSpeed_new)", | |
| ) | |
| args = parser.parse_args() | |
| global USE_COLOR, SHOW_METADATA | |
| USE_COLOR = (not args.no_color) and sys.stdout.isatty() and os.environ.get("NO_COLOR") is None | |
| SHOW_METADATA = args.show_metadata | |
| try: | |
| vehicle_speed_signal = resolve_vehicle_speed_signal(args.vehicle_speed_signal) | |
| except ValueError as e: | |
| parser.error(str(e)) | |
| return 2 | |
| routes = args.route if len(args.route) else DEFAULT_ROUTES | |
| normalized_routes = [] | |
| for raw in routes: | |
| route = normalize_route(raw) | |
| if route != raw: | |
| print(f"Normalized route '{raw}' -> '{route}'") | |
| normalized_routes.append(route) | |
| all_route_samples: list[RouteSamples] = [] | |
| total_routes = len(normalized_routes) | |
| print(colorize(f"Processing {total_routes} route(s)...", fg=34, bold=True)) | |
| print_median_confidence_legend() | |
| for i, route in enumerate(normalized_routes, start=1): | |
| if i > 1: | |
| print() | |
| print(colorize(f"[{i}/{total_routes}] {route}", fg=34)) | |
| cache_file = route_cache_filename(route, vehicle_speed_signal) | |
| try: | |
| if not args.no_cache and os.path.exists(cache_file): | |
| print(colorize("Loading cache...", fg=90)) | |
| else: | |
| print(colorize("Processing route data...", fg=90)) | |
| samples = load_or_scan_route(route, workers=args.workers, no_cache=args.no_cache, vehicle_speed_signal=vehicle_speed_signal) | |
| print_route_stats(samples) | |
| all_route_samples.append(samples) | |
| except Exception as e: | |
| print(colorize(f"Skipping {route} due to error: {compact_exception(e)}", fg=31, bold=True)) | |
| if os.path.exists(cache_file): | |
| os.remove(cache_file) | |
| continue | |
| if len(all_route_samples) == 0: | |
| print(colorize("No valid routes were processed", fg=31, bold=True)) | |
| return 1 | |
| print_aggregate_stats(all_route_samples) | |
| return 0 | |
| if __name__ == "__main__": | |
| sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment