Skip to content

Instantly share code, notes, and snippets.

@mimshins
Last active January 7, 2026 14:55
Show Gist options
  • Select an option

  • Save mimshins/e5a134e8590c023f18669b2045d31a9d to your computer and use it in GitHub Desktop.

Select an option

Save mimshins/e5a134e8590c023f18669b2045d31a9d to your computer and use it in GitHub Desktop.
Create streams lazily when they are read from or written to.
import {
PassThrough,
type Readable as NodeReadable,
type Writable as NodeWritable,
type TransformOptions,
} from "node:stream";
/**
* Factory function type for creating readable streams
*/
export type ReadableFactory = (options?: TransformOptions) => NodeReadable;
/**
* Factory function type for creating writable streams
*/
export type WritableFactory = (options?: TransformOptions) => NodeWritable;
/**
* Stream options type alias for convenience
*/
export type StreamOptions = TransformOptions;
/**
* Patches a method to execute a callback before the first call
*
* @param instance The object instance to patch
* @param method The method name to patch
* @param callback The callback to execute before first call
*/
const beforeFirstCall = <T extends PassThrough, K extends keyof T>(
instance: T,
method: K,
callback: (...args: unknown[]) => void,
): void => {
type InstanceMethod = (...args: unknown[]) => unknown;
// Store reference to the original method
const original = instance[method] as InstanceMethod;
// Replace method with interceptor wrapper
(instance[method] as InstanceMethod) = function (
this: T,
...args: unknown[]
): unknown {
// Remove the wrapper (self-destruct pattern)
delete instance[method];
// Restore the original method
instance[method] = original as T[K];
// Execute the "before first call" callback
callback.apply(this, args);
// Call the original method with the same arguments
return (instance[method] as InstanceMethod).apply(this, args);
};
};
/**
* Lazy readable stream that creates the source stream only when first read
*/
export class Readable extends PassThrough {
/**
* Creates a new lazy readable stream
*
* @param factory Function that creates the source readable stream
* @param options Stream options
*/
constructor(factory: ReadableFactory, options?: StreamOptions) {
super(options);
beforeFirstCall(this, "_read", () => {
// Only create the source stream when first read operation occurs
const source = factory.call(this, options);
source.on("error", (error: Error) => this.emit("error", error));
source.pipe(this);
});
this.emit("readable");
}
}
/**
* Lazy writable stream that creates the destination stream only when first written
*/
export class Writable extends PassThrough {
/**
* Creates a new lazy writable stream
*
* @param factory Function that creates the destination writable stream
* @param options Stream options
*/
constructor(factory: WritableFactory, options?: StreamOptions) {
super(options);
beforeFirstCall(this, "_write", () => {
// Only create the destination stream when first write operation occurs
const destination = factory.call(this, options);
destination.on("error", (error: Error) => this.emit("error", error));
this.pipe(destination);
});
this.emit("writable");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment