@antoinelyset
Created December 5, 2025 16:53

Queue Consolidation Proposal

Current Problem

Each super source type and node type creates its own queue and worker:

N super sources × M node types = N×M queues/workers

Example with current sources:

  • Linear (LinearTeam → LinearIssue, LinearComment)
  • Notion (NotionWorkspace → NotionPage, NotionDatabase)
  • GitHub (GithubRepo → GithubIssue, GithubPullRequest)
  • Google Drive, Asana, Attio, Hubspot, Git, GitLab, Website...

This results in 20+ queues and workers, and the count grows with every new source.
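To make that growth concrete, here is a small counting sketch. It assumes (hypothetically) one queue per root node type and one per indexable node type, ignores webhook queues, and uses only the three example sources listed above.

```typescript
// Hypothetical shape: each source declares its root and indexable node types.
type SourceShape = { roots: string[]; indexables: string[] }

const exampleSources: Record<string, SourceShape> = {
  Linear: { roots: ['LinearTeam'], indexables: ['LinearIssue', 'LinearComment'] },
  Notion: { roots: ['NotionWorkspace'], indexables: ['NotionPage', 'NotionDatabase'] },
  GitHub: { roots: ['GithubRepo'], indexables: ['GithubIssue', 'GithubPullRequest'] },
}

// Per-source scheme: one queue (and worker) per root type and per indexable type.
function countPerSourceQueues(sources: Record<string, SourceShape>): number {
  return Object.values(sources).reduce(
    (total, s) => total + s.roots.length + s.indexables.length,
    0,
  )
}

// Tiered scheme: queue kinds times tiers, independent of the number of sources.
function countTieredQueues(queueKinds: number, tiers: number): number {
  return queueKinds * tiers
}

console.log(countPerSourceQueues(exampleSources)) // 9 for just three sources
console.log(countTieredQueues(3, 2)) // 6, however many sources are added
```

Every new source adds to the first number; the second stays fixed.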

Problems

  1. Memory: Each worker consumes memory even when idle
  2. Complexity: Adding a new source requires no queue code changes, yet it silently spawns new workers
  3. Monitoring: Hard to monitor 20+ queues
  4. Resource waste: Most queues are idle most of the time

Proposed Solution: Speed Tier Queues

Instead of per-source queues, consolidate into speed tiers based on API rate limits and concurrency needs.

Why Speed Tiers?

Different sources have different API constraints:

  • Fast APIs (Linear, GitHub): Can handle high concurrency (5+)
  • Slow APIs (Notion, Google Drive): Need lower concurrency (2) due to rate limits

Using a single queue with groups would require either:

  • Grouping without organizationId → lose per-org isolation
  • Dynamic setGroupConcurrency per org → complex and impractical

Speed tiers give us:

  • ✅ Per-source concurrency tuning (at tier granularity)
  • ✅ Per-org isolation (via groups within each tier)
  • ✅ Fewer queues (6 vs 20+)
  • ✅ Simple to add new sources (assign to a tier)
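A minimal routing sketch, assuming a hypothetical `routeJob` helper and string literals in place of the real enums: the source selects the tier (and therefore the queue name), while the organization selects the group inside that queue.

```typescript
type Tier = 'Fast' | 'Slow'
type QueueKind = 'ImportSuperSourceRoot' | 'IndexSuperSourceNode' | 'HandleSuperWebhook'

// Subset of the proposed source-to-tier mapping; unknown sources default to Slow.
const SOURCE_TIER: Record<string, Tier> = {
  Linear: 'Fast',
  GitHub: 'Fast',
  Notion: 'Slow',
  GoogleDrive: 'Slow',
}

// Hypothetical helper: the source picks the queue, the organization picks the group.
function routeJob(kind: QueueKind, source: string, organizationId: string) {
  const tier = SOURCE_TIER[source] ?? 'Slow'
  return { queueName: `${kind}${tier}`, groupId: organizationId }
}

console.log(routeJob('IndexSuperSourceNode', 'Linear', 'org-a'))
// { queueName: 'IndexSuperSourceNodeFast', groupId: 'org-a' }
console.log(routeJob('IndexSuperSourceNode', 'Notion', 'org-b'))
// { queueName: 'IndexSuperSourceNodeSlow', groupId: 'org-b' }
```

One consequence of grouping by `organizationId` alone: within a tier, an organization's sources share one group, so an org connected to both Linear and GitHub gets at most 5 concurrent index jobs across both, not 5 each.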

Queue Structure

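| Queue | Concurrency | Sources |
|---|---|---|
| ImportSuperSourceRootFast | 1 | Linear, GitHub, Git, GitLab |
| ImportSuperSourceRootSlow | 1 | Notion, Google Drive, Attio, Hubspot, Asana, Website |
| IndexSuperSourceNodeFast | 5 | Linear, GitHub, Git, GitLab |
| IndexSuperSourceNodeSlow | 2 | Notion, Google Drive, Attio, Hubspot, Asana, Website |
| HandleSuperWebhookFast | 5 | Linear, GitHub |
| HandleSuperWebhookSlow | 2 | Others |

Import and index concurrency is per organization (group concurrency); webhook concurrency applies to the whole worker, since webhook jobs are not grouped.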

Total: 6 queues (vs 20+ currently)
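Because queue names are derived from the tier, the table above can be generated from the tier configuration. This sketch mirrors the proposed `SUPER_SOURCE_TIER_CONFIG` values, using string literals instead of the enum, and rebuilds the six rows in the same order.

```typescript
type Tier = 'Fast' | 'Slow'

// Mirrors the proposed SUPER_SOURCE_TIER_CONFIG values.
const TIER_CONFIG: Record<Tier, { import: number; index: number; webhook: number }> = {
  Fast: { import: 1, index: 5, webhook: 5 },
  Slow: { import: 1, index: 2, webhook: 2 },
}

// Queue name prefix per kind of work, as used by the worker factories.
const QUEUE_PREFIX = {
  import: 'ImportSuperSourceRoot',
  index: 'IndexSuperSourceNode',
  webhook: 'HandleSuperWebhook',
} as const

type Kind = keyof typeof QUEUE_PREFIX

// One row per (kind, tier) pair: 3 kinds x 2 tiers = 6 queues.
function listQueues(): { name: string; concurrency: number }[] {
  const rows: { name: string; concurrency: number }[] = []
  for (const kind of Object.keys(QUEUE_PREFIX) as Kind[]) {
    for (const tier of Object.keys(TIER_CONFIG) as Tier[]) {
      rows.push({ name: `${QUEUE_PREFIX[kind]}${tier}`, concurrency: TIER_CONFIG[tier][kind] })
    }
  }
  return rows
}

for (const row of listQueues()) console.log(row.name, row.concurrency)
```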

Implementation

1. Define Speed Tiers

// src/workers/utils/super-source-tiers.ts

export enum SuperSourceTier {
  FAST = 'Fast',
  SLOW = 'Slow',
}

export const SUPER_SOURCE_TIER_CONFIG: Record<SuperSourceTier, {
  importConcurrency: number
  indexConcurrency: number
  webhookConcurrency: number
}> = {
  [SuperSourceTier.FAST]: {
    importConcurrency: 1,
    indexConcurrency: 5,
    webhookConcurrency: 5,
  },
  [SuperSourceTier.SLOW]: {
    importConcurrency: 1,
    indexConcurrency: 2,
    webhookConcurrency: 2,
  },
}

// Map each source to a tier
export const SUPER_SOURCE_TO_TIER: Record<SuperSource, SuperSourceTier> = {
  [SuperSource.Linear]: SuperSourceTier.FAST,
  [SuperSource.GitHub]: SuperSourceTier.FAST,
  [SuperSource.Git]: SuperSourceTier.FAST,
  [SuperSource.GitLab]: SuperSourceTier.FAST,
  [SuperSource.Notion]: SuperSourceTier.SLOW,
  [SuperSource.GoogleDrive]: SuperSourceTier.SLOW,
  [SuperSource.Attio]: SuperSourceTier.SLOW,
  [SuperSource.Hubspot]: SuperSourceTier.SLOW,
  [SuperSource.Asana]: SuperSourceTier.SLOW,
  [SuperSource.Website]: SuperSourceTier.SLOW,
  // Add new sources here
}

export function getTierForSource(source: SuperSource): SuperSourceTier {
  return SUPER_SOURCE_TO_TIER[source] ?? SuperSourceTier.SLOW
}
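The `Record<SuperSource, SuperSourceTier>` annotation already makes the compiler reject a source that has no tier, so the `?? SuperSourceTier.SLOW` fallback only matters if the map is ever loosened to a partial one. In that case, a check like the sketch below can catch sources that would silently land in the slow tier. `ALL_SOURCES` is a stand-in list, not the real `SuperSource` enum.

```typescript
// Stand-in for the real SuperSource enum values (assumption for illustration).
const ALL_SOURCES = ['Linear', 'GitHub', 'Notion', 'Website'] as const
type Source = (typeof ALL_SOURCES)[number]
type Tier = 'Fast' | 'Slow'

// A partial map: the type system no longer forces every source to be listed.
const SOURCE_TO_TIER: Partial<Record<Source, Tier>> = {
  Linear: 'Fast',
  GitHub: 'Fast',
  Notion: 'Slow',
}

// Returns every source that would silently fall back to the default tier.
function findUnmappedSources(): Source[] {
  return ALL_SOURCES.filter((source) => SOURCE_TO_TIER[source] === undefined)
}

console.log(findUnmappedSources()) // [ 'Website' ]
```

Run as a unit test, this turns a forgotten mapping into a failing build instead of an unnoticed tier assignment.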

2. Unified Import Queue (per tier)

// src/workers/utils/queue.ts

type SuperImportPayload = {
  superSource: SuperSource
  superRootNodeType: SuperRootNodeType
  organizationId: string
  rootNodeId: string
  rootNodeConnectionId: string
  firstRun?: boolean
  progress?: Record<string, unknown>
}

export function makeSuperImportWorkerAndQueue(
  services: QueueServiceDependencies,
  tier: SuperSourceTier
) {
  const tierConfig = SUPER_SOURCE_TIER_CONFIG[tier]

  return makeSliteWorkerAndQueue<void, SuperImportPayload>({
    queueName: `ImportSuperSourceRoot${tier}`,
    factoryOptions: {
      defaultQueueOptions: {
        unique: {
          byKeys: ['organizationId', 'rootNodeConnectionId', 'rootNodeId'],
        },
        removeOnFail: false,
        retryStrategy: {
          attempts: 5,
          backoff: { type: 'custom' },
        },
        group: { byKeys: ['organizationId'] },
      },
      defaultWorkerOptions: {
        group: {
          concurrency: tierConfig.importConcurrency,
        },
        settings: {
          backoffStrategy: externalServiceBackoffStrategy,
        },
      },
    },

    async processor(processorServices, job, token) {
      const {
        superSource,
        superRootNodeType,
        rootNodeId,
        rootNodeConnectionId,
        organizationId,
        progress,
        firstRun,
      } = job.data

      // Check rate limit
      const resetAt = await checkRateLimitAndGetResetAt({
        services: setupServicesForSuperSourceConnector(processorServices),
        connectionId: rootNodeConnectionId,
        source: superSource,
      })

      if (resetAt) {
        await job.moveToDelayed(resetAt.getTime(), token)
        throw new DelayedError('Rate limit exceeded, retry later')
      }

      try {
        await SystemImportsSuperRootNode(processorServices, {
          superSource,
          superRootNodeType,
          organizationId,
          rootNodeId,
          rootNodeConnectionId,
          progress,
          firstRun,
          job,
        })
      } catch (error) {
        if (error instanceof RateLimitExceededError) {
          await job.moveToDelayed(Date.now() + error.retryAfterMs, token)
          throw new DelayedError('Rate limit exceeded, retry later')
        }
        throw error
      }
    },
  })
}
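The processor handles rate limits on two paths: a pre-check that returns an absolute reset time, and a caught error that carries a relative `retryAfterMs`. Both move the job to the delayed set and throw `DelayedError`, which tells BullMQ the job was delayed rather than completed or failed. The sketch below reproduces that control flow with a fake job and local stand-ins for `DelayedError` and `RateLimitExceededError`; none of them are the real classes.

```typescript
// Local stand-ins: the real processors use BullMQ's DelayedError and the
// app's RateLimitExceededError; these only mimic their shape.
class DelayedError extends Error {
  constructor(message?: string) {
    super(message)
    Object.setPrototypeOf(this, new.target.prototype) // keep instanceof working on ES5 targets
  }
}

class RateLimitExceededError extends Error {
  retryAfterMs: number
  constructor(retryAfterMs: number) {
    super('Rate limit exceeded')
    this.retryAfterMs = retryAfterMs
    Object.setPrototypeOf(this, new.target.prototype)
  }
}

// Fake job that records the timestamp it was delayed until.
class FakeJob {
  delayedUntil: number | null = null
  async moveToDelayed(timestamp: number, _token?: string): Promise<void> {
    this.delayedUntil = timestamp
  }
}

// Same two-path control flow as the import and index processors.
async function runWithRateLimit(
  job: FakeJob,
  resetAt: Date | null,
  work: () => Promise<void>,
  now: () => number = Date.now,
): Promise<void> {
  // Path 1: the pre-check reports an absolute reset time.
  if (resetAt) {
    await job.moveToDelayed(resetAt.getTime(), 'token')
    throw new DelayedError('Rate limit exceeded, retry later')
  }
  try {
    await work()
  } catch (error) {
    // Path 2: the API call itself was rate limited; retryAfterMs is relative to now.
    if (error instanceof RateLimitExceededError) {
      await job.moveToDelayed(now() + error.retryAfterMs, 'token')
      throw new DelayedError('Rate limit exceeded, retry later')
    }
    throw error
  }
}
```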

3. Unified Index Queue (per tier)

type SuperIndexPayload = {
  superSource: SuperSource
  superRootNodeType: SuperRootNodeType
  superIndexableNodeType: SuperIndexableNodeType
  organizationId: string
  rootNodeId: string
  rootNodeConnectionId: string
  nodeId: string
  node: SuperSourceIndexableNodeWithFragmentId
  synchronizationDate?: string
}

export function makeSuperIndexWorkerAndQueue(
  services: QueueServiceDependencies,
  tier: SuperSourceTier
) {
  const tierConfig = SUPER_SOURCE_TIER_CONFIG[tier]

  return makeSliteWorkerAndQueue<void, SuperIndexPayload>({
    queueName: `IndexSuperSourceNode${tier}`,
    factoryOptions: {
      defaultQueueOptions: {
        unique: {
          byKeys: ['organizationId', 'rootNodeConnectionId', 'rootNodeId', 'nodeId'],
        },
        removeOnFail: false,
        retryStrategy: {
          attempts: 5,
          backoff: { type: 'custom' },
        },
        group: { byKeys: ['organizationId'] },
      },
      defaultWorkerOptions: {
        group: {
          concurrency: tierConfig.indexConcurrency,
        },
        settings: {
          backoffStrategy: externalServiceBackoffStrategy,
        },
      },
    },

    async processor(processorServices, job, token) {
      const {
        superSource,
        superRootNodeType,
        superIndexableNodeType,
        synchronizationDate: synchronizationDateStr,
        rootNodeId,
        rootNodeConnectionId,
        node,
        organizationId,
      } = job.data

      const synchronizationDate = synchronizationDateStr
        ? new Date(synchronizationDateStr)
        : undefined

      // Check rate limit
      const resetAt = await checkRateLimitAndGetResetAt({
        services: setupServicesForSuperSourceConnector(processorServices),
        connectionId: rootNodeConnectionId,
        source: superSource,
      })

      if (resetAt) {
        await job.moveToDelayed(resetAt.getTime(), token)
        throw new DelayedError('Rate limit exceeded, retry later')
      }

      try {
        const reindexed = await SystemIndexesSuperIndexableNode(processorServices, {
          superSource,
          superIndexableNodeType,
          organizationId,
          rootNodeId,
          connectionId: rootNodeConnectionId,
          node,
          synchronizationDate,
        })

        if (reindexed) {
          processorServices.metrics.increment(
            `super.${slugifySuperSource(superSource)}.indexation`
          )
        } else {
          processorServices.metrics.increment(
            `super.${slugifySuperSource(superSource)}.already_indexed`
          )
        }
      } catch (error) {
        if (error instanceof RateLimitExceededError) {
          await job.moveToDelayed(Date.now() + error.retryAfterMs, token)
          throw new DelayedError('Rate limit exceeded, retry later')
        }
        throw error
      }
    },
  })
}
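Both the import and index queues rely on `unique.byKeys` to avoid scheduling the same root or node twice. How `makeSliteWorkerAndQueue` turns those keys into a BullMQ job id is not shown here. One plausible approach, sketched below as an assumption, is to build a deterministic `jobId` from the selected payload fields, since BullMQ does not add a job whose id already exists in the queue. The `|` separator and URI encoding are illustrative choices.

```typescript
type IndexPayload = {
  organizationId: string
  rootNodeConnectionId: string
  rootNodeId: string
  nodeId: string
  synchronizationDate?: string
}

// Same keys as the index queue's `unique.byKeys` option.
const INDEX_UNIQUE_KEYS: (keyof IndexPayload)[] = [
  'organizationId',
  'rootNodeConnectionId',
  'rootNodeId',
  'nodeId',
]

// Hypothetical: derive a deterministic job id from the selected payload fields.
// Each part is URI-encoded so a '|' inside a value cannot cause a collision.
function uniqueJobId<T>(payload: T, byKeys: (keyof T)[]): string {
  return byKeys.map((key) => encodeURIComponent(String(payload[key]))).join('|')
}

const a: IndexPayload = {
  organizationId: 'org-1',
  rootNodeConnectionId: 'conn-1',
  rootNodeId: 'root-1',
  nodeId: 'node-1',
  synchronizationDate: '2025-01-01',
}
console.log(uniqueJobId(a, INDEX_UNIQUE_KEYS)) // org-1|conn-1|root-1|node-1
```

Fields outside `byKeys`, such as `synchronizationDate`, do not affect the id, so rescheduling the same node with a newer date is deduplicated while the first job is still in the queue.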

4. Unified Webhook Queue (per tier)

type SuperWebhookPayload = {
  superSource: SuperSource
  webhookPath: string | undefined
  data: SuperSourceWebhookOnReceiveOutput
}

export function makeSuperWebhookWorkerAndQueue(
  services: QueueServiceDependencies,
  tier: SuperSourceTier
) {
  const tierConfig = SUPER_SOURCE_TIER_CONFIG[tier]

  return makeSliteWorkerAndQueue<void, SuperWebhookPayload>({
    queueName: `HandleSuperWebhook${tier}`,
    factoryOptions: {
      defaultQueueOptions: {
        removeOnFail: false,
        retryStrategy: {
          attempts: 5,
          backoff: { type: 'custom' },
        },
      },
      defaultWorkerOptions: {
        concurrency: tierConfig.webhookConcurrency,
        settings: {
          backoffStrategy: externalServiceBackoffStrategy,
        },
      },
    },

    async processor(processorServices, job, token) {
      const { superSource, webhookPath, data } = job.data

      const superSourceConfig = SuperSources[superSource]
      const webhook = superSourceConfig?.webhooks?.find(w => w.path === webhookPath)

      if (!webhook?.processor) {
        processorServices.logger.warn(
          `No webhook processor found for ${superSource}/${webhookPath}`
        )
        return
      }

      try {
        const indexationParamsToSchedule = await webhook.processor(
          setupServicesForSuperSourceConnector(processorServices),
          data
        )

        if (indexationParamsToSchedule) {
          await scheduleNodeIndexations(processorServices, {
            indexationParamsToSchedule,
            superSource,
          })
        }
      } catch (error) {
        if (error instanceof RateLimitExceededError) {
          await job.moveToDelayed(Date.now() + error.retryAfterMs, token)
          throw new DelayedError('Rate limit exceeded, retry later')
        }
        throw error
      }
    },
  })
}
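All three factories register `externalServiceBackoffStrategy` for the `custom` backoff type, but its implementation is not part of this proposal. As a hypothetical stand-in, a capped exponential backoff with "equal jitter" could look like this. It takes the number of attempts made, which is the first argument BullMQ passes to `settings.backoffStrategy`, and returns a delay in milliseconds.

```typescript
// Hypothetical stand-in for externalServiceBackoffStrategy (its real
// implementation is not part of this proposal).
const BASE_DELAY_MS = 1_000
const MAX_DELAY_MS = 5 * 60_000

// Wrap before registering, e.g. (attemptsMade) => exponentialBackoffWithJitter(attemptsMade),
// because BullMQ passes other arguments after attemptsMade.
function exponentialBackoffWithJitter(
  attemptsMade: number,
  random: () => number = Math.random,
): number {
  // Doubles per attempt (1s, 2s, 4s, ...) and is capped at MAX_DELAY_MS.
  const ceiling = Math.min(MAX_DELAY_MS, BASE_DELAY_MS * 2 ** Math.max(0, attemptsMade - 1))
  // Equal jitter: half the delay is fixed and half is random, which spreads
  // out retries from many jobs hitting the same external API at once.
  return Math.floor(ceiling / 2 + random() * (ceiling / 2))
}

for (let attempt = 1; attempt <= 5; attempt++) {
  console.log(attempt, exponentialBackoffWithJitter(attempt))
}
```

The wrapper matters: BullMQ calls the strategy with the backoff type as its second argument, which would otherwise be passed in as `random`.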

5. Service Setup

// src/services/superQueues.ts

export function setupSuperQueues(services: QueueServiceDependencies) {
  // Create one queue/worker pair per tier for each kind of work
  const importQueues = {
    [SuperSourceTier.FAST]: makeSuperImportWorkerAndQueue(services, SuperSourceTier.FAST),
    [SuperSourceTier.SLOW]: makeSuperImportWorkerAndQueue(services, SuperSourceTier.SLOW),
  }
  const indexQueues = {
    [SuperSourceTier.FAST]: makeSuperIndexWorkerAndQueue(services, SuperSourceTier.FAST),
    [SuperSourceTier.SLOW]: makeSuperIndexWorkerAndQueue(services, SuperSourceTier.SLOW),
  }
  const webhookQueues = {
    [SuperSourceTier.FAST]: makeSuperWebhookWorkerAndQueue(services, SuperSourceTier.FAST),
    [SuperSourceTier.SLOW]: makeSuperWebhookWorkerAndQueue(services, SuperSourceTier.SLOW),
  }

  return {
    import: importQueues,
    index: indexQueues,
    webhook: webhookQueues,

    // Helpers to get the right queue for a source. They close over the local
    // maps instead of using `this`, so they keep working when destructured.
    getImportQueue: (source: SuperSource) => importQueues[getTierForSource(source)],
    getIndexQueue: (source: SuperSource) => indexQueues[getTierForSource(source)],
    getWebhookQueue: (source: SuperSource) => webhookQueues[getTierForSource(source)],
  }
}
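`setupSuperQueues` repeats each factory call once per tier. A hypothetical generic helper can build the per-tier map from a single list of tiers instead, so adding a MEDIUM tier (see Future Improvements) would only touch the tier list and its configuration. The sketch uses string literals in place of the `SuperSourceTier` enum and a dummy factory in place of the real ones.

```typescript
const TIERS = ['Fast', 'Slow'] as const
type Tier = (typeof TIERS)[number]

// Hypothetical helper: build one instance per tier from a factory, keyed by tier.
function buildPerTier<T>(factory: (tier: Tier) => T): Record<Tier, T> {
  const result = {} as Record<Tier, T>
  for (const tier of TIERS) {
    result[tier] = factory(tier)
  }
  return result
}

// Dummy factory standing in for makeSuperIndexWorkerAndQueue.
const indexQueues = buildPerTier((tier) => ({ queueName: `IndexSuperSourceNode${tier}` }))
console.log(indexQueues.Fast.queueName) // IndexSuperSourceNodeFast
```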

6. Scheduling Jobs

// When scheduling an import
const tier = getTierForSource(superSource)
await services.superQueues.import[tier].queue.schedule({
  superSource,
  superRootNodeType,
  organizationId,
  rootNodeId,
  rootNodeConnectionId,
  firstRun,
})

// Or using the helper
await services.superQueues.getImportQueue(superSource).queue.schedule({
  superSource,
  superRootNodeType,
  organizationId,
  rootNodeId,
  rootNodeConnectionId,
  firstRun,
})

Migration Path

Phase 1: Add New Queues

  1. Create the tier configuration
  2. Add the 6 new unified queues alongside existing ones
  3. Deploy and verify queues are created

Phase 2: Dual-Write

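  1. Update scheduling code to write to both old and new queues (both sets of workers will process every dual-written job, so this phase assumes the processors are idempotent)
  2. Monitor both sets of queues
  3. Verify jobs complete successfully in new queues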

Phase 3: Switch Over

  1. Stop writing to old queues
  2. Wait for old queues to drain
  3. Monitor new queues exclusively

Phase 4: Cleanup

  1. Remove old queue definitions
  2. Remove old worker code
  3. Update documentation
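Phases 2 and 3 differ only in which queues a scheduling call writes to. A minimal sketch, assuming a hypothetical per-source migration mode (for example read from a feature flag), shows how a single switch could drive both phases and let sources move over one at a time.

```typescript
type MigrationMode = 'old' | 'dual' | 'new'
type Target = 'legacy' | 'tiered'

// Hypothetical per-source rollout state; unlisted sources stay on the old queues.
const MIGRATION_MODE: Record<string, MigrationMode> = {
  Linear: 'new',
  Notion: 'dual',
}

// Which queue families a job for this source should be written to.
function scheduleTargets(source: string): Target[] {
  const mode = MIGRATION_MODE[source] ?? 'old'
  switch (mode) {
    case 'old':
      return ['legacy'] // before Phase 2
    case 'dual':
      return ['legacy', 'tiered'] // Phase 2
    case 'new':
      return ['tiered'] // Phase 3 onwards
  }
}

console.log(scheduleTargets('Linear'), scheduleTargets('Notion'), scheduleTargets('GitHub'))
```

Once every source reports `new` and the legacy queues have drained, the switch and the `legacy` branch can be deleted as part of Phase 4.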

Benefits Summary

| Metric | Before | After |
|---|---|---|
| Number of queues | 20+ | 6 |
| Number of workers | 20+ | 6 |
| Memory overhead | High | Low |
| Adding a new source | New workers | Assign to tier |
| Per-source concurrency | Yes | Yes (via tiers) |
| Per-org isolation | Yes | Yes (via groups) |
| Monitoring complexity | High | Low |

Adding a New Source

  1. Add the source to SUPER_SOURCE_TO_TIER:

    [SuperSource.NewSource]: SuperSourceTier.SLOW, // or FAST
  2. That's it. No new queues or workers needed.

Future Improvements

  1. Add a MEDIUM tier if needed for sources with moderate rate limits
  2. Dynamic tier assignment based on runtime metrics
  3. Per-source rate limiting using BullMQ Pro's setGroupRateLimit if needed
