Skip to content

Instantly share code, notes, and snippets.

@julianpeeters
Last active January 13, 2020 04:18
Show Gist options
  • Select an option

  • Save julianpeeters/9da3eb6342ed38a6f318a3bd1f678f1c to your computer and use it in GitHub Desktop.

Select an option

Save julianpeeters/9da3eb6342ed38a6f318a3bd1f678f1c to your computer and use it in GitHub Desktop.
import cats.effect.{Concurrent, Sync}
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.implicits._
import cats.implicits._
import scala.collection.mutable.Queue
trait FlushableQueue[F[_], A] {
def enqueue(a: A): F[Unit]
def dequeueAll: F[List[A]]
}
object FlushableQueue {
sealed trait Status[F[_]]
final case class Ready[F[_]]() extends Status[F]
final case class Updating[F[_]](d: Deferred[F, Either[Throwable, Unit]]) extends Status[F]
private def completeAction[F[_]: Sync, R](
f: F[R], // the action affecting the queue
d: Deferred[F, Either[Throwable, Unit]],
s: Ref[F, Status[F]]
): F[R] = {
for {
r <- f.attempt // keep errors in an either so we can update the status before breaking out of the for comp
_ <- s.set(r match { // in either success or failure, set back to Ready for when someone tries again
case Left(_) => Ready() // give up and set the state back. We'll notify the awaiters and then rethrow
case Right(_) => Ready() // set the status back to Ready and notify the awaiters
})
_ <- d.complete(r.void) //in either success or failure, notify awaiters that the action is done
} yield r
}.rethrow // now that we're done managing status, if there was a failure, raise to caller, let them resolve it
private def stateTransition[F[_]: Sync, R](
action: F[R],
deferred: Deferred[F, Either[Throwable, Unit]],
statusRef: Ref[F, Status[F]]
): Status[F] => (Status[F], F[R]) =
status =>
status match {
case Ready() =>
val newStatus: Status[F] = Updating(deferred) // set to Updating
val completion: F[R] = completeAction[F,R](action, deferred, statusRef)
(newStatus, completion)
case Updating(inProgress) => // whoever set it to Updating will be calling complete to notify us awaiters
val sameStatus: Status[F] = status // when they call complete, we go straight to another update
val awaitInProgress: F[Unit] = inProgress.get.rethrow // wait for notification that update is done
val completion: F[R] = completeAction[F,R](action, deferred, statusRef)
(sameStatus, awaitInProgress >> completion)
}
def ofMutableQueue[F[_]: Concurrent, A](): F[FlushableQueue[F, A]] =
for {
queue <- Sync[F].delay(new Queue[A])
statusRef <- Ref.of[F, Status[F]](Ready[F]())
} yield {
new FlushableQueue[F, A] {
private def deferAccess[R](action: F[R]): F[R] =
Deferred[F, Either[Throwable, Unit]].flatMap { deferred =>
val transition: Status[F] => (Status[F], F[R]) = stateTransition[F, R](action, deferred, statusRef)
statusRef.modify(transition).flatten
}
override def enqueue(a: A): F[Unit] =
deferAccess[Unit] {
Sync[F].delay(queue.enqueue(a)).void
}
override def dequeueAll: F[List[A]] =
deferAccess[List[A]] {
Sync[F].delay(queue.dequeueAll(_ => true).toList)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment