Created
February 21, 2026 20:00
-
-
Save felix-ht/7c33764b406c9e3b4432a29ac523b510 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Reasoning test: compare how different OpenRouter models handle reasoning tokens. | |
| Tests per model: | |
| 1. Plain completion + reasoning enabled | |
| 2. Plain completion (baseline, no reasoning param) | |
| 3. Streaming + reasoning enabled | |
| 4. Tool calling + reasoning enabled | |
| 5. JSON schema structured output + reasoning enabled | |
| 6. Schema-less JSON output + reasoning enabled | |
| 7. Reasoning with exclude=true (internal-only reasoning) | |
| 8. Tiny output baseline (no reasoning) | |
| 9. Tiny output + reasoning | |
| Checks: | |
| - Does the model fill message.reasoning? | |
| - Does the model fill message.reasoning_details? | |
| - Are reasoning_tokens reported in usage (including nested paths)? | |
| - Does reasoning survive tool calls, JSON schema, and schema-less modes? | |
| - Does exclude=true still show reasoning_tokens in usage? | |
| - Is there a gap between completion_tokens and visible output tokens? | |
| Uses httpx directly for raw field inspection — no SDK abstraction hiding fields. | |
| Run: | |
| uv run python scripts/reasoning_test.py | |
| uv run python scripts/reasoning_test.py anthropic/claude-sonnet-4-5 deepseek/deepseek-r1 | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| import httpx | |
| import tiktoken | |
# Tokenizer used to approximate "visible token" counts locally. cl100k_base is
# not every model's native tokenizer, so these counts are estimates only.
_tokenizer = tiktoken.get_encoding("cl100k_base")

API_KEY = os.environ.get("OPENROUTER_API_KEY", "")  # required; checked in main()
BASE_URL = "https://openrouter.ai/api/v1"

# Default models to probe; CLI arguments override this list (see main()).
MODELS = [
    "z-ai/glm-5",
    "moonshotai/kimi-k2.5",
    "minimax/minimax-m2.5",
    "anthropic/claude-sonnet-4.6",
    "openai/gpt-5.2",
]

# Prompts for the individual test cases.
MATH_PROMPT = "What is 27 * 453? Think carefully before answering."
TOOL_PROMPT = "What's the weather in Tokyo right now?"
SENTIMENT_PROMPT = "Analyze the sentiment of: 'I love sunny days but hate the rain.'"
TINY_PROMPT = "Answer with exactly the single character 'X' and nothing else."

# Single-function tool definition used by the tool-calling test.
WEATHER_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "City name"},
                },
                "required": ["location"],
                "additionalProperties": False,
            },
        },
    }
]

# Strict JSON schema for the structured-output test.
SENTIMENT_SCHEMA = {
    "name": "SentimentAnalysis",
    "strict": True,
    "schema": {
        "type": "object",
        "properties": {
            "sentiment": {"type": "string", "enum": ["positive", "negative", "mixed"]},
            "confidence": {"type": "number"},
            "explanation": {"type": "string"},
        },
        "required": ["sentiment", "confidence", "explanation"],
        "additionalProperties": False,
    },
}

# Default reasoning request config sent with reasoning-enabled tests.
REASONING_CFG = {"effort": "medium"}

# Console separators.
DIVIDER = "=" * 70
THIN = "-" * 70
def get_reasoning_tokens_from_usage(usage: dict) -> int:
    """Best-effort extraction of the reasoning-token count from a usage dict.

    Providers report reasoning tokens in different places, so probe in order:
    a top-level "reasoning_tokens", then the well-known nested detail dicts
    (output/completion/prompt token details, in that priority), and finally
    any other nested dict carrying a "reasoning_tokens" entry.

    Returns 0 when no numeric value is found anywhere.
    """

    def _numeric(container: dict) -> int | None:
        # Only int/float values count; anything else means "keep looking".
        raw = container.get("reasoning_tokens")
        return int(raw) if isinstance(raw, (int, float)) else None

    # a) top-level field
    if "reasoning_tokens" in usage:
        found = _numeric(usage)
        if found is not None:
            return found

    # b-d) known nested detail dicts, highest priority first
    for detail_key in (
        "output_tokens_details",
        "completion_tokens_details",
        "prompt_tokens_details",
    ):
        candidate = usage.get(detail_key)
        if isinstance(candidate, dict) and "reasoning_tokens" in candidate:
            found = _numeric(candidate)
            if found is not None:
                return found

    # e) fallback: any other nested dict that mentions reasoning_tokens
    for candidate in usage.values():
        if isinstance(candidate, dict) and "reasoning_tokens" in candidate:
            found = _numeric(candidate)
            if found is not None:
                return found

    return 0
def count_tokens(text: str) -> int:
    """Return the cl100k_base token count of *text* (approximate — see _tokenizer)."""
    encoded = _tokenizer.encode(text)
    return len(encoded)
def extract_visible_text(r: TestResult) -> str:
    """Join all model-visible text from a result: content, the reasoning
    field, and the first textual payload of each dict in reasoning_details."""
    chunks: list[str] = []
    if r.content:
        chunks.append(r.content)
    if r.reasoning:
        chunks.append(r.reasoning)
    for detail in r.reasoning_details or []:
        # Details are provider-shaped; ignore anything that is not a dict.
        if not isinstance(detail, dict):
            continue
        # Take at most one text payload per detail, checked in priority order.
        for payload_key in ("thinking", "content", "text", "summary"):
            payload = detail.get(payload_key)
            if payload and isinstance(payload, str):
                chunks.append(payload)
                break
    return "\n".join(chunks)
def _fill_token_metrics(r: TestResult) -> None:
    """Populate visible_tokens and the completion-minus-visible gap in place.

    The gap estimates how many completion tokens were never surfaced as text
    (typically hidden reasoning). No-op for error results.
    """
    if r.error:
        return
    r.visible_tokens = count_tokens(extract_visible_text(r))
    gap = r.completion_tokens - r.visible_tokens
    r.completion_minus_visible = gap if gap > 0 else 0
@dataclass
class TestResult:
    """Flattened view of one test call against one model.

    Raw provider payloads (message, usage, selected top-level fields) are
    kept alongside the extracted values so they can be dumped to JSON later.
    """

    model: str  # OpenRouter model slug
    test_name: str  # short test identifier, e.g. "plain+reasoning"
    content: str = ""  # assistant message content
    reasoning: str | None = None  # message.reasoning, if the provider fills it
    reasoning_details: list | None = None  # message.reasoning_details items
    reasoning_tokens: int = 0  # extracted via get_reasoning_tokens_from_usage
    completion_tokens: int = 0
    prompt_tokens: int = 0
    tool_calls: list | None = None  # message.tool_calls, if any
    finish_reason: str = ""
    elapsed: float = 0.0  # wall-clock seconds for the request
    error: str | None = None  # non-None marks a failed test
    visible_tokens: int = 0  # local tokenizer estimate of surfaced text
    completion_minus_visible: int = 0  # gap hinting at hidden reasoning tokens
    raw_message: dict = field(default_factory=dict)  # message (or stream accumulator)
    raw_usage: dict = field(default_factory=dict)
    raw_top_level: dict = field(default_factory=dict)  # id/model/created
def _headers() -> dict:
    """Build the auth + JSON headers sent with every OpenRouter request."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_KEY}",
    }
async def call_non_streaming(
    client: httpx.AsyncClient,
    model: str,
    messages: list[dict],
    *,
    reasoning: dict | None = None,
    tools: list | None = None,
    tool_choice: dict | None = None,
    response_format: dict | None = None,
    max_tokens: int | None = None,
    temperature: float | None = None,
) -> dict:
    """POST a non-streaming chat completion and return the raw JSON body.

    The response is returned unmodified (including any "error" payload) so
    callers can inspect every raw field.
    """
    payload: dict = {"model": model, "messages": messages}
    # "reasoning" is included even when empty — presence matters to the API.
    if reasoning is not None:
        payload["reasoning"] = reasoning
    # These three are only sent when non-empty.
    for key, value in (
        ("tools", tools),
        ("tool_choice", tool_choice),
        ("response_format", response_format),
    ):
        if value:
            payload[key] = value
    # Numeric knobs are sent whenever explicitly provided (0 is meaningful).
    for key, value in (("max_tokens", max_tokens), ("temperature", temperature)):
        if value is not None:
            payload[key] = value
    resp = await client.post(
        f"{BASE_URL}/chat/completions",
        json=payload,
        headers=_headers(),
        timeout=120,
    )
    return resp.json()
async def call_streaming(
    client: httpx.AsyncClient,
    model: str,
    messages: list[dict],
    *,
    reasoning: dict | None = None,
    max_tokens: int | None = None,
    temperature: float | None = None,
) -> dict:
    """POST a streaming chat completion and accumulate the SSE deltas.

    Returns a dict with the concatenated content/reasoning text, collected
    reasoning_details, the final usage block, finish_reason, and counters for
    how many chunks carried reasoning vs. content (useful for checking whether
    a provider actually streams reasoning incrementally).
    """
    body: dict = {
        "model": model,
        "messages": messages,
        "stream": True,
        # Ask the API to append a final usage chunk to the stream.
        "stream_options": {"include_usage": True},
    }
    if reasoning is not None:
        body["reasoning"] = reasoning
    if max_tokens is not None:
        body["max_tokens"] = max_tokens
    if temperature is not None:
        body["temperature"] = temperature
    # Accumulator shaped the way extract_result() expects streamed data.
    collected: dict = {
        "content": "",
        "reasoning": "",
        "reasoning_details": [],
        "tool_calls": [],
        "usage": {},
        "finish_reason": "",
        "streamed_reasoning_chunks": 0,
        "streamed_content_chunks": 0,
    }
    async with client.stream(
        "POST",
        f"{BASE_URL}/chat/completions",
        json=body,
        headers=_headers(),
        timeout=120,
    ) as resp:
        async for line in resp.aiter_lines():
            # SSE framing: only "data: " lines matter; "[DONE]" terminates.
            if not line.startswith("data: "):
                continue
            data = line[6:]
            if data.strip() == "[DONE]":
                break
            try:
                chunk = json.loads(data)
            except json.JSONDecodeError:
                # Skip malformed frames rather than aborting the whole stream.
                continue
            if chunk.get("usage"):
                # Usage typically arrives once near the end; keep the latest.
                collected["usage"] = chunk["usage"]
            choices = chunk.get("choices", [])
            if not choices:
                continue
            delta = choices[0].get("delta", {})
            if delta.get("content"):
                collected["content"] += delta["content"]
                collected["streamed_content_chunks"] += 1
            if delta.get("reasoning"):
                collected["reasoning"] += delta["reasoning"]
                collected["streamed_reasoning_chunks"] += 1
            if delta.get("reasoning_details"):
                collected["reasoning_details"].extend(delta["reasoning_details"])
            if choices[0].get("finish_reason"):
                collected["finish_reason"] = choices[0]["finish_reason"]
    return collected
def extract_result(
    model: str,
    test_name: str,
    data: dict,
    *,
    streamed: bool = False,
    elapsed: float = 0.0,
) -> TestResult:
    """Normalize a raw response (streamed accumulator or JSON body) into a TestResult.

    For streamed data, *data* is the dict built by call_streaming(). For
    non-streamed data, *data* is the raw JSON body; an "error" payload or an
    empty "choices" list yields an error-flagged TestResult instead of raising.
    """
    if streamed:
        usage = data.get("usage", {})
        return TestResult(
            model=model,
            test_name=test_name,
            content=data.get("content", ""),
            # Empty strings are normalized to None so "did the provider fill
            # this?" checks stay truthy-based.
            reasoning=data.get("reasoning") or None,
            reasoning_details=data.get("reasoning_details") or None,
            reasoning_tokens=get_reasoning_tokens_from_usage(usage),
            completion_tokens=usage.get("completion_tokens", 0) or 0,
            prompt_tokens=usage.get("prompt_tokens", 0) or 0,
            finish_reason=data.get("finish_reason", ""),
            elapsed=elapsed,
            raw_message=data,  # full stream accumulator, incl. chunk counters
            raw_usage=usage,
        )
    if "error" in data:
        return TestResult(
            model=model, test_name=test_name,
            error=json.dumps(data["error"]), elapsed=elapsed,
        )
    choices = data.get("choices") or []
    if not choices:
        # Robustness fix: previously `data.get("choices", [{}])[0]` raised
        # IndexError when the provider returned an explicit empty list;
        # surface a readable error result instead.
        return TestResult(
            model=model, test_name=test_name,
            error="response contained no choices", elapsed=elapsed,
        )
    choice = choices[0]
    msg = choice.get("message", {})
    usage = data.get("usage", {})
    top_level = {k: data[k] for k in ("id", "model", "created") if k in data}
    return TestResult(
        model=model,
        test_name=test_name,
        content=msg.get("content", "") or "",
        reasoning=msg.get("reasoning"),
        reasoning_details=msg.get("reasoning_details"),
        reasoning_tokens=get_reasoning_tokens_from_usage(usage),
        completion_tokens=usage.get("completion_tokens", 0) or 0,
        prompt_tokens=usage.get("prompt_tokens", 0) or 0,
        tool_calls=msg.get("tool_calls"),
        finish_reason=choice.get("finish_reason", ""),
        elapsed=elapsed,
        raw_message=msg,
        raw_usage=usage,
        raw_top_level=top_level,
    )
def print_result(r: TestResult) -> None:
    """Pretty-print one TestResult: status line, reasoning-field presence,
    token metrics, tool calls, and short previews of reasoning text."""
    status = "ERROR" if r.error else "OK"
    print(f" [{status}] {r.test_name} ({r.elapsed:.1f}s)")
    if r.error:
        print(f" Error: {r.error[:200]}")
        return
    has_reasoning = bool(r.reasoning)
    has_details = bool(r.reasoning_details)
    has_r_tokens = r.reasoning_tokens > 0
    print(f" Content: {len(r.content)} chars")
    r_field = f"YES ({len(r.reasoning)} chars)" if has_reasoning else "NO"
    print(f" Reasoning field: {r_field}")
    r_det = f"YES ({len(r.reasoning_details)} items)" if has_details else "NO"
    print(f" Reasoning details: {r_det}")
    r_tok = str(r.reasoning_tokens) if has_r_tokens else "NOT REPORTED"
    print(f" Reasoning tokens: {r_tok}")
    print(f" Completion tokens: {r.completion_tokens}")
    print(f" Visible tokens: {r.visible_tokens}")
    print(f" Completion - visible: {r.completion_minus_visible}")
    print(f" Finish reason: {r.finish_reason}")
    if r.tool_calls:
        print(f" Tool calls: {len(r.tool_calls)}")
        for tc in r.tool_calls:
            fn = tc.get("function", {})
            print(f" -> {fn.get('name', '?')}({fn.get('arguments', '')[:80]})")
    if has_reasoning:
        preview = r.reasoning[:200].replace("\n", " ")
        print(f" Reasoning preview: {preview}...")
    if has_details:
        for i, detail in enumerate(r.reasoning_details[:2]):
            # Bug fix: reasoning_details items are not guaranteed to be dicts
            # (extract_visible_text already guards against this); previously a
            # string entry crashed here on .get(). Print it raw instead.
            if not isinstance(detail, dict):
                print(f" Detail[{i}] (non-dict): {str(detail)[:120]}...")
                continue
            dtype = detail.get("type", "?")
            thinking = str(detail.get("thinking", detail.get("content", "")))[:120]
            print(f" Detail[{i}] ({dtype}): {thinking.replace(chr(10), ' ')}...")
async def run_test(
    client: httpx.AsyncClient,
    model: str,
    test_name: str,
    messages: list[dict],
    *,
    reasoning: dict | None = None,
    tools: list | None = None,
    tool_choice: dict | None = None,
    response_format: dict | None = None,
    stream: bool = False,
    max_tokens: int | None = None,
    temperature: float | None = None,
) -> TestResult:
    """Execute one test case and return its normalized TestResult.

    Any exception (network, JSON decode, parsing) is captured into the
    result's error field rather than propagated; token metrics are always
    filled before returning.
    """
    started = time.perf_counter()
    try:
        if stream:
            raw = await call_streaming(
                client,
                model,
                messages,
                reasoning=reasoning,
                max_tokens=max_tokens,
                temperature=temperature,
            )
            outcome = extract_result(
                model,
                test_name,
                raw,
                streamed=True,
                elapsed=time.perf_counter() - started,
            )
        else:
            raw = await call_non_streaming(
                client,
                model,
                messages,
                reasoning=reasoning,
                tools=tools,
                tool_choice=tool_choice,
                response_format=response_format,
                max_tokens=max_tokens,
                temperature=temperature,
            )
            outcome = extract_result(
                model, test_name, raw, elapsed=time.perf_counter() - started
            )
    except Exception as exc:
        outcome = TestResult(
            model=model,
            test_name=test_name,
            error=str(exc),
            elapsed=time.perf_counter() - started,
        )
    _fill_token_metrics(outcome)
    return outcome
async def test_model(client: httpx.AsyncClient, model: str) -> list[TestResult]:
    """Run the full nine-test suite against one model, printing as it goes.

    Returns the list of TestResults in test order (see module docstring for
    what each test probes).
    """
    print(f"\n{DIVIDER}")
    print(f" MODEL: {model}")
    print(DIVIDER)
    results: list[TestResult] = []
    # Message lists reused across the individual tests below.
    math_msgs = [{"role": "user", "content": MATH_PROMPT}]
    tool_msgs = [{"role": "user", "content": TOOL_PROMPT}]
    sent_msgs = [{"role": "user", "content": SENTIMENT_PROMPT}]
    sent_json_msgs = [
        {
            "role": "user",
            "content": (
                f"{SENTIMENT_PROMPT}\n\n"
                "Respond in JSON with keys: sentiment, confidence, explanation"
            ),
        }
    ]
    tiny_msgs = [{"role": "user", "content": TINY_PROMPT}]
    # 1. Plain + reasoning
    print(f"\n{THIN}")
    print(" Test 1: Plain completion + reasoning")
    r = await run_test(
        client, model, "plain+reasoning", math_msgs, reasoning=REASONING_CFG,
    )
    results.append(r)
    print_result(r)
    # 2. Plain (baseline)
    print(f"\n{THIN}")
    print(" Test 2: Plain completion (no reasoning param)")
    r = await run_test(client, model, "plain_baseline", math_msgs)
    results.append(r)
    print_result(r)
    # 3. Streaming + reasoning
    print(f"\n{THIN}")
    print(" Test 3: Streaming + reasoning")
    r = await run_test(
        client, model, "stream+reasoning", math_msgs,
        reasoning=REASONING_CFG, stream=True,
    )
    results.append(r)
    print_result(r)
    # For streamed results, raw_message is the stream accumulator, which
    # carries the per-chunk counters set by call_streaming().
    if r.raw_message.get("streamed_reasoning_chunks"):
        print(f" Streamed reasoning chunks: {r.raw_message['streamed_reasoning_chunks']}")
    if r.raw_message.get("streamed_content_chunks"):
        print(f" Streamed content chunks: {r.raw_message['streamed_content_chunks']}")
    # 4. Tool calling + reasoning
    print(f"\n{THIN}")
    print(" Test 4: Tool calling + reasoning")
    r = await run_test(
        client, model, "tools+reasoning", tool_msgs,
        reasoning=REASONING_CFG, tools=WEATHER_TOOLS,
    )
    results.append(r)
    print_result(r)
    # 5. JSON schema + reasoning
    print(f"\n{THIN}")
    print(" Test 5: JSON schema structured output + reasoning")
    r = await run_test(
        client, model, "json_schema+reasoning", sent_msgs,
        reasoning=REASONING_CFG,
        response_format={"type": "json_schema", "json_schema": SENTIMENT_SCHEMA},
    )
    results.append(r)
    print_result(r)
    # 6. Schema-less JSON + reasoning
    print(f"\n{THIN}")
    print(" Test 6: Schema-less JSON (json_object) + reasoning")
    r = await run_test(
        client, model, "json_object+reasoning", sent_json_msgs,
        reasoning=REASONING_CFG,
        response_format={"type": "json_object"},
    )
    results.append(r)
    print_result(r)
    # 7. Reasoning with exclude=true: reasoning should happen internally
    # (tokens billed) but never appear in the response fields.
    print(f"\n{THIN}")
    print(" Test 7: Reasoning with exclude=true (internal-only)")
    r = await run_test(
        client, model, "reasoning_excluded", math_msgs,
        reasoning={"effort": "medium", "exclude": True},
    )
    results.append(r)
    print_result(r)
    if not r.error:
        had_internal = r.reasoning_tokens > 0
        leaked = bool(r.reasoning or r.reasoning_details)
        tag_i = "YES" if had_internal else "NO"
        tag_l = "YES (bug?)" if leaked else "NO (correct)"
        print(f" Internal reasoning (via tokens): {tag_i}")
        print(f" Leaked to response: {tag_l}")
    # 8. Tiny baseline — tight max_tokens with no reasoning for comparison.
    print(f"\n{THIN}")
    print(" Test 8: Tiny output baseline (no reasoning)")
    r = await run_test(
        client, model, "tiny_baseline", tiny_msgs,
        max_tokens=5, temperature=0,
    )
    results.append(r)
    print_result(r)
    # 9. Tiny + reasoning — does reasoning eat the tiny token budget?
    print(f"\n{THIN}")
    print(" Test 9: Tiny output + reasoning")
    r = await run_test(
        client, model, "tiny+reasoning", tiny_msgs,
        reasoning=REASONING_CFG, max_tokens=5, temperature=0,
    )
    results.append(r)
    print_result(r)
    return results
def print_summary(all_results: dict[str, list[TestResult]]) -> None:
    """Print a per-test summary table and a condensed per-model verdict."""
    print(f"\n\n{DIVIDER}")
    print(" SUMMARY: Reasoning field presence across models and modes")
    print(DIVIDER)
    # Table header.
    print(
        f"\n {'Model':<35} {'Test':<25} "
        f"{'R.Field':<9} {'R.Details':<10} {'R.Tokens':<10} "
        f"{'Gap':<8} {'Time':<7} {'Status'}"
    )
    print(f" {'-' * 115}")
    for model, results in all_results.items():
        for r in results:
            if r.error:
                print(
                    f" {model:<35} {r.test_name:<25} "
                    f"{'—':<9} {'—':<10} {'—':<10} "
                    f"{'—':<8} {r.elapsed:.1f}s ERROR"
                )
                continue
            r_field = "YES" if r.reasoning else "no"
            r_details = "YES" if r.reasoning_details else "no"
            # "n/r" = not reported by the provider.
            r_tokens = (
                str(r.reasoning_tokens) if r.reasoning_tokens > 0 else "n/r"
            )
            gap = str(r.completion_minus_visible)
            print(
                f" {model:<35} {r.test_name:<25} "
                f"{r_field:<9} {r_details:<10} {r_tokens:<10} "
                f"{gap:<8} {r.elapsed:.1f}s OK"
            )
    print()
    # Condensed per-model verdict
    print(f"\n {'Model':<35} {'Verdict'}")
    print(f" {'-' * 70}")
    for model, results in all_results.items():
        ok_results = [r for r in results if not r.error]
        if not ok_results:
            print(f" {model:<35} ALL ERRORS")
            continue
        # Reasoning-enabled tests only: drop baselines and the exclude test.
        reasoning_results = [
            r
            for r in ok_results
            if "reasoning" in r.test_name
            and "baseline" not in r.test_name
            and "excluded" not in r.test_name
        ]
        fills_field = any(r.reasoning for r in reasoning_results)
        fills_details = any(
            r.reasoning_details for r in reasoning_results
        )
        reports_tokens = any(
            r.reasoning_tokens > 0 for r in reasoning_results
        )
        def _has_reasoning(r: TestResult) -> bool:
            # Any evidence of reasoning: visible text, details, or billed tokens.
            return bool(
                r.reasoning or r.reasoning_details
                or r.reasoning_tokens > 0
            )
        works_with_tools = any(
            r.test_name == "tools+reasoning"
            and not r.error and _has_reasoning(r)
            for r in ok_results
        )
        works_with_schema = any(
            r.test_name == "json_schema+reasoning"
            and not r.error and _has_reasoning(r)
            for r in ok_results
        )
        excluded = next(
            (r for r in ok_results if r.test_name == "reasoning_excluded"),
            None,
        )
        # exclude=true "works" when tokens were billed but nothing leaked.
        exclude_works = (
            excluded
            and excluded.reasoning_tokens > 0
            and not excluded.reasoning
            and not excluded.reasoning_details
        )
        parts: list[str] = []
        if fills_field:
            parts.append("fills reasoning field")
        if fills_details:
            parts.append("fills reasoning_details")
        if reports_tokens:
            parts.append("reports tokens")
        if not fills_field and not fills_details and not reports_tokens:
            parts.append("NO reasoning output detected")
        if works_with_tools:
            parts.append("+tools")
        if works_with_schema:
            parts.append("+schema")
        if exclude_works:
            parts.append("exclude works")
        print(f" {model:<35} {', '.join(parts)}")
async def main() -> None:
    """Entry point: run every model through the suite, print the summary,
    and dump raw per-test data to reasoning_test_results.json."""
    if not API_KEY:
        print("ERROR: Set OPENROUTER_API_KEY in .env or environment")
        sys.exit(1)
    # CLI arguments (if any) override the default model list.
    models = sys.argv[1:] if len(sys.argv) > 1 else MODELS
    print("Reasoning Test — OpenRouter Model Comparison")
    print(f"Models: {', '.join(models)}")
    print(f"API key: {API_KEY[:8]}...{API_KEY[-4:]}")
    all_results: dict[str, list[TestResult]] = {}
    async with httpx.AsyncClient() as client:
        for model in models:
            try:
                results = await test_model(client, model)
                all_results[model] = results
            except Exception as e:
                # One model failing entirely must not abort the others.
                print(f"\n FATAL for {model}: {e}")
                all_results[model] = [
                    TestResult(model=model, test_name="fatal", error=str(e))
                ]
    print_summary(all_results)
    # Persist a JSON-friendly projection of every result for offline analysis.
    out = Path("reasoning_test_results.json")
    raw: dict = {}
    for model, results in all_results.items():
        raw[model] = [
            {
                "test": r.test_name,
                "content_len": len(r.content),
                "content_preview": r.content[:200],
                "reasoning_len": len(r.reasoning) if r.reasoning else 0,
                "reasoning_preview": (r.reasoning[:200] if r.reasoning else None),
                "reasoning_details_count": (
                    len(r.reasoning_details) if r.reasoning_details else 0
                ),
                "reasoning_tokens": r.reasoning_tokens,
                "completion_tokens": r.completion_tokens,
                "prompt_tokens": r.prompt_tokens,
                "visible_tokens": r.visible_tokens,
                "completion_minus_visible": r.completion_minus_visible,
                "tool_calls": r.tool_calls,
                "finish_reason": r.finish_reason,
                "elapsed": round(r.elapsed, 2),
                "error": r.error,
                "raw_message_keys": list(r.raw_message.keys()) if r.raw_message else [],
                "usage": r.raw_usage,
                "top_level": r.raw_top_level,
            }
            for r in results
        ]
    out.write_text(json.dumps(raw, indent=2, default=str))
    print(f"\nRaw results saved to {out}")
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment