Last active
September 12, 2025 16:25
-
-
Save jmatsu/ba4bb5cc269a89710e7031c18313a9d1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import app.cash.turbine.test | |
| import kotlinx.coroutines.CoroutineScope | |
| import kotlinx.coroutines.CoroutineStart | |
| import kotlinx.coroutines.Dispatchers | |
| import kotlinx.coroutines.ExperimentalCoroutinesApi | |
| import kotlinx.coroutines.Job | |
| import kotlinx.coroutines.async | |
| import kotlinx.coroutines.channels.Channel | |
| import kotlinx.coroutines.flow.AbstractFlow | |
| import kotlinx.coroutines.flow.Flow | |
| import kotlinx.coroutines.flow.FlowCollector | |
| import kotlinx.coroutines.flow.MutableSharedFlow | |
| import kotlinx.coroutines.flow.MutableStateFlow | |
| import kotlinx.coroutines.flow.collect | |
| import kotlinx.coroutines.flow.filter | |
| import kotlinx.coroutines.flow.filterNotNull | |
| import kotlinx.coroutines.flow.first | |
| import kotlinx.coroutines.flow.flowOf | |
| import kotlinx.coroutines.flow.map | |
| import kotlinx.coroutines.flow.onCompletion | |
| import kotlinx.coroutines.flow.receiveAsFlow | |
| import kotlinx.coroutines.flow.stateIn | |
| import kotlinx.coroutines.flow.takeWhile | |
| import kotlinx.coroutines.launch | |
| import kotlinx.coroutines.test.TestScope | |
| import kotlinx.coroutines.test.runTest | |
| import kotlinx.coroutines.withTimeoutOrNull | |
| import org.junit.jupiter.api.Assertions.assertEquals | |
| import org.junit.jupiter.api.Assertions.fail | |
| import org.junit.jupiter.api.Test | |
| import kotlin.coroutines.cancellation.CancellationException | |
| import kotlin.time.Duration.Companion.milliseconds | |
| import kotlin.time.Duration.Companion.seconds | |
| class SampleTest { | |
| @Test | |
| fun `stateIn on a completable flow can pass this test`(): Unit = runTest { | |
| val upstream = flowOf(0) | |
| upstream.stateIn(this) | |
| .takeWhile { false } | |
| .collect() | |
| } | |
| @Test | |
| fun `stateIn on a non-completable flow with TestScope causes timeout`() { | |
| val testScope = TestScope() | |
| try { | |
| testScope.runTest(timeout = 1.seconds) { | |
| val upstream = MutableStateFlow(0) | |
| upstream.stateIn(this) | |
| .takeWhile { false } | |
| .collect() | |
| } | |
| fail("test should fail due to timeout") | |
| } catch (e: AssertionError) { | |
| if (e.javaClass.simpleName != "UncompletedCoroutinesError") throw e | |
| // Pass only if the test TestScope raises a timeout error | |
| } | |
| } | |
| @Test | |
| fun `stateIn on a non-completable flow with backgroundScope pass this test`(): Unit = | |
| runTest { | |
| val upstream = MutableStateFlow(0) | |
| upstream.stateIn(backgroundScope) | |
| .takeWhile { false } | |
| .collect() | |
| } | |
| @Test | |
| fun `test a completable flow by turbine`() = runTest { | |
| flowOf("one", "two").test { | |
| assertEquals("one", awaitItem()) | |
| assertEquals("two", awaitItem()) | |
| awaitComplete() | |
| } | |
| } | |
| @Test | |
| fun `test a completable flow by MutableSharedFlow`() = runTest { | |
| val buffer = MutableSharedFlow<String>() | |
| val completionException = CancellationException("expected") | |
| val assertionJob = async(start = CoroutineStart.UNDISPATCHED) { | |
| try { | |
| assertEquals("one", buffer.first()) | |
| assertEquals("two", buffer.first()) | |
| buffer.first() | |
| fail("Never receive three elements") | |
| } catch (e: CancellationException) { | |
| throw CancellationException("Unexpected cancellation", e) | |
| } | |
| } | |
| flowOf("one", "two") | |
| .onCompletion { cause -> | |
| val e = cause?.let { CancellationException("upstream failed", it) } | |
| assertionJob.cancel(e ?: completionException) | |
| } | |
| .collect { | |
| buffer.emit(it) | |
| } | |
| try { | |
| assertionJob.await() | |
| fail("assertionJob must be canceled") | |
| } catch (e: CancellationException) { | |
| assertEquals(completionException.message, e.message) | |
| } | |
| } | |
| @Test | |
| fun `a test template for a custom operator`() = runTest { | |
| val expected = "one" | |
| val channel = Channel<Either<Unit, String>>(Channel.UNLIMITED) | |
| channel.receiveAsFlow() | |
| .takeWhile { it.isRight } | |
| .map { it.right() } | |
| .customOperator(scope = backgroundScope) | |
| .test { | |
| channel.send(Either.Right("one")) | |
| assertEquals(expected, awaitItem()) | |
| channel.send(Either.Left(Unit)) | |
| awaitComplete() | |
| } | |
| } | |
| @Test | |
| fun `Implementation difference led to a wrong test`() = runTest { | |
| assertEquals( | |
| true, | |
| "Didn't receive" in stringifyImplementationDifference(stringRepository = ProdStringRepository()) | |
| ) | |
| assertEquals( | |
| true, | |
| "Received" in stringifyImplementationDifference(stringRepository = TestStringRepository()) | |
| ) | |
| } | |
| private suspend fun TestScope.stringifyImplementationDifference( | |
| stringRepository: StringRepository | |
| ): String { | |
| val stringChannel = Channel<String>(Channel.UNLIMITED) | |
| // Use LAZY to emulate a race condition anyway | |
| val deferred = async(context = Dispatchers.IO, start = CoroutineStart.LAZY) { | |
| withTimeoutOrNull(500.milliseconds) { | |
| stringRepository.strings | |
| .collect { | |
| stringChannel.send(it) | |
| } | |
| } | |
| } | |
| stringRepository.update("one") | |
| deferred.await() | |
| val result = stringChannel.tryReceive().also { require(!it.isClosed) } | |
| if (result.isSuccess) { | |
| assertEquals("one", result.getOrThrow()) | |
| return "Received a value was sent while there is no collector" | |
| } else { | |
| return "Didn't receive a value was sent while there is no collector" | |
| } | |
| } | |
| private interface StringRepository { | |
| val strings: Flow<String> | |
| fun update(value: String) | |
| } | |
| private class ProdStringRepository : StringRepository { | |
| private val _strings = MutableSharedFlow<String>() | |
| override val strings: Flow<String> = _strings.filter { true } | |
| override fun update(value: String) { | |
| _strings.tryEmit(value) | |
| } | |
| } | |
| private class TestStringRepository : StringRepository { | |
| private val _strings = MutableStateFlow<String?>(null) | |
| override val strings: Flow<String> = _strings.filterNotNull() | |
| override fun update(value: String) { | |
| _strings.tryEmit(value) | |
| } | |
| } | |
| private fun <T> Flow<T>.customOperator(scope: CoroutineScope): Flow<T> { | |
| val job = scope.launch { | |
| collect { | |
| println(it) | |
| } | |
| } | |
| return CustomFlow(upstream = this, job = job) | |
| } | |
| @OptIn(ExperimentalCoroutinesApi::class) | |
| private class CustomFlow<T>( | |
| private val upstream: Flow<T>, | |
| @Suppress("unused") private val job: Job | |
| ) : AbstractFlow<T>() { | |
| override suspend fun collectSafely(collector: FlowCollector<T>) { | |
| upstream.collect(collector) | |
| } | |
| } | |
| private sealed class Either<L, R> { | |
| abstract val isLeft: Boolean | |
| abstract val isRight: Boolean | |
| fun left(): L = (this as Left<L, R>).value | |
| fun right(): R = (this as Right<L, R>).value | |
| data class Left<L, R>(val value: L) : Either<L, R>() { | |
| override val isLeft = true | |
| override val isRight = false | |
| } | |
| data class Right<L, R>(val value: R) : Either<L, R>() { | |
| override val isLeft = false | |
| override val isRight = true | |
| } | |
| } | |
| } |
Comments are disabled for this gist.