Skip to content

Instantly share code, notes, and snippets.

@gvergnaud
Created November 8, 2025 17:57
Show Gist options
  • Select an option

  • Save gvergnaud/70fa279f1f06c66364c9a25a4eaabee2 to your computer and use it in GitHub Desktop.

Select an option

Save gvergnaud/70fa279f1f06c66364c9a25a4eaabee2 to your computer and use it in GitHub Desktop.
/**
* TIL that you can use `try {} finally { cancelation() }`
* inside a generator function and call `.return()` on the generator
* to run the cancelation code of every nested generator that is currently suspended!
*
* This means we can implement an Observable class with cancelation support
* using nothing but generators:
*/
/**
 * A lazy, pull-based stream of values backed by an async generator factory.
 *
 * Nothing runs until a consumer iterates. Every subscription (a call to
 * `iter()`, directly or through an operator) invokes the factory again and
 * therefore gets its own generator instance.
 *
 * Operators read their source with `for await`, which calls `.return()` on the
 * source generator when the downstream consumer stops early. That is what lets
 * `try { ... } finally { ... }` blocks in upstream generators act as
 * cancellation handlers.
 */
class Observable<T> {
  /** Factory producing a fresh generator for each subscription. */
  private readonly onSubscribe: () => AsyncGenerator<T, void, void>;

  /**
   * @param onSubscribe Factory called once per subscription to create the
   *   underlying async generator.
   */
  constructor(onSubscribe: () => AsyncGenerator<T, void, void>) {
    this.onSubscribe = onSubscribe;
  }

  /**
   * Starts a new subscription.
   *
   * @returns A fresh async generator; call `.return()` on it (or `break` out
   *   of a `for await` over it) to cancel.
   */
  iter(): AsyncGenerator<T, void, void> {
    return this.onSubscribe();
  }

  /**
   * Transforms every emitted value with `mapper`.
   *
   * @param mapper Function applied to each source value.
   * @returns An observable of the mapped values.
   */
  map<U>(mapper: (value: T) => U): Observable<U> {
    // Generator functions cannot be arrows, so capture the subscription
    // through an arrow instead of aliasing `this`.
    const subscribe = () => this.iter();
    return new Observable<U>(async function* () {
      for await (const value of subscribe()) {
        yield mapper(value);
      }
    });
  }

  /**
   * Keeps only the values for which `predicate` returns true.
   *
   * When `predicate` is a type guard the resulting observable is narrowed to
   * the guarded type; a plain boolean predicate keeps the element type `T`.
   *
   * @param predicate Test applied to each source value.
   * @returns An observable of the values that passed the test.
   */
  filter<U extends T>(predicate: (value: T) => value is U): Observable<U>;
  filter(predicate: (value: T) => boolean): Observable<T>;
  filter(predicate: (value: T) => boolean): Observable<T> {
    const subscribe = () => this.iter();
    return new Observable<T>(async function* () {
      for await (const value of subscribe()) {
        if (predicate(value)) yield value;
      }
    });
  }

  /**
   * Emits the running accumulation of the source values.
   *
   * `init` seeds the accumulator but is not emitted itself; the first emitted
   * value is `scanner(init, firstValue)`. The accumulator is re-seeded for
   * every subscription.
   *
   * @param scanner Reducer combining the accumulator with the next value.
   * @param init Initial accumulator value.
   * @returns An observable of successive accumulator states.
   */
  scan<U>(scanner: (acc: U, value: T) => U, init: U): Observable<U> {
    const subscribe = () => this.iter();
    return new Observable<U>(async function* () {
      let acc = init;
      for await (const value of subscribe()) {
        acc = scanner(acc, value);
        yield acc;
      }
    });
  }
}
/**
 * Returns a promise that resolves (with no value) once `ms` milliseconds have
 * elapsed on a `setTimeout` timer.
 */
const sleep = (ms: number) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
/**
 * Builds an observable that waits `ms` milliseconds, emits a counter starting
 * at 0, and repeats forever, incrementing the counter by one each time.
 *
 * The `finally` block logs "cancel" when the generator is terminated, e.g. by
 * a `.return()` call from the consumer.
 */
const interval = (ms: number) =>
  new Observable(async function* () {
    let tick = 0;
    try {
      while (true) {
        await sleep(ms);
        yield tick;
        tick += 1;
      }
    } finally {
      console.log("cancel");
    }
  });
/**
 * Wraps `interval(ms)` in a second generator that delegates to it with
 * `yield*`.
 *
 * Terminating this generator also terminates the delegated one, so "cancel"
 * (from `interval`) is logged before this block's own "cancel 2".
 */
const interval2 = (ms: number) =>
  new Observable(async function* () {
    try {
      const inner = interval(ms).iter();
      yield* inner;
    } finally {
      console.log("cancel 2");
    }
  });
/**
 * Fibonacci sequence driven by `interval2(1000)`: one number per tick,
 * emitting 1n, 2n, 3n, 5n, 8n, ...
 *
 * The tick value itself is ignored. `scan` carries the pair
 * `[previous, current]` and `map` projects out `current`.
 */
const fib = interval2(1000)
  // Pin the accumulator to a 2-tuple: without the explicit type argument the
  // seed is inferred as `bigint[]`, so the pair shape goes unchecked.
  .scan<[bigint, bigint]>(([a, b]) => [b, a + b], [0n, 1n])
  .map(([, current]) => current);
// Subscribe to `fib` and print values until one exceeds 100, then cancel the
// whole generator chain explicitly.
const generator = fib.iter();
for await (const value of generator) {
  console.log(value);
  if (value > 100n) {
    // Await the cancellation so the upstream `finally` blocks have finished
    // (and any error they throw surfaces here) before leaving the loop.
    await generator.return(); // logs: "cancel" and "cancel 2"
    break;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment