Created
January 15, 2026 12:52
-
-
Save raoulbia-ai/934f1fd83f09638dd2343ad152a96782 to your computer and use it in GitHub Desktop.
A2A Agent Connector Component for Langflow. Connects to remote agents using Google's A2A (Agent-to-Agent) Protocol. Drop this file into your Langflow custom components directory. License: MIT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Requirements: pip install a2a-sdk httpx""" | |
| from __future__ import annotations | |
| import uuid | |
| from typing import TYPE_CHECKING, Any | |
| from langflow.custom import Component | |
| from langflow.io import ( | |
| IntInput, | |
| MessageInput, | |
| Output, | |
| SecretStrInput, | |
| StrInput, | |
| ) | |
| from langflow.schema.message import Message | |
| if TYPE_CHECKING: | |
| from a2a.client import A2AClient | |
| from a2a.types import AgentCard, Task, Message as A2AMessage | |
class A2AAgentConnector(Component):
    """A2A Agent Connector Component.

    Connects to remote agents using the A2A Protocol.
    Supports synchronous communication and exposes remote agents as tools.

    Flow of a single call (see ``send_to_agent``):

    1. Fetch (and cache per URL) the remote agent card.
    2. Build an ``httpx.AsyncClient`` and wrap it in an ``A2AClient``.
    3. Send the input message as a ``SendMessageRequest`` and convert the
       JSON-RPC response into a Langflow ``Message``.

    All failures are reported as a ``Message`` with ``sender="System"`` rather
    than raised, so a flow using this component as a tool keeps running.
    """

    display_name = "A2A Agent Connector"
    description = "Connect to remote agents using the Agent2Agent (A2A) Protocol"
    icon = "network"
    name = "A2AAgentConnector"
    documentation = "https://github.com/google/a2a-protocol"

    inputs = [
        StrInput(
            name="agent_card_url",
            display_name="Agent Card URL",
            info="URL to the remote agent's .well-known/agent.json endpoint",
            required=True,
            placeholder="http://localhost:7860/api/v1/a2a/{flow_id}/{component_id}/.well-known/agent.json",
            tool_mode=False,
        ),
        StrInput(
            name="tool_name",
            display_name="Tool Name",
            info="Unique name for this tool (e.g., 'planning_agent', 'research_agent')",
            required=False,
            advanced=False,
            tool_mode=False,
        ),
        MessageInput(
            name="message",
            display_name="Message",
            info="Message to send to the remote agent",
            required=True,
            tool_mode=True,
        ),
        IntInput(
            name="timeout",
            display_name="Timeout (seconds)",
            info="Request timeout. Use 300+ for reasoning models with long thinking pauses.",
            value=1200,
            advanced=True,
        ),
        SecretStrInput(
            name="auth_token",
            display_name="Authentication Token",
            info="Bearer token for authentication (if required)",
            required=False,
            advanced=True,
        ),
    ]

    outputs = [
        Output(
            name="response",
            display_name="Agent Response",
            method="send_to_agent",
            tool_mode=True,
        ),
    ]

    def __init__(self, **kwargs):
        """Initialise the component and its per-instance caches."""
        super().__init__(**kwargs)
        # Parsed agent cards keyed by the URL they were fetched from.
        self._agent_card_cache: dict[str, Any] = {}
        # Tri-state: None = not probed yet, True/False = cached probe result.
        self._sdk_available: bool | None = None

    def _check_sdk_availability(self) -> bool:
        """Check if A2A SDK is available at runtime.

        The imports are attempted once; the outcome is cached on the instance
        so later calls do not repeat the probe.

        Returns:
            True when ``a2a`` and ``httpx`` can be imported, False otherwise.
        """
        if self._sdk_available is None:
            try:
                # Imported only to prove they resolve; the names are unused here.
                from a2a.client import A2AClient  # noqa: F401
                from a2a.types import AgentCard, MessageSendParams, SendMessageRequest  # noqa: F401
                import httpx  # noqa: F401
            except ImportError:
                self._sdk_available = False
            else:
                self._sdk_available = True
        return self._sdk_available

    async def _get_tools(self):
        """Get tools with custom names.

        Uses the optional ``tool_name`` input as the tool name; an empty or
        missing value is passed on as ``None``.
        """
        from langflow.base.tools.component_tool import ComponentToolkit

        tool_name = getattr(self, "tool_name", None)
        return ComponentToolkit(component=self).get_tools(
            tool_name=tool_name or None,
            callbacks=self.get_langchain_callbacks(),
        )

    async def send_to_agent(self) -> Message:
        """Send message to remote A2A agent.

        Returns:
            The agent's reply as a ``Message``. On any failure (SDK missing,
            missing inputs, agent card not retrievable, transport error,
            timeout) a ``Message`` with ``sender="System"`` describing the
            error is returned instead of raising.
        """
        if not self._check_sdk_availability():
            return Message(
                text="Error: A2A SDK not installed. Run: pip install a2a-sdk",
                sender="System",
            )

        try:
            if not self.agent_card_url:
                raise ValueError("Agent Card URL is required")
            if not self.message:
                raise ValueError("Message is required")

            # Fetch agent card
            agent_card = await self._fetch_agent_card(self.agent_card_url)
            if not agent_card:
                raise ValueError(f"Failed to fetch agent card from {self.agent_card_url}")

            # Create client and send message. The HTTP client is owned here so
            # its connection pool is always closed, even when sending fails.
            async with self._build_http_client() as http_client:
                client = await self._create_client(agent_card, httpx_client=http_client)
                return await self._send_message(client)
        except Exception as e:
            # Top-level boundary: report the failure to the flow as a message.
            self.log(f"Error communicating with A2A agent: {e}")
            return Message(text=f"Error: {e}", sender="System")

    async def _fetch_agent_card(self, url: str):
        """Fetch and parse an agent card.

        Args:
            url: Location of the agent card JSON document.

        Returns:
            The parsed ``AgentCard`` (served from the per-instance cache when
            the URL was fetched before), or ``None`` if fetching or parsing
            failed. Failures are logged, not raised.
        """
        from a2a.types import AgentCard
        import httpx

        cached = self._agent_card_cache.get(url)
        if cached is not None:
            return cached

        try:
            # NOTE(security): verify=False disables TLS certificate checks.
            # Kept as-is because it may be intentional for self-signed
            # development servers; enable verification for production use.
            async with httpx.AsyncClient(verify=False) as client:
                response = await client.get(url, timeout=10.0, follow_redirects=True)
                response.raise_for_status()
                agent_card = AgentCard(**response.json())
        except Exception as e:
            self.log(f"Error fetching agent card: {e}")
            return None

        self._agent_card_cache[url] = agent_card
        return agent_card

    def _build_http_client(self):
        """Build the ``httpx.AsyncClient`` used to talk to the remote agent.

        The caller owns the returned client and must close it (for example by
        using it as an async context manager).
        """
        import httpx

        headers = {}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"

        # Key fix for reasoning models: read=None disables inter-chunk timeout
        # This allows models to "think" for 30-60+ seconds between chunks.
        # The overall request duration is bounded separately in _send_message.
        timeout_config = httpx.Timeout(
            connect=30.0,
            read=None,  # No timeout between streaming chunks
            write=30.0,
            pool=30.0,
        )

        # NOTE(security): verify=False disables TLS certificate checks while a
        # bearer token may be attached. Kept as-is because it may be
        # intentional for self-signed development servers.
        return httpx.AsyncClient(
            timeout=timeout_config,
            headers=headers or None,
            verify=False,
            follow_redirects=True,
        )

    async def _create_client(self, agent_card, httpx_client=None):
        """Create A2A client with proper timeout for reasoning models.

        Args:
            agent_card: Parsed agent card describing the remote agent.
            httpx_client: Optional pre-built HTTP client. When omitted a new
                one is created; the caller is then responsible for closing it.

        Returns:
            An ``A2AClient`` bound to the HTTP client and agent card.
        """
        from a2a.client import A2AClient

        if httpx_client is None:
            httpx_client = self._build_http_client()
        return A2AClient(httpx_client=httpx_client, agent_card=agent_card)

    async def _send_message(self, client) -> Message:
        """Send message via A2A protocol.

        Args:
            client: The ``A2AClient`` to send through.

        Returns:
            A ``Message`` carrying the agent's text (``sender="A2A Agent"``),
            or an ``Agent error: ...`` message (``sender="System"``) when the
            agent replied with a JSON-RPC error.

        Raises:
            TimeoutError: If no response arrives within the configured
                ``timeout`` input (seconds). A missing or non-positive timeout
                disables the deadline.
        """
        import asyncio

        from a2a.types import JSONRPCErrorResponse, MessageSendParams, SendMessageRequest

        # Build message payload
        message_text = self.message.text if hasattr(self.message, "text") else str(self.message)
        message_payload = {
            "message": {
                "messageId": str(uuid.uuid4()),
                "role": "user",
                "parts": [{"kind": "text", "text": message_text}],
            }
        }

        # Send request
        request = SendMessageRequest(
            id=str(uuid.uuid4()),
            params=MessageSendParams(**message_payload),
        )

        # Honour the user-configured timeout as an overall deadline; the HTTP
        # read timeout stays disabled so long pauses between chunks are fine.
        configured = getattr(self, "timeout", None)
        deadline = float(configured) if configured and configured > 0 else None
        try:
            response = await asyncio.wait_for(client.send_message(request), timeout=deadline)
        except asyncio.TimeoutError as exc:
            raise TimeoutError(f"No response from agent within {configured} seconds") from exc

        # Check for errors
        if isinstance(response.root, JSONRPCErrorResponse):
            error = response.root.error
            error_msg = getattr(error, "message", str(error))
            return Message(text=f"Agent error: {error_msg}", sender="System")

        # Extract response
        return Message(text=self._extract_text(response.root.result), sender="A2A Agent")

    @staticmethod
    def _text_parts(parts) -> list[str]:
        """Return the text of every ``kind == "text"`` part, in order."""
        return [part.root.text for part in parts or [] if part.root.kind == "text"]

    def _extract_text(self, result) -> str:
        """Extract text from A2A response.

        Args:
            result: Either a ``Task`` or a message-like object with ``parts``.

        Returns:
            For a message: all text parts joined by newlines, or
            ``"No response"`` when there are none. For a task: the first text
            part of the most recent agent message in its history; failing
            that, the text of its artifacts, then of its status message; and
            finally a ``"Task <state>: <id>"`` summary.
        """
        from a2a.types import Task

        if not isinstance(result, Task):
            # Extract from message
            texts = self._text_parts(result.parts)
            return "\n".join(texts) if texts else "No response"

        # Extract from task history, newest agent message first.
        for msg in reversed(result.history or []):
            if msg.role == "agent":
                texts = self._text_parts(msg.parts)
                if texts:
                    return texts[0]

        # Fallbacks for tasks that deliver their answer outside the history.
        artifact_texts = [
            text
            for artifact in getattr(result, "artifacts", None) or []
            for text in self._text_parts(getattr(artifact, "parts", None))
        ]
        if artifact_texts:
            return "\n".join(artifact_texts)

        status = result.status
        status_message = getattr(status, "message", None)
        if status_message is not None:
            status_texts = self._text_parts(getattr(status_message, "parts", None))
            if status_texts:
                return "\n".join(status_texts)

        # Nothing textual: summarise the task. The Task model exposes its
        # identifier as ``id``; the state is unwrapped to its plain value.
        state = getattr(status, "state", status)
        state_label = getattr(state, "value", state)
        return f"Task {state_label}: {getattr(result, 'id', 'unknown')}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment