Skip to content

Instantly share code, notes, and snippets.

@jacobwgillespie
Created February 3, 2025 15:24
Show Gist options
  • Select an option

  • Save jacobwgillespie/94123ce79b52cdaf79fc49d4156b22bf to your computer and use it in GitHub Desktop.

Select an option

Save jacobwgillespie/94123ce79b52cdaf79fc49d4156b22bf to your computer and use it in GitHub Desktop.
export type QueueEvent = keyof EventHandlers
export type RemoveHandler = () => void
export type ListenHandler<T> = (queue: Queue<T>) => void | RemoveHandler
export interface EventIteratorOptions {
highWaterMark: number | undefined
lowWaterMark: number | undefined
}
export interface Queue<T> {
push(value: T): void
stop(): void
fail(error: Error): void
on<E extends QueueEvent>(event: E, fn: EventHandlers[E]): void
}
interface EventHandlers {
highWater(): void
lowWater(): void
}
interface AsyncResolver<T> {
resolve: (res: IteratorResult<T>) => void
reject: (err: Error) => void
}
// eslint-disable-next-line @typescript-eslint/no-empty-function
const noop = () => {}
class EventQueue<T> {
highWaterMark: number | undefined
lowWaterMark: number | undefined
readonly pullQueue: Array<AsyncResolver<T>> = []
readonly pushQueue: Array<Promise<IteratorResult<T>>> = []
readonly eventHandlers: Partial<EventHandlers> = {}
isPaused = false
isStopped = false
removeCallback?: RemoveHandler
push(value: T): void {
if (this.isStopped) return
const resolution = {value, done: false}
if (this.pullQueue.length) {
const placeholder = this.pullQueue.shift()
if (placeholder) placeholder.resolve(resolution)
} else {
this.pushQueue.push(Promise.resolve(resolution))
if (this.highWaterMark !== undefined && this.pushQueue.length >= this.highWaterMark && !this.isPaused) {
this.isPaused = true
if (this.eventHandlers.highWater) {
this.eventHandlers.highWater()
} else if (console) {
console.warn(`EventIterator queue reached ${this.pushQueue.length} items`)
}
}
}
}
stop(): void {
if (this.isStopped) return
this.isStopped = true
this.remove()
for (const placeholder of this.pullQueue) {
placeholder.resolve({value: undefined, done: true})
}
this.pullQueue.length = 0
}
fail(error: Error): void {
if (this.isStopped) return
this.isStopped = true
this.remove()
if (this.pullQueue.length) {
for (const placeholder of this.pullQueue) {
placeholder.reject(error)
}
this.pullQueue.length = 0
} else {
const rejection = Promise.reject(error)
/* Attach error handler to avoid leaking an unhandled promise rejection. */
rejection.catch(noop)
this.pushQueue.push(rejection)
}
}
remove() {
void Promise.resolve().then(() => {
if (this.removeCallback) this.removeCallback()
})
}
[Symbol.asyncIterator](): AsyncIterator<T> {
return {
next: () => {
const result = this.pushQueue.shift()
if (result) {
if (this.lowWaterMark !== undefined && this.pushQueue.length <= this.lowWaterMark && this.isPaused) {
this.isPaused = false
if (this.eventHandlers.lowWater) {
this.eventHandlers.lowWater()
}
}
return result
} else if (this.isStopped) {
return Promise.resolve({value: undefined, done: true})
} else {
return new Promise((resolve, reject) => {
this.pullQueue.push({resolve, reject})
})
}
},
return: () => {
this.isStopped = true
this.pushQueue.length = 0
this.remove()
return Promise.resolve({value: undefined, done: true})
},
}
}
}
export class EventIterator<T> implements AsyncIterable<T> {
private queue = new EventQueue<T>()
constructor(listen: ListenHandler<T>, {highWaterMark = 100, lowWaterMark = 1}: Partial<EventIteratorOptions> = {}) {
this.queue.highWaterMark = highWaterMark
this.queue.lowWaterMark = lowWaterMark
this.queue.removeCallback =
listen({
push: (value) => this.queue.push(value),
stop: () => this.queue.stop(),
fail: (error) => this.queue.fail(error),
on: (event, fn) => {
this.queue.eventHandlers[event] = fn
},
}) ?? noop
}
[Symbol.asyncIterator](): AsyncIterator<T> {
return this.queue[Symbol.asyncIterator]()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment