Proposal: Router-Level `onOpen` / `onClose` with Typed Context (https://github.com/kriasoft/ws-kit)

Status: Implemented
Related Issue: #93 (Typed send missing in onOpen)
Scope: Core + Adapters + Docs

Problem

  • Docs and examples show onOpen(ctx) { ctx.send(...) }, but adapters currently expose { data, ws } only.
  • ADR-035 says adapter hooks are observability-only and sync-only.
  • Users need typed welcome/init messages and initial topic subscriptions on connect.
  • Adapter-level send would couple transports to router internals and bypass plugin/middleware pipelines.

Decision

Add router-level lifecycle hooks with full typed context:

  • Add router.onOpen(handler) and router.onClose(handler) that receive capability-gated context (send, publish, topics when plugins installed).
  • Keep adapter hooks separate and minimal ({ data, ws }, sync, observability-only).
  • Do not overload serve({ onOpen }) to avoid type conflicts and breaking changes.

Non-Goals

  • No typed send in adapter hooks. Adapters remain mechanical bridges per ADR-031/035.
  • No serve() sugar. Users call router.onOpen() directly; this avoids type collisions and hidden indirection.
  • No short aliases. router.open() / router.close() would collide with TestRouter.close(): Promise<void> and imply imperative actions rather than event registration.

Rationale

  • Preserves adapter-as-bridge invariant (ADR-031/035); no schema or behavior leaks into adapters.
  • Delivers the DX users expect (typed welcome messages, initial subscriptions) without losing validation/pubsub/plugin behavior.
  • Portable across adapters (Bun, Cloudflare, Node, etc.) because lifecycle lives in the router.
  • Aligns code with existing spec examples and removes the doc/impl mismatch.

API Design

Router-Level Lifecycle

const router = createRouter()
  .plugin(withZod())
  .plugin(withPubSub({ adapter: memoryPubSub() }));

router.onOpen(async (ctx) => {
  // Full typed context like message handlers
  ctx.send(WelcomeMessage, { greeting: "Hello" });

  if (ctx.data.userId) {
    await ctx.topics.subscribe(`user:${ctx.data.userId}`);
  }
});

router.onClose((ctx) => {
  // publish still works (broadcast to others)
  ctx.publish("presence", UserLeftMessage, { userId: ctx.data.userId });
  // send is NOT available (socket already closing)
});

serve(router, { port: 3000 });

Adapter-Level Hooks (Unchanged)

serve(router, {
  port: 3000,
  // Adapter hooks: observability only, sync, minimal context
  onOpen: ({ data, ws }) => {
    console.log(`Connected: ${data.clientId}`);
    metrics.increment("connections");
  },
  onClose: ({ data, ws }) => {
    console.log(`Disconnected: ${data.clientId}`);
    metrics.decrement("connections");
  },
});

Documentation note: All examples of welcome messages and topic subscriptions use router.onOpen(). Adapter hooks (serve({ onOpen })) are reserved for logging/metrics only.

Context Types

OpenContext (capability-gated):

interface OpenContext<TContext, TExtensions> {
  clientId: string;
  data: TContext;
  connectedAt: number;              // Connection timestamp (ms since epoch)
  ws: ServerWebSocket;              // Escape hatch for raw socket access
  assignData: (partial) => void;    // Update connection data (e.g., upgrade anonymous → authenticated)

  // Present when validation plugin installed:
  send?: (schema, payload) => void;

  // Present when pubsub plugin installed:
  publish?: (topic, schema, payload) => Promise<PublishResult>;
  topics?: { subscribe, unsubscribe, list, has };
}

CloseContext (no send):

interface CloseContext<TContext, TExtensions> {
  clientId: string;
  data: TContext;
  code?: number;                    // WebSocket close code
  reason?: string;                  // Close reason
  ws: ServerWebSocket;              // Socket is CLOSING or CLOSED

  // Present when pubsub plugin installed:
  publish?: (topic, schema, payload) => Promise<PublishResult>;
  topics?: { list, has };           // Read-only: no subscribe/unsubscribe (cleanup is automatic)
}

Why no send in onClose?

By the time onClose fires, the WebSocket is CLOSING or CLOSED. Including send would create a foot-gun where users expect messages to deliver. publish is still useful to broadcast "user left" to others.

Why read-only topics in onClose?

Topic cleanup is automatic on connection close. The topics object in CloseContext is for debugging/metrics (checking what the connection was subscribed to). No subscribe/unsubscribe since the connection is terminating.
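
One way to picture the read-only view is a thin wrapper that exposes only list and has. This is a minimal sketch, not the ws-kit implementation: the method signatures on TopicsAPI are assumed (the proposal names the four methods but not their shapes), and createMemoryTopics and toReadonlyTopics are hypothetical helpers.

```typescript
// Assumed signatures; the proposal only names the four methods.
interface TopicsAPI {
  subscribe(topic: string): Promise<void>;
  unsubscribe(topic: string): Promise<void>;
  list(): string[];
  has(topic: string): boolean;
}

type ReadonlyTopics = Pick<TopicsAPI, "list" | "has">;

/** Hypothetical in-memory topics store, used only to exercise the wrapper. */
function createMemoryTopics(initial: string[] = []): TopicsAPI {
  const topics = new Set(initial);
  return {
    subscribe: async (topic) => { topics.add(topic); },
    unsubscribe: async (topic) => { topics.delete(topic); },
    list: () => Array.from(topics),
    has: (topic) => topics.has(topic),
  };
}

/** Exposes only the read operations, so mutating methods are absent at runtime too. */
function toReadonlyTopics(topics: TopicsAPI): ReadonlyTopics {
  return {
    list: () => topics.list(),
    has: (topic) => topics.has(topic),
  };
}
```

Building a new object, rather than casting the full API to the narrower type, means a handler cannot reach subscribe or unsubscribe even by bypassing the type checker.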

Lifecycle Ordering

HTTP Upgrade Request
    │
    ▼
authenticate(req) → returns TContext or undefined
    │
    ▼
adapter.onUpgrade?(req)          [sync, observability, before accept]
    │
    ▼
WebSocket OPEN (Bun/Cloudflare level)
    │
    ▼
router.onOpen(ctx) handlers      [async, full context, AWAITED]
    │                            (can throw CloseError to reject)
    ▼
adapter.onOpen({ data, ws })     [sync, observability, ALWAYS fires]
    │
    ▼
Message dispatch begins          [only after onOpen completes]
    │
    ▼
...connection active...
    │
    ▼
Close notification received      [socket entering CLOSING state]
    │
    ▼
router.onClose(ctx) handlers     [async, during close notification]
    │
    ▼
adapter.onClose({ data, ws })    [sync, observability, ALWAYS fires]
    │
    ▼
Connection fully torn down

Key guarantees:

  1. router.onOpen completes before message dispatch. Users can safely assume subscriptions are set up before handlers run.
  2. router.onClose runs during close notification. The socket is at least CLOSING, may already be CLOSED. You cannot rely on sending frames here—use publish for "user left" broadcasts.
  3. Adapter hooks fire for all successfully upgraded connections. If upgrade fails or authenticate rejects, there is no onOpen/onClose.
  4. Adapter hooks always fire (for upgraded connections). Even if a lifecycle handler throws, adapter hooks run for observability.
  5. Ordering is deterministic. Router lifecycle runs first, then adapter hooks, in both open and close flows.
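
The open-side guarantees can be modeled in a few lines. The sketch below is a toy, not ws-kit internals: ConnectionModel and its members are hypothetical, and buffering early messages (rather than dropping them) is an assumption, since the proposal only says they are not dispatched until onOpen completes.

```typescript
// Minimal model of the open flow. All names are hypothetical, and buffering
// early messages (rather than dropping them) is an assumption.
type Log = string[];
type OpenHandler = (log: Log) => void | Promise<void>;

class ConnectionModel {
  readonly log: Log = [];
  private state: "opening" | "open" | "closed" = "opening";
  private pending: string[] = [];
  private routerOnOpen: OpenHandler[];
  private adapterOnOpen: (log: Log) => void;

  constructor(routerOnOpen: OpenHandler[], adapterOnOpen: (log: Log) => void) {
    this.routerOnOpen = routerOnOpen;
    this.adapterOnOpen = adapterOnOpen;
  }

  /** Router handlers run first (awaited, in order), then the adapter hook. */
  async open(): Promise<void> {
    let failed = false;
    try {
      for (const handler of this.routerOnOpen) await handler(this.log);
    } catch {
      failed = true; // the real router would route this to router.onError
    }
    this.adapterOnOpen(this.log); // fires even if a router handler threw
    if (failed) {
      this.state = "closed";
      this.pending = [];
      return;
    }
    this.state = "open";
    for (const msg of this.pending.splice(0)) this.dispatch(msg);
  }

  /** Messages that arrive while onOpen is still running are held back. */
  receive(msg: string): void {
    if (this.state === "open") this.dispatch(msg);
    else if (this.state === "opening") this.pending.push(msg);
  }

  private dispatch(msg: string): void {
    this.log.push(`dispatch:${msg}`);
  }
}
```

A message passed to receive() while open() is still pending is dispatched only after both the router handlers and the adapter hook have run; if a router handler throws, the adapter hook still fires and nothing is dispatched.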

Reserved System Types

Lifecycle events use reserved system types that validation plugins skip:

// packages/core/src/schema/reserved.ts
export const SYSTEM_LIFECYCLE = {
  OPEN: "$ws:open",
  CLOSE: "$ws:close",
} as const;

Namespace reservation: The $ws: prefix is reserved for internal system events. User-defined message types must not start with $ws:. Schema definition functions (message(), rpc()) enforce this at runtime:

// Throws: "Message type cannot start with '$ws:' (reserved for system events)"
const BadMessage = message("$ws:custom", { ... });

Validation plugins check ctx.type and skip validation for system events:

if (typeof ctx.type === "string" && ctx.type.startsWith("$ws:")) {
  return; // Skip validation for system events
}
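
The two checks above can be sketched together. This is an illustration under assumptions: defineMessage stands in for the real message() factory (which also takes a payload schema), and isSystemType and shouldValidate are hypothetical helper names.

```typescript
// Hypothetical stand-ins for illustration; not the actual ws-kit implementation.
const RESERVED_PREFIX = "$ws:";

const SYSTEM_LIFECYCLE = {
  OPEN: "$ws:open",
  CLOSE: "$ws:close",
} as const;

/** True for internal system events such as "$ws:open". */
function isSystemType(type: unknown): boolean {
  return typeof type === "string" && type.startsWith(RESERVED_PREFIX);
}

/** Simplified schema factory: rejects reserved types at definition time. */
function defineMessage<T extends string>(type: T): { type: T } {
  if (isSystemType(type)) {
    throw new Error(
      `Message type cannot start with '${RESERVED_PREFIX}' (reserved for system events)`,
    );
  }
  return { type };
}

/** A validation plugin would validate only when this returns true. */
function shouldValidate(ctx: { type?: unknown }): boolean {
  return !isSystemType(ctx.type);
}
```

Sharing one predicate between the definition-time check and the validation bypass keeps the two from drifting apart if the reserved prefix ever changes.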

Plugin Integration

Plugins register lifecycle handlers via the normal router.onOpen() API during installation.

Ordering invariant: Lifecycle handlers run in registration order. To ensure plugins can set up infrastructure before user handlers run:

// Recommended: install plugins before registering lifecycle handlers
const router = createRouter()
  .plugin(withZod())           // Plugin's onOpen registered here
  .plugin(withPubSub(...))     // Plugin's onOpen registered here
  .onOpen(userHandler);        // User handler runs after plugin handlers

// In withPubSub plugin
install(router) {
  router.onOpen((ctx) => {
    // Track connection for topic management (runs before user handlers)
    clientRegistry.add(ctx.clientId, ctx.ws);
  });

  router.onClose((ctx) => {
    // Clean up subscriptions (runs before user handlers)
    clientRegistry.remove(ctx.clientId);
  });
}

Note: If a user calls .onOpen() before .plugin(), their handlers run before the plugin's handlers, which is usually not what you want. The docs should state this ordering clearly.
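
The effect of call order is easy to see with a toy router that only records registration order. ToyRouter and the pubsub installer below are hypothetical and carry none of the real router's behavior; handlers here take a name purely so the order can be observed.

```typescript
// Toy router that only tracks registration order; not the ws-kit Router.
type Handler = { name: string; run: () => void };

class ToyRouter {
  private openHandlers: Handler[] = [];

  /** Installing a plugin lets it register its own lifecycle handlers immediately. */
  plugin(install: (router: ToyRouter) => void): this {
    install(this);
    return this;
  }

  onOpen(name: string, run: () => void = () => {}): this {
    this.openHandlers.push({ name, run });
    return this;
  }

  /** Runs handlers in registration order and returns the names in the order they ran. */
  fireOpen(): string[] {
    const ran: string[] = [];
    for (const h of this.openHandlers) {
      h.run();
      ran.push(h.name);
    }
    return ran;
  }
}

// Hypothetical plugin installer that registers one onOpen handler.
const pubsub = (r: ToyRouter) => { r.onOpen("pubsub:track-client"); };

// Recommended: plugin first, so its handler runs before the user's.
const recommended = new ToyRouter().plugin(pubsub).onOpen("user").fireOpen();
// Reversed: the user handler now runs before the plugin has tracked the client.
const reversed = new ToyRouter().onOpen("user").plugin(pubsub).fireOpen();
```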

Error Handling

Errors in lifecycle handlers flow through router.onError():

router.onError((err, ctx) => {
  console.error(`Error in ${ctx?.type ?? "lifecycle"}:`, err);
});

Custom close codes with CloseError:

import { CloseError } from "@ws-kit/core";

router.onOpen(async (ctx) => {
  const user = await validateToken(ctx.data.token);
  if (!user) {
    // Close with custom code and reason
    throw new CloseError(4401, "Invalid token");
  }
});

Behavior:

  • Unhandled errors in onOpen close the connection with code 1011 (internal error).
  • CloseError in onOpen closes with the specified code and reason.
  • Unhandled errors in onClose are logged but don't affect close (already closing).
  • Errors thrown from router.onError itself are caught and logged to console; they never propagate to the event loop.
  • Errors never crash the process.
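
The rules above amount to a small mapping from thrown value to close frame, plus a guard around the error handler. The sketch below is illustrative only: this CloseError is a local stand-in whose code and reason fields are assumed, resolveClose and reportError are hypothetical helpers, and the "Internal error" reason string is an assumption (the proposal specifies only the 1011 code).

```typescript
// Sketch only. The CloseError fields and both helpers are assumptions that
// mirror the behavior listed above, not the actual ws-kit internals.
class CloseError extends Error {
  readonly code: number;
  readonly reason: string;

  constructor(code: number, reason = "") {
    super(reason);
    this.name = "CloseError";
    this.code = code;
    this.reason = reason;
  }
}

/** Maps an error thrown from an onOpen handler to the close code and reason. */
function resolveClose(err: unknown): { code: number; reason: string } {
  if (err instanceof CloseError) {
    return { code: err.code, reason: err.reason };
  }
  return { code: 1011, reason: "Internal error" };
}

/** Calls the user's error handler; a throwing handler is logged, never rethrown. */
function reportError(
  onError: (err: unknown) => void,
  err: unknown,
  log: (message: string) => void,
): void {
  try {
    onError(err);
  } catch (handlerErr) {
    log(`router.onError threw: ${String(handlerErr)}`);
  }
}
```

Passing the logger in as a parameter keeps the sketch free of environment assumptions; the proposal itself logs to the console.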

Type Signatures

// Base context (always present)
interface BaseOpenContext<TContext> {
  clientId: string;
  data: TContext;
  connectedAt: number;
  ws: ServerWebSocket;
  assignData: (partial: Partial<TContext>) => void;
}

interface BaseCloseContext<TContext> {
  clientId: string;
  data: TContext;
  code?: number;
  reason?: string;
  ws: ServerWebSocket;
}

// Capability-gated context derivation
type OpenContext<TContext, TExtensions> =
  BaseOpenContext<TContext> &
  (HasCapability<TExtensions, "validation"> extends true
    ? { send: SendFunction<TExtensions> }
    : {}) &
  (HasCapability<TExtensions, "pubsub"> extends true
    ? { publish: PublishFunction; topics: TopicsAPI }
    : {});

type CloseContext<TContext, TExtensions> =
  BaseCloseContext<TContext> &
  (HasCapability<TExtensions, "pubsub"> extends true
    ? { publish: PublishFunction; topics: Pick<TopicsAPI, "list" | "has"> }
    : {});

// Error context for lifecycle errors
interface LifecycleErrorContext {
  type: "$ws:open" | "$ws:close";
  clientId: string;
  data: unknown;
}

// Router lifecycle methods (capability-gated via Omit pattern)
interface RouterCore<TContext> {
  onOpen(handler: (ctx: BaseOpenContext<TContext>) => void | Promise<void>): this;
  onClose(handler: (ctx: BaseCloseContext<TContext>) => void | Promise<void>): this;
  onError(handler: (err: Error, ctx?: LifecycleErrorContext) => void): this;
}

// When plugins are installed, Omit base methods and add capability-gated versions
type RouterWithExtensions<TContext, TExtensions> =
  Omit<RouterCore<TContext>, "onOpen" | "onClose"> & {
    onOpen(handler: (ctx: OpenContext<TContext, TExtensions>) => void | Promise<void>): this;
    onClose(handler: (ctx: CloseContext<TContext, TExtensions>) => void | Promise<void>): this;
    onError(handler: (err: Error, ctx?: LifecycleErrorContext) => void): this;
  };
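
HasCapability is used above without a definition. One plausible encoding, shown here purely as an assumption, models TExtensions as a record of capability flags; the real ws-kit helper may be defined differently. Send, Publish, and OpenCtx are simplified stand-ins for the types in the signatures above.

```typescript
// Assumption: TExtensions is modeled as a record of `true` capability flags.
// The real ws-kit encoding of HasCapability may differ.
type HasCapability<TExt, K extends string> =
  TExt extends Record<K, true> ? true : false;

// Simplified stand-ins for SendFunction and PublishFunction.
type Send = (type: string, payload: unknown) => void;
type Publish = (topic: string, type: string, payload: unknown) => Promise<void>;

type OpenCtx<TExt> = { clientId: string } &
  (HasCapability<TExt, "validation"> extends true ? { send: Send } : {}) &
  (HasCapability<TExt, "pubsub"> extends true ? { publish: Publish } : {});

// Compile-time checks: these assignments only type-check if the gating works.
const bare: OpenCtx<{}> = { clientId: "c1" };
const withValidation: OpenCtx<{ validation: true }> = {
  clientId: "c2",
  send: () => {},
};

// @ts-expect-error: `send` does not exist without the validation capability
bare.send;
```

The `@ts-expect-error` line doubles as a type test: if the gating ever stopped hiding send, the directive itself would become a compile error.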

Implementation Plan

  1. Core types: Add BaseOpenContext, BaseCloseContext, OpenContext, CloseContext, LifecycleErrorContext to packages/core/src/context/lifecycle-context.ts.
  2. System constants: Export SYSTEM_LIFECYCLE from packages/core/src/schema/reserved.ts.
  3. CloseError: Add CloseError class to packages/core/src/error.ts and export from package root.
  4. Router API: Add onOpen(), onClose(), onError() to RouterCore and RouterImpl.
  5. Namespace enforcement: Update message() and rpc() to reject types starting with $ws:.
  6. Lifecycle dispatch: In handleOpen/handleClose, build full context via createOpenContext/createCloseContext with system type, run handlers sequentially.
  7. Validation bypass: Update withZod/withValibot to skip validation for $ws:* types.
  8. Capability gating: Use Omit pattern in type helpers so ctx.send/ctx.topics appear based on installed plugins.
  9. Tests: Add packages/core/test/features/lifecycle-hooks.test.ts.
  10. Docs: Update docs/specs/router.md, add migration note.
  11. Adapter hooks: Update adapters to ALWAYS fire onOpen/onClose hooks for observability, even if router lifecycle throws.

Migration / Compatibility

This is additive with one minor breaking change:

  • Existing adapter hooks (serve({ onOpen })) continue to work exactly as before with { data, ws } context.
  • New router.onOpen() API is opt-in.
  • Users wanting typed sends/subscriptions call router.onOpen() before passing to serve().

Breaking: The $ws: prefix is now reserved. Any existing schemas using this prefix will fail at definition time. This is expected to affect zero users, but technically requires a minor version bump.

Migration path:

// Before (raw ws.send, no type safety)
serve(router, {
  onOpen: ({ ws }) => {
    ws.send(JSON.stringify({ type: "WELCOME", payload: { greeting: "Hello" } }));
  },
});

// After (typed, capability-aware)
router.onOpen((ctx) => {
  ctx.send(WelcomeMessage, { greeting: "Hello" });
});
serve(router, { port: 3000 });

Test Cases

  1. router.onOpen runs after authenticate: ctx.data is populated.
  2. router.onOpen gets capabilities: ctx.send/ctx.topics present when plugins installed; type tests assert this.
  3. Messages wait for onOpen: Incoming messages are not dispatched until onOpen completes.
  4. router.onClose runs during close notification: ctx.publish succeeds.
  5. onClose fires on abrupt disconnect: Even without graceful close frame.
  6. Errors route to router.onError: Lifecycle errors invoke error handler.
  7. Error in onOpen closes connection: Unhandled error closes with 1011; CloseError uses custom code.
  8. Error in router.onError is swallowed: Never propagates to event loop.
  9. Handler ordering is registration order: Plugin handlers registered first run first.
  10. Adapter hooks fire even if lifecycle throws: serve({ onOpen, onClose }) callbacks still run.
  11. $ws: prefix rejected in schema: message("$ws:foo", {...}) throws at definition time.
  12. Hooks only fire for upgraded connections: Failed upgrade or auth rejection doesn't trigger hooks.

Resolved Questions

  1. topics.subscribe in onOpen is awaited: This delays message dispatch but ensures subscriptions are set up. Document this trade-off in specs.
  2. No short aliases: router.open() / router.close() would collide with TestRouter.close() and imply imperative actions. Stick with onOpen / onClose.
