Last active
January 15, 2026 12:53
-
-
Save raoulbia-ai/285152d7c789143dfe6b7a848f9dd204 to your computer and use it in GitHub Desktop.
A2A Server Agent Component for Langflow. A tool-calling agent that exposes itself via Google's A2A (Agent-to-Agent) Protocol. Drop this file into your Langflow custom components directory. License: MIT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Requirements: pip install a2a-sdk""" | |
| from __future__ import annotations | |
| import uuid | |
| from typing import TYPE_CHECKING | |
| from langflow.components.langchain_utilities.tool_calling import ToolCallingAgentComponent | |
| from langflow.io import BoolInput, FloatInput, IntInput, Output, StrInput | |
| from langflow.schema.message import Message | |
| if TYPE_CHECKING: | |
| from a2a.types import AgentCard, AgentSkill | |
class A2AServerAgentComponent(ToolCallingAgentComponent):
    """Tool-calling Langflow agent that also registers itself with an A2A service.

    The component behaves like its parent ``ToolCallingAgentComponent`` and, when
    ``a2a_expose`` is enabled and the ``a2a`` SDK is importable, registers an
    agent card plus an executor callable with the service returned by
    ``langflow.services.deps.get_a2a_service``.

    NOTE(review): ``get_a2a_service`` and the service's ``register_agent`` /
    ``generate_agent_card`` methods are not defined in this file; their exact
    contracts should be confirmed against the Langflow build in use.
    """

    display_name = "A2A Server Agent"
    description = "A tool-calling agent exposed via the Agent2Agent (A2A) Protocol"
    icon = "network"
    name = "A2AServerAgent"
    documentation = "https://github.com/google/a2a-protocol"

    # Fallback used when the "A2A Base URL" input is left empty.
    _DEFAULT_BASE_URL = "http://localhost:7860"
    # Upper bound on the number of tools advertised as skills in the agent card.
    _MAX_SKILLS = 5
    # Descriptions placed in the agent card are truncated to this many characters.
    _MAX_DESCRIPTION_CHARS = 200

    inputs = [
        *ToolCallingAgentComponent.inputs,
        # NOTE(review): the parent's inputs may already declare "max_iterations";
        # confirm how Langflow resolves duplicate input names.
        IntInput(
            name="max_iterations",
            display_name="Max Iterations",
            value=30,
            advanced=True,
            info="Maximum tool execution attempts. Increased for deep research workflows.",
        ),
        FloatInput(
            name="max_execution_time",
            display_name="Max Execution Time (seconds)",
            value=1800.0,
            advanced=True,
            info="Maximum execution time (default: 30 minutes for research workflows).",
        ),
        StrInput(
            name="a2a_agent_name",
            display_name="A2A Agent Name",
            info="Name for the A2A agent card",
            required=False,
            value="",
            advanced=True,
        ),
        StrInput(
            name="a2a_agent_description",
            display_name="A2A Agent Description",
            info="Description for the A2A agent card",
            required=False,
            value="",
            advanced=True,
        ),
        StrInput(
            name="a2a_base_url",
            display_name="A2A Base URL",
            # The code does not auto-detect anything; it falls back to localhost.
            info=(
                "Base URL for the agent (e.g., https://myhost:7860). "
                "Defaults to http://localhost:7860 when empty."
            ),
            required=False,
        ),
        BoolInput(
            name="a2a_expose",
            display_name="Expose via A2A",
            info="Enable A2A protocol exposure for this agent",
            value=True,
            advanced=True,
        ),
    ]

    outputs = [
        Output(name="response", display_name="Response", method="message_response"),
        # Each URL output has its own method so that it yields a single URL
        # rather than the (agent_url, agent_card_url) tuple of get_a2a_info().
        Output(name="agent_url", display_name="A2A Agent URL", method="get_agent_url", tool_mode=False),
        Output(
            name="agent_card_url",
            display_name="A2A Agent Card URL",
            method="get_agent_card_url",
            tool_mode=False,
        ),
    ]

    def __init__(self, **kwargs) -> None:
        """Initialise the parent component and the A2A registration state."""
        super().__init__(**kwargs)
        self._registered = False
        self._component_id: str | None = None
        self._agent_url: str | None = None
        self._agent_card_url: str | None = None

    # ------------------------------------------------------------------ #
    # Output methods
    # ------------------------------------------------------------------ #

    async def message_response(self) -> Message:
        """Run the agent, registering it with the A2A service first if enabled.

        Returns:
            The parent's response message. When the agent is registered, the
            agent card URL is appended to the message text.
        """
        if self.a2a_expose and not self._registered:
            if self._a2a_sdk_available():
                await self._register_a2a_endpoint()
            else:
                self.log("A2A SDK not installed; skipping A2A registration (pip install a2a-sdk)")

        response = await super().message_response()

        if self.a2a_expose and self._registered and self._agent_card_url:
            url_info = f"\n\n🌐 A2A Agent Card URL:\n{self._agent_card_url}"
            response.text = (response.text or "") + url_info
        return response

    async def get_a2a_info(self) -> tuple[str, str]:
        """Return ``(agent_url, agent_card_url)``, registering on demand.

        Placeholder strings are returned instead of URLs when A2A exposure is
        disabled, the SDK is missing, or registration did not succeed.
        """
        if not self.a2a_expose:
            return "A2A not enabled", "A2A not enabled"
        if not self._a2a_sdk_available():
            return "A2A SDK not installed", "A2A SDK not installed"
        if not self._registered:
            await self._register_a2a_endpoint()
        return self._agent_url or "Not registered", self._agent_card_url or "Not registered"

    async def get_agent_url(self) -> Message:
        """Return the A2A RPC endpoint URL (or a placeholder) as a message."""
        agent_url, _ = await self.get_a2a_info()
        return Message(text=agent_url)

    async def get_agent_card_url(self) -> Message:
        """Return the A2A agent card URL (or a placeholder) as a message."""
        _, agent_card_url = await self.get_a2a_info()
        return Message(text=agent_card_url)

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _a2a_sdk_available() -> bool:
        """Report whether the optional ``a2a`` SDK can be imported."""
        try:
            import a2a.types  # noqa: F401
        except ImportError:
            return False
        return True

    def _resolve_base_url(self) -> str:
        """Return the configured base URL (or the default) without a trailing slash."""
        configured = (self.a2a_base_url or "").strip()
        return (configured or self._DEFAULT_BASE_URL).rstrip("/")

    def _endpoint_root(self, flow_id: str) -> str:
        """Return the URL prefix shared by the RPC and agent-card endpoints."""
        component_id = self._component_id or "unknown"
        return f"{self._resolve_base_url()}/api/v1/a2a/{flow_id}/{component_id}"

    def _current_flow_id(self) -> str:
        """Return the flow ID of the graph this component runs in, or ``"default"``."""
        vertex = getattr(self, "_vertex", None)
        graph = getattr(vertex, "graph", None) if vertex else None
        return getattr(graph, "flow_id", None) or "default"

    async def _register_a2a_endpoint(self) -> None:
        """Register this agent with the A2A service (best effort).

        On success the endpoint URLs are stored and ``_registered`` is set.
        Any failure is logged and leaves the component unregistered; it is
        never raised, so a registration problem cannot break the agent run.
        """
        try:
            from langflow.services.deps import get_a2a_service

            a2a_service = get_a2a_service()
            flow_id = self._current_flow_id()

            # Reuse the flow ID as the component ID so the URL stays stable
            # across runs; only fall back to a random ID when no flow is known.
            if not self._component_id:
                self._component_id = flow_id if flow_id != "default" else f"agent_{uuid.uuid4().hex[:8]}"

            agent_card = self._generate_agent_card(flow_id)
            executor = self._create_a2a_executor()
            a2a_service.register_agent(
                component_id=self._component_id,
                flow_id=flow_id,
                agent_card=agent_card,
                executor_callable=executor,
            )

            endpoint_root = self._endpoint_root(flow_id)
            self._agent_url = f"{endpoint_root}/rpc"
            self._agent_card_url = f"{endpoint_root}/.well-known/agent.json"
            self._registered = True
            self.log(f"Registered A2A agent at: {self._agent_url}")
        except Exception as e:  # noqa: BLE001 - deliberate best-effort boundary
            self.log(f"Failed to register A2A endpoint: {e}")
            self._registered = False

    def _generate_agent_card(self, flow_id: str) -> AgentCard:
        """Build the A2A agent card describing this agent.

        Args:
            flow_id: ID of the flow the agent belongs to; used in the agent URL.

        Returns:
            The card produced by the A2A service's ``generate_agent_card``.
        """
        from a2a.types import AgentSkill
        from langflow.services.deps import get_a2a_service

        a2a_service = get_a2a_service()
        limit = self._MAX_DESCRIPTION_CHARS

        agent_name = self.a2a_agent_name or f"Agent {self._component_id}"
        # ``or`` (rather than a getattr default) also covers a system prompt
        # that exists but is None or empty, which would otherwise break slicing.
        fallback_description = getattr(self, "system_prompt", None) or "A Langflow agent"
        agent_description = self.a2a_agent_description or str(fallback_description)[:limit]

        # Advertise (at most _MAX_SKILLS of) the agent's tools as skills.
        skills = []
        tools = getattr(self, "tools", None) or []
        for tool in tools[: self._MAX_SKILLS]:
            tool_name = getattr(tool, "name", None) or "unknown"
            tool_description = getattr(tool, "description", None) or "No description"
            try:
                skill = AgentSkill(
                    id=f"tool_{tool_name}",
                    name=tool_name,
                    description=str(tool_description)[:limit],
                    tags=["tool"],
                    examples=[],
                )
            except Exception as e:  # noqa: BLE001 - one bad tool must not block the card
                self.log(f"Skipping tool {tool_name!r} in A2A agent card: {e}")
                continue
            skills.append(skill)

        # NOTE(review): the full RPC URL is passed as ``base_url``; confirm this
        # is what the service's generate_agent_card expects.
        agent_url = f"{self._endpoint_root(flow_id)}/rpc"
        return a2a_service.generate_agent_card(
            agent_name=agent_name,
            description=agent_description,
            component_id=self._component_id or "unknown",
            flow_id=flow_id,
            skills=skills,
            base_url=agent_url,
        )

    def _create_a2a_executor(self):
        """Create the coroutine function that serves A2A requests with this agent.

        Returns:
            An ``async`` callable ``executor(message_payload, stream=False)``
            that returns an A2A-style message dict.
        """
        # Zero-argument super() is unavailable inside the nested function, so
        # the parent's implementation is bound here. Calling the parent (not
        # self.message_response) avoids re-registering and URL-appending.
        run_agent = super().message_response

        async def executor(message_payload: dict, stream: bool = False) -> dict:
            """Run the agent on the text parts of *message_payload*.

            ``stream`` is accepted for interface compatibility but is not used.
            Failures are reported as an agent message containing the error text.
            """
            try:
                # Concatenate every text part of the incoming A2A message.
                parts = message_payload.get("parts", [])
                text_content = "".join(
                    part.get("text") or "" for part in parts if part.get("kind") == "text"
                )
                if not text_content:
                    text_content = "No message provided"

                original_input = self.input_value
                self.input_value = Message(text=text_content, sender="user")
                try:
                    response_message = await run_agent()
                finally:
                    # Restore the component's own input even if the run failed.
                    self.input_value = original_input

                if hasattr(response_message, "text"):
                    response_text = response_message.text
                else:
                    response_text = str(response_message)
            except Exception as e:  # noqa: BLE001 - errors are returned to the A2A caller
                response_text = f"Error: {e!s}"

            # Success and error replies share one shape.
            return {
                "messageId": str(uuid.uuid4()),
                "role": "agent",
                "parts": [{"kind": "text", "text": response_text}],
                "kind": "message",
            }

        return executor
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment