Skip to content

Instantly share code, notes, and snippets.

@stackdump
Last active January 3, 2026 20:25
Show Gist options
  • Select an option

  • Save stackdump/3ad171703354b94d456ce6ae558ea589 to your computer and use it in GitHub Desktop.

Select an option

Save stackdump/3ad171703354b94d456ce6ae558ea589 to your computer and use it in GitHub Desktop.
Dual State Engine Architecture - Go/JS event sourcing template

Dual State Engine Architecture

A template for building deterministic, verifiable systems with parallel Go/JavaScript implementations.

Core Concept

Build complex stateful systems where state is a function of events:

State(t) = fold(apply, initialState, events[0..t])

By implementing identical logic in Go (server) and JavaScript (client), you get:

  • Determinism: Same seed + same actions = same outcome
  • Verification: Server replays client-claimed outcomes
  • Offline-first: Client runs full logic locally, syncs events later
  • Testability: Run thousands of simulations to find edge cases

Quick Start

make build      # Build Go binary
make run        # Server at http://localhost:8080
make test       # Verify Go/JS parity

Project Structure

project/
├── engine/              # Go state engine (authoritative)
│   ├── state.go         # Core state, action processing
│   ├── events.go        # Event types and logging
│   ├── processor.go     # Action → Event pipeline
│   └── projections.go   # Derived views from events
├── server/              # HTTP/WebSocket server
├── streams/             # Reactive event bus
└── cmd/
    ├── main.go          # Entry point
    └── client/          # Browser client (JS mirror)
        ├── state.js     # State logic (mirrors state.go)
        ├── events.js    # Event handling
        └── app.js       # UI, WebSocket, rendering

Architecture

Event Flow

Action → ProcessAction() → State Mutation → Event Logged → Broadcast
                                                ↓
                                           SQLite Store
                                                ↓
                                         Projections (analytics, reports, replays)

Key Principles

  1. Actions are the source of truth — Not state snapshots. Reconstruct from (seed, actions[]).
  2. Separate RNG streams — Deterministic random with isolated seeds for different subsystems.
  3. Monadic pipelines — Clean decision logic using Option/Maybe patterns.
  4. Reactive event bus — Pub/sub decouples subsystems.

Seeded Determinism

Use the same PRNG algorithm in both languages. Mulberry32 works well:

// Go - engine/random.go
func (s *State) deterministicRandom() float64 {
    s.RandSeed += 0x6D2B79F5
    t := s.RandSeed
    t = (t ^ (t >> 15)) * (t | 1)
    t ^= t + (t^(t>>7))*(t|61)
    return float64((t ^ (t >> 14)) & 0x7FFFFFFF) / float64(0x7FFFFFFF)
}
// JavaScript - client/state.js
deterministicRandom() {
    this.randSeed = (this.randSeed + 0x6D2B79F5) >>> 0;
    let t = this.randSeed;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t = (t ^ (t + Math.imul(t ^ (t >>> 7), t | 61))) >>> 0;
    return ((t ^ (t >>> 14)) & 0x7FFFFFFF) / 0x7FFFFFFF;
}

Monadic Design Patterns

Use samber/mo for Option types and samber/lo for functional pipelines:

import (
    "github.com/samber/lo"
    "github.com/samber/mo"
)

// Option types for clean control flow
type Result = mo.Option[ActionType]

// tryAction creates a Result from a condition
func tryAction(cond bool, action ActionType) Result {
    if cond {
        return mo.Some(action)
    }
    return mo.None[ActionType]()
}

// firstAction returns first Some from lazy thunks (short-circuit evaluation)
func firstAction(thunks ...func() Result) Result {
    for _, thunk := range thunks {
        if result := thunk(); result.IsPresent() {
            return result
        }
    }
    return mo.None[ActionType]()
}

// Functional pipelines with lo
func processEntities(entities []*Entity, px, py int) []entityView {
    return lo.Map(entities, func(e *Entity, _ int) entityView {
        return entityView{entity: e, dist: manhattan(e.X, e.Y, px, py)}
    })
}

// Filter and transform chains
func findTargets(entities []*Entity, criteria func(*Entity) bool) []*Entity {
    return lo.Filter(entities, func(e *Entity, _ int) bool {
        return criteria(e)
    })
}

// Decision pipeline using monadic composition
func decideAction(state *State) ActionType {
    return firstAction(
        func() Result { return tryHandleUrgent(state) },
        func() Result { return tryHandleNormal(state) },
        func() Result { return tryHandleFallback(state) },
    ).OrElse(ActionWait)
}

JavaScript Equivalent

// Option monad in JS
const some = (value) => ({ isPresent: true, value });
const none = () => ({ isPresent: false });

const tryAction = (cond, action) => cond ? some(action) : none();

const firstAction = (...thunks) => {
    for (const thunk of thunks) {
        const result = thunk();
        if (result.isPresent) return result;
    }
    return none();
};

// Functional pipelines
const withDistance = (entities, px, py) =>
    entities.map(e => ({ entity: e, dist: manhattan(e.x, e.y, px, py) }));

const filterBy = (arr, predicate) => arr.filter(predicate);

// Decision pipeline
const decideAction = (state) =>
    firstAction(
        () => tryHandleUrgent(state),
        () => tryHandleNormal(state),
        () => tryHandleFallback(state),
    ).value ?? 'wait';

Reactive Streams

Use pub/sub for event distribution. This decouples subsystems and enables spectators, logging, and analytics.

// streams/streams.go
package streams

import (
    "context"
    "sync"
)

// Event represents a domain event
type Event struct {
    Type      string
    SessionID string
    Timestamp int64
    Data      map[string]interface{}
}

// EventStream provides reactive event streaming
type EventStream struct {
    mu          sync.RWMutex
    subscribers []chan Event
    ctx         context.Context
    cancel      context.CancelFunc
}

func NewEventStream() *EventStream {
    ctx, cancel := context.WithCancel(context.Background())
    return &EventStream{
        subscribers: make([]chan Event, 0),
        ctx:         ctx,
        cancel:      cancel,
    }
}

// Publish sends an event to all subscribers
func (s *EventStream) Publish(event Event) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    for _, ch := range s.subscribers {
        select {
        case ch <- event:
        default:
            // Drop if subscriber is slow (backpressure)
        }
    }
}

// Subscribe returns a channel for receiving events
func (s *EventStream) Subscribe() chan Event {
    ch := make(chan Event, 100)

    s.mu.Lock()
    s.subscribers = append(s.subscribers, ch)
    s.mu.Unlock()

    return ch
}

// Unsubscribe removes a channel from subscribers
func (s *EventStream) Unsubscribe(ch chan Event) {
    s.mu.Lock()
    defer s.mu.Unlock()

    for i, c := range s.subscribers {
        if c == ch {
            s.subscribers = append(s.subscribers[:i], s.subscribers[i+1:]...)
            close(ch)
            return
        }
    }
}

func (s *EventStream) Close() {
    s.cancel()
    s.mu.Lock()
    defer s.mu.Unlock()

    for _, ch := range s.subscribers {
        close(ch)
    }
    s.subscribers = nil
}

Reactive Operators

import "github.com/samber/lo"

// FilterBySession returns events for a specific session
func FilterBySession(sessionID string) func(Event) bool {
    return func(e Event) bool {
        return e.SessionID == sessionID
    }
}

// FilterByTypes returns events matching any of the given types
func FilterByTypes(types ...string) func(Event) bool {
    typeSet := lo.SliceToMap(types, func(t string) (string, bool) {
        return t, true
    })
    return func(e Event) bool {
        _, ok := typeSet[e.Type]
        return ok
    }
}

// Usage
ch := eventStream.Subscribe()
for event := range ch {
    if FilterBySession("session-123")(event) {
        // Handle session-specific event
    }
}

Event Bus (Centralized Hub)

// EventBus is the central hub for all events
type EventBus struct {
    events    *EventStream
    wsStreams map[string]*WSMessageStream
    mu        sync.RWMutex
}

func NewEventBus() *EventBus {
    return &EventBus{
        events:    NewEventStream(),
        wsStreams: make(map[string]*WSMessageStream),
    }
}

func (b *EventBus) Events() *EventStream {
    return b.events
}

func (b *EventBus) BroadcastToSession(sessionID string, msg WSMessage) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    for _, stream := range b.wsStreams {
        stream.Send(msg)
    }
}

Reactive WebSocket Handler

Use the pump pattern for WebSocket connections: separate goroutines for different concerns, unified by context-based lifecycle management.

// server/ws_handler.go
package server

import (
    "context"
    "encoding/json"
    "sync"
    "sync/atomic"

    "github.com/gorilla/websocket"
)

var connectionID atomic.Int64

// ReactiveWSHandler handles a websocket connection with reactive streams
type ReactiveWSHandler struct {
    server  *Server
    conn    *websocket.Conn
    connID  string
    session *Session
    ctx     context.Context
    cancel  context.CancelFunc
    writeMu sync.Mutex  // Protects websocket writes
}

func NewReactiveWSHandler(s *Server, conn *websocket.Conn) *ReactiveWSHandler {
    ctx, cancel := context.WithCancel(context.Background())
    id := connectionID.Add(1)

    return &ReactiveWSHandler{
        server: s,
        conn:   conn,
        connID: fmt.Sprintf("ws-%d", id),
        ctx:    ctx,
        cancel: cancel,
    }
}

The Pump Pattern

Each WebSocket connection runs multiple goroutines ("pumps") for different responsibilities:

// Handle starts all pumps and manages connection lifecycle
func (h *ReactiveWSHandler) Handle(r *http.Request) {
    defer h.cleanup()

    // Restore session from cookie if exists
    if cookie, err := r.Cookie("session"); err == nil {
        h.session = h.server.restoreSession(cookie.Value)
        if h.session != nil {
            h.sendState()
        }
    }

    // Start pumps concurrently
    go h.eventSubscriptionPump()  // Subscribe to domain events
    go h.broadcastPump()          // Forward messages to client

    // Blocking: process incoming messages
    h.incomingPump()
}

Incoming Pump (Client → Server)

Reads messages from WebSocket, dispatches to handlers:

func (h *ReactiveWSHandler) incomingPump() {
    for {
        select {
        case <-h.ctx.Done():
            return
        default:
            _, data, err := h.conn.ReadMessage()
            if err != nil {
                return  // Connection closed
            }

            var msg Message
            if err := json.Unmarshal(data, &msg); err != nil {
                h.sendError("invalid_json", "Invalid message format")
                continue
            }

            h.handleMessage(msg)
        }
    }
}

func (h *ReactiveWSHandler) handleMessage(msg Message) {
    switch msg.Type {
    case "ping":
        h.sendPong()

    case "join":
        var payload JoinPayload
        json.Unmarshal(msg.Payload, &payload)
        h.session = h.server.createSession(payload.Seed)
        h.sendState()

    case "action":
        if h.session == nil {
            h.sendError("no_session", "Join first")
            return
        }
        var payload ActionPayload
        json.Unmarshal(msg.Payload, &payload)
        h.processAction(payload.Action)

    case "spectate":
        var payload SpectatePayload
        json.Unmarshal(msg.Payload, &payload)
        h.spectateSession(payload.SessionID)
    }
}

Event Subscription Pump (Domain Events → Client)

Subscribes to the event bus and forwards relevant events:

func (h *ReactiveWSHandler) eventSubscriptionPump() {
    // Subscribe to domain events
    eventCh := h.server.eventBus.Events().Subscribe()
    defer h.server.eventBus.Events().Unsubscribe(eventCh)

    for {
        select {
        case <-h.ctx.Done():
            return
        case event, ok := <-eventCh:
            if !ok {
                return
            }

            // Filter: only events for our session
            if h.session == nil || event.SessionID != h.session.ID {
                continue
            }

            // Broadcast state on significant events (for spectators)
            switch event.Type {
            case "state_change", "action_processed":
                h.sendState()
            }
        }
    }
}

Thread-Safe Writes

WebSocket writes must be serialized:

func (h *ReactiveWSHandler) writeMessage(msg Message) {
    h.writeMu.Lock()
    defer h.writeMu.Unlock()

    data, err := json.Marshal(msg)
    if err != nil {
        return
    }
    h.conn.WriteMessage(websocket.TextMessage, data)
}

func (h *ReactiveWSHandler) sendState() {
    if h.session == nil {
        return
    }

    h.session.mu.Lock()
    state := h.session.State.GetSnapshot()
    h.session.mu.Unlock()

    payload, _ := json.Marshal(state)
    h.writeMessage(Message{
        Type:      "state",
        Payload:   payload,
        Timestamp: time.Now().UnixMilli(),
    })
}

func (h *ReactiveWSHandler) sendError(code, message string) {
    payload, _ := json.Marshal(map[string]string{
        "code":    code,
        "message": message,
    })
    h.writeMessage(Message{Type: "error", Payload: payload})
}

Action Processing with Event Publishing

Actions mutate state and publish events:

func (h *ReactiveWSHandler) processAction(action string) {
    h.session.mu.Lock()
    h.session.State.ProcessAction(ActionType(action))
    h.session.mu.Unlock()

    // Publish to event bus for reactive broadcasts
    h.server.eventBus.Events().Publish(Event{
        Type:      "action_processed",
        SessionID: h.session.ID,
        Timestamp: time.Now().UnixMilli(),
        Data: map[string]interface{}{
            "action": action,
        },
    })

    h.sendState()
}

Graceful Cleanup

Context cancellation triggers coordinated shutdown:

func (h *ReactiveWSHandler) cleanup() {
    h.cancel()  // Signals all pumps to exit
    h.conn.Close()
}

WebSocket Protocol

// Client → Server
{"type": "join", "payload": {"seed": 12345}}
{"type": "action", "payload": {"action": "submit"}}
{"type": "spectate", "payload": {"session_id": "abc123"}}
{"type": "ping"}

// Server → Client
{"type": "state", "payload": {...}, "timestamp": 1704067200000}
{"type": "error", "payload": {"code": "no_session", "message": "Join first"}}
{"type": "pong", "timestamp": 1704067200000}
{"type": "config", "payload": {"version": "1.0.0"}}

Session Restoration

Sessions survive page reloads via cookies + persistence:

func (h *ReactiveWSHandler) Handle(r *http.Request) {
    // Check for existing session from HttpOnly cookie
    if cookie, err := r.Cookie("session_id"); err == nil {
        // Try memory first
        if session := h.server.getSession(cookie.Value); session != nil {
            h.session = session
        } else {
            // Fall back to SQLite (survives server restart)
            h.session = h.server.restoreFromDB(cookie.Value)
        }

        if h.session != nil {
            h.sendConfig()
            h.sendState()
        }
    }
    // ... start pumps
}

Spectator Support

Multiple clients can watch the same session:

func (h *ReactiveWSHandler) spectateSession(sessionID string) {
    session := h.server.getSession(sessionID)
    if session == nil {
        // Find any active session
        session = h.server.getAnyActiveSession()
    }

    if session != nil {
        h.session = session
        h.sendState()
    } else {
        h.sendError("no_session", "No active session to spectate")
    }
}

Event Sourcing

Event Store Schema

CREATE TABLE events (
    id INTEGER PRIMARY KEY,
    time DATETIME,
    session_id TEXT,
    sequence INTEGER,
    event_type TEXT,
    entity_id TEXT,
    data JSON,
    version TEXT
);

CREATE TABLE sessions (
    session_id TEXT PRIMARY KEY,
    seed INTEGER,
    created_at DATETIME,
    last_activity DATETIME
);

-- Projections (derived from events)
CREATE TABLE projections (
    session_id TEXT,
    projection_type TEXT,
    data JSON,
    updated_at DATETIME
);

Verification Protocol

Clients claim outcomes, servers verify by replay:

func VerifyAndRecord(seed int64, actions []ActionType, claim Claim) (*Result, error) {
    state := NewState(seed)
    for _, action := range actions {
        state.ProcessAction(action)
    }
    if state.Outcome != claim.Outcome {
        return nil, ErrClaimMismatch
    }
    // Record verified result
    return &Result{Verified: true, State: state}, nil
}

Projections

Derive multiple views from the same event stream:

// Aggregate events into analytics
type Stats struct {
    TotalActions int
    ActionCounts map[string]int
}

func AggregateStats(events []Event) Stats {
    stats := Stats{ActionCounts: make(map[string]int)}

    for _, e := range events {
        stats.TotalActions++
        stats.ActionCounts[e.Type]++
    }

    return stats
}

Working With This Architecture

Adding Features

  1. Add to Go engine first (engine/)
  2. Mirror in JavaScript client (cmd/client/)
  3. Ensure determinism: same inputs → same outputs
  4. Add event types if needed

Testing Determinism

func TestDeterminism(t *testing.T) {
    seed := int64(12345)
    actions := []ActionType{ActionA, ActionB, ActionC}

    // Run twice with same inputs
    state1 := NewState(seed)
    state2 := NewState(seed)

    for _, action := range actions {
        state1.ProcessAction(action)
        state2.ProcessAction(action)
    }

    if state1.Hash() != state2.Hash() {
        t.Error("Determinism violation: same inputs produced different states")
    }
}

Cross-Language Parity Tests

# Generate reference outputs from Go
go run ./cmd/testgen > testcases.json

# Verify JavaScript produces same outputs
node cmd/client/test.js testcases.json

Dependencies

Go

require (
    github.com/samber/lo v1.47.0    // Functional utilities
    github.com/samber/mo v1.13.0    // Monads (Option, Result, Either)
    github.com/gorilla/websocket v1.5.3
    github.com/mattn/go-sqlite3 v1.14.24
)

JavaScript

No external dependencies required. Implement monadic helpers inline or use:

  • lodash/fp for functional utilities
  • Custom Option implementation (shown above)

Design Lessons

  1. Dual implementation catches bugs — Differences between Go and JS reveal hidden assumptions.

  2. Seeded RNG is essential — Without determinism, replay and verification are impossible.

  3. Events decouple subsystems — Components don't know about each other; they just emit events.

  4. Functional pipelines are debuggable — Monadic composition makes decision logic traceable.

  5. Projections are cheap — Derive multiple views (stats, replays, heatmaps) from one event stream.

  6. Actions, not snapshots — Store the minimal source of truth; reconstruct state on demand.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment