Created
January 8, 2026 21:27
-
-
Save ethanabrooks/e8d0c3245ab760ef864849beee23d2e7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| import argparse | |
| import datetime | |
| from typing import Any | |
| from google.auth import default as google_auth_default | |
| from google.cloud import bigquery | |
| def _utc_now() -> datetime.datetime: | |
| return datetime.datetime.now(datetime.UTC) | |
| def _iso(dt: datetime.datetime) -> str: | |
| return dt.astimezone(datetime.UTC).isoformat() | |
| def _bool_lower(value: bool) -> str: | |
| return "true" if value else "false" | |
| def _adc_identity() -> tuple[str | None, str, str | None]: | |
| creds, project_id = google_auth_default( | |
| scopes=["https://www.googleapis.com/auth/cloud-platform"] | |
| ) | |
| email = getattr(creds, "service_account_email", None) | |
| return project_id, type(creds).__name__, email | |
| def _bq_client() -> bigquery.Client: | |
| creds, _ = google_auth_default(scopes=["https://www.googleapis.com/auth/cloud-platform"]) | |
| # Billing project defaults to ADC project if available; override via env if needed. | |
| return bigquery.Client(credentials=creds) | |
| def _query_one_episode( | |
| *, | |
| collection_path: str, | |
| start_time: datetime.datetime, | |
| end_time: datetime.datetime, | |
| ) -> dict[str, Any] | None: | |
| sql = """ | |
| SELECT | |
| timestamp, | |
| trace_event_id, | |
| instance_id, | |
| component_tag, | |
| collection_path | |
| FROM `reflectionai-data.events.life_of_a_trace_episode_started` | |
| WHERE timestamp >= @start_time | |
| AND timestamp <= @end_time | |
| AND collection_path = @collection_path | |
| ORDER BY timestamp DESC | |
| LIMIT 1 | |
| """ | |
| params = [ | |
| bigquery.ScalarQueryParameter("start_time", "TIMESTAMP", start_time), | |
| bigquery.ScalarQueryParameter("end_time", "TIMESTAMP", end_time), | |
| bigquery.ScalarQueryParameter("collection_path", "STRING", collection_path), | |
| ] | |
| job = _bq_client().query(sql, job_config=bigquery.QueryJobConfig(query_parameters=params)) | |
| rows = list(job.result()) | |
| if not rows: | |
| return None | |
| r = rows[0] | |
| return dict( | |
| timestamp=r.timestamp, | |
| trace_event_id=r.trace_event_id, | |
| instance_id=r.instance_id, | |
| component_tag=r.component_tag, | |
| collection_path=r.collection_path, | |
| ) | |
| def _query_one_reasoning_joined_to_episode( | |
| *, | |
| collection_path: str, | |
| start_time: datetime.datetime, | |
| end_time: datetime.datetime, | |
| ) -> dict[str, Any] | None: | |
| sql = """ | |
| WITH reasoning AS ( | |
| SELECT | |
| timestamp, | |
| trace_event_id, | |
| xid, | |
| agent_step, | |
| message_index, | |
| text, | |
| num_bytes, | |
| num_chars, | |
| num_lines, | |
| truncated, | |
| instance_id, | |
| component_tag | |
| FROM `reflectionai-data.events.life_of_a_trace_reasoning` | |
| WHERE timestamp >= @start_time | |
| AND timestamp <= @end_time | |
| ), | |
| episodes AS ( | |
| SELECT trace_event_id, collection_path | |
| FROM `reflectionai-data.events.life_of_a_trace_episode_started` | |
| WHERE timestamp >= @start_time | |
| AND timestamp <= @end_time | |
| AND collection_path = @collection_path | |
| ) | |
| SELECT | |
| r.timestamp, | |
| r.trace_event_id, | |
| r.xid, | |
| r.agent_step, | |
| r.message_index, | |
| r.text, | |
| r.num_bytes, | |
| r.num_chars, | |
| r.num_lines, | |
| r.truncated, | |
| r.instance_id, | |
| r.component_tag, | |
| e.collection_path | |
| FROM reasoning r | |
| JOIN episodes e | |
| USING (trace_event_id) | |
| ORDER BY r.timestamp DESC | |
| LIMIT 1 | |
| """ | |
| params = [ | |
| bigquery.ScalarQueryParameter("start_time", "TIMESTAMP", start_time), | |
| bigquery.ScalarQueryParameter("end_time", "TIMESTAMP", end_time), | |
| bigquery.ScalarQueryParameter("collection_path", "STRING", collection_path), | |
| ] | |
| job = _bq_client().query(sql, job_config=bigquery.QueryJobConfig(query_parameters=params)) | |
| rows = list(job.result()) | |
| if not rows: | |
| return None | |
| r = rows[0] | |
| return dict( | |
| timestamp=r.timestamp, | |
| trace_event_id=r.trace_event_id, | |
| xid=r.xid, | |
| agent_step=r.agent_step, | |
| message_index=r.message_index, | |
| text=r.text, | |
| num_bytes=r.num_bytes, | |
| num_chars=r.num_chars, | |
| num_lines=r.num_lines, | |
| truncated=r.truncated, | |
| instance_id=r.instance_id, | |
| component_tag=r.component_tag, | |
| collection_path=r.collection_path, | |
| ) | |
| def _max_timestamp(table: str) -> datetime.datetime | None: | |
| sql = f"SELECT MAX(timestamp) AS max_ts FROM `{table}`" | |
| job = _bq_client().query(sql) | |
| rows = list(job.result()) | |
| if not rows: | |
| return None | |
| return rows[0].max_ts | |
| def main() -> None: | |
| parser = argparse.ArgumentParser( | |
| description="Pretty-print episode_started + reasoning telemetry from BigQuery." | |
| ) | |
| parser.add_argument("--collection_path", required=True) | |
| parser.add_argument("--minutes", type=int, default=120) | |
| args = parser.parse_args() | |
| end_time = _utc_now() | |
| start_time = end_time - datetime.timedelta(minutes=args.minutes) | |
| minutes_label = int((end_time - start_time).total_seconds() // 60) | |
| project_id, cred_type, email = _adc_identity() | |
| print("=== Auth ===") | |
| print("adc_project_id:", project_id) | |
| print("adc_credentials_type:", cred_type) | |
| print("adc_email:", email) | |
| print( | |
| f"=== BigQuery: episode_started (last ~{minutes_label} min; {_iso(start_time)} to {_iso(end_time)}) ===" | |
| ) | |
| episode = _query_one_episode( | |
| collection_path=args.collection_path, start_time=start_time, end_time=end_time | |
| ) | |
| if episode is None: | |
| print("rows: 0") | |
| else: | |
| print("rows: 1") | |
| print("timestamp: ", _iso(episode["timestamp"])) | |
| print("trace_event_id: ", episode["trace_event_id"]) | |
| print("instance_id: ", episode["instance_id"]) | |
| print("component_tag: ", episode["component_tag"]) | |
| print("collection_path: ", episode["collection_path"]) | |
| print( | |
| f"\n=== BigQuery: reasoning events joined to episodes (last ~{minutes_label} min; {_iso(start_time)} to {_iso(end_time)}) ===" | |
| ) | |
| reasoning = _query_one_reasoning_joined_to_episode( | |
| collection_path=args.collection_path, start_time=start_time, end_time=end_time | |
| ) | |
| if reasoning is None: | |
| print("rows: 0") | |
| else: | |
| print("rows: 1") | |
| print("timestamp: ", _iso(reasoning["timestamp"])) | |
| print("trace_event_id: ", reasoning["trace_event_id"]) | |
| print("xid: ", reasoning["xid"]) | |
| print("agent_step: ", reasoning["agent_step"]) | |
| print("message_index: ", reasoning["message_index"]) | |
| print("num_bytes: ", reasoning["num_bytes"]) | |
| print("num_chars: ", reasoning["num_chars"]) | |
| print("num_lines: ", reasoning["num_lines"]) | |
| print("truncated: ", _bool_lower(bool(reasoning["truncated"]))) | |
| print("instance_id: ", reasoning["instance_id"]) | |
| print("component_tag: ", reasoning["component_tag"]) | |
| print("collection_path: ", reasoning["collection_path"]) | |
| text = (reasoning["text"] or "").replace("\n", "\\n") | |
| prefix = text[:120] | |
| if bool(reasoning["truncated"]): | |
| print('text_prefix: "', prefix, '" (truncated)', sep="") | |
| else: | |
| print('text_prefix: "', prefix, '"', sep="") | |
| print("\n=== BigQuery: table freshness ===") | |
| max_ts = _max_timestamp("reflectionai-data.events.life_of_a_trace_reasoning") | |
| print("life_of_a_trace_reasoning max timestamp:", None if max_ts is None else _iso(max_ts)) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment