Skip to content

Instantly share code, notes, and snippets.

@paul-butcher
Created July 1, 2025 13:09
Show Gist options
  • Select an option

  • Save paul-butcher/9a5422b81bf6b48820f09dc9e480f46c to your computer and use it in GitHub Desktop.

Select an option

Save paul-butcher/9a5422b81bf6b48820f09dc9e480f46c to your computer and use it in GitHub Desktop.
import scala.concurrent.{Await, ExecutionContext, Future}
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.duration.DurationInt
// A single worker thread: every Future callback in this script runs one at a
// time, in submission order, on this pool.
val singleThreadedPool: ExecutorService = Executors.newSingleThreadExecutor()

// Implicit definitions should always declare their type: an un-annotated
// implicit val is a lint warning in Scala 2 and a compile error in Scala 3.
// The declared type is the one `fromExecutorService` returns, so nothing
// changes for code that picks this up implicitly.
implicit val ec: scala.concurrent.ExecutionContextExecutorService =
  scala.concurrent.ExecutionContext.fromExecutorService(singleThreadedPool)

// Shared random source used by the simulated `merge` and `index` steps.
val rand = new scala.util.Random
/** Simulates merging a message into the works it gives rise to.
  *
  * @param message the message identifier
  * @return an already-completed Future holding the message paired with two
  *         work ids of the form `work<N>`, where N is drawn from
  *         `rand.nextInt(50)`; the two ids may coincide, and may coincide
  *         with ids produced for other messages
  */
def merge(message: String): Future[(String, List[String])] = {
  val works = List(s"work${rand.nextInt(50)}", s"work${rand.nextInt(50)}")
  // Build the pair explicitly rather than passing two arguments to the
  // one-parameter `Future.successful` and relying on argument adaptation
  // (auto-tupling).
  Future.successful((message, works))
}
/** Simulates an indexing call in which a random subset of works fails.
  *
  * Each work in `content` is kept independently on a `rand.nextBoolean()`
  * coin flip. The outcome is always a `Left` carrying the kept works (which
  * may be empty); this stub never produces a `Right`.
  *
  * @param content the work ids to "index"
  * @return an already-completed Future of `Left(failedWorks)`
  */
def index(content: Seq[String]): Future[Either[Seq[String], Seq[String]]] = {
  val failedWorks = content.filter(_ => rand.nextBoolean())
  Future.successful(Left(failedWorks))
}
/** Merges every message into its works, indexes all the works in one call,
  * and reports which messages had at least one work that failed to index.
  *
  * @param messages the message identifiers to process
  * @return a Future of the messages owning at least one failed work, each
  *         listed once; empty when `index` yields a `Right`
  */
def process(messages: Seq[String]): Future[Seq[String]] =
  for {
    merged <- Future.sequence(messages.map(merge))
    // Map each Work back to the Message(s) it came from. The same Work id can
    // be produced by more than one Message, so every owner is kept: a plain
    // `.toMap` over (work -> message) pairs would retain only the last owner
    // and silently drop the others from the failure report.
    worksToMessages = merged
      .flatMap { case (message, works) => works.map(work => work -> message) }
      .groupBy { case (work, _) => work }
      .map { case (work, pairs) =>
        work -> pairs.map { case (_, message) => message }.distinct
      }
    outcome <- index(merged.flatMap { case (_, works) => works })
  } yield {
    outcome match {
      case Left(failedWorks) =>
        failedWorks.flatMap { failedWork =>
          // Every failed Work was part of the indexed input, so it is present
          // in the map; getOrElse only avoids a partial Map lookup.
          val owners = worksToMessages.getOrElse(failedWork, Nil)
          println(s"$failedWork (${owners.mkString(", ")}) failed")
          owners
        }.distinct // a Message with several failed Works is returned only once
      case Right(goodWorks) =>
        // Ok, no problems
        goodWorks.foreach(w => println(s"$w was OK"))
        Nil
    }
  }
// Run the pipeline over message1..message20 and block (script edge only) for
// up to ten seconds to collect the distinct failed messages.
val results =
  try {
    Await
      .result(
        process((1 to 20).map(int => s"message$int")),
        10.seconds
      )
      .toSet
  } finally {
    // The executor's worker is a non-daemon thread; if the pool is not shut
    // down the JVM stays alive after the script has finished. Shut down on
    // failure or timeout as well.
    singleThreadedPool.shutdown()
  }
println("results")
println(results)
println(results.size)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment