|
#!/usr/bin/env python3 |
|
""" |
|
StepCount Importer |
|
|
|
reads a CSV that has at least: |
|
DateTime,StepCount |
|
|
|
and UPSERTs rows into an Apple-ish .steps JSON structure (inside StepSample), |
|
keyed by intervalStart (Core Data seconds since 2001-01-01 00:00:00 UTC). |
|
|
|
- if an intervalStart already exists: update stepCount (and leave other fields alone) |
|
- if it does not exist: insert a new StepSample item |
|
|
|
Requires the macOS `compression_tool` binary for decode/encode.
|
""" |
|
|
|
import argparse |
|
import csv |
|
import datetime as dt |
|
import json |
|
import subprocess |
|
import sys |
|
from pathlib import Path |
|
from typing import Any |
|
|
|
|
|
# Apple's Core Data timestamps count seconds from 2001-01-01T00:00:00Z.
CORE_DATA_EPOCH_OFFSET = 978307200  # seconds between 1970-01-01 and 2001-01-01
|
|
|
|
|
def run_cmd_capture(cmd: list[str]) -> tuple[str, str, int]:
    """Run *cmd* with captured text output.

    Returns a ``(stdout, stderr, returncode)`` tuple; never raises on a
    non-zero exit status (callers inspect the code themselves).
    """
    result = subprocess.run(cmd, capture_output=True, text=True)
    return result.stdout, result.stderr, result.returncode
|
|
|
|
|
def encode_to_file(doc: dict[str, Any], out_path: Path) -> int:
    """Serialize *doc* as pretty-printed JSON and compress it to *out_path*.

    Pipes the JSON text through the macOS ``compression_tool`` binary.
    Returns the tool's exit code (0 on success).
    """
    # subprocess.run(input=...) replaces the original Popen + manual
    # stdin.write/close dance: it removes the assert-as-flow-control on
    # p.stdin (asserts vanish under `python -O`) and handles the child
    # exiting early without an unhandled BrokenPipeError.
    payload = json.dumps(doc, indent=2, ensure_ascii=False)
    result = subprocess.run(
        ["compression_tool", "-encode", "-o", str(out_path)],
        input=payload,
        text=True,
    )
    return result.returncode
|
|
|
|
|
def decode_steps(steps_path: Path) -> dict[str, Any]:
    """Decompress *steps_path* via ``compression_tool`` and parse it as JSON.

    Raises RuntimeError when the tool exits non-zero.
    """
    cmd = ["compression_tool", "-decode", "-i", str(steps_path)]
    stdout, stderr, returncode = run_cmd_capture(cmd)
    if returncode != 0:
        raise RuntimeError(f"decode failed (rc={returncode}): {stderr}")
    return json.loads(stdout)
|
|
|
|
|
def parse_datetime_fuzzy(s: str, *, local_tz: dt.tzinfo) -> dt.datetime:
    """Parse a timestamp string, attaching *local_tz* when it is naive.

    Accepts 'YYYY-MM-DD HH:MM:SS' or 'YYYY-MM-DDTHH:MM:SS', optionally
    carrying a timezone offset. Raises ValueError for empty/unparseable
    input.
    """
    text = (s or "").strip()
    if not text:
        raise ValueError("empty DateTime")

    # Swap a lone space separator for 'T' so fromisoformat accepts it on
    # older Pythons; fall back to an explicit strptime if that still fails.
    candidate = text
    if " " in candidate and "T" not in candidate:
        candidate = candidate.replace(" ", "T", 1)
    try:
        parsed = dt.datetime.fromisoformat(candidate)
    except ValueError:
        parsed = dt.datetime.strptime(text, "%Y-%m-%d %H:%M:%S")

    if parsed.tzinfo is not None:
        return parsed
    return parsed.replace(tzinfo=local_tz)
|
|
|
|
|
def datetime_to_core_data_seconds(ts: dt.datetime) -> int:
    """Convert *ts* to whole seconds since the Core Data epoch.

    The Core Data epoch is 2001-01-01 00:00:00 UTC, so this is the Unix
    timestamp shifted by CORE_DATA_EPOCH_OFFSET.
    """
    unix_seconds = int(ts.astimezone(dt.timezone.utc).timestamp())
    return unix_seconds - CORE_DATA_EPOCH_OFFSET
|
|
|
|
|
def find_col(row: dict[str, str], wanted: str) -> str | None: |
|
""" |
|
case-insensitive header lookup, because your CSV format is "variable". |
|
""" |
|
wanted_l = wanted.lower() |
|
for k, v in row.items(): |
|
if (k or "").strip().lower() == wanted_l: |
|
return v |
|
return None |
|
|
|
|
|
class StepCountImporter:
    """Upserts (DateTime, StepCount) CSV rows into a .steps JSON document."""

    def upsert_from_csv(
        self,
        *,
        steps_path: Path,
        csv_path: Path,
        out_path: Path,
    ) -> None:
        """Merge rows from *csv_path* into *steps_path*, writing *out_path*.

        Rows whose intervalStart already exists get only their stepCount
        updated (other fields are preserved); new intervals are appended.
        Malformed rows are counted as skipped instead of aborting.

        Raises RuntimeError when compression_tool decode/encode fails and
        ValueError on a malformed document or a headerless CSV.
        """
        # Naive CSV timestamps are interpreted in the machine's local zone.
        local_tz = dt.datetime.now().astimezone().tzinfo or dt.timezone.utc

        doc = decode_steps(steps_path)
        samples = self._get_samples(doc)
        by_interval = self._index_samples(samples)

        inserted = updated = skipped = 0

        with csv_path.open("r", encoding="utf-8-sig", newline="") as f:
            reader = csv.DictReader(f)
            if reader.fieldnames is None:
                raise ValueError("csv has no header row")

            for row in reader:
                parsed = self._parse_row(row, local_tz=local_tz)
                if parsed is None:
                    skipped += 1
                    continue
                interval_start, step_count = parsed

                existing = by_interval.get(interval_start)
                if existing is not None:
                    # UPSERT: touch only stepCount, leave other fields alone.
                    existing["stepCount"] = step_count
                    updated += 1
                else:
                    new_item: dict[str, Any] = {
                        "intervalStart": interval_start,
                        "stepCount": step_count,
                        "floorsAscended": 0,
                        "distanceMeters": 0,
                    }
                    samples.append(new_item)
                    by_interval[interval_start] = new_item
                    inserted += 1

        samples.sort(key=self._sort_key)

        rc = encode_to_file(doc, out_path)
        if rc != 0:
            raise RuntimeError(f"encode failed (rc={rc})")

        print(f"upsert complete: inserted={inserted} updated={updated} skipped={skipped}")
        print(f"wrote: {out_path}")

    @staticmethod
    def _get_samples(doc: dict[str, Any]) -> list[Any]:
        """Return doc['StepSample'], creating an empty list when absent."""
        samples = doc.get("StepSample")
        if samples is None:
            samples = []
            doc["StepSample"] = samples
        if not isinstance(samples, list):
            raise ValueError("doc['StepSample'] is not a list")
        return samples

    @staticmethod
    def _index_samples(samples: list[Any]) -> dict[int, dict[str, Any]]:
        """Map intervalStart -> sample dict, skipping malformed entries."""
        by_interval: dict[int, dict[str, Any]] = {}
        for item in samples:
            if isinstance(item, dict) and "intervalStart" in item:
                try:
                    by_interval[int(item["intervalStart"])] = item
                except (TypeError, ValueError):
                    # Non-numeric intervalStart: keep the item in the list
                    # but do not index it for upserts.
                    pass
        return by_interval

    @staticmethod
    def _parse_row(
        row: dict[str, str], *, local_tz: dt.tzinfo
    ) -> tuple[int, int] | None:
        """Parse one CSV row into (intervalStart, stepCount); None if bad."""
        dt_raw = find_col(row, "DateTime")
        sc_raw = find_col(row, "StepCount")
        if dt_raw is None or sc_raw is None:
            return None
        try:
            ts = parse_datetime_fuzzy(dt_raw, local_tz=local_tz)
            interval_start = datetime_to_core_data_seconds(ts)
            step_count = int(round(float(sc_raw)))
        except (TypeError, ValueError, OverflowError, OSError):
            # ValueError: bad dates / non-numeric counts.
            # OverflowError/OSError: datetime.timestamp() on out-of-range
            # dates (platform-dependent).
            return None
        return interval_start, step_count

    @staticmethod
    def _sort_key(item: Any) -> int:
        """Sort samples by intervalStart; malformed entries sort as 0."""
        if isinstance(item, dict) and "intervalStart" in item:
            try:
                return int(item["intervalStart"])
            except (TypeError, ValueError):
                return 0
        return 0
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse arguments, validate paths, run the importer."""
    parser = argparse.ArgumentParser(prog="StepCount Importer")
    parser.add_argument("--steps", default="./Export.steps", help="input .steps file (default: ./Export.steps)")
    parser.add_argument("--csv", required=True, help="input csv file containing DateTime,StepCount")
    parser.add_argument("--out", default="", help="output .steps file (default: Export-imported-<epoch>.steps)")
    args = parser.parse_args()

    steps_path = Path(args.steps)
    csv_path = Path(args.csv)

    # Validate inputs up front; exit code 2 signals a usage/path problem.
    for label, path in (("steps", steps_path), ("csv", csv_path)):
        if not path.exists():
            print(f"error: {label} not found: {path}", file=sys.stderr)
            sys.exit(2)

    if args.out:
        out_path = Path(args.out)
    else:
        stamp = int(dt.datetime.now().timestamp())
        out_path = steps_path.parent / f"Export-imported-{stamp}.steps"

    StepCountImporter().upsert_from_csv(
        steps_path=steps_path,
        csv_path=csv_path,
        out_path=out_path,
    )
|
|
|
|
|
# Script entry guard: run the CLI only when executed directly, not on import.
if __name__ == "__main__":

    main()