Created
July 1, 2025 13:09
-
-
Save paul-butcher/9a5422b81bf6b48820f09dc9e480f46c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import scala.concurrent.{Await, ExecutionContext, Future} | |
| import java.util.concurrent.{ExecutorService, Executors} | |
| import scala.concurrent.duration.DurationInt | |
| val singleThreadedPool: ExecutorService = Executors.newSingleThreadExecutor() | |
| implicit val ec = | |
| concurrent.ExecutionContext.fromExecutorService(singleThreadedPool) | |
| val rand = new scala.util.Random | |
| def merge(message: String): Future[(String, List[String])] = | |
| Future.successful( | |
| message, | |
| List(s"work${rand.nextInt(50)}", s"work${rand.nextInt(50)}") | |
| ) | |
| def index(content: Seq[String]): Future[Either[Seq[String], Seq[String]]] = { | |
| Future.successful(Left(content.filter { | |
| _ => rand.nextBoolean() | |
| })) | |
| } | |
| def process(messages: Seq[String]): Future[Seq[String]] = | |
| Future.sequence(messages.map(merge)) map { | |
| merged: Seq[(String, List[String])] => | |
| // Now we have a list of all the Works to be indexed | |
| // corresponding to the message they are part of | |
| // Construct a map to get back from the Work to the Message | |
| val docsToMessages: Map[String, String] = merged.flatMap { | |
| case (message, works) => | |
| works.map(work => work -> message) | |
| }.toMap | |
| (docsToMessages, index(merged.flatMap(_._2))) | |
| } flatMap { | |
| case (docsToMessages, result) => | |
| result map { | |
| case Left(failedWorks) => | |
| failedWorks map { | |
| failedWork => | |
| println(s"$failedWork (${docsToMessages(failedWork)}) failed") | |
| // Return a list of failed messages to the caller | |
| // by looking it up in the Work -> Message map | |
| // (maybe toSet at the end?) | |
| // If multiple Works in one Message fail, it doesn't matter too much, | |
| // but it would be neater to only return each one once. | |
| docsToMessages(failedWork) | |
| } | |
| case Right(goodWorks) => | |
| // Ok, No problems | |
| goodWorks.foreach(w => println(s"$w was OK")) | |
| Nil | |
| } | |
| } | |
| val results = Await | |
| .result( | |
| process((1 to 20).map(int => s"message$int")), | |
| 10.seconds | |
| ) | |
| .toSet | |
| println("results") | |
| println(results) | |
| println(results.size) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment