Skip to content

Instantly share code, notes, and snippets.

@raoulbia-ai
Last active January 15, 2026 12:53
Show Gist options
  • Select an option

  • Save raoulbia-ai/285152d7c789143dfe6b7a848f9dd204 to your computer and use it in GitHub Desktop.

Select an option

Save raoulbia-ai/285152d7c789143dfe6b7a848f9dd204 to your computer and use it in GitHub Desktop.
A2A Server Agent Component for Langflow. A tool-calling agent that exposes itself via Google's A2A (Agent-to-Agent) Protocol. Drop this file into your Langflow custom components directory. License: MIT
"""Requirements: pip install a2a-sdk"""
from __future__ import annotations
import uuid
from typing import TYPE_CHECKING
from langflow.components.langchain_utilities.tool_calling import ToolCallingAgentComponent
from langflow.io import BoolInput, FloatInput, IntInput, Output, StrInput
from langflow.schema.message import Message
if TYPE_CHECKING:
from a2a.types import AgentCard, AgentSkill
class A2AServerAgentComponent(ToolCallingAgentComponent):
"""A2A Server Agent Component.
A full Langflow agent that also exposes itself via the A2A Protocol.
Inherits all agent functionality (tools, LLM, memory) and adds A2A endpoints.
"""
display_name = "A2A Server Agent"
description = "A tool-calling agent exposed via the Agent2Agent (A2A) Protocol"
icon = "network"
name = "A2AServerAgent"
documentation = "https://github.com/google/a2a-protocol"
inputs = [
*ToolCallingAgentComponent.inputs,
IntInput(
name="max_iterations",
display_name="Max Iterations",
value=30,
advanced=True,
info="Maximum tool execution attempts. Increased for deep research workflows.",
),
FloatInput(
name="max_execution_time",
display_name="Max Execution Time (seconds)",
value=1800.0,
advanced=True,
info="Maximum execution time (default: 30 minutes for research workflows).",
),
StrInput(
name="a2a_agent_name",
display_name="A2A Agent Name",
info="Name for the A2A agent card",
required=False,
value="",
advanced=True,
),
StrInput(
name="a2a_agent_description",
display_name="A2A Agent Description",
info="Description for the A2A agent card",
required=False,
value="",
advanced=True,
),
StrInput(
name="a2a_base_url",
display_name="A2A Base URL",
info="Base URL for the agent (e.g., https://myhost:7860). Leave empty for auto-detection.",
required=False,
),
BoolInput(
name="a2a_expose",
display_name="Expose via A2A",
info="Enable A2A protocol exposure for this agent",
value=True,
advanced=True,
),
]
outputs = [
Output(name="response", display_name="Response", method="message_response"),
Output(name="agent_url", display_name="A2A Agent URL", method="get_a2a_info", tool_mode=False),
Output(name="agent_card_url", display_name="A2A Agent Card URL", method="get_a2a_info", tool_mode=False),
]
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._registered = False
self._component_id: str | None = None
self._agent_url: str | None = None
self._agent_card_url: str | None = None
async def message_response(self) -> Message:
"""Run the agent and expose via A2A if enabled."""
# Check A2A SDK availability
try:
from a2a.types import AgentCard
a2a_available = True
except ImportError:
a2a_available = False
# Register with A2A service if enabled and not already registered
if self.a2a_expose and not self._registered and a2a_available:
await self._register_a2a_endpoint()
# Execute agent normally
response = await super().message_response()
# Append A2A registration info to the response
if self.a2a_expose and self._registered and self._agent_card_url:
url_info = f"\n\n🌐 A2A Agent Card URL:\n{self._agent_card_url}"
response.text = (response.text or "") + url_info
return response
async def get_a2a_info(self) -> tuple[str, str]:
"""Get A2A endpoint information."""
if not self.a2a_expose:
return "A2A not enabled", "A2A not enabled"
try:
from a2a.types import AgentCard
if not self._registered:
await self._register_a2a_endpoint()
except ImportError:
return "A2A SDK not installed", "A2A SDK not installed"
return self._agent_url or "Not registered", self._agent_card_url or "Not registered"
async def _register_a2a_endpoint(self) -> None:
"""Register this agent with the A2A service."""
try:
from langflow.services.deps import get_a2a_service
a2a_service = get_a2a_service()
# Get flow ID
flow_id = "default"
if hasattr(self, "_vertex") and self._vertex and hasattr(self._vertex, "graph"):
flow_id = self._vertex.graph.flow_id or flow_id
# Generate stable component ID from flow_id
if not self._component_id:
self._component_id = flow_id if flow_id != "default" else f"agent_{uuid.uuid4().hex[:8]}"
# Generate agent card
agent_card = self._generate_agent_card(flow_id)
# Create executor
executor = self._create_a2a_executor()
# Register with service
a2a_service.register_agent(
component_id=self._component_id,
flow_id=flow_id,
agent_card=agent_card,
executor_callable=executor,
)
# Build URLs
base_url = self.a2a_base_url or "http://localhost:7860"
self._agent_url = f"{base_url}/api/v1/a2a/{flow_id}/{self._component_id}/rpc"
self._agent_card_url = f"{base_url}/api/v1/a2a/{flow_id}/{self._component_id}/.well-known/agent.json"
self._registered = True
self.log(f"Registered A2A agent at: {self._agent_url}")
except Exception as e:
self.log(f"Failed to register A2A endpoint: {e}")
self._registered = False
def _generate_agent_card(self, flow_id: str) -> "AgentCard":
"""Generate an A2A agent card from this agent's configuration."""
from langflow.services.deps import get_a2a_service
from a2a.types import AgentSkill
a2a_service = get_a2a_service()
agent_name = self.a2a_agent_name or f"Agent {self._component_id}"
agent_description = self.a2a_agent_description or (
getattr(self, "system_prompt", "A Langflow agent")[:200]
)
# Generate skills from tools
skills = []
if hasattr(self, "tools") and self.tools:
for tool in self.tools[:5]:
try:
skill = AgentSkill(
id=f"tool_{getattr(tool, 'name', 'unknown')}",
name=getattr(tool, "name", "unknown"),
description=getattr(tool, "description", "No description")[:200],
tags=["tool"],
examples=[],
)
skills.append(skill)
except Exception:
pass
base_url = self.a2a_base_url or "http://localhost:7860"
agent_url = f"{base_url}/api/v1/a2a/{flow_id}/{self._component_id or 'unknown'}/rpc"
return a2a_service.generate_agent_card(
agent_name=agent_name,
description=agent_description,
component_id=self._component_id or "unknown",
flow_id=flow_id,
skills=skills,
base_url=agent_url,
)
def _create_a2a_executor(self):
"""Create an executor callable that runs this agent for A2A requests."""
async def executor(message_payload: dict, stream: bool = False):
try:
# Extract text from A2A message parts
message_parts = message_payload.get("parts", [])
text_content = ""
for part in message_parts:
if part.get("kind") == "text":
text_content += part.get("text", "")
if not text_content:
text_content = "No message provided"
# Store original input
original_input = self.input_value
# Set input and execute
from langflow.schema.message import Message as LangflowMessage
self.input_value = LangflowMessage(text=text_content, sender="user")
response_message = await super(A2AServerAgentComponent, self).message_response()
# Restore original input
self.input_value = original_input
# Return A2A-formatted response
response_text = response_message.text if hasattr(response_message, "text") else str(response_message)
return {
"messageId": str(uuid.uuid4()),
"role": "agent",
"parts": [{"kind": "text", "text": response_text}],
"kind": "message",
}
except Exception as e:
return {
"role": "agent",
"parts": [{"kind": "text", "text": f"Error: {e!s}"}],
}
return executor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment