Last active
July 18, 2020 06:37
-
-
Save emanb29/2a6819dd802c387b956624bbd3bd1e1c to your computer and use it in GitHub Desktop.
Computers are clocks that sometimes do things
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Clock.Message | |
| import akka.actor.typed.{ActorRef, Behavior} | |
| import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext} | |
| import scala.collection.mutable | |
object Clock {

  /** Common supertype of every message in the clock protocol. */
  sealed trait Message

  /**
    * A tick issued by a clock. Subscribers of the clock should adapt this
    * message into their own protocol.
    *
    * @param replyTo the clock that issued the tick
    */
  final case class Tick(replyTo: ActorRef[Tock]) extends Message

  /**
    * Acknowledges that the given [[ActorRef]]s are ready to be ticked in the
    * next clock interval.
    *
    * @param ticked the refs that are ready for the next interval
    */
  final case class Tock(ticked: Set[ActorRef[Tick]]) extends Message

  object Tock {

    /** Convenience constructor acknowledging readiness of a single ref. */
    def apply(ticked: ActorRef[Tick]): Tock = {
      val single: Set[ActorRef[Tick]] = Set(ticked)
      new Tock(single)
    }
  }

  /**
    * Requests removal from ticking. Every ref in `who` will receive at most
    * one more `Tick`.
    *
    * @param who the refs that should no longer be ticked
    */
  final case class Unsubscribe(who: Set[ActorRef[Tick]]) extends Message

  object Unsubscribe {

    /** Convenience constructor requesting removal of a single ref. */
    def apply(who: ActorRef[Tick]): Unsubscribe = {
      val single: Set[ActorRef[Tick]] = Set(who)
      new Unsubscribe(single)
    }
  }
}
/**
  * A lock-step clock behavior.
  *
  * Refs announce themselves (or acknowledge a tick) by sending [[Clock.Tock]].
  * Once no ticked ref is still outstanding, the interval rolls over: the
  * counter is incremented and every ref that tocked and did not unsubscribe
  * is sent a new [[Clock.Tick]].
  *
  * NOTE(review): the rollover only happens while handling a `Tock` that leaves
  * `pendingTicks` empty, and only `Tock` removes refs from `pendingTicks`. A
  * ref that was ticked but never tocks (e.g. it stopped) therefore keeps the
  * clock from advancing -- confirm that subscribers always tock, even after
  * sending `Unsubscribe`.
  *
  * @param ctx the actor context this behavior runs in
  */
final case class Clock(ctx: ActorContext[Message]) extends AbstractBehavior[Message](ctx) {
  // number of intervals completed so far
  private var whichTick = 0L

  /** The number of intervals this clock has completed. */
  def tick: Long = whichTick

  // the millisecond wall-clock timestamp at which the current interval started
  private var lastTick = System.currentTimeMillis()
  // the wall-clock duration elapsed during the previous interval
  private var lastInterval = 0L
  // refs that have been ticked but not tocked back yet this interval
  private val pendingTicks: mutable.Set[ActorRef[Clock.Tick]] = mutable.Set.empty
  // refs that have tocked this interval
  private val completedTicks: mutable.Set[ActorRef[Clock.Tick]] = mutable.Set.empty
  // refs that are not to be ticked anymore
  private val unsubscribed: mutable.Set[ActorRef[Clock.Tick]] = mutable.Set.empty

  /**
    * Handles one protocol message and keeps the same behavior.
    *
    *  - `Tick`: not meaningful for the clock itself; a warning is logged.
    *  - `Tock`: marks the refs as done for this interval and starts the next
    *    interval if nothing is outstanding anymore.
    *  - `Unsubscribe`: remembers the refs so they are dropped at the next
    *    rollover.
    *
    * @param msg the incoming message
    * @return this behavior
    */
  override def onMessage(msg: Message): Behavior[Message] = msg match {
    case Clock.Tick(replyTo) =>
      // Report through the actor's logger instead of stdout so the message
      // follows the application's logging configuration.
      ctx.log.warn(s"The clock was ticked by $replyTo -- this has undefined semantics")
      this
    case Clock.Tock(ticked) =>
      // A Tock from a ref that was not pending still lands in completedTicks,
      // which is how a ref gets included in the next interval.
      completedTicks ++= ticked
      pendingTicks --= ticked
      if (pendingTicks.isEmpty) startNextInterval()
      this
    case Clock.Unsubscribe(who) =>
      unsubscribed ++= who
      this
  }

  /** Closes the current interval and ticks every remaining subscriber. */
  private def startNextInterval(): Unit = {
    // Update wall clock
    val now = System.currentTimeMillis()
    lastInterval = now - lastTick
    lastTick = now
    // increment counter
    whichTick += 1
    // Re-register all listeners for the next interval, minus the unsubscribed
    pendingTicks ++= completedTicks.diff(unsubscribed)
    completedTicks.clear()
    unsubscribed.clear()
    // Tick everything again
    pendingTicks.foreach(_.tell(Clock.Tick(ctx.self)))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment