Last active
February 28, 2026 07:50
-
-
Save rebolyte/e4c5f9e6a34fa82d1ac3ae6289c5ca5d to your computer and use it in GitHub Desktop.
Actor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Sentinel an `onMessage` handler resolves with, in place of a next state, to
 * signal that the actor has finished. The system then runs the actor's
 * `onDone` hook (if any) and stops the actor.
 */
export const Done: unique symbol = Symbol("Done");
/**
 * What to do with an actor after its message handler threw or rejected.
 * Returned by an actor's `onCrash` hook.
 */
export type CrashDirective =
  | "resume" // keep the current state; the failing message is dropped
  | "reset" // go back to the initial state; children and mailbox are kept
  | "stop" // stop the actor
  | "escalate"; // hand the error to the parent's crash handling
/**
 * Handle to a running actor. Holding a reference is the only way to
 * communicate with the actor; its state is never exposed.
 *
 * @typeParam M - The message type the actor accepts.
 */
export type ActorRef<M> = {
  /** Name the actor was spawned under. */
  readonly name: string;
  /** Puts `message` in the actor's mailbox. */
  send(message: M): void;
  /** Stops the actor. */
  stop(): void;
};
/**
 * Context handed to every hook of an actor (`onMessage`, `onCrash`, `onDone`).
 *
 * @typeParam M - The message type of the actor that owns this context.
 */
export type Ctx<M> = {
  /** Reference to the actor itself, e.g. to pass along as a reply address. */
  self: ActorRef<M>;
  /** Spawns a child actor owned by this actor. */
  spawn: <S2, M2>(config: SpawnConfig<S2, M2>) => ActorRef<M2>;
  /** Finds an actor by name; the caller chooses the message type `M2`. */
  lookup: <M2>(name: string) => ActorRef<M2>;
};
/**
 * Everything needed to spawn an actor.
 *
 * @typeParam S - The actor's private state.
 * @typeParam M - The message type the actor accepts.
 */
export type SpawnConfig<S, M> = {
  /** Name to register the actor under; spawning a duplicate name throws. */
  name: string;
  /** State passed to the first `onMessage` call (and restored by "reset"). */
  initialState: S;
  /**
   * Handles one message. Resolve with the next state, or with `Done` to
   * finish the actor. A throw/rejection is routed to `onCrash`.
   */
  onMessage: (state: S, message: M, ctx: Ctx<M>) => Promise<S | typeof Done>;
  /**
   * Chooses how to recover from a failed `onMessage`. When omitted the actor
   * is stopped. NOTE: when invoked because a child escalated, `message` is
   * `undefined` at runtime despite its declared type.
   */
  onCrash?: (error: unknown, message: M, ctx: Ctx<M>) => CrashDirective;
  /** Runs once after `onMessage` resolved with `Done`, before the actor stops. */
  onDone?: (ctx: Ctx<M>) => void;
};
/** Public surface of an actor system returned by `createSystem`. */
export type ActorSystem = {
  /** Spawns a top-level actor; throws once the system has been stopped. */
  spawn: <S, M>(config: SpawnConfig<S, M>) => ActorRef<M>;
  /** Finds any live actor by name; throws when no such actor exists. */
  lookup: <M>(name: string) => ActorRef<M>;
  /** Stops every top-level actor and refuses further top-level spawns. */
  stop(): void;
};
/**
 * Creates an independent actor system.
 *
 * Every actor owns a private state value and a FIFO mailbox. Messages are
 * handled one at a time: the next message is not started until the promise
 * returned by `onMessage` for the previous one has settled. Actor names are
 * unique across the whole system (top-level actors and children share one
 * registry).
 *
 * @returns A handle for spawning top-level actors, looking actors up by name
 *   and stopping the whole system.
 */
export function createSystem(): ActorSystem {
  // The registry is heterogeneous, so message types are erased here and
  // re-asserted by the caller of `lookup`.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const registry = new Map<string, ActorRef<any>>();
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const topLevel = new Set<ActorRef<any>>();
  let stopped = false;

  /**
   * Returns the live actor registered under `name`.
   * @throws Error when no live actor has that name.
   */
  function lookup<M>(name: string): ActorRef<M> {
    const ref = registry.get(name);
    if (!ref) throw new Error(`Actor not found: ${name}`);
    return ref;
  }

  /**
   * Creates and registers one actor.
   *
   * @param config - The actor's name, initial state and hooks.
   * @param parent - Sibling set of the owner (the system's top-level set or
   *   the parent actor's children); the actor adds/removes itself there.
   * @param onEscalate - Parent's escalation handler; absent for top-level actors.
   * @throws Error when `config.name` is already registered.
   */
  function spawnActor<S, M>(
    config: SpawnConfig<S, M>,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    parent: Set<ActorRef<any>>,
    onEscalate?: (error: unknown) => void,
  ): ActorRef<M> {
    if (registry.has(config.name)) {
      throw new Error(`Actor already exists: ${config.name}`);
    }

    let state = config.initialState;
    let processing = false;
    let alive = true;
    const queue: M[] = [];
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const children = new Set<ActorRef<any>>();

    /**
     * Asks `onCrash` what to do about `error`. Never throws: a missing hook,
     * a nullish result or a hook that itself throws all resolve to "stop", so
     * a faulty crash handler cannot wedge the mailbox loop.
     */
    function resolveDirective(error: unknown, message: M): CrashDirective {
      if (!config.onCrash) return "stop";
      try {
        return config.onCrash(error, message, ctx) ?? "stop";
      } catch (handlerError) {
        console.error(`onCrash threw in actor "${config.name}":`, handlerError);
        return "stop";
      }
    }

    /** A child escalated to us: run our own crash handling as if we crashed. */
    function handleChildEscalation(error: unknown): void {
      // There is no message of ours associated with a child's failure, so
      // `onCrash` receives `undefined` in the message slot.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      applyDirective(resolveDirective(error, undefined as any), error);
    }

    /** Carries out a crash directive for this actor. */
    function applyDirective(directive: CrashDirective, error: unknown): void {
      switch (directive) {
        case "resume":
          // Keep state and keep going; the failing message is simply dropped.
          break;
        case "reset":
          // Back to the initial state; children and mailbox are kept. This
          // re-assigns the same `initialState` value, it does not clone it.
          state = config.initialState;
          break;
        case "stop":
          self.stop();
          break;
        case "escalate":
          if (onEscalate) {
            onEscalate(error);
          } else {
            // Top-level actor with nobody to escalate to: report and stop.
            console.error(`Unhandled escalation in actor "${config.name}":`, error);
            self.stop();
          }
          break;
      }
    }

    const self: ActorRef<M> = {
      name: config.name,
      send(message: M) {
        if (!alive) return; // dead letter
        queue.push(message);
        // `drain` never rejects (see below), so the promise can be ignored.
        void drain();
      },
      stop() {
        if (!alive) return;
        alive = false;
        // Each child removes itself from `children` while stopping, so
        // iterate over a snapshot.
        for (const child of [...children]) child.stop();
        children.clear();
        queue.length = 0;
        registry.delete(config.name);
        parent.delete(self);
      },
    };

    const ctx: Ctx<M> = {
      self,
      spawn: <S2, M2>(childConfig: SpawnConfig<S2, M2>) => {
        // A stopped actor has already torn down its children; a child added
        // afterwards would stay registered with nothing left to stop it.
        if (!alive) throw new Error(`Actor is stopped: ${config.name}`);
        return spawnActor(childConfig, children, handleChildEscalation);
      },
      lookup,
    };

    /**
     * Mailbox loop: handles queued messages one by one until the queue is
     * empty or the actor stops. Only one loop runs at a time, and the loop
     * never rejects.
     */
    async function drain(): Promise<void> {
      if (processing || !alive) return;
      processing = true;
      try {
        while (queue.length > 0 && alive) {
          const msg = queue.shift() as M;

          let result: S | typeof Done;
          try {
            result = await config.onMessage(state, msg, ctx);
          } catch (error) {
            applyDirective(resolveDirective(error, msg), error);
            continue; // the loop condition re-checks `alive`
          }

          if (result === Done) {
            let directive: CrashDirective | undefined;
            let failure: unknown;
            try {
              config.onDone?.(ctx);
            } catch (doneError) {
              failure = doneError;
              directive = resolveDirective(doneError, msg);
            }
            // Done is terminal: the actor stops even if `onDone` failed and
            // `onCrash` answered "resume" or "reset". Escalation still happens.
            self.stop();
            if (directive === "escalate") applyDirective(directive, failure);
            return;
          }

          state = result as S;
        }
      } finally {
        // Always release the loop so a later `send` can start a new one.
        processing = false;
      }
    }

    registry.set(config.name, self);
    parent.add(self);
    return self;
  }

  return {
    spawn: <S, M>(config: SpawnConfig<S, M>) => {
      if (stopped) throw new Error("System is stopped");
      return spawnActor(config, topLevel);
    },
    lookup,
    stop() {
      stopped = true;
      // Actors remove themselves from `topLevel` while stopping.
      for (const ref of [...topLevel]) ref.stop();
      topLevel.clear();
    },
  };
}
/**
 * Request/response on top of fire-and-forget messaging: spawns a temporary
 * top-level actor to act as the reply address, sends the message built by
 * `factory` to `target`, and resolves with the first reply received.
 *
 * @param system - System in which the temporary reply actor is spawned.
 * @param target - Actor to ask.
 * @param factory - Builds the outgoing message around the reply address.
 * @param timeoutMs - How long to wait for a reply (default 5000 ms).
 * @returns The first message sent to the reply address.
 * @throws (as a rejection) `Error("Ask timed out")` when no reply arrives in
 *   time; any error thrown while spawning the reply actor, by `factory`, or
 *   by `target.send`.
 */
export function ask<M, R>(
  system: ActorSystem,
  target: ActorRef<M>,
  factory: (replyTo: ActorRef<R>) => M,
  timeoutMs = 5000,
): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    // Spawn before arming the timer: if spawning throws (e.g. the system is
    // stopped) the promise rejects and no timer is left behind to touch an
    // uninitialised `ref` later.
    const ref = system.spawn<undefined, R>({
      name: `_ask_${crypto.randomUUID()}`,
      initialState: undefined,
      onMessage: async (_state, reply) => {
        clearTimeout(timer);
        resolve(reply);
        return Done; // one reply only; the temporary actor stops itself
      },
    });

    // Armed before sending, because the target may reply synchronously.
    const timer = setTimeout(() => {
      ref.stop();
      reject(new Error("Ask timed out"));
    }, timeoutMs);

    try {
      target.send(factory(ref));
    } catch (error) {
      // Nothing was sent, so no reply can come: clean up right away instead
      // of keeping the actor and timer around until the timeout.
      clearTimeout(timer);
      ref.stop();
      reject(error);
    }
  });
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createSystem, ask, Done, type ActorRef } from "./actors";

// NOTE(review): fromCallback, fromPromise and fromObservable are used below
// but are not imported in this snippet — import them from wherever they are
// defined before running it.

const system = createSystem();

// -- Example: ping-pong with a counter that stops after 3 --
type PingMsg = { type: "ping"; replyTo: ActorRef<PongMsg> };
type PongMsg = { type: "pong"; count: number };

const ponger = system.spawn<null, PingMsg>({
  name: "ponger",
  initialState: null,
  onMessage: async (_state, msg) => {
    // just reply — stateless
    msg.replyTo.send({ type: "pong", count: 1 });
    return null;
  },
});

const pinger = system.spawn<number, PongMsg>({
  name: "pinger",
  initialState: 0,
  onMessage: async (count, _msg, ctx) => {
    // the pinger's own state is the pong counter
    const next = count + 1;
    console.log(`pinger: received pong #${next}`);
    if (next >= 3) return Done;
    ponger.send({ type: "ping", replyTo: ctx.self });
    return next;
  },
  onDone: () => console.log("pinger: done after 3 pongs"),
});

// kick it off
ponger.send({ type: "ping", replyTo: pinger });

// -- Example: ask pattern --
type MathMsg = { op: "add"; a: number; b: number; replyTo: ActorRef<number> };

const calculator = system.spawn<null, MathMsg>({
  name: "calculator",
  initialState: null,
  onMessage: async (_state, msg) => {
    msg.replyTo.send(msg.a + msg.b);
    return null;
  },
});

const result = await ask<MathMsg, number>(system, calculator, (replyTo) => ({
  op: "add",
  a: 17,
  b: 25,
  replyTo,
}));
console.log(`ask result: 17 + 25 = ${result}`);

// -- Example: parent-child hierarchy --
const parent = system.spawn<null, string>({
  name: "parent",
  initialState: null,
  onMessage: async (_state, msg, ctx) => {
    if (msg === "spawn-child") {
      ctx.spawn({
        name: "child",
        initialState: 0,
        onMessage: async (n, _msg) => {
          console.log(`child: message #${n + 1}`);
          return n + 1;
        },
      });
    }
    return null;
  },
});

parent.send("spawn-child");

// stopping parent also stops child
// (the system itself is stopped at the very end of this script: stopping it
// here would also stop the actors of the examples below before they ran)
setTimeout(() => {
  parent.stop();
  console.log("parent stopped (child stopped with it)");
}, 100);

// -- fromCallback: wrap setInterval --
const ticker = fromCallback(system, "ticker", ({ send }) => {
  let i = 0;
  const id = setInterval(() => send(++i), 1000);
  return () => clearInterval(id); // cleanup
});

setTimeout(() => ticker.stop(), 3500);

// -- fromCallback: bidirectional (WebSocket-style) --
type WsOut = { type: "send"; data: string };
type WsIn = { type: "message"; data: string };

const ws = fromCallback<WsIn, WsOut>(system, "ws", ({ send, receive }) => {
  // simulate a socket
  const fakeSocket = {
    onmessage: null as ((data: string) => void) | null,
    send(data: string) {
      console.log("socket sent:", data);
    },
    close() {
      console.log("socket closed");
    },
  };

  // inbound: socket → actor system
  fakeSocket.onmessage = (data) => send({ type: "message", data });

  // outbound: actor system → socket
  receive((msg) => {
    if (msg.type === "send") fakeSocket.send(msg.data);
  });

  return () => fakeSocket.close();
});

// other actors can now send TO the websocket actor
ws.send({ type: "send", data: "hello server" });

// -- fromPromise: one-shot async work --
const logger = system.spawn<null, any>({
  name: "logger",
  initialState: null,
  onMessage: async (_s, msg) => {
    console.log("logger received:", msg);
    return null;
  },
});

fromPromise(
  system,
  "fetch-user",
  async () => {
    // simulate API call
    return { id: 1, name: "James" };
  },
  logger,
);
// logger receives: { type: "resolved", value: { id: 1, name: "James" } }

// -- fromObservable: any Subscribable shape --
// minimal observable (works with RxJS, wonka, zen-observable, etc)
const fakeObservable = {
  subscribe(
    next: (v: string) => void,
    _error?: (e: unknown) => void,
    complete?: () => void,
  ) {
    next("hello");
    next("world");
    setTimeout(() => {
      next("async one");
      complete?.();
    }, 100);
    return { unsubscribe() {} };
  },
};

fromObservable(system, "words", fakeObservable, logger);
// logger receives: "hello", "world", "async one"

// -- Shutdown --
// Stop every remaining actor only after the slowest example (the ticker,
// stopped at 3500 ms) has finished.
setTimeout(() => system.stop(), 4000);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment