|
import { createHash } from "node:crypto"; |
|
import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; |
|
|
|
type RetryOutcome = "error" | "timeout" | "killed" | "reset" | "deleted"; |
|
|
|
type SpawnSpec = { |
|
task: string; |
|
label?: string; |
|
agentId?: string; |
|
model?: string; |
|
thinking?: string; |
|
mode?: string; |
|
thread?: boolean; |
|
}; |
|
|
|
type PluginConfig = { |
|
injectOrchestratorPrompt?: boolean; |
|
enforceMainDelegation?: boolean; |
|
autoRetryFailedSubagents?: boolean; |
|
maxRetriesPerTask?: number; |
|
retryOutcomes?: RetryOutcome[]; |
|
allowedMainTools?: string[]; |
|
allowedOrchestratorTools?: string[]; |
|
orchestratorLabelPrefix?: string; |
|
}; |
|
|
|
type PendingSpawn = { |
|
requesterSessionKey: string; |
|
spec: SpawnSpec; |
|
kind: "orchestrator" | "worker"; |
|
}; |
|
|
|
type WorkerStatus = "running" | "ok" | "error" | "timeout" | "killed" | "reset" | "deleted" | "unknown"; |
|
|
|
type WorkerState = { |
|
childSessionKey: string; |
|
orchestratorSessionKey: string; |
|
label?: string; |
|
taskPreview: string; |
|
status: WorkerStatus; |
|
startedAtMs: number; |
|
updatedAtMs: number; |
|
outcome?: string; |
|
}; |
|
|
|
const DEFAULT_ALLOWED_MAIN_TOOLS = [ |
|
"sessions_spawn", |
|
"sessions_send", |
|
"sessions_list", |
|
"sessions_history", |
|
"session_status", |
|
"memory_search", |
|
"memory_get", |
|
]; |
|
|
|
const DEFAULT_ALLOWED_ORCHESTRATOR_TOOLS = [ |
|
"sessions_spawn", |
|
"sessions_send", |
|
"sessions_list", |
|
"sessions_history", |
|
"session_status", |
|
"memory_search", |
|
"memory_get", |
|
]; |
|
|
|
const DEFAULT_RETRY_OUTCOMES: RetryOutcome[] = ["error", "timeout"]; |
|
const STRICT_ROUTER_TOOLS = new Set([ |
|
"sessions_spawn", |
|
"sessions_send", |
|
"sessions_list", |
|
"sessions_history", |
|
"session_status", |
|
"memory_search", |
|
"memory_get", |
|
]); |
|
|
|
function asRecord(value: unknown): Record<string, unknown> { |
|
if (!value || typeof value !== "object" || Array.isArray(value)) return {}; |
|
return value as Record<string, unknown>; |
|
} |
|
|
|
function asString(value: unknown): string | undefined { |
|
if (typeof value !== "string") return undefined; |
|
const trimmed = value.trim(); |
|
return trimmed.length > 0 ? trimmed : undefined; |
|
} |
|
|
|
function asBoolean(value: unknown, fallback: boolean): boolean { |
|
return typeof value === "boolean" ? value : fallback; |
|
} |
|
|
|
function asPositiveInt(value: unknown, fallback: number): number { |
|
if (typeof value !== "number" || !Number.isFinite(value)) return fallback; |
|
const floored = Math.floor(value); |
|
return floored > 0 ? floored : fallback; |
|
} |
|
|
|
function asStringArray(value: unknown): string[] | undefined { |
|
if (!Array.isArray(value)) return undefined; |
|
const out = value |
|
.map((item) => (typeof item === "string" ? item.trim() : "")) |
|
.filter((item) => item.length > 0); |
|
return out.length > 0 ? out : undefined; |
|
} |
|
|
|
function resolveRetryOutcomes(value: unknown): Set<RetryOutcome> { |
|
const fromCfg = asStringArray(value); |
|
const raw = fromCfg ?? DEFAULT_RETRY_OUTCOMES; |
|
const allowed = new Set<RetryOutcome>(["error", "timeout", "killed", "reset", "deleted"]); |
|
const selected = raw.filter((entry): entry is RetryOutcome => allowed.has(entry as RetryOutcome)); |
|
return new Set(selected.length > 0 ? selected : DEFAULT_RETRY_OUTCOMES); |
|
} |
|
|
|
function isFrontControllerSessionKey(sessionKey: string | undefined): boolean { |
|
if (!sessionKey) return false; |
|
if (sessionKey.includes(":subagent:")) return false; |
|
return /^agent:/i.test(sessionKey); |
|
} |
|
|
|
function shortHash(input: string): string { |
|
return createHash("sha256").update(input).digest("hex").slice(0, 10); |
|
} |
|
|
|
function sanitizeLabel(input: string | undefined): string | undefined { |
|
const raw = (input ?? "").trim(); |
|
if (!raw) return undefined; |
|
const compact = raw.replace(/\s+/g, "-").toLowerCase(); |
|
const safe = compact.replace(/[^a-z0-9._-]/g, ""); |
|
return safe.slice(0, 64) || undefined; |
|
} |
|
|
|
function taskPreview(input: string | undefined, max = 180): string { |
|
const raw = (input ?? "").trim(); |
|
if (!raw) return "(no task)"; |
|
const compact = raw.replace(/\s+/g, " "); |
|
return compact.length <= max ? compact : `${compact.slice(0, max - 1)}…`; |
|
} |
|
|
|
function asSessionMessage(value: unknown): string | undefined { |
|
if (typeof value === "string") return asString(value); |
|
if (!Array.isArray(value)) return undefined; |
|
const chunks: string[] = []; |
|
for (const part of value) { |
|
const obj = asRecord(part); |
|
if (obj.type !== "text") continue; |
|
const text = asString(obj.text); |
|
if (text) chunks.push(text); |
|
} |
|
const joined = chunks.join("\n").trim(); |
|
return joined.length > 0 ? joined : undefined; |
|
} |
|
|
|
function extractLastUserMessage(messages: unknown[]): string | undefined { |
|
for (let i = messages.length - 1; i >= 0; i -= 1) { |
|
const obj = asRecord(messages[i]); |
|
if (asString(obj.role) !== "user") continue; |
|
const content = asSessionMessage(obj.content); |
|
if (content) return content; |
|
} |
|
return undefined; |
|
} |
|
|
|
function buildWorkerStatusSnapshot(orchestratorSessionKey: string, workers: Iterable<WorkerState>): string { |
|
const items = [...workers].sort((a, b) => b.updatedAtMs - a.updatedAtMs).slice(0, 12); |
|
if (items.length === 0) { |
|
return `- Hidden orchestrator ${orchestratorSessionKey} has no tracked worker sessions.`; |
|
} |
|
return [ |
|
`- Hidden orchestrator ${orchestratorSessionKey}`, |
|
`- Workers tracked: ${items.length}`, |
|
...items.map( |
|
(worker) => |
|
`- [${worker.status}] ${worker.childSessionKey} (${worker.label ?? "no-label"}) :: ${worker.taskPreview}`, |
|
), |
|
].join("\n"); |
|
} |
|
|
|
function makePendingKey( |
|
runId: string | undefined, |
|
toolCallId: string | undefined, |
|
sessionKey: string | undefined, |
|
): string { |
|
return `${runId ?? ""}|${toolCallId ?? ""}|${sessionKey ?? ""}`; |
|
} |
|
|
|
function stableTaskKey(requesterSessionKey: string, spec: SpawnSpec): string { |
|
const normalized = [ |
|
requesterSessionKey, |
|
spec.agentId ?? "", |
|
spec.label ?? "", |
|
spec.task.trim(), |
|
].join("\n"); |
|
return createHash("sha256").update(normalized).digest("hex"); |
|
} |
|
|
|
function extractChildSessionKey(value: unknown, depth = 0): string | undefined { |
|
if (depth > 4) return undefined; |
|
const obj = asRecord(value); |
|
const direct = asString(obj.childSessionKey); |
|
if (direct) return direct; |
|
for (const nested of Object.values(obj)) { |
|
if (!nested || typeof nested !== "object") continue; |
|
const found = extractChildSessionKey(nested, depth + 1); |
|
if (found) return found; |
|
} |
|
return undefined; |
|
} |
|
|
|
function isSpawnAccepted(value: unknown): boolean { |
|
const obj = asRecord(value); |
|
const details = asRecord(obj.details); |
|
const status = |
|
asString(obj.status)?.toLowerCase() ?? |
|
asString(details.status)?.toLowerCase(); |
|
if (status) { |
|
return status === "accepted" || status === "ok" || status === "success"; |
|
} |
|
if (obj.isError === true || details.isError === true) return false; |
|
if (asString(obj.error) || asString(details.error)) return false; |
|
return true; |
|
} |
|
|
|
function isOrchestratorEchoMessage(text: string): boolean { |
|
const normalized = text.toLowerCase(); |
|
return ( |
|
normalized.includes("[orchestrator mode is active]") && |
|
normalized.includes("spawn exactly one hidden orchestrator session") |
|
); |
|
} |
|
|
|
function buildOrchestratorPrompt(maxRetries: number, orchestratorSessionKey?: string): string { |
|
const lines = [ |
|
"[Orchestrator mode is active]", |
|
"- You are the fast front-controller. Keep the first user-facing response short.", |
|
"- Front-controller should not call read/exec/process directly.", |
|
"- Spawn exactly one hidden orchestrator session for this thread via sessions_spawn.", |
|
"- Delegate all real work to that orchestrator via sessions_send.", |
|
"- For live progress, query the hidden orchestrator via session_status/sessions_history.", |
|
"- If user asks for progress/status, query hidden orchestrator first, then answer.", |
|
`- If a system event contains <orchestrator-retry>, immediately respawn the failed task. Retry limit: ${maxRetries}.`, |
|
]; |
|
if (orchestratorSessionKey) { |
|
lines.push(`- Active hidden orchestrator session: ${orchestratorSessionKey}`); |
|
lines.push( |
|
`- Use sessions_send with sessionKey=${orchestratorSessionKey} for new delegated work in this thread.`, |
|
); |
|
} |
|
return lines.join("\n"); |
|
} |
|
|
|
function buildHiddenOrchestratorTask(requesterSessionKey: string, userTask: string): string { |
|
return [ |
|
"[Hidden Thread Orchestrator]", |
|
`requesterSessionKey: ${requesterSessionKey}`, |
|
"You are a hidden orchestrator for one front-controller thread.", |
|
"Execution contract:", |
|
"1) Break the task into concrete worker subtasks.", |
|
"2) Spawn worker subagents (sessions_spawn) for tool/file/shell/web work.", |
|
"3) Track worker status (pending/running/succeeded/failed) and retry failed workers only when useful.", |
|
"4) Keep progress updates concise and structured so requester can poll status.", |
|
"5) Send progress updates back to requester via sessions_send as useful.", |
|
"6) When all workers finish, send a final summary to requester via sessions_send with completed changes, failures, and next actions.", |
|
"7) Do not run read/write/edit/exec/process yourself unless delegation repeatedly fails.", |
|
"", |
|
"Primary task from front-controller:", |
|
userTask, |
|
].join("\n"); |
|
} |
|
|
|
function buildDelegationHint(toolName: string, params: unknown): string { |
|
const preview = taskPreview(JSON.stringify(params), 260); |
|
return [ |
|
`Hidden orchestrator policy blocked '${toolName}'.`, |
|
"Delegate this to a worker using sessions_spawn with a concrete task.", |
|
"Example:", |
|
`sessions_spawn({\"label\":\"worker-${toolName}\",\"task\":\"Use tool ${toolName} with params ${preview}. Return concise findings and artifacts.\"})`, |
|
].join(" "); |
|
} |
|
|
|
function buildRetrySystemEvent( |
|
spec: SpawnSpec, |
|
failedSessionKey: string, |
|
outcome: string, |
|
attempt: number, |
|
maxRetries: number, |
|
): string { |
|
return [ |
|
"<orchestrator-retry>", |
|
`failedChildSession: ${failedSessionKey}`, |
|
`outcome: ${outcome}`, |
|
`attempt: ${attempt}/${maxRetries}`, |
|
"action: Respawn this subagent task immediately via sessions_spawn.", |
|
`task: ${spec.task}`, |
|
`label: ${spec.label ?? ""}`, |
|
`agentId: ${spec.agentId ?? ""}`, |
|
`model: ${spec.model ?? ""}`, |
|
`thinking: ${spec.thinking ?? ""}`, |
|
`mode: ${spec.mode ?? ""}`, |
|
`thread: ${String(spec.thread ?? false)}`, |
|
"</orchestrator-retry>", |
|
].join("\n"); |
|
} |
|
|
|
export default function register(api: OpenClawPluginApi) { |
|
const cfg = asRecord(api.pluginConfig) as PluginConfig; |
|
const injectOrchestratorPrompt = asBoolean(cfg.injectOrchestratorPrompt, true); |
|
const enforceMainDelegation = asBoolean(cfg.enforceMainDelegation, false); |
|
const autoRetryFailedSubagents = asBoolean(cfg.autoRetryFailedSubagents, true); |
|
const maxRetriesPerTask = asPositiveInt(cfg.maxRetriesPerTask, 2); |
|
const orchestratorLabelPrefix = sanitizeLabel(asString(cfg.orchestratorLabelPrefix)) ?? "thread-orchestrator"; |
|
const retryOutcomes = resolveRetryOutcomes(cfg.retryOutcomes); |
|
const allowedMainTools = new Set([ |
|
...DEFAULT_ALLOWED_MAIN_TOOLS, |
|
...(asStringArray(cfg.allowedMainTools) ?? []), |
|
]); |
|
const allowedOrchestratorTools = new Set([ |
|
...DEFAULT_ALLOWED_ORCHESTRATOR_TOOLS, |
|
...(asStringArray(cfg.allowedOrchestratorTools) ?? []), |
|
]); |
|
// Keep front/orchestrator strict even if config accidentally adds direct tools. |
|
for (const toolId of [...allowedMainTools]) { |
|
if (!STRICT_ROUTER_TOOLS.has(toolId)) allowedMainTools.delete(toolId); |
|
} |
|
for (const toolId of [...allowedOrchestratorTools]) { |
|
if (!STRICT_ROUTER_TOOLS.has(toolId)) allowedOrchestratorTools.delete(toolId); |
|
} |
|
|
|
const pendingSpawns = new Map<string, PendingSpawn>(); |
|
const spawnByChildSession = new Map<string, PendingSpawn>(); |
|
const retryCountByTask = new Map<string, number>(); |
|
const orchestratorByRequester = new Map<string, string>(); |
|
const requesterByOrchestrator = new Map<string, string>(); |
|
const orchestratorSessions = new Set<string>(); |
|
const lastUserMessageBySession = new Map<string, string>(); |
|
const lastDelegatedTaskByOrchestrator = new Map<string, string>(); |
|
const blockedAttemptsByRun = new Map<string, number>(); |
|
const workerStateBySession = new Map<string, WorkerState>(); |
|
const workersByOrchestrator = new Map<string, Set<string>>(); |
|
|
|
api.on("before_prompt_build", async (event, ctx) => { |
|
if (ctx.sessionKey) { |
|
const latestUser = extractLastUserMessage(event.messages); |
|
if (latestUser && !isOrchestratorEchoMessage(latestUser)) { |
|
lastUserMessageBySession.set(ctx.sessionKey, latestUser); |
|
} |
|
} |
|
if (!injectOrchestratorPrompt) return; |
|
if (!isFrontControllerSessionKey(ctx.sessionKey)) return; |
|
const knownOrchestrator = ctx.sessionKey ? orchestratorByRequester.get(ctx.sessionKey) : undefined; |
|
let statusBlock = ""; |
|
if (knownOrchestrator) { |
|
const workerKeys = workersByOrchestrator.get(knownOrchestrator); |
|
const workers = workerKeys |
|
? [...workerKeys].map((key) => workerStateBySession.get(key)).filter((item): item is WorkerState => Boolean(item)) |
|
: []; |
|
if (workers.length > 0) { |
|
statusBlock = `\n[Orchestrator status snapshot]\n${buildWorkerStatusSnapshot(knownOrchestrator, workers)}\n`; |
|
} |
|
} |
|
return { |
|
prependContext: `${buildOrchestratorPrompt(maxRetriesPerTask, knownOrchestrator)}${statusBlock}`, |
|
}; |
|
}); |
|
|
|
api.on("before_tool_call", async (event, ctx) => { |
|
const sessionKey = ctx.sessionKey; |
|
const isOrchestratorSession = sessionKey ? orchestratorSessions.has(sessionKey) : false; |
|
const isFrontSession = isFrontControllerSessionKey(sessionKey) && !isOrchestratorSession; |
|
|
|
if (enforceMainDelegation && isFrontSession) { |
|
if (!allowedMainTools.has(event.toolName)) { |
|
const orchestratorSession = sessionKey ? orchestratorByRequester.get(sessionKey) : undefined; |
|
const suffix = orchestratorSession |
|
? ` Use sessions_send with sessionKey='${orchestratorSession}' to delegate work.` |
|
: " Use sessions_spawn to start a hidden orchestrator first."; |
|
return { |
|
block: true, |
|
blockReason: `Front-controller session tool '${event.toolName}' blocked by orchestrator policy.${suffix}`, |
|
}; |
|
} |
|
} |
|
|
|
if (isOrchestratorSession && !allowedOrchestratorTools.has(event.toolName)) { |
|
const blockKey = `${event.runId ?? "run"}|${sessionKey ?? "session"}|${event.toolName}`; |
|
const blockedAttempts = (blockedAttemptsByRun.get(blockKey) ?? 0) + 1; |
|
blockedAttemptsByRun.set(blockKey, blockedAttempts); |
|
if (blockedAttempts >= 3) { |
|
api.logger.warn( |
|
`orchestrator: allowing direct tool fallback after repeated policy blocks (${event.toolName}, attempts=${blockedAttempts})`, |
|
); |
|
} else { |
|
const hint = buildDelegationHint(event.toolName, event.params); |
|
if (sessionKey) lastDelegatedTaskByOrchestrator.set(sessionKey, hint); |
|
return { |
|
block: true, |
|
blockReason: hint, |
|
}; |
|
} |
|
} |
|
|
|
if (!sessionKey) return; |
|
|
|
if (isOrchestratorSession && event.toolName === "sessions_send") { |
|
const requesterSession = requesterByOrchestrator.get(sessionKey); |
|
if (requesterSession) { |
|
const params = asRecord(event.params); |
|
const hasSessionKey = Boolean(asString(params.sessionKey)); |
|
const hasLabel = Boolean(asString(params.label)); |
|
if (!hasSessionKey && !hasLabel) { |
|
const msg = asString(params.message); |
|
if (msg) lastDelegatedTaskByOrchestrator.set(sessionKey, msg); |
|
return { |
|
params: { |
|
...params, |
|
sessionKey: requesterSession, |
|
}, |
|
}; |
|
} |
|
} |
|
} |
|
|
|
if (isFrontSession && event.toolName === "sessions_send") { |
|
const params = asRecord(event.params); |
|
const knownOrchestrator = orchestratorByRequester.get(sessionKey); |
|
const sessionTarget = asString(params.sessionKey); |
|
if (knownOrchestrator && (!sessionTarget || sessionTarget === knownOrchestrator)) { |
|
const msg = asString(params.message); |
|
if (msg) lastDelegatedTaskByOrchestrator.set(knownOrchestrator, msg); |
|
} |
|
} |
|
|
|
if (isFrontSession && (event.toolName === "session_status" || event.toolName === "sessions_history")) { |
|
const knownOrchestrator = orchestratorByRequester.get(sessionKey); |
|
if (knownOrchestrator) { |
|
const params = asRecord(event.params); |
|
const hasSessionKey = Boolean(asString(params.sessionKey)); |
|
const hasLabel = Boolean(asString(params.label)); |
|
if (!hasSessionKey && !hasLabel) { |
|
return { |
|
params: { |
|
...params, |
|
sessionKey: knownOrchestrator, |
|
}, |
|
}; |
|
} |
|
} |
|
} |
|
|
|
if (event.toolName !== "sessions_spawn") return; |
|
|
|
const params = asRecord(event.params); |
|
const rawTask = asString(params.task); |
|
const knownOrchestrator = orchestratorByRequester.get(sessionKey); |
|
const delegatedTask = isOrchestratorSession ? lastDelegatedTaskByOrchestrator.get(sessionKey) : undefined; |
|
const userFallbackTask = lastUserMessageBySession.get(sessionKey); |
|
const task = |
|
rawTask ?? |
|
delegatedTask ?? |
|
userFallbackTask ?? |
|
(isOrchestratorSession |
|
? "Delegate the current thread objective to a worker and report results." |
|
: "Initialize hidden thread orchestrator and continue the current user objective."); |
|
|
|
let nextParams: Record<string, unknown> = { ...params, task }; |
|
let pendingKind: "orchestrator" | "worker" = "worker"; |
|
|
|
if (isFrontSession) { |
|
if (knownOrchestrator) { |
|
return { |
|
block: true, |
|
blockReason: `Hidden orchestrator already active (${knownOrchestrator}). Use sessions_send to delegate additional work and session_status/sessions_history to poll progress.`, |
|
}; |
|
} |
|
const defaultLabel = `${orchestratorLabelPrefix}-${shortHash(sessionKey)}`; |
|
nextParams = { |
|
...nextParams, |
|
label: sanitizeLabel(asString(params.label)) ?? defaultLabel, |
|
task: buildHiddenOrchestratorTask(sessionKey, task), |
|
thread: true, |
|
}; |
|
pendingKind = "orchestrator"; |
|
} |
|
|
|
const effectiveTask = asString(nextParams.task) ?? task; |
|
const spec: SpawnSpec = { |
|
task: effectiveTask, |
|
label: asString(nextParams.label), |
|
agentId: asString(nextParams.agentId), |
|
model: asString(nextParams.model), |
|
thinking: asString(nextParams.thinking), |
|
mode: asString(nextParams.mode), |
|
thread: typeof nextParams.thread === "boolean" ? nextParams.thread : undefined, |
|
}; |
|
|
|
const pendingKey = makePendingKey(event.runId, event.toolCallId, sessionKey); |
|
pendingSpawns.set(pendingKey, { |
|
requesterSessionKey: sessionKey, |
|
spec, |
|
kind: pendingKind, |
|
}); |
|
|
|
if (pendingKind === "orchestrator" || !rawTask) { |
|
return { params: nextParams }; |
|
} |
|
}); |
|
|
|
api.on("after_tool_call", async (event, ctx) => { |
|
if (event.toolName !== "sessions_spawn") return; |
|
|
|
const pendingKey = makePendingKey(event.runId, event.toolCallId, ctx.sessionKey); |
|
const pending = pendingSpawns.get(pendingKey); |
|
if (!pending) return; |
|
pendingSpawns.delete(pendingKey); |
|
|
|
if (!isSpawnAccepted(event.result)) { |
|
api.logger.warn( |
|
`orchestrator: sessions_spawn was not accepted for ${pending.requesterSessionKey}; skipping bind/worker tracking`, |
|
); |
|
return; |
|
} |
|
|
|
const childSessionKey = extractChildSessionKey(event.result); |
|
if (!childSessionKey) { |
|
api.logger.warn( |
|
`orchestrator: sessions_spawn accepted without childSessionKey for ${pending.requesterSessionKey}`, |
|
); |
|
return; |
|
} |
|
|
|
spawnByChildSession.set(childSessionKey, pending); |
|
|
|
if (pending.kind === "orchestrator") { |
|
orchestratorSessions.add(childSessionKey); |
|
orchestratorByRequester.set(pending.requesterSessionKey, childSessionKey); |
|
requesterByOrchestrator.set(childSessionKey, pending.requesterSessionKey); |
|
api.logger.info( |
|
`orchestrator: bound hidden orchestrator ${childSessionKey} -> requester ${pending.requesterSessionKey}`, |
|
); |
|
return; |
|
} |
|
|
|
const orchestratorSessionKey = pending.requesterSessionKey; |
|
const startedAtMs = Date.now(); |
|
const workerState: WorkerState = { |
|
childSessionKey, |
|
orchestratorSessionKey, |
|
label: pending.spec.label, |
|
taskPreview: taskPreview(pending.spec.task), |
|
status: "running", |
|
startedAtMs, |
|
updatedAtMs: startedAtMs, |
|
}; |
|
workerStateBySession.set(childSessionKey, workerState); |
|
const workerSet = workersByOrchestrator.get(orchestratorSessionKey) ?? new Set<string>(); |
|
workerSet.add(childSessionKey); |
|
workersByOrchestrator.set(orchestratorSessionKey, workerSet); |
|
api.logger.info( |
|
`orchestrator: worker spawned ${childSessionKey} under ${orchestratorSessionKey} (${workerState.label ?? "no-label"})`, |
|
); |
|
}); |
|
|
|
api.on("subagent_spawned", async (event, ctx) => { |
|
const childSessionKey = event.childSessionKey; |
|
const requesterSessionKey = ctx.requesterSessionKey; |
|
if (!childSessionKey || !requesterSessionKey) return; |
|
if (!orchestratorSessions.has(requesterSessionKey)) return; |
|
const state = workerStateBySession.get(childSessionKey); |
|
if (!state) return; |
|
state.updatedAtMs = Date.now(); |
|
workerStateBySession.set(childSessionKey, state); |
|
}); |
|
|
|
api.on("subagent_delivery_target", async (event) => { |
|
const requesterSessionKey = event.requesterSessionKey; |
|
if (!requesterSessionKey) return; |
|
if (!orchestratorSessions.has(requesterSessionKey)) return; |
|
const state = workerStateBySession.get(event.childSessionKey); |
|
if (!state) return; |
|
state.updatedAtMs = Date.now(); |
|
workerStateBySession.set(event.childSessionKey, state); |
|
}); |
|
|
|
api.on("subagent_ended", async (event) => { |
|
if (event.targetKind !== "subagent") return; |
|
|
|
const childSessionKey = event.targetSessionKey; |
|
const workerState = workerStateBySession.get(childSessionKey); |
|
if (workerState) { |
|
const status = (asString(event.outcome)?.toLowerCase() ?? "unknown") as WorkerStatus; |
|
workerState.status = status; |
|
workerState.outcome = asString(event.outcome); |
|
workerState.updatedAtMs = Date.now(); |
|
workerStateBySession.set(childSessionKey, workerState); |
|
} |
|
|
|
if (orchestratorSessions.has(childSessionKey)) { |
|
orchestratorSessions.delete(childSessionKey); |
|
const requester = requesterByOrchestrator.get(childSessionKey); |
|
requesterByOrchestrator.delete(childSessionKey); |
|
if (requester && orchestratorByRequester.get(requester) === childSessionKey) { |
|
orchestratorByRequester.delete(requester); |
|
} |
|
const workerKeys = workersByOrchestrator.get(childSessionKey); |
|
if (workerKeys) { |
|
for (const workerKey of workerKeys) workerStateBySession.delete(workerKey); |
|
workersByOrchestrator.delete(childSessionKey); |
|
} |
|
api.logger.info(`orchestrator: hidden orchestrator ended ${childSessionKey}`); |
|
} |
|
|
|
if (!autoRetryFailedSubagents) return; |
|
const pending = spawnByChildSession.get(childSessionKey); |
|
if (!pending) return; |
|
if (pending.kind === "orchestrator") { |
|
spawnByChildSession.delete(childSessionKey); |
|
return; |
|
} |
|
|
|
const outcome = asString(event.outcome)?.toLowerCase() ?? "unknown"; |
|
if (!retryOutcomes.has(outcome as RetryOutcome)) { |
|
spawnByChildSession.delete(childSessionKey); |
|
return; |
|
} |
|
|
|
const taskKey = stableTaskKey(pending.requesterSessionKey, pending.spec); |
|
const retries = retryCountByTask.get(taskKey) ?? 0; |
|
if (retries >= maxRetriesPerTask) { |
|
api.logger.warn( |
|
`orchestrator: retry budget exhausted (${retries}/${maxRetriesPerTask}) for ${childSessionKey}`, |
|
); |
|
spawnByChildSession.delete(childSessionKey); |
|
return; |
|
} |
|
|
|
const nextAttempt = retries + 1; |
|
retryCountByTask.set(taskKey, nextAttempt); |
|
|
|
const eventText = buildRetrySystemEvent( |
|
pending.spec, |
|
childSessionKey, |
|
outcome, |
|
nextAttempt, |
|
maxRetriesPerTask, |
|
); |
|
const queued = api.runtime.system.enqueueSystemEvent(eventText, { |
|
sessionKey: pending.requesterSessionKey, |
|
}); |
|
|
|
if (!queued) { |
|
api.logger.warn( |
|
`orchestrator: failed to queue retry event for requester session ${pending.requesterSessionKey}`, |
|
); |
|
return; |
|
} |
|
|
|
api.runtime.system.requestHeartbeatNow({ |
|
reason: "orchestrator-subagent-retry", |
|
sessionKey: pending.requesterSessionKey, |
|
}); |
|
|
|
api.logger.info( |
|
`orchestrator: queued retry ${nextAttempt}/${maxRetriesPerTask} for subagent ${childSessionKey}`, |
|
); |
|
}); |
|
|
|
api.on("session_end", async (event) => { |
|
const sessionKey = event.sessionKey; |
|
if (!sessionKey) return; |
|
|
|
const knownOrchestrator = orchestratorByRequester.get(sessionKey); |
|
if (knownOrchestrator) { |
|
orchestratorByRequester.delete(sessionKey); |
|
requesterByOrchestrator.delete(knownOrchestrator); |
|
orchestratorSessions.delete(knownOrchestrator); |
|
const workerKeys = workersByOrchestrator.get(knownOrchestrator); |
|
if (workerKeys) { |
|
for (const workerKey of workerKeys) workerStateBySession.delete(workerKey); |
|
workersByOrchestrator.delete(knownOrchestrator); |
|
} |
|
} |
|
|
|
if (orchestratorSessions.has(sessionKey)) { |
|
orchestratorSessions.delete(sessionKey); |
|
const requester = requesterByOrchestrator.get(sessionKey); |
|
requesterByOrchestrator.delete(sessionKey); |
|
if (requester && orchestratorByRequester.get(requester) === sessionKey) { |
|
orchestratorByRequester.delete(requester); |
|
} |
|
const workerKeys = workersByOrchestrator.get(sessionKey); |
|
if (workerKeys) { |
|
for (const workerKey of workerKeys) workerStateBySession.delete(workerKey); |
|
workersByOrchestrator.delete(sessionKey); |
|
} |
|
} |
|
|
|
workerStateBySession.delete(sessionKey); |
|
lastUserMessageBySession.delete(sessionKey); |
|
lastDelegatedTaskByOrchestrator.delete(sessionKey); |
|
}); |
|
} |