Last active
January 13, 2020 04:18
-
-
Save julianpeeters/9da3eb6342ed38a6f318a3bd1f678f1c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import cats.effect.{Concurrent, Sync} | |
| import cats.effect.concurrent.{Deferred, Ref} | |
| import cats.effect.implicits._ | |
| import cats.implicits._ | |
| import scala.collection.mutable.Queue | |
/**
 * A queue whose operations are expressed as effects in `F`.
 *
 * This trait only fixes the two signatures below; ordering and concurrency
 * guarantees are determined by the concrete implementation.
 *
 * @tparam F the effect type every operation is suspended in
 * @tparam A the element type held by the queue
 */
trait FlushableQueue[F[_], A] {

  /**
   * Adds a single element to the queue.
   *
   * @param a the element to add
   * @return an effect that performs the insertion when run
   */
  def enqueue(a: A): F[Unit]

  /**
   * Flushes the queue.
   *
   * @return an effect producing the dequeued elements as a `List`
   */
  def dequeueAll: F[List[A]]
}
/**
 * Constructors and internal machinery for [[FlushableQueue]].
 *
 * Access to the underlying `scala.collection.mutable.Queue` is serialised with
 * a `Ref` holding a [[FlushableQueue.Status]] plus one `Deferred` per caller.
 * Callers form a chain: each one installs its own `Deferred` as the current
 * status and waits only on the `Deferred` of the caller directly ahead of it,
 * so at most one action touches the queue at any time.
 */
object FlushableQueue {

  /** Lock state guarding the mutable queue. */
  sealed trait Status[F[_]]

  /** Nobody is using the queue; the next caller may run immediately. */
  final case class Ready[F[_]]() extends Status[F]

  /**
   * An action is running or queued.
   *
   * @param d completed by the most recent caller in the chain once its action
   *          has finished; carries that action's outcome
   */
  final case class Updating[F[_]](d: Deferred[F, Either[Throwable, Unit]]) extends Status[F]

  /**
   * Runs `f`, then hands the queue over to whoever is next.
   *
   * @param f the action affecting the queue
   * @param d this caller's own `Deferred`; completed exactly once, after `f`
   * @param s the shared status
   * @return the result of `f`; a failure of `f` is re-raised only after the
   *         status has been released and `d` has been completed
   */
  private def completeAction[F[_]: Sync, R](
      f: F[R],
      d: Deferred[F, Either[Throwable, Unit]],
      s: Ref[F, Status[F]]
  ): F[R] = {
    // Only fall back to Ready when nobody queued up behind us. If the status
    // already holds another caller's Deferred, that caller is waiting on `d`
    // and must stay registered as the tail of the chain.
    val release: F[Unit] = s.update {
      case Updating(current) if current eq d => Ready[F]()
      case other                             => other
    }

    // Keep a failure inside the Either until the bookkeeping is done.
    val settled: F[Either[Throwable, R]] =
      for {
        outcome <- f.attempt
        _       <- release
        _       <- d.complete(outcome.void) // wake the caller waiting on us, if any
      } yield outcome

    settled.rethrow
  }

  /**
   * Builds the function handed to `Ref.modify`: it claims the tail of the
   * chain for `deferred` and returns the effect the caller has to run.
   *
   * @param action    the action affecting the queue
   * @param deferred  this caller's own `Deferred`
   * @param statusRef the shared status
   */
  private def stateTransition[F[_]: Sync, R](
      action: F[R],
      deferred: Deferred[F, Either[Throwable, Unit]],
      statusRef: Ref[F, Status[F]]
  ): Status[F] => (Status[F], F[R]) = {
    // In both cases this caller becomes the new tail of the chain.
    val claimed: Status[F] = Updating(deferred)
    val run: F[R]          = completeAction[F, R](action, deferred, statusRef)

    val transition: Status[F] => (Status[F], F[R]) = {
      case Ready() =>
        (claimed, run)
      case Updating(inProgress) =>
        // Wait for the caller directly ahead of us. Its outcome is ignored on
        // purpose: a failure there was already raised to that caller and
        // must not prevent this action from running.
        (claimed, inProgress.get.void >> run)
    }
    transition
  }

  /**
   * Creates a [[FlushableQueue]] backed by a `scala.collection.mutable.Queue`.
   *
   * Every operation runs uncancelably (claim, wait, act, release): a slot that
   * was claimed but never released would block all later callers forever.
   *
   * @tparam F the effect type
   * @tparam A the element type
   */
  def ofMutableQueue[F[_]: Concurrent, A](): F[FlushableQueue[F, A]] =
    for {
      queue     <- Sync[F].delay(new Queue[A])
      statusRef <- Ref.of[F, Status[F]](Ready[F]())
    } yield new FlushableQueue[F, A] {

      /** Runs `action` with exclusive access to `queue`. */
      private def deferAccess[R](action: F[R]): F[R] =
        Deferred[F, Either[Throwable, Unit]].flatMap { deferred =>
          val transition: Status[F] => (Status[F], F[R]) =
            stateTransition[F, R](action, deferred, statusRef)
          Concurrent[F].uncancelable(statusRef.modify(transition).flatten)
        }

      override def enqueue(a: A): F[Unit] =
        deferAccess[Unit] {
          Sync[F].delay(queue.enqueue(a)).void
        }

      override def dequeueAll: F[List[A]] =
        deferAccess[List[A]] {
          Sync[F].delay(queue.dequeueAll(_ => true).toList)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment