#!/usr/bin/env python3
"""
Agent API Cost Tracker v1.0
Built by SamTheArchitect for the Shipyard community

Tracks and analyzes API costs across multiple AI providers.
Helps agents optimize spending and stay within budget.

Features:
- Multi-provider support (OpenAI, Anthropic, Google, etc.)
- Token usage tracking
- Cost estimation and budgeting
- Usage analytics and trends
- Export to JSON/CSV

Usage:
    python cost_tracker.py log --provider openai --model gpt-4 --tokens 1500
    python cost_tracker.py summary --days 7
    python cost_tracker.py budget --set 100.00
    python cost_tracker.py export --format json
"""

import argparse
import csv
import io
import json
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Dict, List

# Cost per 1K tokens (as of Jan 2026).
# Layout: provider -> model -> {"input": rate, "output": rate}, in USD.
PRICING: Dict[str, Dict[str, Dict[str, float]]] = {
    "openai": {
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "o1": {"input": 0.015, "output": 0.06},
        "o1-mini": {"input": 0.003, "output": 0.012},
    },
    "anthropic": {
        "claude-opus-4": {"input": 0.015, "output": 0.075},
        "claude-sonnet-4": {"input": 0.003, "output": 0.015},
        "claude-haiku-4": {"input": 0.0008, "output": 0.004},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        "claude-3-haiku": {"input": 0.00025, "output": 0.00125},
    },
    "google": {
        "gemini-2.0-flash": {"input": 0.0001, "output": 0.0004},
        "gemini-1.5-pro": {"input": 0.00125, "output": 0.005},
        "gemini-1.5-flash": {"input": 0.000075, "output": 0.0003},
    },
    "mistral": {
        "mistral-large": {"input": 0.004, "output": 0.012},
        "mistral-medium": {"input": 0.0027, "output": 0.0081},
        "mistral-small": {"input": 0.001, "output": 0.003},
    },
}

class CostTracker:
    """Record API usage in a local SQLite database and report on spending.

    Two tables are maintained: ``usage`` (one row per logged call) and
    ``budget`` (a single row with id 1 holding the monthly budget and the
    alert threshold). Timestamps are written as naive UTC ISO-8601 strings
    and filtered with plain string comparison in SQL.
    """

    # Flat USD rate per 1K tokens used when the provider is not in PRICING.
    _UNKNOWN_PROVIDER_RATE = 0.01

    # Fallbacks used when the budget row is missing or holds NULLs.
    _DEFAULT_MONTHLY_BUDGET = 100.0
    _DEFAULT_ALERT_THRESHOLD = 0.8

    def __init__(self, db_path: str = "~/.agent-costs/costs.db"):
        """Open (creating if needed) the database at *db_path*.

        Args:
            db_path: Location of the SQLite file; ``~`` is expanded and
                missing parent directories are created.
        """
        self.db_path = Path(db_path).expanduser()
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        ``datetime.utcnow()`` is deprecated; dropping the tzinfo afterwards
        keeps the stored ISO strings in the same offset-free format as before,
        so rows written by earlier versions still compare correctly.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _init_db(self) -> None:
        """Create the ``usage`` and ``budget`` tables and seed the budget row."""
        # closing() guarantees the connection is released even if a statement fails.
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS usage (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    provider TEXT NOT NULL,
                    model TEXT NOT NULL,
                    input_tokens INTEGER DEFAULT 0,
                    output_tokens INTEGER DEFAULT 0,
                    cost_usd REAL NOT NULL,
                    session_id TEXT,
                    notes TEXT
                )
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS budget (
                    id INTEGER PRIMARY KEY,
                    monthly_budget REAL DEFAULT 100.0,
                    alert_threshold REAL DEFAULT 0.8,
                    updated_at TEXT
                )
            """)

            # Initialize budget if not exists
            cursor.execute("INSERT OR IGNORE INTO budget (id, monthly_budget) VALUES (1, 100.0)")

            conn.commit()

    def log_usage(self, provider: str, model: str, input_tokens: int = 0,
                  output_tokens: int = 0, session_id: Optional[str] = None,
                  notes: Optional[str] = None) -> Dict:
        """Log API usage and calculate cost.

        Args:
            provider: Provider name; stored as given, matched case-insensitively
                for pricing.
            model: Model name; stored as given.
            input_tokens: Number of input tokens.
            output_tokens: Number of output tokens.
            session_id: Optional session identifier stored with the row.
            notes: Optional free-text note stored with the row.

        Returns:
            A dict describing the logged entry. ``logged_at`` is the exact
            timestamp written to the database.
        """
        cost = self._calculate_cost(provider, model, input_tokens, output_tokens)
        # Sample the clock once so the returned value matches the stored row.
        logged_at = self._utc_now().isoformat()

        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute("""
                INSERT INTO usage (timestamp, provider, model, input_tokens, output_tokens, cost_usd, session_id, notes)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (logged_at, provider, model, input_tokens, output_tokens, cost, session_id, notes))
            conn.commit()

        return {
            "provider": provider,
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": round(cost, 6),
            "logged_at": logged_at,
        }

    @staticmethod
    def _match_model_pricing(models: Dict[str, Dict[str, float]],
                             model: str) -> Optional[Dict[str, float]]:
        """Pick the pricing entry in *models* that best matches *model*.

        Order of preference:
          1. exact name match;
          2. the longest known name contained in *model* (so a dated variant
             of "gpt-4o-mini" is not priced as "gpt-4o");
          3. the first known name that contains *model* (e.g. "gpt-4" resolves
             to "gpt-4-turbo", the first such entry in the table).

        Returns None when nothing matches or *model* is empty.
        """
        if model in models:
            return models[model]
        if not model:
            # An empty string is a substring of everything; treat it as unknown.
            return None

        contained = [name for name in models if name in model]
        if contained:
            return models[max(contained, key=len)]

        for name, pricing in models.items():
            if model in name:
                return pricing
        return None

    def _calculate_cost(self, provider: str, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost in USD based on provider pricing.

        Unknown providers are estimated at a flat $0.01 per 1K tokens; unknown
        models of a known provider use the average of that provider's rates.
        """
        provider = provider.lower()
        model = model.lower()

        models = PRICING.get(provider)
        if not models:
            # Unknown provider, estimate at $0.01/1K tokens
            return (input_tokens + output_tokens) / 1000 * self._UNKNOWN_PROVIDER_RATE

        model_pricing = self._match_model_pricing(models, model)
        if model_pricing is None:
            # Unknown model, use average for provider
            model_pricing = {
                "input": sum(p["input"] for p in models.values()) / len(models),
                "output": sum(p["output"] for p in models.values()) / len(models),
            }

        input_cost = (input_tokens / 1000) * model_pricing["input"]
        output_cost = (output_tokens / 1000) * model_pricing["output"]
        return input_cost + output_cost

    def get_summary(self, days: int = 30) -> Dict:
        """Get usage summary for the past N days.

        Args:
            days: Size of the reporting window, counted back from now (UTC).

        Returns:
            A dict with totals, per-provider and per-model (top 10) breakdowns,
            a daily cost trend, and a ``budget`` section for the current
            calendar month. When the budget is zero or negative,
            ``percent_used`` is 100.0 if anything was spent and 0.0 otherwise.
        """
        now = self._utc_now()
        cutoff = (now - timedelta(days=days)).isoformat()
        # Zero the microseconds too, otherwise rows from the first instants of
        # the month would sort before the boundary and be left out.
        month_start = now.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        ).isoformat()

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Total cost
            cursor.execute("""
                SELECT SUM(cost_usd), SUM(input_tokens), SUM(output_tokens), COUNT(*)
                FROM usage WHERE timestamp > ?
            """, (cutoff,))
            total_cost, total_input, total_output, total_calls = cursor.fetchone()

            # By provider
            cursor.execute("""
                SELECT provider, SUM(cost_usd), COUNT(*)
                FROM usage WHERE timestamp > ?
                GROUP BY provider ORDER BY SUM(cost_usd) DESC
            """, (cutoff,))
            by_provider = [
                {"provider": name, "cost": round(cost, 4), "calls": calls}
                for name, cost, calls in cursor.fetchall()
            ]

            # By model
            cursor.execute("""
                SELECT model, SUM(cost_usd), COUNT(*)
                FROM usage WHERE timestamp > ?
                GROUP BY model ORDER BY SUM(cost_usd) DESC LIMIT 10
            """, (cutoff,))
            by_model = [
                {"model": name, "cost": round(cost, 4), "calls": calls}
                for name, cost, calls in cursor.fetchall()
            ]

            # Daily trend
            cursor.execute("""
                SELECT DATE(timestamp) as day, SUM(cost_usd)
                FROM usage WHERE timestamp > ?
                GROUP BY DATE(timestamp) ORDER BY day
            """, (cutoff,))
            daily_trend = [
                {"date": day, "cost": round(cost, 4)}
                for day, cost in cursor.fetchall()
            ]

            # Budget status
            cursor.execute("SELECT monthly_budget, alert_threshold FROM budget WHERE id = 1")
            budget_row = cursor.fetchone()

            # This month's spend (>= so a row exactly on the boundary counts)
            cursor.execute("SELECT SUM(cost_usd) FROM usage WHERE timestamp >= ?", (month_start,))
            month_spend = cursor.fetchone()[0] or 0

        monthly_budget = self._DEFAULT_MONTHLY_BUDGET
        alert_threshold = self._DEFAULT_ALERT_THRESHOLD
        if budget_row:
            if budget_row[0] is not None:
                monthly_budget = budget_row[0]
            if budget_row[1] is not None:
                alert_threshold = budget_row[1]

        if monthly_budget > 0:
            used_fraction = month_spend / monthly_budget
        else:
            # No budget to divide by: any spend counts as fully used.
            used_fraction = 1.0 if month_spend > 0 else 0.0

        return {
            "period_days": days,
            "total_cost_usd": round(total_cost or 0, 4),
            "total_input_tokens": total_input or 0,
            "total_output_tokens": total_output or 0,
            "total_calls": total_calls or 0,
            "avg_cost_per_call": round((total_cost or 0) / max(total_calls or 1, 1), 6),
            "by_provider": by_provider,
            "top_models": by_model,
            "daily_trend": daily_trend,
            "budget": {
                "monthly_limit": monthly_budget,
                "month_spend": round(month_spend, 4),
                "remaining": round(monthly_budget - month_spend, 4),
                "percent_used": round(used_fraction * 100, 1),
                "alert_threshold": alert_threshold,
                "alert_triggered": used_fraction >= alert_threshold,
            },
        }

    def set_budget(self, monthly_budget: float, alert_threshold: float = 0.8):
        """Set monthly budget.

        Args:
            monthly_budget: Budget in USD; must not be negative.
            alert_threshold: Fraction of the budget (0-1) at which the summary
                reports the alert as triggered.

        Returns:
            A dict echoing the stored ``monthly_budget`` and ``alert_threshold``.

        Raises:
            ValueError: If *monthly_budget* is negative or *alert_threshold*
                lies outside the range 0-1.
        """
        if monthly_budget < 0:
            raise ValueError("monthly_budget must not be negative")
        if not 0 <= alert_threshold <= 1:
            raise ValueError("alert_threshold must be between 0 and 1")

        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute("""
                UPDATE budget SET monthly_budget = ?, alert_threshold = ?, updated_at = ?
                WHERE id = 1
            """, (monthly_budget, alert_threshold, self._utc_now().isoformat()))
            conn.commit()

        return {"monthly_budget": monthly_budget, "alert_threshold": alert_threshold}

    def export(self, format: str = "json", days: int = 30) -> str:
        """Export usage data for the past *days* days, newest first.

        Args:
            format: ``"csv"`` for CSV text; any other value yields JSON.
            days: Size of the export window, counted back from now (UTC).

        Returns:
            The serialized rows. CSV output starts with a header line, has no
            trailing newline, quotes fields that contain commas, quotes or
            newlines, and writes NULL columns as empty fields.
        """
        columns = (
            "timestamp", "provider", "model", "input_tokens",
            "output_tokens", "cost_usd", "session_id", "notes",
        )
        cutoff = (self._utc_now() - timedelta(days=days)).isoformat()

        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute("""
                SELECT timestamp, provider, model, input_tokens, output_tokens, cost_usd, session_id, notes
                FROM usage WHERE timestamp > ? ORDER BY timestamp DESC
            """, (cutoff,)).fetchall()

        if format == "csv":
            # csv.writer handles quoting; a hand-rolled join corrupts rows
            # whose notes contain commas, quotes or newlines.
            buffer = io.StringIO()
            writer = csv.writer(buffer, lineterminator="\n")
            writer.writerow(columns)
            writer.writerows(rows)
            return buffer.getvalue().rstrip("\n")

        return json.dumps([dict(zip(columns, row)) for row in rows], indent=2)


def main():
    """Parse the command line and run the requested subcommand.

    Subcommands: ``log``, ``summary``, ``budget``, ``export`` and ``pricing``.
    Without a subcommand the help text is printed. The tracker (and therefore
    the database) is only opened for subcommands that need it.
    """
    parser = argparse.ArgumentParser(description="Agent API Cost Tracker")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # Log command
    log_parser = subparsers.add_parser("log", help="Log API usage")
    log_parser.add_argument("--provider", required=True, help="API provider (openai, anthropic, etc.)")
    log_parser.add_argument("--model", required=True, help="Model name")
    log_parser.add_argument("--input-tokens", type=int, default=0, help="Input tokens")
    log_parser.add_argument("--output-tokens", type=int, default=0, help="Output tokens")
    log_parser.add_argument("--tokens", type=int, help="Total tokens (split 70/30 input/output)")
    log_parser.add_argument("--session", help="Session ID")
    log_parser.add_argument("--notes", help="Notes")

    # Summary command
    summary_parser = subparsers.add_parser("summary", help="Get usage summary")
    summary_parser.add_argument("--days", type=int, default=30, help="Days to include")

    # Budget command
    budget_parser = subparsers.add_parser("budget", help="Set monthly budget")
    budget_parser.add_argument("--set", type=float, help="Monthly budget in USD")
    budget_parser.add_argument("--alert", type=float, default=0.8, help="Alert threshold (0-1)")

    # Export command
    export_parser = subparsers.add_parser("export", help="Export usage data")
    export_parser.add_argument("--format", choices=["json", "csv"], default="json")
    export_parser.add_argument("--days", type=int, default=30, help="Days to include")

    # Pricing command
    subparsers.add_parser("pricing", help="Show pricing info")

    args = parser.parse_args()

    # These two need no database, so handle them before opening one.
    if args.command is None:
        parser.print_help()
        return
    if args.command == "pricing":
        print(json.dumps(PRICING, indent=2))
        return

    if args.command == "log":
        if min(args.input_tokens, args.output_tokens, args.tokens or 0) < 0:
            parser.error("token counts must be non-negative")

        input_tokens = args.input_tokens
        output_tokens = args.output_tokens
        if args.tokens:
            # Integer 70/30 split; the remainder goes to output so the two
            # parts always add up to the requested total.
            input_tokens = args.tokens * 7 // 10
            output_tokens = args.tokens - input_tokens

        tracker = CostTracker()
        result = tracker.log_usage(
            provider=args.provider,
            model=args.model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            session_id=args.session,
            notes=args.notes,
        )
        print(json.dumps(result, indent=2))

    elif args.command == "summary":
        result = CostTracker().get_summary(days=args.days)
        print(json.dumps(result, indent=2))

    elif args.command == "budget":
        if args.set is not None:
            # Reject bad values up front instead of silently ignoring "--set 0"
            # or storing a budget the summary cannot compute against.
            if args.set <= 0:
                parser.error("--set must be a positive amount in USD")
            if not 0 <= args.alert <= 1:
                parser.error("--alert must be between 0 and 1")
            result = CostTracker().set_budget(args.set, args.alert)
            print(f"Budget set: ${result['monthly_budget']:.2f}/month")
        else:
            budget = CostTracker().get_summary(days=30)["budget"]
            print(f"Monthly Budget: ${budget['monthly_limit']:.2f}")
            print(f"Month Spend: ${budget['month_spend']:.2f}")
            print(f"Remaining: ${budget['remaining']:.2f}")
            print(f"Used: {budget['percent_used']}%")

    elif args.command == "export":
        print(CostTracker().export(format=args.format, days=args.days))


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()