Skip to content

Instantly share code, notes, and snippets.

@cairijun
Created December 12, 2025 06:14
Show Gist options
  • Select an option

  • Save cairijun/160eeff5b8d3afb19ddb91fb5304b025 to your computer and use it in GitHub Desktop.

Select an option

Save cairijun/160eeff5b8d3afb19ddb91fb5304b025 to your computer and use it in GitHub Desktop.
Minimal MCP client for CodeCompanion
local log = require("codecompanion.utils.log")
local last_msg_id = 0
local function next_msg_id()
last_msg_id = last_msg_id + 1
return last_msg_id
end
local fire_nvim_event = function(event_name_suffix, data)
vim.api.nvim_exec_autocmds("User", {
pattern = "CodeCompanionMCP" .. event_name_suffix,
data = data,
})
end
local JsonRpc = {
ERROR_PARSE = -32700,
ERROR_INVALID_REQUEST = -32600,
ERROR_METHOD_NOT_FOUND = -32601,
ERROR_INVALID_PARAMS = -32602,
ERROR_INTERNAL = -32603,
}
local M = {}
---@class MCPRequest
---@field jsonrpc "2.0"
---@field id integer | string
---@field method string
---@field params table<string, any>?
---@class MCPResultResponse
---@field jsonrpc "2.0"
---@field id integer | string
---@field result table<string, any>?
---@class MCPErrorResponse
---@field jsonrpc "2.0"
---@field id integer | string
---@field error { code: integer, message: string, data: any? }
---ToolOverride allows per-tool customization
---@class ToolOverride
---@field opts table?
---@field enabled nil | boolean | fun(): boolean
---@field system_prompt string?
---@field output table<string, any>? override parts of the tool output handler (success, error, prompt, rejected)
---Configuration for starting and connecting to an MCP server
---@class ServerConfig
---@field cmd string[]
---@field env table<string, string>?
---@field name_prefix string? prefix to add to all tool names from this server to avoid conflicts
---@field server_instructions nil | string | fun(orig_server_instructions: string): string override the server-level instructions
---@field default_tool_opts table<string, any>? default options to apply to all tools from this server
---@field tool_overrides table<string, ToolOverride>? per-tool overrides by tool name
---@alias ServerRequestHandler fun(cli: Client, params: table<string, any>?): "result" | "error", table<string, any>
---@class Client
---@field name string
---@field cfg ServerConfig
---@field ready boolean
---@field resp_handlers table<integer, fun(resp: MCPResultResponse|MCPErrorResponse)>
---@field server_request_handlers table<string, ServerRequestHandler>
---@field proc vim.SystemObj?
---@field _last_line_tail string?
---@field server_capabilities table<string, any>?
---@field server_instructions string?
local Client = {}
Client.__index = Client
---@param name string
---@param cfg ServerConfig
function Client:new(name, cfg)
return setmetatable({
name = name,
cfg = cfg,
ready = false,
resp_handlers = {},
server_request_handlers = {
["ping"] = self._handle_server_ping,
["roots/list"] = self._handler_server_roots_list,
},
}, self)
end
function Client:start_if_not_started()
if self.proc then
return
end
log:info("Starting MCP Server [%s] with command: %s", self.name, table.concat(self.cfg.cmd, " "))
fire_nvim_event("ServerStart", { name = self.name })
local sys_obj = vim.system(
self.cfg.cmd,
{
env = self.cfg.env,
text = true,
stdin = true,
stdout = vim.schedule_wrap(function(err, data)
self:_on_proc_stdout(err, data)
end),
stderr = vim.schedule_wrap(function(err, data)
self:_on_proc_stderr(err, data)
end),
},
vim.schedule_wrap(function(out)
self:_on_proc_exit(out)
end)
)
self.proc = sys_obj
self:_start_init_mcp()
end
function Client:_start_init_mcp()
assert(self.proc, "MCP Server process is not running.")
assert(not self.ready, "MCP Server is already initialized.")
self:request("initialize", {
protocolVersion = "2025-11-25",
clientInfo = {
name = "CodeCompanion.nvim",
version = "0.0.0",
},
capabilities = {
roots = { listChanged = false },
},
}, function(resp)
if resp.error then
log:error("MCP Server [%s] initialization failed: %s", self.name, vim.inspect(resp))
return
end
log:info("MCP Server [%s] initialized successfully.", self.name)
log:info("MCP Server [%s] protocol version: %s", self.name, resp.result.protocolVersion)
log:info("MCP Server [%s] info: %s", self.name, vim.inspect(resp.result.serverInfo))
log:info("MCP Server [%s] capabilities: %s", self.name, vim.inspect(resp.result.capabilities))
self:notify("notifications/initialized")
self.server_capabilities = resp.result.capabilities
self.server_instructions = resp.result.instructions
self.ready = true
fire_nvim_event("ServerReady", { name = self.name })
self:refresh_tools()
end)
end
---@param out vim.SystemCompleted
function Client:_on_proc_exit(out)
self.ready = false
self.proc = nil
if out.code == 0 then
log:info("MCP Server [%s] exited normally.", self.name)
else
log:warn("MCP Server [%s] exited with code %d, signal %d", self.name, out.code, out.signal)
end
fire_nvim_event("ServerExit", { name = self.name, code = out.code, signal = out.signal })
end
function Client:_on_proc_stdout(err, data)
if err then
log:error("MCP Server [%s] stdout error: %s", self.name, err)
return
end
if not data or data == "" then
return
end
local combined_data = ""
if self._last_line_tail then
combined_data = self._last_line_tail .. data
self._last_line_tail = nil
else
combined_data = data
end
-- Check if there's a partial line at the end without a newline
local last_newline_pos = combined_data:match(".*()\n")
if last_newline_pos == nil then
-- No newline found, store the entire data as tail
self._last_line_tail = combined_data
return
elseif last_newline_pos < #combined_data then
-- Partial line found, store it as tail
self._last_line_tail = combined_data:sub(last_newline_pos + 1)
combined_data = combined_data:sub(1, last_newline_pos)
end
for line in vim.gsplit(combined_data, "\n", { plain = true, trimempty = true }) do
if line == "" then
goto continue
end
local ok, msg = pcall(vim.fn.json_decode, line)
if not ok then
log:error("MCP Server [%s] failed to decode received line [%s]: %s", self.name, msg, line)
goto continue
end
if type(msg) ~= "table" or msg.jsonrpc ~= "2.0" then
log:error("MCP Server [%s] received invalid MCP message: %s", self.name, line)
goto continue
end
if msg.id == nil then
log:info("MCP Server [%s] received notification: %s", self.name, line)
goto continue
end
if msg.method then
self:_handle_server_request(msg)
else
local handler = self.resp_handlers[msg.id]
if handler then
self.resp_handlers[msg.id] = nil
local handle_ok, handle_result = pcall(handler, msg)
if handle_ok then
log:debug("MCP Server [%s] response handler succeeded for request %s", self.name, msg.id)
else
log:error(
"MCP Server [%s] response handler failed for request %s: %s",
self.name,
msg.id,
handle_result
)
end
else
log:warn(
"MCP Server [%s] received response with unknown id %s: %s",
self.name,
msg.id,
line
)
end
end
::continue::
end
end
function Client:_handle_server_request(msg)
assert(self.proc, "MCP Server process is not running.")
local resp = {
jsonrpc = "2.0",
id = msg.id,
}
local handler = self.server_request_handlers[msg.method]
if not handler then
log:warn(
"MCP Server [%s] received request %s with unknown method %s",
self.name,
msg.id,
msg.method
)
resp.error = { code = JsonRpc.ERROR_METHOD_NOT_FOUND, message = "Method not found" }
else
local ok, status, body = pcall(handler, self, msg.params)
if not ok then
log:error(
"MCP Server [%s] handler for method %s failed for request %s: %s",
self.name,
msg.method,
msg.id,
status
)
resp.error = { code = JsonRpc.ERROR_INTERNAL, message = status }
elseif status == "error" then
log:error(
"MCP Server [%s] handler for method %s returned error for request %s: %s",
self.name,
msg.method,
msg.id,
vim.inspect(body)
)
resp.error = body
elseif status == "result" then
log:debug(
"MCP Server [%s] handler for method %s returned result for request %s",
self.name,
msg.method,
msg.id
)
resp.result = body
else
log:error(
"MCP Server [%s] handler for method %s returned invalid status %s for request %s",
self.name,
msg.method,
status,
msg.id
)
resp.error = { code = JsonRpc.ERROR_INTERNAL, message = "Internal server error" }
end
end
local resp_str = vim.fn.json_encode(resp)
self.proc:write({ resp_str })
end
function Client:_on_proc_stderr(err, data)
if err then
log:error("MCP Server [%s] stderr error: %s", self.name, err)
elseif data then
log:info("MCP Server [%s]: %s", self.name, data)
end
end
function Client:notify(method, params)
assert(self.proc, "MCP Server process is not running.")
local notif = {
jsonrpc = "2.0",
method = method,
params = params,
}
local notif_str = vim.fn.json_encode(notif)
log:debug("MCP Server [%s] sending notification: %s", self.name, notif_str)
self.proc:write({ notif_str })
end
---@param method string
---@param params table<string, any>?
---@param resp_handler fun(resp: MCPResultResponse|MCPErrorResponse)
function Client:request(method, params, resp_handler)
assert(self.proc, "MCP Server process is not running.")
local req_id = next_msg_id()
local req = {
jsonrpc = "2.0",
id = req_id,
method = method,
params = params,
}
if resp_handler then
self.resp_handlers[req_id] = resp_handler
end
local req_str = vim.fn.json_encode(req)
log:debug("MCP Server [%s] sending request %s: %s", self.name, req_id, req_str)
self.proc:write({ req_str })
end
function Client:_handle_server_ping(params)
return "result", {}
end
function Client:_handler_server_roots_list(params)
local roots = {
{ name = "Current Working Directory", uri = vim.uri_from_fname(vim.fn.getcwd()) },
}
local lsp_workspaces = {}
for _, client in pairs(vim.lsp.get_clients()) do
for _, folder in ipairs(client.workspace_folders or {}) do
table.insert(lsp_workspaces, folder.name)
end
end
vim.list.unique(lsp_workspaces)
for _, ws in ipairs(lsp_workspaces) do
table.insert(roots, { name = "LSP Workspace", uri = vim.uri_from_fname(ws) })
end
local git_root = vim.fs.root(".", ".git")
if git_root then
table.insert(roots, { name = "Git Repository Root", uri = vim.uri_from_fname(git_root) })
end
return "result", { roots = roots }
end
local DEFAULT_TOOL_OUTPUT_CALLBACK = {}
function DEFAULT_TOOL_OUTPUT_CALLBACK.success(self, tools, cmd, stdout)
local chat = tools.chat
local output = stdout and table.concat(stdout, "\n")
local args = vim.inspect(self.args)
local for_user = string.format(
"MCP Tool [%s] executed successfully.\nArguments:\n%s\nOutput:\n%s",
self.name,
args,
output
)
chat:add_tool_output(self, output, for_user)
end
function DEFAULT_TOOL_OUTPUT_CALLBACK.error(self, tools, cmd, stderr)
local chat = tools.chat
local err_msg = stderr and table.concat(stderr, "\n") or "<NO ERROR MESSAGE>"
local for_user = string.format(
"MCP Tool [%s] execution failed.\nArguments:\n%s\nError Message:\n%s",
self.name,
vim.inspect(self.args),
err_msg
)
chat:add_tool_output(self, "MCP Tool execution failed:\n" .. err_msg, for_user)
end
function DEFAULT_TOOL_OUTPUT_CALLBACK.prompt(self, tools)
return string.format(
"Please confirm to execute the MCP tool [%s] with arguments:\n%s",
self.name,
vim.inspect(self.args)
)
end
---@param client Client
---@param mcp_tool table
local function build_cc_tool_from_mcp_tool(client, mcp_tool)
-- If tool requires task execution, it's not supported.
if mcp_tool.execution and mcp_tool.execution.taskSupport == "required" then
log:error(
"MCP Tool [%s] requires task execution support, which is not supported",
mcp_tool.name
)
return nil
end
-- name prefix
local prefixed_name = (client.cfg.name_prefix or "") .. mcp_tool.name
-- Get per-tool override if any
local override = (client.cfg.tool_overrides and client.cfg.tool_overrides[mcp_tool.name]) or {}
local tool_opts = vim.tbl_extend("force", client.cfg.default_tool_opts or {}, override.opts or {})
local output_callback =
vim.tbl_extend("force", DEFAULT_TOOL_OUTPUT_CALLBACK, override.output or {})
-- Build tool object.
local tool = {
name = prefixed_name,
opts = tool_opts,
schema = {
type = "function",
["function"] = {
name = prefixed_name,
description = mcp_tool.description,
parameters = mcp_tool.inputSchema,
strict = true,
},
},
system_prompt = override.system_prompt,
cmds = {
function(self, args, input, output_handler)
client:request("tools/call", {
name = mcp_tool.name,
arguments = args,
}, function(resp)
local output
if resp.error then
log:error("MCP Tool [%s] call request failed: %s", prefixed_name, vim.inspect(resp))
output = { status = "error", data = resp.error.message }
elseif resp.result == nil then
log:warn("MCP Tool [%s] call request returned no result", prefixed_name)
output = { status = "success", data = "<NO OUTPUT>" }
else
local output_str
if #resp.result.content == 1 and resp.result.content[1].type == "text" then
output_str = resp.result.content[1].text
else
output_str = vim.inspect(resp.result.content)
end
if resp.result.isError then
log:error("MCP Tool [%s] call returned error: %s", prefixed_name, output_str)
output = { status = "error", data = output_str }
else
log:debug("MCP Tool [%s] call returned success: %s", prefixed_name, output_str)
output = { status = "success", data = output_str }
end
end
vim.schedule(function()
output_handler(output)
end)
end)
end,
},
output = output_callback,
}
local tool_cfg = {
description = mcp_tool.title or mcp_tool.name,
callback = tool,
enabled = override.enabled,
opts = { mcp = { server = client.name } },
}
return prefixed_name, tool_cfg
end
function Client:refresh_tools()
assert(self.ready, "MCP Server is not ready.")
if not self.server_capabilities.tools then
log:warn("MCP Server [%s] does not support tools", self.name)
return
end
self:request("tools/list", nil, function(resp)
if resp.error then
log:error("MCP Server [%s] tools/list request failed: %s", self.name, vim.inspect(resp))
return
end
log:info("MCP Server [%s] available tools: %s", self.name, vim.inspect(resp.result.tools))
local chat_tools = require("codecompanion.config").interactions.chat.tools
local configured_tools = {}
for _, tool in ipairs(resp.result.tools) do
local name, tool_cfg = build_cc_tool_from_mcp_tool(self, tool)
if name and tool_cfg then
local orig = chat_tools[name]
local server_of_orig = orig and vim.tbl_get(orig, "opts", "mcp", "server")
if orig and server_of_orig ~= self.name then
log:warn(
"MCP Tool [%s] from server [%s] conflicts with existing tool from server [%s], skipping",
name,
self.name,
server_of_orig or "<builtin>"
)
else
chat_tools[name] = tool_cfg
table.insert(configured_tools, name)
end
end
end
if #configured_tools == 0 then
log:info("MCP Server [%s] has no valid tools to configure", self.name)
return
end
-- Build server prompts
local server_prompts =
{ string.format("I'm giving you access to tools from MCP server '%s': ${tools}.", self.name) }
local cfg_server_instr = self.cfg.server_instructions
local final_server_instructions
if type(cfg_server_instr) == "function" then
final_server_instructions = cfg_server_instr(self.server_instructions)
elseif type(cfg_server_instr) == "string" then
final_server_instructions = self.server_instructions
else
final_server_instructions = self.server_instructions
end
if final_server_instructions and final_server_instructions ~= "" then
table.insert(server_prompts, "Detailed instructions of this MCP server:")
table.insert(server_prompts, final_server_instructions)
end
chat_tools.groups["mcp." .. self.name] = {
description = string.format("Tools from MCP Server '%s'", self.name),
tools = configured_tools,
prompt = table.concat(server_prompts, "\n"),
opts = { collapse_tools = true },
}
end)
end
---@type table<string, Client>
local clients = {}
---@param configs table<string, ServerConfig>
function M.setup(configs)
for name, cfg in pairs(configs) do
local client = Client:new(name, cfg)
clients[name] = client
end
local au_group = vim.api.nvim_create_augroup("CodeCompanionMCPClients", { clear = true })
vim.api.nvim_create_autocmd("User", {
pattern = "CodeCompanionChatCreated",
group = au_group,
callback = function()
for _, client in pairs(clients) do
client:start_if_not_started()
end
end,
})
end
return M
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment