Skip to content

Instantly share code, notes, and snippets.

@dylangrijalva
Created July 13, 2025 00:21
Show Gist options
  • Select an option

  • Save dylangrijalva/4e74691c317ce334c697e3503fca7c62 to your computer and use it in GitHub Desktop.

Select an option

Save dylangrijalva/4e74691c317ce334c697e3503fca7c62 to your computer and use it in GitHub Desktop.
BullMQ - Pub & Sub service classes
import { Job, Worker } from "bullmq";
import type Redis from "ioredis";
import { KEY_QUEUE_DEFAULT } from "../config/constants";
import { Logger } from "../config/logger";
import type { Event, EventHandler, EventSubscriber } from "./events";
/**
 * Consumes jobs from the `KEY_QUEUE_DEFAULT` BullMQ queue and dispatches each
 * job to the handler registered for the job's event topic.
 *
 * The underlying worker is created with `autorun: false`, so no jobs are
 * processed until {@link EventProcessor.start} is called.
 */
export class EventProcessor {
  /** The BullMQ worker that pulls jobs from the queue. */
  readonly worker: Worker;
  /** Registered handlers keyed by event topic (one handler per topic). */
  private readonly handlers: Map<string, EventHandler<any>>;
  private readonly logger: Logger;

  /**
   * @param deps.redis Redis connection handed to the BullMQ worker.
   */
  constructor(deps: { redis: Redis }) {
    const { redis } = deps;

    // Set up our own state first so it exists before the worker is created.
    this.handlers = new Map();
    this.logger = new Logger(this.constructor.name);

    // The processor must be wrapped in an arrow function: passing
    // `this.handleEvent` as a bare method reference detaches it from this
    // instance, so `this.handlers` / `this.logger` would not resolve to this
    // EventProcessor when the worker invokes it.
    this.worker = new Worker(KEY_QUEUE_DEFAULT, (job: Job) => this.handleEvent(job), {
      connection: redis,
      autorun: false,
    });
  }

  /**
   * Registers a subscriber as the handler for its topic.
   *
   * If a handler already exists for the topic it is replaced and a warning is
   * logged.
   *
   * @param subscriber Subscriber whose `execute` method handles events of `subscriber.topic`.
   */
  subscribe<TPayload extends {}, TEvent extends Event<TPayload>>(subscriber: EventSubscriber<TEvent>): void {
    if (this.handlers.has(subscriber.topic)) {
      this.logger.warn(`Handler for topic ${subscriber.topic} already exists. Overwriting.`);
    }

    // Bind so `execute` keeps the subscriber as `this` when called via the map.
    this.handlers.set(subscriber.topic, subscriber.execute.bind(subscriber));
    this.logger.info(`Subscribed to event topic: ${subscriber.topic}`);
  }

  /**
   * Starts the worker and awaits `worker.run()`.
   *
   * NOTE(review): the returned promise settles whenever `worker.run()` does;
   * per the BullMQ docs that may only be once the worker stops — confirm
   * callers do not expect `start()` to return promptly.
   */
  async start(): Promise<void> {
    this.logger.info("Starting event processor worker...");
    await this.worker.run();
  }

  /** Closes the worker and resolves once `worker.close()` has completed. */
  async stop(): Promise<void> {
    this.logger.info("Stopping event processor worker...");
    await this.worker.close();
  }

  /**
   * Job processor: looks up the handler for the job's topic and awaits it.
   *
   * When no handler is registered a warning is logged and the method returns
   * normally (no error is thrown). Errors thrown by the handler propagate to
   * the caller.
   *
   * @param job Job whose `data` is read as an `Event` (`topic` + `payload`); the shape is not validated here.
   */
  private async handleEvent(job: Job) {
    const { topic, payload } = job.data as Event<any>;
    const handler = this.handlers.get(topic);

    if (!handler) {
      this.logger.warn(`No handler found for event topic: ${topic}`);
      return;
    }

    await handler({ topic, payload } as any);
  }
}
import { Queue } from "bullmq";
import type Redis from "ioredis";
import { KEY_QUEUE_DEFAULT } from "../config/constants";
import { Logger } from "../config/logger";
import { type Event } from "./events";
/**
 * Publishes events by adding them as jobs to the `KEY_QUEUE_DEFAULT` BullMQ
 * queue.
 */
export class EventPublisher {
  /** Queue that published events are added to. */
  private readonly queue: Queue;
  private readonly logger: Logger;

  /**
   * @param deps.redis Redis connection handed to the BullMQ queue.
   */
  constructor(deps: { redis: Redis }) {
    const { redis } = deps;
    this.queue = new Queue(KEY_QUEUE_DEFAULT, { connection: redis });
    this.logger = new Logger(this.constructor.name);
  }

  /**
   * Adds the event to the queue as job data and logs the resulting job id.
   *
   * The job name is always `KEY_QUEUE_DEFAULT`; the topic travels inside the
   * event itself. Errors from `queue.add` propagate to the caller.
   *
   * @param event Event (`topic` + `payload`) to enqueue.
   */
  async publish<T extends {}>(event: Event<T>): Promise<void> {
    const job = await this.queue.add(KEY_QUEUE_DEFAULT, event);
    this.logger.debug(`Event published successfully (${job.id})`);
  }
}
/**
 * A message exchanged between publishers and subscribers.
 *
 * @typeParam T Shape of the data carried by the event.
 */
export type Event<T extends {}> = {
  /** Name used to match the event with a handler. */
  topic: string;
  /** Data carried by the event. */
  payload: T;
};

/**
 * Asynchronous callback that receives an event of type `T`.
 */
export type EventHandler<T extends Event<any>> = (
  event: T,
) => Promise<void>;

/**
 * Base class for event subscribers: a concrete subclass declares the topic it
 * is interested in and the logic to run for an event of type `T`.
 */
export abstract class EventSubscriber<T extends Event<any>> {
  /** Topic this subscriber handles. */
  abstract readonly topic: T["topic"];

  /**
   * Handles a single event.
   *
   * @param event The event to handle.
   */
  abstract execute(event: T): Promise<void>;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment