Created
February 16, 2026 22:21
-
-
Save macguru/c01f1b1327e7ac8a0e257cab9fdf1d7f to your computer and use it in GitHub Desktop.
AsyncStream based observation sample
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Foundation | |
| import Synchronization | |
/// A counter whose increments can be awaited by any number of concurrent observers.
///
/// Each call to `nextValue()` registers a one-shot `AsyncStream` continuation;
/// `update()` increments the counter and yields the new value to every
/// continuation registered at that moment.
///
/// `nextValue()` and `update()` are called from different tasks, so all mutable
/// state lives behind a `Mutex`. That also lets the class be `Sendable`.
final class Thing: Sendable {
    /// Everything that must be read and written atomically.
    private struct State {
        var value = 0
        var observers: [UUID: AsyncStream<Int>.Continuation] = [:]
    }

    private let state = Mutex(State())

    /// The current counter value.
    var value: Int {
        get { state.withLock { $0.value } }
        set { state.withLock { $0.value = newValue } }
    }

    /// The continuations currently waiting for the next update, keyed by a
    /// per-call identifier. Reading returns a snapshot taken under the lock.
    var observers: [UUID: AsyncStream<Int>.Continuation] {
        get { state.withLock { $0.observers } }
        set { state.withLock { $0.observers = newValue } }
    }

    /// Suspends until the next call to `update()` and returns the value it produced.
    ///
    /// - Returns: The first value yielded after this call registered its observer.
    /// - Throws: `CancellationError` if the stream ends without delivering a
    ///   value, e.g. when the surrounding task is cancelled while waiting.
    func nextValue() async throws -> Int {
        let id = UUID()
        // The build closure runs synchronously, so the observer is registered
        // before this function suspends for the first time.
        let stream = AsyncStream<Int> { continuation in
            state.withLock { $0.observers[id] = continuation }
        }
        // Unregister on every exit path: value received or stream ended.
        defer {
            state.withLock { $0.observers[id] = nil }
        }

        for await next in stream {
            return next
        }
        // The loop finished without an element, so no value will ever arrive.
        throw CancellationError()
    }

    /// Increments the counter and delivers the new value to all registered observers.
    func update() {
        // Increment and yield under one lock so that concurrent updates reach
        // each observer in the same order in which they were applied.
        state.withLock { guarded in
            guarded.value += 1
            for observer in guarded.observers.values {
                observer.yield(guarded.value)
            }
        }
    }
}
// Demo driver: two tasks wait for the next value of a shared `Thing`, a third
// task produces that value after two seconds, and the second waiter is
// cancelled before the update arrives.
let t = Thing()

// Producer: triggers a single update after a delay.
let updater = Task {
    try await Task.sleep(for: .seconds(2))
    print("will update")
    t.update()
    print("did update")
}

// First observer: left running until the update arrives.
let waiter1 = Task {
    try await withTaskCancellationHandler(
        operation: {
            let received = try await t.nextValue()
            print("waiter1 ended: \"\(received)\"")
        },
        onCancel: {
            print("waiter1 cancelled")
        }
    )
}

// Second observer: identical to the first, but cancelled further down.
let waiter2 = Task {
    try await withTaskCancellationHandler(
        operation: {
            let received = try await t.nextValue()
            print("waiter2 ended: \"\(received)\"")
        },
        onCancel: {
            print("waiter2 cancelled")
        }
    )
}

// Give both waiters a moment to register before inspecting the observer table.
try await Task.sleep(for: .milliseconds(10))
let pendingObservers = t.observers.count
print("waiting:", pendingObservers)

waiter2.cancel()
print("waiting for update…")

// Await the tasks in order and stop at the first one that throws; the error
// itself is deliberately ignored (waiter2 was cancelled above).
do {
    try await updater.value
    try await waiter1.value
    try await waiter2.value
} catch {
    // Intentionally ignored: this demo only cares that the tasks have settled.
}
print("done")
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Part of my blog post on Certain and Uncertain Futures, demonstrating an
AsyncStream-based observation principle that supports multiple observers. Running it will print: