LangChain MCP server integration
# Exposing LangGraph Agents over MCP
# This example shows how to wrap a LangGraph agent as an MCP server
import asyncio
import json
from typing import Dict, Any, List, Optional

from mcp import Server, types
from mcp.server import stdio_server
from pydantic import BaseModel

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI

# =============================================================================
# 1. AGENT IMPLEMENTATION (Your existing LangGraph agent)
# =============================================================================

class AgentState(BaseModel):
    messages: List[Dict[str, Any]] = []
    task: str = ""
    result: str = ""
    context: Dict[str, Any] = {}

class DataAnalysisAgent:
    """A specialized agent for data analysis tasks"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.name = "data_analysis_agent"

    async def analyze_data(self, state: AgentState) -> AgentState:
        """Analyze data based on the task"""
        task = state.task
        context = state.context

        # Create the analysis prompt
        prompt = f"""
        You are a data analysis expert.
        Task: {task}
        Context: {json.dumps(context, indent=2)}
        Provide a clear, actionable analysis.
        """

        # Get the LLM response
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])

        # Update state
        state.result = response.content
        state.messages.append({
            "role": "assistant",
            "content": response.content,
            "agent": self.name
        })
        return state

    def create_graph(self):
        """Create the LangGraph workflow"""
        workflow = StateGraph(AgentState)

        # Add the analysis node
        workflow.add_node("analyze", self.analyze_data)

        # Set up the flow
        workflow.set_entry_point("analyze")
        workflow.add_edge("analyze", END)

        # Compile with memory
        memory = MemorySaver()
        return workflow.compile(checkpointer=memory)

# =============================================================================
# 2. MCP SERVER WRAPPER
# =============================================================================

class AgentMCPServer:
    """MCP Server that wraps a LangGraph agent"""

    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.server = Server(f"{agent_name}-mcp-server")

        # Initialize the agent
        self.agent = DataAnalysisAgent()
        self.graph = self.agent.create_graph()

        # Store active sessions
        self.sessions: Dict[str, Dict] = {}

        # Register MCP handlers
        self._register_handlers()
    def _register_handlers(self):
        """Register MCP server handlers"""

        @self.server.list_tools()
        async def list_tools() -> List[types.Tool]:
            """List available agent capabilities as tools"""
            return self._list_tools()

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Handle tool calls by routing to agent methods"""
            return await self._dispatch_tool(name, arguments)

    def _list_tools(self) -> List[types.Tool]:
        """Build the tool list (a method so subclasses can extend it)"""
        return [
            types.Tool(
                name="analyze_data",
                description="Analyze data using the specialized data analysis agent",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "task": {
                            "type": "string",
                            "description": "The analysis task to perform"
                        },
                        "data": {
                            "type": "object",
                            "description": "Data to analyze (JSON object)"
                        },
                        "session_id": {
                            "type": "string",
                            "description": "Session ID for conversation continuity",
                            "default": "default"
                        }
                    },
                    "required": ["task"]
                }
            ),
            types.Tool(
                name="get_session_history",
                description="Get conversation history for a session",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "session_id": {
                            "type": "string",
                            "description": "Session ID to retrieve"
                        }
                    },
                    "required": ["session_id"]
                }
            ),
            types.Tool(
                name="clear_session",
                description="Clear a session's conversation history",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "session_id": {
                            "type": "string",
                            "description": "Session ID to clear"
                        }
                    },
                    "required": ["session_id"]
                }
            )
        ]

    async def _dispatch_tool(self, name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Route a tool call to the matching handler (overridable by subclasses)"""
        try:
            if name == "analyze_data":
                return await self._handle_analyze_data(arguments)
            elif name == "get_session_history":
                return await self._handle_get_history(arguments)
            elif name == "clear_session":
                return await self._handle_clear_session(arguments)
            else:
                return [types.TextContent(
                    type="text",
                    text=f"Unknown tool: {name}"
                )]
        except Exception as e:
            return [types.TextContent(
                type="text",
                text=f"Error executing {name}: {str(e)}"
            )]
    async def _handle_analyze_data(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Handle data analysis requests"""
        task = arguments.get("task", "")
        data = arguments.get("data", {})
        session_id = arguments.get("session_id", "default")

        # Create the initial state
        initial_state = AgentState(
            task=task,
            context={"data": data, "session_id": session_id}
        )

        # Run the agent (the compiled graph returns the final state as a dict)
        config = {"configurable": {"thread_id": session_id}}
        result = await self.graph.ainvoke(initial_state, config)

        # Store session info
        self.sessions[session_id] = {
            "last_task": task,
            "last_result": result["result"],
            "message_count": len(result["messages"])
        }

        # Return the result
        return [types.TextContent(
            type="text",
            text=json.dumps({
                "result": result["result"],
                "session_id": session_id,
                "message_count": len(result["messages"])
            }, indent=2)
        )]

    async def _handle_get_history(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Get session history"""
        session_id = arguments.get("session_id", "default")
        if session_id in self.sessions:
            return [types.TextContent(
                type="text",
                text=json.dumps(self.sessions[session_id], indent=2)
            )]
        else:
            return [types.TextContent(
                type="text",
                text=f"No session found: {session_id}"
            )]

    async def _handle_clear_session(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Clear session history"""
        session_id = arguments.get("session_id", "default")
        if session_id in self.sessions:
            del self.sessions[session_id]
            return [types.TextContent(
                type="text",
                text=f"Cleared session: {session_id}"
            )]
        else:
            return [types.TextContent(
                type="text",
                text=f"Session not found: {session_id}"
            )]

    async def run(self):
        """Run the MCP server"""
        async with stdio_server() as streams:
            await self.server.run(streams[0], streams[1])
# =============================================================================
# 3. MULTI-AGENT COMPOSITION EXAMPLE
# =============================================================================

class ResearchAgent:
    """Another agent that can call the data analysis agent via MCP"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.name = "research_agent"

    async def conduct_research(self, state: AgentState) -> AgentState:
        """Conduct research by delegating analysis to the data analysis agent"""
        # This would typically use an MCP client to call the data analysis
        # agent; for demonstration, we simulate the call.
        task = state.task

        # Simulate calling the data analysis agent via MCP
        analysis_request = {
            "task": f"Analyze the following research question: {task}",
            "data": {"research_topic": task, "depth": "comprehensive"}
        }

        # In a real implementation, this would be:
        # result = await mcp_client.call_tool("data_analysis_server", "analyze_data", analysis_request)

        # For now, simulate a response
        simulated_analysis = f"Research analysis for: {task}"
        state.result = f"Research completed. Analysis: {simulated_analysis}"
        state.messages.append({
            "role": "assistant",
            "content": state.result,
            "agent": self.name
        })
        return state
# =============================================================================
# 4. ADVANCED AGENT MCP SERVER WITH STREAMING
# =============================================================================

class StreamingAgentMCPServer(AgentMCPServer):
    """Enhanced MCP server with (simulated) streaming capabilities"""

    def _list_tools(self) -> List[types.Tool]:
        """Extend the base tool list with a streaming variant"""
        streaming_tool = types.Tool(
            name="analyze_data_stream",
            description="Analyze data with real-time streaming updates",
            inputSchema={
                "type": "object",
                "properties": {
                    "task": {"type": "string", "description": "Analysis task"},
                    "data": {"type": "object", "description": "Data to analyze"},
                    "session_id": {"type": "string", "default": "default"},
                    "stream": {"type": "boolean", "default": True, "description": "Enable streaming"}
                },
                "required": ["task"]
            }
        )
        return super()._list_tools() + [streaming_tool]

    async def _dispatch_tool(self, name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Handle the streaming tool; delegate everything else to the parent"""
        if name == "analyze_data_stream":
            return await self._handle_streaming_analysis(arguments)
        return await super()._dispatch_tool(name, arguments)

    async def _handle_streaming_analysis(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Handle streaming data analysis"""
        task = arguments.get("task", "")
        data = arguments.get("data", {})
        session_id = arguments.get("session_id", "default")

        # Simulate streaming by breaking the analysis into steps
        steps = [
            "Initializing analysis...",
            "Processing data...",
            "Applying analysis algorithms...",
            "Generating insights...",
            "Finalizing results..."
        ]

        results = []
        for i, step in enumerate(steps):
            # In a real implementation, you'd yield partial results here
            progress = {
                "step": i + 1,
                "total_steps": len(steps),
                "status": step,
                "progress": (i + 1) / len(steps) * 100
            }
            results.append(types.TextContent(
                type="text",
                text=json.dumps(progress, indent=2)
            ))

        # Add the final result
        final_result = {
            "status": "completed",
            "result": f"Analysis completed for: {task}",
            "session_id": session_id,
            "data_processed": len(str(data))
        }
        results.append(types.TextContent(
            type="text",
            text=json.dumps(final_result, indent=2)
        ))
        return results
# =============================================================================
# 5. CLIENT EXAMPLE - CALLING AGENTS VIA MCP
# =============================================================================

import subprocess
from mcp import ClientSession
from mcp.client.stdio import stdio_client

class AgentMCPClient:
    """Client for calling agents exposed via MCP"""

    def __init__(self):
        self.sessions = {}
        self.processes = {}

    async def connect_to_agent(self, agent_name: str, server_command: List[str]):
        """Connect to an agent MCP server"""
        try:
            # Start the agent MCP server
            process = subprocess.Popen(
                server_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            self.processes[agent_name] = process

            # Create a client session
            session = await stdio_client(process.stdin, process.stdout)
            await session.initialize()
            self.sessions[agent_name] = session
            return True
        except Exception as e:
            print(f"Failed to connect to {agent_name}: {e}")
            return False

    async def call_agent(self, agent_name: str, task: str, data: Dict = None, session_id: str = "default"):
        """Call an agent to perform a task"""
        if agent_name not in self.sessions:
            raise ValueError(f"Agent {agent_name} not connected")

        session = self.sessions[agent_name]

        # Call the analyze_data tool
        result = await session.call_tool("analyze_data", {
            "task": task,
            "data": data or {},
            "session_id": session_id
        })
        return json.loads(result.content[0].text)

    async def close_all(self):
        """Close all agent connections"""
        for session in self.sessions.values():
            await session.close()
        for process in self.processes.values():
            process.terminate()
# =============================================================================
# 6. USAGE EXAMPLES
# =============================================================================

async def run_agent_server():
    """Run the agent as an MCP server"""
    server = AgentMCPServer("data_analysis")
    await server.run()

async def use_agent_client():
    """Example of using the agent via an MCP client"""
    client = AgentMCPClient()

    # Connect to the agent server
    await client.connect_to_agent(
        "data_analysis",
        ["python", "-c", "from agent_mcp import AgentMCPServer; import asyncio; asyncio.run(AgentMCPServer('data_analysis').run())"]
    )

    # Call the agent
    result = await client.call_agent(
        "data_analysis",
        "Analyze sales trends in Q4 data",
        {"sales": [100, 120, 110, 140]},
        "session_1"
    )
    print("Agent Result:", result)
    await client.close_all()

async def multi_agent_orchestration():
    """Example of multiple agents calling each other via MCP"""
    client = AgentMCPClient()

    # Connect to multiple agent servers
    await client.connect_to_agent("data_analysis", ["python", "data_analysis_server.py"])
    await client.connect_to_agent("research", ["python", "research_server.py"])

    # The research agent gathers data first
    research_result = await client.call_agent(
        "research",
        "Research market trends for electric vehicles",
        session_id="research_session_1"
    )

    # The data analysis agent then processes the research data
    analysis_result = await client.call_agent(
        "data_analysis",
        "Analyze the EV market research data",
        research_result.get("data", {}),
        "analysis_session_1"
    )
    print("Final Analysis:", analysis_result)
    await client.close_all()

# =============================================================================
# 7. MAIN EXECUTION
# =============================================================================

if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "server":
        # Run as an MCP server
        print("Starting Data Analysis Agent MCP Server...")
        asyncio.run(run_agent_server())
    else:
        # Run the client example
        print("Running MCP Client Example...")
        asyncio.run(use_agent_client())
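
Assuming the file above is saved as agent_mcp.py (the module name the client example imports from), you can exercise both entry points from a shell:

# Start the agent as an MCP server
python agent_mcp.py server

# Run the client demo (it spawns the server as a subprocess)
python agent_mcp.py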

LangGraph + MCP Multi-Agent System Tutorial

This tutorial demonstrates how to build a multi-agent system using LangGraph with MCP (Model Context Protocol) servers for enhanced tool capabilities.

Overview

MCP (Model Context Protocol) allows AI assistants to securely connect to external data sources and tools. By integrating MCP servers with LangGraph, you can create powerful multi-agent systems with access to diverse capabilities.

Architecture

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   LangGraph     │    │   MCP Client     │    │  MCP Servers    │
│   Multi-Agent   │◄──►│   Integration    │◄──►│  (File, DB,     │
│   System        │    │                  │    │   Web, etc.)    │
└─────────────────┘    └──────────────────┘    └─────────────────┘

Prerequisites

# Install required packages
pip install langgraph langchain-core langchain-openai
pip install mcp  # MCP Python SDK
pip install httpx  # For HTTP MCP servers

Step 1: Setting Up MCP Servers

First, let's set up multiple MCP servers for different capabilities:

File System MCP Server

# mcp_servers/filesystem_server.py
import asyncio
from mcp import Server, types
from mcp.server import stdio_server
import os
import json

# Create MCP server
server = Server("filesystem-server")

@server.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="read_file",
            description="Read contents of a file",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "File path to read"}
                },
                "required": ["path"]
            }
        ),
        types.Tool(
            name="write_file", 
            description="Write content to a file",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "File path to write"},
                    "content": {"type": "string", "description": "Content to write"}
                },
                "required": ["path", "content"]
            }
        ),
        types.Tool(
            name="list_directory",
            description="List files in a directory", 
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "Directory path"}
                },
                "required": ["path"]
            }
        )
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    if name == "read_file":
        try:
            with open(arguments["path"], "r") as f:
                content = f.read()
            return [types.TextContent(type="text", text=content)]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error reading file: {str(e)}")]
    
    elif name == "write_file":
        try:
            with open(arguments["path"], "w") as f:
                f.write(arguments["content"])
            return [types.TextContent(type="text", text=f"Successfully wrote to {arguments['path']}")]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error writing file: {str(e)}")]
    
    elif name == "list_directory":
        try:
            files = os.listdir(arguments["path"])
            return [types.TextContent(type="text", text=json.dumps(files, indent=2))]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error listing directory: {str(e)}")]

# Run server
async def main():
    async with stdio_server() as streams:
        await server.run(streams[0], streams[1])

if __name__ == "__main__":
    asyncio.run(main())

Database MCP Server

# mcp_servers/database_server.py
import asyncio
import sqlite3
import json
from mcp import Server, types
from mcp.server import stdio_server

server = Server("database-server")

# Initialize SQLite database
def init_db():
    conn = sqlite3.connect("example.db")
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE
        )
    """)
    conn.commit()
    conn.close()

init_db()

@server.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="query_database",
            description="Execute SQL query on database",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "SQL query to execute"}
                },
                "required": ["query"]
            }
        ),
        types.Tool(
            name="insert_user",
            description="Insert a new user",
            inputSchema={
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "email": {"type": "string"}
                },
                "required": ["name", "email"]
            }
        )
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    conn = sqlite3.connect("example.db")
    cursor = conn.cursor()
    
    try:
        if name == "query_database":
            cursor.execute(arguments["query"])
            results = cursor.fetchall()
            return [types.TextContent(type="text", text=json.dumps(results, indent=2))]
        
        elif name == "insert_user":
            cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", 
                         (arguments["name"], arguments["email"]))
            conn.commit()
            return [types.TextContent(type="text", text="User inserted successfully")]
    
    except Exception as e:
        return [types.TextContent(type="text", text=f"Database error: {str(e)}")]
    finally:
        conn.close()

async def main():
    async with stdio_server() as streams:
        await server.run(streams[0], streams[1])

if __name__ == "__main__":
    asyncio.run(main())

Step 2: MCP Client Integration

Create an MCP client to connect to your servers:

# mcp_client.py
import asyncio
import subprocess
from typing import Dict, Any, List
from mcp import ClientSession
from mcp.client.stdio import stdio_client

class MCPClient:
    def __init__(self):
        self.sessions: Dict[str, ClientSession] = {}
        self.processes: Dict[str, subprocess.Popen] = {}
    
    async def connect_server(self, server_name: str, command: List[str]):
        """Connect to an MCP server"""
        try:
            # Start the MCP server process
            process = subprocess.Popen(
                command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            self.processes[server_name] = process
            
            # Create client session
            session = await stdio_client(process.stdin, process.stdout)
            self.sessions[server_name] = session
            
            # Initialize the session
            await session.initialize()
            
            print(f"Connected to {server_name}")
            return True
        except Exception as e:
            print(f"Failed to connect to {server_name}: {e}")
            return False
    
    async def get_available_tools(self, server_name: str = None):
        """Get all available tools from connected servers"""
        all_tools = {}
        
        servers = [server_name] if server_name else self.sessions.keys()
        
        for name in servers:
            if name in self.sessions:
                try:
                    tools = await self.sessions[name].list_tools()
                    all_tools[name] = tools.tools
                except Exception as e:
                    print(f"Error getting tools from {name}: {e}")
        
        return all_tools
    
    async def call_tool(self, server_name: str, tool_name: str, arguments: Dict[str, Any]):
        """Call a specific tool on a specific server"""
        if server_name not in self.sessions:
            raise ValueError(f"Server {server_name} not connected")
        
        try:
            result = await self.sessions[server_name].call_tool(tool_name, arguments)
            return result.content
        except Exception as e:
            raise Exception(f"Tool call failed: {e}")
    
    async def close_all(self):
        """Close all connections"""
        for session in self.sessions.values():
            try:
                await session.close()
            except Exception:
                pass

        for process in self.processes.values():
            try:
                process.terminate()
                process.wait(timeout=5)
            except Exception:
                process.kill()

# Global MCP client instance
mcp_client = MCPClient()

Step 3: LangGraph Multi-Agent System

Now let's create a multi-agent system that uses MCP servers:

# multi_agent_system.py
import asyncio
from typing import Dict, Any, List
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel
import json

from mcp_client import mcp_client

# Define the state
class AgentState(BaseModel):
    messages: List[Dict[str, Any]] = []
    current_task: str = ""
    available_tools: Dict[str, Any] = {}
    agent_responses: Dict[str, str] = {}
    final_response: str = ""

# Initialize LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)

class FileAgent:
    """Agent specialized in file operations"""
    
    def __init__(self):
        self.name = "file_agent"
        self.server_name = "filesystem"
    
    async def process(self, state: AgentState) -> AgentState:
        """Process file-related tasks"""
        task = state.current_task.lower()
        
        if any(keyword in task for keyword in ["file", "read", "write", "directory", "folder"]):
            try:
                # Get file tools
                tools = await mcp_client.get_available_tools(self.server_name)
                
                # Create a prompt for the LLM to decide which tool to use
                prompt = f"""
                You are a file management agent. Your task is: {state.current_task}
                
                Available tools: {json.dumps([tool.name for tool in tools.get(self.server_name, [])], indent=2)}
                
                Based on the task, determine which tool to use and what arguments to provide.
                Respond with a JSON object containing:
                - "tool": the tool name to use
                - "arguments": a dictionary of arguments for the tool
                
                If the task doesn't require file operations, respond with {{"tool": null}}
                """
                
                response = await llm.ainvoke([SystemMessage(content=prompt)])
                
                try:
                    decision = json.loads(response.content)
                    
                    if decision.get("tool"):
                        result = await mcp_client.call_tool(
                            self.server_name, 
                            decision["tool"], 
                            decision["arguments"]
                        )
                        
                        state.agent_responses[self.name] = f"File operation completed: {result[0].text if result else 'No result'}"
                    else:
                        state.agent_responses[self.name] = "No file operations needed"
                        
                except json.JSONDecodeError:
                    state.agent_responses[self.name] = "Error parsing LLM response"
                    
            except Exception as e:
                state.agent_responses[self.name] = f"Error in file agent: {str(e)}"
        
        return state

class DatabaseAgent:
    """Agent specialized in database operations"""
    
    def __init__(self):
        self.name = "database_agent" 
        self.server_name = "database"
    
    async def process(self, state: AgentState) -> AgentState:
        """Process database-related tasks"""
        task = state.current_task.lower()
        
        if any(keyword in task for keyword in ["database", "db", "query", "sql", "user", "insert", "select"]):
            try:
                tools = await mcp_client.get_available_tools(self.server_name)
                
                prompt = f"""
                You are a database management agent. Your task is: {state.current_task}
                
                Available tools: {json.dumps([tool.name for tool in tools.get(self.server_name, [])], indent=2)}
                
                Based on the task, determine which tool to use and what arguments to provide.
                Respond with a JSON object containing:
                - "tool": the tool name to use  
                - "arguments": a dictionary of arguments for the tool
                
                If the task doesn't require database operations, respond with {{"tool": null}}
                """
                
                response = await llm.ainvoke([SystemMessage(content=prompt)])
                
                try:
                    decision = json.loads(response.content)
                    
                    if decision.get("tool"):
                        result = await mcp_client.call_tool(
                            self.server_name,
                            decision["tool"],
                            decision["arguments"]
                        )
                        
                        state.agent_responses[self.name] = f"Database operation completed: {result[0].text if result else 'No result'}"
                    else:
                        state.agent_responses[self.name] = "No database operations needed"
                        
                except json.JSONDecodeError:
                    state.agent_responses[self.name] = "Error parsing LLM response"
                    
            except Exception as e:
                state.agent_responses[self.name] = f"Error in database agent: {str(e)}"
        
        return state

class CoordinatorAgent:
    """Agent that coordinates other agents and provides final response"""
    
    def __init__(self):
        self.name = "coordinator"
    
    async def process(self, state: AgentState) -> AgentState:
        """Coordinate agent responses and provide final answer"""
        
        # Collect all agent responses
        responses = []
        for agent_name, response in state.agent_responses.items():
            responses.append(f"{agent_name}: {response}")
        
        # Create final response
        prompt = f"""
        You are a coordinator agent. The user's task was: {state.current_task}
        
        Here are the responses from specialist agents:
        {chr(10).join(responses)}
        
        Provide a coherent, helpful final response to the user based on all agent outputs.
        Be concise but comprehensive.
        """
        
        final_response = await llm.ainvoke([SystemMessage(content=prompt)])
        state.final_response = final_response.content
        
        return state

# Create the multi-agent workflow
def create_multi_agent_workflow():
    """Create the LangGraph workflow"""
    
    # Initialize agents
    file_agent = FileAgent()
    db_agent = DatabaseAgent()
    coordinator = CoordinatorAgent()
    
    # Define the workflow
    workflow = StateGraph(AgentState)
    
    # Add nodes
    workflow.add_node("file_agent", file_agent.process)
    workflow.add_node("database_agent", db_agent.process)
    workflow.add_node("coordinator", coordinator.process)
    
    # Define the flow
    workflow.set_entry_point("file_agent")
    workflow.add_edge("file_agent", "database_agent")
    workflow.add_edge("database_agent", "coordinator")
    workflow.add_edge("coordinator", END)
    
    # Add memory
    memory = MemorySaver()
    app = workflow.compile(checkpointer=memory)
    
    return app

async def main():
    """Main function to run the multi-agent system"""
    
    # Connect to MCP servers
    print("Connecting to MCP servers...")
    
    # Connect to filesystem server
    await mcp_client.connect_server("filesystem", ["python", "mcp_servers/filesystem_server.py"])
    
    # Connect to database server  
    await mcp_client.connect_server("database", ["python", "mcp_servers/database_server.py"])
    
    print("All servers connected!")
    
    # Create the workflow
    app = create_multi_agent_workflow()
    
    # Example tasks
    tasks = [
        "Create a file called 'users.txt' with a list of user names",
        "Insert a new user named 'Alice Smith' with email 'alice@example.com' into the database",
        "Query the database to show all users and then write the results to a file called 'user_report.txt'"
    ]
    
    # Run tasks
    for i, task in enumerate(tasks):
        print(f"\n--- Task {i+1}: {task} ---")
        
        # Create initial state
        initial_state = AgentState(
            current_task=task,
            messages=[{"role": "user", "content": task}]
        )
        
        # Run the workflow
        config = {"configurable": {"thread_id": f"task_{i}"}}
        result = await app.ainvoke(initial_state, config)
        
        print(f"Final Response: {result.final_response}")
        print(f"Agent Details: {result.agent_responses}")
    
    # Cleanup
    await mcp_client.close_all()

if __name__ == "__main__":
    asyncio.run(main())

Step 4: Running the System

  1. Start the system:
python multi_agent_system.py
  2. The system will:
    • Connect to multiple MCP servers
    • Route tasks to appropriate specialist agents
    • Coordinate responses through the coordinator agent
    • Provide comprehensive final answers
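
Before running the full task list, you can sanity-check the pipeline with a single task. This is a minimal sketch, assuming the file layout above (workflow in multi_agent_system.py, client in mcp_client.py):

# smoke_test.py - run one task end-to-end through the workflow
import asyncio
from mcp_client import mcp_client
from multi_agent_system import create_multi_agent_workflow, AgentState

async def smoke_test():
    # Bring up both MCP servers before building the workflow
    await mcp_client.connect_server("filesystem", ["python", "mcp_servers/filesystem_server.py"])
    await mcp_client.connect_server("database", ["python", "mcp_servers/database_server.py"])

    app = create_multi_agent_workflow()
    state = AgentState(current_task="List the files in the current directory")
    result = await app.ainvoke(state, {"configurable": {"thread_id": "smoke"}})

    print(result["final_response"])
    await mcp_client.close_all()

if __name__ == "__main__":
    asyncio.run(smoke_test())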

Step 5: Advanced Features

Adding More MCP Servers

# Example: Web scraping MCP server
async def add_web_server():
    await mcp_client.connect_server("web", ["python", "mcp_servers/web_server.py"])

Dynamic Agent Discovery

async def discover_agents():
    """Dynamically discover available agents based on connected MCP servers"""
    all_tools = await mcp_client.get_available_tools()
    
    agents = {}
    for server_name, tools in all_tools.items():
        agent_class = create_agent_for_server(server_name, tools)
        agents[server_name] = agent_class()
    
    return agents
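
The create_agent_for_server factory is left undefined above; one way to sketch it is a generic agent class parameterized by server name, following the same pattern as FileAgent and DatabaseAgent (the LLM routing body is elided here):

def create_agent_for_server(server_name: str, tools):
    """Factory for a generic agent bound to one MCP server (illustrative sketch)."""

    class GenericMCPAgent:
        def __init__(self):
            self.name = f"{server_name}_agent"
            self.server_name = server_name
            # Tool names advertised by this server, used to prompt the LLM
            self.tool_names = [tool.name for tool in tools]

        async def process(self, state: AgentState) -> AgentState:
            # As in FileAgent/DatabaseAgent: ask the LLM to pick one of
            # self.tool_names for state.current_task, then forward it with
            # mcp_client.call_tool(self.server_name, tool, arguments).
            state.agent_responses[self.name] = f"{self.name}: not yet implemented"
            return state

    return GenericMCPAgent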

Benefits of This Architecture

  1. Modularity: Each MCP server provides specific capabilities
  2. Scalability: Easy to add new servers and agents
  3. Separation of Concerns: Agents specialize in specific domains
  4. Flexibility: Can route tasks dynamically based on content
  5. Maintainability: Clear boundaries between components

Next Steps

  1. Add authentication and security layers
  2. Implement error recovery and retry mechanisms
  3. Add monitoring and logging
  4. Create a web interface for the system
  5. Deploy using Docker containers for each MCP server
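
For the retry item, a thin wrapper around MCPClient.call_tool is a reasonable starting point (a sketch; the retry count and backoff schedule are arbitrary):

import asyncio

async def call_tool_with_retry(client, server_name, tool_name, arguments,
                               retries=3, base_delay=0.5):
    """Retry an MCP tool call with exponential backoff (illustrative)."""
    for attempt in range(retries):
        try:
            return await client.call_tool(server_name, tool_name, arguments)
        except Exception:
            if attempt == retries - 1:
                raise
            # Back off 0.5s, 1s, 2s, ... between attempts
            await asyncio.sleep(base_delay * (2 ** attempt))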

This architecture provides a solid foundation for building sophisticated multi-agent systems with LangGraph and MCP servers.

Single Agent LangGraph + MCP Tutorial

This tutorial demonstrates how to build a single LangGraph agent that uses MCP (Model Context Protocol) tools with llm.bind_tools and ToolNode.

Overview

We'll create a simple but powerful system:

  • One MCP Server: Provides file system tools
  • One LangGraph Agent: Uses the MCP tools via bind_tools and ToolNode
  • Clean Architecture: Modern LangGraph patterns with proper tool integration

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   User Input    │───▶│   LangGraph      │───▶│   MCP Server    │
│                 │    │   Agent          │    │   (File Tools)  │
│                 │◀───│   + ToolNode     │◀───│                 │
└─────────────────┘    └──────────────────┘    └─────────────────┘

Prerequisites

# Install required packages
pip install langgraph langchain-core langchain-openai
pip install mcp  # MCP Python SDK

Step 1: Create the MCP Server

Let's start with a simple file system MCP server:

# mcp_file_server.py
import asyncio
import os
import json
from pathlib import Path
from typing import List, Dict, Any
from mcp import Server, types
from mcp.server import stdio_server

class FileSystemMCPServer:
    """Simple MCP server for file system operations"""
    
    def __init__(self):
        self.server = Server("filesystem-server")
        self._register_tools()
    
    def _register_tools(self):
        """Register file system tools"""
        
        @self.server.list_tools()
        async def list_tools() -> List[types.Tool]:
            return [
                types.Tool(
                    name="read_file",
                    description="Read the contents of a file",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the file to read"
                            }
                        },
                        "required": ["path"]
                    }
                ),
                types.Tool(
                    name="write_file",
                    description="Write content to a file",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string", 
                                "description": "Path to the file to write"
                            },
                            "content": {
                                "type": "string",
                                "description": "Content to write to the file"
                            }
                        },
                        "required": ["path", "content"]
                    }
                ),
                types.Tool(
                    name="list_files",
                    description="List files in a directory",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Directory path to list",
                                "default": "."
                            }
                        }
                    }
                ),
                types.Tool(
                    name="create_directory",
                    description="Create a new directory",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path of directory to create"
                            }
                        },
                        "required": ["path"]
                    }
                )
            ]
        
        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Handle tool calls"""
            
            try:
                if name == "read_file":
                    path = arguments["path"]
                    with open(path, "r", encoding="utf-8") as f:
                        content = f.read()
                    return [types.TextContent(
                        type="text", 
                        text=f"File content:\n{content}"
                    )]
                
                elif name == "write_file":
                    path = arguments["path"]
                    content = arguments["content"]
                    
                    # Create directory if it doesn't exist
                    os.makedirs(os.path.dirname(path), exist_ok=True)
                    
                    with open(path, "w", encoding="utf-8") as f:
                        f.write(content)
                    return [types.TextContent(
                        type="text",
                        text=f"Successfully wrote {len(content)} characters to {path}"
                    )]
                
                elif name == "list_files":
                    path = arguments.get("path", ".")
                    items = []
                    
                    for item in sorted(os.listdir(path)):
                        item_path = os.path.join(path, item)
                        if os.path.isdir(item_path):
                            items.append(f"πŸ“ {item}/")
                        else:
                            size = os.path.getsize(item_path)
                            items.append(f"πŸ“„ {item} ({size} bytes)")
                    
                    return [types.TextContent(
                        type="text",
                        text=f"Contents of {path}:\n" + "\n".join(items)
                    )]
                
                elif name == "create_directory":
                    path = arguments["path"]
                    os.makedirs(path, exist_ok=True)
                    return [types.TextContent(
                        type="text",
                        text=f"Created directory: {path}"
                    )]
                
                else:
                    return [types.TextContent(
                        type="text",
                        text=f"Unknown tool: {name}"
                    )]
            
            except Exception as e:
                return [types.TextContent(
                    type="text",
                    text=f"Error executing {name}: {str(e)}"
                )]
    
    async def run(self):
        """Run the MCP server"""
        async with stdio_server() as streams:
            await self.server.run(streams[0], streams[1])

# Run the server
if __name__ == "__main__":
    server = FileSystemMCPServer()
    asyncio.run(server.run())

Step 2: MCP Client Integration

Create a client to connect to the MCP server:

# mcp_client.py
import asyncio
import subprocess
import json
from typing import Dict, Any, List, Optional
from mcp import ClientSession
from mcp.client.stdio import stdio_client

class MCPToolClient:
    """Client for calling MCP server tools"""
    
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.process: Optional[subprocess.Popen] = None
        self.tools_schema = {}
    
    async def connect(self, server_command: List[str]):
        """Connect to MCP server"""
        try:
            # Start the MCP server process
            self.process = subprocess.Popen(
                server_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            
            # Create client session
            self.session = await stdio_client(
                self.process.stdin, 
                self.process.stdout
            )
            
            # Initialize the session
            await self.session.initialize()
            
            # Get available tools
            await self._load_tools_schema()
            
            print("βœ… Connected to MCP server")
            return True
            
        except Exception as e:
            print(f"❌ Failed to connect to MCP server: {e}")
            return False
    
    async def _load_tools_schema(self):
        """Load tools schema from MCP server"""
        try:
            tools_response = await self.session.list_tools()
            
            for tool in tools_response.tools:
                self.tools_schema[tool.name] = {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema
                }
            
            print(f"πŸ“‹ Loaded {len(self.tools_schema)} tools from MCP server")
            
        except Exception as e:
            print(f"❌ Failed to load tools schema: {e}")
    
    def get_tools_schema(self) -> List[Dict[str, Any]]:
        """Get tools schema for LangChain binding"""
        return list(self.tools_schema.values())
    
    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Call a tool on the MCP server"""
        if not self.session:
            raise RuntimeError("Not connected to MCP server")
        
        try:
            result = await self.session.call_tool(tool_name, arguments)
            
            if result.content:
                return result.content[0].text
            return "Tool executed successfully (no output)"
            
        except Exception as e:
            return f"Error calling tool {tool_name}: {str(e)}"
    
    async def close(self):
        """Close the MCP connection"""
        if self.session:
            await self.session.close()
        
        if self.process:
            self.process.terminate()
            self.process.wait(timeout=5)

# Global MCP client instance
mcp_client = MCPToolClient()

Step 3: LangGraph Agent with ToolNode

Now let's create the main LangGraph agent:

# langgraph_agent.py
import asyncio
from typing import Annotated, Dict, Any, List
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver

from mcp_client import mcp_client

# =============================================================================
# STEP 1: Create LangChain Tools from MCP Tools
# =============================================================================

class MCPToolWrapper:
    """Wrapper to convert MCP tools to LangChain tools"""
    
    @staticmethod
    def create_langchain_tools():
        """Create LangChain tools from MCP tools"""
        tools = []
        
        @tool
        async def read_file(path: str) -> str:
            """Read the contents of a file"""
            return await mcp_client.call_tool("read_file", {"path": path})
        
        @tool
        async def write_file(path: str, content: str) -> str:
            """Write content to a file"""
            return await mcp_client.call_tool("write_file", {
                "path": path, 
                "content": content
            })
        
        @tool
        async def list_files(path: str = ".") -> str:
            """List files in a directory"""
            return await mcp_client.call_tool("list_files", {"path": path})
        
        @tool
        async def create_directory(path: str) -> str:
            """Create a new directory"""
            return await mcp_client.call_tool("create_directory", {"path": path})
        
        return [read_file, write_file, list_files, create_directory]

# =============================================================================
# STEP 2: Define Agent State
# =============================================================================

class AgentState(MessagesState):
    """State for our LangGraph agent"""
    # MessagesState already includes 'messages' field
    # We can add additional fields if needed
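    # e.g. a hypothetical `workspace: str = "."` field could scope file operations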
    pass

# =============================================================================
# STEP 3: Create the LangGraph Agent
# =============================================================================

class FileAssistantAgent:
    """LangGraph agent that uses MCP file tools"""
    
    def __init__(self):
        # Initialize LLM
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        
        # Create tools from MCP
        self.tools = MCPToolWrapper.create_langchain_tools()
        
        # Bind tools to LLM
        self.llm_with_tools = self.llm.bind_tools(self.tools)
        
        # Create tool node
        self.tool_node = ToolNode(self.tools)
        
        # Build the graph
        self.graph = self._create_graph()
    
    def _create_graph(self) -> StateGraph:
        """Create the LangGraph workflow"""
        
        # Define the graph
        workflow = StateGraph(AgentState)
        
        # Add nodes
        workflow.add_node("agent", self._call_model)
        workflow.add_node("tools", self.tool_node)
        
        # Define the flow
        workflow.add_edge(START, "agent")
        workflow.add_conditional_edges(
            "agent",
            self._should_continue,
            {
                "tools": "tools",
                "end": END
            }
        )
        workflow.add_edge("tools", "agent")
        
        # Add memory
        memory = MemorySaver()
        return workflow.compile(checkpointer=memory)
    
    async def _call_model(self, state: AgentState) -> Dict[str, Any]:
        """Call the LLM with tools"""
        messages = state["messages"]
        response = await self.llm_with_tools.ainvoke(messages)
        return {"messages": [response]}
    
    def _should_continue(self, state: AgentState) -> str:
        """Decide whether to continue or end"""
        messages = state["messages"]
        last_message = messages[-1]
        
        # If there are tool calls, continue to tools
        if last_message.tool_calls:
            return "tools"
        # Otherwise, end
        return "end"
    
    async def run(self, user_input: str, thread_id: str = "default") -> str:
        """Run the agent with user input"""
        
        # Create initial state
        initial_state = {
            "messages": [HumanMessage(content=user_input)]
        }
        
        # Run the graph
        config = {"configurable": {"thread_id": thread_id}}
        final_state = await self.graph.ainvoke(initial_state, config)
        
        # Return the last message content
        return final_state["messages"][-1].content
    
    async def stream(self, user_input: str, thread_id: str = "default"):
        """Stream the agent's response"""
        
        initial_state = {
            "messages": [HumanMessage(content=user_input)]
        }
        
        config = {"configurable": {"thread_id": thread_id}}
        
        async for event in self.graph.astream(initial_state, config):
            for node_name, state in event.items():
                if node_name == "agent":
                    message = state["messages"][-1]
                    if hasattr(message, 'content') and message.content:
                        print(f"πŸ€– {message.content}")
                elif node_name == "tools":
                    for message in state["messages"]:
                        if isinstance(message, ToolMessage):
                            print(f"πŸ”§ Tool: {message.name}")
                            print(f"πŸ“€ Result: {message.content}")

# =============================================================================
# STEP 4: Main Application
# =============================================================================

async def main():
    """Main application function"""
    
    print("πŸš€ Starting File Assistant Agent with MCP")
    print("=" * 50)
    
    # Connect to MCP server
    print("πŸ“‘ Connecting to MCP server...")
    connected = await mcp_client.connect([
        "python", "mcp_file_server.py"
    ])
    
    if not connected:
        print("❌ Failed to connect to MCP server")
        return
    
    # Create the agent
    print("πŸ€– Creating LangGraph agent...")
    agent = FileAssistantAgent()
    
    # Interactive loop
    print("\nβœ… Agent ready! Type 'quit' to exit.")
    print("πŸ’‘ Try: 'Create a file called test.txt with some content'")
    print("-" * 50)
    
    while True:
        try:
            user_input = input("\nπŸ‘€ You: ").strip()
            
            if user_input.lower() in ['quit', 'exit', 'q']:
                break
            
            if not user_input:
                continue
            
            print("\nπŸ”„ Processing...")
            
            # Stream the response
            await agent.stream(user_input, "interactive_session")
            
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(f"❌ Error: {e}")
    
    # Cleanup
    print("\n🧹 Cleaning up...")
    await mcp_client.close()
    print("πŸ‘‹ Goodbye!")

# =============================================================================
# STEP 5: Example Usage Functions
# =============================================================================

async def run_examples():
    """Run some example interactions"""
    
    # Connect to MCP server
    await mcp_client.connect(["python", "mcp_file_server.py"])
    
    # Create agent
    agent = FileAssistantAgent()
    
    # Example tasks
    examples = [
        "List the files in the current directory",
        "Create a file called 'notes.txt' with the content 'Hello, World!'",
        "Read the content of notes.txt",
        "Create a directory called 'projects'",
        "Write a Python script to projects/hello.py that prints hello world"
    ]
    
    print("πŸ§ͺ Running Examples")
    print("=" * 30)
    
    for i, task in enumerate(examples, 1):
        print(f"\nπŸ“ Example {i}: {task}")
        print("-" * 40)
        
        try:
            response = await agent.run(task, f"example_{i}")
            print(f"πŸ€– Response: {response}")
        except Exception as e:
            print(f"❌ Error: {e}")
    
    # Cleanup
    await mcp_client.close()

# =============================================================================
# ENTRY POINT
# =============================================================================

if __name__ == "__main__":
    import sys
    
    if len(sys.argv) > 1 and sys.argv[1] == "examples":
        # Run examples
        asyncio.run(run_examples())
    else:
        # Run interactive mode
        asyncio.run(main())

Step 4: Running the System

Start Interactive Mode:

python langgraph_agent.py

Run Examples:

python langgraph_agent.py examples

Start Just the MCP Server:

python mcp_file_server.py

Step 5: Example Interactions

👀 You: Create a file called shopping.txt with a grocery list

🤖 I'll create a shopping list file for you.

🔧 Tool: write_file
📤 Result: Successfully wrote 67 characters to shopping.txt

🤖 I've created a file called 'shopping.txt' with a grocery list containing essential items like bread, milk, eggs, apples, and bananas.

👀 You: Read that file back to me

🔧 Tool: read_file
📤 Result: File content:
Grocery Shopping List:
- Bread
- Milk
- Eggs
- Apples
- Bananas

🤖 Here's the content of your shopping.txt file: [shows grocery list]

Key Benefits of This Approach

  1. Clean Architecture: Single agent with clear tool integration
  2. Modern LangGraph: Uses bind_tools and ToolNode patterns
  3. Async Support: Fully async implementation
  4. Streaming: Real-time response streaming
  5. Memory: Conversation persistence across interactions
  6. Error Handling: Robust error handling throughout
  7. Modular: Easy to add new MCP tools

Extending the System

Add New MCP Tools:

# In mcp_file_server.py, add to list_tools():
types.Tool(
    name="search_files",
    description="Search for files by name pattern",
    # ... schema
)

# In langgraph_agent.py, add to MCPToolWrapper:
@tool
async def search_files(pattern: str) -> str:
    """Search for files matching a pattern"""
    return await mcp_client.call_tool("search_files", {"pattern": pattern})

Add Multiple MCP Servers:

# Connect to multiple servers; web_client and db_client would be additional
# MCPToolClient instances, one per server (names here are illustrative)
file_client = MCPToolClient()
web_client = MCPToolClient()
db_client = MCPToolClient()

await file_client.connect(["python", "file_server.py"])
await web_client.connect(["python", "web_server.py"])
await db_client.connect(["python", "database_server.py"])

# Combine the per-client tool lists before binding them to the LLM
all_tools = file_tools + web_tools + database_tools

This simplified architecture is much easier to understand and extend while still providing powerful MCP integration with modern LangGraph patterns!

Author's note (teone, Aug 15, 2025):

Note that MCPToolWrapper should be reimplemented to dynamically call the MCP server's list_tools method and construct the list of available tools from the response.
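
A minimal sketch of that dynamic wrapper, assuming the MCPToolClient from Step 2 (which caches tools_schema after connecting) and a langchain-core version whose StructuredTool accepts a raw JSON Schema dict as args_schema:

# dynamic_tools.py - build LangChain tools from whatever the server advertises
from langchain_core.tools import StructuredTool
from mcp_client import mcp_client

def build_dynamic_tools():
    """One LangChain tool per MCP tool; call after mcp_client.connect(...)."""
    tools = []
    for name, schema in mcp_client.tools_schema.items():
        async def _invoke(_name=name, **kwargs):
            # Forward the arguments to the MCP server unchanged
            return await mcp_client.call_tool(_name, kwargs)

        tools.append(StructuredTool.from_function(
            coroutine=_invoke,
            name=name,
            description=schema["description"] or name,
            args_schema=schema["parameters"],  # JSON Schema from the server
        ))
    return tools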
