Skip to content

Instantly share code, notes, and snippets.

@jmatsu
Last active September 12, 2025 16:25
Show Gist options
  • Select an option

  • Save jmatsu/ba4bb5cc269a89710e7031c18313a9d1 to your computer and use it in GitHub Desktop.

Select an option

Save jmatsu/ba4bb5cc269a89710e7031c18313a9d1 to your computer and use it in GitHub Desktop.
import app.cash.turbine.test
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeoutOrNull
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.Test
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
class SampleTest {
/**
 * Shares a single-element `flowOf(0)` into the test's own scope with `stateIn`
 * and then collects the shared flow through `takeWhile { false }`.
 *
 * As the test name says, this combination is expected to let `runTest` finish.
 */
@Test
fun `stateIn on a completable flow can pass this test`(): Unit = runTest {
    val shared = flowOf(0).stateIn(this)
    shared.takeWhile { false }.collect()
}
/**
 * Shares a `MutableStateFlow` into the `TestScope` itself and expects `runTest`
 * (limited to one second) to end with an `UncompletedCoroutinesError`.
 *
 * The error is recognised by its simple class name. Any other `AssertionError`
 * is rethrown, and a run that does not fail at all is reported through `fail`.
 */
@Test
fun `stateIn on a non-completable flow with TestScope causes timeout`() {
    val scope = TestScope()
    val failure: AssertionError? = try {
        scope.runTest(timeout = 1.seconds) {
            val shared = MutableStateFlow(0).stateIn(this)
            shared.takeWhile { false }.collect()
        }
        null
    } catch (e: AssertionError) {
        e
    }

    // Pass only if the TestScope raised its timeout error.
    if (failure == null) {
        fail("test should fail due to timeout")
    } else if (failure.javaClass.simpleName != "UncompletedCoroutinesError") {
        throw failure
    }
}
/**
 * Shares a `MutableStateFlow` into `backgroundScope` instead of the test scope
 * and collects it through `takeWhile { false }`.
 *
 * As the test name says, this variant is expected to complete normally.
 */
@Test
fun `stateIn on a non-completable flow with backgroundScope pass this test`(): Unit = runTest {
    val shared = MutableStateFlow(0).stateIn(backgroundScope)
    shared.takeWhile { false }.collect()
}
/**
 * Verifies a finite flow with Turbine: each expected item is awaited in order,
 * followed by the completion event.
 */
@Test
fun `test a completable flow by turbine`() = runTest {
    val expectedItems = listOf("one", "two")
    flowOf("one", "two").test {
        for (expected in expectedItems) {
            assertEquals(expected, awaitItem())
        }
        awaitComplete()
    }
}
/**
 * Verifies a finite flow without Turbine by relaying its items through a
 * `MutableSharedFlow` to an assertion coroutine.
 *
 * The assertion coroutine reads two items and then waits for a third that must
 * never arrive. Upstream completion cancels that coroutine with a known
 * `CancellationException`, and the test passes only when `await()` surfaces a
 * cancellation carrying the same message.
 */
@Test
fun `test a completable flow by MutableSharedFlow`() = runTest {
    val relay = MutableSharedFlow<String>()
    val expectedCancellation = CancellationException("expected")

    // Started UNDISPATCHED so the body runs up to its first `first()` call
    // before the upstream below begins emitting into the relay.
    val assertionJob = async(start = CoroutineStart.UNDISPATCHED) {
        try {
            assertEquals("one", relay.first())
            assertEquals("two", relay.first())
            relay.first()
            fail("Never receive three elements")
        } catch (cancellation: CancellationException) {
            throw CancellationException("Unexpected cancellation", cancellation)
        }
    }

    flowOf("one", "two")
        .onCompletion { cause ->
            // Normal completion cancels with the expected marker; an upstream
            // failure is wrapped so that the message check below fails.
            val reason = if (cause == null) {
                expectedCancellation
            } else {
                CancellationException("upstream failed", cause)
            }
            assertionJob.cancel(reason)
        }
        .collect { item -> relay.emit(item) }

    try {
        assertionJob.await()
        fail("assertionJob must be canceled")
    } catch (cancellation: CancellationException) {
        assertEquals(expectedCancellation.message, cancellation.message)
    }
}
/**
 * Template for testing a flow operator that needs a scope.
 *
 * The upstream is driven by an unlimited channel of `Either` values: a `Right`
 * carries an item, and the first `Left` ends the flow via `takeWhile`. The
 * operator under test receives `backgroundScope`.
 */
@Test
fun `a test template for a custom operator`() = runTest {
    val expected = "one"
    val source = Channel<Either<Unit, String>>(Channel.UNLIMITED)
    val underTest = source.receiveAsFlow()
        .takeWhile { signal -> signal.isRight }
        .map { signal -> signal.right() }
        .customOperator(scope = backgroundScope)

    underTest.test {
        source.send(Either.Right("one"))
        assertEquals(expected, awaitItem())
        // A Left value makes takeWhile stop, which completes the flow.
        source.send(Either.Left(Unit))
        awaitComplete()
    }
}
/**
 * Runs the same scenario against both repository implementations and checks
 * that they report different outcomes: the prod one reports "Didn't receive",
 * the test one reports "Received".
 */
@Test
fun `Implementation difference led to a wrong test`() = runTest {
    val prodOutcome = stringifyImplementationDifference(stringRepository = ProdStringRepository())
    assertEquals(true, "Didn't receive" in prodOutcome)

    val testOutcome = stringifyImplementationDifference(stringRepository = TestStringRepository())
    assertEquals(true, "Received" in testOutcome)
}
/**
 * Updates [stringRepository] with "one" before any collector is running, then
 * collects `strings` for up to 500 ms and reports whether the value was seen.
 *
 * @param stringRepository the repository implementation to probe.
 * @return a sentence starting with "Received" when the value reached the late
 *   collector, or with "Didn't receive" when nothing arrived.
 */
private suspend fun TestScope.stringifyImplementationDifference(
    stringRepository: StringRepository
): String {
    val sink = Channel<String>(Channel.UNLIMITED)
    // LAZY keeps the collector from starting until await(), so the update
    // below always happens while there is no collector.
    val collection = async(context = Dispatchers.IO, start = CoroutineStart.LAZY) {
        withTimeoutOrNull(500.milliseconds) {
            stringRepository.strings.collect { value -> sink.send(value) }
        }
    }
    stringRepository.update("one")
    collection.await()

    val outcome = sink.tryReceive()
    require(!outcome.isClosed)
    return if (outcome.isSuccess) {
        assertEquals("one", outcome.getOrThrow())
        "Received a value was sent while there is no collector"
    } else {
        "Didn't receive a value was sent while there is no collector"
    }
}
/**
 * Source of string values that can be observed as a flow and updated by callers.
 *
 * Whether a value passed to [update] before collection starts is later delivered
 * through [strings] is left to the implementation.
 */
private interface StringRepository {
/** Flow of the strings published through [update]. */
val strings: Flow<String>
/** Publishes [value] to [strings]; non-suspending. */
fun update(value: String)
}
/**
 * [StringRepository] backed by a `MutableSharedFlow` without replay.
 *
 * A value published while nobody is collecting is lost, so late collectors do
 * not see it. Values published while collectors exist are delivered.
 */
private class ProdStringRepository : StringRepository {
    // A default MutableSharedFlow() has no buffer, so tryEmit() returns false and
    // drops the value whenever at least one subscriber exists. One extra buffer
    // slot with DROP_OLDEST lets tryEmit() always succeed. Replay stays at 0, so a
    // value emitted with no subscribers is still discarded.
    private val _strings = MutableSharedFlow<String>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // filter { true } exposes the shared flow as a plain Flow.
    override val strings: Flow<String> = _strings.filter { true }

    /** Publishes [value] to current collectors without suspending. */
    override fun update(value: String) {
        _strings.tryEmit(value)
    }
}
/**
 * [StringRepository] backed by a nullable `MutableStateFlow` that starts as
 * `null`; [strings] exposes only the non-null values.
 */
private class TestStringRepository : StringRepository {
    private val state = MutableStateFlow<String?>(null)

    override val strings: Flow<String> = state.filterNotNull()

    /** Stores [value] as the current state. */
    override fun update(value: String) {
        state.value = value
    }
}
/**
 * Sample operator that needs a scope: it launches a coroutine in [scope] that
 * collects the receiver flow and prints every element, then returns a
 * [CustomFlow] wrapping the same receiver together with that job.
 *
 * @param scope scope in which the printing collector is launched.
 * @return a flow that forwards collection to the receiver.
 */
private fun <T> Flow<T>.customOperator(scope: CoroutineScope): Flow<T> {
    val source = this
    val printer: Job = scope.launch {
        source.collect { element -> println(element) }
    }
    return CustomFlow(upstream = source, job = printer)
}
/**
 * Flow that forwards every collection to [upstream].
 *
 * The [job] is only stored; nothing in this class reads it.
 */
@OptIn(ExperimentalCoroutinesApi::class)
private class CustomFlow<T>(
    private val upstream: Flow<T>,
    @Suppress("unused") private val job: Job
) : AbstractFlow<T>() {
    /** Passes [collector] straight to the upstream flow. */
    override suspend fun collectSafely(collector: FlowCollector<T>) = upstream.collect(collector)
}
/**
 * Two-case container holding either a [Left] value of type [L] or a [Right]
 * value of type [R].
 */
private sealed class Either<L, R> {
    /** `true` only for [Left] instances. */
    val isLeft: Boolean
        get() = this is Left<*, *>

    /** `true` only for [Right] instances. */
    val isRight: Boolean
        get() = this is Right<*, *>

    /** Returns the left value; throws [ClassCastException] when this is a [Right]. */
    fun left(): L {
        val self = this as Left<L, R>
        return self.value
    }

    /** Returns the right value; throws [ClassCastException] when this is a [Left]. */
    fun right(): R {
        val self = this as Right<L, R>
        return self.value
    }

    data class Left<L, R>(val value: L) : Either<L, R>()

    data class Right<L, R>(val value: R) : Either<L, R>()
}
}

Comments are disabled for this gist.