Last active
January 7, 2026 14:55
-
-
Save mimshins/e5a134e8590c023f18669b2045d31a9d to your computer and use it in GitHub Desktop.
Create streams lazily when they are read from or written to.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { | |
| PassThrough, | |
| type Readable as NodeReadable, | |
| type Writable as NodeWritable, | |
| type TransformOptions, | |
| } from "node:stream"; | |
/**
 * Options accepted by the lazy stream constructors and passed on to the
 * factories. Plain alias of Node's `TransformOptions`.
 */
export type StreamOptions = TransformOptions;

/**
 * Callback that builds the underlying readable stream.
 * It may receive the stream options and must return a Node readable.
 */
export type ReadableFactory = (options?: StreamOptions) => NodeReadable;

/**
 * Callback that builds the underlying writable stream.
 * It may receive the stream options and must return a Node writable.
 */
export type WritableFactory = (options?: StreamOptions) => NodeWritable;
/**
 * Patches a method so that a callback runs once, right before the first call.
 *
 * The method is temporarily replaced by a wrapper stored on the instance.
 * On the first invocation the wrapper removes itself, puts the instance back
 * into the state it had before patching, runs `callback`, and then forwards
 * the call to the restored method. Later calls never reach the wrapper.
 *
 * @param instance The object instance to patch
 * @param method The method name to patch
 * @param callback Invoked once with the same `this` and arguments as the
 *   first call, before the real method runs
 */
const beforeFirstCall = <T extends PassThrough, K extends keyof T>(
  instance: T,
  method: K,
  callback: (...args: unknown[]) => void,
): void => {
  type InstanceMethod = (...args: unknown[]) => unknown;

  // Remember whether the method lived on the instance itself or was only
  // inherited, so the restore step can reproduce exactly that state.
  const ownDescriptor = Object.getOwnPropertyDescriptor(instance, method);

  (instance[method] as InstanceMethod) = function (
    this: T,
    ...args: unknown[]
  ): unknown {
    if (ownDescriptor === undefined) {
      // The method was inherited: dropping the wrapper re-exposes the
      // prototype method without leaving a shadowing own property behind.
      Reflect.deleteProperty(instance, method);
    } else {
      // The method was an own property: put the original descriptor back.
      Object.defineProperty(instance, method, ownDescriptor);
    }

    // Run the hook first, with the caller's `this` and arguments.
    callback.apply(this, args);

    // Look the method up again after the hook so the call goes to whatever
    // is installed at this point, then forward the original arguments.
    return (instance[method] as InstanceMethod).apply(this, args);
  };
};
/**
 * Lazy readable stream: a `PassThrough` whose source stream is created by
 * the factory only when `_read` is invoked for the first time, and is then
 * piped into this stream.
 */
export class Readable extends PassThrough {
  /**
   * Creates a new lazy readable stream
   *
   * @param factory Function that creates the source readable stream; it is
   *   called with this stream as `this` and the given options
   * @param options Stream options, passed to both `PassThrough` and the factory
   */
  constructor(factory: ReadableFactory, options?: StreamOptions) {
    super(options);

    beforeFirstCall(this, "_read", () => {
      // Only create the source stream when the first read operation occurs
      const source = factory.call(this, options);

      // pipe() does not forward errors. Destroy this stream with the source
      // error so consumers get both "error" and "close" instead of being
      // left with a stream that never ends.
      source.on("error", (error: Error) => this.destroy(error));

      // pipe() does not propagate destruction either. If this stream closes
      // before its data ended, release the source so it is not leaked.
      this.once("close", () => {
        if (!this.readableEnded && !source.destroyed) {
          source.destroy();
        }
      });

      source.pipe(this);
    });

    // Kept from the original implementation; no listener has been attached
    // by this constructor at this point.
    this.emit("readable");
  }
}
/**
 * Lazy writable stream: a `PassThrough` whose destination stream is created
 * by the factory only when `_write` is invoked for the first time, after
 * which this stream is piped into it.
 */
export class Writable extends PassThrough {
  /**
   * Creates a new lazy writable stream
   *
   * @param factory Function that creates the destination writable stream; it
   *   is called with this stream as `this` and the given options
   * @param options Stream options, passed to both `PassThrough` and the factory
   */
  constructor(factory: WritableFactory, options?: StreamOptions) {
    super(options);

    beforeFirstCall(this, "_write", () => {
      // Only create the destination stream when the first write operation occurs
      const destination = factory.call(this, options);

      // pipe() does not forward errors. Destroy this stream with the
      // destination error so writers are notified and further writes fail
      // instead of buffering into a stream nobody drains.
      destination.on("error", (error: Error) => this.destroy(error));

      // If this stream closes before its readable side ended, the
      // destination never receives end(); release it explicitly. After a
      // normal end() the destination is left alone so it can flush.
      this.once("close", () => {
        if (!this.readableEnded && !destination.destroyed) {
          destination.destroy();
        }
      });

      this.pipe(destination);
    });

    // Kept from the original implementation; no listener has been attached
    // by this constructor at this point.
    this.emit("writable");
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment