Created
August 19, 2025 10:39
-
-
Save FeernandoOFF/9880b5d4dc6cece1ab303cbfca182329 to your computer and use it in GitHub Desktop.
Experimenting with different Kotlin flows and its use cases
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells" : [ { | |
| "metadata" : { | |
| "ExecuteTime" : { | |
| "end_time" : "2025-08-19T09:50:13.080599Z", | |
| "start_time" : "2025-08-19T09:50:06.138568Z" | |
| } | |
| }, | |
| "cell_type" : "code", | |
| "source" : "%use coroutines\n", | |
| "id" : "204ec5c3197a8199", | |
| "outputs" : [ ], | |
| "execution_count" : 2 | |
| }, { | |
| "metadata" : { }, | |
| "cell_type" : "markdown", | |
| "source" : [ "## Flows\n", "Flows are a new way to express asynchronous data streams in Kotlin. And they depend on the kotlinx.coroutines library. (note import)\n", "\n", "There are two types of flows: Cold Flows and Hot Flows; The main difference is that Cold flows require a collector and always execute the same logic each time they're collected,\n", "where as Hot Flows can emit values at any point in time regardless of the collector.\n", "> In this Notebook, we will explore all types of flows" ], | |
| "id" : "c86ff696b7a100ef" | |
| }, { | |
| "metadata" : { }, | |
| "cell_type" : "markdown", | |
| "source" : [ "## Cold Flows\n", "There are a couple of cold flow builders, but they all work with the `Flow` type\n", "Overview of Flow builders:\n", "- `Flow`\n", "- `ChannelFlow`\n", "- `CallbackFlow`" ], | |
| "id" : "7938834c4b634178" | |
| }, { | |
| "metadata" : { | |
| "ExecuteTime" : { | |
| "end_time" : "2025-08-19T09:50:13.083137Z", | |
| "start_time" : "2025-08-19T09:50:06.440871Z" | |
| } | |
| }, | |
| "cell_type" : "code", | |
| "source" : [ "val myFlow = flow {\n", " emit(\"Hello World\")\n", " emit(\"Hola mundo\")\n", "}.onEach { println(it) }\n", "// .launchIn(GlobalScope) // launchIn is a collector\n", "\n", "val listToFlow = listOf(1, 2, 3, 4, 5).asFlow()\n", "val sequenceToFlow = sequenceOf(1, 2, 3, 4, 5).asFlow()\n", "val flowOf = flowOf(1, 2, 3, 4, 5)\n", "val multiTypeFlowOF = flowOf(1, \"Hello\", 3, \"World\", 5)\n", "\n", "// Needs to be on a coroutine scope to collect\n", "//runBlocking {\n", "// multiTypeFlowOF.collect { println(it) }\n", "//}\n", "\n" ], | |
| "id" : "6a25d1a6285d83a2", | |
| "outputs" : [ ], | |
| "execution_count" : 3 | |
| }, { | |
| "metadata" : { }, | |
| "cell_type" : "markdown", | |
| "source" : [ "#### Channel Flow\n", "(not to be confused with `kotlinx.coroutines.channels.Channel`)\n", "Channels are a way to communicate between coroutines and multi-source emission.\n", "\n", "Use cases:\n", "- For passing data trough different scopes i.e. `Dispatchers.IO` to `Dispatchers.Main`\n", "- Breaking logic into multiple concurrent tasks and bringing their results together as a reactive stream" ], | |
| "id" : "b06dcccf864164eb" | |
| }, { | |
| "metadata" : { | |
| "collapsed" : true, | |
| "ExecuteTime" : { | |
| "end_time" : "2025-08-19T09:50:13.083654Z", | |
| "start_time" : "2025-08-19T09:50:06.658007Z" | |
| } | |
| }, | |
| "cell_type" : "code", | |
| "source" : [ "import kotlinx.coroutines.channels.awaitClose\n", "import java.io.BufferedReader\n", "import java.io.File\n", "import java.io.FileReader\n", "\n", "\n", "// Channels are great to communicate between contexts\n", "channelFlow {\n", " launch {\n", " send(\"Hello World\")\n", " send(\"Hola mundo\")\n", " }.invokeOnCompletion { close() }\n", "\n", " // We can listen for close\n", " awaitClose { println(\"Channel closed\") }\n", "}.onEach { println(it) }\n", "// .launchIn(GlobalScope)\n", "\n", "// Examples i.e. Reading from a file and if it gets cancelled, it cancells the reading process\n", "fun readFile(file: File): Flow<String> = callbackFlow {\n", " val reader = withContext(Dispatchers.IO) {\n", " BufferedReader(FileReader(file))\n", " }\n", " val job = launch(Dispatchers.IO) {\n", " reader.useLines { lines -> lines.forEach { send(it) } }\n", " close()\n", " }\n", " awaitClose {\n", " job.cancel();\n", " reader.close()\n", " }\n", "}\n", "//runBlocking {\n", "// readFile(File(\"README.md\"))\n", "// .take(1)\n", "// .collect { println(it) }\n", "//}\n", "\n", "\n", "\n" ], | |
| "id" : "e1d559fa3eee3d20", | |
| "outputs" : [ ], | |
| "execution_count" : 4 | |
| }, { | |
| "metadata" : { }, | |
| "cell_type" : "markdown", | |
| "source" : [ "### Callback Flows\n", "Used to wrap or encapsulate traditional Callback approach to a `Flow`" ], | |
| "id" : "1e8bd74a0c528010" | |
| }, { | |
| "metadata" : { | |
| "ExecuteTime" : { | |
| "end_time" : "2025-08-19T09:50:13.083859Z", | |
| "start_time" : "2025-08-19T09:50:07.046778Z" | |
| } | |
| }, | |
| "cell_type" : "code", | |
| "source" : [ "data class Location(val latitude: Double = 1.0)\n", "data class LocationRequest(val latitude: Double = 1.0)\n", "interface LocationCallback {\n", " fun onLocationResult(result: Result<Location>)\n", "}\n", "\n", "fun requestLocationUpdates(request: LocationRequest, callback: LocationCallback) {\n", " runBlocking {\n", " try {\n", "\n", " flowOf(Location(request.latitude), Location(2.0), Location(3.0))\n", " .onEach { delay(100) }\n", " .onCompletion { callback.onLocationResult(Result.failure(Exception(\"No more locations\"))) }\n", " .collect {\n", " callback.onLocationResult(Result.success(it))\n", " }\n", " } catch (e: Exception) {\n", " callback.onLocationResult(Result.failure(e))\n", " }\n", " }\n", "}\n", "\n", "fun removeLocationUpdates(callback: LocationCallback) {\n", " println(\"Removing location updates\")\n", "}\n", "\n", "val callbackFlow = callbackFlow<Location> {\n", " val request = LocationRequest()\n", " val cb = object : LocationCallback {\n", " override fun onLocationResult(result: Result<Location>) {\n", " result.fold(\n", " onSuccess = { location -> trySend(location) },\n", " onFailure = { close(it) }\n", " )\n", " }\n", " }\n", " requestLocationUpdates(request, cb)\n", "\n", " awaitClose {\n", " removeLocationUpdates(cb)\n", " }\n", "}.onEach { println(\"Received location: $it\") }.onCompletion { println(\"Flow completed\") }\n", "\n", "runBlocking {\n", "// callbackFlow.collect()\n", "}" ], | |
| "id" : "6366aacc36153cea", | |
| "outputs" : [ ], | |
| "execution_count" : 5 | |
| }, { | |
| "metadata" : { }, | |
| "cell_type" : "markdown", | |
| "source" : [ "### Hot Flows\n", "Hot flows emit values regardless of the collector.\n", "They don't really come with builders, rather, you instanciate one of them.\n", "\n", "- `StateFlow`: Always holds some state. Ideal for UI state or if an DataLayer needs to have always some data to work with." ], | |
| "id" : "e4ed811e9b8f7c12" | |
| }, { | |
| "metadata" : { }, | |
| "cell_type" : "markdown", | |
| "source" : [ "##### SharedFlow\n", "\n", "- `SharedFlow`: Can be collected multiple times. Ideal for sharing data between multiple coroutines. Ideal for broadcasting data\n", " - `shareIn`: Allows to convert a flow into a shared flow.k\n" ], | |
| "id" : "1ea4e61451f97a80" | |
| }, { | |
| "metadata" : { | |
| "ExecuteTime" : { | |
| "end_time" : "2025-08-19T09:50:13.062761Z", | |
| "start_time" : "2025-08-19T09:50:07.610972Z" | |
| } | |
| }, | |
| "cell_type" : "code", | |
| "source" : [ "val ticker = MutableSharedFlow<Int>()\n", "\n", "// Not Ideal, since we need to create an instance and then we launch a coroutine to update it (and other one to consume it)\n", "runBlocking {\n", "// launch {\n", "// ticker\n", "// .onEach { println(\"Received: $it\") }\n", "// .onCompletion { println(\"Flow completed\") }\n", "// .collect { }\n", "// }\n", "// launch {\n", "// var count = 0\n", "// while (true) {\n", "// ticker.emit(count)\n", "// count++\n", "// delay(1000)\n", "// }\n", "// }\n", "}\n", "\n", "// Create a flow and convert it into a shared flow (this way we don't have to keep track of an instance)\n", "val betterTicker = flow {\n", " var count = 0\n", " while (true) {\n", " emit(count++)\n", " delay(1000)\n", " }\n", "}\n", " .shareIn(GlobalScope, SharingStarted.Lazily)\n", " .onStart { println(\"Starting the flow\") }\n", " .onCompletion { println(\"Flow completed\") }\n", "\n", "runBlocking {\n", " launch {\n", " delay(2000)\n", " println(\"Starting collectionr 1\")\n", " betterTicker\n", " .onEach { println(\"Received on collector 1: $it\") }\n", " .collect()\n", " }\n", " launch {\n", " println(\"Starting collection 2\")\n", " betterTicker\n", " .onEach { println(\"Received on collector 2: $it\") }\n", " .collect()\n", " }\n", "}\n", "\n" ], | |
| "id" : "a6528752041dcb23", | |
| "outputs" : [ { | |
| "name" : "stdout", | |
| "output_type" : "stream", | |
| "text" : [ "Starting collection 2\n", "Starting the flow\n", "Received on collector 2: 0\n", "Received on collector 2: 1\n", "Starting collectionr 1\n", "Starting the flow\n", "Received on collector 2: 2\n", "Received on collector 1: 2\n", "Received on collector 2: 3\n", "Received on collector 1: 3\n", "Received on collector 2: 4\n", "Received on collector 1: 4\n", "Received on collector 2: 5\n", "Received on collector 1: 5\n", "Flow completed\n", "Flow completed\n" ] | |
| }, { | |
| "ename" : "org.jetbrains.kotlinx.jupyter.exceptions.ReplInterruptedException", | |
| "evalue" : "The execution was interrupted", | |
| "output_type" : "error", | |
| "traceback" : [ "The execution was interrupted" ] | |
| } ], | |
| "execution_count" : 6 | |
| }, { | |
| "metadata" : { }, | |
| "cell_type" : "markdown", | |
| "source" : [ "#### StateFlow\n", "It always holds some kind of state" ], | |
| "id" : "1b62954a649fa145" | |
| }, { | |
| "metadata" : { | |
| "ExecuteTime" : { | |
| "end_time" : "2025-08-19T09:50:27.910911Z", | |
| "start_time" : "2025-08-19T09:50:24.280299Z" | |
| } | |
| }, | |
| "cell_type" : "code", | |
| "source" : [ "val ticker = MutableSharedFlow<Int>()\n", "\n", "\n", "// Not Ideal, since we need to create an instance and then we launch a coroutine to update it (and other one to consume it)\n", "runBlocking {\n", "// launch {\n", "// ticker\n", "// .onEach { println(\"Received: $it\") }\n", "// .onCompletion { println(\"Flow completed\") }\n", "// .collect { }\n", "// }\n", "// launch {\n", "// var count = 0\n", "// while (true) {\n", "// ticker.emit(count)\n", "// count++\n", "// delay(1000)\n", "// }\n", "// }\n", "}\n", "\n", "// Create a flow and convert it into a shared flow (this way we don't have to keep track of an instance)\n", "val betterTicker = flow {\n", " var count = 0\n", " while (true) {\n", " emit(count++)\n", " emit(count) // Emit the same value twice\n", " delay(0.5.seconds)\n", " }\n", "}\n", " .stateIn(GlobalScope, SharingStarted.Eagerly, initialValue = 0)\n", " .onStart { println(\"Starting the flow\") }\n", " .onCompletion { println(\"Flow completed\") }\n", "\n", "runBlocking {\n", " launch {\n", " println(\"Starting collection 2\")\n", " betterTicker\n", " .onEach { println(\"Received on collector 2: $it\") }\n", " .collect()\n", " }\n", "}\n" ], | |
| "id" : "375cd30f834b1e3", | |
| "outputs" : [ { | |
| "name" : "stdout", | |
| "output_type" : "stream", | |
| "text" : [ "Starting collection 2\n", "Starting the flow\n", "Received on collector 2: 1\n", "Received on collector 2: 2\n", "Received on collector 2: 3\n", "Received on collector 2: 4\n", "Received on collector 2: 5\n", "Received on collector 2: 6\n", "Received on collector 2: 7\n", "Flow completed\n" ] | |
| }, { | |
| "ename" : "org.jetbrains.kotlinx.jupyter.exceptions.ReplInterruptedException", | |
| "evalue" : "The execution was interrupted", | |
| "output_type" : "error", | |
| "traceback" : [ "The execution was interrupted" ] | |
| } ], | |
| "execution_count" : 7 | |
| }, { | |
| "metadata" : { | |
| "ExecuteTime" : { | |
| "end_time" : "2025-08-19T10:38:29.749129Z", | |
| "start_time" : "2025-08-19T10:38:26.476600Z" | |
| } | |
| }, | |
| "cell_type" : "code", | |
| "source" : [ "// Test Debouncing\n", "import kotlinx.coroutines.time.debounce\n", "import kotlin.time.Duration\n", "\n", "data class Product(val id: Int, val name: String)\n", "\n", "class ProductRepository {\n", " private val products = listOf(\n", " Product(1, \"Phone\"),\n", " Product(2, \"Tablet\"),\n", " Product(3, \"Laptop\"),\n", " Product(4, \"Watch\"),\n", " Product(5, \"Car\"),\n", " Product(6, \"iPhone 16\"),\n", " Product(7, \"Samsung Galaxy S23\"),\n", " Product(8, \"Pixel 6a\"),\n", " Product(9, \"Huawei P50\")\n", " )\n", "\n", " suspend fun searchProducts(query: String): List<Product> {\n", " println(\">> Searching for.. '$query'\")\n", " delay(1.seconds)\n", " println(\">> Completed Query: '$query'\")\n", " return products.filter { it.name.contains(query, ignoreCase = true) }\n", " }\n", "}\n", "\n", "val productRepository = ProductRepository()\n", "// Simulated user typing\n", "\n", "fun simulateUserTyping(query: String, delayBetweenLetters: Duration): Flow<String> = flow {\n", " val stringBuilder = StringBuilder()\n", " for (letter in query) {\n", " stringBuilder.append(letter)\n", " emit(stringBuilder.toString())\n", " delay(delayBetweenLetters)\n", " }\n", "}\n", "\n", "runBlocking {\n", " simulateUserTyping(\"phone\", 400.milliseconds)\n", " .debounce(400.milliseconds)\n", " .mapLatest{ search -> productRepository.searchProducts(search) }\n", " .collect { println(it) }\n", "\n", "}\n" ], | |
| "id" : "5486cd647819ea30", | |
| "outputs" : [ { | |
| "name" : "stdout", | |
| "output_type" : "stream", | |
| "text" : [ ">> Searching for.. 'ph'\n", ">> Searching for.. 'pho'\n", ">> Searching for.. 'phone'\n", ">> Completed Query: 'phone'\n", "[Product(id=1, name=Phone), Product(id=6, name=iPhone 16)]\n" ] | |
| } ], | |
| "execution_count" : 76 | |
| }, { | |
| "metadata" : { }, | |
| "cell_type" : "code", | |
| "outputs" : [ ], | |
| "execution_count" : null, | |
| "source" : "", | |
| "id" : "e211bd773fe2a15d" | |
| } ], | |
| "metadata" : { | |
| "kernelspec" : { | |
| "display_name" : "Kotlin", | |
| "language" : "kotlin", | |
| "name" : "kotlin" | |
| }, | |
| "language_info" : { | |
| "name" : "kotlin", | |
| "version" : "2.2.20-dev-4982", | |
| "mimetype" : "text/x-kotlin", | |
| "file_extension" : ".kt", | |
| "pygments_lexer" : "kotlin", | |
| "codemirror_mode" : "text/x-kotlin", | |
| "nbconvert_exporter" : "" | |
| } | |
| }, | |
| "nbformat" : 4, | |
| "nbformat_minor" : 0 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment