Skip to content

Instantly share code, notes, and snippets.

@emanb29
Last active July 18, 2020 06:37
Show Gist options
  • Select an option

  • Save emanb29/2a6819dd802c387b956624bbd3bd1e1c to your computer and use it in GitHub Desktop.

Select an option

Save emanb29/2a6819dd802c387b956624bbd3bd1e1c to your computer and use it in GitHub Desktop.
Computers are clocks that sometimes do things
import Clock.Message
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext}
import scala.collection.mutable
/**
  * Message protocol spoken between a [[Clock]] actor and the refs it ticks.
  */
object Clock {

  /** Common supertype of everything a [[Clock]] accepts. */
  sealed trait Message

  /**
    * A tick emitted by a Clock. Any subscriber of the clock should adapt this
    * message into its own protocol.
    *
    * @param replyTo the clock that issued the tick
    */
  final case class Tick(replyTo: ActorRef[Tock]) extends Message

  /**
    * Signals that the given [[ActorRef]]s are ready to be ticked in the next
    * clock interval.
    *
    * @param ticked the refs that are ready for the next interval
    */
  final case class Tock(ticked: Set[ActorRef[Tick]]) extends Message

  object Tock {

    /** Shorthand for a [[Tock]] acknowledging a single ref. */
    def apply(ticked: ActorRef[Tick]): Tock = Tock(Set(ticked))
  }

  /**
    * Asks the clock to stop ticking the given refs. Every ref in `who` will
    * receive at most one more `Tick`.
    *
    * @param who the refs that should no longer be ticked
    */
  final case class Unsubscribe(who: Set[ActorRef[Tick]]) extends Message

  object Unsubscribe {

    /** Shorthand for an [[Unsubscribe]] naming a single ref. */
    def apply(who: ActorRef[Tick]): Unsubscribe = Unsubscribe(Set(who))
  }
}
/**
  * A lock-step clock actor.
  *
  * The clock sends a [[Clock.Tick]] to every registered ref and then waits until
  * each of them has answered with a [[Clock.Tock]] before starting the next
  * interval. Any ref named in a `Tock` is ticked in the following interval, even
  * if it was not ticked before. Refs named in a [[Clock.Unsubscribe]] are left
  * out when the next interval starts.
  *
  * @param ctx the actor context this behavior runs in
  */
final case class Clock(ctx: ActorContext[Message]) extends AbstractBehavior[Message](ctx) {
  // how many intervals have been completed so far
  private var whichTick = 0L

  /** The number of intervals completed so far. */
  def tick: Long = whichTick

  // the millisecond wall-clock timestamp at which the current interval started
  private var lastTick = System.currentTimeMillis()
  // the wall-clock duration elapsed during the previous interval
  // (recorded on every roll-over; nothing in this class reads it back)
  private var lastInterval = 0L
  // refs that have been ticked but not tocked back yet this interval
  private val pendingTicks: mutable.Set[ActorRef[Clock.Tick]] = mutable.Set.empty
  // refs that have tocked this interval
  private val completedTicks: mutable.Set[ActorRef[Clock.Tick]] = mutable.Set.empty
  // refs that are not to be ticked anymore
  private val unsubscribed: mutable.Set[ActorRef[Clock.Tick]] = mutable.Set.empty

  /**
    * Handles one protocol message.
    *
    *  - `Tick`: the clock itself is not meant to be ticked; a warning is logged
    *    and the message is otherwise ignored.
    *  - `Tock`: marks the given refs as done for this interval and, once nobody
    *    is pending anymore, starts the next interval.
    *  - `Unsubscribe`: remembers the given refs so they are dropped when the
    *    next interval starts.
    *
    * @param msg the message to process
    * @return this same behavior
    */
  override def onMessage(msg: Message): Behavior[Message] = msg match {
    case Clock.Tick(replyTo) =>
      // Use the actor's logger rather than println so the warning goes through
      // the configured logging backend.
      ctx.log.warn(s"The clock was ticked by $replyTo -- this has undefined semantics")
      this
    case Clock.Tock(ticked) =>
      completedTicks ++= ticked
      pendingTicks --= ticked
      // No more tocks are outstanding for this interval, so roll over.
      if (pendingTicks.isEmpty) startNextInterval()
      this
    case Clock.Unsubscribe(who) =>
      unsubscribed ++= who
      this
  }

  /** Closes the current interval and ticks every ref that is still subscribed. */
  private def startNextInterval(): Unit = {
    // Update wall clock
    val now = System.currentTimeMillis()
    lastInterval = now - lastTick
    lastTick = now
    // increment counter
    whichTick += 1
    // Re-register all listeners for the next interval, minus the unsubscribed ones
    pendingTicks ++= completedTicks.diff(unsubscribed)
    completedTicks.clear()
    unsubscribed.clear()
    // Tick everything again
    val nextTick = Clock.Tick(ctx.self)
    pendingTicks.foreach(_.tell(nextTick))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment