Skip to content

Instantly share code, notes, and snippets.

@xeioex
Created January 16, 2026 06:18
Show Gist options
  • Select an option

  • Save xeioex/d3ad5af9c0c8dca12f53aa4a9bad57ff to your computer and use it in GitHub Desktop.

Select an option

Save xeioex/d3ad5af9c0c8dca12f53aa4a9bad57ff to your computer and use it in GitHub Desktop.
# OpenTelemetry Collector: receive nginx spans over OTLP, derive RED metrics
# from them with the spanmetrics connector and expose those metrics to
# Prometheus.
receivers:
  otlp:
    protocols:
      grpc:
      http:

connectors:
  spanmetrics:
    # Span attributes (set by nginx's otel_span_attr) promoted to metric labels.
    dimensions:
      - name: mcp.name
      - name: mcp.status
    histogram:
      explicit:
        # Bucket bounds are durations; bare numbers are not interpreted as
        # seconds.
        buckets: [5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s]

exporters:
  prometheus:
    endpoint: 0.0.0.0:9464

service:
  pipelines:
    # spanmetrics acts as the exporter of the traces pipeline and the
    # receiver of the metrics pipeline.
    traces:
      receivers: [otlp]
      exporters: [spanmetrics]
    metrics:
      receivers: [spanmetrics]
      exporters: [prometheus]
# Development/benchmark setup: stay in the foreground, run without a
# master/worker split and log to the terminal.
load_module /home/xeioex/workspace/nginx/nginx/objs/ngx_otel_module.so;

error_log       /dev/stdout info;
daemon          off;
master_process  off;

events {
}

http {
    # NOTE(review): js_import/js_set need ngx_http_js_module, which is not
    # loaded above; this assumes it is compiled into the binary - confirm.
    js_import main from mcp.js;

    # Span attribute sources, computed by handlers in mcp.js:
    #   $mcp_tool   - params.name of a "tools/call" request, '' otherwise
    #   $mcp_status - 'error' or 'ok', from the first JSON-RPC message the
    #                 body filter captured from the upstream response
    js_set $mcp_tool   main.mcp_tool_name;
    js_set $mcp_status main.mcp_tool_status;

    # Spans go to the collector's OTLP receiver.
    otel_exporter {
        endpoint localhost:4317;
    }

    server {
        listen 9000;

        location / {
            otel_trace on;
            otel_span_attr "mcp.name"   $mcp_tool;
            otel_span_attr "mcp.status" $mcp_status;

            # The body filter only observes the response; the header filter
            # drops Content-Length before the body is streamed through it.
            js_header_filter main.clear_content_length;
            js_body_filter   main.filter;

            # Upstream: the mock MCP server.
            proxy_pass http://127.0.0.1:9001;
        }
    }
}
// Body-filter state.
// NOTE(review): relies on njs giving each request its own VM instance so
// these module-level values are not shared between requests - confirm for
// the njs engine in use.
//   _mcp_messages - JSON-RPC messages parsed from the response (filter()
//                   stores at most one; has_error() reads index 0)
//   _mcp_buffer   - response text accumulated so far
var _mcp_messages = [],
    _mcp_buffer = "";
// js_header_filter handler: removes the upstream Content-Length header from
// the response sent to the client; all other headers are left untouched.
function clear_content_length(r) {
    var headers = r.headersOut;
    delete headers["Content-Length"];
}
// Returns the payload of one Server-Sent Events event. The payload is the
// values of all "data:" lines joined with "\n"; a single space after the
// colon is optional. Returns null when the event has no data lines.
function sse_event_data(event) {
    var lines = event.split(/\r?\n/);
    var data = [];
    for (var i = 0; i < lines.length; i++) {
        var line = lines[i];
        if (line.startsWith("data:")) {
            var value = line.substring(5); // "data:".length
            data.push(value.startsWith(" ") ? value.substring(1) : value);
        }
    }
    return data.length > 0 ? data.join("\n") : null;
}

// js_body_filter handler. It forwards every chunk unchanged and captures the
// first JSON-RPC message of the upstream response into _mcp_messages, where
// has_error() inspects it.
//
// The accumulated text is split into SSE events on blank lines. The first
// event whose data parses as JSON is stored, then r.done() stops further
// filter calls. If the response ends without such an event, the whole body
// is tried as a plain JSON document (an application/json response).
function filter(r, data, flags) {
    _mcp_buffer += data;
    r.sendBuffer(data, flags);

    if (_mcp_messages.length !== 0) {
        return;
    }

    // Events are separated by a real blank line (LF or CRLF line endings).
    var events = _mcp_buffer.split(/\r?\n\r?\n/);
    for (var i = 0; i < events.length; i++) {
        var payload = sse_event_data(events[i]);
        if (payload === null) {
            continue;
        }
        try {
            _mcp_messages.push(JSON.parse(payload));
            r.done();
            return;
        } catch (e) {
            // Incomplete or non-JSON event: wait for more data.
        }
    }

    if (flags && flags.last) {
        try {
            _mcp_messages.push(JSON.parse(_mcp_buffer));
        } catch (e) {
            // Neither SSE nor a single JSON document: has_error() reports 'ok'.
        }
    }
}
// Follows a dot-separated path (e.g. "result.isError") through json_obj.
// Returns the value found there, or undefined when json_obj is falsy or any
// step is not an own property of an object. The r argument is unused and
// kept for call compatibility.
function getPath(r, json_obj, path) {
    if (!json_obj) {
        return undefined;
    }
    var parts = path.split('.');
    var current = json_obj;
    for (var i = 0; i < parts.length; i++) {
        var part = parts[i];
        // Call hasOwnProperty via the prototype: the parsed JSON comes from
        // upstream and may itself contain a "hasOwnProperty" key.
        if (typeof current !== 'object' || current === null
            || !Object.prototype.hasOwnProperty.call(current, part))
        {
            return undefined;
        }
        current = current[part];
    }
    return current;
}
// Reports whether the first JSON-RPC message captured by the body filter
// signals a failure. That is either a JSON-RPC "error" member or a tool
// result with "isError" set. Logs which case matched. Returns false when no
// message has been captured.
function has_error(r) {
    if (!_mcp_messages.length) {
        return false;
    }
    var msg = _mcp_messages[0];
    var checks = [
        ["error", "Error found in JSON-RPC2 response"],
        ["result.isError", "Error found in tool execution result"]
    ];
    for (var i = 0; i < checks.length; i++) {
        if (getPath(r, msg, checks[i][0])) {
            r.log(checks[i][1]);
            return true;
        }
    }
    return false;
}
// js_set handler for $mcp_tool. Returns params.name of a JSON-RPC
// "tools/call" request, and '' for any other method. Also returns '' when
// the body is missing, is not JSON, or has no params or string name. Such
// bodies come from GET/DELETE requests, or from a body nginx did not keep
// in memory, where r.requestText is undefined.
function mcp_tool_name(r) {
    var body;
    try {
        body = JSON.parse(r.requestText);
    } catch (e) {
        return '';
    }
    if (!body || body.method != 'tools/call' || !body.params) {
        return '';
    }
    var name = body.params.name;
    return typeof name === 'string' ? name : '';
}
// js_set handler for $mcp_status. Returns 'error' when has_error() flags
// the captured upstream message, otherwise 'ok'.
function mcp_tool_status(r) {
    return has_error(r) ? 'error' : 'ok';
}
// Builds a one-line summary of a tools/call request:
// "<tool> lat:<$request_time> error:<true|false>".
// Returns '' for other methods and for bodies that are missing, not JSON,
// or lack params. NOTE(review): not referenced by the nginx config shown;
// presumably intended for a js_set-backed log_format.
function track_mcp(r) {
    var body;
    try {
        body = JSON.parse(r.requestText);
    } catch (e) {
        return '';
    }
    if (!body || body.method != 'tools/call' || !body.params) {
        return '';
    }
    return `${body.params.name} lat:${r.variables.request_time} error:${has_error(r)}`;
}
// Handlers exported for nginx, which imports this module as `main`.
export default {
    filter,
    clear_content_length,
    has_error,
    track_mcp,
    mcp_tool_name,
    mcp_tool_status
};
package main
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"math/big"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
)
// Config holds the benchmark settings assembled from command-line flags.
type Config struct {
	// URL is the MCP endpoint every worker POSTs to.
	URL string
	// Workers is the number of concurrent request loops.
	Workers int
	// Duration bounds the run when MaxRequests is not positive.
	Duration time.Duration
	// MaxRequests caps requests per worker; 0 means "run for Duration".
	MaxRequests int
	// ToolCount is how many distinct tool names are generated.
	ToolCount int
	// HeaderName is not read anywhere in this client; workers always send
	// the session ID as "mcp-session-id".
	HeaderName string
}
// Stats aggregates counters shared by all workers. Every field is an atomic
// so workers update it without a mutex.
type Stats struct {
	Requests   atomic.Int64 // HTTP responses received (any status)
	Success    atomic.Int64 // responses that passed validation
	Errors     atomic.Int64 // transport failures plus failed validations
	BytesRx    atomic.Int64 // response body bytes read
	LatencySum atomic.Int64 // sum of per-request latency, in microseconds
}
// JSON-RPC 2.0 types used by the client. Field order determines the key
// order json.Marshal emits, so keep it stable.

// JSONRPCRequest is the body POSTed for each tools/call request.
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params Params `json:"params"`
ID int64 `json:"id"`
}
// Params carries the tool name and its arguments. Meta is serialized as
// "_meta" and omitted when nil (runWorker never sets it).
type Params struct {
Name string `json:"name"`
Arguments map[string]interface{} `json:"arguments"`
Meta map[string]interface{} `json:"_meta,omitempty"`
}
// JSONRPCResponse is the part of a JSON-RPC 2.0 response that the client
// validates. Result stays raw so a tool-level "isError" flag can be probed
// separately.
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"` // nil when "id" is absent or null (e.g. a parse-error response)
Result json.RawMessage `json:"result,omitempty"`
Error *JSONRPCError `json:"error,omitempty"`
}
// JSONRPCError is the JSON-RPC 2.0 error object.
type JSONRPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data json.RawMessage `json:"data,omitempty"`
}
// baseToolNames seeds the tool list with realistic names. generateTools
// fills any remaining slots with synthetic "tool_<i>" names.
var baseToolNames = []string{
	"get_forecast",
	"get_stock_price",
	"search_web",
	"translate_text",
	"calculate_sum",
	"resize_image",
	"send_email",
	"query_database",
}
// main parses flags, starts the workers and a once-per-second progress
// printer, waits for the run to finish and prints the totals. The run ends
// after -duration, or after every worker has sent -max-requests requests
// when that flag is positive.
func main() {
	// Parse flags.
	targetURL := flag.String("url", "http://127.0.0.1:9000/mcp", "Target MCP Endpoint URL")
	durationStr := flag.String("duration", "10s", "Benchmark duration (e.g., 10s, 1m)")
	workers := flag.Int("workers", 10, "Number of concurrent workers")
	maxRequests := flag.Int("max-requests", 0, "Max number of requests per worker (0 = unlimited)")
	toolCount := flag.Int("tools", 64, "Number of distinct tool names to generate")
	flag.Parse()

	duration, err := time.ParseDuration(*durationStr)
	if err != nil {
		log.Fatalf("Invalid duration: %v", err)
	}
	// Reject settings that would crash or measure nothing. With -tools 0 the
	// tool list is empty and randInt(0) panics inside every worker. Zero
	// workers, or a non-positive duration in timed mode, produce an empty run.
	if *workers <= 0 {
		log.Fatalf("Invalid workers: %d (must be > 0)", *workers)
	}
	if *toolCount <= 0 {
		log.Fatalf("Invalid tools: %d (must be > 0)", *toolCount)
	}
	if *maxRequests <= 0 && duration <= 0 {
		log.Fatalf("Invalid duration: %s (must be > 0 unless -max-requests is set)", duration)
	}

	config := &Config{
		URL:         *targetURL,
		Duration:    duration,
		Workers:     *workers,
		MaxRequests: *maxRequests,
		ToolCount:   *toolCount,
	}

	fmt.Printf("Starting MCP Mock Client\n")
	fmt.Printf("Target: %s\n", config.URL)
	if config.MaxRequests > 0 {
		fmt.Printf("Duration: Unlimited (capped by %d requests/worker)\n", config.MaxRequests)
	} else {
		fmt.Printf("Duration: %s\n", config.Duration)
	}
	fmt.Printf("Workers: %d\n", config.Workers)
	fmt.Printf("Tools Variety: %d\n", config.ToolCount)

	tools := generateTools(config.ToolCount)
	stats := &Stats{}

	// One shared client; the idle pool is sized so every worker can keep its
	// keep-alive connection.
	client := &http.Client{
		Transport: &http.Transport{
			MaxIdleConns:        config.Workers,
			MaxIdleConnsPerHost: config.Workers,
			IdleConnTimeout:     90 * time.Second,
		},
		Timeout: 30 * time.Second,
	}

	var ctx context.Context
	var cancel context.CancelFunc
	if config.MaxRequests > 0 {
		ctx, cancel = context.WithCancel(context.Background())
	} else {
		ctx, cancel = context.WithTimeout(context.Background(), config.Duration)
	}
	// Ensure cancel is called to clean up resources and stop the status printer.
	defer cancel()

	var wg sync.WaitGroup
	startTime := time.Now()

	for i := 0; i < config.Workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			runWorker(ctx, client, config, tools, stats, workerID)
		}(i)
	}

	// Status printer; exits when ctx is cancelled.
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				req := stats.Requests.Load()
				errs := stats.Errors.Load()
				elapsed := time.Since(startTime).Seconds()
				rps := float64(req) / elapsed
				fmt.Printf("Requests: %d | Errors: %d | RPS: %.2f\n", req, errs, rps)
			}
		}
	}()

	wg.Wait()
	// Stop the status printer if it's still running (especially in max-requests mode).
	cancel()
	printFinalStats(stats, time.Since(startTime))
}
// runWorker sends tools/call requests to cfg.URL in a loop. It stops when
// ctx is cancelled or, if cfg.MaxRequests > 0, after that many requests.
// Each request calls a random tool from tools. All requests from one worker
// share a single mcp-session-id, simulating one client per worker. Outcomes
// are recorded in stats.
//
// A request that fails only because ctx was cancelled (the timed run ended
// mid-flight) is not counted, so the end of a run adds no spurious errors.
// workerID is currently unused.
func runWorker(ctx context.Context, client *http.Client, cfg *Config, tools []string, stats *Stats, workerID int) {
	sessionID := generateUUID()
	var reqID int64 = 0

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		if cfg.MaxRequests > 0 && reqID >= int64(cfg.MaxRequests) {
			return
		}
		reqID++

		tool := tools[randInt(len(tools))]
		payload := JSONRPCRequest{
			JSONRPC: "2.0",
			Method:  "tools/call",
			Params: Params{
				Name:      tool,
				Arguments: generateRandomArgs(tool),
			},
			ID: reqID,
		}
		bodyBytes, err := json.Marshal(payload)
		if err != nil {
			log.Printf("Marshal error: %v", err)
			continue
		}

		req, err := http.NewRequestWithContext(ctx, "POST", cfg.URL, bytes.NewReader(bodyBytes))
		if err != nil {
			if ctx.Err() == nil {
				log.Printf("Request creation error: %v", err)
			}
			return
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Accept", "application/json, text/event-stream")
		req.Header.Set("mcp-session-id", sessionID)
		req.Header.Set("mcp-protocol-version", "2025-06-18")
		req.Header.Set("User-Agent", "mcp-mock-client/1.0")

		start := time.Now()
		resp, err := client.Do(req)
		if err != nil {
			// Aborted because the run ended: not a server-side failure.
			if ctx.Err() != nil {
				return
			}
			stats.Errors.Add(1)
			continue
		}

		readBytes, success := processResponse(resp, reqID)
		latency := time.Since(start).Microseconds()
		resp.Body.Close()

		// Likewise, a body read cut short by cancellation is not a failure.
		if !success && ctx.Err() != nil {
			return
		}

		stats.Requests.Add(1)
		stats.LatencySum.Add(latency)
		stats.BytesRx.Add(readBytes)
		if success {
			stats.Success.Add(1)
		} else {
			stats.Errors.Add(1)
		}
	}
}
// processResponse reads resp.Body to completion. It returns the number of
// bytes received and whether the body held a valid, non-error JSON-RPC
// response to expectedID (see validateJSONRPC). The caller closes the body.
//
// Non-200 responses count as failures; their body is drained so the
// Transport can reuse the keep-alive connection. A text/event-stream body
// is parsed as Server-Sent Events and must contain at least one data event.
// Any other body is treated as a single JSON document.
func processResponse(resp *http.Response, expectedID int64) (int64, bool) {
	if resp.StatusCode != http.StatusOK {
		_, _ = io.Copy(io.Discard, resp.Body) // best effort: only affects connection reuse
		return 0, false
	}

	contentType := resp.Header.Get("Content-Type")
	if !strings.Contains(contentType, "text/event-stream") {
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return 0, false
		}
		return int64(len(body)), validateJSONRPC(body, expectedID)
	}

	// An SSE event is a run of "field: value" lines ended by a blank line.
	// Multiple "data" lines are joined with "\n", and the single space after
	// the colon is optional.
	const maxLine = 4 << 20 // allow events well beyond bufio's 64 KiB default
	var (
		totalBytes int64
		success    = true
		sawData    bool // at least one event carried data
		pending    bool // the current event has data not yet validated
		data       []byte
	)
	dispatch := func() {
		if !pending {
			return
		}
		sawData = true
		if !validateJSONRPC(data, expectedID) {
			success = false
		}
		data = data[:0]
		pending = false
	}

	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(nil, maxLine)
	for scanner.Scan() {
		line := scanner.Bytes()
		totalBytes += int64(len(line) + 1) // +1 for the newline the scanner strips
		switch {
		case len(line) == 0:
			dispatch()
		case bytes.HasPrefix(line, []byte("data:")):
			value := bytes.TrimPrefix(line[len("data:"):], []byte(" "))
			if pending {
				data = append(data, '\n')
			}
			data = append(data, value...)
			pending = true
		}
		// Other fields (event:, id:, retry:) and ":" comments are ignored.
	}
	if scanner.Err() != nil {
		// Read error or over-long line: the response was not fully consumed.
		return totalBytes, false
	}
	dispatch() // the stream may end without a trailing blank line
	return totalBytes, success && sawData
}
// validateJSONRPC reports whether data is a successful JSON-RPC 2.0
// response to the request with expectedID. It returns false when:
//   - data does not decode into JSONRPCResponse;
//   - "jsonrpc" is not "2.0";
//   - an id is present but differs from expectedID (a missing id is accepted);
//   - an "error" member is present, or there is no "result";
//   - the result is an object whose "isError" is true (a tool-level failure).
func validateJSONRPC(data []byte, expectedID int64) bool {
	var msg JSONRPCResponse
	if json.Unmarshal(data, &msg) != nil {
		return false
	}

	switch {
	case msg.JSONRPC != "2.0":
		return false
	case msg.ID != nil && *msg.ID != expectedID:
		return false
	case msg.Error != nil:
		return false
	case len(msg.Result) == 0:
		return false
	}

	// A non-object result cannot carry isError and is accepted as-is.
	var fields map[string]interface{}
	if json.Unmarshal(msg.Result, &fields) == nil {
		if flagged, ok := fields["isError"].(bool); ok && flagged {
			return false
		}
	}
	return true
}
// generateTools returns count tool names. The first entries come from
// baseToolNames, and any remaining slot i is filled with "tool_<i>".
func generateTools(count int) []string {
	names := make([]string, count)
	filled := copy(names, baseToolNames)
	for i := filled; i < count; i++ {
		names[i] = fmt.Sprintf("tool_%d", i)
	}
	return names
}
// generateRandomArgs builds a plausible "arguments" object for toolName:
//   - tools containing "forecast": random latitude and longitude;
//   - tools containing "stock": a fixed symbol;
//   - tools containing "search": a fixed query;
//   - any other tool: two generic fields.
func generateRandomArgs(toolName string) map[string]interface{} {
	switch {
	case strings.Contains(toolName, "forecast"):
		lat := 30.0 + randFloat()*20.0
		lon := -120.0 + randFloat()*40.0
		return map[string]interface{}{"latitude": lat, "longitude": lon}
	case strings.Contains(toolName, "stock"):
		return map[string]interface{}{"symbol": "MSFT"}
	case strings.Contains(toolName, "search"):
		return map[string]interface{}{"query": "latest news"}
	default:
		return map[string]interface{}{"arg1": randInt(100), "arg2": "test_value"}
	}
}
// Helpers

// generateUUID returns a random (version 4) UUID in canonical
// 8-4-4-4-12 hex form, used as the per-worker mcp-session-id. If the
// system RNG fails, it returns the all-zero UUID in the same form.
func generateUUID() string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "00000000-0000-0000-0000-000000000000"
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4 (random)
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
}
// randInt returns a uniformly random int in [0, max) drawn from
// crypto/rand. Like crypto/rand.Int, it panics when max <= 0.
func randInt(max int) int {
	upper := big.NewInt(int64(max))
	r, _ := rand.Int(rand.Reader, upper)
	return int(r.Int64())
}
// randFloat returns a random float64 in [0, 1) with a resolution of 1e-6.
// crypto/rand has no float helper, so it scales a random integer instead.
func randFloat() float64 {
	const steps = 1000000
	r, _ := rand.Int(rand.Reader, big.NewInt(steps))
	return float64(r.Int64()) / steps
}
// printFinalStats prints the run summary: request, success and error
// counts, throughput over duration, mean latency in milliseconds and the
// total data received in MiB.
func printFinalStats(stats *Stats, duration time.Duration) {
	req := stats.Requests.Load()
	errs := stats.Errors.Load()
	rxBytes := stats.BytesRx.Load() // not "bytes": that would shadow the package
	latencySum := stats.LatencySum.Load()

	// Guard against a zero-length run producing Inf/NaN.
	var rps float64
	if secs := duration.Seconds(); secs > 0 {
		rps = float64(req) / secs
	}
	avgLatency := 0.0
	if req > 0 {
		avgLatency = float64(latencySum) / float64(req) / 1000.0 // µs -> ms
	}

	fmt.Println("=== Final Results ===")
	fmt.Printf("Duration: %v\n", duration)
	fmt.Printf("Total Requests: %d\n", req)
	fmt.Printf("Successful: %d\n", stats.Success.Load())
	fmt.Printf("Errors: %d\n", errs)
	fmt.Printf("Throughput: %.2f Req/s\n", rps)
	fmt.Printf("Avg Latency: %.2f ms\n", avgLatency)
	fmt.Printf("Total Data Received: %.2f MB\n", float64(rxBytes)/1024/1024)
}
package main
import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"math/big"
"net/http"
"time"
)
// Command-line flags controlling the mock server's behavior.
var (
	port = flag.Int("port", 9001, "Server port")

	// Outcome mix. errorRate and toolErrorRate are compared against the
	// same per-request roll in generateResponse, so together they should
	// not exceed 1.0. longRespRate is drawn separately and only affects
	// successful responses.
	errorRate     = flag.Float64("error-rate", 0.05, "Probability of returning a protocol error (0.0-1.0)")
	toolErrorRate = flag.Float64("tool-error-rate", 0.05, "Probability of returning a tool error (0.0-1.0)")
	longRespRate  = flag.Float64("long-rate", 0.10, "Probability of returning a large response (0.0-1.0)")

	// Latency range for tools without a dedicated profile (see simulateLatency).
	minLatency = flag.Duration("min-latency", 5*time.Millisecond, "Minimum processing latency")
	maxLatency = flag.Duration("max-latency", 50*time.Millisecond, "Maximum processing latency (for normal requests)")
)
// JSON-RPC Types (Simplified for Server)
// Field order determines the key order json.Marshal emits; keep it stable.

// JSONRPCRequest is the decoded request body.
// NOTE: decoding ID into interface{} turns numeric ids into float64, so ids
// above 2^53 would not be echoed back exactly.
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params Params `json:"params"`
ID interface{} `json:"id"` // can be string or number
}
// Params holds the tools/call parameters the server reads (tool name and arguments).
type Params struct {
Name string `json:"name"`
Arguments map[string]interface{} `json:"arguments"`
}
// JSONRPCResponse is the outgoing message. ID has no omitempty, so a nil ID
// is emitted as "id": null (used for parse errors). Result and Error are
// omitted when unset.
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID interface{} `json:"id"`
Result interface{} `json:"result,omitempty"`
Error *JSONRPCError `json:"error,omitempty"`
}
// JSONRPCError is the JSON-RPC 2.0 error object.
type JSONRPCError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// ToolResult is a tools/call result; IsError marks a tool-level failure
// inside an otherwise successful JSON-RPC response.
type ToolResult struct {
Content []ContentItem `json:"content"`
IsError bool `json:"isError"`
}
// ContentItem is one entry of ToolResult.Content.
type ContentItem struct {
Type string `json:"type"`
Text string `json:"text"`
}
// main parses flags, prints the configured fault profile and serves the
// mock MCP endpoint at /mcp until the listener fails.
func main() {
	flag.Parse()

	mux := http.NewServeMux()
	mux.HandleFunc("/mcp", handleMCP)

	addr := fmt.Sprintf(":%d", *port)
	fmt.Printf("Starting MCP Mock Server on %s\n", addr)
	fmt.Printf(" Protocol Error Rate: %.2f\n", *errorRate)
	fmt.Printf(" Tool Error Rate: %.2f\n", *toolErrorRate)
	fmt.Printf(" Long Response Rate: %.2f\n", *longRespRate)

	srv := &http.Server{
		Addr:    addr,
		Handler: mux,
		// Bound how long a client may take to send headers (slowloris) and
		// how long idle keep-alive connections linger. There is deliberately
		// no WriteTimeout: it would have to exceed the simulated latency
		// (up to 2s for query_database, or more via -max-latency).
		ReadHeaderTimeout: 10 * time.Second,
		IdleTimeout:       120 * time.Second,
	}
	if err := srv.ListenAndServe(); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}
// handleMCP serves POST /mcp. It decodes one JSON-RPC request, sleeps for a
// simulated, tool-dependent latency, then answers with a single SSE event
// ("data: <json>\n\n") carrying a success, tool-error or protocol-error
// response (see generateResponse). Undecodable bodies get a -32700 error
// via writeJSONError.
func handleMCP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// 1. Parse the request. The server closes r.Body after the handler returns.
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Failed to read body", http.StatusBadRequest)
		return
	}
	var req JSONRPCRequest
	if err := json.Unmarshal(body, &req); err != nil {
		writeJSONError(w, nil, -32700, "Parse error")
		return
	}

	// 2. Simulate tool execution time.
	simulateLatency(req.Params.Name)

	// 3. Build and encode the reply before committing any headers, so an
	// encoding failure can still be reported as a 500 instead of an empty 200.
	resp := generateResponse(req)
	respBytes, err := json.Marshal(resp)
	if err != nil {
		log.Printf("Failed to marshal response: %v", err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}

	// 4. SSE response headers.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("mcp-protocol-version", "2025-06-18")

	// 5. One SSE event carrying the JSON-RPC message, flushed immediately.
	if _, err := fmt.Fprintf(w, "data: %s\n\n", respBytes); err != nil {
		return // client went away; nothing left to flush
	}
	if f, ok := w.(http.Flusher); ok {
		f.Flush()
	}
}
// simulateLatency blocks the calling goroutine to mimic tool execution time:
//   - query_database: 1-2s;
//   - resize_image: 300-600ms;
//   - every other tool: between -min-latency and -max-latency at millisecond
//     granularity.
// Upper bounds are exclusive. Exactly -min-latency is used when the range is
// under one millisecond or inverted.
func simulateLatency(toolName string) {
	var pause time.Duration
	switch toolName {
	case "query_database":
		pause = time.Duration(1000+randInt(1000)) * time.Millisecond
	case "resize_image":
		pause = time.Duration(300+randInt(300)) * time.Millisecond
	default:
		pause = *minLatency
		if spanMs := int64(*maxLatency-*minLatency) / 1e6; spanMs > 0 {
			pause += time.Duration(randInt(int(spanMs))) * time.Millisecond
		}
	}
	time.Sleep(pause)
}
// generateResponse builds the reply to req, echoing req.ID. One uniform roll
// picks the outcome:
//   - roll < errorRate: a JSON-RPC -32603 error;
//   - roll < errorRate+toolErrorRate: a result with isError set;
//   - otherwise: a success result echoing the tool name.
// A success result carries a large random blob with probability
// longRespRate (5000-14999 raw random bytes, see generateRandomString),
// otherwise a short message with a Unix timestamp.
func generateResponse(req JSONRPCRequest) JSONRPCResponse {
	reply := JSONRPCResponse{JSONRPC: "2.0", ID: req.ID}
	roll := randFloat()

	switch {
	case roll < *errorRate:
		reply.Error = &JSONRPCError{
			Code:    -32603,
			Message: "Internal error (simulated)",
		}
		return reply
	case roll < *errorRate+*toolErrorRate:
		// A successful JSON-RPC exchange whose result reports a tool failure.
		msg := fmt.Sprintf("Error executing tool %s: unable to fetch data.", req.Params.Name)
		reply.Result = ToolResult{
			Content: []ContentItem{{Type: "text", Text: msg}},
			IsError: true,
		}
		return reply
	}

	payload := map[string]interface{}{
		"tool":   req.Params.Name,
		"status": "success",
	}
	if randFloat() < *longRespRate {
		blobSize := 5000 + randInt(10000)
		payload["data"] = generateRandomString(blobSize)
		payload["type"] = "large_blob"
	} else {
		payload["message"] = "Operation completed successfully"
		payload["timestamp"] = time.Now().Unix()
	}
	reply.Result = payload
	return reply
}
// Helpers

// writeJSONError sends a JSON-RPC 2.0 error object as a single SSE event
// ("data: <json>\n\n") with HTTP 200. This matches the framing handleMCP uses
// for normal replies, so clients parse both the same way. id is echoed as
// the response id; nil is emitted as null, as for parse errors.
func writeJSONError(w http.ResponseWriter, id interface{}, code int, msg string) {
	resp := JSONRPCResponse{
		JSONRPC: "2.0",
		ID:      id,
		Error: &JSONRPCError{
			Code:    code,
			Message: msg,
		},
	}
	payload, err := json.Marshal(resp)
	if err != nil {
		log.Printf("Failed to marshal error response: %v", err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}
	// Headers must be final before WriteHeader; changes made after it are
	// ignored, which would leave this SSE body labelled with a wrong type.
	w.Header().Set("Content-Type", "text/event-stream")
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, "data: %s\n\n", payload)
}
// randInt returns a cryptographically random int in [0, max).
// It panics for max <= 0, as crypto/rand.Int does.
func randInt(max int) int {
	v, _ := rand.Int(rand.Reader, big.NewInt(int64(max)))
	n := v.Int64()
	return int(n)
}
// randFloat returns a random float64 in [0, 1) in steps of 1e-6.
func randFloat() float64 {
	const resolution = 1000000
	v, _ := rand.Int(rand.Reader, big.NewInt(resolution))
	return float64(v.Int64()) / resolution
}
// generateRandomString returns exactly n random characters from the
// standard base64 alphabet, with no padding. It returns "" for n <= 0.
// It is used as filler for large response payloads.
func generateRandomString(n int) string {
	if n <= 0 {
		return ""
	}
	// Every 3 random bytes encode to 4 characters. ceil(3n/4) bytes therefore
	// yield at least n data characters, so the [:n] cut never includes '='.
	raw := make([]byte, (3*n+3)/4)
	// crypto/rand.Read is documented never to return an error (Go 1.24+). On
	// older toolchains a failure would only make the filler less random.
	_, _ = rand.Read(raw)
	return base64.StdEncoding.EncodeToString(raw)[:n]
}
# Prometheus: scrape the collector's Prometheus exporter, where the
# spanmetrics-derived metrics are exposed.
scrape_configs:
  - job_name: 'otel-collector'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9464']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment