# Source: GitHub Gist QwertyJack/ab2a71e6b8cb4d7da8a8b84da7c8711f
# (by @QwertyJack, created January 27, 2026 16:19).
# GLM-4.7 Tool Call Parser for vLLM with streaming support (v2)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
GLM-4.7 Tool Call Parser with true streaming support (v2).
This is a fixed version of the parser from PR #32888 that addresses:
- Bug 1: Missing partial token buffering
- Bug 2: Incorrect key/value parsing order
- Bug 3: Long value content not streamed incrementally (added in v2)
For long string values (e.g., file content), this parser now streams
the value content incrementally as it arrives, rather than waiting for
the complete </arg_value> closing tag.
Usage with vllm serve (plugin mode):
vllm serve <model> --tool-parser-plugin /app/glm47_tool_parser.py \
--tool-call-parser glm47
"""
import ast
import json
from collections.abc import Sequence
from typing import Any
import regex as re
from vllm.entrypoints.chat_utils import make_tool_call_id
from vllm.entrypoints.openai.protocol import (
ChatCompletionRequest,
ChatCompletionToolsParam,
DeltaFunctionCall,
DeltaMessage,
DeltaToolCall,
ExtractedToolCallInformation,
FunctionCall,
ToolCall,
)
from vllm.logger import init_logger
from vllm.tokenizers import TokenizerLike
from vllm.tool_parsers.abstract_tool_parser import ToolParser, ToolParserManager
# Module-level logger, created through the `init_logger` helper imported from vllm.logger.
logger = init_logger(__name__)
@ToolParserManager.register_module("glm47")
class Glm47ToolParser(ToolParser):
    """Tool parser for GLM-4.7 models that emit XML-like tool call tags.

    Tool call format:
        <tool_call>{tool_name}\\n
        <arg_key>k</arg_key><arg_value>v</arg_value>
        ...
        </tool_call>

    The non-streaming path (`extract_tool_calls`) extracts complete tool calls
    from the final text. The streaming path (`extract_tool_calls_streaming`)
    keeps its own text buffer and emits tool-call deltas incrementally: the
    tool name first, then JSON argument fragments. Values of parameters that
    the request's tool schema declares as ``"string"`` are streamed as they
    arrive; all other values are emitted once their closing tag is seen.

    Each streaming call drains everything that can already be decided from the
    buffer and merges it into a single delta, so no parsed output is left
    waiting for a later call that may never come.
    """

    def __init__(self, tokenizer: TokenizerLike):
        """Set up tag constants, regexes and the streaming state.

        Raises:
            ValueError: if the base class did not end up with a tokenizer.
        """
        super().__init__(tokenizer)

        # Stateful streaming fields. One parser instance is reused across chunks.
        self.current_tool_name_sent: bool = False
        self.prev_tool_call_arr: list[dict[str, Any]] = []
        self.current_tool_id: int = -1
        self.streamed_args_for_tool: list[str] = []

        # Literal tags of the GLM tool-call markup.
        self.tool_call_start_token: str = "<tool_call>"
        self.tool_call_end_token: str = "</tool_call>"
        self.arg_key_start: str = "<arg_key>"
        self.arg_key_end: str = "</arg_key>"
        self.arg_val_start: str = "<arg_value>"
        self.arg_val_end: str = "</arg_value>"
        self.tool_calls_start_token = self.tool_call_start_token

        self.func_call_regex = re.compile(r"<tool_call>.*?</tool_call>", re.DOTALL)
        # The tool name ends at the first newline, or directly at <arg_key> /
        # </tool_call> when the model omitted the newline. This mirrors the
        # terminators accepted by the streaming path.
        self.func_detail_regex = re.compile(
            r"<tool_call>(.*?)(?:\n|(?=<arg_key>)|(?=</tool_call>))(.*)</tool_call>",
            re.DOTALL,
        )
        self.func_arg_regex = re.compile(
            r"<arg_key>(.*?)</arg_key>\s*<arg_value>(.*?)</arg_value>", re.DOTALL
        )

        if not self.model_tokenizer:
            raise ValueError(
                "The model tokenizer must be passed to the ToolParser "
                "constructor during construction."
            )

        self.tool_call_start_token_id = self.vocab.get(self.tool_call_start_token)
        self.tool_call_end_token_id = self.vocab.get(self.tool_call_end_token)

        # Text received but not yet consumed by the streaming state machine.
        self._buffer: str = ""

        # Streaming state (for true incremental tool-call streaming).
        self._in_tool_call: bool = False
        self._current_tool_name: str | None = None
        self._pending_key: str | None = None
        # True while the content of a string value is being streamed.
        self._streaming_string_value: bool = False
        # Per-tool state, indexed by tool-call index.
        self._tool_call_ids: list[str] = []
        self._args_started: list[bool] = []
        self._args_closed: list[bool] = []
        self._seen_keys: list[set[str]] = []

    # ------------------------------------------------------------------
    # Small stateless helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _deserialize(value: str) -> Any:
        """Best-effort conversion of a raw argument string to a JSON value.

        Tries JSON first, then a Python literal. The raw string is returned
        unchanged when neither parse succeeds, or when the Python literal is
        not JSON-serializable (for example a set, bytes or ``...``), so that
        callers can always pass the result to ``json.dumps``.
        """
        try:
            return json.loads(value)
        except (json.JSONDecodeError, RecursionError):
            pass
        try:
            parsed = ast.literal_eval(value)
            # Probe serializability now; otherwise the caller's json.dumps
            # would raise later, in the middle of a stream.
            json.dumps(parsed)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return value
        return parsed

    @staticmethod
    def _json_escape_string_content(s: str) -> str:
        """JSON-escape string content for incremental streaming.

        Returns the text that goes INSIDE a JSON string literal, i.e. without
        the surrounding double quotes.
        """
        if not s:
            return ""
        # json.dumps yields a quoted literal; drop the two quote characters.
        return json.dumps(s, ensure_ascii=False)[1:-1]

    @staticmethod
    def _is_string_type(
        tool_name: str,
        arg_name: str,
        tools: list[ChatCompletionToolsParam] | None,
    ) -> bool:
        """Return True if `tools` declares `arg_name` of `tool_name` as "string".

        Returns False when there are no tools, the tool is unknown, the tool
        has no parameters, or the property has no usable schema.
        """
        if tools is None:
            return False
        for tool in tools:
            if tool.function.name != tool_name:
                continue
            parameters = tool.function.parameters
            if parameters is None:
                return False
            properties = parameters.get("properties") or {}
            schema = properties.get(arg_name) if isinstance(properties, dict) else None
            # A missing or non-dict schema cannot declare a string type.
            return isinstance(schema, dict) and schema.get("type") == "string"
        logger.debug("No tool named '%s'.", tool_name)
        return False

    @staticmethod
    def _tools_enabled(request: ChatCompletionRequest) -> bool:
        """Return whether tool parsing should be applied for this request."""
        try:
            tools = getattr(request, "tools", None)
            tool_choice = getattr(request, "tool_choice", None)
            return bool(tools) and tool_choice != "none"
        except Exception:
            # If the request object is unexpected, default to parsing.
            logger.exception("Failed to determine if tools are enabled.")
            return True

    @staticmethod
    def _partial_suffix_len(text: str, token: str) -> int:
        """Length of the longest proper prefix of `token` that ends `text`.

        Used to hold back text such as ``"<tool_ca"`` that may turn out to be
        the beginning of a tag once the next chunk arrives. Returns 0 when the
        text does not end with any proper prefix of the token.
        """
        for size in range(min(len(token) - 1, len(text)), 0, -1):
            if text.endswith(token[:size]):
                return size
        return 0

    # ------------------------------------------------------------------
    # Request adjustment and non-streaming extraction
    # ------------------------------------------------------------------

    def adjust_request(self, request: ChatCompletionRequest) -> ChatCompletionRequest:
        """Make sure tool call tokens are not skipped during decoding.

        When the request carries tools and `tool_choice` is not "none",
        `skip_special_tokens` is turned off so that <tool_call> and
        </tool_call> show up in the decoded text.
        """
        request = super().adjust_request(request)
        if request.tools and request.tool_choice != "none":
            request.skip_special_tokens = False
        return request

    def extract_tool_calls(
        self,
        model_output: str,
        request: ChatCompletionRequest,
    ) -> ExtractedToolCallInformation:
        """Extract all complete tool calls from the final model output.

        Returns the tool calls plus the text preceding the first
        ``<tool_call>`` as content. If nothing could be extracted, or parsing
        raised, the whole output is returned as content with
        ``tools_called=False``.
        """
        logger.debug("model_output: %s", model_output)
        tool_calls: list[ToolCall] = []
        try:
            for block in self.func_call_regex.findall(model_output):
                detail = self.func_detail_regex.search(block)
                if not detail:
                    logger.warning(
                        "Failed to parse tool call details from: %s",
                        block,
                    )
                    continue
                # Strip the name like the streaming path does, so schema
                # lookups are not defeated by stray whitespace or "\r".
                tc_name = detail.group(1).strip()
                tc_args = detail.group(2)
                pairs = self.func_arg_regex.findall(tc_args) if tc_args else []
                arg_dct: dict[str, Any] = {}
                for key, value in pairs:
                    arg_key = key.strip()
                    arg_val = value.strip()
                    # Schema-declared strings stay verbatim; everything else
                    # is decoded into a JSON value when possible.
                    if not self._is_string_type(tc_name, arg_key, request.tools):
                        arg_val = self._deserialize(arg_val)
                    logger.debug("arg_key = %s, arg_val = %s", arg_key, arg_val)
                    arg_dct[arg_key] = arg_val
                tool_calls.append(
                    ToolCall(
                        type="function",
                        function=FunctionCall(
                            name=tc_name,
                            arguments=json.dumps(arg_dct, ensure_ascii=False),
                        ),
                    )
                )
        except Exception:
            logger.exception("Failed to extract tool call spec")
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )

        if not tool_calls:
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )
        content = model_output[: model_output.find(self.tool_calls_start_token)]
        return ExtractedToolCallInformation(
            tools_called=True, tool_calls=tool_calls, content=content
        )

    # ------------------------------------------------------------------
    # Streaming extraction
    # ------------------------------------------------------------------

    def extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """Consume one streamed chunk and return the resulting delta, if any.

        Only `delta_text` and `request` are used; the other arguments are
        ignored because the parser runs its own buffer-based state machine.

        Everything that can be decided from the buffered text is processed in
        this call. Plain text becomes `content`; tool-call events are merged
        per tool index (name plus concatenated argument fragments) into
        `tool_calls`. Returns None when the chunk produced no output.
        """
        # If tools are not enabled for this request, pass content through
        # without buffering so streamed text stays aligned.
        if not self._tools_enabled(request):
            return DeltaMessage(content=delta_text) if delta_text else None

        self._buffer += delta_text

        # Events are (kind, tool_index, text) with kind in
        # {"content", "name", "args"}; tool_index is -1 for content.
        events: list[tuple[str, int, str]] = []
        while self._advance(request, events):
            pass
        return self._build_delta(events)

    def _advance(
        self,
        request: ChatCompletionRequest,
        events: list[tuple[str, int, str]],
    ) -> bool:
        """Run one state-machine step; return False when more input is needed."""
        if not self._in_tool_call:
            return self._advance_outside_tool_call(events)
        if not self.current_tool_name_sent:
            return self._advance_tool_name(events)
        assert self._current_tool_name is not None
        if self._streaming_string_value:
            return self._advance_string_value(events)
        if self._pending_key is not None:
            return self._advance_pending_value(request, events)
        return self._advance_key_or_close(events)

    def _advance_outside_tool_call(
        self, events: list[tuple[str, int, str]]
    ) -> bool:
        """Emit plain content up to the next <tool_call>, or enter the call."""
        start_idx = self._buffer.find(self.tool_call_start_token)
        if start_idx == -1:
            # Hold back a trailing partial start tag (e.g. "<tool_ca") so it is
            # not leaked as content when the tag is split across chunks.
            held = self._partial_suffix_len(self._buffer, self.tool_call_start_token)
            split = len(self._buffer) - held
            out, self._buffer = self._buffer[:split], self._buffer[split:]
            if out:
                events.append(("content", -1, out))
            return False
        if start_idx > 0:
            out = self._buffer[:start_idx]
            self._buffer = self._buffer[start_idx:]
            events.append(("content", -1, out))
            return True
        # Buffer starts with the start token.
        self._buffer = self._buffer[len(self.tool_call_start_token):]
        self._begin_tool_call()
        return True

    def _advance_tool_name(self, events: list[tuple[str, int, str]]) -> bool:
        """Read the tool name, which ends at a newline, <arg_key> or </tool_call>."""
        nl = self._buffer.find("\n")
        ak = self._buffer.find(self.arg_key_start)
        end = self._buffer.find(self.tool_call_end_token)
        candidates = [pos for pos in (nl, ak, end) if pos != -1]
        if not candidates:
            return False
        cut = min(candidates)
        tool_name = self._buffer[:cut].strip()
        if tool_name == "" and cut == end:
            # Empty call such as "<tool_call></tool_call>": more input can
            # never supply a name, so drop the call instead of stalling.
            self._buffer = self._buffer[end + len(self.tool_call_end_token):]
            self._discard_current_tool_call()
            return True
        # Consume the newline terminator; leave a tag terminator in place.
        self._buffer = self._buffer[nl + 1:] if cut == nl else self._buffer[cut:]
        self._current_tool_name = tool_name
        self.current_tool_name_sent = True
        events.append(("name", self.current_tool_id, tool_name))
        return True

    def _advance_string_value(self, events: list[tuple[str, int, str]]) -> bool:
        """Stream the content of a string value until </arg_value> is seen."""
        idx = self.current_tool_id
        val_end = self._buffer.find(self.arg_val_end)
        if val_end != -1:
            # Closing tag found: emit the rest and terminate the JSON string.
            raw_content = self._buffer[:val_end]
            self._buffer = self._buffer[val_end + len(self.arg_val_end):]
            self._streaming_string_value = False
            self._pending_key = None
            frag = self._json_escape_string_content(raw_content) + '"'
            self.streamed_args_for_tool[idx] += frag
            events.append(("args", idx, frag))
            return True
        # No closing tag yet: emit everything except a trailing partial
        # "</arg_value>" that may be completed by the next chunk.
        held = self._partial_suffix_len(self._buffer, self.arg_val_end)
        safe_len = len(self._buffer) - held
        if safe_len > 0:
            to_emit = self._buffer[:safe_len]
            self._buffer = self._buffer[safe_len:]
            escaped = self._json_escape_string_content(to_emit)
            if escaped:
                self.streamed_args_for_tool[idx] += escaped
                events.append(("args", idx, escaped))
        return False

    def _advance_pending_value(
        self,
        request: ChatCompletionRequest,
        events: list[tuple[str, int, str]],
    ) -> bool:
        """Handle the <arg_value> that belongs to the already-parsed key.

        The value is looked at before any end tag so that arguments are not
        closed while the value of the last key is still in the buffer.
        """
        idx = self.current_tool_id
        val_pos = self._buffer.find(self.arg_val_start)
        end_pos = self._buffer.find(self.tool_call_end_token)
        if end_pos != -1 and (val_pos == -1 or end_pos < val_pos):
            # The call ends before any value for this key: the key has no
            # value and never will, so drop it and let the close proceed.
            self._pending_key = None
            return True
        if val_pos == -1:
            return False
        if val_pos > 0:
            self._buffer = self._buffer[val_pos:]

        key = (self._pending_key or "").strip()
        if self._is_string_type(self._current_tool_name, key, request.tools):
            # String type: emit the key prefix and switch to streaming mode.
            self._buffer = self._buffer[len(self.arg_val_start):]
            if key in self._seen_keys[idx]:
                # Duplicate key: skip it (already-streamed JSON cannot change).
                self._pending_key = None
                return True
            self._seen_keys[idx].add(key)
            key_json = json.dumps(key, ensure_ascii=False)
            opener = "," if self._args_started[idx] else "{"
            self._args_started[idx] = True
            frag = opener + key_json + ':"'
            self.streamed_args_for_tool[idx] += frag
            self._streaming_string_value = True
            events.append(("args", idx, frag))
            return True

        # Non-string type: wait for the complete value.
        val_end = self._buffer.find(self.arg_val_end)
        if val_end == -1:
            return False
        raw_val = self._buffer[len(self.arg_val_start):val_end].strip()
        self._buffer = self._buffer[val_end + len(self.arg_val_end):]
        self._pending_key = None
        frag = self._append_arg_fragment(
            tool_name=self._current_tool_name,
            key=key,
            raw_val=raw_val,
            request_tools=request.tools,
        )
        if frag:
            events.append(("args", idx, frag))
        return True

    def _advance_key_or_close(self, events: list[tuple[str, int, str]]) -> bool:
        """Parse the next <arg_key>, or close the call at </tool_call>."""
        end_pos = self._buffer.find(self.tool_call_end_token)
        key_pos = self._buffer.find(self.arg_key_start)
        if end_pos != -1 and (key_pos == -1 or end_pos < key_pos):
            self._buffer = self._buffer[end_pos + len(self.tool_call_end_token):]
            frag = self._close_args_if_needed()
            if frag:
                events.append(("args", self.current_tool_id, frag))
            self._finish_tool_call()
            return True
        if key_pos == -1:
            return False
        if key_pos > 0:
            # Drop anything in front of the key tag.
            self._buffer = self._buffer[key_pos:]
        key_end = self._buffer.find(self.arg_key_end)
        if key_end == -1:
            return False
        self._pending_key = self._buffer[len(self.arg_key_start):key_end]
        self._buffer = self._buffer[key_end + len(self.arg_key_end):]
        return True

    def _build_delta(
        self, events: list[tuple[str, int, str]]
    ) -> DeltaMessage | None:
        """Merge the events of one call into a single DeltaMessage (or None)."""
        if not events:
            return None
        content_parts: list[str] = []
        names: dict[int, str] = {}
        arg_parts: dict[int, list[str]] = {}
        for kind, idx, text in events:
            if kind == "content":
                content_parts.append(text)
                continue
            # Dicts keep insertion order, so tools stay in order of appearance.
            arg_parts.setdefault(idx, [])
            if kind == "name":
                names[idx] = text
            else:
                arg_parts[idx].append(text)

        fields: dict[str, Any] = {}
        if content_parts:
            fields["content"] = "".join(content_parts)
        if arg_parts:
            fields["tool_calls"] = [
                self._make_tool_call_delta(idx, names.get(idx), "".join(parts))
                for idx, parts in arg_parts.items()
            ]
        return DeltaMessage(**fields) if fields else None

    def _make_tool_call_delta(
        self, index: int, name: str | None, arguments: str
    ) -> DeltaToolCall:
        """Build one DeltaToolCall; id and type are only sent with the name."""
        if name is None:
            return DeltaToolCall(
                index=index,
                function=DeltaFunctionCall(arguments=arguments).model_dump(
                    exclude_none=True
                ),
            )
        return DeltaToolCall(
            index=index,
            id=self._tool_call_ids[index],
            type="function",
            function=DeltaFunctionCall(
                name=name,
                arguments=arguments,
            ).model_dump(exclude_none=True),
        )

    # ------------------------------------------------------------------
    # Per-tool state management
    # ------------------------------------------------------------------

    def _ensure_tool_state(self) -> None:
        """Grow every per-tool list so that `current_tool_id` is a valid index."""
        while len(self._tool_call_ids) <= self.current_tool_id:
            self._tool_call_ids.append(
                make_tool_call_id(id_type="random", func_name=None, idx=None)
            )
        while len(self.streamed_args_for_tool) <= self.current_tool_id:
            self.streamed_args_for_tool.append("")
        while len(self.prev_tool_call_arr) <= self.current_tool_id:
            self.prev_tool_call_arr.append({})
        while len(self._args_started) <= self.current_tool_id:
            self._args_started.append(False)
        while len(self._args_closed) <= self.current_tool_id:
            self._args_closed.append(False)
        while len(self._seen_keys) <= self.current_tool_id:
            self._seen_keys.append(set())

    def _begin_tool_call(self) -> None:
        """Move to the next tool index and reset the per-call parsing state."""
        self.current_tool_id += 1
        self._ensure_tool_state()
        self.current_tool_name_sent = False
        self._current_tool_name = None
        self._pending_key = None
        self._streaming_string_value = False
        self._in_tool_call = True

    def _finish_tool_call(self) -> None:
        """Leave tool-call mode; the tool index and per-tool lists are kept."""
        self._in_tool_call = False
        self._current_tool_name = None
        self._pending_key = None
        self._streaming_string_value = False

    def _discard_current_tool_call(self) -> None:
        """Undo `_begin_tool_call` for a call that produced no output at all."""
        idx = self.current_tool_id
        for per_tool in (
            self._tool_call_ids,
            self.streamed_args_for_tool,
            self.prev_tool_call_arr,
            self._args_started,
            self._args_closed,
            self._seen_keys,
        ):
            del per_tool[idx:]
        # Reuse the index for the next real tool call.
        self.current_tool_id = idx - 1
        self._finish_tool_call()

    def _emit_tool_name_delta(self, tool_name: str) -> DeltaMessage:
        """Return a delta announcing the current tool's id, type and name."""
        # Some clients assume `function.arguments` is always a string whenever
        # `tool_calls` is present, even on the first delta that only includes
        # a name, hence the empty arguments string.
        return DeltaMessage(
            tool_calls=[
                self._make_tool_call_delta(self.current_tool_id, tool_name, "")
            ]
        )

    def _emit_tool_args_delta(self, fragment: str) -> DeltaMessage:
        """Return a delta carrying one argument fragment for the current tool."""
        return DeltaMessage(
            tool_calls=[
                self._make_tool_call_delta(self.current_tool_id, None, fragment)
            ]
        )

    def _append_arg_fragment(
        self,
        *,
        tool_name: str,
        key: str,
        raw_val: str,
        request_tools: list[ChatCompletionToolsParam] | None,
    ) -> str | None:
        """Serialize one complete key/value pair as a JSON object fragment.

        The fragment starts with "{" for the first argument of the current
        tool and with "," afterwards. It is appended to
        `streamed_args_for_tool` and a name/arguments snapshot is stored in
        `prev_tool_call_arr`.

        Returns:
            The fragment, or None when the key is empty or was already
            emitted for the current tool.
        """
        key = key.strip()
        if not key:
            return None
        idx = self.current_tool_id
        if key in self._seen_keys[idx]:
            # Avoid emitting duplicate keys (cannot rewrite already-streamed JSON).
            return None

        if self._is_string_type(tool_name, key, request_tools):
            val_obj: Any = raw_val
        else:
            val_obj = self._deserialize(raw_val)

        key_json = json.dumps(key, ensure_ascii=False)
        val_json = json.dumps(val_obj, ensure_ascii=False)
        opener = "," if self._args_started[idx] else "{"
        self._args_started[idx] = True
        fragment = opener + key_json + ":" + val_json

        self._seen_keys[idx].add(key)
        self.streamed_args_for_tool[idx] += fragment
        self.prev_tool_call_arr[idx] = {
            "name": tool_name,
            "arguments": self.streamed_args_for_tool[idx],
        }
        return fragment

    def _close_args_if_needed(self) -> str | None:
        """Close the JSON arguments object of the current tool exactly once.

        Returns "{}" when no argument was emitted, "}" otherwise, or None if
        the object was already closed.
        """
        idx = self.current_tool_id
        if self._args_closed[idx]:
            return None
        self._args_closed[idx] = True
        if not self._args_started[idx]:
            fragment = "{}"
            self.streamed_args_for_tool[idx] = fragment
        else:
            fragment = "}"
            self.streamed_args_for_tool[idx] += fragment
        return fragment
# End of gist.