|
# /// script |
|
# dependencies = [ |
|
# "httpx>=0.27.0", |
|
# "pydantic>=2.0.0", |
|
# ] |
|
# /// |
|
|
|
""" |
|
Minimal reproduction script for zhipu anthropic API ToolCall zero value issue. |
|
Issue: When the tools list changes mid-stream, new tools receive zero/empty parameters. |
|
This script demonstrates the issue by: |
|
1. Making a streaming request with initial tools |
|
2. Observing the tool_use events in the stream |
|
3. Checking if tool parameters are complete when received |
|
Usage: |
|
uv run scripts/reproduce_tool_issue.py |
|
""" |
|
|
|
import asyncio |
|
import json |
|
import os |
|
from typing import Any |
|
|
|
import httpx |
|
from pydantic import BaseModel |
|
|
|
|
|
# Configuration |
|
ZHIPU_API_KEY = os.environ.get("ZHIPU_API_KEY", "") |
|
ZHIPU_BASE_URL = os.environ.get( |
|
"ZHIPU_BASE_URL", "https://open.bigmodel.cn/api/anthropic" |
|
) |
|
|
|
# Tool definitions |
|
TOOLS = [ |
|
{ |
|
"name": "get_weather", |
|
"description": "Get the current weather for a location", |
|
"input_schema": { |
|
"type": "object", |
|
"properties": { |
|
"location": { |
|
"type": "string", |
|
"description": "City name, e.g. San Francisco, CA", |
|
}, |
|
"unit": { |
|
"type": "string", |
|
"enum": ["celsius", "fahrenheit"], |
|
"description": "Temperature unit", |
|
}, |
|
}, |
|
"required": ["location"], |
|
}, |
|
}, |
|
{ |
|
"name": "calculate", |
|
"description": "Perform a simple calculation", |
|
"input_schema": { |
|
"type": "object", |
|
"properties": { |
|
"operation": { |
|
"type": "string", |
|
"enum": ["add", "subtract", "multiply", "divide"], |
|
}, |
|
"a": {"type": "number"}, |
|
"b": {"type": "number"}, |
|
}, |
|
"required": ["operation", "a", "b"], |
|
}, |
|
}, |
|
] |
|
|
|
|
|
class ToolUseBlock(BaseModel): |
|
id: str |
|
name: str |
|
input: dict[str, Any] |
|
|
|
|
|
class StreamEvent: |
|
def __init__(self, event_type: str, data: dict[str, Any]): |
|
self.event_type = event_type |
|
self.data = data |
|
|
|
def __repr__(self) -> str: |
|
return f"StreamEvent(type={self.event_type}, data={self.data})" |
|
|
|
|
|
async def parse_sse_stream(response: httpx.Response) -> list[StreamEvent]:
    """Parse a Server-Sent Events stream from an httpx response.

    The line-level parsing is shared with parse_sse_stream_from_lines below.
    """
    lines = [line async for line in response.aiter_lines()]
    return await parse_sse_stream_from_lines(lines)
|
|
|
|
|
async def parse_sse_stream_from_lines(lines: list[str]) -> list[StreamEvent]: |
|
"""Parse Server-Sent Events from pre-collected lines.""" |
|
events: list[StreamEvent] = [] |
|
event_type = "" |
|
data_buffer = "" |
|
|
|
for line in lines: |
|
line = line.strip() |
|
|
|
if not line: |
|
# Empty line marks end of event |
|
if event_type or data_buffer: |
|
events.append( |
|
StreamEvent( |
|
event_type=event_type or "message", |
|
data=json.loads(data_buffer) if data_buffer else {}, |
|
) |
|
) |
|
event_type = "" |
|
data_buffer = "" |
|
continue |
|
|
|
if line.startswith(":"): |
|
# Comment line, skip |
|
continue |
|
|
|
if line.startswith("event:"): |
|
event_type = line[6:].strip() |
|
elif line.startswith("data:"): |
|
data_buffer += line[5:].strip() |
|
|
|
    # Flush a trailing event in case the stream did not end with a blank line.
    if event_type or data_buffer:
        events.append(
            StreamEvent(
                event_type=event_type or "message",
                data=json.loads(data_buffer) if data_buffer else {},
            )
        )

    return events
|
|
|
|
|
async def test_streaming_tool_call() -> None: |
|
"""Test streaming tool call with zhipu API.""" |
|
if not ZHIPU_API_KEY: |
|
print("ERROR: ZHIPU_API_KEY environment variable not set") |
|
print("Usage: ZHIPU_API_KEY=your_key uv run scripts/reproduce_tool_issue.py") |
|
return |
|
|
|
print(f"Testing API: {ZHIPU_BASE_URL}") |
|
print("-" * 60) |
|
|
|
request_payload = { |
|
"model": "GLM-4.7", |
|
"max_tokens": 1024, |
|
"stream": True, |
|
"messages": [ |
|
{ |
|
"role": "user", |
|
"content": "What's the weather in Beijing? Also, what is 25 + 17?", |
|
} |
|
], |
|
"tools": TOOLS, |
|
} |
|
|
|
print(f"Request payload (tools):") |
|
for tool in TOOLS: |
|
print(f" - {tool['name']}") |
|
print("-" * 60) |
|
|
|
async with httpx.AsyncClient(timeout=60.0) as client: |
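        # Note: client.post() without streaming buffers the full response body,
        # so parse_sse_stream() below iterates the already-buffered SSE text
        # rather than reading the stream live; that is sufficient for this repro.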
|
response = await client.post( |
|
f"{ZHIPU_BASE_URL}/v1/messages", |
|
headers={ |
|
"Authorization": f"Bearer {ZHIPU_API_KEY}", |
|
"Content-Type": "application/json", |
|
"Accept": "text/event-stream", |
|
}, |
|
json=request_payload, |
|
) |
|
|
|
print(f"Response status: {response.status_code}") |
|
print("-" * 60) |
|
|
|
if response.status_code != 200: |
|
print(f"Error response: {response.text}") |
|
return |
|
|
|
events = await parse_sse_stream(response) |
|
|
|
# Track tool use blocks and their input accumulation |
|
tool_use_blocks: dict[str, dict[str, Any]] = {} |
|
current_tool_index: int | None = None |
|
partial_json_buffers: dict[int, str] = {} |
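    # input_json_delta fragments are keyed by content block index and joined
    # into the complete tool input JSON when the matching content_block_stop
    # event arrives.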
|
|
|
for event in events: |
|
print(f"Event: {event.event_type}") |
|
|
|
if event.event_type == "content_block_start": |
|
if event.data.get("content_block", {}).get("type") == "tool_use": |
|
index = event.data.get("index", -1) |
|
block = event.data.get("content_block", {}) |
|
tool_id = block.get("id", "") |
|
tool_name = block.get("name", "") |
|
tool_use_blocks[tool_id] = { |
|
"id": tool_id, |
|
"name": tool_name, |
|
"input": {}, |
|
"input_json": "", |
|
} |
|
print(f" Tool use started: index={index}, name={tool_name}") |
|
current_tool_index = index |
|
|
|
elif event.event_type == "content_block_delta": |
|
index = event.data.get("index", -1) |
|
delta = event.data.get("delta", {}) |
|
delta_type = delta.get("type", "") |
|
|
|
if delta_type == "input_json_delta": |
|
partial_json = delta.get("partial_json", "") |
|
partial_json_buffers[index] = ( |
|
partial_json_buffers.get(index, "") + partial_json |
|
) |
|
print(f" Input JSON delta (index={index}): {repr(partial_json)}") |
|
|
|
                    # Tool input is accumulated per content block index in
                    # partial_json_buffers and resolved at content_block_stop,
                    # so no per-tool lookup is needed here.
|
|
|
elif event.event_type == "content_block_stop": |
|
index = event.data.get("index", -1) |
|
if index in partial_json_buffers: |
|
full_json = partial_json_buffers[index] |
|
print(f" Final JSON (index={index}): {full_json}") |
|
|
|
# Try to parse the complete JSON |
|
try: |
|
parsed_input = json.loads(full_json) |
|
print(f" Parsed input: {parsed_input}") |
|
|
|
# Check if input is empty/zero values |
|
if not parsed_input: |
|
print(" ⚠️ ISSUE DETECTED: Tool input is empty object!") |
|
else: |
|
# Check if all values are None/empty |
|
all_empty = all( |
|
v is None or v == "" or v == 0 |
|
for v in parsed_input.values() |
|
) |
|
if all_empty: |
|
print( |
|
" ⚠️ ISSUE DETECTED: All tool parameters are zero/empty!" |
|
) |
|
|
|
except json.JSONDecodeError as e: |
|
print(f" ⚠️ JSON parsing error: {e}") |
|
|
|
elif event.event_type == "message_stop": |
|
print(" Message complete") |
|
|
|
print("-" * 60) |
|
print("Summary:") |
|
print(f" Total events: {len(events)}") |
|
print(f" Tool use blocks: {len(tool_use_blocks)}") |
|
|
|
|
|
async def test_dynamic_tool_change() -> None: |
|
""" |
|
Test scenario where tools list changes between requests. |
|
This simulates the scenario where: |
|
1. First request with tool A |
|
2. Model starts using tool A |
|
3. Second request with tool A + tool B |
|
4. Model switches to tool B but gets zero parameters |
|
""" |
|
if not ZHIPU_API_KEY: |
|
print("ERROR: ZHIPU_API_KEY environment variable not set") |
|
return |
|
|
|
print("\n" + "=" * 60) |
|
print("TESTING DYNAMIC TOOL CHANGE SCENARIO") |
|
print("=" * 60) |
|
|
|
# Build message history |
|
messages_history: list[dict[str, Any]] = [ |
|
{ |
|
"role": "user", |
|
"content": "Hello", |
|
} |
|
] |
|
|
|
# First request with only get_weather tool |
|
print("\nStep 1: Initial request with single tool (get_weather)") |
|
print("-" * 60) |
|
|
|
request_1 = { |
|
"model": "glm-4-flash", |
|
"max_tokens": 1024, |
|
"stream": True, |
|
"messages": messages_history, |
|
"tools": [TOOLS[0]], # Only get_weather |
|
"thinking": { |
|
"type": "enabled", |
|
"budget_tokens": 1024, |
|
}, |
|
} |
|
|
|
async with httpx.AsyncClient(timeout=60.0) as client: |
|
response = await client.post( |
|
f"{ZHIPU_BASE_URL}/v1/messages", |
|
headers={ |
|
"Authorization": f"Bearer {ZHIPU_API_KEY}", |
|
"Content-Type": "application/json", |
|
"Accept": "text/event-stream", |
|
}, |
|
json=request_1, |
|
) |
|
|
|
if response.status_code == 200: |
|
events = await parse_sse_stream(response) |
|
# Track blocks by index |
|
blocks_by_index: dict[int, dict[str, Any]] = {} |
|
text_buffers: dict[int, str] = {} |
|
thinking_buffers: dict[int, str] = {} |
|
partial_json_buffers: dict[int, str] = {} |
|
|
|
for event in events: |
|
if event.event_type == "content_block_start": |
|
index = event.data.get("index", -1) |
|
block = event.data.get("content_block", {}) |
|
block_type = block.get("type") |
|
print(f" Block start: index={index}, type={block_type}") |
|
# Initialize block with the template from content_block |
|
blocks_by_index[index] = block.copy() |
|
|
|
elif event.event_type == "content_block_delta": |
|
index = event.data.get("index", -1) |
|
delta = event.data.get("delta", {}) |
|
delta_type = delta.get("type", "") |
|
|
|
if delta_type == "text_delta": |
|
text = delta.get("text", "") |
|
text_buffers[index] = text_buffers.get(index, "") + text |
|
if index in blocks_by_index: |
|
blocks_by_index[index]["text"] = text_buffers[index] |
|
|
|
elif delta_type == "thinking_delta": |
|
thinking = delta.get("thinking", "") |
|
thinking_buffers[index] = thinking_buffers.get(index, "") + thinking |
|
if index in blocks_by_index: |
|
blocks_by_index[index]["thinking"] = thinking_buffers[index] |
|
|
|
elif delta_type == "input_json_delta": |
|
partial = delta.get("partial_json", "") |
|
partial_json_buffers[index] = partial_json_buffers.get(index, "") + partial |
|
print(f" Input JSON delta (index={index}): {repr(partial)}") |
|
|
|
elif event.event_type == "content_block_stop": |
|
index = event.data.get("index", -1) |
|
print(f" Block stop: index={index}") |
|
|
|
# For tool_use blocks, parse the accumulated JSON |
|
if index in blocks_by_index and blocks_by_index[index].get("type") == "tool_use": |
|
if index in partial_json_buffers: |
|
full_json = partial_json_buffers[index] |
|
print(f" Final JSON (index={index}): {full_json}") |
|
try: |
|
parsed_input = json.loads(full_json) |
|
print(f" Parsed input: {parsed_input}") |
|
blocks_by_index[index]["input"] = parsed_input |
|
except json.JSONDecodeError as e: |
|
print(f" ⚠️ JSON parsing error: {e}") |
|
|
|
# Convert blocks_by_index to sorted list |
|
content_blocks = [ |
|
blocks_by_index[i] for i in sorted(blocks_by_index.keys()) |
|
] |
|
print(f" Accumulated {len(content_blocks)} content blocks") |
|
|
|
# Accumulate message history: add assistant response |
|
messages_history.append({"role": "assistant", "content": content_blocks}) |
|
else: |
|
print(f"Error: {response.text}") |
|
return |
|
|
|
    # Second request: switch the tools list to only the calculate tool
|
print("\nStep 2: Follow-up request with calculate tool (streaming)") |
|
print("-" * 60) |
|
|
|
# Add new user message |
|
messages_history.extend([ |
|
{ |
|
"role": "user", |
|
"content": "Now calculate 100 + 200 using the calculate tool", |
|
}, |
|
{ |
|
"role": "user", |
|
"content": "You must call the tool \"calculate\" exactly once. \nDo not output plain text. Provide all required fields in the tool input. \nRequired fields: \"operation\", \"a\", \"b\".", |
|
}, |
|
]) |
|
|
|
request_2 = { |
|
"model": "GLM-4.7", |
|
"max_tokens": 1024, |
|
"stream": True, |
|
"messages": messages_history, |
|
"tools": [TOOLS[1]], # Now only calculate tool |
|
"tool_choice": { |
|
"type": "tool", |
|
"name": "calculate", |
|
}, |
|
"thinking": { |
|
"type": "enabled", |
|
"budget_tokens": 1024, |
|
}, |
|
} |
|
print("request_2: ", request_2) |
|
|
|
# Record request details |
|
request_headers = { |
|
"Authorization": f"Bearer {ZHIPU_API_KEY}", |
|
"Content-Type": "application/json", |
|
"Accept": "text/event-stream", |
|
} |
|
request_body = request_2 |
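        # As in step 1, client.post() buffers the entire SSE body; aiter_lines()
        # below replays the buffered lines so they can be logged and then parsed.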
|
|
|
response = await client.post( |
|
f"{ZHIPU_BASE_URL}/v1/messages", |
|
headers=request_headers, |
|
json=request_body, |
|
) |
|
|
|
# Record response details |
|
response_headers = dict(response.headers) |
|
response_lines: list[str] = [] |
|
|
|
        # Collect raw response lines; they are logged and parsed afterwards
|
async for line in response.aiter_lines(): |
|
response_lines.append(line) |
|
|
|
response_body = "\n".join(response_lines) |
|
|
|
# Write to file |
|
output_file = "/tmp/zhipu_request_response.log" |
|
with open(output_file, "w") as f: |
|
f.write("=" * 60 + "\n") |
|
f.write("REQUEST\n") |
|
f.write("=" * 60 + "\n") |
|
f.write("Headers:\n") |
|
for k, v in request_headers.items(): |
|
# Mask authorization token |
|
if k.lower() == "authorization": |
|
v = v[:20] + "...***masked***..." |
|
f.write(f" {k}: {v}\n") |
|
f.write("\nBody:\n") |
|
f.write(json.dumps(request_body, indent=2, ensure_ascii=False)) |
|
f.write("\n\n") |
|
|
|
f.write("=" * 60 + "\n") |
|
f.write("RESPONSE\n") |
|
f.write("=" * 60 + "\n") |
|
f.write(f"Status: {response.status_code}\n") |
|
f.write("Headers:\n") |
|
for k, v in response_headers.items(): |
|
f.write(f" {k}: {v}\n") |
|
f.write("\nBody:\n") |
|
f.write(response_body) |
|
print(f"Request/Response written to: {output_file}") |
|
|
|
if response.status_code == 200: |
|
# Parse collected lines |
|
events = await parse_sse_stream_from_lines(response_lines) |
|
# Track blocks by index |
|
blocks_by_index: dict[int, dict[str, Any]] = {} |
|
text_buffers: dict[int, str] = {} |
|
thinking_buffers: dict[int, str] = {} |
|
partial_json_buffers: dict[int, str] = {} |
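            # Accumulate content blocks exactly as in the step-1 loop above.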
|
|
|
for event in events: |
|
if event.event_type == "content_block_start": |
|
index = event.data.get("index", -1) |
|
block = event.data.get("content_block", {}) |
|
block_type = block.get("type") |
|
print(f" Block start: index={index}, type={block_type}") |
|
blocks_by_index[index] = block.copy() |
|
|
|
elif event.event_type == "content_block_delta": |
|
index = event.data.get("index", -1) |
|
delta = event.data.get("delta", {}) |
|
delta_type = delta.get("type", "") |
|
|
|
if delta_type == "text_delta": |
|
text = delta.get("text", "") |
|
text_buffers[index] = text_buffers.get(index, "") + text |
|
if index in blocks_by_index: |
|
blocks_by_index[index]["text"] = text_buffers[index] |
|
|
|
elif delta_type == "thinking_delta": |
|
thinking = delta.get("thinking", "") |
|
thinking_buffers[index] = thinking_buffers.get(index, "") + thinking |
|
if index in blocks_by_index: |
|
blocks_by_index[index]["thinking"] = thinking_buffers[index] |
|
|
|
elif delta_type == "input_json_delta": |
|
partial = delta.get("partial_json", "") |
|
partial_json_buffers[index] = partial_json_buffers.get(index, "") + partial |
|
print(f" Input JSON delta (index={index}): {repr(partial)}") |
|
|
|
elif event.event_type == "content_block_stop": |
|
index = event.data.get("index", -1) |
|
print(f" Block stop: index={index}") |
|
|
|
# For tool_use blocks, parse the accumulated JSON |
|
if index in blocks_by_index and blocks_by_index[index].get("type") == "tool_use": |
|
if index in partial_json_buffers: |
|
full_json = partial_json_buffers[index] |
|
print(f" Final JSON (index={index}): {full_json}") |
|
try: |
|
parsed_input = json.loads(full_json) |
|
print(f" Parsed input: {parsed_input}") |
|
blocks_by_index[index]["input"] = parsed_input |
|
|
|
# Check for the issue |
|
if not parsed_input or all( |
|
v is None or v == "" or v == 0 for v in parsed_input.values() |
|
): |
|
print(" ⚠️ ISSUE DETECTED: calculate tool has zero/empty parameters!") |
|
else: |
|
print(" ✓ Tool parameters are present") |
|
except json.JSONDecodeError as e: |
|
print(f" ⚠️ JSON parsing error: {e}") |
|
|
|
# Convert blocks_by_index to sorted list |
|
content_blocks = [ |
|
blocks_by_index[i] for i in sorted(blocks_by_index.keys()) |
|
] |
|
print(f" Accumulated {len(content_blocks)} content blocks") |
|
else: |
|
print(f"Error: {response.text}") |
|
|
|
|
|
async def main() -> None: |
|
"""Run all tests.""" |
|
print("Zhipu Anthropic API ToolCall Issue Reproduction") |
|
print("=" * 60) |
|
print() |
|
|
|
    # await test_streaming_tool_call()  # enable to also run the single-request scenario
|
await test_dynamic_tool_change() |
|
|
|
print("\n" + "=" * 60) |
|
print("Tests complete") |
|
|
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |