Skip to content

Instantly share code, notes, and snippets.

@bikidsx
Created December 2, 2025 10:25
Show Gist options
  • Select an option

  • Save bikidsx/0df0b7ed551d2902a5db229bc5d73bcd to your computer and use it in GitHub Desktop.

Select an option

Save bikidsx/0df0b7ed551d2902a5db229bc5d73bcd to your computer and use it in GitHub Desktop.
Vercel AI SDK Adapter for AgnoAI Framework
"""
Vercel AI SDK Adapter for Agno
Converts Agno agent/team streams to Vercel AI SDK Data V5 Stream Protocol (SSE)
Inspired by @https://github.com/gauravdhiman/vercel-agno-integration/blob/main/server/agno_adapter.py
"""
import asyncio
import json
import logging
from typing import AsyncGenerator, Any, Dict, List, Optional
from uuid import uuid4
from agno.agent import (
Agent,
RunContentEvent,
RunStartedEvent,
RunCompletedEvent,
RunErrorEvent,
ToolCallStartedEvent,
ToolCallCompletedEvent,
RunOutputEvent,
)
from agno.team import Team
from agno.run.team import TeamRunEvent
from agno.models.message import Message as AgnoMessage
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Enable debug logging.
# NOTE(review): basicConfig runs at import time and configures the root logger
# at DEBUG level when no handlers are installed yet, which affects the whole
# process; consider moving this to the application entry point.
logging.basicConfig(level=logging.DEBUG)
class VercelAIAdapter:
    """
    Adapts Agno Agent/Team output streams to Vercel AI SDK Data Stream Protocol.

    Every protocol part is emitted as one Server-Sent Event of the form
    ``data: {"type": ..., ...}`` followed by a blank line, and the stream is
    always terminated by ``data: [DONE]``.
    """

    # Pause awaited after each emitted chunk. Kept from the original
    # implementation, presumably to give the transport a chance to flush.
    _YIELD_PAUSE_SECONDS = 0.01

    def __init__(self, agent_or_team):
        """
        Initialize adapter with an Agno Agent or Team.

        Args:
            agent_or_team: Agno Agent or Team instance

        Raises:
            TypeError: If the argument is neither an Agent nor a Team.
        """
        if not isinstance(agent_or_team, (Agent, Team)):
            raise TypeError("Input must be an instance of agno.Agent or agno.Team")
        self.executor = agent_or_team
        self.name = agent_or_team.name
        # Tool call ids already announced to the client in the current stream.
        self._processed_tool_calls = set()

    @staticmethod
    def _format_sse_event(event_type: str, data: Any) -> str:
        """
        Format data as Server-Sent Event according to Vercel AI SDK protocol.

        Args:
            event_type: Type of event (text-delta, tool-input-start, etc.)
            data: Event fields. A dict is merged into the event object;
                ``None`` adds no fields; any other value is placed under a
                ``"data"`` key (previously such values raised ``TypeError``).

        Returns:
            Formatted SSE string
        """
        if data is None:
            fields = {}
        elif isinstance(data, dict):
            fields = data
        else:
            fields = {"data": data}
        try:
            # Serialize once; default=str stringifies values json cannot encode.
            body = json.dumps({"type": event_type, **fields}, default=str)
        except (TypeError, ValueError) as e:
            # TypeError: unsupported dict keys; ValueError: circular references.
            logger.error(f"Serialization error for {event_type}: {e}")
            # Use the same "errorText" field as every other error event here.
            body = json.dumps({"type": "error", "errorText": "Serialization failed"})
        return f"data: {body}\n\n"

    @classmethod
    def _sse_bytes(cls, event_type: str, data: Any) -> bytes:
        """Return the SSE event for ``event_type``/``data`` encoded as UTF-8."""
        return cls._format_sse_event(event_type, data).encode("utf-8")

    @staticmethod
    def _token_count(metrics: Any, attr: str) -> int:
        """Read a token counter from ``metrics``, treating missing/None as 0."""
        return int(getattr(metrics, attr, 0) or 0)

    async def _agno_to_vercel_stream(
        self,
        agno_response_stream: "AsyncGenerator[RunOutputEvent, None]",
    ) -> AsyncGenerator[bytes, None]:
        """
        Convert Agno stream events to Vercel AI SDK format.

        The open text block is closed and the ``finish`` event is sent once,
        after the Agno stream is exhausted, so the message is terminated even
        when no completion event arrives and is not terminated early by an
        intermediate completion event.

        Args:
            agno_response_stream: Async generator from Agno agent/team

        Yields:
            SSE-formatted bytes
        """
        message_id = f"msg_{uuid4().hex}"
        text_block_id = None
        # The last terminal event seen (completion or error) decides these.
        finish_reason = "stop"
        final_metrics = None
        pause = self._YIELD_PAUSE_SECONDS
        try:
            # Send message start event
            yield self._sse_bytes("start", {"messageId": message_id})
            await asyncio.sleep(pause)

            async for agno_response in agno_response_stream:
                logger.debug(f"Received event: {type(agno_response).__name__}")
                # Team events are matched through their `event` attribute,
                # Agent events through their class.
                content = getattr(agno_response, "content", None)
                event_attr = getattr(agno_response, "event", None)

                # Handle content events (Team or Agent)
                if (
                    event_attr == TeamRunEvent.run_content
                    or isinstance(agno_response, RunContentEvent)
                ) and content:
                    # Open the text block lazily on the first delta.
                    if not text_block_id:
                        text_block_id = f"text_{uuid4().hex}"
                        yield self._sse_bytes("text-start", {"id": text_block_id})
                        await asyncio.sleep(pause)
                    yield self._sse_bytes("text-delta", {
                        "id": text_block_id,
                        "delta": content,
                    })
                    await asyncio.sleep(pause)

                # Handle tool call started (Team or Agent)
                elif (
                    event_attr == TeamRunEvent.tool_call_started
                    or isinstance(agno_response, ToolCallStartedEvent)
                ):
                    tool = getattr(agno_response, "tool", None)
                    if tool:
                        tool_call_id = getattr(tool, "tool_call_id", None) or f"tool_{uuid4().hex}"
                        # Announce each tool call only once per stream.
                        if tool_call_id not in self._processed_tool_calls:
                            self._processed_tool_calls.add(tool_call_id)
                            tool_name = getattr(tool, "tool_name", "unknown")
                            tool_args = getattr(tool, "tool_args", {})
                            yield self._sse_bytes("tool-input-start", {
                                "toolCallId": tool_call_id,
                                "toolName": tool_name,
                            })
                            await asyncio.sleep(pause)
                            yield self._sse_bytes("tool-input-available", {
                                "toolCallId": tool_call_id,
                                "toolName": tool_name,
                                "input": tool_args if isinstance(tool_args, dict) else {},
                            })
                            await asyncio.sleep(pause)

                # Handle tool completion (Team or Agent)
                elif (
                    event_attr == TeamRunEvent.tool_call_completed
                    or isinstance(agno_response, ToolCallCompletedEvent)
                ):
                    tool = getattr(agno_response, "tool", None)
                    if tool:
                        # NOTE(review): when the tool carries no id, this random
                        # fallback cannot match the id sent with the start event.
                        tool_call_id = getattr(tool, "tool_call_id", None) or f"tool_{uuid4().hex}"
                        tool_result = getattr(tool, "result", None)
                        # Compare against None so falsy results (0, "", False,
                        # []) are still delivered to the client.
                        if tool_result is not None:
                            yield self._sse_bytes("tool-output-available", {
                                "toolCallId": tool_call_id,
                                "output": str(tool_result),
                            })
                            await asyncio.sleep(pause)

                # Handle errors (Team or Agent)
                elif (
                    event_attr == TeamRunEvent.run_error
                    or isinstance(agno_response, RunErrorEvent)
                ):
                    finish_reason = "error"
                    error_msg = getattr(agno_response, "error", "Unknown error")
                    yield self._sse_bytes("error", {"errorText": str(error_msg)})
                    await asyncio.sleep(pause)

                # Handle completion (Team or Agent): remember the outcome and
                # terminate the message once the stream itself is exhausted.
                elif (
                    event_attr == TeamRunEvent.run_completed
                    or isinstance(agno_response, RunCompletedEvent)
                ):
                    finish_reason = "stop"
                    final_metrics = getattr(agno_response, "metrics", None)

            # End text block if started
            if text_block_id:
                yield self._sse_bytes("text-end", {"id": text_block_id})
                await asyncio.sleep(pause)

            # Send finish message with usage
            finish_data = {"finishReason": finish_reason}
            if final_metrics:
                prompt_tokens = self._token_count(final_metrics, "input_tokens")
                completion_tokens = self._token_count(final_metrics, "output_tokens")
                finish_data["usage"] = {
                    "promptTokens": prompt_tokens,
                    "completionTokens": completion_tokens,
                    "totalTokens": prompt_tokens + completion_tokens,
                }
            yield self._sse_bytes("finish", finish_data)
            await asyncio.sleep(pause)

            # Send stream termination
            yield b"data: [DONE]\n\n"
        except Exception as e:
            logger.error(f"Error in stream: {e}", exc_info=True)
            # Report a generic message; details stay in the server log.
            yield self._sse_bytes("error", {
                "errorText": "An error occurred processing your request."
            })
            yield self._sse_bytes("finish", {"finishReason": "error"})
            yield b"data: [DONE]\n\n"

    @staticmethod
    def _message_text(msg: Dict[str, Any]) -> str:
        """
        Extract the plain text of one Vercel AI SDK message.

        Reads a non-empty ``content`` value first; otherwise concatenates the
        ``text`` of every ``{"type": "text"}`` entry found in a list-valued
        ``content`` or in ``parts``. Returns ``""`` when no text is present.
        """
        content = msg.get("content")
        if isinstance(content, str) and content:
            return content
        if content and not isinstance(content, list):
            return str(content)
        segments = content if isinstance(content, list) and content else msg.get("parts")
        if isinstance(segments, list):
            return "".join(
                str(part.get("text") or "")
                for part in segments
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return ""

    def _prepare_agno_messages(self, messages: List[Dict[str, Any]]) -> "List[AgnoMessage]":
        """
        Convert Vercel AI SDK message format to Agno messages.

        Messages without any text are skipped. Completed tool invocations are
        appended as extra user messages describing the tool result.

        Args:
            messages: List of messages from Vercel AI SDK

        Returns:
            List of Agno Message objects
        """
        agno_messages = []
        for msg in messages:
            text = self._message_text(msg)
            if text:
                agno_messages.append(AgnoMessage(role=msg.get("role"), content=text))

            # Handle tool invocations (results from frontend); `or []` also
            # covers an explicit null value.
            for tool_inv in msg.get("toolInvocations") or []:
                if tool_inv.get("state") == "result":
                    tool_name = tool_inv.get("toolName")
                    tool_result = tool_inv.get("result", {})
                    agno_messages.append(AgnoMessage(
                        role="user",
                        content=f"Tool '{tool_name}' returned: {json.dumps(tool_result, default=str)}",
                    ))
        return agno_messages

    async def stream_response(
        self,
        messages: List[Dict[str, Any]],
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        **kwargs: Any
    ) -> AsyncGenerator[bytes, None]:
        """
        Main entry point for streaming responses.

        Yields nothing when no message contains usable text.

        Args:
            messages: Full message history from Vercel AI SDK
            session_id: Optional session ID for context
            user_id: Optional user ID
            **kwargs: Additional arguments for agent/team run

        Yields:
            SSE-formatted bytes
        """
        # Reset tool tracking for new conversation
        self._processed_tool_calls = set()

        agno_messages = self._prepare_agno_messages(messages)
        if not agno_messages:
            return

        # Latest message is the input; everything before it is context.
        *history, user_input = agno_messages
        # Always assign, so history from an earlier request cannot leak into a
        # later single-message request on the same executor.
        # NOTE(review): confirm `add_messages` is the attribute the installed
        # Agno version reads for extra context messages.
        self.executor.add_messages = history or None

        # Start streaming from Agno (returns async generator directly in v2)
        agno_stream = self.executor.arun(
            input=user_input,
            session_id=session_id,
            user_id=user_id,
            stream=True,
            stream_events=True,
            **kwargs
        )

        async for chunk in self._agno_to_vercel_stream(agno_stream):
            yield chunk
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment