Last active
January 21, 2026 21:34
-
-
Save Aaronontheweb/30d6bdc709d99fa312c3a470c347c899 to your computer and use it in GitHub Desktop.
Akka.Streams Rate Calculator with Instantaneous Rate and EMA - .NET 10 single file app
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #:package Akka@1.5.58 | |
| #:package Akka.Streams@1.5.58 | |
| using Akka; | |
| using Akka.Actor; | |
| using Akka.Streams; | |
| using Akka.Streams.Dsl; | |
// -----------------------------------------------------------------------------
// Bootstrap: the actor system hosts the counter actor and the materializer
// runs the stream. Both are disposed when the program leaves scope.
// -----------------------------------------------------------------------------
using ActorSystem system = ActorSystem.Create("rate-calculator");
using ActorMaterializer materializer = system.Materializer();

// Banner plus the column headings of the console table.
Console.WriteLine("Rate Calculator PoC - Press Ctrl+C to stop\n");
Console.WriteLine($"{"Time",-12} {"Instant Rate",14} {"EMA Rate",14} {"Total",12}");
Console.WriteLine(new string('-', 56));

// Tunables.
// tickInterval: how often the counter is sampled.
// emaAlpha:     EMA smoothing factor in the range 0.0-1.0; a higher value
//               gives more weight to the most recent sample.
TimeSpan tickInterval = TimeSpan.FromSeconds(1);
double emaAlpha = 0.3;

// Ctrl+C requests cancellation instead of terminating the process outright
// (args.Cancel = true), so the rest of the program can shut down in order.
CancellationTokenSource cts = new();
Console.CancelKeyPress += (_, args) =>
{
    args.Cancel = true;
    cts.Cancel();
};
// =============================================================================
// COUNTER ACTOR - Tracks work completions from your calculation engine
// =============================================================================
// In production, your calculation engine would send Increment messages here
// as work items complete. This actor provides thread-safe counting without locks.
var counterActor = system.ActorOf(Props.Create(() => new CounterActor()), "counter");

// Simulate work happening elsewhere (your calculation engine): send one
// Increment, then pause for 1-9 ms, until cancellation is requested.
_ = Task.Run(async () =>
{
    try
    {
        while (!cts.Token.IsCancellationRequested)
        {
            counterActor.Tell(CounterActor.Increment.Instance);

            // Random.Shared avoids allocating a dedicated Random for this loop.
            await Task.Delay(Random.Shared.Next(1, 10), cts.Token);
        }
    }
    catch (OperationCanceledException)
    {
        // Expected: the delay was interrupted by Ctrl+C; simply stop producing.
    }
}, cts.Token);
// =============================================================================
// STREAM PIPELINE - Using Source.Tick to periodically sample the counter
// =============================================================================
//
// This matches the pattern you proposed:
//   Source.Tick -> sample counter -> compute rate + EMA -> output
//
// Pipeline stages:
//   Source.Tick -> Via(cancellation) -> SelectAsync(sample counter) -> Scan
//               -> Skip(1) -> Select
//
// -----------------------------------------------------------------------------
// STAGE 1: Source.Tick - Emit a tick at regular intervals
// -----------------------------------------------------------------------------
// Emits NotUsed.Instance every tickInterval (1 second).
// This drives the periodic sampling of your counter.
// First param: initial delay, Second param: interval between ticks
// -----------------------------------------------------------------------------
// STAGE 2: Via(CancellationToken.AsFlow) - Graceful shutdown support
// -----------------------------------------------------------------------------
// Completes the stream gracefully when cancellation is requested (Ctrl+C).
// -----------------------------------------------------------------------------
// STAGE 3: SelectAsync - Sample the counter on each tick
// -----------------------------------------------------------------------------
// Asks the CounterActor for the current count. This is where we "sample"
// the external state (work completions) that's being updated elsewhere.
// -----------------------------------------------------------------------------
// STAGE 4: Scan - Stateful rate and EMA calculation
// -----------------------------------------------------------------------------
// Folds each sample into a NEW RateState carrying:
//   - Instantaneous rate: (currentCount - previousCount) / elapsed time
//   - EMA: α * currentRate + (1-α) * previousEMA  (smooths out spikes)
// The seed and every previously emitted state are left untouched, so the
// blueprint can be materialized more than once and no emitted element is
// modified after it has been pushed downstream.
// -----------------------------------------------------------------------------
// STAGE 5: Skip(1) + Select - Transform to output record
// -----------------------------------------------------------------------------
// Scan emits its seed first; Skip(1) drops it. Select then maps the internal
// RateState to a clean RateMeasurement record.
var rateStream = Source.Tick(tickInterval, tickInterval, NotUsed.Instance)
    .Via(cts.Token.AsFlow<NotUsed>(cancelGracefully: true))
    .SelectAsync(1, async _ => await counterActor.Ask<long>(CounterActor.GetCount.Instance))
    .Scan(new RateState(), (RateState state, long currentCount) =>
    {
        var now = DateTimeOffset.UtcNow;
        var itemsSinceLastTick = currentCount - state.PreviousCount;

        // Elapsed time since the previous sample, never less than one tick
        // interval so the divisor below cannot be zero or negative.
        var elapsed = Math.Max((now - state.LastTick).TotalSeconds, tickInterval.TotalSeconds);

        // Instantaneous rate = items completed since last tick / elapsed time
        var instantaneousRate = itemsSinceLastTick / elapsed;

        // Exponential Moving Average smooths out rate fluctuations
        // EMA = α * current + (1 - α) * previous
        // When α = 0.3: 30% weight to current sample, 70% to historical trend.
        // The very first sample has no history, so it seeds the EMA directly.
        var ema = state.IsFirstTick
            ? instantaneousRate
            : (emaAlpha * instantaneousRate) + ((1 - emaAlpha) * state.Ema);

        // Return a fresh instance instead of mutating the incoming one.
        return new RateState
        {
            PreviousCount = currentCount,
            TotalCount = currentCount,
            InstantaneousRate = instantaneousRate,
            Ema = ema,
            LastTick = now,
            IsFirstTick = false,
        };
    })
    .Skip(1) // Skip the initial seed value
    .Select(s => new RateMeasurement(s.InstantaneousRate, s.Ema, s.TotalCount, s.LastTick));
// -----------------------------------------------------------------------------
// SINK: RunForeach - Consume and display measurements
// -----------------------------------------------------------------------------
// In production, replace with:
//   .RunWith(StreamRefs.SourceRef<RateMeasurement>(), materializer)
//   .PipeTo(Sender, sourceRef => sourceRef);
// to distribute the rate stream via SignalR.
try
{
    // Completes when the stream finishes (normally after Ctrl+C).
    await rateStream.RunForeach(
        m => Console.WriteLine(
            $"{m.Timestamp:HH:mm:ss.fff} {m.InstantaneousRate,14:N2}/s {m.SmoothedRate,14:N2}/s {m.TotalProcessed,12:N0}"),
        materializer);
}
catch (OperationCanceledException)
{
    // Cancellation is the expected way to stop this program; nothing to report.
}
finally
{
    // Make sure the simulated producer stops even when the stream ended for a
    // reason other than Ctrl+C. Cancel() is safe to call more than once.
    cts.Cancel();
}

// Terminate first, then report, so the message is only printed once the
// actor system has actually shut down.
await system.Terminate();
Console.WriteLine("\nShutdown complete.");
| // ============================================================================= | |
| // TYPES | |
| // ============================================================================= | |
/// <summary>
/// Immutable snapshot of the throughput figures produced for one sampling tick.
/// </summary>
/// <param name="InstantaneousRate">Items per second over the most recent sampling window.</param>
/// <param name="SmoothedRate">Exponential moving average of the instantaneous rate.</param>
/// <param name="TotalProcessed">Counter value observed at this sample.</param>
/// <param name="Timestamp">Time at which the sample was taken.</param>
record RateMeasurement(
    double InstantaneousRate,
    double SmoothedRate,
    long TotalProcessed,
    DateTimeOffset Timestamp);
/// <summary>
/// Accumulator carried between samples by the rate calculation: the last
/// observed counter value, the derived rates, and when the last sample was taken.
/// </summary>
class RateState
{
    /// <summary>
    /// Creates the initial state: all numeric values are zero, the last-tick
    /// time is the moment of construction, and no sample has been folded in yet.
    /// </summary>
    public RateState()
    {
        LastTick = DateTimeOffset.UtcNow;
        IsFirstTick = true;
    }

    /// <summary>True until the first sample has been folded into the state.</summary>
    public bool IsFirstTick { get; set; }

    /// <summary>Time at which the most recent sample was taken.</summary>
    public DateTimeOffset LastTick { get; set; }

    /// <summary>Counter value seen at the previous sample.</summary>
    public long PreviousCount { get; set; }

    /// <summary>Most recent counter value.</summary>
    public long TotalCount { get; set; }

    /// <summary>Rate computed for the most recent sampling window.</summary>
    public double InstantaneousRate { get; set; }

    /// <summary>Exponential moving average of the rate.</summary>
    public double Ema { get; set; }
}
| // ============================================================================= | |
| // COUNTER ACTOR - Thread-safe counter using Akka's actor model | |
| // ============================================================================= | |
| // Your calculation engine sends Increment messages as work completes. | |
| // The rate stream periodically asks for the current count via GetCount. | |
/// <summary>
/// Actor that keeps a running count. It adds one for every
/// <see cref="Increment"/> it receives and replies to <see cref="GetCount"/>
/// with the current value.
/// </summary>
class CounterActor : ReceiveActor
{
    // Running total; only touched from inside the message handlers below.
    private long _count;

    public CounterActor()
    {
        // One unit of work completed.
        Receive<Increment>(_ =>
        {
            _count += 1;
        });

        // Reply to whoever asked with the current total.
        Receive<GetCount>(_ =>
        {
            Sender.Tell(_count);
        });
    }

    /// <summary>Query message: asks the actor for its current count.</summary>
    public sealed class GetCount
    {
        private GetCount()
        {
        }

        /// <summary>The only instance; the message carries no data.</summary>
        public static readonly GetCount Instance = new GetCount();
    }

    /// <summary>Command message: adds one to the count.</summary>
    public sealed class Increment
    {
        private Increment()
        {
        }

        /// <summary>The only instance; the message carries no data.</summary>
        public static readonly Increment Instance = new Increment();
    }
}
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample Output
Notice how the Instant Rate fluctuates (182-209/s) while the EMA Rate smooths it out (189-198/s), giving you a stable trend line for your SignalR dashboard.