Skip to content

Instantly share code, notes, and snippets.

@alexwilson
Created December 31, 2025 05:33
Show Gist options
  • Select an option

  • Save alexwilson/c4b6d1cdb3cf2b67363512ac2192c2b0 to your computer and use it in GitHub Desktop.

Select an option

Save alexwilson/c4b6d1cdb3cf2b67363512ac2192c2b0 to your computer and use it in GitHub Desktop.
Pedometer++ Step Count Importer

Pedometer++ step count importer

Imports step counts from a CSV file and upserts them into a Pedometer++ .steps file.

This is for backfilling step data from external sources (Google Fit, Apple Watch exports, spreadsheets, etc).

How to use it?

First, you need a .steps file, which Pedometer++ creates for you when you export your data.

And then you run this CLI:

$ ./stepcount_importer.py \
  --steps ./Export.steps \
  --csv ./steps.csv \
  --out ./Export-with-imported-steps.steps

What it does

  • reads a CSV with at least:
    • DateTime
    • StepCount
  • decodes an existing .steps file
  • converts each DateTime into Core Data time (seconds since 2001-01-01 00:00:00 UTC); a DateTime without a UTC offset is interpreted as local time
  • upserts into StepSample:
    • if intervalStart exists → updates stepCount
    • otherwise → inserts a new StepSample
  • re-encodes the result back into a .steps file

CSV format

The CSV format is flexible. Only two columns are required.

Example:

DateTime,StepCount,Source
2017-05-18 04:00:00,23.96,Alex’s iPhone|Withings
2017-05-18 05:00:00,35.96,Alex’s Apple Watch|Withings

Inspired by this blog post: “Hacking my Pedometer++ data”

#!/usr/bin/env python3
"""
StepCount Importer
reads a CSV that has at least:
DateTime,StepCount
and UPSERTs rows into an Apple-ish .steps JSON structure (inside StepSample),
keyed by intervalStart (Core Data seconds since 2001-01-01 00:00:00 UTC).
- if an intervalStart already exists: update stepCount (and leave other fields alone)
- if it does not exist: insert a new StepSample item
requires the macOS `compression_tool` CLI to decode and re-encode the .steps file.
"""
import argparse
import csv
import datetime as dt
import json
import subprocess
import sys
from pathlib import Path
from typing import Any
# Subtracted from a Unix timestamp to express it relative to the Core Data
# epoch (2001-01-01 00:00:00 UTC), which is how intervalStart values are keyed.
CORE_DATA_EPOCH_OFFSET = 978307200 # seconds between 1970-01-01 and 2001-01-01
def run_cmd_capture(cmd: list[str]) -> tuple[str, str, int]:
    """Run *cmd* to completion and return ``(stdout, stderr, returncode)``.

    Both output streams are captured and decoded as text. A non-zero exit
    status is not treated as an error here; callers inspect the return code.
    """
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return completed.stdout, completed.stderr, completed.returncode
def encode_to_file(doc: dict[str, Any], out_path: Path) -> int:
    """Serialise *doc* as JSON and pipe it through ``compression_tool -encode``.

    Args:
        doc: the decoded .steps document to write back.
        out_path: destination file handed to ``compression_tool -o``.

    Returns:
        The exit status of ``compression_tool`` (0 on success).

    Raises:
        FileNotFoundError: if ``compression_tool`` is not on PATH.
    """
    payload = json.dumps(doc, indent=2, ensure_ascii=False)
    # subprocess.run feeds stdin via communicate(), so the child is always
    # waited on and an early exit of the tool shows up as a non-zero return
    # code instead of an unhandled BrokenPipeError with an unreaped process.
    completed = subprocess.run(
        ["compression_tool", "-encode", "-o", str(out_path)],
        input=payload,
        text=True,
    )
    return completed.returncode
def decode_steps(steps_path: Path) -> dict[str, Any]:
    """Decode a .steps file with ``compression_tool -decode`` and parse it as JSON.

    Raises:
        RuntimeError: if ``compression_tool`` exits with a non-zero status.
    """
    argv = ["compression_tool", "-decode", "-i", str(steps_path)]
    stdout, stderr, returncode = run_cmd_capture(argv)
    if returncode == 0:
        return json.loads(stdout)
    raise RuntimeError(f"decode failed (rc={returncode}): {stderr}")
def parse_datetime_fuzzy(s: str, *, local_tz: dt.tzinfo) -> dt.datetime:
    """
    Parse a CSV timestamp into a datetime carrying timezone information.

    accepts:
    - 'YYYY-MM-DD HH:MM:SS'
    - 'YYYY-MM-DDTHH:MM:SS'
    - either of the above followed by 'Z' or a UTC offset ('+01:00' or
      '+0100'), with or without whitespace between the time and the offset

    A value without an offset gets ``local_tz`` attached as-is; a value that
    carries its own offset keeps it.

    Raises ValueError when ``s`` is empty/blank or matches none of the formats.
    """
    text = (s or "").strip()
    if not text:
        raise ValueError("empty DateTime")

    # First choice: the ISO parser, with the date/time separator normalised to "T".
    iso_text = text.replace(" ", "T", 1) if "T" not in text and " " in text else text
    try:
        parsed = dt.datetime.fromisoformat(iso_text)
    except ValueError:
        # fromisoformat rejects a space before the offset, and older
        # interpreters also reject "Z" and colon-less offsets, so retry with
        # explicit patterns. The offset-less pattern stays first so inputs
        # that were accepted before resolve exactly as they used to.
        fallback_formats = (
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%d %H:%M:%S %z",
            "%Y-%m-%d %H:%M:%S%z",
            "%Y-%m-%dT%H:%M:%S%z",
            "%Y-%m-%dT%H:%M:%S %z",
        )
        parsed = None
        for fmt in fallback_formats:
            try:
                parsed = dt.datetime.strptime(text, fmt)
            except ValueError:
                continue
            break
        if parsed is None:
            raise ValueError(f"unrecognised DateTime: {text!r}") from None

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=local_tz)
    return parsed
def datetime_to_core_data_seconds(ts: dt.datetime) -> int:
    """
    Convert *ts* to an intervalStart value: whole seconds since
    2001-01-01 00:00:00 UTC (the Core Data epoch).

    The timestamp is normalised to UTC, its Unix time is truncated to an
    integer, and the Unix-to-Core-Data epoch offset is subtracted.
    """
    unix_time = ts.astimezone(dt.timezone.utc).timestamp()
    return int(unix_time) - CORE_DATA_EPOCH_OFFSET
def find_col(row: dict[str, str], wanted: str) -> str | None:
"""
case-insensitive header lookup, because your CSV format is "variable".
"""
wanted_l = wanted.lower()
for k, v in row.items():
if (k or "").strip().lower() == wanted_l:
return v
return None
class StepCountImporter:
    """Upsert step counts from a CSV file into the StepSample list of a .steps document."""

    # Marker handed to parse_datetime_fuzzy as the "local" zone. It is attached
    # only to timestamps that carried no offset of their own, which lets
    # _parse_row recognise them and resolve them per date. The explicit name
    # guarantees a distinct object rather than the shared dt.timezone.utc.
    _NAIVE_MARKER = dt.timezone(dt.timedelta(0), "naive-local")

    def upsert_from_csv(
        self,
        *,
        steps_path: Path,
        csv_path: Path,
        out_path: Path,
    ) -> None:
        """Merge the rows of *csv_path* into *steps_path* and write *out_path*.

        Each usable CSV row is keyed by its intervalStart (Core Data seconds).
        A row whose key already exists overwrites that sample's stepCount and
        leaves its other fields alone; otherwise a new sample is appended with
        floorsAscended and distanceMeters set to 0. Rows with a missing or
        unparseable DateTime/StepCount are counted as skipped. The sample list
        is sorted by intervalStart before being re-encoded, and a summary is
        printed to stdout.

        Raises:
            ValueError: the decoded document is not a JSON object, its
                StepSample entry is not a list, or the CSV has no header row.
            RuntimeError: decoding or encoding the .steps file failed.
        """
        doc = decode_steps(steps_path)
        if not isinstance(doc, dict):
            raise ValueError("decoded .steps document is not a JSON object")

        samples = doc.get("StepSample")
        if samples is None:
            samples = []
            doc["StepSample"] = samples
        if not isinstance(samples, list):
            raise ValueError("doc['StepSample'] is not a list")

        by_interval = self._index_by_interval(samples)
        inserted = 0
        updated = 0
        skipped = 0

        with csv_path.open("r", encoding="utf-8-sig", newline="") as f:
            reader = csv.DictReader(f)
            if reader.fieldnames is None:
                raise ValueError("csv has no header row")
            for row in reader:
                parsed = self._parse_row(row)
                if parsed is None:
                    skipped += 1
                    continue
                interval_start, step_count = parsed

                existing = by_interval.get(interval_start)
                if existing is not None:
                    existing["stepCount"] = step_count
                    updated += 1
                    continue

                new_item: dict[str, Any] = {
                    "intervalStart": interval_start,
                    "stepCount": step_count,
                    "floorsAscended": 0,
                    "distanceMeters": 0,
                }
                samples.append(new_item)
                # Register the new sample so a later row with the same
                # timestamp updates it instead of inserting a duplicate.
                by_interval[interval_start] = new_item
                inserted += 1

        samples.sort(key=self._sort_key)

        rc = encode_to_file(doc, out_path)
        if rc != 0:
            raise RuntimeError(f"encode failed (rc={rc})")
        print(f"upsert complete: inserted={inserted} updated={updated} skipped={skipped}")
        print(f"wrote: {out_path}")

    @staticmethod
    def _index_by_interval(samples: list[Any]) -> dict[int, dict[str, Any]]:
        """Map intervalStart -> sample for every sample with a usable integer key."""
        index: dict[int, dict[str, Any]] = {}
        for item in samples:
            if not isinstance(item, dict) or "intervalStart" not in item:
                continue
            try:
                index[int(item["intervalStart"])] = item
            except (TypeError, ValueError, OverflowError):
                # The sample stays in the document untouched; it simply
                # cannot be matched against CSV rows.
                continue
        return index

    @classmethod
    def _parse_row(cls, row: dict[str, Any]) -> tuple[int, int] | None:
        """Return ``(intervalStart, stepCount)`` for a CSV row, or None if it is unusable."""
        dt_raw = find_col(row, "DateTime")
        sc_raw = find_col(row, "StepCount")
        # Short rows yield None and over-long rows can yield lists; neither is usable.
        if not isinstance(dt_raw, str) or not isinstance(sc_raw, str):
            return None
        try:
            ts = parse_datetime_fuzzy(dt_raw, local_tz=cls._NAIVE_MARKER)
            if ts.tzinfo is cls._NAIVE_MARKER:
                # No offset in the CSV: treat it as local wall-clock time and
                # let the system's zone rules pick the offset valid on that
                # date. A single offset captured from "now" is an hour off
                # for dates on the other side of a DST change.
                ts = ts.replace(tzinfo=None).astimezone()
            interval_start = datetime_to_core_data_seconds(ts)
            # float() first so fractional counts such as "23.96" are accepted;
            # nan/inf raise in round()/int() and are skipped like other bad values.
            step_count = int(round(float(sc_raw)))
        except (TypeError, ValueError, OverflowError, OSError):
            return None
        return interval_start, step_count

    @staticmethod
    def _sort_key(sample: Any) -> int:
        """Sort key: the sample's intervalStart, or 0 when it has no usable one."""
        if isinstance(sample, dict):
            try:
                return int(sample["intervalStart"])
            except (KeyError, TypeError, ValueError, OverflowError):
                pass
        return 0
def main() -> None:
    """Command-line entry point.

    Parses --steps/--csv/--out, exits with status 2 (message on stderr) when
    either input file does not exist, derives a timestamped output name next
    to the steps file when --out is not given, then runs the import.
    """
    parser = argparse.ArgumentParser(prog="StepCount Importer")
    parser.add_argument(
        "--steps",
        default="./Export.steps",
        help="input .steps file (default: ./Export.steps)",
    )
    parser.add_argument(
        "--csv",
        required=True,
        help="input csv file containing DateTime,StepCount",
    )
    parser.add_argument(
        "--out",
        default="",
        help="output .steps file (default: Export-imported-<epoch>.steps)",
    )
    opts = parser.parse_args()

    steps_path, csv_path = Path(opts.steps), Path(opts.csv)
    required_inputs = (
        (steps_path, f"error: steps not found: {steps_path}"),
        (csv_path, f"error: csv not found: {csv_path}"),
    )
    for required_file, complaint in required_inputs:
        if not required_file.exists():
            print(complaint, file=sys.stderr)
            sys.exit(2)

    out_path = (
        Path(opts.out)
        if opts.out
        else steps_path.parent / f"Export-imported-{int(dt.datetime.now().timestamp())}.steps"
    )
    StepCountImporter().upsert_from_csv(
        steps_path=steps_path,
        csv_path=csv_path,
        out_path=out_path,
    )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment