This tutorial demonstrates how to build a multi-agent system using LangGraph with MCP (Model Context Protocol) servers for enhanced tool capabilities.
MCP (Model Context Protocol) allows AI assistants to securely connect to external data sources and tools. By integrating MCP servers with LangGraph, you can create powerful multi-agent systems with access to diverse capabilities.
┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│    LangGraph    │      │    MCP Client    │      │   MCP Servers   │
│   Multi-Agent   │◄───►│   Integration    │◄───►│   (File, DB,    │
│     System      │      │                  │      │   Web, etc.)    │
└─────────────────┘      └──────────────────┘      └─────────────────┘
# Install required packages
pip install langgraph langchain-core langchain-openai
pip install mcp # MCP Python SDK
pip install httpx  # For HTTP MCP servers

First, let's set up multiple MCP servers for different capabilities:
# mcp_servers/filesystem_server.py
import asyncio
from mcp import Server, types
from mcp.server import stdio_server
import os
import json
# Create MCP server
server = Server("filesystem-server")


@server.list_tools()
async def list_tools() -> list[types.Tool]:
    """Advertise the file-system tools this server exposes."""

    def _path_schema(description: str) -> dict:
        # read_file and list_directory share a single required "path" argument.
        return {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": description}
            },
            "required": ["path"],
        }

    write_schema = {
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "File path to write"},
            "content": {"type": "string", "description": "Content to write"},
        },
        "required": ["path", "content"],
    }
    return [
        types.Tool(
            name="read_file",
            description="Read contents of a file",
            inputSchema=_path_schema("File path to read"),
        ),
        types.Tool(
            name="write_file",
            description="Write content to a file",
            inputSchema=write_schema,
        ),
        types.Tool(
            name="list_directory",
            description="List files in a directory",
            inputSchema=_path_schema("Directory path"),
        ),
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    """Dispatch a filesystem tool call.

    Always returns a list of TextContent — I/O errors and unknown tool
    names are reported as text rather than raised.
    """
    if name == "read_file":
        try:
            with open(arguments["path"], "r") as f:
                content = f.read()
            return [types.TextContent(type="text", text=content)]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error reading file: {str(e)}")]
    elif name == "write_file":
        try:
            with open(arguments["path"], "w") as f:
                f.write(arguments["content"])
            return [types.TextContent(type="text", text=f"Successfully wrote to {arguments['path']}")]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error writing file: {str(e)}")]
    elif name == "list_directory":
        try:
            files = os.listdir(arguments["path"])
            return [types.TextContent(type="text", text=json.dumps(files, indent=2))]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error listing directory: {str(e)}")]
    # Fix: the original fell through and implicitly returned None for an
    # unrecognized tool name, violating the declared return type.
    # NOTE(review): paths are used verbatim — consider sandboxing to a base
    # directory before exposing this server to untrusted callers.
    return [types.TextContent(type="text", text=f"Unknown tool: {name}")]
# Run server
async def main():
    """Serve MCP requests over stdio until the client disconnects."""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream)


if __name__ == "__main__":
    asyncio.run(main())

# mcp_servers/database_server.py
import asyncio
import sqlite3
import json
from mcp import Server, types
from mcp.server import stdio_server
server = Server("database-server")
# Initialize SQLite database
def init_db():
    """Create the users table in example.db if it does not already exist."""
    conn = sqlite3.connect("example.db")
    try:
        # Connection.execute creates an implicit cursor for us.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE
            )
        """)
        conn.commit()
    finally:
        conn.close()


init_db()
@server.list_tools()
async def list_tools() -> list[types.Tool]:
    """Advertise the database tools this server exposes."""
    query_schema = {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "SQL query to execute"}
        },
        "required": ["query"],
    }
    user_schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "email": {"type": "string"},
        },
        "required": ["name", "email"],
    }
    return [
        types.Tool(
            name="query_database",
            description="Execute SQL query on database",
            inputSchema=query_schema,
        ),
        types.Tool(
            name="insert_user",
            description="Insert a new user",
            inputSchema=user_schema,
        ),
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    """Dispatch a tool call against the SQLite database.

    Always returns a list of TextContent, including for unknown tool
    names and database errors; the connection is closed in all paths.
    """
    conn = sqlite3.connect("example.db")
    cursor = conn.cursor()
    try:
        if name == "query_database":
            # NOTE(review): this executes arbitrary SQL by design — do not
            # expose it to untrusted callers without a statement allowlist.
            cursor.execute(arguments["query"])
            results = cursor.fetchall()
            return [types.TextContent(type="text", text=json.dumps(results, indent=2))]
        elif name == "insert_user":
            # Parameterized insert — safe against SQL injection.
            cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)",
                           (arguments["name"], arguments["email"]))
            conn.commit()
            return [types.TextContent(type="text", text="User inserted successfully")]
        # Fix: the original fell through and implicitly returned None for an
        # unrecognized tool name, violating the declared return type.
        return [types.TextContent(type="text", text=f"Unknown tool: {name}")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Database error: {str(e)}")]
    finally:
        conn.close()
async def main():
async with stdio_server() as streams:
await server.run(streams[0], streams[1])
if __name__ == "__main__":
asyncio.run(main())Create an MCP client to connect to your servers:
# mcp_client.py
import asyncio
import subprocess
from typing import Dict, Any, List, Optional
from mcp import ClientSession
from mcp.client.stdio import stdio_client
class MCPClient:
def __init__(self):
self.sessions: Dict[str, ClientSession] = {}
self.processes: Dict[str, subprocess.Popen] = {}
async def connect_server(self, server_name: str, command: List[str]):
"""Connect to an MCP server"""
try:
# Start the MCP server process
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
self.processes[server_name] = process
# Create client session
session = await stdio_client(process.stdin, process.stdout)
self.sessions[server_name] = session
# Initialize the session
await session.initialize()
print(f"Connected to {server_name}")
return True
except Exception as e:
print(f"Failed to connect to {server_name}: {e}")
return False
async def get_available_tools(self, server_name: str = None):
"""Get all available tools from connected servers"""
all_tools = {}
servers = [server_name] if server_name else self.sessions.keys()
for name in servers:
if name in self.sessions:
try:
tools = await self.sessions[name].list_tools()
all_tools[name] = tools.tools
except Exception as e:
print(f"Error getting tools from {name}: {e}")
return all_tools
async def call_tool(self, server_name: str, tool_name: str, arguments: Dict[str, Any]):
"""Call a specific tool on a specific server"""
if server_name not in self.sessions:
raise ValueError(f"Server {server_name} not connected")
try:
result = await self.sessions[server_name].call_tool(tool_name, arguments)
return result.content
except Exception as e:
raise Exception(f"Tool call failed: {e}")
async def close_all(self):
"""Close all connections"""
for session in self.sessions.values():
try:
await session.close()
except:
pass
for process in self.processes.values():
try:
process.terminate()
process.wait(timeout=5)
except:
process.kill()
# Global MCP client instance
mcp_client = MCPClient()Now let's create a multi-agent system that uses MCP servers:
# multi_agent_system.py
import asyncio
from typing import Dict, Any, List
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel
import json
from mcp_client import mcp_client
# Define the state
class AgentState(BaseModel):
    """Shared state flowing through the multi-agent LangGraph workflow."""

    # Raw chat history as role/content dicts.
    messages: List[Dict[str, Any]] = []
    # The user task currently being processed.
    current_task: str = ""
    # Tool metadata discovered from connected MCP servers.
    available_tools: Dict[str, Any] = {}
    # Per-agent result text, keyed by agent name.
    agent_responses: Dict[str, str] = {}
    # Coordinator's final answer for the user.
    final_response: str = ""
    # NOTE: the mutable defaults ([] / {}) are safe on a pydantic model —
    # pydantic copies field defaults for each instance.


# Initialize LLM (deterministic decoding for tool selection).
llm = ChatOpenAI(model="gpt-4", temperature=0)
class FileAgent:
    """Agent specialized in file operations (filesystem MCP server)."""

    def __init__(self):
        self.name = "file_agent"
        self.server_name = "filesystem"

    @staticmethod
    def _tool_prompt(task: str, tool_names: list) -> str:
        """Build the tool-selection prompt shown to the LLM.

        Fix: the original inlined this f-string with an unescaped
        {"tool": null} example, which raised "Invalid format specifier"
        at runtime; the literal braces are escaped here.
        """
        return f"""
You are a file management agent. Your task is: {task}
Available tools: {json.dumps(tool_names, indent=2)}
Based on the task, determine which tool to use and what arguments to provide.
Respond with a JSON object containing:
- "tool": the tool name to use
- "arguments": a dictionary of arguments for the tool
If the task doesn't require file operations, respond with {{"tool": null}}
"""

    async def process(self, state: "AgentState") -> "AgentState":
        """Run file-related work for the task, recording the outcome in
        state.agent_responses[self.name]; non-file tasks are skipped."""
        task = state.current_task.lower()
        if any(keyword in task for keyword in ["file", "read", "write", "directory", "folder"]):
            try:
                # Ask the filesystem server which tools it offers.
                tools = await mcp_client.get_available_tools(self.server_name)
                tool_names = [tool.name for tool in tools.get(self.server_name, [])]
                # Let the LLM pick a tool and its arguments.
                response = await llm.ainvoke(
                    [SystemMessage(content=self._tool_prompt(state.current_task, tool_names))]
                )
                try:
                    decision = json.loads(response.content)
                    if decision.get("tool"):
                        result = await mcp_client.call_tool(
                            self.server_name,
                            decision["tool"],
                            decision["arguments"],
                        )
                        state.agent_responses[self.name] = (
                            f"File operation completed: {result[0].text if result else 'No result'}"
                        )
                    else:
                        state.agent_responses[self.name] = "No file operations needed"
                except json.JSONDecodeError:
                    state.agent_responses[self.name] = "Error parsing LLM response"
            except Exception as e:
                state.agent_responses[self.name] = f"Error in file agent: {str(e)}"
        return state
class DatabaseAgent:
    """Agent specialized in database operations (database MCP server)."""

    def __init__(self):
        self.name = "database_agent"
        self.server_name = "database"

    @staticmethod
    def _tool_prompt(task: str, tool_names: list) -> str:
        """Build the tool-selection prompt shown to the LLM.

        Fix: the original inlined this f-string with an unescaped
        {"tool": null} example, which raised "Invalid format specifier"
        at runtime; the literal braces are escaped here.
        """
        return f"""
You are a database management agent. Your task is: {task}
Available tools: {json.dumps(tool_names, indent=2)}
Based on the task, determine which tool to use and what arguments to provide.
Respond with a JSON object containing:
- "tool": the tool name to use
- "arguments": a dictionary of arguments for the tool
If the task doesn't require database operations, respond with {{"tool": null}}
"""

    async def process(self, state: "AgentState") -> "AgentState":
        """Run database-related work for the task, recording the outcome in
        state.agent_responses[self.name]; non-database tasks are skipped."""
        task = state.current_task.lower()
        if any(keyword in task for keyword in ["database", "db", "query", "sql", "user", "insert", "select"]):
            try:
                # Ask the database server which tools it offers.
                tools = await mcp_client.get_available_tools(self.server_name)
                tool_names = [tool.name for tool in tools.get(self.server_name, [])]
                # Let the LLM pick a tool and its arguments.
                response = await llm.ainvoke(
                    [SystemMessage(content=self._tool_prompt(state.current_task, tool_names))]
                )
                try:
                    decision = json.loads(response.content)
                    if decision.get("tool"):
                        result = await mcp_client.call_tool(
                            self.server_name,
                            decision["tool"],
                            decision["arguments"],
                        )
                        state.agent_responses[self.name] = (
                            f"Database operation completed: {result[0].text if result else 'No result'}"
                        )
                    else:
                        state.agent_responses[self.name] = "No database operations needed"
                except json.JSONDecodeError:
                    state.agent_responses[self.name] = "Error parsing LLM response"
            except Exception as e:
                state.agent_responses[self.name] = f"Error in database agent: {str(e)}"
        return state
class CoordinatorAgent:
    """Agent that coordinates other agents and provides final response"""

    def __init__(self):
        self.name = "coordinator"

    async def process(self, state: AgentState) -> AgentState:
        """Merge the specialist agents' outputs into one user-facing answer."""
        # One "agent: response" line per specialist.
        agent_summary = "\n".join(
            f"{agent_name}: {response}"
            for agent_name, response in state.agent_responses.items()
        )
        prompt = f"""
You are a coordinator agent. The user's task was: {state.current_task}
Here are the responses from specialist agents:
{agent_summary}
Provide a coherent, helpful final response to the user based on all agent outputs.
Be concise but comprehensive.
"""
        final_response = await llm.ainvoke([SystemMessage(content=prompt)])
        state.final_response = final_response.content
        return state
# Create the multi-agent workflow
def create_multi_agent_workflow():
    """Build and compile the LangGraph workflow wiring the three agents."""
    graph = StateGraph(AgentState)

    # Register one node per agent (insertion order matches the pipeline).
    node_agents = {
        "file_agent": FileAgent(),
        "database_agent": DatabaseAgent(),
        "coordinator": CoordinatorAgent(),
    }
    for node_name, agent in node_agents.items():
        graph.add_node(node_name, agent.process)

    # Linear pipeline: file agent -> database agent -> coordinator -> END.
    graph.set_entry_point("file_agent")
    graph.add_edge("file_agent", "database_agent")
    graph.add_edge("database_agent", "coordinator")
    graph.add_edge("coordinator", END)

    # Checkpoint state in memory, keyed by thread_id.
    return graph.compile(checkpointer=MemorySaver())
async def main():
"""Main function to run the multi-agent system"""
# Connect to MCP servers
print("Connecting to MCP servers...")
# Connect to filesystem server
await mcp_client.connect_server("filesystem", ["python", "mcp_servers/filesystem_server.py"])
# Connect to database server
await mcp_client.connect_server("database", ["python", "mcp_servers/database_server.py"])
print("All servers connected!")
# Create the workflow
app = create_multi_agent_workflow()
# Example tasks
tasks = [
"Create a file called 'users.txt' with a list of user names",
"Insert a new user named 'Alice Smith' with email 'alice@example.com' into the database",
"Query the database to show all users and then write the results to a file called 'user_report.txt'"
]
# Run tasks
for i, task in enumerate(tasks):
print(f"\n--- Task {i+1}: {task} ---")
# Create initial state
initial_state = AgentState(
current_task=task,
messages=[{"role": "user", "content": task}]
)
# Run the workflow
config = {"configurable": {"thread_id": f"task_{i}"}}
result = await app.ainvoke(initial_state, config)
print(f"Final Response: {result.final_response}")
print(f"Agent Details: {result.agent_responses}")
# Cleanup
await mcp_client.close_all()
if __name__ == "__main__":
asyncio.run(main())- Start the system:
python multi_agent_system.py

The system will:
- Connect to multiple MCP servers
- Route tasks to appropriate specialist agents
- Coordinate responses through the coordinator agent
- Provide comprehensive final answers
# Example: Web scraping MCP server
async def add_web_server():
await mcp_client.connect_server("web", ["python", "mcp_servers/web_server.py"])async def discover_agents():
"""Dynamically discover available agents based on connected MCP servers"""
all_tools = await mcp_client.get_available_tools()
agents = {}
for server_name, tools in all_tools.items():
agent_class = create_agent_for_server(server_name, tools)
agents[server_name] = agent_class()
return agents- Modularity: Each MCP server provides specific capabilities
- Scalability: Easy to add new servers and agents
- Separation of Concerns: Agents specialize in specific domains
- Flexibility: Can route tasks dynamically based on content
- Maintainability: Clear boundaries between components
- Add authentication and security layers
- Implement error recovery and retry mechanisms
- Add monitoring and logging
- Create a web interface using the React integration from earlier
- Deploy using Docker containers for each MCP server
This architecture provides a solid foundation for building sophisticated multi-agent systems with LangGraph and MCP servers.
Note that the `MCPToolWrapper` should be reimplemented to dynamically call the `list_tools` method of the MCP server and construct a list of available tools.