Skip to content

Instantly share code, notes, and snippets.

@Aaronontheweb
Last active January 21, 2026 21:34
Show Gist options
  • Select an option

  • Save Aaronontheweb/30d6bdc709d99fa312c3a470c347c899 to your computer and use it in GitHub Desktop.

Select an option

Save Aaronontheweb/30d6bdc709d99fa312c3a470c347c899 to your computer and use it in GitHub Desktop.
Akka.Streams Rate Calculator with Instantaneous Rate and EMA - .NET 10 single file app
#:package Akka@1.5.58
#:package Akka.Streams@1.5.58
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
// Bootstrap: one actor system hosts both the counter actor and the stream;
// the materializer is what actually runs the stream graph defined further down.
using var system = ActorSystem.Create("rate-calculator");
using var materializer = system.Materializer();

// Banner followed by the column headings and a separator rule for the console table.
Console.WriteLine("Rate Calculator PoC - Press Ctrl+C to stop\n");
Console.WriteLine(string.Format(
    "{0,-12} {1,14} {2,14} {3,12}",
    "Time", "Instant Rate", "EMA Rate", "Total"));
Console.WriteLine(string.Empty.PadLeft(56, '-'));

// Configuration
// tickInterval: how often the counter is sampled.
TimeSpan tickInterval = TimeSpan.FromSeconds(1);
// emaAlpha: EMA smoothing factor in the range 0.0-1.0; larger values weight recent samples more.
double emaAlpha = 0.3;

// Ctrl+C is translated into a cancellation request instead of killing the process,
// so the producer loop and the stream can both wind down on their own.
CancellationTokenSource cts = new();
Console.CancelKeyPress += (_, args) =>
{
    args.Cancel = true;
    cts.Cancel();
};
// =============================================================================
// COUNTER ACTOR - Tracks work completions from your calculation engine
// =============================================================================
// In production, your calculation engine would send Increment messages here
// as work items complete. This actor provides thread-safe counting without locks.
// Spawn the counter that the stream samples on every tick.
var counterActor = system.ActorOf(Props.Create(() => new CounterActor()), "counter");

// Stand-in for the calculation engine: report one completed work item, pause for a
// random 1-9 ms, and repeat until cancellation is requested.
_ = Task.Run(async () =>
{
    var rng = new Random();
    var stopToken = cts.Token;
    try
    {
        while (!stopToken.IsCancellationRequested)
        {
            counterActor.Tell(CounterActor.Increment.Instance);
            await Task.Delay(rng.Next(1, 10), stopToken);
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation arrived while waiting in Task.Delay; leave the loop quietly.
    }
}, cts.Token);
// =============================================================================
// STREAM PIPELINE - Using Source.Tick to periodically sample the counter
// =============================================================================
//
// This matches the pattern you proposed:
// Source.Tick -> sample counter -> compute rate + EMA -> output
//
// Pipeline stages:
// Source.Tick -> Via(cancellation) -> SelectAsync(sample counter) -> Scan -> Skip(1) -> Select
//
// -----------------------------------------------------------------------------
// STAGE 1: Source.Tick - Emit a tick at regular intervals
// -----------------------------------------------------------------------------
// Emits NotUsed.Instance every tickInterval (1 second).
// This drives the periodic sampling of your counter.
// First param: initial delay, Second param: interval between ticks
// -----------------------------------------------------------------------------
// STAGE 2: Via(CancellationToken.AsFlow) - Graceful shutdown support
// -----------------------------------------------------------------------------
// Completes the stream gracefully when cancellation is requested (Ctrl+C).
// -----------------------------------------------------------------------------
// STAGE 3: SelectAsync - Sample the counter on each tick
// -----------------------------------------------------------------------------
// Asks the CounterActor for the current count. This is where we "sample"
// the external state (work completions) that's being updated elsewhere.
// -----------------------------------------------------------------------------
// STAGE 4: Scan - Stateful rate and EMA calculation
// -----------------------------------------------------------------------------
// Maintains state across ticks to compute:
// - Instantaneous rate: (currentCount - previousCount) / elapsed time
// - EMA: α * currentRate + (1-α) * previousEMA (smooths out spikes)
// -----------------------------------------------------------------------------
// STAGE 5: Select - Transform to output record
// -----------------------------------------------------------------------------
// Maps the internal RateState to a clean RateMeasurement record.
// Upper bound for a single counter sample. Without an explicit timeout the Ask falls back
// to the configured akka.actor.ask-timeout (infinite unless overridden), so a stopped or
// stalled counter actor would hang the stream silently instead of failing it.
var askTimeout = TimeSpan.FromSeconds(5);

// Builds the rate stream: tick -> sample counter -> fold into rate/EMA state -> RateMeasurement.
// Nothing runs until the stream is materialized by a sink.
var rateStream = Source.Tick(tickInterval, tickInterval, NotUsed.Instance)
    // Completes the stream (rather than failing it) once cancellation is requested.
    .Via(cts.Token.AsFlow<NotUsed>(cancelGracefully: true))
    // Parallelism 1: at most one outstanding sample, results emitted in tick order.
    .SelectAsync(1, async _ => await counterActor.Ask<long>(CounterActor.GetCount.Instance, askTimeout))
    .Scan(new RateState(), (RateState state, long currentCount) =>
    {
        var now = DateTimeOffset.UtcNow;
        var itemsSinceLastTick = currentCount - state.PreviousCount;

        // Elapsed time since the previous sample, floored at one tick interval so that
        // bunched-up ticks or a backwards wall-clock adjustment cannot produce a tiny
        // or negative divisor.
        var elapsed = Math.Max((now - state.LastTick).TotalSeconds, tickInterval.TotalSeconds);

        // Instantaneous rate = items completed since last tick / elapsed time
        var instantaneousRate = itemsSinceLastTick / elapsed;

        // Exponential Moving Average smooths out rate fluctuations
        // EMA = α * current + (1 - α) * previous
        // The very first sample seeds the EMA directly, since there is no history yet.
        var ema = state.IsFirstTick
            ? instantaneousRate
            : (emaAlpha * instantaneousRate) + ((1 - emaAlpha) * state.Ema);

        // Return a fresh state object instead of mutating the incoming one: the seed and
        // every emitted element are shared by reference with downstream stages, so in-place
        // mutation would corrupt already-emitted values and leak state across
        // re-materializations or supervision restarts of this stage.
        return new RateState
        {
            PreviousCount = currentCount,
            TotalCount = currentCount,
            InstantaneousRate = instantaneousRate,
            Ema = ema,
            LastTick = now,
            IsFirstTick = false,
        };
    })
    .Skip(1) // Scan emits its seed first; drop it so only real samples flow on
    .Select(s => new RateMeasurement(s.InstantaneousRate, s.Ema, s.TotalCount, s.LastTick));
// -----------------------------------------------------------------------------
// SINK: RunForeach - Consume and display measurements
// -----------------------------------------------------------------------------
// In production, replace with:
// .RunWith(StreamRefs.SourceRef<RateMeasurement>(), materializer)
// .PipeTo(Sender, sourceRef => sourceRef);
// to distribute the rate stream via SignalR.
// Run the stream to completion, printing one table row per measurement.
// The rate columns are 12 wide plus the 2-character "/s" suffix, which lines up with the
// 14-wide "Instant Rate" / "EMA Rate" headings.
try
{
    await rateStream
        .RunForeach(m =>
        {
            Console.WriteLine(
                $"{m.Timestamp:HH:mm:ss.fff} {m.InstantaneousRate,12:N2}/s {m.SmoothedRate,12:N2}/s {m.TotalProcessed,12:N0}");
        }, materializer);
}
catch (OperationCanceledException)
{
    // Cancellation surfaced as an exception instead of a graceful completion;
    // treat it the same as a normal Ctrl+C shutdown.
}
finally
{
    // Stop the simulated producer even when the stream ended for a reason other than
    // Ctrl+C (Cancel is harmless if it was already requested), then shut the actor
    // system down before reporting completion.
    cts.Cancel();
    await system.Terminate();
}
Console.WriteLine("\nShutdown complete.");
// =============================================================================
// TYPES
// =============================================================================
/// <summary>
/// One published sample of the rate stream, as handed to the sink.
/// </summary>
/// <param name="InstantaneousRate">Items per second measured over the most recent sampling interval.</param>
/// <param name="SmoothedRate">Exponential moving average of the instantaneous rate.</param>
/// <param name="TotalProcessed">Counter value at the time of the sample.</param>
/// <param name="Timestamp">UTC time at which the sample was taken.</param>
record RateMeasurement(
    double InstantaneousRate,
    double SmoothedRate,
    long TotalProcessed,
    DateTimeOffset Timestamp);
/// <summary>
/// Accumulator carried from tick to tick by the Scan stage of the rate stream.
/// </summary>
class RateState
{
    /// <summary>
    /// Creates the pre-first-tick state: all numeric values at zero, the clock stamped
    /// with the construction time, and the first-tick flag raised.
    /// </summary>
    public RateState()
    {
        LastTick = DateTimeOffset.UtcNow;
        IsFirstTick = true;
    }

    /// <summary>Counter value seen at the previous sample; subtracted from the new sample to get the delta.</summary>
    public long PreviousCount { get; set; }

    /// <summary>Most recently sampled counter value.</summary>
    public long TotalCount { get; set; }

    /// <summary>Items per second computed for the most recent sample.</summary>
    public double InstantaneousRate { get; set; }

    /// <summary>Current exponential moving average of the instantaneous rate.</summary>
    public double Ema { get; set; }

    /// <summary>UTC time of the previous sample (initially the time this object was created).</summary>
    public DateTimeOffset LastTick { get; set; }

    /// <summary>True until the first sample has been folded in; tells the fold to seed the EMA rather than blend it.</summary>
    public bool IsFirstTick { get; set; }
}
// =============================================================================
// COUNTER ACTOR - Thread-safe counter using Akka's actor model
// =============================================================================
// Your calculation engine sends Increment messages as work completes.
// The rate stream periodically asks for the current count via GetCount.
/// <summary>
/// Actor that keeps a running count of completed work items.
/// It handles <see cref="Increment"/> by adding one to the count and
/// <see cref="GetCount"/> by replying to the sender with the current count.
/// </summary>
class CounterActor : ReceiveActor
{
    // Only touched from within this actor's message handlers.
    private long _count;

    /// <summary>Registers the handlers for the two supported messages.</summary>
    public CounterActor()
    {
        Receive<Increment>(_ =>
        {
            _count++;
        });

        Receive<GetCount>(_ =>
        {
            Sender.Tell(_count);
        });
    }

    /// <summary>Message: one more work item has completed. Singleton; use <see cref="Instance"/>.</summary>
    public sealed class Increment
    {
        public static readonly Increment Instance = new();

        private Increment()
        {
        }
    }

    /// <summary>Message: request the current count; the reply is a <see cref="long"/>. Singleton; use <see cref="Instance"/>.</summary>
    public sealed class GetCount
    {
        public static readonly GetCount Instance = new();

        private GetCount()
        {
        }
    }
}
@Aaronontheweb
Copy link
Author

Sample Output

Rate Calculator PoC - Press Ctrl+C to stop

Time           Instant Rate       EMA Rate        Total
--------------------------------------------------------
21:34:12.175         192.52/s         192.52/s          219
21:34:13.160         191.00/s         192.06/s          410
21:34:14.158         203.00/s         195.35/s          613
21:34:15.157         182.00/s         191.34/s          795
21:34:16.156         184.00/s         189.14/s          979
21:34:17.156         205.98/s         194.19/s        1,185
21:34:18.157         185.94/s         191.72/s        1,371
21:34:19.156         187.00/s         190.30/s        1,558
21:34:20.156         192.00/s         190.81/s        1,750
21:34:21.157         201.94/s         194.15/s        1,952
21:34:22.156         209.00/s         198.60/s        2,161

Notice how the Instant Rate fluctuates (182-209/s) while the EMA Rate smooths it out (189-198/s), giving you a stable trend line for your SignalR dashboard.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment