Skip to content

Instantly share code, notes, and snippets.

@kostapc
Last active October 7, 2020 12:50
Show Gist options
  • Select an option

  • Save kostapc/3f84a7d16009b46fee4546fa98ed5cfb to your computer and use it in GitHub Desktop.

Select an option

Save kostapc/3f84a7d16009b46fee4546fa98ed5cfb to your computer and use it in GitHub Desktop.
kotlin async flow with floating parralel executions count
package net.c0f3.labs.kotlin.flow
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlin.system.measureTimeMillis
class AsyncFlowEmission {
data class Container(
val index: Int,
val data: String
)
fun createFlowWithChannel(
source: Collection<String>,
parts: Int,
pIndex: Int = parts
): Flow<Container> {
val channel = Channel<Container>(parts)
val semaphore = Semaphore(pIndex, 0)
val getData: (index: Int) -> String = { index ->
runBlocking {
// long data fetching
delay(100)
}
source.elementAt(index)
}
GlobalScope.launch {
for (index in 0 until parts) {
semaphore.acquire()
GlobalScope.launch {
val str = getData(index)
println("#${index + 1} emitted $str")
channel.send(Container(
index = index,
data = str
))
// close channel after sending last
if (index == parts - 1) {
channel.close()
}
}
}
}
val flow = flow {
for (c in 1..parts) {
semaphore.release()
this.emit(channel.receive())
}
}
return flow
}
}
fun main() {
val afe = AsyncFlowEmission()
val collection = listOf(
"1", "2", "3", "4", "5", "6", "7", "8"
)
println("---- START")
runBlocking {
val flow = afe.createFlowWithChannel(
collection, collection.size, 2
)
val execTime = measureTimeMillis {
flow.collect { cont ->
println("${cont.index + 1} >>> ${cont.data}")
}
}
println("total flow time: $execTime")
}
println("---- END")
}
/*
---- START
#3 emitted 3
#1 emitted 1
#2 emitted 2
3 >>> 3
1 >>> 1
2 >>> 2
#5 emitted 5
#4 emitted 4
#6 emitted 6
5 >>> 5
4 >>> 4
6 >>> 6
#7 emitted 7
#8 emitted 8
7 >>> 7
8 >>> 8
total flow time: 334
---- END
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment