Skip to content

Instantly share code, notes, and snippets.

@rodrigobrito
Last active October 10, 2023 23:42
Show Gist options
  • Select an option

  • Save rodrigobrito/f6b2bf13cee4658be2a5846fa968faf8 to your computer and use it in GitHub Desktop.

Select an option

Save rodrigobrito/f6b2bf13cee4658be2a5846fa968faf8 to your computer and use it in GitHub Desktop.
C# Throttling management
/// <summary>
/// Wall-clock timing tests for <see cref="Throttler"/>: each test pushes 100 single messages
/// through a throttler configured for 100 messages per period and checks that the rounded
/// elapsed time equals that period.
/// </summary>
[TestClass]
public class ThrottlingTest
{
    // Used both as the number of messages sent and as the throttler's average rate.
    private const int MessageCount = 100;

    [TestMethod]
    public void Should_ProcessOneHundredMessages_In_OneHundredMillisecond()
    {
        var elapsed = TimeThrottledMessages(TimeSpan.FromMilliseconds(100));

        Assert.AreEqual(100, RoundHundred(elapsed.TotalMilliseconds));
    }

    [TestMethod]
    public void Should_ProcessOneHundredMessages_In_OneSecond()
    {
        var elapsed = TimeThrottledMessages(TimeSpan.FromSeconds(1));

        Assert.AreEqual(1, Math.Round(elapsed.TotalSeconds));
    }

    [TestMethod]
    public void Should_ProcessOneHundredMessages_In_TwoMinutes()
    {
        var elapsed = TimeThrottledMessages(TimeSpan.FromMinutes(2));

        Assert.AreEqual(2, Math.Round(elapsed.TotalMinutes));
    }

    /// <summary>
    /// Creates a throttler allowing <see cref="MessageCount"/> messages per <paramref name="period"/>,
    /// sends <see cref="MessageCount"/> messages one at a time and reports how long that took.
    /// </summary>
    /// <param name="period">Throttling period handed to the <see cref="Throttler"/> constructor.</param>
    /// <returns>Elapsed time measured around the send loop only.</returns>
    private static TimeSpan TimeThrottledMessages(TimeSpan period)
    {
        var throttler = new Throttler(MessageCount, period);
        var timer = Stopwatch.StartNew();

        var sent = 0;
        while (sent < MessageCount)
        {
            throttler.Throttle();
            sent++;
        }

        timer.Stop();
        return timer.Elapsed;
    }

    /// <summary>
    /// Rounds <paramref name="val"/> to the nearest multiple of one hundred.
    /// </summary>
    private static double RoundHundred(double val)
    {
        var hundreds = Math.Round(val / 100, 0);
        return hundreds * 100;
    }
}
/// <summary>
/// Rate limiter that lets callers send up to <see cref="BurstSize"/> messages at once and
/// replenishes that allowance at <see cref="AverageRate"/> messages per <see cref="Period"/>.
/// Shared state is updated with <see cref="Interlocked"/> compare-and-swap loops.
/// </summary>
/// <remarks>
/// Time is taken from <see cref="DateTime.UtcNow"/>, i.e. the wall clock. If the clock moves
/// backwards no refill happens until it has caught up with the last recorded refill time.
/// </remarks>
public class Throttler
{
    /// <summary>Value of <see cref="AverageRate"/> that disables throttling entirely.</summary>
    public const long NoLimit = -1;

    // Messages (or pieces of messages) currently charged against BurstSize.
    private long _consumedMessages;

    // UTC ticks up to which refills have been accounted for; 0 means the clock has not started yet.
    private long _lastRefillTime;

    private long _periodTicks;

    /// <summary>Creates a throttler that never throttles (<see cref="NoLimit"/>).</summary>
    public Throttler() : this(NoLimit, TimeSpan.FromSeconds(1))
    {
    }

    /// <summary>Creates a throttler.</summary>
    /// <param name="averageRate">Messages allowed per <paramref name="period"/>; zero or negative disables throttling.</param>
    /// <param name="period">Time span over which <paramref name="averageRate"/> messages are allowed; zero or negative disables throttling.</param>
    /// <param name="burstSize">Maximum number of messages that may be sent without waiting; zero or negative disables throttling.</param>
    public Throttler(long averageRate, TimeSpan period, long burstSize = 1)
    {
        BurstSize = burstSize;
        AverageRate = averageRate;
        Period = period;
    }

    #region Properties

    /// <summary>Maximum number of messages that can be consumed before a refill is required.</summary>
    public long BurstSize { get; }

    /// <summary>Number of messages replenished per <see cref="Period"/>.</summary>
    public long AverageRate { get; }

    /// <summary>Time span over which <see cref="AverageRate"/> messages are replenished.</summary>
    public TimeSpan Period
    {
        get => new TimeSpan(_periodTicks);
        set => _periodTicks = value.Ticks;
    }

    #endregion

    /// <summary>
    /// Computes the time elapsed since the last refill from DateTime.UtcNow.Ticks and gives back
    /// the messages earned in that time.
    /// </summary>
    /// <param name="periodTicks">Snapshot of the period in ticks; must be positive.</param>
    private void RefillMessages(long periodTicks)
    {
        var currentTick = DateTime.UtcNow.Ticks;
        var refillTime = Volatile.Read(ref _lastRefillTime);

        if (refillTime == 0)
        {
            // First use: nothing has been consumed yet, so there is nothing to refill. Just start
            // the clock. Computing a delta against tick 0 would multiply ~6e17 ticks by the rate,
            // which overflows long for most rates and used to leave the clock unset forever.
            Interlocked.CompareExchange(ref _lastRefillTime, currentTick, 0);
            return;
        }

        var ticksDelta = currentTick - refillTime;
        if (ticksDelta <= 0)
        {
            return;
        }

        // floor(ticksDelta * AverageRate / periodTicks), split into whole periods plus remainder
        // so the intermediate product is bounded by periodTicks * AverageRate, not ticksDelta * AverageRate.
        var wholePeriods = ticksDelta / periodTicks;
        var partialTicks = ticksDelta % periodTicks;
        var partialRefill = partialTicks * AverageRate / periodTicks;
        var refill = wholePeriods * AverageRate + partialRefill;
        if (refill <= 0)
        {
            return;
        }

        // Advance the refill time only by the ticks that actually paid for whole messages, so the
        // fractional remainder still counts towards the next refill.
        var accountedTicks = wholePeriods * periodTicks + partialRefill * periodTicks / AverageRate;
        var newRefillTime = refillTime + accountedTicks;

        // Only refill if no other thread has beaten us to updating _lastRefillTime.
        if (Interlocked.CompareExchange(ref _lastRefillTime, newRefillTime, refillTime) != refillTime)
        {
            return;
        }

        // We own this refill; retry until it is applied atomically against concurrent consumers.
        long newLevel;
        long currentLevel;
        do
        {
            currentLevel = Volatile.Read(ref _consumedMessages);
            var adjustedLevel = Math.Min(currentLevel, BurstSize);
            newLevel = Math.Max(0, adjustedLevel - refill);
        }
        while (Interlocked.CompareExchange(ref _consumedMessages, newLevel, currentLevel) != currentLevel);
    }

    /// <summary>
    /// Charges up to <paramref name="messages"/> messages against the remaining burst allowance.
    /// </summary>
    /// <returns>The number of messages actually charged; 0 when no allowance is left.</returns>
    private long ConsumeMessages(long messages)
    {
        long currentLevel;
        long toConsume;
        do
        {
            currentLevel = Volatile.Read(ref _consumedMessages);
            var available = BurstSize - currentLevel;
            if (available <= 0)
            {
                return 0;
            }

            toConsume = Math.Min(messages, available);
        }
        while (Interlocked.CompareExchange(ref _consumedMessages, currentLevel + toConsume, currentLevel) != currentLevel);

        return toConsume;
    }

    /// <summary>
    /// Refills the allowance and then consumes as many of <paramref name="messages"/> as possible.
    /// </summary>
    /// <returns>
    /// The number of messages that may be sent now. All of them when throttling is inactive
    /// (non-positive burst size, rate or period).
    /// </returns>
    private long TryThrottledWait(long messages)
    {
        // Snapshot the period: it has a public setter and is used as a divisor below.
        var periodTicks = Volatile.Read(ref _periodTicks);
        if (BurstSize <= 0 || AverageRate <= 0 || periodTicks <= 0)
        {
            return messages;
        }

        RefillMessages(periodTicks);
        return ConsumeMessages(messages);
    }

    /// <summary>
    /// Get time to sleep until messages can be sent again.
    /// Only called while throttling is active, so <see cref="AverageRate"/> is positive here.
    /// </summary>
    /// <returns>Timespan to wait; <see cref="TimeSpan.Zero"/> when the next refill is already due.</returns>
    private TimeSpan GetSleepTime()
    {
        var refillTime = Volatile.Read(ref _lastRefillTime);
        var nextRefillTime = refillTime + (Volatile.Read(ref _periodTicks) / AverageRate);
        var currentTimeTicks = DateTime.UtcNow.Ticks;
        var sleepTicks = Math.Max(nextRefillTime - currentTimeTicks, 0);
        return new TimeSpan(sleepTicks);
    }

    /// <summary>
    /// Wait that works inside synchronous methods. Blocks the calling thread with
    /// <see cref="Thread.Sleep(TimeSpan)"/> until all <paramref name="messages"/> have been admitted.
    /// </summary>
    /// <param name="messages">Number of messages to remove.</param>
    public void Throttle(long messages)
    {
        if (AverageRate == NoLimit)
        {
            return;
        }

        var remaining = messages;
        do
        {
            remaining -= TryThrottledWait(remaining);
            if (remaining == 0)
            {
                // Fully admitted: return without sleeping so spare burst capacity is usable
                // (same behaviour as ThrottleAsync).
                break;
            }

            var sleepTime = GetSleepTime();
            if (sleepTime.Ticks != default(long))
            {
                Thread.Sleep(sleepTime);
            }
        }
        while (remaining != 0);
    }

    /// <summary>
    /// Synchronous wait for a single message; see <see cref="Throttle(long)"/>.
    /// </summary>
    public void Throttle()
    {
        Throttle(1);
    }

    /// <summary>
    /// Wait that works inside Async methods. Uses <see cref="Task.Delay(TimeSpan)"/> instead of
    /// blocking the thread.
    /// </summary>
    /// <param name="messages">Number of messages to remove.</param>
    /// <returns>A task that completes once all <paramref name="messages"/> have been admitted.</returns>
    public async Task ThrottleAsync(long messages)
    {
        if (AverageRate == NoLimit)
        {
            return;
        }

        var remaining = messages;
        do
        {
            remaining -= TryThrottledWait(remaining);
            if (remaining == 0)
            {
                break;
            }

            var sleepTime = GetSleepTime();
            if (sleepTime.Ticks != default(long))
            {
                await Task.Delay(sleepTime).ConfigureAwait(false);
            }
        }
        while (remaining != 0);
    }

    /// <summary>
    /// Asynchronous wait for a single message; see <see cref="ThrottleAsync(long)"/>.
    /// </summary>
    /// <returns>A task that completes once the message has been admitted.</returns>
    public async Task ThrottleAsync()
    {
        await ThrottleAsync(1).ConfigureAwait(false);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment