Last active
October 10, 2023 23:42
-
-
Save rodrigobrito/f6b2bf13cee4658be2a5846fa968faf8 to your computer and use it in GitHub Desktop.
C# Throttling management
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [TestClass] | |
| public class ThrottlingTest | |
| { | |
| [TestMethod] | |
| public void Should_ProcessOneHundredMessages_In_OneHundredMillisecond() | |
| { | |
| var throttle = new Throttler(100, TimeSpan.FromMilliseconds(100)); | |
| var stopWatch = new Stopwatch(); | |
| stopWatch.Start(); | |
| for (var i = 1; i <= 100; i++) | |
| { | |
| throttle.Throttle(); | |
| } | |
| stopWatch.Stop(); | |
| Assert.AreEqual(100, RoundHundred(stopWatch.Elapsed.TotalMilliseconds)); | |
| } | |
| [TestMethod] | |
| public void Should_ProcessOneHundredMessages_In_OneSecond() | |
| { | |
| var throttle = new Throttler(100, TimeSpan.FromSeconds(1)); | |
| var stopWatch = new Stopwatch(); | |
| stopWatch.Start(); | |
| for (var i = 1; i <= 100; i++) | |
| { | |
| throttle.Throttle(); | |
| } | |
| stopWatch.Stop(); | |
| Assert.AreEqual(1, Math.Round(stopWatch.Elapsed.TotalSeconds)); | |
| } | |
| [TestMethod] | |
| public void Should_ProcessOneHundredMessages_In_TwoMinutes() | |
| { | |
| var throttle = new Throttler(100, TimeSpan.FromMinutes(2)); | |
| var stopWatch = new Stopwatch(); | |
| stopWatch.Start(); | |
| for (var i = 1; i <= 100; i++) | |
| { | |
| throttle.Throttle(); | |
| } | |
| stopWatch.Stop(); | |
| Assert.AreEqual(2, Math.Round(stopWatch.Elapsed.TotalMinutes)); | |
| } | |
| private static double RoundHundred(double val) | |
| { | |
| return Math.Round(value: val / 100, digits: 0) * 100; | |
| } | |
| } | |
| public class Throttler | |
| { | |
| public const long NoLimit = -1; | |
| private long _consumedMessages; // Messages or pieces of messages | |
| private long _lastRefillTime; | |
| private long _periodTicks; | |
| public Throttler() : this(NoLimit, TimeSpan.FromSeconds(1)) | |
| { | |
| } | |
| public Throttler(long averageRate, TimeSpan period, long burstSize = 1) | |
| { | |
| BurstSize = burstSize; | |
| AverageRate = averageRate; | |
| Period = period; | |
| } | |
| #region Properties | |
| public long BurstSize { get; } | |
| public long AverageRate { get; } | |
| public TimeSpan Period | |
| { | |
| get => new TimeSpan(_periodTicks); | |
| set => _periodTicks = value.Ticks; | |
| } | |
| #endregion | |
| /// <summary> | |
| /// Compute elapsed time using DateTime.UtcNow.Ticks and refill messages using period and average rate. | |
| /// </summary> | |
| private void RefillMessages() | |
| { | |
| var currentTick = DateTime.UtcNow.Ticks; | |
| // Last refill time in ticks unit | |
| var refillTime = Volatile.Read(ref _lastRefillTime); | |
| // Time delta in ticks unit | |
| var ticksDelta = currentTick - refillTime; | |
| var averageRate = ticksDelta * AverageRate / _periodTicks; | |
| if (averageRate <= 0) return; | |
| var newDetalPeriod = averageRate * _periodTicks / AverageRate; | |
| var newRefillTime = refillTime == 0 ? currentTick : refillTime + newDetalPeriod; | |
| // Only try to refill new ticks delta if no other thread has beaten us to the update _lastRefillTime | |
| if (Interlocked.CompareExchange(ref _lastRefillTime, newRefillTime, refillTime) != refillTime) | |
| { | |
| return; | |
| } | |
| // Loop until we succeed in refilling "new ticks delta" | |
| // This is why we need to make sure the refill is atomic | |
| long newLevel; | |
| long currentLevel; | |
| do | |
| { | |
| currentLevel = Volatile.Read(ref _consumedMessages); | |
| var adjustedLevel = Math.Min(currentLevel, BurstSize); // In case burstSize decreased | |
| newLevel = Math.Max(0, adjustedLevel - averageRate); | |
| } while (Interlocked.CompareExchange(ref _consumedMessages, newLevel, currentLevel) != currentLevel); | |
| } | |
| private long ConsumeMessages(long messages) | |
| { | |
| long currentLevel; | |
| long toConsume; | |
| do | |
| { | |
| currentLevel = Volatile.Read(ref _consumedMessages); | |
| var available = BurstSize - currentLevel; | |
| if (available == 0) return 0; | |
| toConsume = messages; | |
| if (available < toConsume) toConsume = available; | |
| } while (Interlocked.CompareExchange(ref _consumedMessages, currentLevel + toConsume, currentLevel) != currentLevel); | |
| return toConsume; | |
| } | |
| private long TryThrottledWait(long messages) | |
| { | |
| if (BurstSize <= 0 || AverageRate <= 0) return messages; | |
| RefillMessages(); | |
| return ConsumeMessages(messages); | |
| } | |
| /// <summary> | |
| /// Get time to sleep until messages can be sent again. | |
| /// </summary> | |
| /// <returns>Timespan to wait.</returns> | |
| private TimeSpan GetSleepTime() | |
| { | |
| var refillTime = Volatile.Read(ref _lastRefillTime); | |
| var nextRefillTime = refillTime + (_periodTicks / AverageRate); | |
| var currentTimeTicks = DateTime.UtcNow.Ticks; | |
| var sleepTicks = Math.Max(nextRefillTime - currentTimeTicks, 0); | |
| return new TimeSpan(sleepTicks); | |
| } | |
| /// <summary> | |
| /// Wait that works inside synchronous methods. | |
| /// </summary> | |
| /// <param name="messages">Number of messages to remove.</param> | |
| /// <returns>Returns once all Thread.Sleep have occurred.</returns> | |
| public void Throttle(long messages) | |
| { | |
| if (AverageRate == NoLimit) return; | |
| var remaining = messages; | |
| do | |
| { | |
| remaining -= TryThrottledWait(remaining); | |
| var sleepTime = GetSleepTime(); | |
| if (sleepTime.Ticks != default(long)) | |
| { | |
| Thread.Sleep(sleepTime); | |
| } | |
| } | |
| while (remaining != 0); | |
| } | |
| /// <summary> | |
| /// Wait that works inside synchronous methods. | |
| /// </summary> | |
| /// <returns>Returns once all Thread.Sleep have occurred.</returns> | |
| public void Throttle() | |
| { | |
| Throttle(1); | |
| } | |
| /// <summary> | |
| /// Wait that works inside Async methods. | |
| /// </summary> | |
| /// <param name="messages">Number of messages to remove.</param> | |
| /// <returns>Returns once all Task.Delays have occurred</returns> | |
| public async Task ThrottleAsync(long messages) | |
| { | |
| if (AverageRate == NoLimit) return; | |
| var remaining = messages; | |
| do | |
| { | |
| remaining -= TryThrottledWait(remaining); | |
| if (remaining == 0) | |
| { | |
| break; | |
| } | |
| var sleepTime = GetSleepTime(); | |
| if (sleepTime.Ticks != default(long)) | |
| { | |
| await Task.Delay(sleepTime).ConfigureAwait(false); | |
| } | |
| } | |
| while (remaining != 0); | |
| } | |
| /// <summary> | |
| /// Wait that works inside Async methods. | |
| /// </summary> | |
| /// <returns>Returns once all Task.Delays have occurred</returns> | |
| public async Task ThrottleAsync() | |
| { | |
| await ThrottleAsync(1); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment