Last active
July 12, 2025 08:07
-
-
Save bfreuden/8371ac1ca8c777a391e1154504db9490 to your computer and use it in GitHub Desktop.
Back-pressure example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import kotlinx.coroutines.* | |
| import kotlinx.coroutines.channels.produce | |
| import java.util.concurrent.atomic.AtomicInteger | |
| suspend fun getIdsFromService1(id: String): List<String> { | |
| return listOf("id1", "id2", "id3", "id4", "id5", "id6") | |
| } | |
| val concurrency2 = AtomicInteger(0) | |
| val maxConcurrency2 = AtomicInteger(0) | |
| suspend fun getIdsFromService2(id: String): List<String> { | |
| val currentConcurrency = concurrency2.incrementAndGet() | |
| maxConcurrency2.getAndUpdate { it.coerceAtLeast(currentConcurrency) } | |
| delay(10) | |
| concurrency2.decrementAndGet() | |
| return listOf("id1", "id2", "id3", "id4", "id5", "id6").map { "$id-$it" } | |
| } | |
| val concurrency3 = AtomicInteger(0) | |
| val maxConcurrency3 = AtomicInteger(0) | |
| suspend fun getIdsFromService3(id: String): List<String> { | |
| val currentConcurrency = concurrency3.incrementAndGet() | |
| maxConcurrency3.getAndUpdate { it.coerceAtLeast(currentConcurrency) } | |
| delay(10) | |
| concurrency3.decrementAndGet() | |
| return listOf("id1", "id2", "id3", "id4", "id5", "id6").map { "$id-$it" } | |
| } | |
| fun main() = runBlocking { | |
| val service1Id = "whatever" | |
| coroutineScope { | |
| val service2Ids = produce { | |
| val ids = getIdsFromService1(service1Id) | |
| for (id in ids) | |
| send(id) | |
| } | |
| val service2Concurrency = 8 | |
| val service3Ids = produce { | |
| repeat(service2Concurrency) { | |
| launch(Dispatchers.IO) { | |
| for (service2Id in service2Ids) { | |
| val ids = getIdsFromService2(service2Id) | |
| for (id in ids) | |
| send(id) | |
| } | |
| } | |
| } | |
| } | |
| val service3Concurrency = 3 | |
| repeat(service3Concurrency) { | |
| launch(Dispatchers.IO) { | |
| for (service3Id in service3Ids) { | |
| val ids = getIdsFromService3(service3Id) | |
| for (id in ids) | |
| println(id) | |
| } | |
| } | |
| } | |
| } | |
| println("max service 2 concurrency: $maxConcurrency2") | |
| println("max service 3 concurrency: $maxConcurrency3") | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment