Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
Last active January 23, 2026 20:53
Show Gist options
  • Select an option

  • Save cnolanminich/7ac382eef8840a110f14d8f3dd08c4d1 to your computer and use it in GitHub Desktop.

Select an option

Save cnolanminich/7ac382eef8840a110f14d8f3dd08c4d1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "requests",
# ]
# ///
"""
Query Dagster's GraphQL API to get monthly credits (ops executed + asset materializations).
Replicates this SQL query:
WITH events AS (
SELECT DISTINCT
DATE_FORMAT(timestamp, '%Y-%m') AS event_month,
dagster_event_type,
COALESCE(run_id, '||', step_key) AS step_id,
COUNT(1) AS credits
FROM event_logs
WHERE dagster_event_type = 'STEP_START'
OR dagster_event_type = 'ASSET_MATERIALIZATION'
GROUP BY 1, 2, 3
)
SELECT event_month, SUM(credits) AS sum_credits
FROM events GROUP BY 1
Usage:
uv run tools/monthly_credits.py [--url http://localhost:3000/graphql] [--detailed]
"""
import argparse
import requests
from collections import defaultdict
from datetime import datetime
# GraphQL query that fetches one page of runs.
# Variables: $cursor (optional string) and $limit (required page size).
# fetch_all_runs passes the last runId of the previous page as $cursor and
# None for the first page.
# Only runId is selected for each run; on a PythonError the message is returned.
RUNS_QUERY = """
query RunsQuery($cursor: String, $limit: Int!) {
runsOrError(cursor: $cursor, limit: $limit) {
__typename
... on Runs {
results {
runId
}
}
... on PythonError {
message
}
}
}
"""
# GraphQL query that fetches the event log of a single run, identified by $runId.
# For every event it selects __typename, plus:
#   - timestamp, for events implementing MessageEvent
#   - stepKey, for ExecutionStepStartEvent
#   - stepKey and assetKey.path, for MaterializationEvent
# RunNotFoundError and PythonError results carry only a message.
# NOTE(review): eventConnection is requested without a cursor or limit. If the
# server caps the number of events returned per request, runs with many events
# would be undercounted -- confirm against the Dagster GraphQL schema.
EVENTS_QUERY = """
query EventsQuery($runId: ID!) {
runOrError(runId: $runId) {
__typename
... on Run {
eventConnection {
events {
__typename
... on MessageEvent {
timestamp
}
... on ExecutionStepStartEvent {
stepKey
}
... on MaterializationEvent {
stepKey
assetKey {
path
}
}
}
}
}
... on RunNotFoundError {
message
}
... on PythonError {
message
}
}
}
"""
def fetch_all_runs(
    graphql_url: str,
    page_size: int = 100,
    timeout: float = 30.0,
) -> list[dict]:
    """Fetch every run from Dagster, paging through the GraphQL API.

    Args:
        graphql_url: Dagster GraphQL endpoint URL.
        page_size: Number of runs requested per page.
        timeout: Seconds to wait on each HTTP request before giving up, so a
            stalled server cannot hang the script indefinitely.

    Returns:
        The run dicts selected by RUNS_QUERY, in the order the server
        returned them.

    Raises:
        requests.HTTPError: The server answered with an error status code.
        requests.Timeout: A request exceeded ``timeout``.
        RuntimeError: The response carried GraphQL errors, a PythonError, or
            any result type other than ``Runs``.
    """
    runs: list[dict] = []
    cursor = None  # None requests the first page.

    while True:
        response = requests.post(
            graphql_url,
            json={
                "query": RUNS_QUERY,
                "variables": {"cursor": cursor, "limit": page_size},
            },
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()

        # A present-but-empty/null "errors" entry is not a failure.
        if data.get("errors"):
            raise RuntimeError(f"GraphQL errors: {data['errors']}")

        result = data["data"]["runsOrError"]
        typename = result["__typename"]
        if typename == "PythonError":
            raise RuntimeError(f"Dagster error: {result['message']}")
        if typename != "Runs":
            # Any other union member has no "results" field; fail with a
            # readable message instead of a KeyError below.
            raise RuntimeError(f"Unexpected runsOrError result type: {typename}")

        batch = result["results"]
        if not batch:
            break

        runs.extend(batch)
        # The last run ID of this page is the cursor for the next one.
        cursor = batch[-1]["runId"]
        print(f"Fetched {len(runs)} runs...")

    return runs
def fetch_credit_events(
    graphql_url: str,
    run_id: str,
    timeout: float = 30.0,
) -> list[dict]:
    """Fetch the STEP_START and ASSET_MATERIALIZATION events of one run.

    This is best-effort: HTTP error statuses, GraphQL errors and non-``Run``
    results are reported on stdout and yield an empty list, so one bad run
    does not abort the whole aggregation.

    Args:
        graphql_url: Dagster GraphQL endpoint URL.
        run_id: ID of the run whose events are fetched.
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        One dict per matching event with keys ``timestamp``, ``event_type``
        and ``step_key``; materializations additionally carry ``asset_key``
        (the asset path joined with "/", or None).

    Raises:
        requests.RequestException: The request could not be completed
            (connection failure, timeout).
    """
    # NOTE(review): EVENTS_QUERY asks for eventConnection without cursor/limit;
    # if the server caps events per request, long runs are undercounted.
    response = requests.post(
        graphql_url,
        json={
            "query": EVENTS_QUERY,
            "variables": {"runId": run_id},
        },
        timeout=timeout,
    )
    if not response.ok:
        print(f"Error fetching events for run {run_id}: {response.status_code}")
        print(f"Response: {response.text[:500]}")
        return []

    data = response.json()
    if data.get("errors"):
        print(f"GraphQL errors for run {run_id}: {data['errors']}")
        return []

    result = data["data"]["runOrError"]
    if result["__typename"] != "Run":
        # Covers RunNotFoundError, PythonError and any unexpected member;
        # surface the server's message when there is one.
        detail = result.get("message")
        suffix = f": {detail}" if detail else ""
        print(f"Warning: Could not fetch events for run {run_id}{suffix}")
        return []

    events = []
    for event in result["eventConnection"]["events"]:
        typename = event["__typename"]
        if typename == "ExecutionStepStartEvent":
            events.append({
                "timestamp": event["timestamp"],
                "event_type": "STEP_START",
                "step_key": event["stepKey"],
            })
        elif typename == "MaterializationEvent":
            asset_key = event.get("assetKey")
            events.append({
                "timestamp": event["timestamp"],
                "event_type": "ASSET_MATERIALIZATION",
                "step_key": event.get("stepKey"),
                "asset_key": "/".join(asset_key["path"]) if asset_key else None,
            })
    return events
def aggregate_monthly_credits(graphql_url: str) -> dict:
    """Aggregate credits (step starts + asset materializations) by month.

    Every STEP_START and ASSET_MATERIALIZATION event counts as one credit.
    This equals the referenced SQL, which groups by (month, event type,
    run/step) with COUNT and then sums per month; the intermediate grouping
    does not change the sum, so events are tallied directly.

    Months are derived from the event timestamp in UTC so the result does not
    depend on the timezone of the machine running the script.

    Args:
        graphql_url: Dagster GraphQL endpoint URL.

    Returns:
        A dict with:
        - 'by_month': {'YYYY-MM': total_credits}
        - 'by_month_and_type': {'YYYY-MM': {'STEP_START': n, 'ASSET_MATERIALIZATION': m}}
        Both are ordered by month.
    """
    # Local import: the file header only imports `datetime` from this module.
    from datetime import timezone

    print("Fetching runs...")
    runs = fetch_all_runs(graphql_url)
    total_runs = len(runs)
    print(f"Total runs: {total_runs}")

    monthly_totals = defaultdict(int)
    monthly_by_type = defaultdict(lambda: defaultdict(int))

    for processed, run in enumerate(runs, start=1):
        for event in fetch_credit_events(graphql_url, run["runId"]):
            # The timestamp is handled as epoch milliseconds (numeric string).
            ts = datetime.fromtimestamp(
                float(event["timestamp"]) / 1000, tz=timezone.utc
            )
            month_key = ts.strftime("%Y-%m")
            monthly_totals[month_key] += 1
            monthly_by_type[month_key][event["event_type"]] += 1

        if processed % 10 == 0:
            print(f"Processed {processed}/{total_runs} runs...")

    return {
        "by_month": dict(sorted(monthly_totals.items())),
        "by_month_and_type": {k: dict(v) for k, v in sorted(monthly_by_type.items())},
    }
def main():
    """Parse CLI options, aggregate monthly credits and print them as a table."""
    cli = argparse.ArgumentParser(
        description="Query monthly credits (ops + materializations) from Dagster GraphQL API"
    )
    cli.add_argument(
        "--url",
        default="http://localhost:3000/graphql",
        help="Dagster GraphQL endpoint URL (default: http://localhost:3000/graphql)",
    )
    cli.add_argument(
        "--detailed",
        action="store_true",
        help="Show breakdown by event type",
    )
    options = cli.parse_args()

    print(f"Querying Dagster at {options.url}\n")
    report = aggregate_monthly_credits(options.url)
    totals = report["by_month"]

    print("\n" + "=" * 60)
    if not options.detailed:
        # Compact view: one total per month.
        print(f"{'Month':<10} | {'Total Credits':>15}")
        print("-" * 30)
        for month, total in totals.items():
            print(f"{month:<10} | {total:>15,}")
    else:
        # Detailed view: ops and materializations next to the total.
        print(f"{'Month':<10} | {'Ops Executed':>15} | {'Materializations':>18} | {'Total':>10}")
        print("-" * 60)
        for month, total in totals.items():
            per_type = report["by_month_and_type"].get(month, {})
            ops = per_type.get("STEP_START", 0)
            mats = per_type.get("ASSET_MATERIALIZATION", 0)
            print(f"{month:<10} | {ops:>15,} | {mats:>18,} | {total:>10,}")
    print("=" * 60)

    # Grand total across all months.
    grand_total = sum(totals.values())
    print(f"\nGrand Total: {grand_total:,} credits")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "requests",
# ]
# ///
"""
Query Dagster's GraphQL API to get monthly ops executed counts.
Replicates this SQL query:
WITH step_events AS (
SELECT DATE_TRUNC('month', timestamp) AS event_month, run_id, step_key, COUNT(*) as executions
FROM event_logs WHERE dagster_event_type = 'STEP_START'
GROUP BY DATE_TRUNC('month', timestamp), run_id, step_key
)
SELECT TO_CHAR(event_month, 'YYYY-MM') as month, SUM(executions) as total_ops_executed
FROM step_events GROUP BY event_month ORDER BY event_month;
Usage:
python monthly_ops_executed.py [--url http://localhost:3000/graphql]
"""
import argparse
import requests
from collections import defaultdict
from datetime import datetime
# GraphQL query that fetches one page of runs (no events are included here).
# Variables: $cursor (optional string) and $limit (required page size).
# fetch_all_runs passes the last runId of the previous page as $cursor and
# None for the first page.
# Selects runId and startTime for each run; the code visible in this script
# only reads runId.
RUNS_QUERY = """
query RunsQuery($cursor: String, $limit: Int!) {
runsOrError(cursor: $cursor, limit: $limit) {
__typename
... on Runs {
results {
runId
startTime
}
}
... on PythonError {
message
}
}
}
"""
# GraphQL query that fetches the event log of a single run, identified by $runId.
# For every event it selects __typename, plus timestamp for events implementing
# MessageEvent and stepKey for ExecutionStepStartEvent.
# RunNotFoundError and PythonError results carry only a message.
# NOTE(review): eventConnection is requested without a cursor or limit. If the
# server caps the number of events returned per request, runs with many events
# would be undercounted -- confirm against the Dagster GraphQL schema.
EVENTS_QUERY = """
query EventsQuery($runId: ID!) {
runOrError(runId: $runId) {
__typename
... on Run {
eventConnection {
events {
__typename
... on MessageEvent {
timestamp
}
... on ExecutionStepStartEvent {
stepKey
}
}
}
}
... on RunNotFoundError {
message
}
... on PythonError {
message
}
}
}
"""
# Alternative (not implemented in this script): a single-query approach using
# assetOrError, for asset-level metrics. It is more efficient but requires
# knowing the asset keys up front.
def fetch_all_runs(
    graphql_url: str,
    page_size: int = 100,
    timeout: float = 30.0,
) -> list[dict]:
    """Fetch every run from Dagster, paging through the GraphQL API.

    Args:
        graphql_url: Dagster GraphQL endpoint URL.
        page_size: Number of runs requested per page.
        timeout: Seconds to wait on each HTTP request before giving up, so a
            stalled server cannot hang the script indefinitely.

    Returns:
        The run dicts selected by RUNS_QUERY, in the order the server
        returned them.

    Raises:
        requests.HTTPError: The server answered with an error status code.
        requests.Timeout: A request exceeded ``timeout``.
        RuntimeError: The response carried GraphQL errors, a PythonError, or
            any result type other than ``Runs``.
    """
    runs: list[dict] = []
    cursor = None  # None requests the first page.

    while True:
        response = requests.post(
            graphql_url,
            json={
                "query": RUNS_QUERY,
                "variables": {"cursor": cursor, "limit": page_size},
            },
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()

        # A present-but-empty/null "errors" entry is not a failure.
        if data.get("errors"):
            raise RuntimeError(f"GraphQL errors: {data['errors']}")

        result = data["data"]["runsOrError"]
        typename = result["__typename"]
        if typename == "PythonError":
            raise RuntimeError(f"Dagster error: {result['message']}")
        if typename != "Runs":
            # Any other union member has no "results" field; fail with a
            # readable message instead of a KeyError below.
            raise RuntimeError(f"Unexpected runsOrError result type: {typename}")

        batch = result["results"]
        if not batch:
            break

        runs.extend(batch)
        # The last run ID of this page is the cursor for the next one.
        cursor = batch[-1]["runId"]
        print(f"Fetched {len(runs)} runs...")

    return runs
def fetch_step_start_events(
    graphql_url: str,
    run_id: str,
    timeout: float = 30.0,
) -> list[dict]:
    """Fetch all STEP_START events of one run.

    This is best-effort: HTTP error statuses, GraphQL errors and non-``Run``
    results are reported on stdout and yield an empty list, so one bad run
    does not abort the whole aggregation.

    Args:
        graphql_url: Dagster GraphQL endpoint URL.
        run_id: ID of the run whose events are fetched.
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        One dict per ExecutionStepStartEvent with keys ``timestamp`` and
        ``step_key``.

    Raises:
        requests.RequestException: The request could not be completed
            (connection failure, timeout).
    """
    # NOTE(review): EVENTS_QUERY asks for eventConnection without cursor/limit;
    # if the server caps events per request, long runs are undercounted.
    response = requests.post(
        graphql_url,
        json={
            "query": EVENTS_QUERY,
            "variables": {"runId": run_id},
        },
        timeout=timeout,
    )
    if not response.ok:
        print(f"Error fetching events for run {run_id}: {response.status_code}")
        print(f"Response: {response.text[:500]}")
        return []

    data = response.json()
    if data.get("errors"):
        print(f"GraphQL errors for run {run_id}: {data['errors']}")
        return []

    result = data["data"]["runOrError"]
    if result["__typename"] != "Run":
        # Covers RunNotFoundError, PythonError and any unexpected member;
        # surface the server's message when there is one.
        detail = result.get("message")
        suffix = f": {detail}" if detail else ""
        print(f"Warning: Could not fetch events for run {run_id}{suffix}")
        return []

    return [
        {"timestamp": event["timestamp"], "step_key": event["stepKey"]}
        for event in result["eventConnection"]["events"]
        if event["__typename"] == "ExecutionStepStartEvent"
    ]
def aggregate_monthly_ops(graphql_url: str) -> dict[str, int]:
    """Aggregate ops executed (STEP_START events) by month.

    Every STEP_START event counts as one executed op. This equals the
    referenced SQL, which counts per (month, run_id, step_key) and then sums
    per month; the intermediate grouping does not change the sum, so events
    are tallied directly.

    Months are derived from the event timestamp in UTC so the result does not
    depend on the timezone of the machine running the script.

    Args:
        graphql_url: Dagster GraphQL endpoint URL.

    Returns:
        Dict mapping 'YYYY-MM' -> total_ops_executed, ordered by month.
    """
    # Local import: the file header only imports `datetime` from this module.
    from datetime import timezone

    # Step 1: fetch all runs.
    print("Fetching runs...")
    runs = fetch_all_runs(graphql_url)
    total_runs = len(runs)
    print(f"Total runs: {total_runs}")

    # Step 2: count each run's STEP_START events into its month bucket.
    monthly_counts: dict[str, int] = defaultdict(int)
    for processed, run in enumerate(runs, start=1):
        for event in fetch_step_start_events(graphql_url, run["runId"]):
            # The timestamp is handled as epoch milliseconds (numeric string).
            ts = datetime.fromtimestamp(
                float(event["timestamp"]) / 1000, tz=timezone.utc
            )
            monthly_counts[ts.strftime("%Y-%m")] += 1

        if processed % 10 == 0:
            print(f"Processed {processed}/{total_runs} runs...")

    return dict(sorted(monthly_counts.items()))
def main():
    """Parse CLI options, aggregate monthly op executions and print a table."""
    cli = argparse.ArgumentParser(
        description="Query monthly ops executed from Dagster GraphQL API"
    )
    cli.add_argument(
        "--url",
        default="http://localhost:3000/graphql",
        help="Dagster GraphQL endpoint URL (default: http://localhost:3000/graphql)",
    )
    options = cli.parse_args()

    print(f"Querying Dagster at {options.url}\n")
    ops_by_month = aggregate_monthly_ops(options.url)

    rule = "=" * 40
    print("\n" + rule)
    print(f"{'Month':<10} | {'Total Ops Executed':>20}")
    print("-" * 40)
    for month in ops_by_month:
        count = ops_by_month[month]
        print(f"{month:<10} | {count:>20,}")
    print(rule)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment