# Source: GitHub gist abutbul/1664a65b57009da8208e5c301496f8b5
# Author: @abutbul - created March 27, 2025 17:23
import traceback
import logging
import sys
import os
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_ollama.chat_models import ChatOllama
from langchain_deepseek import ChatDeepSeek
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from langchain_core.messages import AIMessage
from pydantic import BaseModel
import uvicorn
# Guarantee that DEEPSEEK_API_KEY is present in the environment: a value the
# caller already exported is kept as-is, otherwise a blank placeholder is set.
os.environ.setdefault("DEEPSEEK_API_KEY", "")
# Logging setup: DEBUG and above, timestamped, written to stdout.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[_stdout_handler],
    level=logging.DEBUG,
)
# Module-wide logger used by every endpoint in this file.
logger = logging.getLogger("mcp_client")
# Server configuration, defined once and handed to every MultiServerMCPClient
# created in this file. A single server, "gatherings", is declared: the
# command `python gatherings_mcp_server_single.py` with the stdio transport.
MCP_SERVER_CONFIG = dict(
    gatherings=dict(
        command="python",
        args=["gatherings_mcp_server_single.py"],
        transport="stdio",
    ),
)
# Debug mode - set to True to include detailed error info in responses.
# The error handlers in this file read this flag to decide whether the
# formatted traceback is placed in the error payload sent back to the client.
DEBUG_MODE = True
# The FastAPI application object; the routes and the global exception handler
# defined below are registered on it through its decorators.
app = FastAPI(title="MCP Client API (Debug Mode)")
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an exception that escaped an endpoint and answer with a JSON 500.

    Args:
        request: The request being served when the exception was raised.
            It is not inspected here.
        exc: The exception that reached the application-level handler.

    Returns:
        A ``JSONResponse`` with status 500 whose body has two keys:
        ``error`` (``str(exc)``) and ``traceback`` (the formatted traceback
        when ``DEBUG_MODE`` is true, otherwise a hint on how to enable it).
    """
    # Render the traceback from the exception object we were handed instead of
    # from the interpreter's "currently handled" exception: format_exc() yields
    # "NoneType: None" when called outside an active except block. Rendering
    # once also avoids formatting the same traceback twice.
    formatted_tb = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    logger.error(f"Unhandled exception: {str(exc)}\n{formatted_tb}")
    error_detail = {
        "error": str(exc),
        "traceback": formatted_tb if DEBUG_MODE else "Set DEBUG_MODE=True for traceback",
    }
    return JSONResponse(status_code=500, content=error_detail)
@app.get("/health")
async def health_check():
    """Liveness probe: always answers ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return payload
@app.get("/tools")
async def list_tools():
    """Endpoint to explicitly list all available tools without invoking the agent.

    Returns:
        A dict with ``status`` ("success"), ``count`` (number of tools),
        ``tools`` (one ``{"name", "description", "parameters"}`` entry per
        tool, where ``parameters`` is ``str(tool.args)``) and
        ``server_config`` (the ``MCP_SERVER_CONFIG`` in use).

    Raises:
        HTTPException: Status 500 when the MCP client cannot be opened or the
            tools cannot be read. ``detail`` carries ``error`` and
            ``traceback`` (``None`` unless ``DEBUG_MODE`` is true).
    """
    try:
        logger.info("Listing available tools")
        logger.debug("Initializing MultiServerMCPClient")
        async with MultiServerMCPClient(MCP_SERVER_CONFIG) as client:
            try:
                logger.debug("Getting tools from MCP Client")
                tools = client.get_tools()
                # Format tools in a more readable way
                tool_info = [
                    {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": str(tool.args),
                    }
                    for tool in tools
                ]
                logger.debug(f"Retrieved {len(tools)} tools")
                return {
                    "status": "success",
                    "count": len(tools),
                    "tools": tool_info,
                    "server_config": MCP_SERVER_CONFIG,
                }
            except Exception as e:
                formatted_tb = traceback.format_exc()
                logger.error(f"Error listing tools: {str(e)}")
                logger.error(formatted_tb)
                error_detail = {
                    "error": str(e),
                    "traceback": formatted_tb if DEBUG_MODE else None,
                }
                raise HTTPException(status_code=500, detail=error_detail)
    except HTTPException:
        # Raised by the inner handler above: it is already logged and already
        # carries the real error detail. Without this pass-through the generic
        # handler below would log it again as "unhandled" and replace the
        # detail with the stringified HTTPException.
        raise
    except Exception as e:
        # Reached for failures outside the inner try, e.g. while entering or
        # leaving the client context.
        formatted_tb = traceback.format_exc()
        logger.error(f"Unhandled exception in list_tools: {str(e)}")
        logger.error(formatted_tb)
        error_detail = {
            "error": str(e),
            "traceback": formatted_tb if DEBUG_MODE else None,
        }
        raise HTTPException(status_code=500, detail=error_detail)
class MessageRequest(BaseModel):
    """Request body accepted by the ``POST /message`` endpoint."""

    # The user's text; process_message forwards it to the agent as the
    # content of a "user" message.
    message: str
def _extract_response_content(agent_response):
    """Return the final reply carried by *agent_response*, whatever its shape."""
    if isinstance(agent_response, AIMessage):
        return agent_response.content

    # Locate the message list: either a dict key or an attribute.
    if isinstance(agent_response, dict) and "messages" in agent_response:
        messages = agent_response["messages"]
    elif hasattr(agent_response, "messages"):
        messages = agent_response.messages
    else:
        # Last resort fallback
        return str(agent_response)

    if not messages:
        return str(agent_response)

    # The reply is taken from the last message; it may be a plain dict or a
    # message object exposing ``.content``.
    last_message = messages[-1]
    if isinstance(last_message, dict) and "content" in last_message:
        return last_message["content"]
    if hasattr(last_message, "content"):
        return last_message.content
    return str(last_message)


def _build_success_payload(response_content):
    """Wrap *response_content* in the success envelope returned by /message."""
    return {
        "response": {
            "messages": [
                {
                    "content": response_content,
                    "type": "ai",
                    "tool_calls": [],
                    "tool_call_id": None,
                }
            ],
            "return_values": {
                "output": response_content,
            },
        },
        "status": "success",
    }


@app.post("/message")
async def process_message(request: MessageRequest):
    """Answer a user message with a ReAct agent that can call the MCP tools.

    If the message contains "what tools", "list tools" or "available tools"
    (case-insensitive), the tool list is returned directly and the agent is
    not invoked. Otherwise the agent is run on the message and the content of
    its last message is returned.

    Args:
        request: The parsed request body; only ``request.message`` is used.

    Returns:
        A dict ``{"response": {"messages": [...], "return_values":
        {"output": ...}}, "status": "success"}`` where the single message and
        ``output`` both carry the reply content.

    Raises:
        HTTPException: Status 500 on any failure. ``detail`` carries ``error``
            and ``traceback`` (``None`` unless ``DEBUG_MODE`` is true).
    """
    try:
        logger.info(f"Processing message: {request.message}")
        # Initialize model
        logger.debug("Initializing ChatOllama model")
        model = ChatOllama(model="llama3.1", temperature=0.3)
        # Alternative backend:
        #   model = ChatDeepSeek(model="deepseek-chat", temperature=0)
        logger.debug("Initializing MultiServerMCPClient")
        async with MultiServerMCPClient(MCP_SERVER_CONFIG) as client:
            try:
                logger.debug("Getting tools from MCP Client")
                tools = client.get_tools()
                logger.debug(f"Retrieved {len(tools)} tools")
                # Log tool names for debugging
                tool_names = [tool.name for tool in tools]
                logger.debug(f"Available tools: {tool_names}")

                # Check if the message is specifically asking for tools
                lowered_message = request.message.lower()
                tool_keywords = ("what tools", "list tools", "available tools")
                if any(keyword in lowered_message for keyword in tool_keywords):
                    tool_descriptions = [
                        f"- {tool.name}: {tool.description}" for tool in tools
                    ]
                    response_content = (
                        "I have the following tools available:\n"
                        + "\n".join(tool_descriptions)
                    )
                    return _build_success_payload(response_content)

                logger.debug("Creating react agent")
                agent = create_react_agent(model, tools)
                logger.debug("Invoking agent with message")
                # Format input as messages which LangGraph ReAct agent expects
                agent_response = await agent.ainvoke({
                    "messages": [{"role": "user", "content": request.message}]
                })
                logger.info("Agent response received successfully")
                logger.debug(f"Agent response type: {type(agent_response)}")
                logger.debug(f"Agent response details: {agent_response}")

                response_content = _extract_response_content(agent_response)
                logger.debug(f"Extracted response content: {response_content}")
                return _build_success_payload(response_content)
            except Exception as e:
                formatted_tb = traceback.format_exc()
                logger.error(f"Error during agent processing: {str(e)}")
                logger.error(formatted_tb)
                error_detail = {
                    "error": str(e),
                    "traceback": formatted_tb if DEBUG_MODE else None,
                }
                raise HTTPException(status_code=500, detail=error_detail)
    except HTTPException:
        # Raised by the inner handler above: it is already logged and already
        # carries the real error detail. Without this pass-through the generic
        # handler below would log it again as "unhandled" and replace the
        # detail with the stringified HTTPException.
        raise
    except Exception as e:
        # Reached for failures outside the inner try, e.g. model construction
        # or entering/leaving the client context.
        formatted_tb = traceback.format_exc()
        logger.error(f"Unhandled exception in process_message: {str(e)}")
        logger.error(formatted_tb)
        error_detail = {
            "error": str(e),
            "traceback": formatted_tb if DEBUG_MODE else None,
        }
        raise HTTPException(status_code=500, detail=error_detail)
if __name__ == "__main__":
    # Executed as a script (not imported): announce startup, then serve the
    # app with uvicorn on all interfaces, port 8000.
    logger.info("Starting FastAPI server in debug mode")
    uvicorn.run(app, port=8000, host="0.0.0.0")
# (GitHub page footer, not part of the program: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")