Incorporates findings from claude.rca.md, codex.rca.md, remediation.md, codex-remediation.md, and two rounds of Codex feedback.
Valid. No generate-diff contract or handler exists. Fixed: generate-diff endpoint is now part of the core release (Step 2), not deferred.
Valid. Three handlers mutate staging: create-override, create-entity, upload-image. All flow through ManualOverrideService.createOrUpdateOverride(). Fixed: SQS publish is centralized at handler level after all mutations complete (see Round 2 fix below).
Valid. @repo/game-importer has no @repo/queue dependency. Fixed: publish happens in the Trigger task (import-game.task.ts) after importer.run().
Valid. Fixed: pick one authoritative path per event type (see Step 3c).
Partially valid. Debounce replacement logic means only the last event per 3-second window runs queries. The cost is handler churn (150K invocations scheduling/cancelling timeouts), not 150K query pairs.
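Here is a minimal TypeScript sketch of keyed debounce replacement that makes the debounce point concrete. `KeyedDebouncer` is a hypothetical name, not the actual `DiffMonitorService` code. The sketch only shows why a burst of 150K events costs 150K timer reschedules but a single run of the query work once events stop arriving for the window.

```typescript
// Hypothetical sketch: each new event for a key cancels that key's pending timer
// and schedules a fresh one, so the work runs once after events go quiet.
class KeyedDebouncer {
  private readonly timers = new Map<string, ReturnType<typeof setTimeout>>()
  scheduled = 0 // timers (re)scheduled: the "handler churn"
  cancelled = 0 // pending timers replaced before they fired

  constructor(
    private readonly delayMs: number,
    private readonly work: (key: string) => void,
  ) {}

  onEvent(key: string): void {
    const pending = this.timers.get(key)
    if (pending !== undefined) {
      clearTimeout(pending)
      this.cancelled++
    }
    this.scheduled++
    this.timers.set(
      key,
      setTimeout(() => {
        this.timers.delete(key)
        this.work(key) // the expensive query pair would run here
      }, this.delayMs),
    )
  }
}

// 150K change events for one game in a burst, with a 3-second window.
const WINDOW_MS = 3000
let queryRuns = 0
const debouncer = new KeyedDebouncer(WINDOW_MS, () => {
  queryRuns++
})
for (let i = 0; i < 150_000; i++) debouncer.onEvent("some-game")
```

After the loop, `scheduled` is 150,000 and `cancelled` is 149,999, but `queryRuns` only becomes 1 once the window elapses with no further events.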
Valid. Fixed: Steps 2 and 3 ship as one atomic release.
Noted, deferred to Step 4. Two streams on collections with single-digit write volumes add negligible load, even at N instances.
Valid. This was a real bug in the previous plan.
The previous plan put SQS publish inside ManualOverrideService.createOrUpdateOverride(). But in all three handlers, the staging mutation happens after that call:
create-override.handler.ts:
1. manualOverrideService.createOrUpdateOverride() ← override saved
2. stagingService.updateFields() ← staging updated
create-entity.handler.ts:
1. manualOverrideService.createOrUpdateOverride() ← override saved
2. stagingService.createEntity() ← staging created
upload-image.handler.ts → imageUploadService.uploadImageAndCreateOverride():
1. manualOverrideService.createOrUpdateOverride() ← override saved
2. stagingService.updateFields() ← staging updated
Publishing SQS from createOrUpdateOverride() means the diff worker could run against stale staging data.
Fix: Move SQS publish + entity-change emit to each handler, after all mutations complete. This also eliminates @repo/queue coupling from @repo/game-admin (fixes R2-#5 too). Trade-off: 3 call sites instead of 1, but correctness > DRY here.
Valid. The previous plan deferred the generate-diff endpoint to Step 4 (hardening) but treated SQS publish failure as non-fatal with "diff can be triggered manually" — which wasn't true.
Fix: generate-diff endpoint is now part of Step 2 (core release). Ships in the same PR as SQS + stream removal. Operators always have a fallback.
Valid. With contentBasedDeduplication: true, two imports of the same game within 5 minutes produce identical payloads ({gameSlug, trigger: "import"}) and SQS drops the second.
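Fix: Include importRunId in the import publish payload. Each import run gets a unique ID, so content-based dedup sees distinct messages. Manual triggers include entitySlug, which differentiates edits to different entities. However, two edits to the same entity within the 5-minute window still produce identical bodies, so the second diff request would be dropped unless it also carries a per-request value (or an explicit deduplication ID).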
Reconsidered. The previous plan kept dual emission "for cross-instance visibility." But with only 2 low-cardinality streams remaining, the justification is weak — especially for lock-change where LockService already emits directly on every acquire/release/progress update.
Fix: Pick one authoritative emitter per event type:
- lock-change: Stream-only. Remove direct emits from `LockService`. The change stream catches all lock document mutations (including from other processes), making it the correct single source.
- diff-change: Stream-only. Remove the direct emit from `DeploymentStatusService.updateDiffStatus()`. The stream catches the diff document update.
This eliminates duplicate delivery entirely. The trade-off is slightly higher latency for same-process events (stream detection vs. synchronous emit), but lock/diff writes are infrequent and the stream latency is <100ms for low-cardinality collections.
Valid. This is a Layer 2 domain package importing infrastructure. Handler-level publishing avoids it entirely.
Fix: All SQS publishes happen at handler level (Layer 4) or in Trigger tasks — never in domain services. @repo/game-admin stays free of @repo/queue. See Step 2e.
File: packages/backend/tasks/src/trigger/game-importer/import-game.task.ts:48
```ts
queue: {
  concurrencyLimit: 2, // was 10
},
```

Why: Fastest possible stabilization. Concurrent large imports multiply all write and stream amplification. Doesn't break anything.
File: packages/backend/queue/src/config.ts — add:

```ts
"game-admin-generate-diff": {
  name: "game-admin-generate-diff",
  type: "fifo",
  visibilityTimeout: 1800, // 30 min (diff generation for large games)
  messageRetentionPeriod: 86400, // 1 day
  maxMessageSize: 4096,
  consumer: {
    concurrency: 1, // one message at a time; per-game ordering comes from the FIFO message group
    batchSize: 1,
  },
  deadLetterQueue: {
    name: "game-admin-generate-diff-dlq",
    maxReceiveCount: 3,
  },
  fifoConfig: {
    contentBasedDeduplication: true,
    deduplicationScope: "messageGroup",
    fifoThroughputLimit: "perMessageGroupId",
  },
},
```

New file: `packages/backend/queue/src/dtos/game-admin-generate-diff.dto.ts`
```ts
import { z } from "zod"

export const gameAdminGenerateDiffPayloadSchema = z.object({
  gameSlug: z.string(),
  trigger: z.enum(["import", "manual"]),
  entityType: z.string().optional(),
  entitySlug: z.string().optional(),
  importRunId: z.string().optional(),
})

export type GameAdminGenerateDiffPayload = z.infer<typeof gameAdminGenerateDiffPayloadSchema>
```

New file: `packages/games/game-admin/src/services/diff/generate-diff.handler.ts`
Extracts the core logic from `DiffMonitorService.generateIncrementalDiff()` into a standalone handler:

- Acquires the diff lock via `LockService.attemptLockAcquisition()`
- Runs `calculateExpansionCardCounts()`
- For `trigger: "manual"` with `entityType`/`entitySlug`: entity-specific diff (fast path via `DiffService.generateAndPatchEntityDiff`)
- For `trigger: "import"`: full diff via `DiffService.generateDiff()`
- Stores the diff document, diff details, and impact analysis
- Releases the lock in a `finally` block
Register in apps/backend/src/handlers/registry.ts.
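The handler's control flow can be sketched as follows. `GenerateDiffDeps` and `handleGenerateDiff` are hypothetical: the interface stands in for `LockService`, `DiffService`, and `calculateExpansionCardCounts()`, whose real signatures are not given here. The sketch shows the manual/import branch and the lock being released in `finally` on both success and failure. Two behaviors are assumptions, not part of the plan: a manual request without an entity (for example, from the generate-diff endpoint) falls back to a full diff, and the handler throws when the lock is already held.

```typescript
interface GenerateDiffPayload {
  gameSlug: string
  trigger: "import" | "manual"
  entityType?: string
  entitySlug?: string
  importRunId?: string
}

// Hypothetical stand-ins for the real services.
interface GenerateDiffDeps {
  acquireDiffLock(gameSlug: string): Promise<string | null> // lock id, or null if already held
  releaseDiffLock(lockId: string): Promise<void>
  calculateExpansionCardCounts(gameSlug: string): Promise<void>
  generateEntityDiff(gameSlug: string, entityType: string, entitySlug: string): Promise<unknown>
  generateFullDiff(gameSlug: string): Promise<unknown>
  storeDiffResults(gameSlug: string, diff: unknown): Promise<void> // diff doc, details, impact analysis
}

async function handleGenerateDiff(payload: GenerateDiffPayload, deps: GenerateDiffDeps): Promise<void> {
  const lockId = await deps.acquireDiffLock(payload.gameSlug)
  if (lockId === null) {
    // Assumption: throwing lets the SQS consumer leave the message for redelivery.
    throw new Error(`Diff lock for ${payload.gameSlug} is held; retry later`)
  }
  try {
    const { gameSlug, entityType, entitySlug } = payload
    await deps.calculateExpansionCardCounts(gameSlug)
    const diff =
      payload.trigger === "manual" && entityType && entitySlug
        ? await deps.generateEntityDiff(gameSlug, entityType, entitySlug) // fast path
        : await deps.generateFullDiff(gameSlug) // imports, or manual requests without an entity
    await deps.storeDiffResults(gameSlug, diff)
  } finally {
    await deps.releaseDiffLock(lockId) // runs whether generation succeeded or threw
  }
}
```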
File: packages/backend/tasks/src/trigger/game-importer/import-game.task.ts
After importer.run() succeeds (line 173), when useStaging is true:
```ts
await importer.run(useCache, taskContext, matchProducts, autoApply, useStaging, lockId, forceRedownloadImages)

// Trigger diff generation via SQS (staging imports only)
if (useStaging) {
  try {
    const publisher = container.resolve(QueuePublisher)
    await publisher.publish(
      "game-admin-generate-diff",
      gameAdminGenerateDiffPayloadSchema,
      {
        gameSlug,
        trigger: "import",
        importRunId: ctx?.run?.id, // Unique per run; prevents FIFO dedup from dropping back-to-back imports
      },
      { messageGroupId: gameSlug },
    )
    logger.info(`Published diff generation request for ${gameSlug}`)
  } catch (error) {
    // Import succeeded but the diff won't auto-generate.
    // Operator can use the generate-diff endpoint as a fallback.
    logger.error(`Failed to publish diff request for ${gameSlug}: ${error}`)
  }
}
```

Why importRunId matters: With `contentBasedDeduplication: true`, SQS derives the deduplication ID from a SHA-256 hash of the message body. A message whose ID repeats within the 5-minute deduplication interval is accepted but not delivered. Without `importRunId`, two imports of the same game within 5 minutes would produce identical payloads, and the second would be silently dropped. Including the unique run ID ensures each import triggers its own diff.
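The collision can be reproduced locally. The sketch mimics SQS content-based deduplication with a SHA-256 hash from `node:crypto`. It assumes the publisher serializes the payload with `JSON.stringify`; the real `QueuePublisher` serialization is not shown in this plan. `contentDedupId` and `collides` are hypothetical helpers.

```typescript
import { createHash } from "node:crypto"

// Approximates SQS content-based dedup: the dedup ID is a SHA-256 hash of the body.
function contentDedupId(payload: object): string {
  return createHash("sha256").update(JSON.stringify(payload)).digest("hex")
}

const collides = (a: object, b: object): boolean => contentDedupId(a) === contentDedupId(b)

const withoutRunId = [
  { gameSlug: "some-game", trigger: "import" },
  { gameSlug: "some-game", trigger: "import" },
]
const withRunId = [
  { gameSlug: "some-game", trigger: "import", importRunId: "run_a" },
  { gameSlug: "some-game", trigger: "import", importRunId: "run_b" },
]

console.log(collides(withoutRunId[0], withoutRunId[1])) // true: second import's diff request dropped
console.log(collides(withRunId[0], withRunId[1])) // false: both delivered
// JSON.stringify omits undefined properties, so a missing run ID collides again.
console.log(collides({ ...withRunId[0], importRunId: undefined }, withoutRunId[0])) // true
```

The last case matters for the task code: if `ctx?.run?.id` is ever undefined, the body collapses back to the colliding form.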
SQS publish happens at handler level, after all mutations complete. This ensures staging data is consistent before diff generation runs.
create-override.handler.ts — after stagingService.updateFields():
```ts
const { override, transformedFields } =
  await manualOverrideService.createOrUpdateOverride(...)
await stagingService.updateFields(...)

// All mutations done: staging is consistent. Now trigger diff + notify UI.
gameAdminEventEmitter.emit("entity-change", {
  type: "entity-change",
  gameSlug: input.gameSlug,
  entitySlug: input.entitySlug,
  entityType: input.entityType,
  changeType: "updated",
  source: "manual",
  timestamp: new Date(),
})

const publisher = context.container.resolve(QueuePublisher)
await publisher.publish(
  "game-admin-generate-diff",
  gameAdminGenerateDiffPayloadSchema,
  { gameSlug: input.gameSlug, trigger: "manual", entityType: input.entityType, entitySlug: input.entitySlug },
  { messageGroupId: input.gameSlug },
)
```

In `create-entity.handler.ts`, after `stagingService.createEntity()`:
```ts
const { override } = await manualOverrideService.createOrUpdateOverride(...)
const entity = await stagingService.createEntity(...)

// All mutations done.
gameAdminEventEmitter.emit("entity-change", {
  type: "entity-change",
  gameSlug: input.gameSlug,
  entitySlug,
  entityType: input.entityType,
  changeType: "created",
  source: "manual",
  timestamp: new Date(),
})

const publisher = context.container.resolve(QueuePublisher)
await publisher.publish(
  "game-admin-generate-diff",
  gameAdminGenerateDiffPayloadSchema,
  { gameSlug: input.gameSlug, trigger: "manual", entityType: input.entityType, entitySlug },
  { messageGroupId: input.gameSlug },
)
```

In `upload-image.handler.ts`, after `imageUploadService.uploadImageAndCreateOverride()` returns (this method internally does both the override and the staging update):
```ts
const result = await imageUploadService.uploadImageAndCreateOverride(...)

// uploadImageAndCreateOverride() does override + staging mutation internally.
gameAdminEventEmitter.emit("entity-change", {
  type: "entity-change",
  gameSlug: input.gameSlug,
  entitySlug: input.entitySlug,
  entityType: input.entityType,
  changeType: "updated",
  source: "manual",
  timestamp: new Date(),
})

const publisher = context.container.resolve(QueuePublisher)
await publisher.publish(
  "game-admin-generate-diff",
  gameAdminGenerateDiffPayloadSchema,
  { gameSlug: input.gameSlug, trigger: "manual", entityType: input.entityType, entitySlug: input.entitySlug },
  { messageGroupId: input.gameSlug },
)

return result
```

Why handler-level, not service-level:
- Correctness: The handler knows when all mutations are done. The service only knows about its own step.
- No `@repo/queue` coupling in `@repo/game-admin`: Handlers are Layer 4 (`@repo/api`), which can depend on anything. Domain packages stay clean.
- Explicitness: Each publish is visible in the handler, so it is easy to audit which mutations trigger diffs.
The trade-off is 3 call sites instead of 1. A shared helper function (e.g. publishManualDiffRequest(context, input)) can reduce boilerplate if desired.
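One possible shape for that shared helper is sketched below. `notifyManualMutation` and `ManualDiffDeps` are hypothetical names. The dependencies are narrowed stand-ins for `gameAdminEventEmitter` and `QueuePublisher`; the real `publish()` also takes the schema argument. The helper bundles both steps the three handlers repeat, in the same order.

```typescript
type EntityChangeType = "created" | "updated"

interface ManualMutationInput {
  gameSlug: string
  entityType: string
  entitySlug: string
}

// Hypothetical, narrowed stand-ins for gameAdminEventEmitter and QueuePublisher.
interface ManualDiffDeps {
  emitEntityChange(event: {
    type: "entity-change"
    gameSlug: string
    entitySlug: string
    entityType: string
    changeType: EntityChangeType
    source: "manual"
    timestamp: Date
  }): void
  publishDiffRequest(
    payload: { gameSlug: string; trigger: "manual"; entityType: string; entitySlug: string },
    options: { messageGroupId: string },
  ): Promise<void>
}

// Call only after every mutation in the handler has completed.
async function notifyManualMutation(
  deps: ManualDiffDeps,
  input: ManualMutationInput,
  changeType: EntityChangeType,
): Promise<void> {
  deps.emitEntityChange({
    type: "entity-change",
    gameSlug: input.gameSlug,
    entitySlug: input.entitySlug,
    entityType: input.entityType,
    changeType,
    source: "manual",
    timestamp: new Date(),
  })
  await deps.publishDiffRequest(
    { gameSlug: input.gameSlug, trigger: "manual", entityType: input.entityType, entitySlug: input.entitySlug },
    { messageGroupId: input.gameSlug }, // one FIFO message group per game
  )
}
```

As in the handler snippets above, a publish failure propagates to the caller even though the mutation is already saved. The plan leaves open whether manual handlers should instead log and continue, as the import task does.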
Ships in the same release as SQS + stream removal. Provides operator recovery if SQS publish fails.
Contract: packages/games/game-admin/src/contracts/diff/generate-diff.ts
```ts
import { oc } from "@orpc/contract"
import { z } from "zod"

export const generateDiffContract = oc
  .input(z.object({ gameSlug: z.string() }))
  .output(z.object({ message: z.string() }))
```

Handler: `packages/backend/api/src/lib/orpc/handlers/game-admin/diff/generate-diff.handler.ts`
The handler publishes to the same game-admin-generate-diff SQS queue (does NOT run diff inline — that would block the HTTP response). Returns immediately with "Diff generation queued."
Wire into the diff router and add a button in the admin UI.
Critical: Steps 2 and 3 ship as one atomic release. No gap.
File: apps/backend/src/cardnexus-api.ts — remove lines 121-130.
File: packages/games/game-admin/src/services/stream/change-stream.service.ts
Remove:

- `setupImportDataStream()` + `handleImportDataChange()` + `activeImportRuns` map
- `setupManualOverrideStream()` + `handleManualOverrideChange()`
- `setupStagingStreams()` + `handleStagingChange()`

Keep:

- `setupDiffStream()` + `handleDiffChange()`: `diff-change` for admin UI
- `setupLockStream()` + `handleLockChange()`: `lock-change` for admin UI
- Stream lifecycle methods (`createChangeStream`, `recreateStream`, `close`, etc.)
Updated initialize():
```ts
async initialize() {
  this.connection = await getGameAdminConnection()
  await this.setupDiffStream()
  await this.setupLockStream()
  logger.info("Change stream service initialized (locks + diffs only)")
}
```

Result: 14 streams → 2 streams.
lock-change: Stream-only.

- Remove all `gameAdminEventEmitter.emit("lock-change", ...)` calls from `LockService` (lines 64, 131, 177, 344, 371 of `lock.service.ts`).
- The change stream on `importlocks` is the single source. It catches all lock mutations, including those from other backend instances.

diff-change: Stream-only.

- Remove `gameAdminEventEmitter.emit("diff-change", ...)` from `DeploymentStatusService.updateDiffStatus()` (line 97 of `deployment-status.service.ts`).
- The change stream on `stagingdiffs` is the single source.

deploy-status: Direct-only (no change needed).

- `DeploymentService` emits `deploy-status` directly. There is no change stream on deployment progress; this was always direct-only.

entity-change: Direct-only (handled by Step 2e).

- Emitted from handlers after manual mutations. No change stream needed.
| Event | Authoritative Path | Why |
|---|---|---|
| lock-change | Change stream on `importlocks` | Catches cross-instance lock ops; low cardinality |
| diff-change | Change stream on `stagingdiffs` | Catches cross-instance diff writes; low cardinality |
| deploy-status | Direct emit from `DeploymentService` | Deployment runs in one process; no cross-instance need |
| entity-change | Direct emit from handlers | Manual mutations are synchronous and request-scoped |
File: change-stream.service.ts — replace fixed 5-second retry:
```ts
private async recreateStream(options: ChangeStreamOptions, attempt = 0) {
  const MAX_RETRIES = 5
  if (attempt >= MAX_RETRIES) {
    logger.error(`Giving up on stream ${options.collection} after ${MAX_RETRIES} attempts`)
    return
  }

  // 5s, 10s, 20s, 40s, then capped at 60s
  const backoff = Math.min(5000 * 2 ** attempt, 60000)

  const existing = this.streams.get(options.collection)
  if (existing && !existing.closed) {
    await existing.close()
  }
  this.streams.delete(options.collection)

  setTimeout(async () => {
    try {
      const newStream = await this.createChangeStream(options)
      this.streams.set(options.collection, newStream)
    } catch (error) {
      logger.error(`Failed to recreate stream ${options.collection} (attempt ${attempt + 1}): ${error}`)
      // Schedule the next attempt; not awaited, it runs on its own timer.
      void this.recreateStream(options, attempt + 1)
    }
  }, backoff)
}
```

- SQS: queue depth, age of oldest message, and DLQ count for `game-admin-generate-diff`
- MongoDB: change stream open cursor count (expect exactly 2 per instance), oplog window
- Trigger: task completion latency, diff generation duration
Leader election for the remaining streams: with only 2 streams on low-cardinality collections, this is low priority. Consider it if scaling to many instances.
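For reference, the retry schedule implied by `recreateStream()` can be computed directly. `backoffSchedule` is a hypothetical helper that reproduces the same arithmetic: `min(5000 * 2^attempt, 60000)` for attempts 0 through `MAX_RETRIES - 1`.

```typescript
// Same arithmetic as recreateStream(): attempts 0..maxRetries-1 are scheduled,
// and attempt === maxRetries gives up.
function backoffSchedule(baseMs = 5000, capMs = 60000, maxRetries = 5): number[] {
  const delays: number[] = []
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(Math.min(baseMs * 2 ** attempt, capMs))
  }
  return delays
}

const delays = backoffSchedule()
console.log(delays) // [ 5000, 10000, 20000, 40000, 60000 ]
console.log(delays.reduce((a, b) => a + b, 0)) // 135000
```

A stream therefore gets five reconnection attempts over roughly 135 seconds before the service gives up. After that, assuming nothing else calls `recreateStream()`, it stays closed until the process restarts. The "expect exactly 2 cursors per instance" alert would surface this.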
```text
┌──────────────────────────────────────────────────────────────┐
│ Admin UI: triggerGameImport()                                │
│   → API acquires lock, triggers Trigger.dev task             │
└──────────────────────────┬───────────────────────────────────┘
                           │ Trigger.dev
                           ▼
┌──────────────────────────────────────────────────────────────┐
│ Trigger.dev: importGame task                                 │
│ 1. Write ImportData (bulkWrite batches)                      │
│ 2. performDirectMerge() → staging tables (bulkWrite)         │
│ 3. Release import lock                                       │
│ 4. Publish SQS: game-admin-generate-diff                     │
│    {trigger:"import", importRunId: unique}                   │
└──────────────────────────┬───────────────────────────────────┘
                           │ SQS FIFO (gameSlug group)
                           ▼
┌──────────────────────────────────────────────────────────────┐
│ SQS Worker: GameAdminGenerateDiffHandler                     │
│ 1. Acquire diff lock                                         │
│ 2. Calculate expansion card counts                           │
│ 3. Generate diff (full or entity-specific)                   │
│ 4. Store diff → stagingdiffs collection                      │
│ 5. Release diff lock                                         │
└──────────────────────────┬───────────────────────────────────┘
                           │ write to stagingdiffs
                           ▼
┌──────────────────────────────────────────────────────────────┐
│ ChangeStreamService (2 streams, single authoritative source) │
│ • stagingdiffs → "diff-change" → admin WebSocket             │
│ • importlocks → "lock-change" → admin WebSocket              │
└──────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────┐
│ Manual Mutation Handlers                                     │
│ (create-override, create-entity, upload-image)               │
│                                                              │
│ 1. ManualOverrideService.createOrUpdateOverride()            │
│ 2. StagingService.updateFields() / createEntity()            │
│    ── all mutations done ──                                  │
│ 3. Emit "entity-change" directly (admin toast)               │
│ 4. Publish SQS: game-admin-generate-diff {trigger:"manual"}  │
└──────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────┐
│ generate-diff endpoint (manual fallback)                     │
│   → Publishes to same SQS queue                              │
│   → Operator can trigger from admin UI if SQS publish failed │
└──────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────┐
│ Admin Frontend (WebSocket)                                   │
│ • lock-change → lock/unlock status (stream only)             │
│ • diff-change → "new diff available" (stream only)           │
│ • deploy-status → deployment progress (direct emit only)     │
│ • entity-change → manual override toast (direct emit only)   │
└──────────────────────────────────────────────────────────────┘
```
Step 1:

- Reduce `importGame` concurrency 10 → 2 (`import-game.task.ts:48`)

Step 2 (core release):

- Add `game-admin-generate-diff` queue config (`queue/src/config.ts`)
- Add DTO (`queue/src/dtos/game-admin-generate-diff.dto.ts`)
- Export from `queue/src/dtos/index.ts`
- Create `GameAdminGenerateDiffHandler` (extract from `DiffMonitorService.generateIncrementalDiff`)
- Register handler in `apps/backend/src/handlers/registry.ts`
- Publish from `import-game.task.ts` after `importer.run()`; include `importRunId`
- Add SQS publish + `entity-change` emit in all 3 manual handlers (after all mutations)
- Add `generate-diff` contract + handler (publishes to same SQS queue)
- Wire `generate-diff` into diff router
- Provision queue in Terraform (dev/staging/prod)

Step 3 (same release as Step 2):

- Remove `DiffMonitorService` initialization from `cardnexus-api.ts`
- Strip `ChangeStreamService` to `setupDiffStream()` + `setupLockStream()` only
- Remove direct `lock-change` emits from `LockService` (5 call sites)
- Remove direct `diff-change` emit from `DeploymentStatusService.updateDiffStatus()`
- Delete `DiffMonitorService` file

Step 4 (hardening):

- Exponential backoff + max retries for stream reconnection
- SQS dashboards (queue depth, DLQ count)
- MongoDB alerts (oplog window, open cursor count)
- Admin UI button for generate-diff endpoint
- Consider leader election for remaining 2 streams (optional)
| Metric | Before | After |
|---|---|---|
| Change stream cursors per instance | 14 | 2 |
| Change stream events per 150K import | ~600K handler invocations | Low tens (mostly lock progress updates + diff writes) |
| `fullDocument: "updateLookup"` reads per import | ~600K | Low tens (same low-cardinality lock/diff writes) |
| Diff generation triggers per import | Hundreds of debounced events → 1 | 1 SQS message → 1 |
| Write amplification feedback loops | Yes | None |
| Max concurrent imports | 10 | 2 |
| Duplicate event delivery to frontend | Yes (lock-change, diff-change) | No (single authoritative path per event) |
| `@repo/game-admin` → `@repo/queue` coupling | N/A | None (publishes at handler/task level only) |
| Operator fallback if SQS fails | None | generate-diff endpoint |
- Step 1: Revert concurrency to 10 (1-line change).
- Steps 2+3 (shipped together): Re-enable `DiffMonitorService` in `cardnexus-api.ts`, restore the full `ChangeStreamService`, and restore direct emits in `LockService`/`DeploymentStatusService`. SQS messages accumulate harmlessly.
- Step 4: Independent, no rollback needed.
For safe rollout, use a feature flag `GAME_ADMIN_DIFF_ORCHESTRATOR = "sqs" | "legacy"`:

- `"legacy"`: `DiffMonitorService` starts, SQS publishes are skipped, direct emits active
- `"sqs"`: `DiffMonitorService` disabled, SQS publishes active, stream-only emits
This allows instant rollback without a deploy.
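The sketch below shows one way to parse the flag and map it to the three behaviors listed above. The names `readDiffOrchestrator`, `behaviorFor`, and `OrchestratorBehavior` are hypothetical. Falling back to `"legacy"` when the variable is unset or unrecognized is an assumption. If the value comes from environment variables read at startup, flipping it still needs a restart; a live switch would need a runtime flag source.

```typescript
type DiffOrchestrator = "sqs" | "legacy"

interface OrchestratorBehavior {
  startDiffMonitor: boolean // start DiffMonitorService at boot
  publishDiffRequests: boolean // publish to game-admin-generate-diff
  directLockAndDiffEmits: boolean // emit lock-change/diff-change directly instead of stream-only
}

// Assumption: unset or unrecognized values fall back to "legacy" (the pre-migration path).
function readDiffOrchestrator(env: Record<string, string | undefined>): DiffOrchestrator {
  return env.GAME_ADMIN_DIFF_ORCHESTRATOR === "sqs" ? "sqs" : "legacy"
}

function behaviorFor(mode: DiffOrchestrator): OrchestratorBehavior {
  return mode === "sqs"
    ? { startDiffMonitor: false, publishDiffRequests: true, directLockAndDiffEmits: false }
    : { startDiffMonitor: true, publishDiffRequests: false, directLockAndDiffEmits: true }
}
```

At boot, the backend would call something like `behaviorFor(readDiffOrchestrator(process.env))` once and pass the result to the code paths that start `DiffMonitorService`, publish to SQS, and emit lock/diff events.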