Skip to content

Instantly share code, notes, and snippets.

@raoulbia-ai
Created January 15, 2026 12:52
Show Gist options
  • Select an option

  • Save raoulbia-ai/934f1fd83f09638dd2343ad152a96782 to your computer and use it in GitHub Desktop.

Select an option

Save raoulbia-ai/934f1fd83f09638dd2343ad152a96782 to your computer and use it in GitHub Desktop.
A2A Agent Connector Component for Langflow. Connects to remote agents using Google's A2A (Agent-to-Agent) Protocol. Drop this file into your Langflow custom components directory. License: MIT
"""Requirements: pip install a2a-sdk httpx"""
from __future__ import annotations

import asyncio
import uuid
from typing import TYPE_CHECKING, Any

from langflow.custom import Component
from langflow.io import (
    IntInput,
    MessageInput,
    Output,
    SecretStrInput,
    StrInput,
)
from langflow.schema.message import Message

if TYPE_CHECKING:
    import httpx
    from a2a.client import A2AClient
    from a2a.types import AgentCard, Task, Message as A2AMessage


class A2AAgentConnector(Component):
    """Langflow component that forwards a message to a remote A2A agent.

    The component downloads the remote agent's card from ``agent_card_url``,
    builds an ``A2AClient`` for it, sends the ``message`` input as a single
    user text part and returns the agent's textual reply as a Langflow
    ``Message``. The ``response`` output is flagged ``tool_mode=True`` and
    ``_get_tools`` lets the ``tool_name`` input override the tool's name.

    Failures (missing SDK, bad input, network errors, timeouts, JSON-RPC
    errors) are reported as a ``Message`` with ``sender="System"`` rather than
    raised.
    """

    display_name = "A2A Agent Connector"
    description = "Connect to remote agents using the Agent2Agent (A2A) Protocol"
    icon = "network"
    name = "A2AAgentConnector"
    documentation = "https://github.com/google/a2a-protocol"

    inputs = [
        StrInput(
            name="agent_card_url",
            display_name="Agent Card URL",
            info="URL to the remote agent's .well-known/agent.json endpoint",
            required=True,
            placeholder="http://localhost:7860/api/v1/a2a/{flow_id}/{component_id}/.well-known/agent.json",
            tool_mode=False,
        ),
        StrInput(
            name="tool_name",
            display_name="Tool Name",
            info="Unique name for this tool (e.g., 'planning_agent', 'research_agent')",
            required=False,
            advanced=False,
            tool_mode=False,
        ),
        MessageInput(
            name="message",
            display_name="Message",
            info="Message to send to the remote agent",
            required=True,
            tool_mode=True,
        ),
        IntInput(
            name="timeout",
            display_name="Timeout (seconds)",
            info="Request timeout. Use 300+ for reasoning models with long thinking pauses.",
            value=1200,
            advanced=True,
        ),
        SecretStrInput(
            name="auth_token",
            display_name="Authentication Token",
            info="Bearer token for authentication (if required)",
            required=False,
            advanced=True,
        ),
    ]

    outputs = [
        Output(
            name="response",
            display_name="Agent Response",
            method="send_to_agent",
            tool_mode=True,
        ),
    ]

    def __init__(self, **kwargs):
        """Initialise the component and its per-instance caches."""
        super().__init__(**kwargs)
        # Parsed agent cards keyed by the URL they were fetched from.
        self._agent_card_cache: dict[str, Any] = {}
        # Tri-state: None = not probed yet, True/False = cached probe result.
        self._sdk_available: bool | None = None

    def _check_sdk_availability(self) -> bool:
        """Return True when the A2A SDK and httpx can be imported.

        The imports are attempted once; the outcome is cached on the instance
        so later calls do not repeat the probe.
        """
        if self._sdk_available is None:
            try:
                # Imported only to prove they resolve; the names are unused.
                from a2a.client import A2AClient  # noqa: F401
                from a2a.types import (  # noqa: F401
                    AgentCard,
                    MessageSendParams,
                    SendMessageRequest,
                )
                import httpx  # noqa: F401
            except ImportError:
                self._sdk_available = False
            else:
                self._sdk_available = True
        return self._sdk_available

    async def _get_tools(self):
        """Build this component's tools, applying the custom tool name if set."""
        from langflow.base.tools.component_tool import ComponentToolkit

        custom_name = getattr(self, "tool_name", None)
        return ComponentToolkit(component=self).get_tools(
            # An empty string means "not configured": fall back to the default.
            tool_name=custom_name or None,
            callbacks=self.get_langchain_callbacks(),
        )

    def _request_timeout(self) -> float | None:
        """Return the configured overall request timeout in seconds.

        Returns None (no limit) when the ``timeout`` input is missing, not a
        number, or not strictly positive.
        """
        raw = getattr(self, "timeout", None)
        try:
            seconds = float(raw)
        except (TypeError, ValueError):
            return None
        return seconds if seconds > 0 else None

    async def send_to_agent(self) -> Message:
        """Send the ``message`` input to the remote agent and return its reply.

        Returns:
            The agent's reply, or a ``Message`` with ``sender="System"`` whose
            text starts with ``"Error:"`` / ``"Agent error:"`` when anything
            goes wrong. Exceptions are logged and never propagated.
        """
        if not self._check_sdk_availability():
            return Message(
                text="Error: A2A SDK not installed. Run: pip install a2a-sdk",
                sender="System",
            )

        try:
            if not self.agent_card_url:
                raise ValueError("Agent Card URL is required")
            if not self.message:
                raise ValueError("Message is required")

            agent_card = await self._fetch_agent_card(self.agent_card_url)
            if not agent_card:
                raise ValueError(f"Failed to fetch agent card from {self.agent_card_url}")

            limit = self._request_timeout()
            # The HTTP client is owned here so that its connections are always
            # released, including on errors, timeouts and cancellation.
            async with self._build_http_client() as http_client:
                client = await self._create_client(agent_card, http_client)
                try:
                    # The transport has no read timeout (see
                    # _build_http_client), so the user-configured limit is
                    # enforced on the request as a whole.
                    return await asyncio.wait_for(self._send_message(client), timeout=limit)
                except asyncio.TimeoutError:
                    self.log(f"A2A agent request timed out after {limit:g} seconds")
                    return Message(
                        text=f"Error: A2A agent did not respond within {limit:g} seconds",
                        sender="System",
                    )
        except Exception as e:  # Component boundary: report instead of crashing the flow.
            self.log(f"Error communicating with A2A agent: {e}")
            return Message(text=f"Error: {e}", sender="System")

    async def _fetch_agent_card(self, url: str) -> AgentCard | None:
        """Download and parse the agent card at ``url``.

        Successfully parsed cards are cached per URL on the instance.

        Returns:
            The parsed ``AgentCard``, or None when the download or parsing
            fails (the error is logged).
        """
        from a2a.types import AgentCard
        import httpx

        cached = self._agent_card_cache.get(url)
        if cached is not None:
            return cached

        try:
            # NOTE(security): verify=False disables TLS certificate checks.
            # Kept for backward compatibility (e.g. self-signed local agents).
            async with httpx.AsyncClient(verify=False) as http_client:
                response = await http_client.get(url, timeout=10.0, follow_redirects=True)
                response.raise_for_status()
                card = AgentCard(**response.json())
        except Exception as e:
            self.log(f"Error fetching agent card: {e}")
            return None

        self._agent_card_cache[url] = card
        return card

    def _build_http_client(self) -> httpx.AsyncClient:
        """Create the ``httpx.AsyncClient`` used to talk to the remote agent.

        The caller owns the returned client and must close it, e.g. by using
        it as an ``async with`` context manager.
        """
        import httpx

        headers = {}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"

        # read=None disables the inter-chunk timeout so reasoning models may
        # pause for a long time between chunks; the overall limit is applied
        # by the caller instead.
        timeout_config = httpx.Timeout(
            connect=30.0,
            read=None,
            write=30.0,
            pool=30.0,
        )
        # NOTE(security): verify=False disables TLS certificate checks, so the
        # bearer token may be sent to an unverified peer. Kept for backward
        # compatibility with self-signed deployments.
        return httpx.AsyncClient(
            timeout=timeout_config,
            headers=headers or None,
            verify=False,
            follow_redirects=True,
        )

    async def _create_client(self, agent_card, httpx_client=None) -> A2AClient:
        """Create an ``A2AClient`` bound to ``agent_card``.

        Args:
            agent_card: Parsed agent card describing the remote agent.
            httpx_client: HTTP client to use. When omitted, a new one is built
                with ``_build_http_client`` and the caller is responsible for
                closing it.
        """
        from a2a.client import A2AClient

        if httpx_client is None:
            httpx_client = self._build_http_client()
        return A2AClient(httpx_client=httpx_client, agent_card=agent_card)

    async def _send_message(self, client) -> Message:
        """Send the ``message`` input through ``client`` and convert the reply.

        Returns:
            A ``Message`` from ``"A2A Agent"`` carrying the reply text, or a
            ``"System"`` message describing the JSON-RPC error returned by the
            agent.
        """
        from a2a.types import JSONRPCErrorResponse, MessageSendParams, SendMessageRequest

        if hasattr(self.message, "text"):
            message_text = self.message.text
        else:
            message_text = str(self.message)

        params = MessageSendParams(
            message={
                "messageId": str(uuid.uuid4()),
                "role": "user",
                "parts": [{"kind": "text", "text": message_text}],
            }
        )
        request = SendMessageRequest(id=str(uuid.uuid4()), params=params)
        response = await client.send_message(request)

        payload = response.root
        if isinstance(payload, JSONRPCErrorResponse):
            error = payload.error
            error_msg = getattr(error, "message", str(error))
            return Message(text=f"Agent error: {error_msg}", sender="System")

        return Message(text=self._extract_text(payload.result), sender="A2A Agent")

    @staticmethod
    def _join_text_parts(parts) -> str:
        """Join the text of every ``kind == "text"`` part with newlines.

        Accepts both wrapped parts (exposing the payload on ``.root``) and
        bare parts. Returns an empty string when ``parts`` is empty or None or
        contains no text part.
        """
        payloads = (getattr(part, "root", part) for part in parts or [])
        return "\n".join(
            payload.text for payload in payloads if getattr(payload, "kind", None) == "text"
        )

    def _extract_text(self, result) -> str:
        """Extract the reply text from an A2A ``Task`` or message result.

        For a ``Task`` the most recent agent message in the history that has
        text wins; otherwise the text of the task's artifacts, then of the
        status message, is used. When no text exists at all a
        ``"Task <state>: <id>"`` summary is returned. For a plain message all
        text parts are joined, or ``"No response"`` when there are none.
        """
        from a2a.types import Task

        if not isinstance(result, Task):
            return self._join_text_parts(result.parts) or "No response"

        for msg in reversed(result.history or []):
            # role may be an enum or a plain string depending on the model.
            role = getattr(msg.role, "value", msg.role)
            if role == "agent":
                text = self._join_text_parts(msg.parts)
                if text:
                    return text

        # Some agents report their output only through artifacts or the status
        # message; getattr keeps this safe if a field is absent.
        artifact_texts = [
            text
            for artifact in getattr(result, "artifacts", None) or []
            if (text := self._join_text_parts(getattr(artifact, "parts", None)))
        ]
        if artifact_texts:
            return "\n".join(artifact_texts)

        status = getattr(result, "status", None)
        status_message = getattr(status, "message", None)
        status_text = self._join_text_parts(getattr(status_message, "parts", None))
        if status_text:
            return status_text

        # The task identifier is looked up as ``id`` first; ``taskId`` is kept
        # only as a fallback for objects that expose it.
        state = getattr(status, "state", status)
        state_label = getattr(state, "value", state)
        task_id = getattr(result, "id", None) or getattr(result, "taskId", None)
        return f"Task {state_label}: {task_id}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment