@Esya
Created March 8, 2026 17:50
Game Admin Staging System Remediation Plan — SQS + Stream Reduction

Revised Remediation Plan: Game Admin Staging System (v2)

Incorporates findings from claude.rca.md, codex.rca.md, remediation.md, codex-remediation.md, and two rounds of Codex feedback.


Feedback Responses (Round 1)

Feedback #1: Manual "Generate Diff" fallback is asserted but not evidenced

Valid. No generate-diff contract or handler exists. Fixed: generate-diff endpoint is now part of the core release (Step 2), not deferred.

Feedback #2: Manual-write paths are broader than create-override

Valid. Three handlers mutate staging: create-override, create-entity, upload-image. All flow through ManualOverrideService.createOrUpdateOverride(). Fixed: the SQS publish now happens in each of the three handlers, after all mutations complete (see Round 2 fix below).

Feedback #3: Queue publish should not be in @repo/game-importer

Valid. @repo/game-importer has no @repo/queue dependency. Fixed: publish happens in the Trigger task (import-game.task.ts) after importer.run().

Feedback #4: Duplicate lock-change / diff-change emission risk

Valid. Fixed: pick one authoritative path per event type (see Step 3c).

Feedback #5: 150K countDocuments pairs claim is overstated

Partially valid. Debounce replacement logic means only the last event per 3-second window runs queries. The cost is handler churn (150K invocations scheduling/cancelling timeouts), not 150K query pairs.
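To make the churn-versus-queries distinction concrete, here is a minimal sketch of trailing-debounce replacement. It assumes the monitor cancels and reschedules a single 3-second timer per stream event; ManualClock and TrailingDebouncer are illustrative names, not code from DiffMonitorService, and the fake clock stands in for setTimeout/clearTimeout so a burst can be replayed synchronously.

```typescript
// Fake clock so the burst runs synchronously (stands in for setTimeout/clearTimeout).
class ManualClock {
  private now = 0
  private nextId = 1
  private timers = new Map<number, { at: number; fn: () => void }>()

  setTimeout(fn: () => void, ms: number): number {
    const id = this.nextId++
    this.timers.set(id, { at: this.now + ms, fn })
    return id
  }

  clearTimeout(id: number): void {
    this.timers.delete(id)
  }

  // Advance time and fire every timer whose deadline has passed, in deadline order.
  advance(ms: number): void {
    this.now += ms
    const due = [...this.timers.entries()]
      .filter(([, t]) => t.at <= this.now)
      .sort((a, b) => a[1].at - b[1].at)
    for (const [id, t] of due) {
      this.timers.delete(id)
      t.fn()
    }
  }
}

// Each event replaces the pending timer; only the last event of a burst runs the work.
class TrailingDebouncer {
  scheduled = 0
  cancelled = 0
  private pending: number | undefined = undefined
  private clock: ManualClock
  private windowMs: number
  private run: () => void

  constructor(clock: ManualClock, windowMs: number, run: () => void) {
    this.clock = clock
    this.windowMs = windowMs
    this.run = run
  }

  onEvent(): void {
    if (this.pending !== undefined) {
      this.clock.clearTimeout(this.pending)
      this.cancelled++
    }
    this.scheduled++
    this.pending = this.clock.setTimeout(() => {
      this.pending = undefined
      this.run()
    }, this.windowMs)
  }
}

const clock = new ManualClock()
let queryRuns = 0
const debouncer = new TrailingDebouncer(clock, 3000, () => {
  queryRuns++ // stands in for the countDocuments pair
})

// 150K stream events arriving as one burst (no 3-second gap between them).
for (let i = 0; i < 150000; i++) debouncer.onEvent()
clock.advance(3000)

console.log({ scheduled: debouncer.scheduled, cancelled: debouncer.cancelled, queryRuns })
```

The burst schedules 150,000 timers and cancels 149,999 of them, but the queries run once: the cost is the per-event handler work, not the database reads.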

Feedback #6: Phase sequencing should avoid a "no auto-diff" gap

Valid. Fixed: Steps 2 and 3 ship as one atomic release.

Feedback #7: Consider single-owner stream execution for remaining streams

Noted, deferred to Step 4. Two streams on collections that receive only a handful of writes are negligible load, even across N instances.


Feedback Responses (Round 2)

Feedback R2-#1: Diff can be generated before staging data is updated (Critical)

Valid. This was a real bug in the previous plan.

The previous plan put SQS publish inside ManualOverrideService.createOrUpdateOverride(). But in all three handlers, the staging mutation happens after that call:

create-override.handler.ts:
  1. manualOverrideService.createOrUpdateOverride()  ← override saved
  2. stagingService.updateFields()                    ← staging updated

create-entity.handler.ts:
  1. manualOverrideService.createOrUpdateOverride()  ← override saved
  2. stagingService.createEntity()                    ← staging created

upload-image.handler.ts → imageUploadService.uploadImageAndCreateOverride():
  1. manualOverrideService.createOrUpdateOverride()  ← override saved
  2. stagingService.updateFields()                    ← staging updated

Publishing SQS from createOrUpdateOverride() means the diff worker could run against stale staging data.

Fix: Move SQS publish + entity-change emit to each handler, after all mutations complete. This also eliminates @repo/queue coupling from @repo/game-admin (fixes R2-#5 too). Trade-off: 3 call sites instead of 1, but correctness > DRY here.

Feedback R2-#2: No operator recovery path if SQS publish fails (High)

Valid. The previous plan deferred the generate-diff endpoint to Step 4 (hardening) but treated SQS publish failure as non-fatal with "diff can be triggered manually" — which wasn't true.

Fix: generate-diff endpoint is now part of Step 2 (core release). Ships in the same PR as SQS + stream removal. Operators always have a fallback.

Feedback R2-#3: FIFO dedup can drop legitimate import diff requests (Medium)

Valid. With contentBasedDeduplication: true, two imports of the same game within 5 minutes produce identical payloads ({gameSlug, trigger: "import"}) and SQS drops the second.

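Fix: Include importRunId in the import publish payload. Each import run gets a unique ID, so content-based dedup sees distinct messages. Manual triggers include entitySlug, which differentiates edits to different entities; two manual requests for the same entity within the window still have identical bodies, so the second is dropped. That is harmless if the first request has not started processing yet (its diff reads current staging); otherwise the later edit gets no diff until the next trigger.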

Feedback R2-#4: Duplicate event emissions kept intentionally (Medium)

Reconsidered. The previous plan kept dual emission "for cross-instance visibility." But with only 2 low-cardinality streams remaining, the justification is weak — especially for lock-change where LockService already emits directly on every acquire/release/progress update.

Fix: Pick one authoritative emitter per event type:

  • lock-change: Stream-only. Remove direct emits from LockService. The change stream catches all lock document mutations (including from other processes), making it the correct single source.
  • diff-change: Stream-only. Remove direct emit from DeploymentStatusService.updateDiffStatus(). The stream catches the diff document update.

This eliminates duplicate delivery entirely. The trade-off is slightly higher latency for same-process events (stream detection vs. synchronous emit), but lock/diff writes are infrequent and stream delivery latency on these low-cardinality collections is expected to stay under 100 ms.

Feedback R2-#5: @repo/game-admin → @repo/queue coupling (Medium)

Valid. This is a Layer 2 domain package importing infrastructure. Handler-level publishing avoids it entirely.

Fix: All SQS publishes happen at handler level (Layer 4) or in Trigger tasks — never in domain services. @repo/game-admin stays free of @repo/queue. See Step 2e.


Revised Plan

Step 1: Reduce Import Task Concurrency (Day 1, 1-line change)

File: packages/backend/tasks/src/trigger/game-importer/import-game.task.ts:48

queue: {
  concurrencyLimit: 2, // was 10
},

Why: Fastest possible stabilization. Concurrent large imports multiply all write/stream amplification. Low risk: runs beyond the limit wait in the task queue instead of failing.


Step 2: Add SQS Diff Generation + Generate-Diff Endpoint (Days 2-4)

2a. Queue config

File: packages/backend/queue/src/config.ts — add:

"game-admin-generate-diff": {
  name: "game-admin-generate-diff",
  type: "fifo",
  visibilityTimeout: 1800, // 30 min (diff generation for large games)
  messageRetentionPeriod: 86400, // 1 day
  maxMessageSize: 4096,
  consumer: {
    concurrency: 1, // one message in flight per consumer; per-game serialization comes from the FIFO messageGroupId (gameSlug)
    batchSize: 1,
  },
  deadLetterQueue: {
    name: "game-admin-generate-diff-dlq",
    maxReceiveCount: 3,
  },
  fifoConfig: {
    contentBasedDeduplication: true,
    deduplicationScope: "messageGroup",
    fifoThroughputLimit: "perMessageGroupId",
  },
},

2b. DTO

New file: packages/backend/queue/src/dtos/game-admin-generate-diff.dto.ts

import { z } from "zod"

export const gameAdminGenerateDiffPayloadSchema = z.object({
  gameSlug: z.string(),
  trigger: z.enum(["import", "manual"]),
  entityType: z.string().optional(),
  entitySlug: z.string().optional(),
  importRunId: z.string().optional(),
})

export type GameAdminGenerateDiffPayload = z.infer<typeof gameAdminGenerateDiffPayloadSchema>
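Because importRunId is optional, a missing run ID (for example, if ctx?.run?.id is undefined in the Trigger task) would silently reintroduce the dedup problem. One option is to make the payload a discriminated union keyed on trigger, which zod can express with z.discriminatedUnion. The dependency-free sketch below shows the intended rules; DiffRequest and validateDiffRequest are hypothetical names, and requiring importRunId for imports is a proposed tightening, not part of the DTO above.

```typescript
// Hypothetical stricter contract: imports must carry a run ID;
// entity-scoped manual requests must carry both entityType and entitySlug.
type ImportDiffRequest = { gameSlug: string; trigger: "import"; importRunId: string }
type ManualDiffRequest = { gameSlug: string; trigger: "manual"; entityType?: string; entitySlug?: string }
type DiffRequest = ImportDiffRequest | ManualDiffRequest

function validateDiffRequest(input: unknown): DiffRequest {
  if (typeof input !== "object" || input === null) throw new Error("payload must be an object")
  const p = input as Record<string, unknown>

  const gameSlug = p.gameSlug
  if (typeof gameSlug !== "string" || gameSlug.length === 0) throw new Error("gameSlug is required")

  if (p.trigger === "import") {
    const importRunId = p.importRunId
    if (typeof importRunId !== "string" || importRunId.length === 0) {
      // Without a unique run ID, content-based dedup can drop back-to-back imports.
      throw new Error("importRunId is required for import triggers")
    }
    return { gameSlug, trigger: "import", importRunId }
  }

  if (p.trigger === "manual") {
    const entityType = p.entityType
    const entitySlug = p.entitySlug
    const hasType = typeof entityType === "string"
    const hasSlug = typeof entitySlug === "string"
    if (hasType !== hasSlug) throw new Error("entityType and entitySlug must be provided together")
    if (typeof entityType === "string" && typeof entitySlug === "string") {
      return { gameSlug, trigger: "manual", entityType, entitySlug }
    }
    return { gameSlug, trigger: "manual" } // whole-game manual request
  }

  throw new Error(`unknown trigger: ${String(p.trigger)}`)
}

console.log(validateDiffRequest({ gameSlug: "pokemon", trigger: "import", importRunId: "run_1" }))
```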

2c. SQS Handler

New file: packages/games/game-admin/src/services/diff/generate-diff.handler.ts

Extracts the core logic from DiffMonitorService.generateIncrementalDiff() into a standalone handler:

  • Acquires diff lock via LockService.attemptLockAcquisition()
  • Runs calculateExpansionCardCounts()
  • For trigger: "manual" with entityType/entitySlug: entity-specific diff (fast path via DiffService.generateAndPatchEntityDiff)
  • For trigger: "import": full diff via DiffService.generateDiff()
  • Stores diff document, diff details, impact analysis
  • Releases lock in finally block
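A minimal sketch of that control flow follows. DiffLock and DiffOps are hypothetical stand-ins for LockService and DiffService (the real method names and signatures likely differ), and storing the diff document, details, and impact analysis is assumed to happen inside the diff calls. Two behaviors are assumptions rather than part of the plan: a manual request without an entity (such as one from the generate-diff endpoint) falls back to a full diff, and a busy lock is reported by throwing so the message is retried.

```typescript
type DiffPayload = {
  gameSlug: string
  trigger: "import" | "manual"
  entityType?: string
  entitySlug?: string
  importRunId?: string
}

// Hypothetical stand-in for LockService: returns a lock ID, or null if the lock is held.
interface DiffLock {
  acquire(gameSlug: string): Promise<string | null>
  release(lockId: string): Promise<void>
}

// Hypothetical stand-in for DiffService plus the expansion-count step.
interface DiffOps {
  calculateExpansionCardCounts(gameSlug: string): Promise<void>
  entityDiff(gameSlug: string, entityType: string, entitySlug: string): Promise<void>
  fullDiff(gameSlug: string): Promise<void>
}

async function handleGenerateDiff(payload: DiffPayload, lock: DiffLock, ops: DiffOps): Promise<"entity" | "full"> {
  const lockId = await lock.acquire(payload.gameSlug)
  if (lockId === null) {
    // Assuming the consumer deletes messages only on success, throwing leaves the
    // message on the queue: SQS redelivers it after the visibility timeout and moves
    // it to the DLQ after maxReceiveCount receives.
    throw new Error(`Diff lock busy for ${payload.gameSlug}`)
  }

  try {
    await ops.calculateExpansionCardCounts(payload.gameSlug)
    const { entityType, entitySlug } = payload
    if (payload.trigger === "manual" && entityType && entitySlug) {
      await ops.entityDiff(payload.gameSlug, entityType, entitySlug) // fast path
      return "entity"
    }
    // Imports, and (assumption) manual requests without an entity, get a full diff.
    await ops.fullDiff(payload.gameSlug)
    return "full"
  } finally {
    // Runs on success and on failure, so a crashed diff never strands the lock.
    await lock.release(lockId)
  }
}
```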

Register in apps/backend/src/handlers/registry.ts.

2d. Publish from Trigger task (import path)

File: packages/backend/tasks/src/trigger/game-importer/import-game.task.ts

After importer.run() succeeds (line 173), when useStaging is true:

await importer.run(useCache, taskContext, matchProducts, autoApply, useStaging, lockId, forceRedownloadImages)

// Trigger diff generation via SQS (staging imports only)
if (useStaging) {
  try {
    const publisher = container.resolve(QueuePublisher)
    await publisher.publish(
      "game-admin-generate-diff",
      gameAdminGenerateDiffPayloadSchema,
      {
        gameSlug,
        trigger: "import",
        importRunId: ctx?.run?.id, // Unique per run — prevents FIFO dedup from dropping back-to-back imports
      },
      { messageGroupId: gameSlug },
    )
    logger.info(`Published diff generation request for ${gameSlug}`)
  } catch (error) {
    logger.error(`Failed to publish diff request for ${gameSlug}: ${error}`)
    // Import succeeded but diff won't auto-generate.
    // Operator can use the generate-diff endpoint as fallback.
  }
}

Why importRunId matters: With contentBasedDeduplication: true, SQS deduplicates by message body within a 5-minute window. Without importRunId, two imports of the same game within 5 minutes would produce identical payloads and the second would be silently dropped. Including the unique run ID ensures each import triggers its own diff.
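The dedup behavior can be checked directly. SQS content-based deduplication uses a SHA-256 hash of the message body as the deduplication ID; this sketch assumes QueuePublisher sends JSON.stringify(payload) as the body, and the slugs are illustrative. It also covers two edge cases: an importRunId of undefined (e.g. if ctx?.run?.id is missing) is omitted by JSON.stringify, and two manual requests for the same entity produce the same ID.

```typescript
import { createHash } from "node:crypto"

// Assumption: QueuePublisher serializes the payload with JSON.stringify.
// SQS content-based dedup derives the dedup ID from a SHA-256 of the body.
function contentDedupId(payload: Record<string, unknown>): string {
  return createHash("sha256").update(JSON.stringify(payload)).digest("hex")
}

// Two imports of the same game without a run ID: identical IDs, second is dropped.
const plainImport1 = contentDedupId({ gameSlug: "pokemon", trigger: "import" })
const plainImport2 = contentDedupId({ gameSlug: "pokemon", trigger: "import" })

// Distinct run IDs: different IDs, both are delivered.
const runImport1 = contentDedupId({ gameSlug: "pokemon", trigger: "import", importRunId: "run_1" })
const runImport2 = contentDedupId({ gameSlug: "pokemon", trigger: "import", importRunId: "run_2" })

// Undefined run ID: JSON.stringify omits the key, so this equals the plain case.
const missingRunId = contentDedupId({ gameSlug: "pokemon", trigger: "import", importRunId: undefined })

// Repeated manual request for the same entity: identical IDs.
const manualEdit = { gameSlug: "pokemon", trigger: "manual", entityType: "card", entitySlug: "pikachu" }
const manualEdit1 = contentDedupId(manualEdit)
const manualEdit2 = contentDedupId({ ...manualEdit })

console.log(plainImport1 === plainImport2) // true
console.log(runImport1 === runImport2) // false
console.log(missingRunId === plainImport1) // true
console.log(manualEdit1 === manualEdit2) // true
```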

2e. Publish from manual mutation handlers

SQS publish happens at handler level, after all mutations complete. This ensures staging data is consistent before diff generation runs.

create-override.handler.ts — after stagingService.updateFields():

const { override, transformedFields } =
  await manualOverrideService.createOrUpdateOverride(...)

await stagingService.updateFields(...)

// All mutations done — staging is consistent. Now trigger diff + notify UI.
gameAdminEventEmitter.emit("entity-change", {
  type: "entity-change",
  gameSlug: input.gameSlug,
  entitySlug: input.entitySlug,
  entityType: input.entityType,
  changeType: "updated",
  source: "manual",
  timestamp: new Date(),
})

const publisher = context.container.resolve(QueuePublisher)
await publisher.publish(
  "game-admin-generate-diff",
  gameAdminGenerateDiffPayloadSchema,
  { gameSlug: input.gameSlug, trigger: "manual", entityType: input.entityType, entitySlug: input.entitySlug },
  { messageGroupId: input.gameSlug },
)

create-entity.handler.ts — after stagingService.createEntity():

const { override } = await manualOverrideService.createOrUpdateOverride(...)
const entity = await stagingService.createEntity(...)

// All mutations done.
gameAdminEventEmitter.emit("entity-change", {
  type: "entity-change",
  gameSlug: input.gameSlug,
  entitySlug,
  entityType: input.entityType,
  changeType: "created",
  source: "manual",
  timestamp: new Date(),
})

const publisher = context.container.resolve(QueuePublisher)
await publisher.publish(
  "game-admin-generate-diff",
  gameAdminGenerateDiffPayloadSchema,
  { gameSlug: input.gameSlug, trigger: "manual", entityType: input.entityType, entitySlug },
  { messageGroupId: input.gameSlug },
)

upload-image.handler.ts — after imageUploadService.uploadImageAndCreateOverride() returns (this method internally does both override + staging update):

const result = await imageUploadService.uploadImageAndCreateOverride(...)

// uploadImageAndCreateOverride() does override + staging mutation internally.
gameAdminEventEmitter.emit("entity-change", {
  type: "entity-change",
  gameSlug: input.gameSlug,
  entitySlug: input.entitySlug,
  entityType: input.entityType,
  changeType: "updated",
  source: "manual",
  timestamp: new Date(),
})

const publisher = context.container.resolve(QueuePublisher)
await publisher.publish(
  "game-admin-generate-diff",
  gameAdminGenerateDiffPayloadSchema,
  { gameSlug: input.gameSlug, trigger: "manual", entityType: input.entityType, entitySlug: input.entitySlug },
  { messageGroupId: input.gameSlug },
)

return result

Why handler-level, not service-level:

  1. Correctness: The handler knows when all mutations are done. The service only knows about its own step.
  2. No @repo/queue coupling in @repo/game-admin: Handlers are Layer 4 (@repo/api), which can depend on anything. Domain packages stay clean.
  3. Explicitness: Each publish is visible in the handler — easy to audit which mutations trigger diffs.

The trade-off is 3 call sites instead of 1. A shared helper function (e.g. publishManualDiffRequest(context, input)) can reduce boilerplate if desired.
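A sketch of such a helper, assuming simplified dependency shapes: the real QueuePublisher.publish also takes the zod schema as its second argument, and the deps/changeType parameters are hypothetical. Unlike the inline handler code, this version catches publish failures and logs them, mirroring the import path: the staging mutation has already committed, so failing the request would not undo it, and the generate-diff endpoint remains the recovery route. Whether a manual request should instead surface the failure to the caller is a judgment call.

```typescript
// Hypothetical dependency shapes (the real publisher also takes a zod schema).
interface DiffPublisher {
  publish(queue: string, payload: Record<string, unknown>, options: { messageGroupId: string }): Promise<void>
}
interface EntityChangeEmitter {
  emit(event: "entity-change", payload: Record<string, unknown>): unknown
}
interface ErrorLogger {
  error(message: string): void
}
interface ManualDiffInput {
  gameSlug: string
  entityType: string
  entitySlug: string
}

// Call only after every staging mutation in the handler has completed.
async function publishManualDiffRequest(
  deps: { publisher: DiffPublisher; emitter: EntityChangeEmitter; logger: ErrorLogger },
  input: ManualDiffInput,
  changeType: "created" | "updated",
): Promise<boolean> {
  // 1. Notify connected admin clients (direct emit, no change stream involved).
  deps.emitter.emit("entity-change", {
    type: "entity-change",
    gameSlug: input.gameSlug,
    entitySlug: input.entitySlug,
    entityType: input.entityType,
    changeType,
    source: "manual",
    timestamp: new Date(),
  })

  // 2. Queue the entity-scoped diff; FIFO group per game keeps diffs ordered.
  try {
    await deps.publisher.publish(
      "game-admin-generate-diff",
      { gameSlug: input.gameSlug, trigger: "manual", entityType: input.entityType, entitySlug: input.entitySlug },
      { messageGroupId: input.gameSlug },
    )
    return true
  } catch (error) {
    // The mutation is already committed; log and let operators use generate-diff.
    deps.logger.error(`Failed to publish diff request for ${input.gameSlug}: ${error}`)
    return false
  }
}
```

In create-entity.handler.ts this would replace the emit and publish blocks with a single call passing "created"; the other two handlers pass "updated".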

2f. Generate-diff admin endpoint (manual fallback)

Ships in the same release as SQS + stream removal. Provides operator recovery if SQS publish fails.

Contract: packages/games/game-admin/src/contracts/diff/generate-diff.ts

import { oc } from "@orpc/contract"
import { z } from "zod"

export const generateDiffContract = oc
  .input(z.object({ gameSlug: z.string() }))
  .output(z.object({ message: z.string() }))

Handler: packages/backend/api/src/lib/orpc/handlers/game-admin/diff/generate-diff.handler.ts

The handler publishes to the same game-admin-generate-diff SQS queue (does NOT run diff inline — that would block the HTTP response). Returns immediately with "Diff generation queued."

Wire into the diff router and add a button in the admin UI.


Step 3: Remove DiffMonitorService + Strip ChangeStreamService (Same release as Step 2)

Critical: Steps 2 and 3 ship as one atomic release. No gap.

3a. Remove DiffMonitorService initialization

File: apps/backend/src/cardnexus-api.ts — remove lines 121-130.

3b. Strip ChangeStreamService to low-cardinality only

File: packages/games/game-admin/src/services/stream/change-stream.service.ts

Remove:

  • setupImportDataStream() + handleImportDataChange() + activeImportRuns map
  • setupManualOverrideStream() + handleManualOverrideChange()
  • setupStagingStreams() + handleStagingChange()

Keep:

  • setupDiffStream() + handleDiffChange() → diff-change for admin UI
  • setupLockStream() + handleLockChange() → lock-change for admin UI
  • Stream lifecycle methods (createChangeStream, recreateStream, close, etc.)

Updated initialize():

async initialize() {
  this.connection = await getGameAdminConnection()
  await this.setupDiffStream()
  await this.setupLockStream()
  logger.info("Change stream service initialized (locks + diffs only)")
}

Result: 14 streams → 2 streams.

3c. Resolve duplicate event emission — single authoritative path

lock-change: Stream-only.

  • Remove all gameAdminEventEmitter.emit("lock-change", ...) calls from LockService (lines 64, 131, 177, 344, 371 of lock.service.ts).
  • The change stream on importlocks is the single source. It catches all lock mutations including from other backend instances.

diff-change: Stream-only.

  • Remove gameAdminEventEmitter.emit("diff-change", ...) from DeploymentStatusService.updateDiffStatus() (line 97 of deployment-status.service.ts).
  • The change stream on stagingdiffs is the single source.

deploy-status: Direct-only (no change needed).

  • DeploymentService emits deploy-status directly. There is no change stream on deployment progress — this was always direct-only.

entity-change: Direct-only (handled by Step 2e).

  • Emitted from handlers after manual mutations. No change stream needed.

Event           Authoritative Path                  Why
lock-change     Change stream on importlocks        Catches cross-instance lock ops; low cardinality
diff-change     Change stream on stagingdiffs       Catches cross-instance diff writes; low cardinality
deploy-status   Direct emit from DeploymentService  Deployment runs in one process; no cross-instance need
entity-change   Direct emit from handlers           Manual mutations are synchronous request-scoped

Step 4: Hardening and Observability (Week 2)

4a. Exponential backoff for stream reconnection

File: change-stream.service.ts — replace fixed 5-second retry:

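private async recreateStream(options: ChangeStreamOptions, attempt = 0): Promise<void> {
  const MAX_RETRIES = 5
  const BASE_DELAY_MS = 5000
  const MAX_DELAY_MS = 60000

  if (attempt >= MAX_RETRIES) {
    logger.error(`Giving up on stream ${options.collection} after ${MAX_RETRIES} attempts`)
    return
  }

  // Close the broken cursor before scheduling a replacement; log, don't rethrow, close errors.
  const existing = this.streams.get(options.collection)
  if (existing && !existing.closed) {
    await existing.close().catch((error) => {
      logger.error(`Failed to close stream ${options.collection}: ${error}`)
    })
  }
  this.streams.delete(options.collection)

  // 5s, 10s, 20s, 40s, then capped at 60s.
  const backoff = Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS)

  setTimeout(async () => {
    try {
      const newStream = await this.createChangeStream(options)
      this.streams.set(options.collection, newStream)
    } catch (error) {
      logger.error(`Reconnect attempt ${attempt + 1} for ${options.collection} failed: ${error}`)
      // Schedule the next attempt with a longer delay.
      await this.recreateStream(options, attempt + 1)
    }
  }, backoff)
}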
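The retry schedule produced by that formula can be checked in isolation. reconnectDelays is a hypothetical helper, not part of ChangeStreamService. With MAX_RETRIES = 5, the service waits about 135 seconds of cumulative backoff before giving up, after which (in the code above) nothing recreates that stream; the open-cursor-count alert in 4b is what would surface it.

```typescript
// Mirrors the delay formula in recreateStream(): base * 2^attempt, capped.
function reconnectDelays(maxRetries: number, baseMs = 5000, capMs = 60000): number[] {
  const delays: number[] = []
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(Math.min(baseMs * 2 ** attempt, capMs))
  }
  return delays
}

const delays = reconnectDelays(5)
console.log(delays) // [ 5000, 10000, 20000, 40000, 60000 ]
console.log(delays.reduce((sum, d) => sum + d, 0)) // 135000
```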

4b. Observability

  • SQS: queue depth, age of oldest message, DLQ count for game-admin-generate-diff
  • MongoDB: change stream open cursor count (expect exactly 2 per instance), oplog window
  • Trigger: task completion latency, diff generation duration

4c. Leader election for streams (optional)

With 2 streams on low-cardinality collections, this is low priority. Consider if scaling to many instances.


Architecture After Remediation

┌──────────────────────────────────────────────────────────────┐
│ Admin UI: triggerGameImport()                                 │
│  → API acquires lock, triggers Trigger.dev task               │
└──────────────────────────┬───────────────────────────────────┘
                           │ Trigger.dev
                           ▼
┌──────────────────────────────────────────────────────────────┐
│ Trigger.dev: importGame task                                  │
│  1. Write ImportData (bulkWrite batches)                      │
│  2. performDirectMerge() → staging tables (bulkWrite)         │
│  3. Release import lock                                       │
│  4. Publish SQS: game-admin-generate-diff                     │
│     {trigger:"import", importRunId: unique}                   │
└──────────────────────────┬───────────────────────────────────┘
                           │ SQS FIFO (gameSlug group)
                           ▼
┌──────────────────────────────────────────────────────────────┐
│ SQS Worker: GameAdminGenerateDiffHandler                      │
│  1. Acquire diff lock                                         │
│  2. Calculate expansion card counts                           │
│  3. Generate diff (full or entity-specific)                   │
│  4. Store diff → stagingdiffs collection                      │
│  5. Release diff lock                                         │
└──────────────────────────┬───────────────────────────────────┘
                           │ write to stagingdiffs
                           ▼
┌──────────────────────────────────────────────────────────────┐
│ ChangeStreamService (2 streams — single authoritative source) │
│  • stagingdiffs  → "diff-change"  → admin WebSocket           │
│  • importlocks   → "lock-change"  → admin WebSocket           │
└──────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────┐
│ Manual Mutation Handlers                                      │
│  (create-override, create-entity, upload-image)               │
│                                                               │
│  1. ManualOverrideService.createOrUpdateOverride()            │
│  2. StagingService.updateFields() / createEntity()            │
│  ── all mutations done ──                                     │
│  3. Emit "entity-change" directly (admin toast)               │
│  4. Publish SQS: game-admin-generate-diff {trigger:"manual"}  │
└──────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────┐
│ generate-diff endpoint (manual fallback)                      │
│  → Publishes to same SQS queue                                │
│  → Operator can trigger from admin UI if SQS publish failed   │
└──────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────┐
│ Admin Frontend (WebSocket)                                     │
│  • lock-change    → lock/unlock status    (stream only)       │
│  • diff-change    → "new diff available"  (stream only)       │
│  • deploy-status  → deployment progress   (direct emit only)  │
│  • entity-change  → manual override toast (direct emit only)  │
└──────────────────────────────────────────────────────────────┘

Implementation Checklist

Step 1 — Immediate (Day 1)

  • Reduce importGame concurrency 10 → 2 (import-game.task.ts:48)

Step 2 — SQS Diff Generation (Days 2-4)

  • Add game-admin-generate-diff queue config (queue/src/config.ts)
  • Add DTO (queue/src/dtos/game-admin-generate-diff.dto.ts)
  • Export from queue/src/dtos/index.ts
  • Create GameAdminGenerateDiffHandler (extract from DiffMonitorService.generateIncrementalDiff)
  • Register handler in apps/backend/src/handlers/registry.ts
  • Publish from import-game.task.ts after importer.run() — include importRunId
  • Add SQS publish + entity-change emit in all 3 manual handlers (after all mutations)
  • Add generate-diff contract + handler (publishes to same SQS queue)
  • Wire generate-diff into diff router
  • Provision queue in Terraform (dev/staging/prod)

Step 3 — Stream Reduction (Same release as Step 2)

  • Remove DiffMonitorService initialization from cardnexus-api.ts
  • Strip ChangeStreamService to setupDiffStream() + setupLockStream() only
  • Remove direct lock-change emits from LockService (5 call sites)
  • Remove direct diff-change emit from DeploymentStatusService.updateDiffStatus()
  • Delete DiffMonitorService file

Step 4 — Hardening (Week 2)

  • Exponential backoff + max retries for stream reconnection
  • SQS dashboards (queue depth, DLQ count)
  • MongoDB alerts (oplog window, open cursor count)
  • Admin UI button for generate-diff endpoint
  • Consider leader election for remaining 2 streams (optional)

Impact Summary

Metric                                         Before                            After
Change stream cursors per instance             14                                2
Change stream events per 150K import           ~600K handler invocations         Low tens (mostly lock progress updates + diff writes)
fullDocument: "updateLookup" reads per import  ~600K                             Low tens (same low-cardinality lock/diff writes)
Diff generation triggers per import            Hundreds of debounced events → 1  1 SQS message → 1
Write amplification feedback loops             Yes                               None
Max concurrent imports                         10                                2
Duplicate event delivery to frontend           Yes (lock-change, diff-change)    No (single authoritative path per event)
@repo/game-admin → @repo/queue coupling        N/A                               None (publishes at handler/task level only)
Operator fallback if SQS fails                 None                              generate-diff endpoint

Rollback

  • Step 1: Revert concurrency to 10 (1-line change).
  • Steps 2+3 (shipped together): Re-enable DiffMonitorService in cardnexus-api.ts, restore full ChangeStreamService, restore direct emits in LockService/DeploymentStatusService. SQS messages accumulate harmlessly.
  • Step 4: Independent, no rollback needed.

For safe rollout, use a feature flag GAME_ADMIN_DIFF_ORCHESTRATOR = "sqs" | "legacy":

  • "legacy": DiffMonitorService starts, SQS publishes are skipped, direct emits active
  • "sqs": DiffMonitorService disabled, SQS publishes active, stream-only emits

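This allows rollback through configuration rather than a code deploy. Because DiffMonitorService and the change streams are initialized at startup, flipping the flag likely takes effect only after instances restart.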
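A sketch of how the flag could be read and mapped to behavior. GAME_ADMIN_DIFF_ORCHESTRATOR comes from the plan; readDiffOrchestrator, behaviorFor, and the field names are hypothetical. fullChangeStreams mirrors the Steps 2+3 rollback bullet (legacy needs the full ChangeStreamService back), and falling back to "legacy" for a missing or unrecognized value is an assumption, chosen so a typo never leaves the system with no diff orchestrator.

```typescript
type DiffOrchestrator = "sqs" | "legacy"

interface OrchestratorBehavior {
  startDiffMonitor: boolean // DiffMonitorService.initialize() at startup
  fullChangeStreams: boolean // all streams vs. locks + diffs only
  publishDiffRequestsToSqs: boolean // task and handler publishes
  directLockAndDiffEmits: boolean // LockService / DeploymentStatusService emits
}

// Assumption: only an explicit "sqs" selects the new path; anything else is legacy.
function readDiffOrchestrator(env: Record<string, string | undefined>): DiffOrchestrator {
  return env.GAME_ADMIN_DIFF_ORCHESTRATOR?.trim().toLowerCase() === "sqs" ? "sqs" : "legacy"
}

function behaviorFor(mode: DiffOrchestrator): OrchestratorBehavior {
  const sqs = mode === "sqs"
  return {
    startDiffMonitor: !sqs,
    fullChangeStreams: !sqs,
    publishDiffRequestsToSqs: sqs,
    directLockAndDiffEmits: !sqs,
  }
}

console.log(behaviorFor(readDiffOrchestrator({ GAME_ADMIN_DIFF_ORCHESTRATOR: "sqs" })))
```

At startup this would be called as behaviorFor(readDiffOrchestrator(process.env)), with each flag checked at the corresponding call site.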
