Created
November 8, 2025 17:57
-
-
Save gvergnaud/70fa279f1f06c66364c9a25a4eaabee2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * TIL that you can use try {} finally { cancelation() } | |
| * inside a generator function and call `.return()` on the generator | |
| * to run the cancelation of all generators currently running in the call stack! | |
| * | |
| * It means we can implement an Observable class with cancelation support | |
| * all with only generators: | |
| */ | |
| class Observable<T> { | |
| private onSubscribe: () => AsyncGenerator<T, void, void>; | |
| constructor(onSubscribe: () => AsyncGenerator<T, void, void>) { | |
| this.onSubscribe = onSubscribe; | |
| } | |
| iter(): AsyncGenerator<T, void, void> { | |
| return this.onSubscribe(); | |
| } | |
| map<U>(mapper: (value: T) => U): Observable<U> { | |
| const self = this; | |
| return new Observable(async function* () { | |
| for await (const x of self.onSubscribe()) { | |
| yield mapper(x); | |
| } | |
| }); | |
| } | |
| filter<U extends T>(predicate: (value: T) => value is U): Observable<U> { | |
| const self = this; | |
| return new Observable(async function* () { | |
| for await (const x of self.onSubscribe()) { | |
| if (predicate(x)) yield x; | |
| } | |
| }); | |
| } | |
| scan<U>(scanner: (acc: U, value: T) => U, init: U): Observable<U> { | |
| const self = this; | |
| return new Observable(async function* () { | |
| let acc: U = init; | |
| for await (const x of self.onSubscribe()) { | |
| acc = scanner(acc, x); | |
| yield acc; | |
| } | |
| }); | |
| } | |
| } | |
| const sleep = (ms: number) => new Promise((res) => setTimeout(res, ms)); | |
| const interval = (ms: number) => | |
| new Observable(async function* () { | |
| try { | |
| let i = 0; | |
| while (true) { | |
| await sleep(ms); | |
| yield i++; | |
| } | |
| } finally { | |
| console.log("cancel"); | |
| } | |
| }); | |
| const interval2 = (ms: number) => | |
| new Observable(async function* () { | |
| try { | |
| yield* interval(ms).iter(); | |
| } finally { | |
| console.log("cancel 2"); | |
| } | |
| }); | |
| const fib = interval2(1000) | |
| .scan(([a, b]) => [b, a + b], [0n, 1n]) | |
| .map(([, x]) => x); | |
| const generator = fib.iter(); | |
| for await (const value of generator) { | |
| console.log(value); | |
| if (value > 100n) { | |
| generator.return(); // logs: "cancel" and "cancel 2" | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment