Skip to content

Instantly share code, notes, and snippets.

@nalditopr
Created March 2, 2026 16:29
Show Gist options
  • Select an option

  • Save nalditopr/6887da45cb957102b7dea8e0f1810566 to your computer and use it in GitHub Desktop.

Select an option

Save nalditopr/6887da45cb957102b7dea8e0f1810566 to your computer and use it in GitHub Desktop.
poll-v1r.py — Test v1r RSA key registration on Powerwall 3 leader + followers
#!/usr/bin/env python3
"""
poll-v1r.py - Test v1r RSA key registration on Powerwall leader and followers
Polls each device directly to verify whether the RSA key pairing succeeded after
toggling each Powerwall's breaker during fleet_register.py enrollment.
Usage:
python3 poll-v1r.py --leader 10.42.1.X \\
--follower 10.42.1.Y --follower-pwd JIHGFEDCBA \\
--follower 10.42.1.Z --follower-pwd ZYXWVUTSRQ \\
--gw-pwd ABCDEFGHIJ \\
--email you@example.com \\
--rsa-key tedapi_rsa_private.pem
# Re-poll every 10 seconds until all devices are registered:
python3 poll-v1r.py --leader 10.42.1.X \\
--follower 10.42.1.Y --follower-pwd JIHGFEDCBA \\
--gw-pwd ABCDEFGHIJ --email you@example.com \\
--rsa-key tedapi_rsa_private.pem --watch 10
Each device runs four checks:
[1] POST /api/login/Basic - verifies customer password, gets Bearer token
[2] GET /tedapi/info - firmware version + DIN (uses Bearer token)
[3] GET /tedapi/din - confirms DIN via Bearer auth
[4] POST /tedapi/v1r - RSA key test (the critical check)
v1r result meanings:
✓ REGISTERED - key accepted, config.json returned
✗ UNKNOWN_KEY_ID - key not registered (toggle breaker + re-run fleet_register.py)
✗ INACTIVE_KEY - key registered but not yet active (wait and retry)
✗ TIMEOUT - v1r endpoint not responding
✗ NOT_AUTHORIZED - firmware-level auth rejection (try re-registration)
Tip: If a follower's login keeps timing out, supply its DIN directly:
--follower 10.42.1.44:1707000-11-M--TGxxxxxxxxxxxx
(the v1r check runs even without a Bearer token when DIN is pre-supplied)
"""
import argparse
import json
import math
import struct
import sys
import time
import uuid
import requests
import urllib3
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
# ── Colour helpers ────────────────────────────────────────────────────────────
def _ansi(code, text):
return f"\033[{code}m{text}\033[0m" if sys.stdout.isatty() else text
# Named functions instead of lambdas bound to names (PEP 8): they get a real
# __name__ in tracebacks and can carry a docstring.
def ok(t):
    """Style *t* as bold green (SGR 32;1) — success markers."""
    return _ansi("32;1", t)


def err(t):
    """Style *t* as bold red (SGR 31;1) — failure markers."""
    return _ansi("31;1", t)


def wrn(t):
    """Style *t* as bold yellow (SGR 33;1) — warnings."""
    return _ansi("33;1", t)


def inf(t):
    """Style *t* as cyan (SGR 36) — informational labels."""
    return _ansi("36", t)


def dim(t):
    """Style *t* as dim (SGR 2) — secondary detail text."""
    return _ansi("2", t)
# ── v1r signing logic (mirrors TEDAPIv1r) ─────────────────────────────────────
def _to_tlv(tag, value_bytes):
return bytes([tag]) + bytes([len(value_bytes)]) + value_bytes
def _build_tlv(din, expires_at, inner_bytes):
    """Assemble the byte string that gets signed for a v1r request.

    Layout: four TLV records (signature type, domain, personalization,
    expiry), a single 0xFF end marker, then the raw payload bytes.

    Args:
        din: DIN string used as the personalization value.
        expires_at: expiry timestamp, packed as a big-endian unsigned 32-bit int.
        inner_bytes: payload bytes appended after the end marker.
    """
    signature_type = _to_tlv(0, bytes([7]))   # TAG_SIGNATURE_TYPE = RSA (7)
    domain = _to_tlv(1, bytes([7]))           # TAG_DOMAIN = ENERGY_DEVICE (7)
    personalization = _to_tlv(2, din.encode())  # TAG_PERSONALIZATION = DIN
    expiry = _to_tlv(4, struct.pack('>I', expires_at))  # TAG_EXPIRES_AT
    end_marker = bytes([255])                 # TAG_END
    return b''.join((
        signature_type,
        domain,
        personalization,
        expiry,
        end_marker,
        inner_bytes,
    ))
def make_v1r_payload(private_key, public_key_der, din, inner_bytes):
    """Return a serialized, signed RoutableMessage for POST /tedapi/v1r.

    The message carries *inner_bytes* as its payload and a signature over
    the TLV produced by ``_build_tlv`` (personalized with *din*), made with
    PKCS#1 v1.5 padding and SHA-512. The signature expires 12 seconds after
    the current time, rounded up to a whole second.

    Args:
        private_key: key object whose ``sign(data, padding, algorithm)`` is used.
        public_key_der: DER bytes stored as the signer identity.
        din: DIN used for the TLV personalization.
        inner_bytes: payload to wrap.
    """
    from pypowerwall.tedapi import tedapi_combined_pb2 as combined_pb2
    from cryptography.hazmat.primitives.asymmetric import padding
    from cryptography.hazmat.primitives import hashes

    message = combined_pb2.RoutableMessage()
    message.to_destination.domain = combined_pb2.DOMAIN_ENERGY_DEVICE
    message.protobuf_message_as_bytes = inner_bytes
    message.uuid = str(uuid.uuid4()).encode()

    expiry = math.ceil(time.time()) + 12
    signature = private_key.sign(
        _build_tlv(din, expiry, inner_bytes),
        padding.PKCS1v15(),
        hashes.SHA512(),
    )

    sig_data = message.signature_data
    sig_data.signer_identity.public_key = public_key_der
    sig_data.rsa_data.expires_at = expiry
    sig_data.rsa_data.signature = signature
    return message.SerializeToString()
def build_config_request(din):
    """Return the serialized envelope requesting ``config.json`` from *din*.

    The result is the inner payload for a v1r request: only the ``message``
    envelope of the protobuf ``Message`` is serialized, not the outer wrapper.
    """
    from pypowerwall.tedapi import tedapi_combined_pb2 as combined_pb2

    request = combined_pb2.Message()
    envelope = request.message
    envelope.deliveryChannel = combined_pb2.DELIVERY_CHANNEL_HERMES_COMMAND
    envelope.sender.authorizedClient = 1
    envelope.recipient.din = din

    read_request = envelope.filestore.readFileRequest
    read_request.domain = combined_pb2.FILE_STORE_API_DOMAIN_CONFIG_JSON
    read_request.name = "config.json"
    return envelope.SerializeToString()
# ── Per-device check ──────────────────────────────────────────────────────────
def _login_for_token(session, host, password, email, timeout):
    """Step [1]: POST /api/login/Basic and return the Bearer token.

    Returns the token string on HTTP 200 (empty when the response has no
    "token" field), or None when the login fails for any reason. Failures
    are printed, never raised.
    """
    step = "[1] /api/login/Basic"
    try:
        body = json.dumps({
            "username": "customer",
            "password": password,
            "email": email,
            "clientInfo": {"timezone": "America/Chicago"},
        })
        r = session.post(f"https://{host}/api/login/Basic", data=body,
                         headers={"Content-Type": "application/json"},
                         timeout=timeout)
        if r.status_code != 200:
            print(f" {err('✗')} {step} HTTP {r.status_code} {r.text[:80]}")
            return None
        token = r.json().get("token", "")
        print(f" {ok('✓')} {step} token={token[:16]}...")
        return token
    except requests.exceptions.ReadTimeout:
        print(f" {err('✗')} {step} Read timeout — device reachable but login stalled")
        print(f" {dim(' Follower may be booting or its gateway service is initializing.')}")
        print(f" {dim(' Try again in 30-60 seconds, or supply --follower IP:DIN to skip login.')}")
        return None
    except Exception as exc:
        # Best-effort diagnostic: report and carry on with the later steps.
        print(f" {err('✗')} {step} {exc}")
        return None


def _fetch_info_din(session, host, token, timeout, known_din):
    """Step [2]: GET /tedapi/info.

    Returns ``(status, din)``. *status* is True on HTTP 200, None on HTTP 403
    (treated as non-fatal) and False otherwise. *din* is the DIN reported by
    the device, or *known_din* when the device did not report one.

    Raises:
        requests.exceptions.ConnectionError: re-raised after printing, so the
            caller can skip the remaining steps for an unreachable device.
    """
    step = "[2] /tedapi/info"
    din = known_din  # pre-seeded from the command line if supplied
    try:
        headers = {"Authorization": f"Bearer {token}"} if token else {}
        r = session.get(f"https://{host}/tedapi/info", headers=headers, timeout=timeout)
        if r.status_code == 200:
            info = r.json()
            device_reported = info.get("din")
            # Only replace the supplied DIN with a real value; the "?"
            # placeholder is for display and must never be used as a DIN.
            if device_reported:
                din = device_reported
            fw = info.get("firmwareVersion", {}).get("version", "?")
            dev = info.get("deviceType", "?")
            print(f" {ok('✓')} {step} DIN={device_reported or '?'} FW={fw} type={dev}")
            return True, din
        if r.status_code == 403:
            msg = dim("(403 — interface-level block on this firmware, non-fatal)")
            if known_din:
                msg += dim(f" using supplied DIN={known_din}")
            print(f" {wrn('~')} {step} HTTP 403 {msg}")
            return None, din
        print(f" {err('✗')} {step} HTTP {r.status_code}")
        return False, din
    except requests.exceptions.ConnectionError as exc:
        print(f" {err('✗')} {step} connection refused / unreachable — {exc}")
        raise
    except Exception as exc:
        print(f" {err('✗')} {step} {exc}")
        return False, din


def _fetch_auth_din(session, host, token, timeout, reported_din):
    """Step [3]: GET /tedapi/din.

    Returns the stripped response body on HTTP 200, or None on any failure
    (printed, never raised). *reported_din* is used only to print whether the
    two sources agree.
    """
    step = "[3] /tedapi/din"
    try:
        headers = {"Authorization": f"Bearer {token}"} if token else {}
        r = session.get(f"https://{host}/tedapi/din", headers=headers,
                        timeout=timeout)
        if r.status_code != 200:
            print(f" {err('✗')} {step} HTTP {r.status_code}")
            return None
        auth_din = r.text.strip()
        if reported_din and auth_din != reported_din:
            match = wrn(f"⚠ mismatch with /tedapi/info ({reported_din})")
        elif reported_din:
            match = ok("✓ matches /tedapi/info")
        else:
            match = dim("(/tedapi/info not available for comparison)")
        print(f" {ok('✓')} {step} {auth_din} {match}")
        return auth_din
    except Exception as exc:
        print(f" {err('✗')} {step} {exc}")
        return None


def _probe_v1r(session, host, private_key, public_key_der, device_din, tlv_din,
               timeout):
    """Step [4]: POST a signed config.json request to /tedapi/v1r.

    Returns ``(registered, battery_blocks)``. *registered* is True only when
    the response reports no message fault. *battery_blocks* is the
    "battery_blocks" value of the returned config, or None when no config
    could be decoded. Failures are printed, never raised.
    """
    from pypowerwall.tedapi import tedapi_combined_pb2 as combined_pb2
    step = "[4] /tedapi/v1r"
    try:
        inner_bytes = build_config_request(device_din)
        payload = make_v1r_payload(private_key, public_key_der, tlv_din, inner_bytes)
        print(f" {dim(f' recipient={device_din} tlv_personalization={tlv_din}')}")
        r = session.post(f"https://{host}/tedapi/v1r",
                         data=payload,
                         headers={"Content-Type": "application/octet-stream"},
                         timeout=timeout)
        if r.status_code != 200:
            print(f" {err('✗')} {step} HTTP {r.status_code}")
            return False, None

        # Plain-text "client not authorized" body instead of a protobuf reply.
        raw = r.content
        if raw and b"client not authorized" in raw:
            print(f" {err('✗')} {step} {err('NOT_AUTHORIZED')} "
                  f"{dim('(key not registered on this device)')}")
            return False, None

        resp = combined_pb2.RoutableMessage()
        resp.ParseFromString(raw)
        fault = resp.signed_message_status.message_fault

        if fault == combined_pb2.MESSAGEFAULT_ERROR_NONE:
            # Decode the config to confirm real data came back and to expose
            # battery_blocks to the caller.
            battery_blocks = None
            snippet = ""
            inner = resp.protobuf_message_as_bytes
            if inner:
                try:
                    envelope = combined_pb2.MessageEnvelope()
                    envelope.ParseFromString(inner)
                    blob = envelope.filestore.readFileResponse.file.blob
                    config = json.loads(blob.decode("utf-8"))
                    vin = config.get("vin", "?")
                    snippet = f" config VIN={vin}"
                    battery_blocks = config.get("battery_blocks", [])
                except Exception:
                    # Deliberate best effort: the key was accepted even if
                    # the payload cannot be decoded, so only report its size.
                    snippet = f" ({len(inner)} bytes)"
            print(f" {ok('✓')} {step} {ok('REGISTERED')} ✓{snippet}")
            return True, battery_blocks

        if fault == combined_pb2.MESSAGEFAULT_ERROR_UNKNOWN_KEY_ID:
            print(f" {err('✗')} {step} {err('UNKNOWN_KEY_ID')} "
                  f"{dim('(key not registered — toggle this breaker)')}")
        elif fault == combined_pb2.MESSAGEFAULT_ERROR_INACTIVE_KEY:
            print(f" {wrn('⚠')} {step} {wrn('INACTIVE_KEY')} "
                  f"{dim('(registered but not yet active — wait and retry)')}")
        elif fault == combined_pb2.MESSAGEFAULT_ERROR_TIMEOUT:
            print(f" {err('✗')} {step} {err('TIMEOUT')} "
                  f"{dim('(v1r endpoint unreachable)')}")
        else:
            fault_name = combined_pb2.MessageFault_E.Name(fault)
            print(f" {err('✗')} {step} {err(fault_name)}")
        return False, None
    except Exception as exc:
        print(f" {err('✗')} {step} {exc}")
        return False, None


def check_device(label, host, password, email, private_key, public_key_der,
                 timeout=10, known_din=None, signing_din=None):
    """
    Run all four checks against one device (leader or follower).

    known_din   : DIN pre-supplied by the user; used when the device does not
                  report one via /tedapi/info or /tedapi/din.
    signing_din : DIN to use for TLV personalization. For followers this must be
                  the *leader* DIN (the key is registered on the leader only).
                  Defaults to the device DIN if not supplied.

    Returns (all_passed: bool, summary: dict).

    all_passed reflects the v1r result only; login, info and din are
    informational. summary holds "login", "info", "din" and "v1r" (True/False,
    or None for a non-fatal 403 on info) plus metadata keys prefixed with "_":
    "_din" (resolved device DIN, set whenever the v1r step ran) and
    "_battery_blocks" (present when the returned config was decoded).
    """
    results = {}
    print(f"\n{'─'*60}")
    print(f" Device : {inf(label)} ({host})")
    print(f"{'─'*60}")

    # The context manager closes the session's connections on every exit
    # path, including the early returns below.
    with requests.Session() as session:
        session.verify = False

        # [1] Login first so the Bearer token can be used for steps [2] and [3].
        token = _login_for_token(session, host, password, email, timeout)
        results["login"] = token is not None

        # [2] Firmware info + DIN; an unreachable device ends the check here.
        try:
            results["info"], reported_din = _fetch_info_din(
                session, host, token, timeout, known_din)
        except requests.exceptions.ConnectionError:
            results.update({"info": False, "din": False, "v1r": False})
            print(f" {dim(' (device unreachable — skipping remaining steps)')}")
            return False, results

        # [3] DIN via the authenticated endpoint.
        auth_din = _fetch_auth_din(session, host, token, timeout, reported_din)
        results["din"] = auth_din is not None

        # Recipient DIN for the envelope: prefer the authenticated value.
        device_din = auth_din or reported_din
        if not device_din:
            print(f" {err('✗')} [4] /tedapi/v1r {err('No DIN for recipient — supply via IP:DIN format')}")
            print(f" {dim(' e.g. --follower 10.42.1.44:1707000-11-M--TGxxxxxxxxxxxx')}")
            results["v1r"] = False
            return False, results

        # TLV personalization DIN: for followers the leader DIN must be used
        # (the RSA key is registered on the leader; personalization must match).
        tlv_din = signing_din or device_din

        # [4] RSA key test — the only step that decides the verdict.
        registered, battery_blocks = _probe_v1r(
            session, host, private_key, public_key_der, device_din, tlv_din,
            timeout)

    results["v1r"] = registered
    if battery_blocks is not None:
        # Lets the caller auto-resolve follower DINs.
        results["_battery_blocks"] = battery_blocks
    # Surface the resolved DIN on every v1r outcome (including NOT_AUTHORIZED)
    # so the caller can use it for follower signing.
    results["_din"] = device_din
    return registered, results
# ── Main ──────────────────────────────────────────────────────────────────────
def _follower_dins_from_blocks(battery_blocks, leader_din):
    """Return the "vin" of every battery block that is not the leader's DIN."""
    return [
        block["vin"] for block in battery_blocks
        if isinstance(block, dict) and block.get("vin")
        and block["vin"] != leader_din
    ]


def main():
    """Parse the command line, then poll the leader and each follower.

    The leader is always checked first so that its DIN (used to sign follower
    requests) and its config's battery_blocks (used to auto-discover follower
    DINs) are available for the followers. With ``--watch N`` the poll repeats
    every N seconds until every device passes or the user presses Ctrl-C.

    Exits with status 1 if the RSA key file is missing or cannot be loaded.
    """
    parser = argparse.ArgumentParser(
        description="Poll Powerwall leader + followers to verify v1r RSA key registration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("--leader", required=True, metavar="IP",
                        help="Leader Powerwall IP (e.g. 10.42.1.40)")
    parser.add_argument("--follower", action="append", metavar="IP[:DIN]",
                        default=[], dest="followers",
                        help="Follower IP, optionally with DIN: 10.42.1.44:1707000-11-M--TGxxx "
                             "(DIN auto-discovered from leader config if not supplied)")
    parser.add_argument("--gw-pwd", required=True, metavar="PWD",
                        help="Full 10-char QR code password (last 5 used for login)")
    parser.add_argument("--email", required=True, metavar="EMAIL",
                        help="Tesla account email used for /api/login/Basic")
    parser.add_argument("--follower-pwd", action="append", metavar="PWD",
                        default=[], dest="follower_pwds",
                        help="Follower QR code password (repeat for each --follower, in order; "
                             "falls back to --gw-pwd if not supplied)")
    parser.add_argument("--rsa-key", default="tedapi_rsa_private.pem", metavar="PATH",
                        help="Path to RSA private key PEM (default: tedapi_rsa_private.pem)")
    parser.add_argument("--timeout", type=int, default=20, metavar="SEC",
                        help="Per-request timeout in seconds (default: 20)")
    parser.add_argument("--watch", type=int, default=0, metavar="SEC",
                        help="Re-poll every N seconds until all pass (0 = run once)")
    args = parser.parse_args()

    # Validate + load RSA key
    from cryptography.hazmat.primitives import serialization
    try:
        with open(args.rsa_key, "rb") as f:
            private_key = serialization.load_pem_private_key(f.read(), password=None)
        public_key_der = private_key.public_key().public_bytes(
            serialization.Encoding.DER,
            serialization.PublicFormat.PKCS1,
        )
    except FileNotFoundError:
        print(err(f"RSA key not found: {args.rsa_key}"))
        print(dim(" Run: python -m pypowerwall register"))
        sys.exit(1)
    except (OSError, ValueError, TypeError) as exc:
        # Unreadable file, malformed PEM, or a password-protected key; a
        # non-RSA key is presumably rejected by the PKCS1 export the same way.
        print(err(f"Cannot load RSA key {args.rsa_key}: {exc}"))
        sys.exit(1)

    # Customer password = last 5 chars of QR code password
    customer_pw = args.gw_pwd[-5:]
    follower_pws = [pw[-5:] for pw in args.follower_pwds]

    def parse_device(spec):
        """Return (ip, din_or_None) from 'IP' or 'IP:DIN' spec."""
        # Split on the first ':' only. IPv6 addresses are not supported here.
        ip, sep, din = spec.partition(':')
        return ip.strip(), (din.strip() if sep else None)

    leader_ip, leader_din_override = parse_device(args.leader)
    # The leader DIN is the TLV personalization for follower checks; it is
    # refreshed from the leader's own check on every pass.
    leader_din_resolved = leader_din_override

    devices = [("Leader", leader_ip, leader_din_override)]
    for number, spec in enumerate(args.followers, start=1):
        ip, din = parse_device(spec)
        devices.append((f"Follower {number}", ip, din))

    iteration = 0
    # One handler around the whole loop: Ctrl-C stops cleanly whether it
    # arrives during a network request or during the sleep between passes.
    try:
        while True:
            iteration += 1
            ts = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"\n{'═'*60}")
            print(f" poll-v1r · {ts} · pass {iteration}")
            print(f"{'═'*60}")
            print(f" RSA key : {args.rsa_key}")
            print(f" Password : {'*' * len(customer_pw)} (last {len(customer_pw)} chars of gw-pwd)")

            all_ok = True
            status_lines = []
            follower_dins = []  # filled from the leader's config each pass

            for idx, (label, host, known_din) in enumerate(devices):
                is_leader = idx == 0
                follower_idx = idx - 1  # 0-based position among followers

                # Followers must sign TLV with the leader DIN (key registered on leader)
                tlv_signing = None if is_leader else leader_din_resolved
                resolved_known_din = known_din
                device_pw = customer_pw

                if not is_leader:
                    # Auto-resolve follower DIN from the leader config, matched by index
                    if not resolved_known_din and follower_idx < len(follower_dins):
                        resolved_known_din = follower_dins[follower_idx]
                        print(f" {dim(f' Auto-discovered {label} DIN from leader config: {resolved_known_din}')}")
                    # Each follower has its own QR code; fall back to --gw-pwd
                    if follower_idx < len(follower_pws):
                        device_pw = follower_pws[follower_idx]

                passed, results = check_device(
                    label, host, device_pw, args.email, private_key, public_key_der,
                    timeout=args.timeout,
                    known_din=resolved_known_din,
                    signing_din=tlv_signing,
                )

                # Capture leader DIN and follower DINs for the follower checks
                if is_leader:
                    leader_din_resolved = results.get("_din") or leader_din_resolved
                    follower_dins = _follower_dins_from_blocks(
                        results.get("_battery_blocks") or [], leader_din_resolved)
                    if follower_dins:
                        print(f" {dim(f' Leader config: found {len(follower_dins)} follower DIN(s): {follower_dins}')}")

                all_ok = all_ok and passed
                v1r_icon = ok("✓ REGISTERED") if results.get("v1r") else err("✗ NOT REGISTERED")
                status_lines.append(f" {label:<14} {host:<16} {v1r_icon}")

            print(f"\n{'─'*60}")
            print(" Summary:")
            for line in status_lines:
                print(line)
            print()

            if all_ok:
                print(ok(" ALL DEVICES REGISTERED — follower queries should now work!"))
                print()
                break
            if args.watch <= 0:
                print(dim(" Re-run with --watch N to poll automatically."))
                print()
                break

            print(wrn(f" Not all registered yet — retrying in {args.watch}s (Ctrl-C to stop)"))
            time.sleep(args.watch)
    except KeyboardInterrupt:
        print("\n Stopped.")
# Run the poller only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment