Skip to content

Instantly share code, notes, and snippets.

@FeernandoOFF
Created August 19, 2025 10:39
Show Gist options
  • Select an option

  • Save FeernandoOFF/9880b5d4dc6cece1ab303cbfca182329 to your computer and use it in GitHub Desktop.

Select an option

Save FeernandoOFF/9880b5d4dc6cece1ab303cbfca182329 to your computer and use it in GitHub Desktop.
Experimenting with different Kotlin flows and its use cases
Display the source blob
Display the rendered blob
Raw
{
"cells" : [ {
"metadata" : {
"ExecuteTime" : {
"end_time" : "2025-08-19T09:50:13.080599Z",
"start_time" : "2025-08-19T09:50:06.138568Z"
}
},
"cell_type" : "code",
"source" : "%use coroutines\n",
"id" : "204ec5c3197a8199",
"outputs" : [ ],
"execution_count" : 2
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : [ "## Flows\n", "Flows are a new way to express asynchronous data streams in Kotlin. And they depend on the kotlinx.coroutines library. (note import)\n", "\n", "There are two types of flows: Cold Flows and Hot Flows; The main difference is that Cold flows require a collector and always execute the same logic each time they're collected,\n", "where as Hot Flows can emit values at any point in time regardless of the collector.\n", "> In this Notebook, we will explore all types of flows" ],
"id" : "c86ff696b7a100ef"
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : [ "## Cold Flows\n", "There are a couple of cold flow builders, but they all work with the `Flow` type\n", "Overview of Flow builders:\n", "- `Flow`\n", "- `ChannelFlow`\n", "- `CallbackFlow`" ],
"id" : "7938834c4b634178"
}, {
"metadata" : {
"ExecuteTime" : {
"end_time" : "2025-08-19T09:50:13.083137Z",
"start_time" : "2025-08-19T09:50:06.440871Z"
}
},
"cell_type" : "code",
"source" : [ "val myFlow = flow {\n", " emit(\"Hello World\")\n", " emit(\"Hola mundo\")\n", "}.onEach { println(it) }\n", "// .launchIn(GlobalScope) // launchIn is a collector\n", "\n", "val listToFlow = listOf(1, 2, 3, 4, 5).asFlow()\n", "val sequenceToFlow = sequenceOf(1, 2, 3, 4, 5).asFlow()\n", "val flowOf = flowOf(1, 2, 3, 4, 5)\n", "val multiTypeFlowOF = flowOf(1, \"Hello\", 3, \"World\", 5)\n", "\n", "// Needs to be on a coroutine scope to collect\n", "//runBlocking {\n", "// multiTypeFlowOF.collect { println(it) }\n", "//}\n", "\n" ],
"id" : "6a25d1a6285d83a2",
"outputs" : [ ],
"execution_count" : 3
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : [ "#### Channel Flow\n", "(not to be confused with `kotlinx.coroutines.channels.Channel`)\n", "Channels are a way to communicate between coroutines and multi-source emission.\n", "\n", "Use cases:\n", "- For passing data trough different scopes i.e. `Dispatchers.IO` to `Dispatchers.Main`\n", "- Breaking logic into multiple concurrent tasks and bringing their results together as a reactive stream" ],
"id" : "b06dcccf864164eb"
}, {
"metadata" : {
"collapsed" : true,
"ExecuteTime" : {
"end_time" : "2025-08-19T09:50:13.083654Z",
"start_time" : "2025-08-19T09:50:06.658007Z"
}
},
"cell_type" : "code",
"source" : [ "import kotlinx.coroutines.channels.awaitClose\n", "import java.io.BufferedReader\n", "import java.io.File\n", "import java.io.FileReader\n", "\n", "\n", "// Channels are great to communicate between contexts\n", "channelFlow {\n", " launch {\n", " send(\"Hello World\")\n", " send(\"Hola mundo\")\n", " }.invokeOnCompletion { close() }\n", "\n", " // We can listen for close\n", " awaitClose { println(\"Channel closed\") }\n", "}.onEach { println(it) }\n", "// .launchIn(GlobalScope)\n", "\n", "// Examples i.e. Reading from a file and if it gets cancelled, it cancells the reading process\n", "fun readFile(file: File): Flow<String> = callbackFlow {\n", " val reader = withContext(Dispatchers.IO) {\n", " BufferedReader(FileReader(file))\n", " }\n", " val job = launch(Dispatchers.IO) {\n", " reader.useLines { lines -> lines.forEach { send(it) } }\n", " close()\n", " }\n", " awaitClose {\n", " job.cancel();\n", " reader.close()\n", " }\n", "}\n", "//runBlocking {\n", "// readFile(File(\"README.md\"))\n", "// .take(1)\n", "// .collect { println(it) }\n", "//}\n", "\n", "\n", "\n" ],
"id" : "e1d559fa3eee3d20",
"outputs" : [ ],
"execution_count" : 4
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : [ "### Callback Flows\n", "Used to wrap or encapsulate traditional Callback approach to a `Flow`" ],
"id" : "1e8bd74a0c528010"
}, {
"metadata" : {
"ExecuteTime" : {
"end_time" : "2025-08-19T09:50:13.083859Z",
"start_time" : "2025-08-19T09:50:07.046778Z"
}
},
"cell_type" : "code",
"source" : [ "data class Location(val latitude: Double = 1.0)\n", "data class LocationRequest(val latitude: Double = 1.0)\n", "interface LocationCallback {\n", " fun onLocationResult(result: Result<Location>)\n", "}\n", "\n", "fun requestLocationUpdates(request: LocationRequest, callback: LocationCallback) {\n", " runBlocking {\n", " try {\n", "\n", " flowOf(Location(request.latitude), Location(2.0), Location(3.0))\n", " .onEach { delay(100) }\n", " .onCompletion { callback.onLocationResult(Result.failure(Exception(\"No more locations\"))) }\n", " .collect {\n", " callback.onLocationResult(Result.success(it))\n", " }\n", " } catch (e: Exception) {\n", " callback.onLocationResult(Result.failure(e))\n", " }\n", " }\n", "}\n", "\n", "fun removeLocationUpdates(callback: LocationCallback) {\n", " println(\"Removing location updates\")\n", "}\n", "\n", "val callbackFlow = callbackFlow<Location> {\n", " val request = LocationRequest()\n", " val cb = object : LocationCallback {\n", " override fun onLocationResult(result: Result<Location>) {\n", " result.fold(\n", " onSuccess = { location -> trySend(location) },\n", " onFailure = { close(it) }\n", " )\n", " }\n", " }\n", " requestLocationUpdates(request, cb)\n", "\n", " awaitClose {\n", " removeLocationUpdates(cb)\n", " }\n", "}.onEach { println(\"Received location: $it\") }.onCompletion { println(\"Flow completed\") }\n", "\n", "runBlocking {\n", "// callbackFlow.collect()\n", "}" ],
"id" : "6366aacc36153cea",
"outputs" : [ ],
"execution_count" : 5
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : [ "### Hot Flows\n", "Hot flows emit values regardless of the collector.\n", "They don't really come with builders, rather, you instanciate one of them.\n", "\n", "- `StateFlow`: Always holds some state. Ideal for UI state or if an DataLayer needs to have always some data to work with." ],
"id" : "e4ed811e9b8f7c12"
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : [ "##### SharedFlow\n", "\n", "- `SharedFlow`: Can be collected multiple times. Ideal for sharing data between multiple coroutines. Ideal for broadcasting data\n", " - `shareIn`: Allows to convert a flow into a shared flow.k\n" ],
"id" : "1ea4e61451f97a80"
}, {
"metadata" : {
"ExecuteTime" : {
"end_time" : "2025-08-19T09:50:13.062761Z",
"start_time" : "2025-08-19T09:50:07.610972Z"
}
},
"cell_type" : "code",
"source" : [ "val ticker = MutableSharedFlow<Int>()\n", "\n", "// Not Ideal, since we need to create an instance and then we launch a coroutine to update it (and other one to consume it)\n", "runBlocking {\n", "// launch {\n", "// ticker\n", "// .onEach { println(\"Received: $it\") }\n", "// .onCompletion { println(\"Flow completed\") }\n", "// .collect { }\n", "// }\n", "// launch {\n", "// var count = 0\n", "// while (true) {\n", "// ticker.emit(count)\n", "// count++\n", "// delay(1000)\n", "// }\n", "// }\n", "}\n", "\n", "// Create a flow and convert it into a shared flow (this way we don't have to keep track of an instance)\n", "val betterTicker = flow {\n", " var count = 0\n", " while (true) {\n", " emit(count++)\n", " delay(1000)\n", " }\n", "}\n", " .shareIn(GlobalScope, SharingStarted.Lazily)\n", " .onStart { println(\"Starting the flow\") }\n", " .onCompletion { println(\"Flow completed\") }\n", "\n", "runBlocking {\n", " launch {\n", " delay(2000)\n", " println(\"Starting collectionr 1\")\n", " betterTicker\n", " .onEach { println(\"Received on collector 1: $it\") }\n", " .collect()\n", " }\n", " launch {\n", " println(\"Starting collection 2\")\n", " betterTicker\n", " .onEach { println(\"Received on collector 2: $it\") }\n", " .collect()\n", " }\n", "}\n", "\n" ],
"id" : "a6528752041dcb23",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : [ "Starting collection 2\n", "Starting the flow\n", "Received on collector 2: 0\n", "Received on collector 2: 1\n", "Starting collectionr 1\n", "Starting the flow\n", "Received on collector 2: 2\n", "Received on collector 1: 2\n", "Received on collector 2: 3\n", "Received on collector 1: 3\n", "Received on collector 2: 4\n", "Received on collector 1: 4\n", "Received on collector 2: 5\n", "Received on collector 1: 5\n", "Flow completed\n", "Flow completed\n" ]
}, {
"ename" : "org.jetbrains.kotlinx.jupyter.exceptions.ReplInterruptedException",
"evalue" : "The execution was interrupted",
"output_type" : "error",
"traceback" : [ "The execution was interrupted" ]
} ],
"execution_count" : 6
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : [ "#### StateFlow\n", "It always holds some kind of state" ],
"id" : "1b62954a649fa145"
}, {
"metadata" : {
"ExecuteTime" : {
"end_time" : "2025-08-19T09:50:27.910911Z",
"start_time" : "2025-08-19T09:50:24.280299Z"
}
},
"cell_type" : "code",
"source" : [ "val ticker = MutableSharedFlow<Int>()\n", "\n", "\n", "// Not Ideal, since we need to create an instance and then we launch a coroutine to update it (and other one to consume it)\n", "runBlocking {\n", "// launch {\n", "// ticker\n", "// .onEach { println(\"Received: $it\") }\n", "// .onCompletion { println(\"Flow completed\") }\n", "// .collect { }\n", "// }\n", "// launch {\n", "// var count = 0\n", "// while (true) {\n", "// ticker.emit(count)\n", "// count++\n", "// delay(1000)\n", "// }\n", "// }\n", "}\n", "\n", "// Create a flow and convert it into a shared flow (this way we don't have to keep track of an instance)\n", "val betterTicker = flow {\n", " var count = 0\n", " while (true) {\n", " emit(count++)\n", " emit(count) // Emit the same value twice\n", " delay(0.5.seconds)\n", " }\n", "}\n", " .stateIn(GlobalScope, SharingStarted.Eagerly, initialValue = 0)\n", " .onStart { println(\"Starting the flow\") }\n", " .onCompletion { println(\"Flow completed\") }\n", "\n", "runBlocking {\n", " launch {\n", " println(\"Starting collection 2\")\n", " betterTicker\n", " .onEach { println(\"Received on collector 2: $it\") }\n", " .collect()\n", " }\n", "}\n" ],
"id" : "375cd30f834b1e3",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : [ "Starting collection 2\n", "Starting the flow\n", "Received on collector 2: 1\n", "Received on collector 2: 2\n", "Received on collector 2: 3\n", "Received on collector 2: 4\n", "Received on collector 2: 5\n", "Received on collector 2: 6\n", "Received on collector 2: 7\n", "Flow completed\n" ]
}, {
"ename" : "org.jetbrains.kotlinx.jupyter.exceptions.ReplInterruptedException",
"evalue" : "The execution was interrupted",
"output_type" : "error",
"traceback" : [ "The execution was interrupted" ]
} ],
"execution_count" : 7
}, {
"metadata" : {
"ExecuteTime" : {
"end_time" : "2025-08-19T10:38:29.749129Z",
"start_time" : "2025-08-19T10:38:26.476600Z"
}
},
"cell_type" : "code",
"source" : [ "// Test Debouncing\n", "import kotlinx.coroutines.time.debounce\n", "import kotlin.time.Duration\n", "\n", "data class Product(val id: Int, val name: String)\n", "\n", "class ProductRepository {\n", " private val products = listOf(\n", " Product(1, \"Phone\"),\n", " Product(2, \"Tablet\"),\n", " Product(3, \"Laptop\"),\n", " Product(4, \"Watch\"),\n", " Product(5, \"Car\"),\n", " Product(6, \"iPhone 16\"),\n", " Product(7, \"Samsung Galaxy S23\"),\n", " Product(8, \"Pixel 6a\"),\n", " Product(9, \"Huawei P50\")\n", " )\n", "\n", " suspend fun searchProducts(query: String): List<Product> {\n", " println(\">> Searching for.. '$query'\")\n", " delay(1.seconds)\n", " println(\">> Completed Query: '$query'\")\n", " return products.filter { it.name.contains(query, ignoreCase = true) }\n", " }\n", "}\n", "\n", "val productRepository = ProductRepository()\n", "// Simulated user typing\n", "\n", "fun simulateUserTyping(query: String, delayBetweenLetters: Duration): Flow<String> = flow {\n", " val stringBuilder = StringBuilder()\n", " for (letter in query) {\n", " stringBuilder.append(letter)\n", " emit(stringBuilder.toString())\n", " delay(delayBetweenLetters)\n", " }\n", "}\n", "\n", "runBlocking {\n", " simulateUserTyping(\"phone\", 400.milliseconds)\n", " .debounce(400.milliseconds)\n", " .mapLatest{ search -> productRepository.searchProducts(search) }\n", " .collect { println(it) }\n", "\n", "}\n" ],
"id" : "5486cd647819ea30",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : [ ">> Searching for.. 'ph'\n", ">> Searching for.. 'pho'\n", ">> Searching for.. 'phone'\n", ">> Completed Query: 'phone'\n", "[Product(id=1, name=Phone), Product(id=6, name=iPhone 16)]\n" ]
} ],
"execution_count" : 76
}, {
"metadata" : { },
"cell_type" : "code",
"outputs" : [ ],
"execution_count" : null,
"source" : "",
"id" : "e211bd773fe2a15d"
} ],
"metadata" : {
"kernelspec" : {
"display_name" : "Kotlin",
"language" : "kotlin",
"name" : "kotlin"
},
"language_info" : {
"name" : "kotlin",
"version" : "2.2.20-dev-4982",
"mimetype" : "text/x-kotlin",
"file_extension" : ".kt",
"pygments_lexer" : "kotlin",
"codemirror_mode" : "text/x-kotlin",
"nbconvert_exporter" : ""
}
},
"nbformat" : 4,
"nbformat_minor" : 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment