Skip to content

Instantly share code, notes, and snippets.

@esnssr
Created February 24, 2026 13:51
Show Gist options
  • Select an option

  • Save esnssr/da8b459f1c6a25766a8499b9821953a5 to your computer and use it in GitHub Desktop.

Select an option

Save esnssr/da8b459f1c6a25766a8499b9821953a5 to your computer and use it in GitHub Desktop.
`AsyncStream` implementation from Combine Publisher
import Foundation
import Combine
extension AnyCancellable: @retroactive @unchecked Sendable {}
extension CurrentValueSubject: @retroactive @unchecked Sendable {}
public extension Publisher where Failure == Never, Output: Sendable {
var stream: AsyncStream<Output> {
AsyncStream { continuation in
let cancellable = self.sink { completion in
continuation.finish()
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
}
}
public extension Publisher where Failure: Error, Output: Sendable {
var stream: AsyncThrowingStream<Output, Error> {
AsyncThrowingStream<Output, Error> { continuation in
let cancellable = self.sink { completion in
switch completion {
case .finished:
continuation.finish()
case .failure(let error):
continuation.finish(throwing: error)
}
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
}
}

Combine has a .values property that turns publishers into AsyncSequence. unfortunately, it has a race condition bug that can cause values to be dropped.

the following implementation uses async streams as a replacement.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment