Created
July 11, 2025 08:12
-
-
Save mapstack-chris/92d122cd517a5090032fe2a7c74587bb to your computer and use it in GitHub Desktop.
A lamdba transport for modelcontextprotocol/typescript-sdk
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { ResponseStream } from 'lambda-stream'; | |
| import { Logger } from '../../shared/utils/logger.js'; | |
| // Define the Transport interface based on MCP SDK requirements | |
| interface Transport { | |
| start(): Promise<void>; | |
| send(message: any): Promise<void>; | |
| close(): Promise<void>; | |
| onclose?: () => void; | |
| onerror?: (error: Error) => void; | |
| onmessage?: (message: any) => void; | |
| } | |
| // Extended interface for our Lambda transport | |
| interface LambdaTransport extends Transport { | |
| processPendingMessage(): Promise<void>; | |
| waitForProcessing(): Promise<void>; | |
| } | |
| /** | |
| * AWS Lambda transport implementation for MCP SDK. | |
| * Provides stateless JSON-RPC communication over Lambda function URLs. | |
| */ | |
| export class LambdaServerTransport implements LambdaTransport { | |
| private responseStream: ResponseStream; | |
| private event: any; | |
| private closed = false; | |
| private headersSent = false; | |
| private requestId: string; | |
| private pendingMessage: any = null; | |
| private messageProcessed: Promise<void>; | |
| private resolveMessageProcessed!: () => void; | |
| constructor(event: any, responseStream: ResponseStream) { | |
| this.event = event; | |
| this.responseStream = responseStream; | |
| this.requestId = event.requestContext?.requestId || 'unknown'; | |
| // Create a promise that will be resolved when the message is processed | |
| this.messageProcessed = new Promise((resolve) => { | |
| this.resolveMessageProcessed = resolve; | |
| }); | |
| } | |
| /** | |
| * Start processing the incoming request. | |
| * Parses the Lambda event body and triggers the onmessage callback. | |
| */ | |
| async start(): Promise<void> { | |
| try { | |
| Logger.info('LambdaServerTransport starting', { | |
| requestId: this.requestId, | |
| method: this.event.requestContext?.http?.method || this.event.httpMethod, | |
| path: this.event.rawPath || this.event.path | |
| }); | |
| // Validate HTTP method (Lambda Function URL uses requestContext.http.method) | |
| const httpMethod = this.event.requestContext?.http?.method || this.event.httpMethod; | |
| if (httpMethod !== 'POST') { | |
| throw new Error(`Unsupported HTTP method: ${httpMethod}`); | |
| } | |
| // Parse the incoming request body | |
| let body: any; | |
| try { | |
| body = JSON.parse(this.event.body || '{}'); | |
| } catch (parseError) { | |
| throw new Error('Invalid JSON in request body'); | |
| } | |
| // Validate JSON-RPC structure | |
| if (!body.jsonrpc || body.jsonrpc !== '2.0') { | |
| throw new Error('Invalid JSON-RPC version'); | |
| } | |
| Logger.info('LambdaServerTransport parsed request', { | |
| requestId: this.requestId, | |
| method: body.method, | |
| hasParams: !!body.params, | |
| id: body.id | |
| }); | |
| // Store the message for processing after handlers are registered | |
| this.pendingMessage = body; | |
| } catch (error) { | |
| Logger.error('LambdaServerTransport start error', { | |
| requestId: this.requestId, | |
| error: error instanceof Error ? error.message : 'Unknown error', | |
| stack: error instanceof Error ? error.stack : undefined | |
| }); | |
| if (this.onerror) { | |
| this.onerror(error instanceof Error ? error : new Error(String(error))); | |
| } | |
| // Send error response | |
| await this.sendError(error instanceof Error ? error.message : 'Unknown error'); | |
| this.resolveMessageProcessed(); | |
| } | |
| } | |
| /** | |
| * Process the pending message. | |
| * This should be called after handlers are registered. | |
| */ | |
| async processPendingMessage(): Promise<void> { | |
| if (!this.pendingMessage) { | |
| Logger.warn('No pending message to process', { requestId: this.requestId }); | |
| this.resolveMessageProcessed(); | |
| return; | |
| } | |
| try { | |
| // Trigger the onmessage callback with the parsed message | |
| if (this.onmessage) { | |
| Logger.info('LambdaServerTransport triggering onmessage', { | |
| requestId: this.requestId, | |
| hasOnmessage: !!this.onmessage, | |
| bodyMethod: this.pendingMessage.method, | |
| bodyId: this.pendingMessage.id | |
| }); | |
| this.onmessage(this.pendingMessage); | |
| } else { | |
| Logger.warn('LambdaServerTransport: no onmessage handler', { | |
| requestId: this.requestId | |
| }); | |
| // If no handler, send method not found error | |
| await this.send({ | |
| jsonrpc: '2.0', | |
| id: this.pendingMessage.id, | |
| error: { | |
| code: -32601, | |
| message: 'Method not found' | |
| } | |
| }); | |
| } | |
| } catch (error) { | |
| Logger.error('Error processing message', { | |
| requestId: this.requestId, | |
| error: error instanceof Error ? error.message : 'Unknown error' | |
| }); | |
| if (this.onerror) { | |
| this.onerror(error instanceof Error ? error : new Error(String(error))); | |
| } | |
| } finally { | |
| this.resolveMessageProcessed(); | |
| } | |
| } | |
| /** | |
| * Wait for message processing to complete. | |
| */ | |
| async waitForProcessing(): Promise<void> { | |
| await this.messageProcessed; | |
| } | |
| /** | |
| * Send a JSON-RPC response message. | |
| * Writes the message to the Lambda response stream. | |
| */ | |
| async send(message: any): Promise<void> { | |
| if (this.closed) { | |
| throw new Error('Transport is closed'); | |
| } | |
| try { | |
| // Initialize response headers if not already sent | |
| if (!this.headersSent) { | |
| const metadata = { | |
| statusCode: 200, | |
| headers: { | |
| 'Content-Type': 'application/json', | |
| 'Access-Control-Allow-Origin': '*', | |
| 'Access-Control-Allow-Methods': 'POST, OPTIONS', | |
| 'Access-Control-Allow-Headers': 'Content-Type', | |
| 'X-Request-ID': this.requestId | |
| } | |
| }; | |
| // Use lambda-stream's awslambda wrapper if available | |
| const awslambda = (globalThis as any).awslambda; | |
| if (awslambda?.HttpResponseStream) { | |
| this.responseStream = awslambda.HttpResponseStream.from( | |
| this.responseStream, | |
| metadata | |
| ); | |
| } else { | |
| // Fallback for local development | |
| this.responseStream.setContentType('application/json'); | |
| } | |
| this.headersSent = true; | |
| } | |
| // Write the JSON-RPC response | |
| const responseText = JSON.stringify(message); | |
| Logger.info('LambdaServerTransport sending response', { | |
| requestId: this.requestId, | |
| responseId: message.id, | |
| hasResult: !!message.result, | |
| hasError: !!message.error, | |
| responseLength: responseText.length | |
| }); | |
| this.responseStream.write(responseText); | |
| // End the stream after writing the response | |
| // This ensures we only send one response per request | |
| this.responseStream.end(); | |
| this.closed = true; | |
| } catch (error) { | |
| Logger.error('LambdaServerTransport send error', { | |
| requestId: this.requestId, | |
| error: error instanceof Error ? error.message : 'Unknown error' | |
| }); | |
| if (this.onerror) { | |
| this.onerror(error instanceof Error ? error : new Error(String(error))); | |
| } | |
| throw error; | |
| } | |
| } | |
| /** | |
| * Close the transport and end the response stream. | |
| */ | |
| async close(): Promise<void> { | |
| if (!this.closed) { | |
| this.closed = true; | |
| Logger.info('LambdaServerTransport closing', { | |
| requestId: this.requestId, | |
| headersSent: this.headersSent | |
| }); | |
| // Only end the response stream if we haven't sent any response | |
| if (!this.headersSent) { | |
| this.responseStream.end(); | |
| } | |
| // Trigger onclose callback if provided | |
| if (this.onclose) { | |
| this.onclose(); | |
| } | |
| } | |
| } | |
| /** | |
| * Send an error response directly. | |
| * Used when errors occur before the MCP server can handle them. | |
| */ | |
| private async sendError(errorMessage: string): Promise<void> { | |
| if (this.closed || this.headersSent) { | |
| return; | |
| } | |
| try { | |
| const metadata = { | |
| statusCode: 500, | |
| headers: { | |
| 'Content-Type': 'application/json', | |
| 'Access-Control-Allow-Origin': '*', | |
| 'X-Request-ID': this.requestId | |
| } | |
| }; | |
| const awslambda = (globalThis as any).awslambda; | |
| if (awslambda?.HttpResponseStream) { | |
| this.responseStream = awslambda.HttpResponseStream.from( | |
| this.responseStream, | |
| metadata | |
| ); | |
| } else { | |
| this.responseStream.setContentType('application/json'); | |
| } | |
| const errorResponse = { | |
| jsonrpc: '2.0', | |
| error: { | |
| code: -32603, | |
| message: 'Internal error', | |
| data: { | |
| message: errorMessage, | |
| requestId: this.requestId | |
| } | |
| }, | |
| id: null | |
| }; | |
| this.responseStream.write(JSON.stringify(errorResponse)); | |
| this.responseStream.end(); | |
| this.closed = true; | |
| this.headersSent = true; | |
| } catch (writeError) { | |
| Logger.error('LambdaServerTransport sendError failed', { | |
| requestId: this.requestId, | |
| error: writeError instanceof Error ? writeError.message : 'Unknown error' | |
| }); | |
| } | |
| } | |
| // Transport callbacks | |
| onclose?: () => void; | |
| onerror?: (error: Error) => void; | |
| onmessage?: (message: any) => void; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment