|
#!/usr/bin/env node |
|
import http from "node:http"; |
|
import { Readable } from "node:stream"; |
|
|
|
// --- Configuration -------------------------------------------------------
// Every knob is a MERCURY_PROXY_* environment variable. Numeric knobs fall
// back to their default when the variable is unset, empty, or not a finite
// number (previously a value like "abc" produced NaN, which silently broke
// the scheduler and server.listen()).

/**
 * Read a numeric environment variable with a safe fallback.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Used when the variable is unset, empty, or
 *   does not parse to a finite number.
 * @returns {number}
 */
function numberFromEnv(name, fallback) {
  const raw = process.env[name];
  if (raw == null || raw === "") return fallback;
  const value = Number(raw);
  return Number.isFinite(value) ? value : fallback;
}

// Local port the proxy listens on.
const port = numberFromEnv("MERCURY_PROXY_PORT", 18795);
// Origin all non-/healthz requests are forwarded to.
const upstreamOrigin = process.env.MERCURY_PROXY_UPSTREAM || "https://api.inceptionlabs.ai";
// Bounds for the adaptive inter-request gap.
const minGapMs = numberFromEnv("MERCURY_PROXY_MIN_GAP_MS", 32000);
const maxGapMs = numberFromEnv("MERCURY_PROXY_MAX_GAP_MS", 90000);
// Per-attempt upstream timeout enforced via AbortController.
const requestTimeoutMs = numberFromEnv("MERCURY_PROXY_REQUEST_TIMEOUT_MS", 120000);
// Total attempts per job (first try + retries).
const maxAttempts = numberFromEnv("MERCURY_PROXY_MAX_ATTEMPTS", 3);
// Backoff growth: gap becomes max(gap * multiplier, gap + step, Retry-After).
const backoffMultiplier = numberFromEnv("MERCURY_PROXY_BACKOFF_MULTIPLIER", 1.5);
const backoffStepMs = numberFromEnv("MERCURY_PROXY_BACKOFF_STEP_MS", 5000);
// Recovery: shrink the gap by decayStepMs after decayEverySuccesses healthy results.
const decayStepMs = numberFromEnv("MERCURY_PROXY_DECAY_STEP_MS", 2000);
const decayEverySuccesses = numberFromEnv("MERCURY_PROXY_DECAY_EVERY_SUCCESSES", 3);
// Upstream status codes that trigger retry + backoff.
const retryableStatusCodes = new Set([408, 425, 429, 500, 502, 503, 504, 529]);
|
|
|
// --- Mutable scheduler state ---------------------------------------------

// FIFO of pending proxy jobs; drained strictly one at a time by drainQueue().
const queue = [];
// True while a drain loop is in flight; prevents concurrent drains.
let processing = false;
// Earliest timestamp (ms since epoch) at which the next upstream call may start.
let nextReadyAt = 0;
// Current adaptive gap between upstream calls; kept within [minGapMs, maxGapMs]
// by increaseGap()/decayGapOnHealthyResult().
let currentGapMs = minGapMs;
// Consecutive healthy upstream results since the last backoff or decay.
let healthyStreak = 0;
// Process-lifetime counters, exposed verbatim via GET /healthz.
const stats = {
  totalRequests: 0,
  totalRetries: 0,
  rateLimitResponses: 0,
  timeoutErrors: 0,
  networkErrors: 0,
  lastBackoffAt: null,
  lastBackoffReason: null,
};
|
|
|
/**
 * Promise-based delay.
 *
 * @param {number} ms - Milliseconds to wait.
 * @returns {Promise<void>} Resolves after roughly `ms` milliseconds.
 */
function sleep(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
|
|
|
/**
 * Current wall-clock time for log lines and stats.
 *
 * @returns {string} ISO-8601 UTC timestamp, e.g. "2024-01-01T00:00:00.000Z".
 */
function nowIso() {
  const now = new Date();
  return now.toISOString();
}
|
|
|
// Request headers that describe the client<->proxy hop and must not be
// forwarded upstream (RFC 9110 §7.6.1 hop-by-hop semantics). "host" and
// "content-length" are recomputed by fetch() for the upstream request.
// Previously only host/connection/content-length were stripped, so
// transfer-encoding, keep-alive, upgrade, te, trailer and proxy-connection
// leaked into the upstream call.
const hopByHopRequestHeaders = new Set([
  "host",
  "connection",
  "content-length",
  "transfer-encoding",
  "keep-alive",
  "upgrade",
  "proxy-connection",
  "te",
  "trailer",
]);

/**
 * Copy incoming request headers into a plain object suitable for fetch().
 *
 * Keys are lowercased, hop-by-hop headers are dropped, nullish values are
 * skipped, and multi-valued headers are joined with commas.
 *
 * @param {import("node:http").IncomingHttpHeaders} headers
 * @returns {Record<string, string>}
 */
function sanitizeHeaders(headers) {
  const out = {};
  for (const [k, v] of Object.entries(headers)) {
    if (v == null) continue;
    const key = k.toLowerCase();
    if (hopByHopRequestHeaders.has(key)) continue;
    out[key] = Array.isArray(v) ? v.join(",") : String(v);
  }
  return out;
}
|
|
|
/**
 * Copy upstream fetch() response headers onto the Node server response.
 *
 * Hop-by-hop headers are skipped (Node manages the client connection) and
 * "content-length" is dropped because the body is re-streamed. Set-Cookie
 * needs special handling: iterating a Headers object yields each Set-Cookie
 * entry separately, and repeated res.setHeader("set-cookie", v) calls keep
 * only the last cookie — so all cookies are collected via getSetCookie()
 * and written once as an array.
 *
 * @param {import("node:http").ServerResponse} res - Response to the client.
 * @param {Headers} headers - Upstream fetch() response headers.
 */
function writeUpstreamHeaders(res, headers) {
  const skipped = new Set([
    "connection",
    "transfer-encoding",
    "keep-alive",
    "content-length",
    "set-cookie",
  ]);
  for (const [k, v] of headers.entries()) {
    if (skipped.has(k.toLowerCase())) continue;
    res.setHeader(k, v);
  }
  const cookies = headers.getSetCookie();
  if (cookies.length > 0) {
    // Node accepts an array here and emits one Set-Cookie line per element.
    res.setHeader("set-cookie", cookies);
  }
}
|
|
|
/**
 * Clamp a proposed gap to the configured window.
 *
 * @param {number} value - Candidate gap in milliseconds.
 * @returns {number} `value` rounded to an integer and bounded by
 *   [minGapMs, maxGapMs] (the max bound wins if the bounds ever cross).
 */
function clampGapMs(value) {
  const rounded = Math.round(value);
  const floored = rounded < minGapMs ? minGapMs : rounded;
  return floored > maxGapMs ? maxGapMs : floored;
}
|
|
|
/**
 * Convert a Retry-After header into a delay in milliseconds.
 *
 * Supports both forms from RFC 9110 §10.2.3: delay-seconds ("120") and an
 * HTTP-date. Past dates yield 0 (retry immediately).
 *
 * @param {string | null} retryAfterHeader - Raw header value, or null.
 * @returns {number | null} Delay in ms (>= 0), or null when the header is
 *   absent or unparseable.
 */
function parseRetryAfterMs(retryAfterHeader) {
  if (!retryAfterHeader) return null;
  const seconds = Number(retryAfterHeader);
  // ">= 0" so "Retry-After: 0" (a valid "retry immediately") is honored
  // directly instead of falling through to engine-dependent Date parsing
  // of the string "0".
  if (Number.isFinite(seconds) && seconds >= 0) {
    return Math.round(seconds * 1000);
  }
  const asDate = Date.parse(retryAfterHeader);
  if (Number.isFinite(asDate)) {
    return Math.max(0, asDate - Date.now());
  }
  return null;
}
|
|
|
/**
 * Widen the adaptive gap after an unhealthy upstream result.
 *
 * The new gap is the largest of: the current gap scaled by the backoff
 * multiplier, the current gap plus the fixed backoff step, and any delay
 * demanded by the upstream (e.g. Retry-After), then clamped to the
 * configured window. Any call resets the healthy streak; the backoff is
 * only recorded and logged when the gap actually changes.
 *
 * @param {string} reason - Label recorded in stats and log output.
 * @param {number} [extraDelayMs=0] - Upstream-provided minimum delay in ms.
 */
function increaseGap(reason, extraDelayMs = 0) {
  healthyStreak = 0;
  const upstreamFloor = extraDelayMs > 0 ? extraDelayMs : 0;
  const candidate = Math.max(
    currentGapMs * backoffMultiplier,
    currentGapMs + backoffStepMs,
    upstreamFloor,
  );
  const nextGapMs = clampGapMs(candidate);
  if (nextGapMs === currentGapMs) return;
  currentGapMs = nextGapMs;
  stats.lastBackoffAt = nowIso();
  stats.lastBackoffReason = reason;
  console.warn(
    `[${stats.lastBackoffAt}] adaptive backoff: reason=${reason} newGapMs=${currentGapMs}`,
  );
}
|
|
|
/**
 * Narrow the adaptive gap after enough consecutive healthy results.
 *
 * Counts healthy outcomes; once `decayEverySuccesses` in a row are seen,
 * resets the streak and shrinks the gap by one decay step (clamped to the
 * configured window). Logs only when the gap actually changes.
 */
function decayGapOnHealthyResult() {
  healthyStreak += 1;
  if (healthyStreak < decayEverySuccesses) {
    return;
  }
  healthyStreak = 0;
  const shrunk = clampGapMs(currentGapMs - decayStepMs);
  if (shrunk === currentGapMs) {
    return;
  }
  currentGapMs = shrunk;
  console.log(`[${nowIso()}] adaptive decay: newGapMs=${currentGapMs}`);
}
|
|
|
// Append a job to the FIFO queue and kick the drain loop. drainQueue()
// returns immediately when a drain is already in flight, so calling this
// on every request is safe; its promise is deliberately discarded with
// `void`.
function enqueue(job) {
  queue.push(job);
  void drainQueue();
}
|
|
|
/**
 * Serially process queued jobs, spacing upstream requests by the adaptive gap.
 *
 * Only one drain loop runs at a time (guarded by `processing`); concurrent
 * calls return immediately. Each job waits until `nextReadyAt` before
 * running, and completion schedules the next slot using the gap in effect
 * at that moment — which runJob may have widened during this very job.
 *
 * @returns {Promise<void>}
 */
async function drainQueue() {
  if (processing) return;
  processing = true;
  try {
    while (queue.length > 0) {
      const job = queue.shift();
      if (!job) continue;
      const waitMs = Math.max(0, nextReadyAt - Date.now());
      if (waitMs > 0) {
        await sleep(waitMs);
      }
      try {
        await runJob(job);
      } catch (error) {
        // An unexpected throw from runJob must not kill the drain loop:
        // callers discard this promise (void drainQueue()), so the
        // rejection would be unhandled, and every job still queued behind
        // the failing one would be stranded until the next enqueue.
        const msg = error instanceof Error ? error.message : String(error);
        console.error(`[${nowIso()}] drain loop: job failed unexpectedly: ${msg}`);
      }
      nextReadyAt = Date.now() + currentGapMs;
    }
  } finally {
    processing = false;
  }
}
|
|
|
// True when an upstream HTTP status warrants backoff and (attempts
// permitting) a retry — i.e. it is in the configured retryableStatusCodes set.
function isRetryableStatus(status) {
  return retryableStatusCodes.has(status);
}
|
|
|
/**
 * Drain and discard the body of an upstream response that will not be
 * forwarded, so the underlying connection can be released.
 *
 * @param {Response} response - Upstream fetch() response to dispose of.
 * @returns {Promise<void>} Always resolves; consumption errors are moot
 *   because the payload is being thrown away regardless.
 */
async function discardBody(response) {
  // Best-effort cleanup: swallow any failure while reading the body.
  await response.arrayBuffer().catch(() => {});
}
|
|
|
/**
 * Forward one queued job to the upstream origin, with bounded retries.
 *
 * Makes up to maxAttempts attempts. Retryable statuses, timeouts, and
 * network errors widen the adaptive gap and are retried while attempts
 * remain; any other outcome (or the final attempt's result) is written to
 * the waiting client response. Exactly one response is produced on every
 * path: streamed upstream bytes on success/terminal status, or a JSON 502
 * when the last attempt throws.
 *
 * @param {{ method: string, pathWithQuery: string,
 *           headers: Record<string, string>, body: Buffer | undefined,
 *           res: import("node:http").ServerResponse }} job
 */
async function runJob(job) {
  const target = new URL(job.pathWithQuery, upstreamOrigin);
  stats.totalRequests += 1;

  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    let controller = null;
    let timeoutHandle = null;
    try {
      // Per-attempt timeout: abort the fetch if the upstream stalls before
      // the response headers arrive. NOTE: the timer is cleared once fetch
      // resolves, so body streaming below is NOT covered by requestTimeoutMs.
      controller = new AbortController();
      timeoutHandle = setTimeout(() => controller.abort(), requestTimeoutMs);
      const upstream = await fetch(target, {
        method: job.method,
        headers: job.headers,
        // fetch forbids bodies on GET/HEAD requests.
        body: job.method === "GET" || job.method === "HEAD" ? undefined : job.body,
        // Pass redirects through to the client rather than following them.
        redirect: "manual",
        signal: controller.signal,
      });
      clearTimeout(timeoutHandle);

      // Retryable status with attempts remaining: count it, widen the gap
      // (honoring Retry-After as a floor), drop this response's body, then
      // wait at least the current gap before the next attempt.
      if (isRetryableStatus(upstream.status) && attempt < maxAttempts) {
        const retryAfterMs = parseRetryAfterMs(upstream.headers.get("retry-after"));
        if (upstream.status === 429) {
          stats.rateLimitResponses += 1;
        }
        increaseGap(`status_${upstream.status}`, retryAfterMs ?? 0);
        stats.totalRetries += 1;
        await discardBody(upstream);
        await sleep(Math.max(currentGapMs, retryAfterMs ?? 0));
        continue;
      }

      // Terminal outcome: adjust the adaptive gap before relaying it.
      if (upstream.status === 429) {
        // Out of attempts but still rate-limited — back off for future jobs.
        stats.rateLimitResponses += 1;
        const retryAfterMs = parseRetryAfterMs(upstream.headers.get("retry-after"));
        increaseGap("status_429_terminal", retryAfterMs ?? 0);
      } else if (isRetryableStatus(upstream.status)) {
        // Other retryable status on the last attempt: back off, no retry.
        increaseGap(`status_${upstream.status}`);
      } else {
        // Healthy (or at least non-retryable) result counts toward decay.
        decayGapOnHealthyResult();
      }

      // Relay status, headers, and body to the waiting client.
      job.res.statusCode = upstream.status;
      writeUpstreamHeaders(job.res, upstream.headers);
      if (!upstream.body) {
        job.res.end();
        return;
      }
      // Returns while the body is still streaming; the drain loop's gap
      // timer starts now, not when the stream finishes.
      Readable.fromWeb(upstream.body).pipe(job.res);
      return;
    } catch (error) {
      if (timeoutHandle) {
        clearTimeout(timeoutHandle);
      }
      // AbortError here comes from our own timeout controller above.
      const isTimeout = error instanceof Error && error.name === "AbortError";
      if (isTimeout) {
        stats.timeoutErrors += 1;
      } else {
        stats.networkErrors += 1;
      }
      increaseGap(isTimeout ? "timeout" : "network_error");
      if (attempt < maxAttempts) {
        stats.totalRetries += 1;
        await sleep(currentGapMs);
        continue;
      }
      // Final attempt failed: report a JSON 502 to the client.
      const msg = error instanceof Error ? error.message : String(error);
      job.res.statusCode = 502;
      job.res.setHeader("content-type", "application/json");
      job.res.end(JSON.stringify({ error: "upstream_error", message: msg }));
      console.error(`[${nowIso()}] proxy error: ${msg}`);
      return;
    }
  }
}
|
|
|
/**
 * HTTP front-end.
 *
 * GET /healthz answers immediately with the scheduler state and counters;
 * every other request is fully buffered (method, headers, body) and then
 * enqueued so the drain loop forwards it upstream one at a time.
 *
 * The handler is deliberately synchronous: the previous `async` wrapper
 * never awaited anything, and it turned any synchronous throw into an
 * unhandled promise rejection instead of surfacing through Node's normal
 * error path.
 */
const server = http.createServer((req, res) => {
  const method = (req.method || "GET").toUpperCase();
  const url = req.url || "/";

  // Health/diagnostics endpoint: never queued, never proxied.
  if (url === "/healthz") {
    res.statusCode = 200;
    res.setHeader("content-type", "application/json");
    res.end(
      JSON.stringify({
        ok: true,
        queueDepth: queue.length,
        processing,
        minGapMs,
        currentGapMs,
        maxGapMs,
        requestTimeoutMs,
        maxAttempts,
        backoffMultiplier,
        backoffStepMs,
        decayStepMs,
        decayEverySuccesses,
        retryableStatusCodes: Array.from(retryableStatusCodes.values()),
        stats,
        upstreamOrigin,
      }),
    );
    return;
  }

  // Buffer the whole request body before queueing: the upstream fetch may
  // run much later, long after the incoming stream has ended.
  // NOTE(review): the buffer is unbounded — confirm whether a size cap is
  // needed if clients other than trusted local tools can reach this port.
  const bodyChunks = [];
  req.on("data", (chunk) => bodyChunks.push(chunk));
  req.on("error", () => {
    if (!res.headersSent) {
      res.statusCode = 400;
      res.end("request stream error");
    }
  });
  req.on("end", () => {
    const body = bodyChunks.length > 0 ? Buffer.concat(bodyChunks) : undefined;
    const headers = sanitizeHeaders(req.headers);
    enqueue({
      method,
      pathWithQuery: url,
      headers,
      body,
      res,
    });
  });
});
|
|
|
// Bind to loopback only: the proxy adds no authentication of its own, so it
// must not be reachable from other hosts. The startup line echoes the
// effective throttling configuration for operator sanity checks.
server.listen(port, "127.0.0.1", () => {
  console.log(
    `[${nowIso()}] mercury-throttle-proxy listening on http://127.0.0.1:${port} -> ${upstreamOrigin} (minGapMs=${minGapMs}, maxGapMs=${maxGapMs}, maxAttempts=${maxAttempts}, timeoutMs=${requestTimeoutMs})`,
  );
});