import { createHash } from "node:crypto";
import { mkdirSync } from "node:fs";
import { homedir } from "node:os";
import { dirname } from "node:path";
import { DatabaseSync } from "node:sqlite";

import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
|
|
/** Sub-agent outcome names that may be listed in the `retryOutcomes` plugin config. */
type RetryOutcome = "error" | "timeout" | "killed" | "reset" | "deleted";

/** Task states accepted by `normalizeTaskState` (see `VALID_TASK_STATES`). */
type TaskLifecycleState =
  | "queued"
  | "leased"
  | "running"
  | "needs_input"
  | "needs_verification"
  | "done"
  | "failed"
  | "dead";

/** Values accepted for `SpawnSpec.kind` (see `normalizeSpawnKind`). */
type SpawnKind = "worker" | "planner" | "audit" | "research" | "orchestrator";
|
|
|
/**
 * Parameters describing one sub-agent spawn request.
 *
 * Only `task` is required. `stableTaskKey` hashes `kind`, `agentId`, `label`,
 * `task` and `requestId`; `buildRetrySystemEvent` echoes `task`, `label`,
 * `agentId`, `model`, `thinking`, `mode` and `thread` into retry events.
 */
type SpawnSpec = {
  task: string;
  label?: string;
  agentId?: string;
  model?: string;
  thinking?: string;
  mode?: string;
  thread?: boolean;
  /** Treated as "worker" by `stableTaskKey` when omitted. */
  kind?: SpawnKind;
  requestId?: string;
  idempotencyKey?: string;
};
|
|
|
/**
 * Raw plugin configuration as read from `api.pluginConfig`.
 *
 * Every field is optional; `register` validates each value and substitutes a
 * default when it is missing or has the wrong type.
 */
type PluginConfig = {
  injectOrchestratorPrompt?: boolean;
  enforceMainDelegation?: boolean;
  enforceFridayChannels?: boolean;
  autoRetryFailedSubagents?: boolean;
  maxRetriesPerTask?: number;
  retryBaseBackoffMs?: number;
  retryMaxBackoffMs?: number;
  schedulerTickMs?: number;
  schedulerMaxConcurrentWorkers?: number;
  verificationGraceMs?: number;
  stateDbPath?: string;
  statusChannel?: string;
  statusStickyEmoji?: string;
  statusEnabled?: boolean;
  retryOutcomes?: RetryOutcome[];
  allowedMainTools?: string[];
  allowedOrchestratorTools?: string[];
  orchestratorLabelPrefix?: string;
};
|
|
|
/**
 * Bookkeeping record for a spawn call.
 *
 * Instances are held in the `pendingSpawns` and `spawnByChildSession` maps
 * created by `register`.
 */
type PendingSpawn = {
  requesterSessionKey: string;
  spec: SpawnSpec;
  kind: "orchestrator" | "worker";
  taskKey?: string;
  taskId?: string;
  attempt?: number;
};
|
|
|
/** Status values a tracked worker session can report. */
type WorkerStatus =
  | "running"
  | "ok"
  | "success"
  | "completed"
  | "error"
  | "timeout"
  | "killed"
  | "reset"
  | "deleted"
  | "unknown";

/**
 * In-memory snapshot of one worker session.
 *
 * Held in `workerStateBySession`; `buildWorkerStatusSnapshot` renders
 * `status`, `childSessionKey`, `label` and `taskPreview` and sorts by
 * `updatedAtMs`.
 */
type WorkerState = {
  childSessionKey: string;
  orchestratorSessionKey: string;
  label?: string;
  taskPreview: string;
  status: WorkerStatus;
  startedAtMs: number;
  updatedAtMs: number;
  outcome?: string;
  taskId?: string;
  taskKey?: string;
  attempt?: number;
};
|
|
|
/**
 * A scheduled retry for a failed task.
 *
 * Held in `pendingRetryByTask`. `runAfterMs` is presumably an epoch-millisecond
 * timestamp before which the retry must not run — confirm against the scheduler,
 * which is outside this section.
 */
type RetryEnvelope = {
  taskKey: string;
  taskId?: string;
  requesterSessionKey: string;
  spec: SpawnSpec;
  failedSessionKey: string;
  outcome: string;
  attempt: number;
  runAfterMs: number;
};
|
|
|
/**
 * Task counters, one per lifecycle state.
 *
 * Consumed by `buildStatusSystemEvent` and
 * `deriveOrchestratorStateIdempotencyKey`.
 */
type FridayStatusSummary = {
  queued: number;
  leased: number;
  running: number;
  needsInput: number;
  needsVerification: number;
  done: number;
  failed: number;
  dead: number;
};
|
|
|
/**
 * Tool ids that seed the `allowedMainTools` set built in `register`;
 * the `allowedMainTools` config option can add to this list.
 */
const DEFAULT_ALLOWED_MAIN_TOOLS = [
  "sessions_spawn",
  "sessions_send",
  "sessions_list",
  "sessions_history",
  "session_status",
  "subagents",
  "agents_list",
  "memory_search",
  "memory_get",
];
|
|
|
/**
 * Tool ids that seed the `allowedOrchestratorTools` set built in `register`;
 * the `allowedOrchestratorTools` config option can add to this list.
 */
const DEFAULT_ALLOWED_ORCHESTRATOR_TOOLS = [
  "sessions_spawn",
  "sessions_send",
  "sessions_list",
  "sessions_history",
  "session_status",
  "process",
  "message",
  "subagents",
  "agents_list",
  "memory_search",
  "memory_get",
];
|
|
|
/** Outcomes used by `resolveRetryOutcomes` when the config supplies no valid entry. */
const DEFAULT_RETRY_OUTCOMES: RetryOutcome[] = ["error", "timeout"];

/** Emoji used when `statusStickyEmoji` is not configured. */
const DEFAULT_STATUS_STICKY_EMOJI = "🤖";

/** Channel returned by `normalizeFridayChannelTarget` for an empty target. */
const DEFAULT_STATUS_CHANNEL = "#friday-status";
|
/**
 * Default location of the SQLite state database, used when `stateDbPath` is
 * not configured. `HOME` wins when set; otherwise the current user's home
 * directory is resolved through the OS rather than a hard-coded developer path.
 */
const DEFAULT_STATE_DB_PATH = `${process.env.HOME ?? homedir()}/.openclaw/state/orchestrator.db`;
|
/**
 * Upper bound on routable tools: `register` removes from both allow-lists any
 * tool id that is not a member of this set, regardless of configuration.
 */
const STRICT_ROUTER_TOOLS = new Set([
  "sessions_spawn",
  "sessions_send",
  "sessions_list",
  "sessions_history",
  "session_status",
  "process",
  "message",
  "subagents",
  "agents_list",
  "memory_search",
  "memory_get",
]);

/** Window (ms) within which `shouldSuppressOutbound` treats a repeated target/key pair as a duplicate. */
const OUTBOUND_IDEMPOTENCY_WINDOW_MS = 120_000;

// NOTE(review): presumably the idle time (ms) after which a hidden orchestrator
// is retired; the code that reads this constant is outside this section.
const ORCHESTRATOR_IDLE_RETIRE_MS = 180_000;
|
|
|
/** Membership set used by `normalizeTaskState` to validate a lifecycle state string. */
const VALID_TASK_STATES = new Set<TaskLifecycleState>([
  "queued",
  "leased",
  "running",
  "needs_input",
  "needs_verification",
  "done",
  "failed",
  "dead",
]);
|
|
|
/**
 * Coerces an arbitrary value to a key/value record.
 *
 * @param value - Any value.
 * @returns `value` itself when it is a non-null, non-array object; otherwise a
 *   new empty object.
 */
function asRecord(value: unknown): Record<string, unknown> {
  if (!value || typeof value !== "object" || Array.isArray(value)) return {};
  return value as Record<string, unknown>;
}
|
|
|
/**
 * Returns a trimmed, non-empty string or `undefined`.
 *
 * @param value - Any value.
 * @returns The trimmed string when `value` is a string with non-whitespace
 *   content; `undefined` for non-strings and blank strings.
 */
function asString(value: unknown): string | undefined {
  if (typeof value !== "string") return undefined;
  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : undefined;
}
|
|
|
/**
 * Returns `value` when it is a real boolean, otherwise `fallback`.
 * Truthy/falsy non-boolean values (e.g. `"true"`, `1`) are not coerced.
 */
function asBoolean(value: unknown, fallback: boolean): boolean {
  return typeof value === "boolean" ? value : fallback;
}
|
|
|
/**
 * Floors a finite number and returns it when the result is strictly positive.
 *
 * @param value - Any value.
 * @param fallback - Returned for non-numbers, non-finite numbers, and values
 *   whose floor is zero or negative.
 */
function asPositiveInt(value: unknown, fallback: number): number {
  if (typeof value !== "number" || !Number.isFinite(value)) return fallback;
  const floored = Math.floor(value);
  return floored > 0 ? floored : fallback;
}
|
|
|
/**
 * Floors a finite number and returns it when the result is zero or positive.
 *
 * @param value - Any value.
 * @param fallback - Returned for non-numbers, non-finite numbers, and values
 *   whose floor is negative.
 */
function asNonNegativeInt(value: unknown, fallback: number): number {
  if (typeof value !== "number" || !Number.isFinite(value)) return fallback;
  const floored = Math.floor(value);
  return floored >= 0 ? floored : fallback;
}
|
|
|
/**
 * Extracts the non-blank string entries of an array.
 *
 * @param value - Any value.
 * @returns The trimmed string entries in their original order, or `undefined`
 *   when `value` is not an array or no entry survives.
 */
function asStringArray(value: unknown): string[] | undefined {
  if (!Array.isArray(value)) return undefined;
  const out = value
    .map((item) => (typeof item === "string" ? item.trim() : ""))
    .filter((item) => item.length > 0);
  return out.length > 0 ? out : undefined;
}
|
|
|
/**
 * Resolves the configured set of outcomes that should trigger a retry.
 *
 * @param value - Raw `retryOutcomes` config value.
 * @returns The recognised entries of `value`; `DEFAULT_RETRY_OUTCOMES` when
 *   `value` is not a usable string array or contains no recognised entry.
 *   Matching is exact (case-sensitive) after trimming.
 */
function resolveRetryOutcomes(value: unknown): Set<RetryOutcome> {
  const fromCfg = asStringArray(value);
  const raw = fromCfg ?? DEFAULT_RETRY_OUTCOMES;
  const allowed = new Set<RetryOutcome>(["error", "timeout", "killed", "reset", "deleted"]);
  const selected = raw.filter((entry): entry is RetryOutcome => allowed.has(entry as RetryOutcome));
  return new Set(selected.length > 0 ? selected : DEFAULT_RETRY_OUTCOMES);
}
|
|
|
/**
 * Parses a lifecycle state from untyped input.
 *
 * @param value - Any value; strings are trimmed and lower-cased before matching.
 * @param fallback - Returned when `value` is not a member of `VALID_TASK_STATES`.
 */
function normalizeTaskState(value: unknown, fallback: TaskLifecycleState): TaskLifecycleState {
  const raw = asString(value)?.toLowerCase();
  if (!raw) return fallback;
  return VALID_TASK_STATES.has(raw as TaskLifecycleState) ? (raw as TaskLifecycleState) : fallback;
}
|
|
|
/**
 * Normalises a channel target to lower case with a leading `#`.
 *
 * @param value - Raw channel name, with or without `#`.
 * @returns `DEFAULT_STATUS_CHANNEL` when `value` is missing or blank.
 */
function normalizeFridayChannelTarget(value: string | undefined): string {
  const raw = (value ?? "").trim();
  if (!raw) return DEFAULT_STATUS_CHANNEL;
  return raw.startsWith("#") ? raw.toLowerCase() : `#${raw.toLowerCase()}`;
}
|
|
|
/**
 * Reports whether a channel target is permitted by the Friday channel policy.
 *
 * The target is normalised first; it is allowed when it equals `#friday`,
 * starts with `#friday-`, or contains `#friday-forum` anywhere.
 */
function isAllowedFridayTarget(value: string | undefined): boolean {
  const target = normalizeFridayChannelTarget(value);
  return target === "#friday" || target.startsWith("#friday-") || target.includes("#friday-forum");
}
|
|
|
/**
 * Computes an exponential back-off delay with random jitter.
 *
 * @param baseMs - Delay for the first attempt, before jitter.
 * @param maxMs - Upper bound applied after jitter is added.
 * @param attempt - 1-based attempt number; values below 1 behave like 1.
 * @returns `min(maxMs, baseMs * 2^(attempt-1) + jitter)` where jitter is a
 *   random integer in `[0, max(250, floor(baseMs / 4)))`.
 */
function computeRetryDelayMs(baseMs: number, maxMs: number, attempt: number): number {
  const exp = Math.max(0, attempt - 1);
  const raw = baseMs * 2 ** exp;
  const jitter = Math.floor(Math.random() * Math.max(250, Math.floor(baseMs / 4)));
  return Math.min(maxMs, raw + jitter);
}
|
|
|
/**
 * Reports whether a session key belongs to a front-controller session:
 * it must start with `agent:` (case-insensitive) and must not contain
 * `:subagent:`.
 */
function isFrontControllerSessionKey(sessionKey: string | undefined): boolean {
  if (!sessionKey) return false;
  if (sessionKey.includes(":subagent:")) return false;
  return /^agent:/i.test(sessionKey);
}
|
|
|
/** Returns the first 10 hex characters of the SHA-256 digest of `input`. */
function shortHash(input: string): string {
  return createHash("sha256").update(input).digest("hex").slice(0, 10);
}
|
|
|
/**
 * Produces a label restricted to `[a-z0-9._-]`.
 *
 * Whitespace runs become `-`, the text is lower-cased, all other characters
 * are dropped, and the result is cut to 64 characters.
 *
 * @returns The sanitised label, or `undefined` when nothing remains.
 */
function sanitizeLabel(input: string | undefined): string | undefined {
  const raw = (input ?? "").trim();
  if (!raw) return undefined;
  const compact = raw.replace(/\s+/g, "-").toLowerCase();
  const safe = compact.replace(/[^a-z0-9._-]/g, "");
  return safe.slice(0, 64) || undefined;
}
|
|
|
/**
 * Builds a single-line preview of a task description.
 *
 * @param input - Raw task text; whitespace runs are collapsed to one space.
 * @param max - Maximum length in UTF-16 code units, including the trailing `…`.
 * @returns `"(no task)"` for missing/blank input, the collapsed text when it
 *   fits, otherwise a truncated prefix followed by `…`.
 */
function taskPreview(input: string | undefined, max = 180): string {
  const raw = (input ?? "").trim();
  if (!raw) return "(no task)";
  const compact = raw.replace(/\s+/g, " ");
  if (compact.length <= max) return compact;
  // Clamp so a non-positive `max` cannot turn into a negative slice index,
  // which would keep almost the whole string instead of truncating it.
  let cut = Math.max(0, Math.floor(max) - 1);
  if (cut > 0) {
    // Never end on a lone high surrogate: that would emit half of an
    // astral character (e.g. an emoji) right before the ellipsis.
    const last = compact.charCodeAt(cut - 1);
    if (last >= 0xd800 && last <= 0xdbff) cut -= 1;
  }
  return `${compact.slice(0, cut)}…`;
}
|
|
|
/**
 * Parses a spawn kind from free-form text.
 *
 * @param value - Raw kind; trimmed and lower-cased before matching.
 * @returns The matching `SpawnKind`, or `undefined` when missing or unknown.
 */
function normalizeSpawnKind(value: string | undefined): SpawnKind | undefined {
  const raw = value?.trim().toLowerCase();
  if (!raw) return undefined;
  if (raw === "worker" || raw === "planner" || raw === "audit" || raw === "research" || raw === "orchestrator") {
    return raw;
  }
  return undefined;
}
|
|
|
/**
 * Guesses the spawn kind from the label and task text.
 *
 * @returns `"planner"` when either text contains one of the planning keywords
 *   as a whole word (case-insensitive); otherwise `"worker"`.
 */
function inferSpawnKind(task: string, label?: string): SpawnKind {
  const probe = `${label ?? ""}\n${task}`.toLowerCase();
  if (
    /\b(plan|planner|planning|roadmap|approval|approve|outline|proposal|next steps)\b/.test(probe)
  ) {
    return "planner";
  }
  return "worker";
}
|
|
|
/**
 * Extracts plain text from a message `content` value.
 *
 * @param value - Either a string, or an array of parts of which only those
 *   with `type === "text"` and a non-blank `text` are used.
 * @returns The trimmed text (parts joined with newlines), or `undefined` when
 *   there is none or `value` has another shape.
 */
function asSessionMessage(value: unknown): string | undefined {
  if (typeof value === "string") return asString(value);
  if (!Array.isArray(value)) return undefined;
  const chunks: string[] = [];
  for (const part of value) {
    const obj = asRecord(part);
    if (obj.type !== "text") continue;
    const text = asString(obj.text);
    if (text) chunks.push(text);
  }
  const joined = chunks.join("\n").trim();
  return joined.length > 0 ? joined : undefined;
}
|
|
|
/**
 * Finds the most recent user message that has text content.
 *
 * Scans `messages` from the end; entries whose `role` is not `"user"` or whose
 * content yields no text are skipped.
 */
function extractLastUserMessage(messages: unknown[]): string | undefined {
  for (let i = messages.length - 1; i >= 0; i -= 1) {
    const obj = asRecord(messages[i]);
    if (asString(obj.role) !== "user") continue;
    const content = asSessionMessage(obj.content);
    if (content) return content;
  }
  return undefined;
}
|
|
|
/**
 * Renders a bullet-list snapshot of an orchestrator's workers.
 *
 * @param orchestratorSessionKey - Session key shown in the header line.
 * @param workers - Worker states to summarise; the input is not mutated.
 * @returns A single line when there are no workers; otherwise a header, a
 *   "Workers tracked" line carrying the true total, and one line for each of
 *   the 12 most recently updated workers.
 */
function buildWorkerStatusSnapshot(orchestratorSessionKey: string, workers: Iterable<WorkerState>): string {
  const maxListed = 12;
  const all = [...workers].sort((a, b) => b.updatedAtMs - a.updatedAtMs);
  if (all.length === 0) {
    return `- Hidden orchestrator ${orchestratorSessionKey} has no tracked worker sessions.`;
  }
  const items = all.slice(0, maxListed);
  // Count before truncating so the total is not capped at the list length.
  const trackedLine =
    all.length > items.length
      ? `- Workers tracked: ${all.length} (showing ${items.length} most recently updated)`
      : `- Workers tracked: ${all.length}`;
  return [
    `- Hidden orchestrator ${orchestratorSessionKey}`,
    trackedLine,
    ...items.map(
      (worker) =>
        `- [${worker.status}] ${worker.childSessionKey} (${worker.label ?? "no-label"}) :: ${worker.taskPreview}`,
    ),
  ].join("\n");
}
|
|
|
/**
 * Builds the composite key `runId|toolCallId|sessionKey`; missing parts
 * become empty strings.
 */
function makePendingKey(
  runId: string | undefined,
  toolCallId: string | undefined,
  sessionKey: string | undefined,
): string {
  return `${runId ?? ""}|${toolCallId ?? ""}|${sessionKey ?? ""}`;
}
|
|
|
/**
 * Derives a deterministic task key for a spawn request.
 *
 * The key is the SHA-256 hex digest of the requester session key, kind
 * (default `"worker"`), agent id, label, scope and request id joined with
 * newlines. For planners the scope is the request id when present, so the
 * task wording does not affect the key; for every other kind it is the
 * trimmed task text.
 */
function stableTaskKey(requesterSessionKey: string, spec: SpawnSpec): string {
  const kind = spec.kind ?? "worker";
  const taskBody = spec.task.trim();
  const plannerScope = kind === "planner" ? spec.requestId ?? taskBody : taskBody;
  const normalized = [
    requesterSessionKey,
    kind,
    spec.agentId ?? "",
    spec.label ?? "",
    plannerScope,
    spec.requestId ?? "",
  ].join("\n");
  return createHash("sha256").update(normalized).digest("hex");
}
|
|
|
/**
 * Searches a value for a `childSessionKey` string.
 *
 * Checks the object itself first, then recurses depth-first into nested
 * object values. Arrays are not searched, because `asRecord` maps them to an
 * empty record.
 *
 * @param value - Value to search.
 * @param depth - Current recursion depth; the search gives up beyond depth 4.
 */
function extractChildSessionKey(value: unknown, depth = 0): string | undefined {
  if (depth > 4) return undefined;
  const obj = asRecord(value);
  const direct = asString(obj.childSessionKey);
  if (direct) return direct;
  for (const nested of Object.values(obj)) {
    if (!nested || typeof nested !== "object") continue;
    const found = extractChildSessionKey(nested, depth + 1);
    if (found) return found;
  }
  return undefined;
}
|
|
|
/**
 * Decides whether a spawn tool result indicates acceptance.
 *
 * A `status` string (top level, else `details.status`) is decisive: only
 * `accepted`, `ok` and `success` count. Without a status, the result is
 * accepted unless an `isError === true` flag or a non-blank `error` string is
 * present at either level.
 */
function isSpawnAccepted(value: unknown): boolean {
  const obj = asRecord(value);
  const details = asRecord(obj.details);
  const status =
    asString(obj.status)?.toLowerCase() ??
    asString(details.status)?.toLowerCase();
  if (status) {
    return status === "accepted" || status === "ok" || status === "success";
  }
  if (obj.isError === true || details.isError === true) return false;
  if (asString(obj.error) || asString(details.error)) return false;
  return true;
}
|
|
|
/**
 * Detects text that echoes the injected orchestrator prompt: it must contain
 * both marker phrases produced by `buildOrchestratorPrompt` (case-insensitive).
 */
function isOrchestratorEchoMessage(text: string): boolean {
  const normalized = text.toLowerCase();
  return (
    normalized.includes("[orchestrator mode is active]") &&
    normalized.includes("spawn exactly one hidden orchestrator session")
  );
}
|
|
|
/**
 * Builds the instruction block for the front-controller session.
 *
 * @param maxRetries - Retry limit quoted in the retry instruction.
 * @param orchestratorSessionKey - When given, two extra lines name the active
 *   hidden orchestrator and tell the model to reuse it.
 * @returns The instructions joined with newlines.
 */
function buildOrchestratorPrompt(maxRetries: number, orchestratorSessionKey?: string): string {
  const lines = [
    "[Orchestrator mode is active]",
    "- You are the fast front-controller. Keep the first user-facing response short.",
    "- Preserve the assistant persona/tone from local identity files in user-facing replies.",
    "- Do not expose internal worker chatter; summarize real progress clearly.",
    "- Front-controller should not call read/exec/process directly.",
    "- Spawn exactly one hidden orchestrator session for this thread via sessions_spawn.",
    "- Delegate all real work to that orchestrator via sessions_send.",
    "- For live progress, query the hidden orchestrator via session_status/sessions_history.",
    "- If user asks for progress/status, query hidden orchestrator first, then answer.",
    `- If a system event contains <orchestrator-retry>, immediately respawn the failed task. Retry limit: ${maxRetries}.`,
  ];
  if (orchestratorSessionKey) {
    lines.push(`- Active hidden orchestrator session: ${orchestratorSessionKey}`);
    lines.push(
      `- Use sessions_send with sessionKey=${orchestratorSessionKey} for new delegated work in this thread.`,
    );
  }
  return lines.join("\n");
}
|
|
|
/**
 * Builds the task text handed to a newly spawned hidden orchestrator.
 *
 * @param requesterSessionKey - Session key embedded in the header.
 * @param userTask - Task text appended verbatim as the final section.
 * @returns Header, execution contract and user task joined with newlines.
 */
function buildHiddenOrchestratorTask(requesterSessionKey: string, userTask: string): string {
  return [
    "[Hidden Thread Orchestrator]",
    `requesterSessionKey: ${requesterSessionKey}`,
    "You are a hidden orchestrator for one front-controller thread.",
    "Execution contract:",
    "1) Break the task into concrete worker subtasks.",
    "2) Spawn worker subagents (sessions_spawn) for tool/file/shell/web work.",
    "3) Track worker status (pending/running/succeeded/failed) and retry failed workers only when useful.",
    "4) Keep progress updates concise and structured so requester can poll status.",
    "5) Do not send greetings/small-talk to requester; only send real status, blockers, or completion.",
    "6) Send progress updates back to requester via sessions_send only when there is meaningful change.",
    "7) When all workers finish, send one final summary to requester with completed changes, failures, and next actions.",
    "8) Never claim work is done unless a worker/tool result confirms it.",
    "9) Do not run read/write/edit/exec/process yourself unless delegation repeatedly fails.",
    "",
    "Primary task from front-controller:",
    userTask,
  ].join("\n");
}
|
|
|
/**
 * Builds the message shown when a tool call is blocked by orchestrator policy.
 *
 * @param toolName - Name of the blocked tool.
 * @param params - Parameters of the blocked call; serialised with
 *   `JSON.stringify` and shortened to 260 characters for the example.
 * @returns The hint sentences joined with single spaces.
 */
function buildDelegationHint(toolName: string, params: unknown): string {
  const preview = taskPreview(JSON.stringify(params), 260);
  return [
    `Hidden orchestrator policy blocked '${toolName}'.`,
    "Delegate this to a worker using sessions_spawn with a concrete task.",
    "Example:",
    `sessions_spawn({\"label\":\"worker-${toolName}\",\"task\":\"Use tool ${toolName} with params ${preview}. Return concise findings and artifacts.\"})`,
  ].join(" ");
}
|
|
|
/**
 * Builds the `<orchestrator-retry>` system event asking for a failed
 * sub-agent task to be respawned.
 *
 * @param spec - Original spawn spec; missing optional fields render as empty
 *   values and `thread` defaults to `false`.
 * @param failedSessionKey - Session key of the failed child.
 * @param outcome - Outcome reported for the failed child.
 * @param attempt - Attempt number, rendered as `attempt/maxRetries`.
 * @param maxRetries - Retry limit.
 */
function buildRetrySystemEvent(
  spec: SpawnSpec,
  failedSessionKey: string,
  outcome: string,
  attempt: number,
  maxRetries: number,
): string {
  return [
    "<orchestrator-retry>",
    `failedChildSession: ${failedSessionKey}`,
    `outcome: ${outcome}`,
    `attempt: ${attempt}/${maxRetries}`,
    "action: Respawn this subagent task immediately via sessions_spawn.",
    `task: ${spec.task}`,
    `label: ${spec.label ?? ""}`,
    `agentId: ${spec.agentId ?? ""}`,
    `model: ${spec.model ?? ""}`,
    `thinking: ${spec.thinking ?? ""}`,
    `mode: ${spec.mode ?? ""}`,
    `thread: ${String(spec.thread ?? false)}`,
    "</orchestrator-retry>",
  ].join("\n");
}
|
|
|
/**
 * Appends the worker result contract to a task.
 *
 * @param taskId - Task id the worker must echo back as `task_id`.
 * @param attempt - Attempt number shown on the last line.
 * @param rawTask - Task text; trimmed and placed first, followed by a blank line.
 * @returns The task followed by the `[Worker result contract]` section, which
 *   `stripWorkerTaskPacket` can remove again.
 */
function buildWorkerTaskPacket(taskId: string, attempt: number, rawTask: string): string {
  return [
    rawTask.trim(),
    "",
    "[Worker result contract]",
    "When done, include ONE JSON object fenced in ```json``` with this schema:",
    "{",
    ` \"task_id\": \"${taskId}\",`,
    " \"status\": \"completed|blocked|failed\",",
    " \"blockers\": [{\"type\":\"string\",\"message\":\"string\",\"suggested_fix\":\"string\",\"required_from_user\":false}],",
    " \"artifacts\": [{\"kind\":\"string\",\"path_or_ref\":\"string\",\"description\":\"string\"}],",
    " \"verification\": [{\"kind\":\"command|file|url\",\"value\":\"string\",\"expected\":\"string\"}],",
    " \"summary\": \"max 8 short lines\"",
    "}",
    `Current attempt: ${attempt}`,
  ].join("\n");
}
|
|
|
/**
 * Removes the worker result contract appended by `buildWorkerTaskPacket`.
 *
 * @returns The trimmed text before the first line starting with
 *   `[Worker result contract]`, or the trimmed input when no such line exists.
 */
function stripWorkerTaskPacket(rawTask: string): string {
  const marker = "\n[Worker result contract]";
  const idx = rawTask.indexOf(marker);
  if (idx < 0) return rawTask.trim();
  return rawTask.slice(0, idx).trim();
}
|
|
|
/** Builds a task id from the first 12 characters of `taskKey` plus `-a<attempt>`. */
function makeTaskId(taskKey: string, attempt: number): string {
  return `${taskKey.slice(0, 12)}-a${attempt}`;
}
|
|
|
/**
 * Maps a count to a keycap emoji.
 *
 * The value is floored and clamped to 0..10, so anything above 10 renders as
 * 🔟 and negative or NaN values render as 0️⃣.
 */
function formatDigitEmoji(value: number): string {
  const clamped = Math.max(0, Math.min(10, Math.floor(value)));
  if (clamped === 10) return "🔟";
  const digits = ["0️⃣", "1️⃣", "2️⃣", "3️⃣", "4️⃣", "5️⃣", "6️⃣", "7️⃣", "8️⃣", "9️⃣"];
  // NaN survives the clamp and indexes nothing; fall back to zero.
  return digits[clamped] ?? "0️⃣";
}
|
|
|
/**
 * Builds the `<orchestrator-status>` system event.
 *
 * @param requesterSessionKey - Session key of the requester.
 * @param orchestratorSessionKey - Session key of the hidden orchestrator.
 * @param summary - Task counters; `tasksFailed` reports `failed + dead`.
 * @param stickyEmoji - Emoji reported as `stickyActiveEmoji`.
 */
function buildStatusSystemEvent(
  requesterSessionKey: string,
  orchestratorSessionKey: string,
  summary: FridayStatusSummary,
  stickyEmoji: string,
): string {
  return [
    "<orchestrator-status>",
    `requesterSessionKey: ${requesterSessionKey}`,
    `orchestratorSessionKey: ${orchestratorSessionKey}`,
    `stickyActiveEmoji: ${stickyEmoji}`,
    `workersRunning: ${summary.running}`,
    `tasksQueued: ${summary.queued}`,
    `tasksDone: ${summary.done}`,
    `tasksFailed: ${summary.failed + summary.dead}`,
    "reactionHints:",
    `- active: 👷 ${formatDigitEmoji(summary.running)}`,
    `- queued: ⏳ ${formatDigitEmoji(summary.queued)}`,
    `- done: ✅ ${formatDigitEmoji(summary.done)}`,
    "</orchestrator-status>",
  ].join("\n");
}
|
|
|
/**
 * Builds the `<orchestrator-incident>` system event asking for an incident to
 * be posted to the status channel.
 *
 * @param requesterSessionKey - Session key of the requester.
 * @param orchestratorSessionKey - Session key of the hidden orchestrator.
 * @param statusChannel - Channel reported as `targetChannel`.
 * @param reason - Short incident reason.
 * @param details - Free-form incident details.
 */
function buildIncidentSystemEvent(
  requesterSessionKey: string,
  orchestratorSessionKey: string,
  statusChannel: string,
  reason: string,
  details: string,
): string {
  return [
    "<orchestrator-incident>",
    `targetChannel: ${statusChannel}`,
    `requesterSessionKey: ${requesterSessionKey}`,
    `orchestratorSessionKey: ${orchestratorSessionKey}`,
    `reason: ${reason}`,
    `details: ${details}`,
    "action: Post this incident to targetChannel via message tool if channel policy allows.",
    "</orchestrator-incident>",
  ].join("\n");
}
|
|
|
/**
 * A result receipt parsed from worker output by `parseWorkerReceipts`.
 *
 * `verificationCount` is the length of the receipt's `verification` array
 * (0 when absent). `explicitNoVerification` is true when the receipt carried
 * an empty `verification` array, `verification_required: false`, or
 * `no_verification: true`.
 */
type WorkerReceipt = {
  taskId: string;
  status: "completed" | "blocked" | "failed";
  verificationCount: number;
  explicitNoVerification: boolean;
};
|
|
|
/**
 * Extracts worker result receipts from free-form text.
 *
 * First parses every ```json fenced block; each may hold one receipt object or
 * an array of them. Only when that yields nothing does it fall back to a regex
 * that pairs a `"task_id"` with the next `"status"` anywhere in the text; such
 * receipts carry no verification information.
 *
 * @param text - Text to scan.
 * @returns At most one receipt per task id; a later receipt for the same task
 *   replaces an earlier one.
 */
function parseWorkerReceipts(text: string): WorkerReceipt[] {
  const receiptsByTask = new Map<string, WorkerReceipt>();

  // Validates one parsed JSON object and converts it to a receipt.
  const fromObject = (obj: Record<string, unknown>): WorkerReceipt | undefined => {
    const taskId = asString(obj.task_id);
    const status = asString(obj.status)?.toLowerCase() as WorkerReceipt["status"] | undefined;
    if (!taskId || !status || (status !== "completed" && status !== "blocked" && status !== "failed")) return undefined;
    const verification = Array.isArray(obj.verification) ? obj.verification : undefined;
    const verificationCount = verification?.length ?? 0;
    const explicitNoVerification =
      (Array.isArray(verification) && verification.length === 0) ||
      obj.verification_required === false ||
      obj.no_verification === true;
    return {
      taskId,
      status,
      verificationCount,
      explicitNoVerification,
    };
  };

  const fenceRegex = /```json\s*([\s\S]*?)```/gi;
  let fence: RegExpExecArray | null;
  while ((fence = fenceRegex.exec(text)) !== null) {
    const payload = fence[1];
    if (!payload) continue;
    try {
      const parsed = JSON.parse(payload);
      if (Array.isArray(parsed)) {
        for (const item of parsed) {
          const receipt = fromObject(asRecord(item));
          if (receipt) receiptsByTask.set(receipt.taskId, receipt);
        }
      } else {
        const receipt = fromObject(asRecord(parsed));
        if (receipt) receiptsByTask.set(receipt.taskId, receipt);
      }
    } catch {
      // Fall through to regex fallback.
    }
  }

  if (receiptsByTask.size === 0) {
    const regex = /"task_id"\s*:\s*"([^"]+)"[\s\S]*?"status"\s*:\s*"(completed|blocked|failed)"/gi;
    let match: RegExpExecArray | null;
    while ((match = regex.exec(text)) !== null) {
      const taskId = match[1]?.trim();
      const status = match[2]?.toLowerCase() as WorkerReceipt["status"] | undefined;
      if (!taskId || !status) continue;
      receiptsByTask.set(taskId, {
        taskId,
        status,
        verificationCount: 0,
        explicitNoVerification: false,
      });
    }
  }

  return [...receiptsByTask.values()];
}
|
|
|
export default function register(api: OpenClawPluginApi) { |
|
// Resolve plugin configuration. Each option is validated by type and falls
// back to the default given here when missing or malformed.
const cfg = asRecord(api.pluginConfig) as PluginConfig;
const injectOrchestratorPrompt = asBoolean(cfg.injectOrchestratorPrompt, true);
const enforceMainDelegation = asBoolean(cfg.enforceMainDelegation, false);
const enforceFridayChannels = asBoolean(cfg.enforceFridayChannels, false);
const autoRetryFailedSubagents = asBoolean(cfg.autoRetryFailedSubagents, false);
const maxRetriesPerTask = asPositiveInt(cfg.maxRetriesPerTask, 2);
const retryBaseBackoffMs = asPositiveInt(cfg.retryBaseBackoffMs, 1_500);
const retryMaxBackoffMs = asPositiveInt(cfg.retryMaxBackoffMs, 60_000);
const schedulerTickMs = asPositiveInt(cfg.schedulerTickMs, 1_000);
const schedulerMaxConcurrentWorkers = asPositiveInt(cfg.schedulerMaxConcurrentWorkers, 6);
const verificationGraceMs = asPositiveInt(cfg.verificationGraceMs, 120_000);
const statusEnabled = asBoolean(cfg.statusEnabled, false);
const statusStickyEmoji = asString(cfg.statusStickyEmoji) ?? DEFAULT_STATUS_STICKY_EMOJI;
const stateDbPath = asString(cfg.stateDbPath) ?? DEFAULT_STATE_DB_PATH;
const requestedStatusChannel = asString(cfg.statusChannel);
const normalizedStatusChannel = normalizeFridayChannelTarget(requestedStatusChannel);
// With channel enforcement on, a non-Friday status channel is replaced by the default.
const statusChannel = enforceFridayChannels && !isAllowedFridayTarget(normalizedStatusChannel)
  ? DEFAULT_STATUS_CHANNEL
  : normalizedStatusChannel;
const orchestratorLabelPrefix = sanitizeLabel(asString(cfg.orchestratorLabelPrefix)) ?? "thread-orchestrator";
const retryOutcomes = resolveRetryOutcomes(cfg.retryOutcomes);
// Keep front/orchestrator strict even if config accidentally adds direct tools.
const allowedMainTools = new Set(
  [...DEFAULT_ALLOWED_MAIN_TOOLS, ...(asStringArray(cfg.allowedMainTools) ?? [])].filter((toolId) =>
    STRICT_ROUTER_TOOLS.has(toolId),
  ),
);
const allowedOrchestratorTools = new Set(
  [...DEFAULT_ALLOWED_ORCHESTRATOR_TOOLS, ...(asStringArray(cfg.allowedOrchestratorTools) ?? [])].filter((toolId) =>
    STRICT_ROUTER_TOOLS.has(toolId),
  ),
);
|
|
|
// Open (creating if needed) the SQLite state database and make sure the
// schema exists. All statements are idempotent so this is safe on every start.
mkdirSync(dirname(stateDbPath), { recursive: true });
const db = new DatabaseSync(stateDbPath);
db.exec("PRAGMA journal_mode = WAL;");
db.exec("PRAGMA busy_timeout = 5000;");
db.exec(`
  CREATE TABLE IF NOT EXISTS orchestrator_bindings (
    requester_session_key TEXT PRIMARY KEY,
    orchestrator_session_key TEXT NOT NULL UNIQUE,
    updated_at INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS orchestrator_retries (
    task_key TEXT PRIMARY KEY,
    retries INTEGER NOT NULL DEFAULT 0,
    updated_at INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS orchestrator_tasks (
    task_id TEXT PRIMARY KEY,
    task_key TEXT NOT NULL,
    requester_session_key TEXT NOT NULL,
    orchestrator_session_key TEXT NOT NULL,
    label TEXT,
    task_preview TEXT NOT NULL,
    state TEXT NOT NULL,
    attempt INTEGER NOT NULL DEFAULT 1,
    max_retries INTEGER NOT NULL DEFAULT 0,
    next_run_at INTEGER,
    last_outcome TEXT,
    spec_json TEXT,
    receipt_seen INTEGER NOT NULL DEFAULT 0,
    verification_items INTEGER NOT NULL DEFAULT 0,
    explicit_no_verification INTEGER NOT NULL DEFAULT 0,
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL
  );
  CREATE INDEX IF NOT EXISTS idx_orchestrator_tasks_orch_state
    ON orchestrator_tasks(orchestrator_session_key, state, updated_at);
  CREATE TABLE IF NOT EXISTS orchestrator_workers (
    child_session_key TEXT PRIMARY KEY,
    task_id TEXT,
    task_key TEXT,
    orchestrator_session_key TEXT NOT NULL,
    label TEXT,
    task_preview TEXT NOT NULL,
    status TEXT NOT NULL,
    attempt INTEGER NOT NULL DEFAULT 1,
    started_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    outcome TEXT
  );
  CREATE INDEX IF NOT EXISTS idx_orchestrator_workers_orch
    ON orchestrator_workers(orchestrator_session_key, updated_at);
`);

// Runs an ALTER TABLE ... ADD COLUMN statement and ignores only the
// "duplicate column name" failure raised when the column already exists;
// any other error is rethrown.
const addColumnIfMissing = (sql: string): void => {
  try {
    db.exec(sql);
  } catch (error) {
    const text = String(error ?? "");
    if (!/duplicate column name/i.test(text)) throw error;
  }
};

// Upgrade databases whose orchestrator_tasks table predates the receipt columns.
addColumnIfMissing("ALTER TABLE orchestrator_tasks ADD COLUMN receipt_seen INTEGER NOT NULL DEFAULT 0;");
addColumnIfMissing("ALTER TABLE orchestrator_tasks ADD COLUMN verification_items INTEGER NOT NULL DEFAULT 0;");
addColumnIfMissing("ALTER TABLE orchestrator_tasks ADD COLUMN explicit_no_verification INTEGER NOT NULL DEFAULT 0;");
|
|
|
// In-memory state shared by the functions declared in this scope.
// Spawn tracking.
const pendingSpawns = new Map<string, PendingSpawn>();
const spawnByChildSession = new Map<string, PendingSpawn>();
// Retry bookkeeping, keyed by task key.
const retryCountByTask = new Map<string, number>();
const attemptCountByTask = new Map<string, number>();
const pendingRetryByTask = new Map<string, RetryEnvelope>();
// Requester <-> hidden orchestrator bindings and orchestrator liveness.
const orchestratorByRequester = new Map<string, string>();
const requesterByOrchestrator = new Map<string, string>();
const orchestratorSessions = new Set<string>();
const retiredOrchestratorSessions = new Set<string>();
const orchestratorLastTouchBySession = new Map<string, number>();
// Outbound de-duplication: `target|idempotencyKey` -> time last sent (ms).
const outboundIdempotencySeenByTarget = new Map<string, number>();
const lastUserMessageBySession = new Map<string, string>();
const lastDelegatedTaskByOrchestrator = new Map<string, string>();
const lastStatusEmitByRequester = new Map<string, number>();
const lastStatusIdempotencyByRequester = new Map<string, string>();
const lastIncidentHashByRequester = new Map<string, string>();
const blockedAttemptsByRun = new Map<string, number>();
// Worker tracking: state per child session, and child sessions per orchestrator.
const workerStateBySession = new Map<string, WorkerState>();
const workersByOrchestrator = new Map<string, Set<string>>();
|
|
|
/**
 * Reads the stored lifecycle state of a task.
 *
 * @returns `undefined` when the task row does not exist or has a blank state;
 *   an unrecognised stored value is reported as `"queued"`.
 */
function readTaskState(taskId: string): TaskLifecycleState | undefined {
  const row = db.prepare(`SELECT state FROM orchestrator_tasks WHERE task_id = ? LIMIT 1`).get(taskId) as unknown;
  const state = asString(asRecord(row).state);
  if (!state) return undefined;
  return normalizeTaskState(state, "queued");
}
|
|
|
/**
 * Records on the task row that a receipt was seen, together with its
 * verification item count and explicit-no-verification flag. Does nothing
 * when no row matches `taskId`.
 */
function markTaskReceipt(taskId: string, receipt: WorkerReceipt): void {
  db.prepare(`
    UPDATE orchestrator_tasks
    SET receipt_seen = 1,
        verification_items = ?,
        explicit_no_verification = ?
    WHERE task_id = ?
  `).run(receipt.verificationCount, receipt.explicitNoVerification ? 1 : 0, taskId);
}
|
|
|
/**
 * Derives an idempotency key from the orchestrator's current state.
 *
 * The key is `state:<sessionKey>:` followed by the eight task counters from
 * `readStatusSummary` and the number of in-memory workers whose status is
 * `"running"`, so it changes whenever any of those numbers changes.
 */
function deriveOrchestratorStateIdempotencyKey(orchestratorSessionKey: string): string {
  const summary = readStatusSummary(orchestratorSessionKey);
  const workerKeys = workersByOrchestrator.get(orchestratorSessionKey);
  const runningWorkers = workerKeys
    ? [...workerKeys].filter((key) => workerStateBySession.get(key)?.status === "running").length
    : 0;
  return [
    "state",
    orchestratorSessionKey,
    summary.queued,
    summary.leased,
    summary.running,
    summary.needsInput,
    summary.needsVerification,
    summary.done,
    summary.failed,
    summary.dead,
    runningWorkers,
  ].join(":");
}
|
|
|
/**
 * Picks the destination of an outbound tool call.
 *
 * Uses the first non-blank value among `sessionKey`, `channel`, `to`,
 * `target`, `channelId` and `threadId`, then `fallbackSessionKey`, and
 * finally the literal `"unknown-target"`.
 */
function extractOutboundTarget(params: Record<string, unknown>, fallbackSessionKey?: string): string {
  return (
    asString(params.sessionKey) ??
    asString(params.channel) ??
    asString(params.to) ??
    asString(params.target) ??
    asString(params.channelId) ??
    asString(params.threadId) ??
    fallbackSessionKey ??
    "unknown-target"
  );
}
|
|
|
/**
 * Picks the text of an outbound tool call: the first non-blank value among
 * `message`, `content`, `text` and `body`, or `undefined` when none is set.
 */
function extractOutboundText(params: Record<string, unknown>): string | undefined {
  return (
    asString(params.message) ??
    asString(params.content) ??
    asString(params.text) ??
    asString(params.body)
  );
}
|
|
|
/**
 * Derives the idempotency key used to de-duplicate an outbound message.
 *
 * Precedence:
 * 1. An explicit `idempotencyKey` in `params`, `params.metadata` or `params.meta`.
 * 2. For `<orchestrator-status>` text, the four counters quoted in the text.
 * 3. For `<orchestrator-incident>` text, a short hash of reason and details.
 * 4. For text containing worker receipts, the sorted `taskId:status` pairs.
 * 5. Otherwise a key derived from the orchestrator's current state.
 *
 * @param orchestratorSessionKey - Session key included in every derived key.
 * @param params - Parameters of the outbound tool call.
 * @param text - Message text, if any.
 */
function deriveOutboundIdempotencyKey(
  orchestratorSessionKey: string,
  params: Record<string, unknown>,
  text: string | undefined,
): string {
  const explicit =
    asString(params.idempotencyKey) ??
    asString(asRecord(params.metadata).idempotencyKey) ??
    asString(asRecord(params.meta).idempotencyKey);
  if (explicit) return explicit;

  const body = (text ?? "").trim();
  if (body.includes("<orchestrator-status>")) {
    const running = body.match(/workersRunning:\s*(\d+)/i)?.[1] ?? "0";
    const queued = body.match(/tasksQueued:\s*(\d+)/i)?.[1] ?? "0";
    const done = body.match(/tasksDone:\s*(\d+)/i)?.[1] ?? "0";
    const failed = body.match(/tasksFailed:\s*(\d+)/i)?.[1] ?? "0";
    return `status:${orchestratorSessionKey}:${running}:${queued}:${done}:${failed}`;
  }
  if (body.includes("<orchestrator-incident>")) {
    const reason = body.match(/reason:\s*([^\n]+)/i)?.[1]?.trim() ?? "incident";
    const details = body.match(/details:\s*([^\n]+)/i)?.[1]?.trim() ?? "";
    return `incident:${orchestratorSessionKey}:${shortHash(`${reason}|${details}`)}`;
  }

  const receipts = parseWorkerReceipts(body);
  if (receipts.length > 0) {
    // Sort so the key does not depend on the order receipts appear in the text.
    const receiptKey = receipts
      .map((receipt) => `${receipt.taskId}:${receipt.status}`)
      .sort()
      .join(",");
    return `receipt:${orchestratorSessionKey}:${receiptKey}`;
  }
  return deriveOrchestratorStateIdempotencyKey(orchestratorSessionKey);
}
|
|
|
/**
 * Decides whether an outbound message is a duplicate that should be dropped.
 *
 * Expired entries are purged first. If the same target/key pair was recorded
 * within `OUTBOUND_IDEMPOTENCY_WINDOW_MS` the message is suppressed and the
 * recorded time is left unchanged; otherwise the pair is recorded at `now`.
 *
 * @param target - Destination of the message.
 * @param idempotencyKey - Key identifying the message content.
 * @param now - Current time in milliseconds.
 * @returns `true` when the message should be suppressed.
 */
function shouldSuppressOutbound(target: string, idempotencyKey: string, now: number): boolean {
  for (const [key, sentAt] of outboundIdempotencySeenByTarget.entries()) {
    if (now - sentAt > OUTBOUND_IDEMPOTENCY_WINDOW_MS) outboundIdempotencySeenByTarget.delete(key);
  }
  const dedupeKey = `${target}|${idempotencyKey}`;
  const previous = outboundIdempotencySeenByTarget.get(dedupeKey);
  // Compare against undefined: a timestamp of 0 is a valid recorded value.
  if (previous !== undefined && now - previous <= OUTBOUND_IDEMPOTENCY_WINDOW_MS) return true;
  outboundIdempotencySeenByTarget.set(dedupeKey, now);
  return false;
}
|
|
|
/**
 * Persists the requester -> orchestrator binding and marks the orchestrator
 * as live.
 *
 * Any row binding the same orchestrator to a different requester is deleted
 * first, because `orchestrator_session_key` is UNIQUE. The orchestrator's
 * last-touch time is set to now and it is removed from the retired set.
 */
function setBinding(requesterSessionKey: string, orchestratorSessionKey: string): void {
  const now = Date.now();
  db.prepare(`
    DELETE FROM orchestrator_bindings
    WHERE orchestrator_session_key = ? AND requester_session_key <> ?
  `).run(orchestratorSessionKey, requesterSessionKey);
  db.prepare(`
    INSERT INTO orchestrator_bindings(requester_session_key, orchestrator_session_key, updated_at)
    VALUES (?, ?, ?)
    ON CONFLICT(requester_session_key) DO UPDATE SET
      orchestrator_session_key=excluded.orchestrator_session_key,
      updated_at=excluded.updated_at
  `).run(requesterSessionKey, orchestratorSessionKey, now);
  orchestratorLastTouchBySession.set(orchestratorSessionKey, now);
  retiredOrchestratorSessions.delete(orchestratorSessionKey);
}
|
|
|
/** Deletes the persisted binding row for the given requester session, if any. */
function deleteBindingByRequester(requesterSessionKey: string): void {
  db.prepare(`DELETE FROM orchestrator_bindings WHERE requester_session_key = ?`).run(requesterSessionKey);
}
|
|
|
/** Deletes the persisted binding row for the given orchestrator session, if any. */
function deleteBindingByOrchestrator(orchestratorSessionKey: string): void {
  db.prepare(`DELETE FROM orchestrator_bindings WHERE orchestrator_session_key = ?`).run(orchestratorSessionKey);
}
|
|
|
/** Records the current time as the orchestrator's last-touch time (in memory only). */
function touchOrchestrator(orchestratorSessionKey: string): void {
  orchestratorLastTouchBySession.set(orchestratorSessionKey, Date.now());
}
|
|
|
/**
 * Counts the orchestrator's tasks that are not yet in a terminal state, i.e.
 * whose state is queued, leased, running, needs_input or needs_verification.
 *
 * @returns The count, or 0 when the query yields no usable number.
 */
function countActiveTasks(orchestratorSessionKey: string): number {
  const row = db.prepare(`
    SELECT COUNT(*) AS c
    FROM orchestrator_tasks
    WHERE orchestrator_session_key = ?
      AND state IN ('queued', 'leased', 'running', 'needs_input', 'needs_verification')
  `).get(orchestratorSessionKey) as unknown;
  return asNonNegativeInt(asRecord(row).c, 0);
}
|
|
|
/**
 * Retires an orchestrator session: removes it from the in-memory indexes, deletes its
 * persisted binding rows, forgets every worker tracked under it (state and DB row) and
 * records the key in `retiredOrchestratorSessions`.
 *
 * @param orchestratorSessionKey - Orchestrator session to retire.
 * @param reason - Free-form reason; only used in the log line.
 */
function retireOrchestratorSession(orchestratorSessionKey: string, reason: string): void {
  // Capture the requester before the reverse index entry is dropped.
  const boundRequester = requesterByOrchestrator.get(orchestratorSessionKey);

  requesterByOrchestrator.delete(orchestratorSessionKey);
  orchestratorSessions.delete(orchestratorSessionKey);
  retiredOrchestratorSessions.add(orchestratorSessionKey);
  orchestratorLastTouchBySession.delete(orchestratorSessionKey);
  deleteBindingByOrchestrator(orchestratorSessionKey);

  // Clear the requester-side mapping only while it still points at this orchestrator.
  if (boundRequester) {
    const currentlyBound = orchestratorByRequester.get(boundRequester);
    if (currentlyBound === orchestratorSessionKey) {
      orchestratorByRequester.delete(boundRequester);
      deleteBindingByRequester(boundRequester);
    }
  }

  const trackedWorkers = workersByOrchestrator.get(orchestratorSessionKey);
  if (trackedWorkers) {
    trackedWorkers.forEach((workerKey) => {
      workerStateBySession.delete(workerKey);
      deleteWorkerRow(workerKey);
    });
    workersByOrchestrator.delete(orchestratorSessionKey);
  }

  api.logger.info(`orchestrator: retired hidden orchestrator ${orchestratorSessionKey} (${reason})`);
}
|
|
|
/**
 * Stores the retry count of a task both in memory and in `orchestrator_retries`
 * (insert, or overwrite on an existing `task_key`).
 *
 * @param taskKey - Stable key of the task.
 * @param retries - Retry count to record.
 */
function setRetryCount(taskKey: string, retries: number): void {
  const updatedAt = Date.now();
  retryCountByTask.set(taskKey, retries);

  const upsertRetries = db.prepare(`
    INSERT INTO orchestrator_retries(task_key, retries, updated_at)
    VALUES (?, ?, ?)
    ON CONFLICT(task_key) DO UPDATE SET retries=excluded.retries, updated_at=excluded.updated_at
  `);
  upsertRetries.run(taskKey, retries, updatedAt);
}
|
|
|
/**
 * Inserts a task row, or refreshes the mutable columns of an existing one.
 *
 * On a `task_id` conflict only state, attempt, next_run_at, last_outcome, spec_json and
 * updated_at are overwritten; the other columns keep their stored values.
 *
 * @param taskId - Primary identifier of the task attempt.
 * @param taskKey - Stable key shared by attempts of the same task.
 * @param requesterSessionKey - Session on whose behalf the task runs.
 * @param orchestratorSessionKey - Orchestrator session that owns the task.
 * @param spec - Spawn specification; stored as JSON and used for label/preview.
 * @param state - Lifecycle state to record.
 * @param attempt - Attempt number to record.
 * @param opts - Optional `nextRunAt` / `outcome`; each is stored as NULL when omitted.
 */
function upsertTaskRow(
  taskId: string,
  taskKey: string,
  requesterSessionKey: string,
  orchestratorSessionKey: string,
  spec: SpawnSpec,
  state: TaskLifecycleState,
  attempt: number,
  opts?: { nextRunAt?: number; outcome?: string },
): void {
  const timestamp = Date.now();
  const preview = taskPreview(spec.task);

  const upsertTask = db.prepare(`
    INSERT INTO orchestrator_tasks(
      task_id, task_key, requester_session_key, orchestrator_session_key,
      label, task_preview, state, attempt, max_retries,
      next_run_at, last_outcome, spec_json, created_at, updated_at
    )
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(task_id) DO UPDATE SET
      state=excluded.state,
      attempt=excluded.attempt,
      next_run_at=excluded.next_run_at,
      last_outcome=excluded.last_outcome,
      spec_json=excluded.spec_json,
      updated_at=excluded.updated_at
  `);

  const label = spec.label ?? null;
  const nextRunAt = opts?.nextRunAt ?? null;
  const lastOutcome = opts?.outcome ?? null;
  const specJson = JSON.stringify(spec);

  // created_at and updated_at both receive the same timestamp on insert.
  upsertTask.run(
    taskId,
    taskKey,
    requesterSessionKey,
    orchestratorSessionKey,
    label,
    preview,
    state,
    attempt,
    maxRetriesPerTask,
    nextRunAt,
    lastOutcome,
    specJson,
    timestamp,
    timestamp,
  );
}
|
|
|
/**
 * Moves a task to a new lifecycle state.
 *
 * `next_run_at` and `last_outcome` are always rewritten: values missing from `opts`
 * are stored as NULL, so a call without options clears both columns.
 *
 * @param taskId - Task to update.
 * @param state - New lifecycle state.
 * @param opts - Optional `nextRunAt` / `outcome` to store alongside the state.
 */
function setTaskState(taskId: string, state: TaskLifecycleState, opts?: { nextRunAt?: number; outcome?: string }): void {
  const updatedAt = Date.now();
  const nextRunAt = opts?.nextRunAt ?? null;
  const lastOutcome = opts?.outcome ?? null;

  const updateState = db.prepare(`
    UPDATE orchestrator_tasks
    SET state = ?, next_run_at = ?, last_outcome = ?, updated_at = ?
    WHERE task_id = ?
  `);
  updateState.run(state, nextRunAt, lastOutcome, updatedAt, taskId);
}
|
|
|
/**
 * Persists a worker's state in `orchestrator_workers`.
 *
 * On a `child_session_key` conflict only status, updated_at, outcome, task_id, task_key
 * and attempt are overwritten. Optional fields are stored as NULL when absent, except
 * `attempt`, which defaults to 1.
 *
 * @param worker - In-memory worker state to write.
 */
function upsertWorkerRow(worker: WorkerState): void {
  const upsertWorker = db.prepare(`
    INSERT INTO orchestrator_workers(
      child_session_key, task_id, task_key, orchestrator_session_key,
      label, task_preview, status, attempt, started_at, updated_at, outcome
    )
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(child_session_key) DO UPDATE SET
      status=excluded.status,
      updated_at=excluded.updated_at,
      outcome=excluded.outcome,
      task_id=excluded.task_id,
      task_key=excluded.task_key,
      attempt=excluded.attempt
  `);

  const taskId = worker.taskId ?? null;
  const taskKey = worker.taskKey ?? null;
  const label = worker.label ?? null;
  const attempt = worker.attempt ?? 1;
  const outcome = worker.outcome ?? null;

  upsertWorker.run(
    worker.childSessionKey,
    taskId,
    taskKey,
    worker.orchestratorSessionKey,
    label,
    worker.taskPreview,
    worker.status,
    attempt,
    worker.startedAtMs,
    worker.updatedAtMs,
    outcome,
  );
}
|
|
|
/**
 * Removes the persisted row of a worker session.
 *
 * @param childSessionKey - Session key of the worker to delete.
 */
function deleteWorkerRow(childSessionKey: string): void {
  const statement = db.prepare(`DELETE FROM orchestrator_workers WHERE child_session_key = ?`);
  statement.run(childSessionKey);
}
|
|
|
/**
 * Looks up the most recently updated task with the given key that is still in a
 * non-terminal state (queued, leased, running, needs_input or needs_verification).
 *
 * @param taskKey - Stable task key to search for.
 * @returns The matching `task_id`, or `undefined` when no active task exists.
 */
function findActiveTaskIdByTaskKey(taskKey: string): string | undefined {
  const lookup = db.prepare(`
    SELECT task_id
    FROM orchestrator_tasks
    WHERE task_key = ?
      AND state IN ('queued', 'leased', 'running', 'needs_input', 'needs_verification')
    ORDER BY updated_at DESC
    LIMIT 1
  `);
  const match = lookup.get(taskKey) as unknown;
  const record = asRecord(match);
  return asString(record.task_id);
}
|
|
|
/**
 * Builds per-state task counts for one orchestrator from `orchestrator_tasks`.
 *
 * Each grouped row's state goes through `normalizeTaskState` (with "queued" passed as
 * its second argument) before its count is added to the matching bucket.
 *
 * @param orchestratorSessionKey - Orchestrator whose tasks are summarised.
 * @returns Counts for every lifecycle state; states without rows stay at 0.
 */
function readStatusSummary(orchestratorSessionKey: string): FridayStatusSummary {
  const summary: FridayStatusSummary = {
    queued: 0,
    leased: 0,
    running: 0,
    needsInput: 0,
    needsVerification: 0,
    done: 0,
    failed: 0,
    dead: 0,
  };

  const groupedRows = db.prepare(`
    SELECT state, COUNT(*) AS count
    FROM orchestrator_tasks
    WHERE orchestrator_session_key = ?
    GROUP BY state
  `).all(orchestratorSessionKey) as unknown[];

  groupedRows.forEach((entry) => {
    const record = asRecord(entry);
    const state = normalizeTaskState(record.state, "queued");
    const count = asNonNegativeInt(record.count, 0);
    switch (state) {
      case "queued":
        summary.queued += count;
        break;
      case "leased":
        summary.leased += count;
        break;
      case "running":
        summary.running += count;
        break;
      case "needs_input":
        summary.needsInput += count;
        break;
      case "needs_verification":
        summary.needsVerification += count;
        break;
      case "done":
        summary.done += count;
        break;
      case "failed":
        summary.failed += count;
        break;
      case "dead":
        summary.dead += count;
        break;
    }
  });

  return summary;
}
|
|
|
/**
 * Pushes a task-status system event (and, when failures exist, an incident event) to
 * the requester session bound to an orchestrator.
 *
 * Nothing is emitted when status reporting is disabled, when the orchestrator has no
 * known requester, when the summary equals the last one emitted for that requester, or
 * when the previous emit for that requester was less than 700 ms ago.
 *
 * The "last emitted summary" marker is recorded only once the emit actually goes ahead.
 * Recording it before the throttle check would mark a throttled change as delivered, so
 * later calls carrying the same summary would be suppressed and the requester would
 * never see it. A throttled change is still not re-sent on its own; it goes out on the
 * next call made after the throttle interval.
 *
 * @param orchestratorSessionKey - Orchestrator whose task summary is reported.
 * @param reason - Short tag appended to the heartbeat request reason.
 */
function emitStatusUpdate(orchestratorSessionKey: string, reason: string): void {
  if (!statusEnabled) return;
  const requesterSessionKey = requesterByOrchestrator.get(orchestratorSessionKey);
  if (!requesterSessionKey) return;

  const summary = readStatusSummary(orchestratorSessionKey);
  const statusIdempotencyKey = [
    orchestratorSessionKey,
    summary.queued,
    summary.leased,
    summary.running,
    summary.needsInput,
    summary.needsVerification,
    summary.done,
    summary.failed,
    summary.dead,
  ].join(":");
  if (lastStatusIdempotencyByRequester.get(requesterSessionKey) === statusIdempotencyKey) return;

  // Throttle first; only a summary that is really emitted may be remembered as sent.
  const minEmitIntervalMs = 700;
  const now = Date.now();
  const last = lastStatusEmitByRequester.get(requesterSessionKey) ?? 0;
  if (now - last < minEmitIntervalMs) return;
  lastStatusIdempotencyByRequester.set(requesterSessionKey, statusIdempotencyKey);
  lastStatusEmitByRequester.set(requesterSessionKey, now);

  const statusEvent = buildStatusSystemEvent(
    requesterSessionKey,
    orchestratorSessionKey,
    summary,
    statusStickyEmoji,
  );
  api.runtime.system.enqueueSystemEvent(statusEvent, { sessionKey: requesterSessionKey });
  api.runtime.system.requestHeartbeatNow({
    reason: `orchestrator-status:${reason}`,
    sessionKey: requesterSessionKey,
  });

  // Incident reporting: only when failed/dead tasks exist, and only once per distinct
  // (failures, running, queued) combination for this requester.
  const failures = summary.failed + summary.dead;
  if (failures <= 0) return;
  const incidentHash = shortHash(`${orchestratorSessionKey}|${failures}|${summary.running}|${summary.queued}`);
  if (lastIncidentHashByRequester.get(requesterSessionKey) === incidentHash) return;
  lastIncidentHashByRequester.set(requesterSessionKey, incidentHash);

  const incidentEvent = buildIncidentSystemEvent(
    requesterSessionKey,
    orchestratorSessionKey,
    statusChannel,
    "worker-failure",
    `failures=${failures} running=${summary.running} queued=${summary.queued}`,
  );
  api.runtime.system.enqueueSystemEvent(incidentEvent, { sessionKey: requesterSessionKey });
}
|
|
|
// Startup rehydration, part 1: reload requester <-> orchestrator bindings from the
// database into the in-memory indexes. Rows missing either key are ignored.
const bindingRows = db.prepare(`
  SELECT requester_session_key, orchestrator_session_key
  FROM orchestrator_bindings
`).all() as unknown[];
for (const bindingRow of bindingRows) {
  const record = asRecord(bindingRow);
  const requesterKey = asString(record.requester_session_key);
  const orchestratorKey = asString(record.orchestrator_session_key);
  if (requesterKey && orchestratorKey) {
    orchestratorByRequester.set(requesterKey, orchestratorKey);
    requesterByOrchestrator.set(orchestratorKey, requesterKey);
    orchestratorSessions.add(orchestratorKey);
  }
}
|
|
|
// Startup rehydration: reload persisted per-task retry counters. Rows without a task
// key are skipped; unreadable counts fall back to 0.
const retryRows = db.prepare(`SELECT task_key, retries FROM orchestrator_retries`).all() as unknown[];
for (const retryRow of retryRows) {
  const record = asRecord(retryRow);
  const persistedTaskKey = asString(record.task_key);
  if (persistedTaskKey) {
    retryCountByTask.set(persistedTaskKey, asNonNegativeInt(record.retries, 0));
  }
}
|
|
|
// Startup rehydration: rebuild in-memory worker state and the per-orchestrator worker
// index from `orchestrator_workers`. Rows missing either session key are skipped.
const workerRows = db.prepare(`
  SELECT child_session_key, orchestrator_session_key, label, task_preview, status,
         started_at, updated_at, outcome, task_id, task_key, attempt
  FROM orchestrator_workers
`).all() as unknown[];
for (const workerRow of workerRows) {
  const record = asRecord(workerRow);
  const childSessionKey = asString(record.child_session_key);
  const orchestratorSessionKey = asString(record.orchestrator_session_key);
  if (!(childSessionKey && orchestratorSessionKey)) continue;

  // The stored status is lower-cased and trusted as-is; a missing one becomes "unknown".
  const restored: WorkerState = {
    childSessionKey,
    orchestratorSessionKey,
    label: asString(record.label),
    taskPreview: asString(record.task_preview) ?? "(unknown task)",
    status: (asString(record.status)?.toLowerCase() ?? "unknown") as WorkerStatus,
    startedAtMs: asNonNegativeInt(record.started_at, Date.now()),
    updatedAtMs: asNonNegativeInt(record.updated_at, Date.now()),
    outcome: asString(record.outcome),
    taskId: asString(record.task_id),
    taskKey: asString(record.task_key),
    attempt: asPositiveInt(record.attempt, 1),
  };
  workerStateBySession.set(childSessionKey, restored);

  const siblings = workersByOrchestrator.get(orchestratorSessionKey) ?? new Set<string>();
  siblings.add(childSessionKey);
  workersByOrchestrator.set(orchestratorSessionKey, siblings);
}
|
|
|
// Startup rehydration: re-register retries that were scheduled (state 'queued' with a
// next_run_at) before the restart. Rows with missing keys, unparsable spec JSON or an
// empty task text are skipped; JSON failures are logged as warnings.
const queuedRetryRows = db.prepare(`
  SELECT task_id, task_key, orchestrator_session_key, spec_json, next_run_at, last_outcome, attempt
  FROM orchestrator_tasks
  WHERE state = 'queued' AND next_run_at IS NOT NULL
`).all() as unknown[];
for (const queuedRow of queuedRetryRows) {
  const record = asRecord(queuedRow);
  const taskKey = asString(record.task_key);
  const orchestratorSessionKey = asString(record.orchestrator_session_key);
  const specJson = asString(record.spec_json);
  if (!(taskKey && orchestratorSessionKey && specJson)) continue;

  try {
    const parsed = asRecord(JSON.parse(specJson));
    const threadFlag = typeof parsed.thread === "boolean" ? parsed.thread : undefined;
    const spec: SpawnSpec = {
      task: asString(parsed.task) ?? "",
      label: asString(parsed.label),
      agentId: asString(parsed.agentId),
      model: asString(parsed.model),
      thinking: asString(parsed.thinking),
      mode: asString(parsed.mode),
      thread: threadFlag,
      kind: normalizeSpawnKind(asString(parsed.kind)),
      requestId: asString(parsed.requestId),
      idempotencyKey: asString(parsed.idempotencyKey),
    };
    if (spec.task) {
      // The envelope's requesterSessionKey is filled with the orchestrator session key.
      const restoredRetry: RetryEnvelope = {
        taskKey,
        taskId: asString(record.task_id),
        requesterSessionKey: orchestratorSessionKey,
        spec,
        failedSessionKey: "restored-from-db",
        outcome: asString(record.last_outcome) ?? "restored",
        attempt: asPositiveInt(record.attempt, 1),
        runAfterMs: asNonNegativeInt(record.next_run_at, Date.now()),
      };
      pendingRetryByTask.set(taskKey, restoredRetry);
    }
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    api.logger.warn(`orchestrator: failed to parse queued retry spec for ${taskKey}: ${detail}`);
  }
}
|
|
|
// Scheduler tick, run every `schedulerTickMs` milliseconds. Each tick:
//   1. dispatches due retries as system events, limited by the number of free worker
//      slots (schedulerMaxConcurrentWorkers minus workers currently "running");
//   2. resolves up to 50 tasks that have sat in 'needs_verification' for longer than
//      `verificationGraceMs`: to 'done' when a receipt was seen together with
//      verification items or an explicit no-verification flag, else to 'needs_input';
//   3. retires orchestrator sessions that have no active tasks, no pending retries, no
//      running workers and have been idle for at least ORCHESTRATOR_IDLE_RETIRE_MS.
//
// The whole tick runs inside try/catch. An exception thrown from a timer callback is
// otherwise uncaught, so a single failing tick (for example a SQLite error) could
// take down the host process. A failed tick is logged and the next tick runs normally.
setInterval(() => {
  try {
    const now = Date.now();

    // --- 1. Retry dispatch ---
    const inFlight = [...workerStateBySession.values()].filter((item) => item.status === "running").length;
    let availableSlots = Math.max(0, schedulerMaxConcurrentWorkers - inFlight);
    // Iterate over a snapshot because entries are deleted while dispatching.
    for (const [taskKey, retryEnvelope] of [...pendingRetryByTask.entries()]) {
      if (retryEnvelope.runAfterMs > now) continue;
      if (availableSlots <= 0) break;
      const retryText = buildRetrySystemEvent(
        retryEnvelope.spec,
        retryEnvelope.failedSessionKey,
        retryEnvelope.outcome,
        retryEnvelope.attempt,
        maxRetriesPerTask,
      );
      const queued = api.runtime.system.enqueueSystemEvent(retryText, {
        sessionKey: retryEnvelope.requesterSessionKey,
      });
      // Not accepted: keep the envelope so a later tick can try again.
      if (!queued) continue;
      if (retryEnvelope.taskId) {
        setTaskState(retryEnvelope.taskId, "queued", { outcome: retryEnvelope.outcome });
      }
      pendingRetryByTask.delete(taskKey);
      availableSlots -= 1;
      emitStatusUpdate(retryEnvelope.requesterSessionKey, "retry-dispatched");
      touchOrchestrator(retryEnvelope.requesterSessionKey);
      api.runtime.system.requestHeartbeatNow({
        reason: "orchestrator-scheduler-retry-dispatch",
        sessionKey: retryEnvelope.requesterSessionKey,
      });
    }

    // --- 2. Verification grace window ---
    const staleVerificationRows = db.prepare(`
      SELECT task_id, orchestrator_session_key, receipt_seen, verification_items, explicit_no_verification
      FROM orchestrator_tasks
      WHERE state = 'needs_verification' AND updated_at <= ?
      LIMIT 50
    `).all(now - verificationGraceMs) as unknown[];
    for (const row of staleVerificationRows) {
      const obj = asRecord(row);
      const taskId = asString(obj.task_id);
      const orchestratorSessionKey = asString(obj.orchestrator_session_key);
      if (!taskId) continue;
      const receiptSeen = asNonNegativeInt(obj.receipt_seen, 0) > 0;
      const verificationItems = asNonNegativeInt(obj.verification_items, 0);
      const explicitNoVerification = asNonNegativeInt(obj.explicit_no_verification, 0) > 0;
      if (receiptSeen && (verificationItems > 0 || explicitNoVerification)) {
        setTaskState(taskId, "done", { outcome: "auto-verified-after-grace-window" });
      } else {
        setTaskState(taskId, "needs_input", {
          outcome: receiptSeen ? "verification-timeout-unverified" : "verification-timeout-missing-receipt",
        });
      }
      if (orchestratorSessionKey) emitStatusUpdate(orchestratorSessionKey, "verification-autocomplete");
    }

    // --- 3. Idle orchestrator retirement ---
    // Snapshot the set: retireOrchestratorSession removes entries from it.
    for (const orchestratorSessionKey of [...orchestratorSessions]) {
      const activeTasks = countActiveTasks(orchestratorSessionKey);
      const hasPendingRetry = [...pendingRetryByTask.values()].some(
        (entry) => entry.requesterSessionKey === orchestratorSessionKey,
      );
      const workerKeys = workersByOrchestrator.get(orchestratorSessionKey);
      const hasRunningWorker = Boolean(
        workerKeys && [...workerKeys].some((key) => workerStateBySession.get(key)?.status === "running"),
      );
      // With no recorded touch the idle time evaluates to 0 for this tick.
      const lastTouch = orchestratorLastTouchBySession.get(orchestratorSessionKey) ?? now;
      const idleForMs = now - lastTouch;
      if (activeTasks === 0 && !hasPendingRetry && !hasRunningWorker && idleForMs >= ORCHESTRATOR_IDLE_RETIRE_MS) {
        retireOrchestratorSession(orchestratorSessionKey, `idle-for-${idleForMs}ms`);
      }
    }
  } catch (error) {
    api.logger.warn(
      `orchestrator: scheduler tick failed: ${error instanceof Error ? error.message : String(error)}`,
    );
  }
}, schedulerTickMs);
|
|
|
// Startup banner: reports the state database path, the status channel and the
// concurrency limit the scheduler was configured with.
api.logger.info(
  [
    `orchestrator: loaded with durable state db=${stateDbPath}`,
    `statusChannel=${statusChannel}`,
    `maxConcurrent=${schedulerMaxConcurrentWorkers}`,
  ].join(", "),
);
|
|
|
// before_prompt_build hook.
// 1. Remembers the latest user message of the session (unless it is an orchestrator
//    echo) so that it can later serve as a fallback task text.
// 2. When prompt injection is enabled and the session is a front-controller session,
//    returns a `prependContext` made of the orchestrator prompt plus, if an
//    orchestrator is bound, a status snapshot (task counts and tracked workers).
api.on("before_prompt_build", async (event, ctx) => {
  const currentSessionKey = ctx.sessionKey;

  if (currentSessionKey) {
    const latestUser = extractLastUserMessage(event.messages);
    const isRealUserMessage = Boolean(latestUser) && !isOrchestratorEchoMessage(latestUser);
    if (latestUser && isRealUserMessage) {
      lastUserMessageBySession.set(currentSessionKey, latestUser);
    }
  }

  if (!injectOrchestratorPrompt) return;
  if (!isFrontControllerSessionKey(currentSessionKey)) return;

  const knownOrchestrator = currentSessionKey ? orchestratorByRequester.get(currentSessionKey) : undefined;

  let statusBlock = "";
  if (knownOrchestrator) {
    const summary = readStatusSummary(knownOrchestrator);

    // Collect the state of every tracked worker that still has an in-memory entry.
    const workers: WorkerState[] = [];
    const trackedKeys = workersByOrchestrator.get(knownOrchestrator);
    if (trackedKeys) {
      for (const key of trackedKeys) {
        const state = workerStateBySession.get(key);
        if (state) workers.push(state);
      }
    }

    const countsLine = `- queued=${summary.queued} leased=${summary.leased} running=${summary.running} needs_input=${summary.needsInput} needs_verification=${summary.needsVerification} done=${summary.done} failed=${summary.failed} dead=${summary.dead}`;
    const detail =
      workers.length > 0
        ? buildWorkerStatusSnapshot(knownOrchestrator, workers)
        : "- No active worker sessions tracked.";
    statusBlock = `\n[Orchestrator status snapshot]\n${countsLine}\n${detail}\n`;
  }

  const prompt = buildOrchestratorPrompt(maxRetriesPerTask, knownOrchestrator);
  return {
    prependContext: `${prompt}${statusBlock}`,
  };
});
|
|
|
// before_tool_call hook: applies orchestrator policy to every tool call and rewrites
// the parameters of sessions_send / session_status / sessions_history / sessions_spawn.
//
// The handler returns `{ block: true, blockReason }`, `{ params }` or nothing.
// NOTE(review): presumably the host treats these as reject / replace-params /
// pass-through -- confirm against the plugin SDK.
//
// Checks run in this order:
//   1. calls from retired orchestrator sessions are blocked;
//   2. with enforceFridayChannels, "message" calls to a non-allowed target are blocked;
//   3. with enforceMainDelegation, front-controller calls to tools outside
//      allowedMainTools are blocked;
//   4. orchestrator calls to tools outside allowedOrchestratorTools are blocked with a
//      delegation hint, until the third attempt for the same run/session/tool;
//   5. orchestrator sessions_send / message duplicates are suppressed, and worker
//      receipts found in sessions_send text update task state;
//   6. front-controller session_status / sessions_history calls without a target are
//      pointed at the bound orchestrator;
//   7. sessions_spawn is rewritten, de-duplicated against active tasks, persisted as a
//      task row (worker spawns) and stashed in pendingSpawns.
api.on("before_tool_call", async (event, ctx) => { |

const sessionKey = ctx.sessionKey; |

// Retired orchestrator sessions are refused outright, whatever the tool.
if (sessionKey && retiredOrchestratorSessions.has(sessionKey)) { |

return { |

block: true, |

blockReason: |

"This hidden orchestrator session has been retired after completion/idle. " + |

"Use sessions_spawn from the front-controller to create a fresh orchestrator if needed.", |

}; |

} |

// A session registered as an orchestrator is never treated as a front-controller session.
const isOrchestratorSession = sessionKey ? orchestratorSessions.has(sessionKey) : false; |

const isFrontSession = isFrontControllerSessionKey(sessionKey) && !isOrchestratorSession; |

|

// Channel policy: block "message" calls whose resolved target fails isAllowedFridayTarget.
if (enforceFridayChannels && event.toolName === "message") { |

const params = asRecord(event.params); |

// The first string-valued field in this order is used as the target.
const target = |

asString(params.channel) ?? |

asString(params.to) ?? |

asString(params.target) ?? |

asString(params.threadId) ?? |

asString(params.channelId); |

if (target && !isAllowedFridayTarget(target)) { |

return { |

block: true, |

blockReason: |

`Message target '${target}' blocked by orchestrator policy. ` + |

"Allowed targets: #friday, #friday-*, or #friday-forum channels.", |

}; |

} |

} |

|

// Delegation policy: front-controller sessions may only use tools in allowedMainTools.
if (enforceMainDelegation && isFrontSession) { |

if (!allowedMainTools.has(event.toolName)) { |

const orchestratorSession = sessionKey ? orchestratorByRequester.get(sessionKey) : undefined; |

// The hint depends on whether an orchestrator is already bound to this session.
const suffix = orchestratorSession |

? ` Use sessions_send with sessionKey='${orchestratorSession}' to delegate work.` |

: " Use sessions_spawn to start a hidden orchestrator first."; |

return { |

block: true, |

blockReason: `Front-controller session tool '${event.toolName}' blocked by orchestrator policy.${suffix}`, |

}; |

} |

} |

|

// Orchestrator tool policy: blocked attempts are counted per run/session/tool. The
// first two attempts are blocked with a delegation hint; from the third attempt on
// the call is no longer blocked here (only a warning is logged).
if (isOrchestratorSession && !allowedOrchestratorTools.has(event.toolName)) { |

const blockKey = `${event.runId ?? "run"}|${sessionKey ?? "session"}|${event.toolName}`; |

const blockedAttempts = (blockedAttemptsByRun.get(blockKey) ?? 0) + 1; |

blockedAttemptsByRun.set(blockKey, blockedAttempts); |

if (blockedAttempts >= 3) { |

api.logger.warn( |

`orchestrator: allowing direct tool fallback after repeated policy blocks (${event.toolName}, attempts=${blockedAttempts})`, |

); |

} else { |

const hint = buildDelegationHint(event.toolName, event.params); |

// The hint is also remembered as the orchestrator's latest delegated task text.
if (sessionKey) lastDelegatedTaskByOrchestrator.set(sessionKey, hint); |

return { |

block: true, |

blockReason: hint, |

}; |

} |

} |

|

// Everything below needs a concrete session key.
if (!sessionKey) return; |

|

// Orchestrator -> sessions_send: suppress duplicate updates addressed to the requester,
// ingest worker receipts from the message text, and default the target to the requester.
if (isOrchestratorSession && event.toolName === "sessions_send") { |

const requesterSession = requesterByOrchestrator.get(sessionKey); |

if (requesterSession) { |

const params = asRecord(event.params); |

const hasSessionKey = Boolean(asString(params.sessionKey)); |

const hasLabel = Boolean(asString(params.label)); |

const msg = extractOutboundText(params); |

const explicitSessionKey = asString(params.sessionKey); |

// With neither sessionKey nor label supplied, the requester session is the implied target.
const impliedSessionKey = !hasSessionKey && !hasLabel ? requesterSession : undefined; |

const targetSessionKey = explicitSessionKey ?? impliedSessionKey; |

if (msg && targetSessionKey === requesterSession) { |

const now = Date.now(); |

const target = extractOutboundTarget(params, requesterSession); |

const idempotencyKey = deriveOutboundIdempotencyKey(sessionKey, params, msg); |

if (shouldSuppressOutbound(target, idempotencyKey, now)) { |

api.logger.info( |

`orchestrator: suppressed duplicate sessions_send to ${requesterSession} (idempotency=${idempotencyKey})`, |

); |

return { |

block: true, |

blockReason: |

`Duplicate orchestrator update suppressed (idempotency=${idempotencyKey}). ` + |

"Poll current status instead of re-announcing unchanged state.", |

}; |

} |

} |

if (msg) { |

touchOrchestrator(sessionKey); |

lastDelegatedTaskByOrchestrator.set(sessionKey, msg); |

// Receipts parsed from the message text drive task state transitions:
//   completed + verification items      -> needs_verification
//   completed + explicit no-verification -> done
//   completed without either            -> needs_verification
//   blocked -> needs_input, failed -> failed
const receipts = parseWorkerReceipts(msg); |

for (const receipt of receipts) { |

markTaskReceipt(receipt.taskId, receipt); |

if (receipt.status === "completed") { |

if (receipt.verificationCount > 0) { |

setTaskState(receipt.taskId, "needs_verification", { |

outcome: "worker-reported-completed-awaiting-verification", |

}); |

} else if (receipt.explicitNoVerification) { |

setTaskState(receipt.taskId, "done", { outcome: "worker-reported-completed-no-verification" }); |

} else { |

setTaskState(receipt.taskId, "needs_verification", { |

outcome: "worker-reported-completed-missing-verification", |

}); |

} |

} else if (receipt.status === "blocked") { |

setTaskState(receipt.taskId, "needs_input", { outcome: "worker-reported-blocked" }); |

} else if (receipt.status === "failed") { |

setTaskState(receipt.taskId, "failed", { outcome: "worker-reported-failed" }); |

} |

} |

if (receipts.length > 0) emitStatusUpdate(sessionKey, "worker-receipt"); |

} |

// Fill in the requester as target when the call named neither sessionKey nor label.
if (!hasSessionKey && !hasLabel) { |

return { |

params: { |

...params, |

sessionKey: requesterSession, |

}, |

}; |

} |

} |

} |

|

// Orchestrator -> message: suppress duplicates (same target and idempotency key) via shouldSuppressOutbound.
if (isOrchestratorSession && event.toolName === "message") { |

const params = asRecord(event.params); |

const text = extractOutboundText(params); |

if (text) { |

const target = extractOutboundTarget(params, requesterByOrchestrator.get(sessionKey)); |

const idempotencyKey = deriveOutboundIdempotencyKey(sessionKey, params, text); |

const now = Date.now(); |

if (shouldSuppressOutbound(target, idempotencyKey, now)) { |

api.logger.info( |

`orchestrator: suppressed duplicate message to ${target} (idempotency=${idempotencyKey})`, |

); |

return { |

block: true, |

blockReason: `Duplicate outbound message suppressed (idempotency=${idempotencyKey}).`, |

}; |

} |

} |

} |

|

// Front-controller -> sessions_send: when the call targets the bound orchestrator (or
// names no session), remember the message as that orchestrator's latest delegated task.
if (isFrontSession && event.toolName === "sessions_send") { |

const params = asRecord(event.params); |

const knownOrchestrator = orchestratorByRequester.get(sessionKey); |

const sessionTarget = asString(params.sessionKey); |

if (knownOrchestrator && (!sessionTarget || sessionTarget === knownOrchestrator)) { |

const msg = asString(params.message); |

if (msg) lastDelegatedTaskByOrchestrator.set(knownOrchestrator, msg); |

} |

} |

|

// Front-controller status/history lookups without sessionKey or label are pointed at
// the bound orchestrator.
if (isFrontSession && (event.toolName === "session_status" || event.toolName === "sessions_history")) { |

const knownOrchestrator = orchestratorByRequester.get(sessionKey); |

if (knownOrchestrator) { |

const params = asRecord(event.params); |

const hasSessionKey = Boolean(asString(params.sessionKey)); |

const hasLabel = Boolean(asString(params.label)); |

if (!hasSessionKey && !hasLabel) { |

return { |

params: { |

...params, |

sessionKey: knownOrchestrator, |

}, |

}; |

} |

} |

} |

|

// Only sessions_spawn is handled past this point.
if (event.toolName !== "sessions_spawn") return; |

|

const params = asRecord(event.params); |

const rawTask = asString(params.task); |

const knownOrchestrator = orchestratorByRequester.get(sessionKey); |

const delegatedTask = isOrchestratorSession ? lastDelegatedTaskByOrchestrator.get(sessionKey) : undefined; |

const userFallbackTask = lastUserMessageBySession.get(sessionKey); |

// Task text fallback chain: explicit param, last delegated task (orchestrator sessions
// only), last remembered user message, then a canned default.
const task = |

rawTask ?? |

delegatedTask ?? |

userFallbackTask ?? |

(isOrchestratorSession |

? "Delegate the current thread objective to a worker and report results." |

: "Initialize hidden thread orchestrator and continue the current user objective."); |

|

// Defaults describe a worker spawn issued by the calling session itself.
let nextParams: Record<string, unknown> = { ...params, task }; |

let pendingKind: "orchestrator" | "worker" = "worker"; |

let pendingRequesterSessionKey = sessionKey; |

let taskRequesterSessionKey = requesterByOrchestrator.get(sessionKey) ?? sessionKey; |

let taskOrchestratorSessionKey = sessionKey; |

|

// Front-controller spawns: with a bound orchestrator the spawn is tracked as a worker
// under that orchestrator; otherwise it becomes the hidden orchestrator spawn itself.
if (isFrontSession) { |

if (knownOrchestrator) { |

const requestedLabel = sanitizeLabel(asString(params.label)); |

const routedKind = inferSpawnKind(task, requestedLabel); |

taskRequesterSessionKey = sessionKey; |

taskOrchestratorSessionKey = knownOrchestrator; |

pendingRequesterSessionKey = knownOrchestrator; |

nextParams = { |

...nextParams, |

label: requestedLabel ?? `${routedKind}-${shortHash(task)}`, |

// Prevent thread/session UI spam when front-controller is already bound. |

thread: false, |

}; |

api.logger.info( |

`orchestrator: front sessions_spawn rerouted as worker dispatch via existing orchestrator ${knownOrchestrator}`, |

); |

} else { |

const defaultLabel = `${orchestratorLabelPrefix}-${shortHash(sessionKey)}`; |

nextParams = { |

...nextParams, |

label: sanitizeLabel(asString(params.label)) ?? defaultLabel, |

task: buildHiddenOrchestratorTask(sessionKey, task), |

thread: false, |

}; |

pendingKind = "orchestrator"; |

} |

} |

|

const effectiveTask = asString(nextParams.task) ?? task; |

// An explicit kind wins; otherwise the kind is inferred from the task text and label.
const requestedKind = normalizeSpawnKind(asString(nextParams.kind) ?? asString(params.kind)); |

const inferredKind = requestedKind ?? inferSpawnKind( |

effectiveTask, |

asString(nextParams.label) ?? asString(params.label), |

); |

// requestId: explicit values win; planner spawns fall back to the run id or to a
// session/task hash, other kinds stay without a requestId.
const requestId = |

asString(nextParams.requestId) ?? |

asString(nextParams.planningRequestId) ?? |

asString(params.requestId) ?? |

asString(params.planningRequestId) ?? |

(inferredKind === "planner" ? (event.runId ?? `${sessionKey}:${shortHash(effectiveTask)}`) : undefined); |

let spec: SpawnSpec = { |

task: effectiveTask, |

label: asString(nextParams.label), |

agentId: asString(nextParams.agentId), |

model: asString(nextParams.model), |

thinking: asString(nextParams.thinking), |

mode: asString(nextParams.mode), |

thread: typeof nextParams.thread === "boolean" ? nextParams.thread : undefined, |

kind: inferredKind, |

requestId, |

idempotencyKey: |

asString(nextParams.idempotencyKey) ?? |

asString(asRecord(nextParams.metadata).idempotencyKey) ?? |

asString(asRecord(nextParams.meta).idempotencyKey), |

}; |

|

let taskKey: string | undefined; |

let taskId: string | undefined; |

let attempt: number | undefined; |

// Worker spawns get a stable task key, are de-duplicated against active tasks with
// the same key, and are persisted in state "leased" before the spawn proceeds.
if (pendingKind === "worker") { |

const requesterSessionKey = taskRequesterSessionKey; |

const orchestratorSessionKey = taskOrchestratorSessionKey; |

taskKey = stableTaskKey(requesterSessionKey, spec); |

const activeTaskId = findActiveTaskIdByTaskKey(taskKey); |

if (activeTaskId) { |

touchOrchestrator(orchestratorSessionKey); |

const dedupeSummary = [ |

`taskKey=${taskKey.slice(0, 12)}`, |

`activeTask=${activeTaskId}`, |

`kind=${spec.kind ?? "worker"}`, |

`requestId=${spec.requestId ?? "none"}`, |

`label=${spec.label ?? "none"}`, |

].join(" "); |

api.logger.warn( |

`orchestrator: sessions_spawn deduped duplicate worker dispatch (${dedupeSummary})`, |

); |

return { |

block: true, |

blockReason: |

`Duplicate worker task suppressed; task ${activeTaskId} is already active. ` + |

"Use session_status/sessions_history to poll progress instead of spawning again. " + |

`(${dedupeSummary})`, |

}; |

} |

// The in-memory attempt counter is bumped here, before the spawn result is known.
attempt = (attemptCountByTask.get(taskKey) ?? 0) + 1; |

attemptCountByTask.set(taskKey, attempt); |

taskId = makeTaskId(taskKey, attempt); |

// Any existing task packet is stripped before the task text is wrapped in a new one.
const normalizedTask = stripWorkerTaskPacket(spec.task); |

spec = { |

...spec, |

task: buildWorkerTaskPacket(taskId, attempt, normalizedTask), |

}; |

nextParams = { |

...nextParams, |

task: spec.task, |

}; |

upsertTaskRow( |

taskId, |

taskKey, |

requesterSessionKey, |

orchestratorSessionKey, |

spec, |

"leased", |

attempt, |

); |

} |

|

// Stash the spawn under a run/tool-call/session key so the after_tool_call handler
// can match the spawn result to this request.
const pendingKey = makePendingKey(event.runId, event.toolCallId, sessionKey); |

pendingSpawns.set(pendingKey, { |

requesterSessionKey: pendingRequesterSessionKey, |

spec, |

kind: pendingKind, |

taskKey, |

taskId, |

attempt, |

}); |

|

return { params: nextParams }; |

}); |
|
|
|
// after_tool_call hook for sessions_spawn: matches the result to the spawn stashed by
// the before_tool_call handler and finishes the bookkeeping.
//   - spawn rejected or no child session key in the result: the tracked task (if any)
//     is marked failed and nothing else is recorded;
//   - orchestrator spawn: the child is registered and bound to the requester;
//   - worker spawn: worker state is created in memory and in the database, and the
//     task moves to "running".
api.on("after_tool_call", async (event, ctx) => {
  if (event.toolName !== "sessions_spawn") return;

  const lookupKey = makePendingKey(event.runId, event.toolCallId, ctx.sessionKey);
  const pending = pendingSpawns.get(lookupKey);
  if (!pending) return;
  pendingSpawns.delete(lookupKey);

  const spawnerSessionKey = pending.requesterSessionKey;
  const trackedTaskId = pending.taskId;
  // Marks the tracked task, when there is one, as failed with the given outcome.
  const failTrackedTask = (outcome: string): void => {
    if (trackedTaskId) setTaskState(trackedTaskId, "failed", { outcome });
  };

  if (!isSpawnAccepted(event.result)) {
    failTrackedTask("spawn-not-accepted");
    api.logger.warn(
      `orchestrator: sessions_spawn was not accepted for ${spawnerSessionKey}; skipping bind/worker tracking`,
    );
    return;
  }

  const childSessionKey = extractChildSessionKey(event.result);
  if (!childSessionKey) {
    failTrackedTask("missing-child-session-key");
    api.logger.warn(
      `orchestrator: sessions_spawn accepted without childSessionKey for ${spawnerSessionKey}`,
    );
    return;
  }

  spawnByChildSession.set(childSessionKey, pending);

  // Orchestrator spawn: register the child and bind it to its requester.
  if (pending.kind === "orchestrator") {
    orchestratorSessions.add(childSessionKey);
    orchestratorByRequester.set(spawnerSessionKey, childSessionKey);
    requesterByOrchestrator.set(childSessionKey, spawnerSessionKey);
    setBinding(spawnerSessionKey, childSessionKey);
    emitStatusUpdate(childSessionKey, "orchestrator-bound");
    api.logger.info(
      `orchestrator: bound hidden orchestrator ${childSessionKey} -> requester ${spawnerSessionKey}`,
    );
    return;
  }

  // Worker spawn: for workers the pending requester key is used as the owning
  // orchestrator's session key.
  const startedAtMs = Date.now();
  touchOrchestrator(spawnerSessionKey);

  const workerState: WorkerState = {
    childSessionKey,
    orchestratorSessionKey: spawnerSessionKey,
    label: pending.spec.label,
    taskPreview: taskPreview(pending.spec.task),
    status: "running",
    startedAtMs,
    updatedAtMs: startedAtMs,
    taskId: trackedTaskId,
    taskKey: pending.taskKey,
    attempt: pending.attempt,
  };
  workerStateBySession.set(childSessionKey, workerState);

  const siblings = workersByOrchestrator.get(spawnerSessionKey) ?? new Set<string>();
  siblings.add(childSessionKey);
  workersByOrchestrator.set(spawnerSessionKey, siblings);

  if (trackedTaskId) {
    setTaskState(trackedTaskId, "running", { outcome: "worker-started" });
  }
  upsertWorkerRow(workerState);
  emitStatusUpdate(spawnerSessionKey, "worker-spawned");

  const labelForLog = workerState.label ?? "no-label";
  api.logger.info(
    `orchestrator: worker spawned ${childSessionKey} under ${spawnerSessionKey} (${labelForLog})`,
  );
});
|
|
|
// subagent_spawned hook: when the spawning session is a known orchestrator and the
// child is already tracked as a worker, refresh the worker's update time and persist it.
api.on("subagent_spawned", async (event, ctx) => {
  const spawnedKey = event.childSessionKey;
  const spawnerKey = ctx.requesterSessionKey;
  if (!(spawnedKey && spawnerKey)) return;
  if (!orchestratorSessions.has(spawnerKey)) return;

  const tracked = workerStateBySession.get(spawnedKey);
  if (tracked) {
    tracked.updatedAtMs = Date.now();
    workerStateBySession.set(spawnedKey, tracked);
    upsertWorkerRow(tracked);
  }
});
|
|
|
// subagent_delivery_target hook: when the requester is a known orchestrator and the
// child is tracked as a worker, refresh the worker's update time and persist it.
// Nothing is returned, so the handler itself does not alter the event.
api.on("subagent_delivery_target", async (event) => {
  const requesterKey = event.requesterSessionKey;
  if (!requesterKey || !orchestratorSessions.has(requesterKey)) return;

  const workerKey = event.childSessionKey;
  const tracked = workerStateBySession.get(workerKey);
  if (tracked) {
    tracked.updatedAtMs = Date.now();
    workerStateBySession.set(workerKey, tracked);
    upsertWorkerRow(tracked);
  }
});
|
|
|
// "subagent_ended": bookkeeping for a finished subagent session.
//
//  1. If the child is a tracked worker, record its final status/outcome,
//     persist the row, and move the linked task to `needs_verification`
//     (successful end) or `failed` (any other end), never overwriting a task
//     that is already in a state the guards below protect.
//  2. If the child is itself a hidden orchestrator, retire it.
//  3. If the child was spawned as a worker and auto-retry is enabled, either
//     queue a retry with backoff or, once the retry budget is spent, mark the
//     task `dead` and raise an incident event for the requester.
api.on("subagent_ended", async (event) => {
  if (event.targetKind !== "subagent") return;

  const childSessionKey = event.targetSessionKey;
  // Normalise the outcome once; both the worker bookkeeping and the retry
  // decision below use the same lower-cased value.
  const rawOutcome = asString(event.outcome);
  const outcome = rawOutcome?.toLowerCase() ?? "unknown";

  const workerState = workerStateBySession.get(childSessionKey);
  if (workerState) {
    const status = outcome as WorkerStatus;
    workerState.status = status;
    workerState.outcome = rawOutcome;
    workerState.updatedAtMs = Date.now();
    workerStateBySession.set(childSessionKey, workerState);
    upsertWorkerRow(workerState);

    if (workerState.taskId) {
      const currentState = readTaskState(workerState.taskId);
      const endedOk = status === "ok" || status === "success" || status === "completed";
      if (endedOk) {
        // A clean exit still needs verification, unless the task has already
        // been moved to a state that must not be overwritten.
        if (
          currentState !== "done" &&
          currentState !== "failed" &&
          currentState !== "dead" &&
          currentState !== "needs_input"
        ) {
          setTaskState(workerState.taskId, "needs_verification", { outcome: "worker-ended-ok" });
        }
      } else if (currentState !== "done" && currentState !== "dead") {
        // Covers "unknown" as well: the outcome string is then
        // "worker-ended-unknown".
        setTaskState(workerState.taskId, "failed", { outcome: `worker-ended-${status}` });
      }
    }
    emitStatusUpdate(workerState.orchestratorSessionKey, "worker-ended");
  }

  if (orchestratorSessions.has(childSessionKey)) {
    retireOrchestratorSession(childSessionKey, "session-ended");
    api.logger.info(`orchestrator: hidden orchestrator ended ${childSessionKey}`);
  }

  // The spawn record is single-use: drop it whether or not a retry follows.
  const pending = spawnByChildSession.get(childSessionKey);
  spawnByChildSession.delete(childSessionKey);
  if (!pending) return;
  if (pending.kind === "orchestrator") return;
  if (!autoRetryFailedSubagents) return;

  if (!retryOutcomes.has(outcome as RetryOutcome)) return;

  // Do not resurrect or kill a task that is already terminal. This mirrors the
  // `done` / `dead` guards applied above before marking a task `failed`.
  if (pending.taskId) {
    const taskState = readTaskState(pending.taskId);
    if (taskState === "done" || taskState === "dead") {
      api.logger.info(
        `orchestrator: skipping retry for ${childSessionKey}; task ${pending.taskId} is already ${taskState}`,
      );
      return;
    }
  }

  const taskKey = pending.taskKey ?? stableTaskKey(pending.requesterSessionKey, pending.spec);
  const retries = retryCountByTask.get(taskKey) ?? 0;

  if (retries >= maxRetriesPerTask) {
    if (pending.taskId) {
      setTaskState(pending.taskId, "dead", { outcome: `retry-exhausted-${outcome}` });
    }
    api.logger.warn(
      `orchestrator: retry budget exhausted (${retries}/${maxRetriesPerTask}) for ${childSessionKey}`,
    );

    // For worker spawns the pending requester is the orchestrator session; the
    // incident is enqueued on the session registered as that orchestrator's
    // requester, if one is known.
    const orchestratorSessionKey = pending.requesterSessionKey;
    const requesterSessionKey = requesterByOrchestrator.get(orchestratorSessionKey);
    if (requesterSessionKey) {
      const incidentEvent = buildIncidentSystemEvent(
        requesterSessionKey,
        orchestratorSessionKey,
        statusChannel,
        "retry-budget-exhausted",
        `taskKey=${taskKey} child=${childSessionKey} outcome=${outcome}`,
      );
      api.runtime.system.enqueueSystemEvent(incidentEvent, { sessionKey: requesterSessionKey });
    }
    emitStatusUpdate(orchestratorSessionKey, "retry-exhausted");
    return;
  }

  const nextAttempt = retries + 1;
  setRetryCount(taskKey, nextAttempt);
  const delayMs = computeRetryDelayMs(retryBaseBackoffMs, retryMaxBackoffMs, nextAttempt);
  const runAfterMs = Date.now() + delayMs;

  pendingRetryByTask.set(taskKey, {
    taskKey,
    taskId: pending.taskId,
    requesterSessionKey: pending.requesterSessionKey,
    spec: pending.spec,
    failedSessionKey: childSessionKey,
    outcome,
    attempt: nextAttempt,
    runAfterMs,
  });

  if (pending.taskId) {
    setTaskState(pending.taskId, "queued", {
      nextRunAt: runAfterMs,
      outcome: `queued-retry-${nextAttempt}-${outcome}`,
    });
  }

  emitStatusUpdate(pending.requesterSessionKey, "retry-queued");
  api.runtime.system.requestHeartbeatNow({
    reason: "orchestrator-subagent-retry-queued",
    sessionKey: pending.requesterSessionKey,
  });

  api.logger.info(
    `orchestrator: scheduled retry ${nextAttempt}/${maxRetriesPerTask} for ${childSessionKey} in ${delayMs}ms`,
  );
});
|
|
|
// "session_end": release everything keyed by the session that just ended.
// Retires the orchestrator registered for it as a requester (if any), retires
// the session itself when it is an orchestrator, drops pending retries it
// requested, and clears its worker row and per-session caches.
api.on("session_end", async (event) => {
  const endedSessionKey = event.sessionKey;
  if (!endedSessionKey) return;

  const ownedOrchestrator = orchestratorByRequester.get(endedSessionKey);
  if (ownedOrchestrator) {
    retireOrchestratorSession(ownedOrchestrator, "requester-session-ended");
  }

  if (orchestratorSessions.has(endedSessionKey)) {
    retireOrchestratorSession(endedSessionKey, "orchestrator-session-ended");
  }

  // Snapshot the entries first so deletions never interact with live iteration.
  const retrySnapshot = Array.from(pendingRetryByTask.entries());
  retrySnapshot.forEach(([taskKey, retryEnvelope]) => {
    if (retryEnvelope.requesterSessionKey === endedSessionKey) {
      pendingRetryByTask.delete(taskKey);
    }
  });

  workerStateBySession.delete(endedSessionKey);
  deleteWorkerRow(endedSessionKey);
  lastUserMessageBySession.delete(endedSessionKey);
  lastDelegatedTaskByOrchestrator.delete(endedSessionKey);
});
|
} |