Created
February 3, 2025 15:24
-
-
Save jacobwgillespie/94123ce79b52cdaf79fc49d4156b22bf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| export type QueueEvent = keyof EventHandlers | |
| export type RemoveHandler = () => void | |
| export type ListenHandler<T> = (queue: Queue<T>) => void | RemoveHandler | |
| export interface EventIteratorOptions { | |
| highWaterMark: number | undefined | |
| lowWaterMark: number | undefined | |
| } | |
| export interface Queue<T> { | |
| push(value: T): void | |
| stop(): void | |
| fail(error: Error): void | |
| on<E extends QueueEvent>(event: E, fn: EventHandlers[E]): void | |
| } | |
| interface EventHandlers { | |
| highWater(): void | |
| lowWater(): void | |
| } | |
| interface AsyncResolver<T> { | |
| resolve: (res: IteratorResult<T>) => void | |
| reject: (err: Error) => void | |
| } | |
| // eslint-disable-next-line @typescript-eslint/no-empty-function | |
| const noop = () => {} | |
| class EventQueue<T> { | |
| highWaterMark: number | undefined | |
| lowWaterMark: number | undefined | |
| readonly pullQueue: Array<AsyncResolver<T>> = [] | |
| readonly pushQueue: Array<Promise<IteratorResult<T>>> = [] | |
| readonly eventHandlers: Partial<EventHandlers> = {} | |
| isPaused = false | |
| isStopped = false | |
| removeCallback?: RemoveHandler | |
| push(value: T): void { | |
| if (this.isStopped) return | |
| const resolution = {value, done: false} | |
| if (this.pullQueue.length) { | |
| const placeholder = this.pullQueue.shift() | |
| if (placeholder) placeholder.resolve(resolution) | |
| } else { | |
| this.pushQueue.push(Promise.resolve(resolution)) | |
| if (this.highWaterMark !== undefined && this.pushQueue.length >= this.highWaterMark && !this.isPaused) { | |
| this.isPaused = true | |
| if (this.eventHandlers.highWater) { | |
| this.eventHandlers.highWater() | |
| } else if (console) { | |
| console.warn(`EventIterator queue reached ${this.pushQueue.length} items`) | |
| } | |
| } | |
| } | |
| } | |
| stop(): void { | |
| if (this.isStopped) return | |
| this.isStopped = true | |
| this.remove() | |
| for (const placeholder of this.pullQueue) { | |
| placeholder.resolve({value: undefined, done: true}) | |
| } | |
| this.pullQueue.length = 0 | |
| } | |
| fail(error: Error): void { | |
| if (this.isStopped) return | |
| this.isStopped = true | |
| this.remove() | |
| if (this.pullQueue.length) { | |
| for (const placeholder of this.pullQueue) { | |
| placeholder.reject(error) | |
| } | |
| this.pullQueue.length = 0 | |
| } else { | |
| const rejection = Promise.reject(error) | |
| /* Attach error handler to avoid leaking an unhandled promise rejection. */ | |
| rejection.catch(noop) | |
| this.pushQueue.push(rejection) | |
| } | |
| } | |
| remove() { | |
| void Promise.resolve().then(() => { | |
| if (this.removeCallback) this.removeCallback() | |
| }) | |
| } | |
| [Symbol.asyncIterator](): AsyncIterator<T> { | |
| return { | |
| next: () => { | |
| const result = this.pushQueue.shift() | |
| if (result) { | |
| if (this.lowWaterMark !== undefined && this.pushQueue.length <= this.lowWaterMark && this.isPaused) { | |
| this.isPaused = false | |
| if (this.eventHandlers.lowWater) { | |
| this.eventHandlers.lowWater() | |
| } | |
| } | |
| return result | |
| } else if (this.isStopped) { | |
| return Promise.resolve({value: undefined, done: true}) | |
| } else { | |
| return new Promise((resolve, reject) => { | |
| this.pullQueue.push({resolve, reject}) | |
| }) | |
| } | |
| }, | |
| return: () => { | |
| this.isStopped = true | |
| this.pushQueue.length = 0 | |
| this.remove() | |
| return Promise.resolve({value: undefined, done: true}) | |
| }, | |
| } | |
| } | |
| } | |
| export class EventIterator<T> implements AsyncIterable<T> { | |
| private queue = new EventQueue<T>() | |
| constructor(listen: ListenHandler<T>, {highWaterMark = 100, lowWaterMark = 1}: Partial<EventIteratorOptions> = {}) { | |
| this.queue.highWaterMark = highWaterMark | |
| this.queue.lowWaterMark = lowWaterMark | |
| this.queue.removeCallback = | |
| listen({ | |
| push: (value) => this.queue.push(value), | |
| stop: () => this.queue.stop(), | |
| fail: (error) => this.queue.fail(error), | |
| on: (event, fn) => { | |
| this.queue.eventHandlers[event] = fn | |
| }, | |
| }) ?? noop | |
| } | |
| [Symbol.asyncIterator](): AsyncIterator<T> { | |
| return this.queue[Symbol.asyncIterator]() | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment