Skip to content

Instantly share code, notes, and snippets.

@ethanabrooks
Created January 8, 2026 21:27
Show Gist options
  • Select an option

  • Save ethanabrooks/e8d0c3245ab760ef864849beee23d2e7 to your computer and use it in GitHub Desktop.

Select an option

Save ethanabrooks/e8d0c3245ab760ef864849beee23d2e7 to your computer and use it in GitHub Desktop.
from __future__ import annotations
import argparse
import datetime
from typing import Any
from google.auth import default as google_auth_default
from google.cloud import bigquery
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.UTC)
def _iso(dt: datetime.datetime) -> str:
return dt.astimezone(datetime.UTC).isoformat()
def _bool_lower(value: bool) -> str:
return "true" if value else "false"
def _adc_identity() -> tuple[str | None, str, str | None]:
    """Resolve Application Default Credentials and describe them.

    Returns:
        (project_id, credentials class name, service-account email or None).
        The email is only present for service-account credential types.
    """
    scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    credentials, project = google_auth_default(scopes=scopes)
    sa_email = getattr(credentials, "service_account_email", None)
    return project, type(credentials).__name__, sa_email
def _bq_client() -> bigquery.Client:
    """Build a BigQuery client from Application Default Credentials.

    The billing project defaults to the ADC project when one is available;
    override via environment if needed. A fresh client is created per call.
    """
    credentials, _project = google_auth_default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return bigquery.Client(credentials=credentials)
def _query_one_episode(
    *,
    collection_path: str,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
) -> dict[str, Any] | None:
    """Fetch the most recent episode_started row for a collection path.

    Queries the `life_of_a_trace_episode_started` table within
    [start_time, end_time] and returns the newest matching row as a dict,
    or None when no row matches.
    """
    sql = """
        SELECT
          timestamp,
          trace_event_id,
          instance_id,
          component_tag,
          collection_path
        FROM `reflectionai-data.events.life_of_a_trace_episode_started`
        WHERE timestamp >= @start_time
          AND timestamp <= @end_time
          AND collection_path = @collection_path
        ORDER BY timestamp DESC
        LIMIT 1
    """
    scalar = bigquery.ScalarQueryParameter
    query_params = [
        scalar("start_time", "TIMESTAMP", start_time),
        scalar("end_time", "TIMESTAMP", end_time),
        scalar("collection_path", "STRING", collection_path),
    ]
    config = bigquery.QueryJobConfig(query_parameters=query_params)
    result_rows = list(_bq_client().query(sql, job_config=config).result())
    if not result_rows:
        return None
    row = result_rows[0]
    columns = (
        "timestamp",
        "trace_event_id",
        "instance_id",
        "component_tag",
        "collection_path",
    )
    return {name: getattr(row, name) for name in columns}
def _query_one_reasoning_joined_to_episode(
    *,
    collection_path: str,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
) -> dict[str, Any] | None:
    """Fetch the newest reasoning event joined to an episode for a collection.

    Restricts both the reasoning and episode_started tables to
    [start_time, end_time], joins them on trace_event_id, filters episodes to
    `collection_path`, and returns the most recent joined row as a dict, or
    None when nothing matches.
    """
    sql = """
        WITH reasoning AS (
          SELECT
            timestamp,
            trace_event_id,
            xid,
            agent_step,
            message_index,
            text,
            num_bytes,
            num_chars,
            num_lines,
            truncated,
            instance_id,
            component_tag
          FROM `reflectionai-data.events.life_of_a_trace_reasoning`
          WHERE timestamp >= @start_time
            AND timestamp <= @end_time
        ),
        episodes AS (
          SELECT trace_event_id, collection_path
          FROM `reflectionai-data.events.life_of_a_trace_episode_started`
          WHERE timestamp >= @start_time
            AND timestamp <= @end_time
            AND collection_path = @collection_path
        )
        SELECT
          r.timestamp,
          r.trace_event_id,
          r.xid,
          r.agent_step,
          r.message_index,
          r.text,
          r.num_bytes,
          r.num_chars,
          r.num_lines,
          r.truncated,
          r.instance_id,
          r.component_tag,
          e.collection_path
        FROM reasoning r
        JOIN episodes e
          USING (trace_event_id)
        ORDER BY r.timestamp DESC
        LIMIT 1
    """
    scalar = bigquery.ScalarQueryParameter
    query_params = [
        scalar("start_time", "TIMESTAMP", start_time),
        scalar("end_time", "TIMESTAMP", end_time),
        scalar("collection_path", "STRING", collection_path),
    ]
    config = bigquery.QueryJobConfig(query_parameters=query_params)
    result_rows = list(_bq_client().query(sql, job_config=config).result())
    if not result_rows:
        return None
    row = result_rows[0]
    columns = (
        "timestamp",
        "trace_event_id",
        "xid",
        "agent_step",
        "message_index",
        "text",
        "num_bytes",
        "num_chars",
        "num_lines",
        "truncated",
        "instance_id",
        "component_tag",
        "collection_path",
    )
    return {name: getattr(row, name) for name in columns}
def _max_timestamp(table: str) -> datetime.datetime | None:
    """Return MAX(timestamp) for `table`, or None if the query yields no rows.

    NOTE(review): `table` is interpolated directly into the SQL — only pass
    trusted, internally-defined table names.
    """
    query = f"SELECT MAX(timestamp) AS max_ts FROM `{table}`"
    result_rows = list(_bq_client().query(query).result())
    return result_rows[0].max_ts if result_rows else None
def main() -> None:
    """CLI entry point.

    Pretty-prints the ADC identity, the newest episode_started row and the
    newest joined reasoning row for --collection_path within the last
    --minutes minutes, then the reasoning table's freshness.
    """
    parser = argparse.ArgumentParser(
        description="Pretty-print episode_started + reasoning telemetry from BigQuery."
    )
    parser.add_argument("--collection_path", required=True)
    parser.add_argument("--minutes", type=int, default=120)
    args = parser.parse_args()

    end_time = _utc_now()
    start_time = end_time - datetime.timedelta(minutes=args.minutes)
    # Derived from the actual window rather than echoing --minutes directly.
    minutes_label = int((end_time - start_time).total_seconds() // 60)

    adc_project, adc_cred_type, adc_email = _adc_identity()
    print("=== Auth ===")
    print("adc_project_id:", adc_project)
    print("adc_credentials_type:", adc_cred_type)
    print("adc_email:", adc_email)

    window = f"last ~{minutes_label} min; {_iso(start_time)} to {_iso(end_time)}"

    print(f"=== BigQuery: episode_started ({window}) ===")
    episode = _query_one_episode(
        collection_path=args.collection_path, start_time=start_time, end_time=end_time
    )
    if episode is None:
        print("rows: 0")
    else:
        print("rows: 1")
        print("timestamp: ", _iso(episode["timestamp"]))
        for key in ("trace_event_id", "instance_id", "component_tag", "collection_path"):
            print(f"{key}: ", episode[key])

    print(f"\n=== BigQuery: reasoning events joined to episodes ({window}) ===")
    reasoning = _query_one_reasoning_joined_to_episode(
        collection_path=args.collection_path, start_time=start_time, end_time=end_time
    )
    if reasoning is None:
        print("rows: 0")
    else:
        print("rows: 1")
        print("timestamp: ", _iso(reasoning["timestamp"]))
        for key in ("trace_event_id", "xid", "agent_step", "message_index"):
            print(f"{key}: ", reasoning[key])
        for key in ("num_bytes", "num_chars", "num_lines"):
            print(f"{key}: ", reasoning[key])
        print("truncated: ", _bool_lower(bool(reasoning["truncated"])))
        for key in ("instance_id", "component_tag", "collection_path"):
            print(f"{key}: ", reasoning[key])
        # Escape newlines so the prefix stays on one output line.
        escaped = (reasoning["text"] or "").replace("\n", "\\n")
        head = escaped[:120]
        closer = '" (truncated)' if reasoning["truncated"] else '"'
        print('text_prefix: "', head, closer, sep="")

    print("\n=== BigQuery: table freshness ===")
    max_ts = _max_timestamp("reflectionai-data.events.life_of_a_trace_reasoning")
    print("life_of_a_trace_reasoning max timestamp:", None if max_ts is None else _iso(max_ts))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment