#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "requests",
# ]
# ///
"""
Query Dagster's GraphQL API to get monthly credits (ops executed + asset materializations).

Replicates this SQL query:

    WITH events AS (
        SELECT DISTINCT
            DATE_FORMAT(timestamp, '%Y-%m') AS event_month,
            dagster_event_type,
            CONCAT(run_id, '||', step_key) AS step_id,
            COUNT(1) AS credits
        FROM event_logs
        WHERE dagster_event_type = 'STEP_START'
           OR dagster_event_type = 'ASSET_MATERIALIZATION'
        GROUP BY 1, 2, 3
    )
    SELECT event_month, SUM(credits) AS sum_credits
    FROM events GROUP BY 1

Usage:
    uv run tools/monthly_credits.py [--url http://localhost:3000/graphql]
"""
import argparse
from collections import defaultdict
from datetime import datetime

import requests

# GraphQL query to fetch runs
RUNS_QUERY = """
query RunsQuery($cursor: String, $limit: Int!) {
  runsOrError(cursor: $cursor, limit: $limit) {
    __typename
    ... on Runs {
      results {
        runId
      }
    }
    ... on PythonError {
      message
    }
  }
}
"""

# GraphQL query to fetch events for a specific run
EVENTS_QUERY = """
query EventsQuery($runId: ID!) {
  runOrError(runId: $runId) {
    __typename
    ... on Run {
      eventConnection {
        events {
          __typename
          ... on MessageEvent {
            timestamp
          }
          ... on ExecutionStepStartEvent {
            stepKey
          }
          ... on MaterializationEvent {
            stepKey
            assetKey {
              path
            }
          }
        }
      }
    }
    ... on RunNotFoundError {
      message
    }
    ... on PythonError {
      message
    }
  }
}
"""


def fetch_all_runs(graphql_url: str) -> list[dict]:
    """Fetch all runs from Dagster."""
    runs = []
    cursor = None
    while True:
        response = requests.post(
            graphql_url,
            json={
                "query": RUNS_QUERY,
                "variables": {"cursor": cursor, "limit": 100},
            },
        )
        response.raise_for_status()
        data = response.json()
        if "errors" in data:
            raise Exception(f"GraphQL errors: {data['errors']}")
        result = data["data"]["runsOrError"]
        if result["__typename"] == "PythonError":
            raise Exception(f"Dagster error: {result['message']}")
        batch = result["results"]
        if not batch:
            break
        runs.extend(batch)
        cursor = batch[-1]["runId"]
        print(f"Fetched {len(runs)} runs...")
    return runs


def fetch_credit_events(graphql_url: str, run_id: str) -> list[dict]:
    """Fetch STEP_START and ASSET_MATERIALIZATION events for a run."""
    events = []
    response = requests.post(
        graphql_url,
        json={
            "query": EVENTS_QUERY,
            "variables": {"runId": run_id},
        },
    )
    if not response.ok:
        print(f"Error fetching events for run {run_id}: {response.status_code}")
        print(f"Response: {response.text[:500]}")
        return []
    data = response.json()
    if "errors" in data:
        print(f"GraphQL errors for run {run_id}: {data['errors']}")
        return []
    result = data["data"]["runOrError"]
    if result["__typename"] in ("RunNotFoundError", "PythonError"):
        print(f"Warning: Could not fetch events for run {run_id}")
        return []
    event_connection = result["eventConnection"]
    for event in event_connection["events"]:
        if event["__typename"] == "ExecutionStepStartEvent":
            events.append({
                "timestamp": event["timestamp"],
                "event_type": "STEP_START",
                "step_key": event["stepKey"],
            })
        elif event["__typename"] == "MaterializationEvent":
            events.append({
                "timestamp": event["timestamp"],
                "event_type": "ASSET_MATERIALIZATION",
                "step_key": event.get("stepKey"),
                "asset_key": "/".join(event["assetKey"]["path"]) if event.get("assetKey") else None,
            })
    return events


def aggregate_monthly_credits(graphql_url: str) -> dict:
    """
    Aggregate credits by month.

    Returns dict with:
      - 'by_month': {'YYYY-MM': total_credits}
      - 'by_month_and_type': {'YYYY-MM': {'STEP_START': n, 'ASSET_MATERIALIZATION': m}}
    """
    print("Fetching runs...")
    runs = fetch_all_runs(graphql_url)
    print(f"Total runs: {len(runs)}")

    # Track unique (month, event_type, step_id) combinations, then count.
    # This matches the SQL: GROUP BY month, event_type, step_id first
    event_groups = defaultdict(int)
    monthly_by_type = defaultdict(lambda: defaultdict(int))

    for i, run in enumerate(runs):
        run_id = run["runId"]
        events = fetch_credit_events(graphql_url, run_id)
        for event in events:
            # timestamp is in milliseconds
            ts = datetime.fromtimestamp(float(event["timestamp"]) / 1000)
            month_key = ts.strftime("%Y-%m")
            event_type = event["event_type"]
            step_key = event.get("step_key") or ""
            # Build a composite step_id like the SQL: CONCAT(run_id, '||', step_key)
            step_id = f"{run_id}||{step_key}"
            # Count events per (month, event_type, step_id) group, like COUNT(1) in the SQL
            group_key = (month_key, event_type, step_id)
            event_groups[group_key] += 1
        if (i + 1) % 10 == 0:
            print(f"Processed {i + 1}/{len(runs)} runs...")

    # Now aggregate: sum credits by month
    monthly_totals = defaultdict(int)
    for (month, event_type, step_id), count in event_groups.items():
        monthly_totals[month] += count
        monthly_by_type[month][event_type] += count

    return {
        "by_month": dict(sorted(monthly_totals.items())),
        "by_month_and_type": {k: dict(v) for k, v in sorted(monthly_by_type.items())},
    }


def main():
    parser = argparse.ArgumentParser(
        description="Query monthly credits (ops + materializations) from Dagster GraphQL API"
    )
    parser.add_argument(
        "--url",
        default="http://localhost:3000/graphql",
        help="Dagster GraphQL endpoint URL (default: http://localhost:3000/graphql)",
    )
    parser.add_argument(
        "--detailed",
        action="store_true",
        help="Show breakdown by event type",
    )
    args = parser.parse_args()

    print(f"Querying Dagster at {args.url}\n")
    result = aggregate_monthly_credits(args.url)

    print("\n" + "=" * 60)
    if args.detailed:
        print(f"{'Month':<10} | {'Ops Executed':>15} | {'Materializations':>18} | {'Total':>10}")
        print("-" * 60)
        for month, total in result["by_month"].items():
            by_type = result["by_month_and_type"].get(month, {})
            ops = by_type.get("STEP_START", 0)
            mats = by_type.get("ASSET_MATERIALIZATION", 0)
            print(f"{month:<10} | {ops:>15,} | {mats:>18,} | {total:>10,}")
    else:
        print(f"{'Month':<10} | {'Total Credits':>15}")
        print("-" * 30)
        for month, total in result["by_month"].items():
            print(f"{month:<10} | {total:>15,}")
    print("=" * 60)

    # Print grand totals
    grand_total = sum(result["by_month"].values())
    print(f"\nGrand Total: {grand_total:,} credits")


if __name__ == "__main__":
    main()
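
For clarity, here is a minimal, self-contained sketch of the grouping and summing the script above performs, applied to a few hypothetical in-memory events. The run IDs, step keys, and timestamps below are made up for illustration only; the script itself pulls these records from the GraphQL API.

from collections import defaultdict
from datetime import datetime

# Hypothetical events, shaped like the dicts fetch_credit_events() returns.
events_by_run = {
    "run_a": [
        {"timestamp": 1719878400000, "event_type": "STEP_START", "step_key": "load"},
        {"timestamp": 1719878460000, "event_type": "ASSET_MATERIALIZATION", "step_key": "load"},
    ],
    "run_b": [
        {"timestamp": 1722556800000, "event_type": "STEP_START", "step_key": "load"},
    ],
}

# Group: one counter per (month, event_type, run_id||step_key), like the SQL CTE.
event_groups = defaultdict(int)
for run_id, events in events_by_run.items():
    for event in events:
        month = datetime.fromtimestamp(event["timestamp"] / 1000).strftime("%Y-%m")
        step_id = f"{run_id}||{event['step_key']}"
        event_groups[(month, event["event_type"], step_id)] += 1

# Sum the per-group counts by month, like the outer SELECT.
monthly_totals = defaultdict(int)
for (month, _event_type, _step_id), count in event_groups.items():
    monthly_totals[month] += count

print(dict(monthly_totals))  # e.g. {'2024-07': 2, '2024-08': 1}
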
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "requests",
# ]
# ///
"""
Query Dagster's GraphQL API to get monthly ops executed counts.

Replicates this SQL query:

    WITH step_events AS (
        SELECT DATE_TRUNC('month', timestamp) AS event_month, run_id, step_key, COUNT(*) AS executions
        FROM event_logs WHERE dagster_event_type = 'STEP_START'
        GROUP BY DATE_TRUNC('month', timestamp), run_id, step_key
    )
    SELECT TO_CHAR(event_month, 'YYYY-MM') AS month, SUM(executions) AS total_ops_executed
    FROM step_events GROUP BY event_month ORDER BY event_month;

Usage:
    python monthly_ops_executed.py [--url http://localhost:3000/graphql]
"""
import argparse
from collections import defaultdict
from datetime import datetime

import requests

# GraphQL query to fetch runs (their step events are fetched per run below)
RUNS_QUERY = """
query RunsQuery($cursor: String, $limit: Int!) {
  runsOrError(cursor: $cursor, limit: $limit) {
    __typename
    ... on Runs {
      results {
        runId
        startTime
      }
    }
    ... on PythonError {
      message
    }
  }
}
"""

# GraphQL query to fetch events for a specific run
EVENTS_QUERY = """
query EventsQuery($runId: ID!) {
  runOrError(runId: $runId) {
    __typename
    ... on Run {
      eventConnection {
        events {
          __typename
          ... on MessageEvent {
            timestamp
          }
          ... on ExecutionStepStartEvent {
            stepKey
          }
        }
      }
    }
    ... on RunNotFoundError {
      message
    }
    ... on PythonError {
      message
    }
  }
}
"""

# Alternative: a single-query approach using assetOrError if you want asset-level metrics.
# This is more efficient but requires knowing asset keys upfront.
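
# A sketch of that alternative (illustrative only, not used below). The assetOrError /
# assetMaterializations field names are assumed from the public Dagster GraphQL schema
# and should be checked against your Dagster version before relying on them.
ASSET_MATERIALIZATIONS_QUERY = """
query AssetMaterializationsQuery($assetKey: AssetKeyInput!, $limit: Int) {
  assetOrError(assetKey: $assetKey) {
    __typename
    ... on Asset {
      assetMaterializations(limit: $limit) {
        timestamp
        runId
      }
    }
    ... on AssetNotFoundError {
      message
    }
  }
}
"""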


def fetch_all_runs(graphql_url: str) -> list[dict]:
    """Fetch all runs from Dagster."""
    runs = []
    cursor = None
    while True:
        response = requests.post(
            graphql_url,
            json={
                "query": RUNS_QUERY,
                "variables": {"cursor": cursor, "limit": 100},
            },
        )
        response.raise_for_status()
        data = response.json()
        if "errors" in data:
            raise Exception(f"GraphQL errors: {data['errors']}")
        result = data["data"]["runsOrError"]
        if result["__typename"] == "PythonError":
            raise Exception(f"Dagster error: {result['message']}")
        batch = result["results"]
        if not batch:
            break
        runs.extend(batch)
        cursor = batch[-1]["runId"]
        print(f"Fetched {len(runs)} runs...")
    return runs


def fetch_step_start_events(graphql_url: str, run_id: str) -> list[dict]:
    """Fetch all STEP_START events for a run."""
    events = []
    response = requests.post(
        graphql_url,
        json={
            "query": EVENTS_QUERY,
            "variables": {"runId": run_id},
        },
    )
    if not response.ok:
        print(f"Error fetching events for run {run_id}: {response.status_code}")
        print(f"Response: {response.text[:500]}")
        return []
    data = response.json()
    if "errors" in data:
        print(f"GraphQL errors for run {run_id}: {data['errors']}")
        return []
    result = data["data"]["runOrError"]
    if result["__typename"] in ("RunNotFoundError", "PythonError"):
        print(f"Warning: Could not fetch events for run {run_id}")
        return []
    event_connection = result["eventConnection"]
    for event in event_connection["events"]:
        if event["__typename"] == "ExecutionStepStartEvent":
            events.append({
                "timestamp": event["timestamp"],
                "step_key": event["stepKey"],
            })
    return events


def aggregate_monthly_ops(graphql_url: str) -> dict[str, int]:
    """
    Aggregate ops executed by month.

    Returns dict mapping 'YYYY-MM' -> total_ops_executed
    """
    # Step 1: Fetch all runs
    print("Fetching runs...")
    runs = fetch_all_runs(graphql_url)
    print(f"Total runs: {len(runs)}")

    # Step 2: For each run, fetch STEP_START events and aggregate
    monthly_counts = defaultdict(int)
    for i, run in enumerate(runs):
        run_id = run["runId"]
        events = fetch_step_start_events(graphql_url, run_id)

        # Group by (month, run_id, step_key) first, then sum.
        # This matches the SQL's CTE behavior
        step_events = defaultdict(int)
        for event in events:
            # timestamp comes as a string in milliseconds since epoch
            ts = datetime.fromtimestamp(float(event["timestamp"]) / 1000)
            month_key = ts.strftime("%Y-%m")
            step_key = event["step_key"]
            step_events[(month_key, run_id, step_key)] += 1

        # Add to monthly totals
        for (month, _, _), count in step_events.items():
            monthly_counts[month] += count

        if (i + 1) % 10 == 0:
            print(f"Processed {i + 1}/{len(runs)} runs...")

    return dict(sorted(monthly_counts.items()))


def main():
    parser = argparse.ArgumentParser(
        description="Query monthly ops executed from Dagster GraphQL API"
    )
    parser.add_argument(
        "--url",
        default="http://localhost:3000/graphql",
        help="Dagster GraphQL endpoint URL (default: http://localhost:3000/graphql)",
    )
    args = parser.parse_args()

    print(f"Querying Dagster at {args.url}\n")
    monthly_ops = aggregate_monthly_ops(args.url)

    print("\n" + "=" * 40)
    print(f"{'Month':<10} | {'Total Ops Executed':>20}")
    print("-" * 40)
    for month, count in monthly_ops.items():
        print(f"{month:<10} | {count:>20,}")
    print("=" * 40)


if __name__ == "__main__":
    main()
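
Both scripts fetch events one run at a time, so wall-clock time grows linearly with the number of runs. As a possible optimization (not part of the gist), here is a minimal sketch of issuing those per-run requests from a small thread pool, reusing the fetch_step_start_events helper defined above; the worker count is an arbitrary illustrative value.

from concurrent.futures import ThreadPoolExecutor


def fetch_events_concurrently(graphql_url: str, run_ids: list[str], max_workers: int = 8) -> dict[str, list[dict]]:
    """Fetch per-run STEP_START events in parallel; returns {run_id: events}."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Each worker issues one GraphQL request, exactly as the sequential loop does.
        results = executor.map(
            lambda run_id: (run_id, fetch_step_start_events(graphql_url, run_id)),
            run_ids,
        )
        return dict(results)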