|
#!/usr/bin/env -S uv run --script |
|
# /// script |
|
# requires-python = ">=3.10" |
|
# dependencies = [ |
|
# "cloudscraper", |
|
# "cryptography", |
|
# ] |
|
# /// |
|
""" |
|
Fetches Claude Code session usage from the claude.ai API. |
|
Persists cookies per account and reuses them automatically when switching accounts. |
|
""" |
|
import sqlite3, subprocess, hashlib, json, os, sys, time, base64 |
|
from datetime import datetime, timezone |
|
import cloudscraper |
|
|
|
CACHE_FILE = "/tmp/claude-usage.json" |
|
CLAUDE_CONFIG = os.path.expanduser("~/.claude.json") |
|
BACKUP_DIR = os.path.expanduser("~/.claude-switch-backup/cookies") |
|
|
|
def get_current_account_email():
    """Return the email address of the active Claude account, or None.

    Reads the "oauthAccount" section of ~/.claude.json; any read/parse
    problem is treated as "no account".
    """
    try:
        with open(CLAUDE_CONFIG) as fh:
            oauth = json.load(fh).get("oauthAccount", {})
        if isinstance(oauth, dict):
            return oauth.get("emailAddress") or None
    except Exception:
        pass
    return None
|
|
|
def get_cached_account_email():
    """Return the "account_email" recorded in the last cache write, or None."""
    try:
        with open(CACHE_FILE) as fh:
            return json.load(fh).get("account_email")
    except Exception:
        return None
|
|
|
def get_cookies_file_path(email):
    """Map an account email to its per-account cookie file path.

    Returns None when *email* is empty/None.  '@' and '.' are replaced so
    the address is safe to embed in a filename.
    """
    if not email:
        return None
    sanitized = email.replace("@", "_at_").replace(".", "_")
    return os.path.join(BACKUP_DIR, f".claude-cookies-{sanitized}.json")
|
|
|
def get_saved_cookies(email):
    """Load the previously saved cookie dict for *email*.

    Returns None when the email is empty, the file is missing, or the
    file cannot be parsed.
    """
    path = get_cookies_file_path(email)
    if path is None or not os.path.exists(path):
        return None
    try:
        with open(path) as fh:
            return json.load(fh)
    except Exception:
        return None
|
|
|
def save_cookies(email, cookies_dict):
    """Persist *cookies_dict* for *email*; return True on success."""
    if not email:
        return False
    os.makedirs(BACKUP_DIR, exist_ok=True)
    target = get_cookies_file_path(email)
    try:
        with open(target, "w") as fh:
            json.dump(cookies_dict, fh)
        os.chmod(target, 0o600)  # cookie material — restrict to owner
        return True
    except Exception:
        return False
|
|
|
def decrypt_cookie(encrypted_value, key):
    """Decrypt a Chromium-style (Arc) encrypted cookie value.

    Args:
        encrypted_value: raw BLOB from Arc's Cookies DB (b'v10' prefix +
            AES-128-CBC ciphertext).
        key: 16-byte AES key derived from Arc's keychain "Safe Storage" secret.

    Returns:
        The decoded cookie string, or None when the value lacks the 'v10'
        prefix or no printable payload can be located.
    """
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends import default_backend
    b = bytes(encrypted_value)
    # Chromium marks macOS keychain-encrypted cookies with a literal 'v10'.
    if b[:3] != b'v10':
        return None
    # Fixed IV of 16 spaces — matches Chromium's os_crypt cookie scheme.
    cipher = Cipher(algorithms.AES(key), modes.CBC(b' ' * 16), backend=default_backend())
    decryptor = cipher.decryptor()
    raw = decryptor.update(b[3:]) + decryptor.finalize()
    # Strip PKCS#7-style padding: the last byte encodes the pad length.
    # NOTE(review): raw[-1] raises IndexError on an empty payload — the only
    # caller swallows exceptions, but confirm that is intended.
    pad_len = raw[-1]
    if 0 < pad_len <= 16:
        raw = raw[:-pad_len]
    # Fast path: Claude session keys always start with 'sk-ant-'.
    if b'sk-ant-' in raw:
        idx = raw.find(b'sk-ant-')
        return raw[idx:].decode('ascii', errors='ignore')
    # Fallback for other cookies (e.g. cf_clearance): skip a leading 16-byte
    # block (presumably a hash prefix newer Chromium versions prepend — TODO
    # confirm the exact length), then scan for the first run of 8 printable
    # ASCII bytes and take everything from there.
    raw = raw[16:]
    for i in range(len(raw)):
        chunk = raw[i:i+8]
        if all(32 <= c < 127 for c in chunk):
            return raw[i:].decode('ascii', errors='ignore')
    return None
|
|
|
def get_arc_cookies():
    """Read the current claude.ai session cookies from the Arc browser.

    Derives the AES key from Arc's "Safe Storage" keychain entry, then
    decrypts the relevant rows of Arc's cookie database.  Returns a dict
    of cookie name -> value, or None when anything is unavailable.
    """
    try:
        result = subprocess.run(
            ['security', 'find-generic-password', '-s', 'Arc Safe Storage', '-w'],
            capture_output=True, text=True, timeout=3
        )
        pw = result.stdout.strip()
        if not pw:
            return None
        # Same KDF parameters Chromium uses on macOS for cookie encryption.
        key = hashlib.pbkdf2_hmac('sha1', pw.encode(), b'saltysalt', 1003, dklen=16)
    except Exception:
        return None

    db_path = os.path.expanduser(
        "~/Library/Application Support/Arc/User Data/Default/Cookies"
    )
    try:
        # immutable=1 lets us read the DB even while Arc holds it open.
        conn = sqlite3.connect(f"file:{db_path}?immutable=1&timeout=2", uri=True)
        rows = conn.execute(
            "SELECT name, encrypted_value FROM cookies "
            "WHERE host_key LIKE '%claude.ai%' AND name IN ('sessionKey', 'cf_clearance')"
        ).fetchall()
        conn.close()
        decrypted = {
            name: value
            for name, blob in rows
            if (value := decrypt_cookie(blob, key))
        }
        return decrypted or None
    except Exception:
        return None
|
|
|
def fetch_usage():
    """Fetch usage data from the claude.ai API using saved or Arc cookies.

    Cookie sources are tried in order: Arc's live cookies first (skipped
    when Arc appears to be logged into a different account than Claude
    Code), then the cookies previously saved for the current account.

    Returns:
        The decoded JSON usage payload, or None when every source fails
        or the overall deadline expires.
    """
    import signal

    def timeout_handler(signum, frame):
        raise TimeoutError("Fetch timeout")

    # Global 6-second timeout (allows Arc attempt + saved fallback)
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(6)

    try:
        current_email = get_current_account_email()
        arc_cookies = get_arc_cookies()
        saved_cookies = get_saved_cookies(current_email)

        # If Arc's sessionKey differs from the saved one for this account,
        # Arc is logged into a different account — exclude it from candidates.
        arc_is_different_account = (
            arc_cookies is not None and
            saved_cookies is not None and
            arc_cookies.get('sessionKey') != saved_cookies.get('sessionKey')
        )

        candidates = []
        if arc_cookies and not arc_is_different_account:
            candidates.append(('arc', arc_cookies))
        if saved_cookies:
            candidates.append(('saved', saved_cookies))

        # Browser-like headers so Cloudflare treats us as a normal client.
        base_headers = {
            "Accept": "application/json",
            "Accept-Language": "en-US,en;q=0.9",
            "User-Agent": (
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/131.0.0.0 Safari/537.36"
            ),
            "Referer": "https://claude.ai/",
        }

        for source, cookies in candidates:
            if not cookies or 'sessionKey' not in cookies:
                continue

            try:
                scraper = cloudscraper.create_scraper()
                scraper.headers.update({
                    **base_headers,
                    "Cookie": "; ".join(f"{k}={v}" for k, v in cookies.items()),
                })

                orgs_resp = scraper.get("https://claude.ai/api/organizations", timeout=3)
                if orgs_resp.status_code != 200:
                    continue  # wrong account or expired — try next source

                orgs = orgs_resp.json()
                # The endpoint may return a list of orgs or a single object.
                org_id = (orgs[0] if isinstance(orgs, list) else orgs).get("uuid") if orgs else None
                if not org_id:
                    continue

                usage_resp = scraper.get(
                    f"https://claude.ai/api/organizations/{org_id}/usage", timeout=3
                )
                if usage_resp.status_code != 200:
                    continue

                return usage_resp.json()
            except Exception:
                continue

        return None
    except Exception:
        # The alarm's TimeoutError lands here too; the previous
        # `except (TimeoutError, Exception)` was redundant because
        # TimeoutError is already an Exception subclass.
        return None
    finally:
        signal.alarm(0)  # always cancel any pending alarm
|
|
|
def format_duration(total_minutes):
    """Format minutes as '2d15h', '1h23m' or '45m' ('now' for <= 0)."""
    total_minutes = int(total_minutes)
    if total_minutes <= 0:
        return "now"
    days, day_rem = divmod(total_minutes, 60 * 24)
    hours, minutes = divmod(day_rem, 60)
    if days:
        return f"{days}d{hours:02d}h"
    if hours:
        return f"{hours}h{minutes:02d}m"
    return f"{minutes}m"
|
|
|
def format_time_until_reset(resets_at_iso):
    """Return time remaining until reset as '1h23m' or '45m' ('' on error).

    Accepts a trailing 'Z' UTC designator — datetime.fromisoformat only
    parses it natively from Python 3.11, while this script targets >=3.10 —
    and treats naive timestamps as UTC so the subtraction never mixes naive
    and aware datetimes (which would raise and silently return '').
    """
    try:
        reset_time = datetime.fromisoformat(resets_at_iso.replace("Z", "+00:00"))
        if reset_time.tzinfo is None:
            # Assume UTC for naive timestamps — the API reports UTC times.
            reset_time = reset_time.replace(tzinfo=timezone.utc)
        total_seconds = int((reset_time - datetime.now(timezone.utc)).total_seconds())
        if total_seconds <= 0:
            return "now"
        return format_duration(total_seconds // 60)
    except Exception:
        return ""
|
|
|
def weekly_prediction(utilization, resets_at_iso):
    """Estimate time left before the weekly limit runs out at the current rate.

    Args:
        utilization: percentage (0-100) of the weekly budget consumed.
        resets_at_iso: ISO-8601 timestamp of the next weekly reset.

    Returns:
        A duration string (see format_duration), '!' when the limit is
        already exhausted, or '' when no useful prediction can be made
        (no consumption yet, parse failure, or the budget outlasts the
        remaining window).
    """
    try:
        from datetime import timedelta
        # Accept a trailing 'Z' (fromisoformat parses it natively only from
        # Python 3.11; this script targets >=3.10) and normalize naive
        # timestamps to UTC before mixing with aware ones.
        reset_time = datetime.fromisoformat(resets_at_iso.replace("Z", "+00:00"))
        if reset_time.tzinfo is None:
            reset_time = reset_time.replace(tzinfo=timezone.utc)
        started_at = reset_time - timedelta(days=7)  # start of the weekly window
        now = datetime.now(timezone.utc)
        elapsed_minutes = (now - started_at).total_seconds() / 60
        if utilization <= 0 or elapsed_minutes <= 0:
            return ""
        remaining_pct = 100 - utilization
        if remaining_pct <= 0:
            return "!"
        # Linear extrapolation: minutes until 100% at the observed burn rate.
        minutes_left = remaining_pct * elapsed_minutes / utilization
        seconds_to_reset = (reset_time - now).total_seconds()
        # The reset arrives before exhaustion — nothing worth warning about.
        if minutes_left * 60 > seconds_to_reset:
            return ""
        return format_duration(minutes_left)
    except Exception:
        return ""
|
|
|
def save_cache_file(content, account_email=None):
    """Write the cache JSON (timestamp, usage strings, account email).

    Best effort: any failure is silently ignored so the status bar keeps
    whatever cache it had before.
    """
    try:
        payload = {
            "timestamp": int(time.time()),
            "five_h_info": content.get("five_h_info", ""),
            "weekly_info": content.get("weekly_info", ""),
            "account_email": account_email,
        }
        with open(CACHE_FILE, "w") as fh:
            json.dump(payload, fh)
    except Exception:
        pass
|
|
|
def _mask_key(key): |
|
"""Show only first and last 4 chars of a session key.""" |
|
if not key or len(key) < 12: |
|
return "(too short)" |
|
return f"{key[:8]}...{key[-4:]}" |
|
|
|
def cmd_check_cookies():
    """Command: compare Arc vs saved cookies and report if --save-cookies is needed.

    Prints a human-readable report: the Arc session key, the saved session
    key (with file age), the cache status, and a final verdict with the
    action to take, if any.
    """
    email = get_current_account_email()
    if not email:
        print("Error: no active account found in ~/.claude.json")
        return

    print(f"Account : {email}")
    print()

    # Gather both cookie sources; either may be unavailable.
    arc_cookies = get_arc_cookies()
    saved_cookies = get_saved_cookies(email)

    arc_key = arc_cookies.get('sessionKey') if arc_cookies else None
    saved_key = saved_cookies.get('sessionKey') if saved_cookies else None

    # Arc status
    if arc_key:
        print(f"Arc : {_mask_key(arc_key)}")
    else:
        print("Arc : not found (Arc not running or not logged in to claude.ai)")

    # Saved status
    if saved_key:
        cookie_file = get_cookies_file_path(email)
        # The file must exist here: get_saved_cookies just read it successfully.
        mtime = os.path.getmtime(cookie_file)
        age_days = (time.time() - mtime) / 86400
        print(f"Saved : {_mask_key(saved_key)} (saved {age_days:.0f}d ago)")
    else:
        print("Saved : not found")

    # Cache status: report which account the last cached fetch belonged to.
    print()
    try:
        with open(CACHE_FILE) as f:
            cache = json.load(f)
        cache_email = cache.get("account_email", "unknown")
        cache_age = (time.time() - cache.get("timestamp", 0)) / 60
        match = "✓" if cache_email == email else "✗ MISMATCH"
        print(f"Cache : {cache_email} [{match}] ({cache_age:.0f} min ago)")
    except Exception:
        print("Cache : not found")

    # Verdict: every combination of (arc_key, saved_key) presence/equality.
    print()
    if not arc_key and not saved_key:
        print("✗ No cookies available — cannot fetch usage data")
    elif not arc_key and saved_key:
        print("✓ Arc not available, will use saved cookies")
    elif arc_key and not saved_key:
        print("! No saved cookies — run: uv run claude-usage.py --save-cookies")
    elif arc_key == saved_key:
        print("✓ Arc and saved cookies match — no action needed")
    else:
        # Keys differ: Arc holds a session for some other account.
        print("✗ Arc session differs from saved cookies")
        print("  Arc is logged into a different account than Claude Code")
        print("  To fix: log Arc into the correct account, then run:")
        print("    uv run claude-usage.py --save-cookies")
|
|
|
def cmd_save_cookies():
    """Command: persist Arc's current cookies for the active account.

    Returns True when cookies were saved, False otherwise; progress is
    reported on stdout.
    """
    email = get_current_account_email()
    if not email:
        print("Error: No active Claude account found")
        return False

    cookies = get_arc_cookies()
    if not cookies:
        print("Error: No cookies found for current account")
        return False

    if not save_cookies(email, cookies):
        print(f"Error: Failed to save cookies for {email}")
        return False
    print(f"✓ Cookies saved for {email}")
    return True
|
|
|
def main():
    """Entry point: dispatch CLI flags, or fetch usage and refresh the cache.

    With no flags, fetches the usage payload and writes two display strings
    to the cache file: the 5-hour window ("<pct>% <time-to-reset>") and the
    7-day window ("<pct>% [<prediction>] <weekday reset time>").
    """
    # Handle commands
    if len(sys.argv) > 1:
        if sys.argv[1] == "--save-cookies":
            cmd_save_cookies()
            return
        if sys.argv[1] == "--check-cookies":
            cmd_check_cookies()
            return

    # Normal flow: fetch and cache usage
    current_email = get_current_account_email()
    data = fetch_usage()

    if not data:
        # Leave the previous cache untouched so stale data is still shown.
        return

    # 5-hour window
    five_h = data.get("five_hour") or {}
    utilization = five_h.get("utilization")
    resets_at = five_h.get("resets_at", "")

    five_h_parts = []
    if utilization is not None:
        five_h_parts.append(f"{int(utilization)}%")
    if resets_at:
        remaining = format_time_until_reset(resets_at)
        if remaining:
            five_h_parts.append(remaining)
    five_h_info = " ".join(five_h_parts)

    # 7-day window
    seven_d = data.get("seven_day") or {}
    w_util = seven_d.get("utilization")
    w_resets_at = seven_d.get("resets_at", "")

    weekly_parts = []
    if w_util is not None:
        weekly_parts.append(f"{int(w_util)}%")
    if w_resets_at:
        if w_util is not None:
            pred = weekly_prediction(w_util, w_resets_at)
            if pred:
                weekly_parts.append(pred)
        try:
            # Accept a trailing 'Z' designator: fromisoformat only parses it
            # natively from Python 3.11, and this script targets >=3.10 —
            # without the replace the weekday suffix was silently dropped.
            reset_time = datetime.fromisoformat(
                w_resets_at.replace("Z", "+00:00")
            ).astimezone()
            day_letters = ["L", "M", "X", "J", "V", "S", "D"]  # Spanish weekday initials: Mon-Sun (Wed=X to avoid duplicate M)
            day_letter = day_letters[reset_time.weekday()]
            weekly_parts.append(f"{day_letter}{reset_time.hour}:{reset_time.minute:02d}")
        except Exception:
            pass
    weekly_info = " ".join(weekly_parts)

    save_cache_file({"five_h_info": five_h_info, "weekly_info": weekly_info}, current_email)
|
|
|
if __name__ == "__main__": |
|
main() |