Created
March 2, 2026 16:29
-
-
Save nalditopr/6887da45cb957102b7dea8e0f1810566 to your computer and use it in GitHub Desktop.
poll-v1r.py — Test v1r RSA key registration on Powerwall 3 leader + followers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| poll-v1r.py - Test v1r RSA key registration on Powerwall leader and followers | |
| Polls each device directly to verify whether the RSA key pairing succeeded after | |
| toggling each Powerwall's breaker during fleet_register.py enrollment. | |
| Usage: | |
| python3 poll-v1r.py --leader 10.42.1.X \\ | |
| --follower 10.42.1.Y --follower-pwd JIHGFEDCBA \\ | |
| --follower 10.42.1.Z --follower-pwd ZYXWVUTSRQ \\ | |
| --gw-pwd ABCDEFGHIJ \\ | |
| --email you@example.com \\ | |
| --rsa-key tedapi_rsa_private.pem | |
| # Re-poll every 10 seconds until all devices are registered: | |
| python3 poll-v1r.py --leader 10.42.1.X \\ | |
| --follower 10.42.1.Y --follower-pwd JIHGFEDCBA \\ | |
| --gw-pwd ABCDEFGHIJ --email you@example.com \\ | |
| --rsa-key tedapi_rsa_private.pem --watch 10 | |
| Each device runs four checks: | |
| [1] POST /api/login/Basic - verifies customer password, gets Bearer token | |
| [2] GET /tedapi/info - firmware version + DIN (uses Bearer token) | |
| [3] GET /tedapi/din - confirms DIN via Bearer auth | |
| [4] POST /tedapi/v1r - RSA key test (the critical check) | |
| v1r result meanings: | |
| ✓ REGISTERED - key accepted, config.json returned | |
| ✗ UNKNOWN_KEY_ID - key not registered (toggle breaker + re-run fleet_register.py) | |
| ✗ INACTIVE_KEY - key registered but not yet active (wait and retry) | |
| ✗ TIMEOUT - v1r endpoint not responding | |
| ✗ NOT_AUTHORIZED - firmware-level auth rejection (try re-registration) | |
| Tip: If a follower's login keeps timing out, supply its DIN directly: | |
| --follower 10.42.1.44:1707000-11-M--TGxxxxxxxxxxxx | |
| (the v1r check runs even without a Bearer token when DIN is pre-supplied) | |
| """ | |
| import argparse | |
| import json | |
| import math | |
| import struct | |
| import sys | |
| import time | |
| import uuid | |
| import requests | |
| import urllib3 | |
| from urllib3.exceptions import InsecureRequestWarning | |
| urllib3.disable_warnings(InsecureRequestWarning) | |
| # ── Colour helpers ──────────────────────────────────────────────────────────── | |
def _ansi(code, text):
    """Wrap *text* in the ANSI SGR escape sequence *code* when on a terminal.

    If stdout is not a TTY (e.g. piped or redirected), *text* is returned
    unchanged so escape codes never end up in captured output.
    """
    if not sys.stdout.isatty():
        return text
    return f"\033[{code}m{text}\033[0m"
# Named functions instead of lambdas bound to names (PEP 8): they get real
# __name__ values in tracebacks and can carry docstrings.
def ok(t):
    """Return *t* styled green bold."""
    return _ansi("32;1", t)


def err(t):
    """Return *t* styled red bold."""
    return _ansi("31;1", t)


def wrn(t):
    """Return *t* styled yellow bold."""
    return _ansi("33;1", t)


def inf(t):
    """Return *t* styled cyan."""
    return _ansi("36", t)


def dim(t):
    """Return *t* styled dim."""
    return _ansi("2", t)
| # ── v1r signing logic (mirrors TEDAPIv1r) ───────────────────────────────────── | |
def _to_tlv(tag, value_bytes):
    """Encode one tag-length-value record: 1-byte tag, 1-byte length, value.

    Both *tag* and ``len(value_bytes)`` must fit in a single byte (0-255);
    ``bytes()`` raises ``ValueError`` otherwise.
    """
    header = bytes((tag, len(value_bytes)))
    return header + value_bytes
def _build_tlv(din, expires_at, inner_bytes):
    """Assemble the byte string that is signed for a v1r request.

    Layout: four TLV metadata records, a bare 0xFF end marker, then the raw
    payload bytes appended without any tag or length prefix.

    din         : personalization string, encoded with str.encode()
    expires_at  : expiry timestamp, packed as a big-endian unsigned 32-bit int
    inner_bytes : payload bytes
    """
    signed = _to_tlv(0, bytes([7]))                       # TAG_SIGNATURE_TYPE = RSA (7)
    signed += _to_tlv(1, bytes([7]))                      # TAG_DOMAIN = ENERGY_DEVICE (7)
    signed += _to_tlv(2, din.encode())                    # TAG_PERSONALIZATION = DIN
    signed += _to_tlv(4, struct.pack('>I', expires_at))   # TAG_EXPIRES_AT
    signed += bytes([255])                                # TAG_END
    return signed + inner_bytes                           # payload bytes
def make_v1r_payload(private_key, public_key_der, din, inner_bytes):
    """Return a serialized, RSA-signed RoutableMessage for POST /tedapi/v1r.

    private_key    : key object whose .sign() is called with PKCS#1 v1.5
                     padding and SHA-512
    public_key_der : public key bytes placed in signer_identity.public_key
    din            : personalization DIN embedded in the signed TLV
    inner_bytes    : payload carried in protobuf_message_as_bytes and
                     covered by the signature
    """
    from pypowerwall.tedapi import tedapi_combined_pb2 as pb2
    from cryptography.hazmat.primitives.asymmetric import padding
    from cryptography.hazmat.primitives import hashes

    message = pb2.RoutableMessage()
    message.to_destination.domain = pb2.DOMAIN_ENERGY_DEVICE
    message.protobuf_message_as_bytes = inner_bytes
    message.uuid = str(uuid.uuid4()).encode()

    # The same expiry value is both signed (inside the TLV) and sent in the
    # clear (rsa_data.expires_at): 12 seconds after "now", rounded up.
    expiry = math.ceil(time.time()) + 12
    signature = private_key.sign(
        _build_tlv(din, expiry, inner_bytes),
        padding.PKCS1v15(),
        hashes.SHA512(),
    )

    signature_data = message.signature_data
    signature_data.signer_identity.public_key = public_key_der
    signature_data.rsa_data.expires_at = expiry
    signature_data.rsa_data.signature = signature
    return message.SerializeToString()
def build_config_request(din):
    """Return the serialized FileStore read request for ``config.json``.

    The result is the inner payload for a v1r request, addressed to *din*.
    Only the inner envelope (``Message.message``) is serialized, not the
    outer ``Message`` wrapper.
    """
    from pypowerwall.tedapi import tedapi_combined_pb2 as pb2

    envelope = pb2.Message().message
    envelope.deliveryChannel = pb2.DELIVERY_CHANNEL_HERMES_COMMAND
    envelope.sender.authorizedClient = 1
    envelope.recipient.din = din

    read_request = envelope.filestore.readFileRequest
    read_request.domain = pb2.FILE_STORE_API_DOMAIN_CONFIG_JSON
    read_request.name = "config.json"
    return envelope.SerializeToString()
| # ── Per-device check ────────────────────────────────────────────────────────── | |
def _check_login(session, host, password, email, timeout):
    """Step [1]: POST /api/login/Basic as ``customer`` and print the outcome.

    Returns ``(token, passed)``. ``token`` may be None or empty when no usable
    Bearer token was obtained.
    """
    step = "[1] /api/login/Basic"
    token = None
    try:
        body = json.dumps({
            "username": "customer",
            "password": password,
            "email": email,
            "clientInfo": {"timezone": "America/Chicago"},
        })
        r = session.post(f"https://{host}/api/login/Basic", data=body,
                         headers={"Content-Type": "application/json"},
                         timeout=timeout)
        if r.status_code == 200:
            token = r.json().get("token", "")
            print(f" {ok('✓')} {step} token={token[:16]}...")
            return token, True
        print(f" {err('✗')} {step} HTTP {r.status_code} {r.text[:80]}")
    except requests.exceptions.ReadTimeout:
        print(f" {err('✗')} {step} Read timeout — device reachable but login stalled")
        print(f" {dim(' Follower may be booting or its gateway service is initializing.')}")
        print(f" {dim(' Try again in 30-60 seconds, or supply --follower IP:DIN to skip login.')}")
    except Exception as e:
        print(f" {err('✗')} {step} {e}")
    return token, False


def _check_info(session, host, token, known_din, timeout):
    """Step [2]: GET /tedapi/info and print DIN / firmware / device type.

    Returns ``(din, status, reachable)``:
      din       : DIN reported by the device, falling back to *known_din*
      status    : True on HTTP 200, None on the non-fatal HTTP 403, else False
      reachable : False only when the request raised ConnectionError
    """
    step = "[2] /tedapi/info"
    din = known_din
    try:
        headers = {"Authorization": f"Bearer {token}"} if token else {}
        r = session.get(f"https://{host}/tedapi/info", headers=headers, timeout=timeout)
        if r.status_code == 200:
            info = r.json()
            info_din = info.get("din")
            # Keep the user-supplied DIN when the response carries none, rather
            # than replacing it with a "?" placeholder that would later be
            # used as the recipient / signing DIN.
            din = info_din or known_din
            fw = (info.get("firmwareVersion") or {}).get("version", "?")
            dev = info.get("deviceType", "?")
            shown_din = info_din or "?"
            print(f" {ok('✓')} {step} DIN={shown_din} FW={fw} type={dev}")
            return din, True, True
        if r.status_code == 403:
            msg = dim("(403 — interface-level block on this firmware, non-fatal)")
            if known_din:
                msg += dim(f" using supplied DIN={known_din}")
            print(f" {wrn('~')} {step} HTTP 403 {msg}")
            return din, None, True
        print(f" {err('✗')} {step} HTTP {r.status_code}")
    except requests.exceptions.ConnectionError as e:
        print(f" {err('✗')} {step} connection refused / unreachable — {e}")
        return din, False, False
    except Exception as e:
        print(f" {err('✗')} {step} {e}")
    return din, False, True


def _check_din(session, host, token, reported_din, timeout):
    """Step [3]: GET /tedapi/din and compare it with *reported_din*.

    Returns ``(auth_din, passed)``; ``auth_din`` is None when the request
    did not return HTTP 200.
    """
    step = "[3] /tedapi/din"
    auth_din = None
    try:
        headers = {"Authorization": f"Bearer {token}"} if token else {}
        r = session.get(f"https://{host}/tedapi/din", headers=headers,
                        timeout=timeout)
        if r.status_code == 200:
            auth_din = r.text.strip()
            if reported_din and auth_din != reported_din:
                verdict = wrn(f"⚠ mismatch with /tedapi/info ({reported_din})")
            elif reported_din:
                verdict = ok("✓ matches /tedapi/info")
            else:
                verdict = dim("(/tedapi/info not available for comparison)")
            print(f" {ok('✓')} {step} {auth_din} {verdict}")
            return auth_din, True
        print(f" {err('✗')} {step} HTTP {r.status_code}")
    except Exception as e:
        print(f" {err('✗')} {step} {e}")
    return auth_din, False


def _check_v1r(session, host, pb2, private_key, public_key_der,
               device_din, tlv_din, timeout):
    """Step [4]: POST a signed config.json request to /tedapi/v1r.

    Returns ``(registered, battery_blocks)``. ``battery_blocks`` is None
    unless the returned config.json was decoded successfully.
    """
    step = "[4] /tedapi/v1r"
    battery_blocks = None
    try:
        inner_bytes = build_config_request(device_din)
        payload = make_v1r_payload(private_key, public_key_der, tlv_din, inner_bytes)
        print(f" {dim(f' recipient={device_din} tlv_personalization={tlv_din}')}")
        r = session.post(f"https://{host}/tedapi/v1r",
                         data=payload,
                         headers={"Content-Type": "application/octet-stream"},
                         timeout=timeout)
        if r.status_code != 200:
            print(f" {err('✗')} {step} HTTP {r.status_code}")
            return False, battery_blocks

        # Plain-text "client not authorized" body = firmware-level rejection,
        # not a protobuf response.
        raw = r.content
        if raw and b"client not authorized" in raw:
            print(f" {err('✗')} {step} {err('NOT_AUTHORIZED')} "
                  f"{dim('(key not registered on this device)')}")
            return False, battery_blocks

        # NOTE(review): an empty 200 body parses to a default message, which
        # presumably reads as ERROR_NONE and is reported as REGISTERED -
        # confirm the firmware never answers that way.
        resp = pb2.RoutableMessage()
        resp.ParseFromString(raw)
        fault = resp.signed_message_status.message_fault

        if fault == pb2.MESSAGEFAULT_ERROR_NONE:
            # Decode config.json (best effort) to confirm real data came back
            # and to expose battery_blocks for follower DIN discovery.
            inner = resp.protobuf_message_as_bytes
            snippet = ""
            if inner:
                try:
                    envelope = pb2.MessageEnvelope()
                    envelope.ParseFromString(inner)
                    blob = envelope.filestore.readFileResponse.file.blob
                    config = json.loads(blob.decode("utf-8"))
                    vin = config.get("vin", "?")
                    snippet = f" config VIN={vin}"
                    battery_blocks = config.get("battery_blocks", [])
                except Exception:
                    # Undecodable payload still counts as registered; just
                    # report its size instead of the VIN.
                    snippet = f" ({len(inner)} bytes)"
            print(f" {ok('✓')} {step} {ok('REGISTERED')} ✓{snippet}")
            return True, battery_blocks

        if fault == pb2.MESSAGEFAULT_ERROR_UNKNOWN_KEY_ID:
            print(f" {err('✗')} {step} {err('UNKNOWN_KEY_ID')} "
                  f"{dim('(key not registered — toggle this breaker)')}")
        elif fault == pb2.MESSAGEFAULT_ERROR_INACTIVE_KEY:
            print(f" {wrn('⚠')} {step} {wrn('INACTIVE_KEY')} "
                  f"{dim('(registered but not yet active — wait and retry)')}")
        elif fault == pb2.MESSAGEFAULT_ERROR_TIMEOUT:
            print(f" {err('✗')} {step} {err('TIMEOUT')} "
                  f"{dim('(v1r endpoint unreachable)')}")
        else:
            fault_name = pb2.MessageFault_E.Name(fault)
            print(f" {err('✗')} {step} {err(fault_name)}")
    except Exception as e:
        print(f" {err('✗')} {step} {e}")
    return False, battery_blocks


def check_device(label, host, password, email, private_key, public_key_der,
                 timeout=10, known_din=None, signing_din=None):
    """
    Run all four checks against one device (leader or follower).

    label          : display name printed in the device header
    host           : device IP / hostname (HTTPS, certificate not verified)
    password       : customer password for /api/login/Basic
    email          : email sent with the login request
    private_key    : RSA private key object used to sign the v1r request
    public_key_der : matching public key bytes sent as signer identity
    timeout        : per-request timeout in seconds
    known_din      : DIN pre-supplied by the user; used as a fallback when
                     neither /tedapi/info nor /tedapi/din yields a DIN
    signing_din    : DIN to use for TLV personalization. For followers this
                     must be the *leader* DIN (the key is registered on the
                     leader only). Defaults to the device DIN if not supplied.

    Returns (all_passed: bool, summary: dict).

    ``all_passed`` reflects the v1r check only; login, info and din are
    informational (followers may reject the leader password, and info/din may
    answer 403 without a Bearer token).

    ``summary`` maps "login", "info", "din", "v1r" to True/False (info may be
    None for the non-fatal 403). Keys starting with "_" are metadata:
    "_din" (resolved device DIN, present whenever one was resolved) and
    "_battery_blocks" (from the returned config.json, when decoded).
    """
    # Imported up front so a missing dependency fails before any network I/O.
    from pypowerwall.tedapi import tedapi_combined_pb2 as combined_pb2

    results = {}
    print(f"\n{'─'*60}")
    print(f" Device : {inf(label)} ({host})")
    print(f"{'─'*60}")

    # Context manager closes the session's connections on every exit path
    # (this function runs once per device per --watch pass).
    with requests.Session() as session:
        session.verify = False

        # Login first so the Bearer token can be used for info and din.
        token, results["login"] = _check_login(session, host, password, email, timeout)

        reported_din, results["info"], reachable = _check_info(
            session, host, token, known_din, timeout)
        if not reachable:
            results.update({"din": False, "v1r": False})
            print(f" {dim(' (device unreachable — skipping remaining steps)')}")
            return False, results

        auth_din, results["din"] = _check_din(session, host, token, reported_din, timeout)

        # Recipient DIN for the envelope: authenticated value wins.
        device_din = auth_din or reported_din
        if not device_din:
            print(f" {err('✗')} [4] /tedapi/v1r {err('No DIN for recipient — supply via IP:DIN format')}")
            print(f" {dim(' e.g. --follower 10.42.1.44:1707000-11-M--TGxxxxxxxxxxxx')}")
            results["v1r"] = False
            return False, results

        # Surface the resolved DIN before the v1r step so the caller gets it
        # even when the key is rejected (needed for follower signing).
        results["_din"] = device_din

        # TLV personalization DIN: for followers the leader DIN must be used
        # (the RSA key is registered on the leader; personalization must match).
        tlv_din = signing_din or device_din

        registered, battery_blocks = _check_v1r(
            session, host, combined_pb2, private_key, public_key_der,
            device_din, tlv_din, timeout)
        results["v1r"] = registered
        if battery_blocks is not None:
            results["_battery_blocks"] = battery_blocks

    # Only v1r matters for the pass verdict.
    return registered, results
| # ── Main ────────────────────────────────────────────────────────────────────── | |
def main():
    """CLI entry point: poll the leader and each follower, print a summary.

    Parses the command line, loads the RSA private key, then runs
    check_device() for the leader followed by every follower. With
    ``--watch N`` the whole pass repeats every N seconds until all devices
    pass or the user presses Ctrl-C. Exits with status 1 if the RSA key
    cannot be loaded.
    """
    parser = argparse.ArgumentParser(
        description="Poll Powerwall leader + followers to verify v1r RSA key registration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("--leader", required=True, metavar="IP",
                        help="Leader Powerwall IP (e.g. 10.42.1.40)")
    parser.add_argument("--follower", action="append", metavar="IP[:DIN]",
                        default=[], dest="followers",
                        help="Follower IP, optionally with DIN: 10.42.1.44:1707000-11-M--TGxxx "
                             "(DIN auto-discovered from leader config if not supplied)")
    parser.add_argument("--gw-pwd", required=True, metavar="PWD",
                        help="Full 10-char QR code password (last 5 used for login)")
    parser.add_argument("--email", required=True, metavar="EMAIL",
                        help="Tesla account email used for /api/login/Basic")
    parser.add_argument("--follower-pwd", action="append", metavar="PWD",
                        default=[], dest="follower_pwds",
                        help="Follower QR code password (repeat for each --follower, in order; "
                             "falls back to --gw-pwd if not supplied)")
    parser.add_argument("--rsa-key", default="tedapi_rsa_private.pem", metavar="PATH",
                        help="Path to RSA private key PEM (default: tedapi_rsa_private.pem)")
    parser.add_argument("--timeout", type=int, default=20, metavar="SEC",
                        help="Per-request timeout in seconds (default: 20)")
    parser.add_argument("--watch", type=int, default=0, metavar="SEC",
                        help="Re-poll every N seconds until all pass (0 = run once)")
    args = parser.parse_args()

    # Validate + load RSA key
    from cryptography.hazmat.primitives import serialization
    try:
        with open(args.rsa_key, "rb") as f:
            private_key = serialization.load_pem_private_key(f.read(), password=None)
        public_key_der = private_key.public_key().public_bytes(
            serialization.Encoding.DER,
            serialization.PublicFormat.PKCS1,
        )
    except FileNotFoundError:
        print(err(f"RSA key not found: {args.rsa_key}"))
        print(dim(" Run: python -m pypowerwall register"))
        sys.exit(1)
    except (OSError, ValueError, TypeError) as e:
        # Unreadable file, malformed PEM, passphrase-protected key, or a key
        # type that cannot be exported as PKCS1: report it, don't traceback.
        print(err(f"Cannot load RSA key {args.rsa_key}: {e}"))
        sys.exit(1)

    # Customer password = last 5 chars of QR code password
    customer_pw = args.gw_pwd[-5:]
    follower_pws = [pw[-5:] for pw in args.follower_pwds]

    def parse_device(spec):
        """Return (ip, din_or_None) from 'IP' or 'IP:DIN' spec."""
        # Split on the first ':' only. IPv6 literals are not supported here.
        ip, sep, din = spec.partition(':')
        if sep:
            return ip.strip(), din.strip()
        return spec.strip(), None

    leader_ip, leader_din_override = parse_device(args.leader)
    devices = [("Leader", leader_ip, leader_din_override)]
    for number, spec in enumerate(args.followers, start=1):
        ip, din = parse_device(spec)
        devices.append((f"Follower {number}", ip, din))

    # Leader DIN is the TLV personalization for follower checks; it is
    # refreshed from the leader's own check on every pass.
    leader_din_resolved = leader_din_override

    iteration = 0
    try:
        while True:
            iteration += 1
            ts = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"\n{'═'*60}")
            print(f" poll-v1r · {ts} · pass {iteration}")
            print(f"{'═'*60}")
            print(f" RSA key : {args.rsa_key}")
            print(f" Password : {'*' * len(customer_pw)} (last {len(customer_pw)} chars of gw-pwd)")

            all_ok = True
            status_lines = []
            follower_dins = []  # discovered from the leader config this pass

            for idx, (label, host, known_din) in enumerate(devices):
                is_leader = (idx == 0)
                follower_idx = idx - 1  # 0-based follower index

                # Followers must sign TLV with the leader DIN (key registered on leader)
                tlv_signing = None if is_leader else leader_din_resolved

                # Auto-resolve follower DIN from leader config (matched by
                # position) when the user did not supply one.
                resolved_known_din = known_din
                if (not is_leader and not resolved_known_din
                        and follower_idx < len(follower_dins)):
                    resolved_known_din = follower_dins[follower_idx]
                    print(f" {dim(f' Auto-discovered {label} DIN from leader config: {resolved_known_din}')}")

                # Pick password: each follower has its own QR code
                if is_leader or follower_idx >= len(follower_pws):
                    device_pw = customer_pw
                else:
                    device_pw = follower_pws[follower_idx]

                passed, results = check_device(
                    label, host, device_pw, args.email, private_key, public_key_der,
                    timeout=args.timeout,
                    known_din=resolved_known_din,
                    signing_din=tlv_signing,
                )

                # Capture leader DIN and follower DINs for subsequent checks.
                if is_leader:
                    leader_din_resolved = results.get("_din") or leader_din_resolved
                    battery_blocks = results.get("_battery_blocks") or []
                    follower_dins = [
                        b["vin"] for b in battery_blocks
                        if b.get("vin") and b["vin"] != leader_din_resolved
                    ]
                    if follower_dins:
                        print(f" {dim(f' Leader config: found {len(follower_dins)} follower DIN(s): {follower_dins}')}")

                all_ok = all_ok and passed
                v1r_icon = ok("✓ REGISTERED") if results.get("v1r") else err("✗ NOT REGISTERED")
                status_lines.append(f" {label:<14} {host:<16} {v1r_icon}")

            print(f"\n{'─'*60}")
            print(" Summary:")
            for line in status_lines:
                print(line)
            print()

            if all_ok:
                print(ok(" ALL DEVICES REGISTERED — follower queries should now work!"))
                print()
                break
            if args.watch <= 0:
                print(dim(" Re-run with --watch N to poll automatically."))
                print()
                break

            print(wrn(f" Not all registered yet — retrying in {args.watch}s (Ctrl-C to stop)"))
            time.sleep(args.watch)
    except KeyboardInterrupt:
        # Covers Ctrl-C during a slow request as well as during the sleep.
        print("\n Stopped.")
if __name__ == "__main__":
    # main() returns None, so the exit status stays 0 unless main() itself
    # calls sys.exit() with another code.
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment