Created
December 12, 2025 06:14
-
-
Save cairijun/160eeff5b8d3afb19ddb91fb5304b025 to your computer and use it in GitHub Desktop.
Minimal MCP client for CodeCompanion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| local log = require("codecompanion.utils.log") | |
| local last_msg_id = 0 | |
| local function next_msg_id() | |
| last_msg_id = last_msg_id + 1 | |
| return last_msg_id | |
| end | |
| local fire_nvim_event = function(event_name_suffix, data) | |
| vim.api.nvim_exec_autocmds("User", { | |
| pattern = "CodeCompanionMCP" .. event_name_suffix, | |
| data = data, | |
| }) | |
| end | |
| local JsonRpc = { | |
| ERROR_PARSE = -32700, | |
| ERROR_INVALID_REQUEST = -32600, | |
| ERROR_METHOD_NOT_FOUND = -32601, | |
| ERROR_INVALID_PARAMS = -32602, | |
| ERROR_INTERNAL = -32603, | |
| } | |
| local M = {} | |
| ---@class MCPRequest | |
| ---@field jsonrpc "2.0" | |
| ---@field id integer | string | |
| ---@field method string | |
| ---@field params table<string, any>? | |
| ---@class MCPResultResponse | |
| ---@field jsonrpc "2.0" | |
| ---@field id integer | string | |
| ---@field result table<string, any>? | |
| ---@class MCPErrorResponse | |
| ---@field jsonrpc "2.0" | |
| ---@field id integer | string | |
| ---@field error { code: integer, message: string, data: any? } | |
| ---ToolOverride allows per-tool customization | |
| ---@class ToolOverride | |
| ---@field opts table? | |
| ---@field enabled nil | boolean | fun(): boolean | |
| ---@field system_prompt string? | |
| ---@field output table<string, any>? override parts of the tool output handler (success, error, prompt, rejected) | |
| ---Configuration for starting and connecting to an MCP server | |
| ---@class ServerConfig | |
| ---@field cmd string[] | |
| ---@field env table<string, string>? | |
| ---@field name_prefix string? prefix to add to all tool names from this server to avoid conflicts | |
| ---@field server_instructions nil | string | fun(orig_server_instructions: string): string override the server-level instructions | |
| ---@field default_tool_opts table<string, any>? default options to apply to all tools from this server | |
| ---@field tool_overrides table<string, ToolOverride>? per-tool overrides by tool name | |
| ---@alias ServerRequestHandler fun(cli: Client, params: table<string, any>?): "result" | "error", table<string, any> | |
| ---@class Client | |
| ---@field name string | |
| ---@field cfg ServerConfig | |
| ---@field ready boolean | |
| ---@field resp_handlers table<integer, fun(resp: MCPResultResponse|MCPErrorResponse)> | |
| ---@field server_request_handlers table<string, ServerRequestHandler> | |
| ---@field proc vim.SystemObj? | |
| ---@field _last_line_tail string? | |
| ---@field server_capabilities table<string, any>? | |
| ---@field server_instructions string? | |
| local Client = {} | |
| Client.__index = Client | |
| ---@param name string | |
| ---@param cfg ServerConfig | |
| function Client:new(name, cfg) | |
| return setmetatable({ | |
| name = name, | |
| cfg = cfg, | |
| ready = false, | |
| resp_handlers = {}, | |
| server_request_handlers = { | |
| ["ping"] = self._handle_server_ping, | |
| ["roots/list"] = self._handler_server_roots_list, | |
| }, | |
| }, self) | |
| end | |
| function Client:start_if_not_started() | |
| if self.proc then | |
| return | |
| end | |
| log:info("Starting MCP Server [%s] with command: %s", self.name, table.concat(self.cfg.cmd, " ")) | |
| fire_nvim_event("ServerStart", { name = self.name }) | |
| local sys_obj = vim.system( | |
| self.cfg.cmd, | |
| { | |
| env = self.cfg.env, | |
| text = true, | |
| stdin = true, | |
| stdout = vim.schedule_wrap(function(err, data) | |
| self:_on_proc_stdout(err, data) | |
| end), | |
| stderr = vim.schedule_wrap(function(err, data) | |
| self:_on_proc_stderr(err, data) | |
| end), | |
| }, | |
| vim.schedule_wrap(function(out) | |
| self:_on_proc_exit(out) | |
| end) | |
| ) | |
| self.proc = sys_obj | |
| self:_start_init_mcp() | |
| end | |
| function Client:_start_init_mcp() | |
| assert(self.proc, "MCP Server process is not running.") | |
| assert(not self.ready, "MCP Server is already initialized.") | |
| self:request("initialize", { | |
| protocolVersion = "2025-11-25", | |
| clientInfo = { | |
| name = "CodeCompanion.nvim", | |
| version = "0.0.0", | |
| }, | |
| capabilities = { | |
| roots = { listChanged = false }, | |
| }, | |
| }, function(resp) | |
| if resp.error then | |
| log:error("MCP Server [%s] initialization failed: %s", self.name, vim.inspect(resp)) | |
| return | |
| end | |
| log:info("MCP Server [%s] initialized successfully.", self.name) | |
| log:info("MCP Server [%s] protocol version: %s", self.name, resp.result.protocolVersion) | |
| log:info("MCP Server [%s] info: %s", self.name, vim.inspect(resp.result.serverInfo)) | |
| log:info("MCP Server [%s] capabilities: %s", self.name, vim.inspect(resp.result.capabilities)) | |
| self:notify("notifications/initialized") | |
| self.server_capabilities = resp.result.capabilities | |
| self.server_instructions = resp.result.instructions | |
| self.ready = true | |
| fire_nvim_event("ServerReady", { name = self.name }) | |
| self:refresh_tools() | |
| end) | |
| end | |
| ---@param out vim.SystemCompleted | |
| function Client:_on_proc_exit(out) | |
| self.ready = false | |
| self.proc = nil | |
| if out.code == 0 then | |
| log:info("MCP Server [%s] exited normally.", self.name) | |
| else | |
| log:warn("MCP Server [%s] exited with code %d, signal %d", self.name, out.code, out.signal) | |
| end | |
| fire_nvim_event("ServerExit", { name = self.name, code = out.code, signal = out.signal }) | |
| end | |
| function Client:_on_proc_stdout(err, data) | |
| if err then | |
| log:error("MCP Server [%s] stdout error: %s", self.name, err) | |
| return | |
| end | |
| if not data or data == "" then | |
| return | |
| end | |
| local combined_data = "" | |
| if self._last_line_tail then | |
| combined_data = self._last_line_tail .. data | |
| self._last_line_tail = nil | |
| else | |
| combined_data = data | |
| end | |
| -- Check if there's a partial line at the end without a newline | |
| local last_newline_pos = combined_data:match(".*()\n") | |
| if last_newline_pos == nil then | |
| -- No newline found, store the entire data as tail | |
| self._last_line_tail = combined_data | |
| return | |
| elseif last_newline_pos < #combined_data then | |
| -- Partial line found, store it as tail | |
| self._last_line_tail = combined_data:sub(last_newline_pos + 1) | |
| combined_data = combined_data:sub(1, last_newline_pos) | |
| end | |
| for line in vim.gsplit(combined_data, "\n", { plain = true, trimempty = true }) do | |
| if line == "" then | |
| goto continue | |
| end | |
| local ok, msg = pcall(vim.fn.json_decode, line) | |
| if not ok then | |
| log:error("MCP Server [%s] failed to decode received line [%s]: %s", self.name, msg, line) | |
| goto continue | |
| end | |
| if type(msg) ~= "table" or msg.jsonrpc ~= "2.0" then | |
| log:error("MCP Server [%s] received invalid MCP message: %s", self.name, line) | |
| goto continue | |
| end | |
| if msg.id == nil then | |
| log:info("MCP Server [%s] received notification: %s", self.name, line) | |
| goto continue | |
| end | |
| if msg.method then | |
| self:_handle_server_request(msg) | |
| else | |
| local handler = self.resp_handlers[msg.id] | |
| if handler then | |
| self.resp_handlers[msg.id] = nil | |
| local handle_ok, handle_result = pcall(handler, msg) | |
| if handle_ok then | |
| log:debug("MCP Server [%s] response handler succeeded for request %s", self.name, msg.id) | |
| else | |
| log:error( | |
| "MCP Server [%s] response handler failed for request %s: %s", | |
| self.name, | |
| msg.id, | |
| handle_result | |
| ) | |
| end | |
| else | |
| log:warn( | |
| "MCP Server [%s] received response with unknown id %s: %s", | |
| self.name, | |
| msg.id, | |
| line | |
| ) | |
| end | |
| end | |
| ::continue:: | |
| end | |
| end | |
| function Client:_handle_server_request(msg) | |
| assert(self.proc, "MCP Server process is not running.") | |
| local resp = { | |
| jsonrpc = "2.0", | |
| id = msg.id, | |
| } | |
| local handler = self.server_request_handlers[msg.method] | |
| if not handler then | |
| log:warn( | |
| "MCP Server [%s] received request %s with unknown method %s", | |
| self.name, | |
| msg.id, | |
| msg.method | |
| ) | |
| resp.error = { code = JsonRpc.ERROR_METHOD_NOT_FOUND, message = "Method not found" } | |
| else | |
| local ok, status, body = pcall(handler, self, msg.params) | |
| if not ok then | |
| log:error( | |
| "MCP Server [%s] handler for method %s failed for request %s: %s", | |
| self.name, | |
| msg.method, | |
| msg.id, | |
| status | |
| ) | |
| resp.error = { code = JsonRpc.ERROR_INTERNAL, message = status } | |
| elseif status == "error" then | |
| log:error( | |
| "MCP Server [%s] handler for method %s returned error for request %s: %s", | |
| self.name, | |
| msg.method, | |
| msg.id, | |
| vim.inspect(body) | |
| ) | |
| resp.error = body | |
| elseif status == "result" then | |
| log:debug( | |
| "MCP Server [%s] handler for method %s returned result for request %s", | |
| self.name, | |
| msg.method, | |
| msg.id | |
| ) | |
| resp.result = body | |
| else | |
| log:error( | |
| "MCP Server [%s] handler for method %s returned invalid status %s for request %s", | |
| self.name, | |
| msg.method, | |
| status, | |
| msg.id | |
| ) | |
| resp.error = { code = JsonRpc.ERROR_INTERNAL, message = "Internal server error" } | |
| end | |
| end | |
| local resp_str = vim.fn.json_encode(resp) | |
| self.proc:write({ resp_str }) | |
| end | |
| function Client:_on_proc_stderr(err, data) | |
| if err then | |
| log:error("MCP Server [%s] stderr error: %s", self.name, err) | |
| elseif data then | |
| log:info("MCP Server [%s]: %s", self.name, data) | |
| end | |
| end | |
| function Client:notify(method, params) | |
| assert(self.proc, "MCP Server process is not running.") | |
| local notif = { | |
| jsonrpc = "2.0", | |
| method = method, | |
| params = params, | |
| } | |
| local notif_str = vim.fn.json_encode(notif) | |
| log:debug("MCP Server [%s] sending notification: %s", self.name, notif_str) | |
| self.proc:write({ notif_str }) | |
| end | |
| ---@param method string | |
| ---@param params table<string, any>? | |
| ---@param resp_handler fun(resp: MCPResultResponse|MCPErrorResponse) | |
| function Client:request(method, params, resp_handler) | |
| assert(self.proc, "MCP Server process is not running.") | |
| local req_id = next_msg_id() | |
| local req = { | |
| jsonrpc = "2.0", | |
| id = req_id, | |
| method = method, | |
| params = params, | |
| } | |
| if resp_handler then | |
| self.resp_handlers[req_id] = resp_handler | |
| end | |
| local req_str = vim.fn.json_encode(req) | |
| log:debug("MCP Server [%s] sending request %s: %s", self.name, req_id, req_str) | |
| self.proc:write({ req_str }) | |
| end | |
| function Client:_handle_server_ping(params) | |
| return "result", {} | |
| end | |
| function Client:_handler_server_roots_list(params) | |
| local roots = { | |
| { name = "Current Working Directory", uri = vim.uri_from_fname(vim.fn.getcwd()) }, | |
| } | |
| local lsp_workspaces = {} | |
| for _, client in pairs(vim.lsp.get_clients()) do | |
| for _, folder in ipairs(client.workspace_folders or {}) do | |
| table.insert(lsp_workspaces, folder.name) | |
| end | |
| end | |
| vim.list.unique(lsp_workspaces) | |
| for _, ws in ipairs(lsp_workspaces) do | |
| table.insert(roots, { name = "LSP Workspace", uri = vim.uri_from_fname(ws) }) | |
| end | |
| local git_root = vim.fs.root(".", ".git") | |
| if git_root then | |
| table.insert(roots, { name = "Git Repository Root", uri = vim.uri_from_fname(git_root) }) | |
| end | |
| return "result", { roots = roots } | |
| end | |
| local DEFAULT_TOOL_OUTPUT_CALLBACK = {} | |
| function DEFAULT_TOOL_OUTPUT_CALLBACK.success(self, tools, cmd, stdout) | |
| local chat = tools.chat | |
| local output = stdout and table.concat(stdout, "\n") | |
| local args = vim.inspect(self.args) | |
| local for_user = string.format( | |
| "MCP Tool [%s] executed successfully.\nArguments:\n%s\nOutput:\n%s", | |
| self.name, | |
| args, | |
| output | |
| ) | |
| chat:add_tool_output(self, output, for_user) | |
| end | |
| function DEFAULT_TOOL_OUTPUT_CALLBACK.error(self, tools, cmd, stderr) | |
| local chat = tools.chat | |
| local err_msg = stderr and table.concat(stderr, "\n") or "<NO ERROR MESSAGE>" | |
| local for_user = string.format( | |
| "MCP Tool [%s] execution failed.\nArguments:\n%s\nError Message:\n%s", | |
| self.name, | |
| vim.inspect(self.args), | |
| err_msg | |
| ) | |
| chat:add_tool_output(self, "MCP Tool execution failed:\n" .. err_msg, for_user) | |
| end | |
| function DEFAULT_TOOL_OUTPUT_CALLBACK.prompt(self, tools) | |
| return string.format( | |
| "Please confirm to execute the MCP tool [%s] with arguments:\n%s", | |
| self.name, | |
| vim.inspect(self.args) | |
| ) | |
| end | |
| ---@param client Client | |
| ---@param mcp_tool table | |
| local function build_cc_tool_from_mcp_tool(client, mcp_tool) | |
| -- If tool requires task execution, it's not supported. | |
| if mcp_tool.execution and mcp_tool.execution.taskSupport == "required" then | |
| log:error( | |
| "MCP Tool [%s] requires task execution support, which is not supported", | |
| mcp_tool.name | |
| ) | |
| return nil | |
| end | |
| -- name prefix | |
| local prefixed_name = (client.cfg.name_prefix or "") .. mcp_tool.name | |
| -- Get per-tool override if any | |
| local override = (client.cfg.tool_overrides and client.cfg.tool_overrides[mcp_tool.name]) or {} | |
| local tool_opts = vim.tbl_extend("force", client.cfg.default_tool_opts or {}, override.opts or {}) | |
| local output_callback = | |
| vim.tbl_extend("force", DEFAULT_TOOL_OUTPUT_CALLBACK, override.output or {}) | |
| -- Build tool object. | |
| local tool = { | |
| name = prefixed_name, | |
| opts = tool_opts, | |
| schema = { | |
| type = "function", | |
| ["function"] = { | |
| name = prefixed_name, | |
| description = mcp_tool.description, | |
| parameters = mcp_tool.inputSchema, | |
| strict = true, | |
| }, | |
| }, | |
| system_prompt = override.system_prompt, | |
| cmds = { | |
| function(self, args, input, output_handler) | |
| client:request("tools/call", { | |
| name = mcp_tool.name, | |
| arguments = args, | |
| }, function(resp) | |
| local output | |
| if resp.error then | |
| log:error("MCP Tool [%s] call request failed: %s", prefixed_name, vim.inspect(resp)) | |
| output = { status = "error", data = resp.error.message } | |
| elseif resp.result == nil then | |
| log:warn("MCP Tool [%s] call request returned no result", prefixed_name) | |
| output = { status = "success", data = "<NO OUTPUT>" } | |
| else | |
| local output_str | |
| if #resp.result.content == 1 and resp.result.content[1].type == "text" then | |
| output_str = resp.result.content[1].text | |
| else | |
| output_str = vim.inspect(resp.result.content) | |
| end | |
| if resp.result.isError then | |
| log:error("MCP Tool [%s] call returned error: %s", prefixed_name, output_str) | |
| output = { status = "error", data = output_str } | |
| else | |
| log:debug("MCP Tool [%s] call returned success: %s", prefixed_name, output_str) | |
| output = { status = "success", data = output_str } | |
| end | |
| end | |
| vim.schedule(function() | |
| output_handler(output) | |
| end) | |
| end) | |
| end, | |
| }, | |
| output = output_callback, | |
| } | |
| local tool_cfg = { | |
| description = mcp_tool.title or mcp_tool.name, | |
| callback = tool, | |
| enabled = override.enabled, | |
| opts = { mcp = { server = client.name } }, | |
| } | |
| return prefixed_name, tool_cfg | |
| end | |
| function Client:refresh_tools() | |
| assert(self.ready, "MCP Server is not ready.") | |
| if not self.server_capabilities.tools then | |
| log:warn("MCP Server [%s] does not support tools", self.name) | |
| return | |
| end | |
| self:request("tools/list", nil, function(resp) | |
| if resp.error then | |
| log:error("MCP Server [%s] tools/list request failed: %s", self.name, vim.inspect(resp)) | |
| return | |
| end | |
| log:info("MCP Server [%s] available tools: %s", self.name, vim.inspect(resp.result.tools)) | |
| local chat_tools = require("codecompanion.config").interactions.chat.tools | |
| local configured_tools = {} | |
| for _, tool in ipairs(resp.result.tools) do | |
| local name, tool_cfg = build_cc_tool_from_mcp_tool(self, tool) | |
| if name and tool_cfg then | |
| local orig = chat_tools[name] | |
| local server_of_orig = orig and vim.tbl_get(orig, "opts", "mcp", "server") | |
| if orig and server_of_orig ~= self.name then | |
| log:warn( | |
| "MCP Tool [%s] from server [%s] conflicts with existing tool from server [%s], skipping", | |
| name, | |
| self.name, | |
| server_of_orig or "<builtin>" | |
| ) | |
| else | |
| chat_tools[name] = tool_cfg | |
| table.insert(configured_tools, name) | |
| end | |
| end | |
| end | |
| if #configured_tools == 0 then | |
| log:info("MCP Server [%s] has no valid tools to configure", self.name) | |
| return | |
| end | |
| -- Build server prompts | |
| local server_prompts = | |
| { string.format("I'm giving you access to tools from MCP server '%s': ${tools}.", self.name) } | |
| local cfg_server_instr = self.cfg.server_instructions | |
| local final_server_instructions | |
| if type(cfg_server_instr) == "function" then | |
| final_server_instructions = cfg_server_instr(self.server_instructions) | |
| elseif type(cfg_server_instr) == "string" then | |
| final_server_instructions = self.server_instructions | |
| else | |
| final_server_instructions = self.server_instructions | |
| end | |
| if final_server_instructions and final_server_instructions ~= "" then | |
| table.insert(server_prompts, "Detailed instructions of this MCP server:") | |
| table.insert(server_prompts, final_server_instructions) | |
| end | |
| chat_tools.groups["mcp." .. self.name] = { | |
| description = string.format("Tools from MCP Server '%s'", self.name), | |
| tools = configured_tools, | |
| prompt = table.concat(server_prompts, "\n"), | |
| opts = { collapse_tools = true }, | |
| } | |
| end) | |
| end | |
| ---@type table<string, Client> | |
| local clients = {} | |
| ---@param configs table<string, ServerConfig> | |
| function M.setup(configs) | |
| for name, cfg in pairs(configs) do | |
| local client = Client:new(name, cfg) | |
| clients[name] = client | |
| end | |
| local au_group = vim.api.nvim_create_augroup("CodeCompanionMCPClients", { clear = true }) | |
| vim.api.nvim_create_autocmd("User", { | |
| pattern = "CodeCompanionChatCreated", | |
| group = au_group, | |
| callback = function() | |
| for _, client in pairs(clients) do | |
| client:start_if_not_started() | |
| end | |
| end, | |
| }) | |
| end | |
| return M |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment