Created
January 18, 2026 23:15
-
-
Save pboothe/1acb4f33d5158754011d881151fdeada to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Package memoryless provides a Ticker whose intervals between ticks follow the memoryless (exponential) distribution, so that the ticks themselves form a Poisson process. | |
| // | |
| // Use this ticker instead of time.Ticker to generate events as a Poisson process and avoid pathological cases such as accidental synchronization. | |
| // | |
| // See https://en.wikipedia.org/wiki/Poisson_distribution for more information, as well as the acronym P.A.S.T.A. (Poisson Arrivals See Time Averages). | |
| package memoryless | |
import (
	"context"
	"errors"
	"math"
	"math/rand"
	"time"

	"github.com/triple-pat/go/clocks"
)
// errBadConfig is the single sentinel error returned by Ticker.Validate for
// every kind of inconsistent configuration.
var errBadConfig = errors.New("bad memoryless config")
| // Ticker is the configuration for the memoryless ticker. | |
| // | |
| // Expected is the expected interval between ticks. | |
| // Min is the minimum interval between ticks. | |
| // Max is the maximum interval between ticks. | |
| // | |
| // If Max is not set, there is no upper bound on the interval. | |
| // | |
| // Technically, the use of Min and Max prevents the result from being a true | |
| // Poisson distribution, but it can be operationally useful to have guarantees | |
| // that this ticker will not produce events too frequently or too seldom. The | |
| // interaction between math and the real world is full of tradeoffs. | |
| // | |
| // Speaking of tradeoffs, the other tradeoff here is that the ticker will never | |
| // send a tick if there isn't a listener waiting to receive it. This means that | |
| // if the receiver is doing a long-lived task after each tick, it is impossible | |
| // for the ticker to send a tick with a period shorter than the duration of the | |
| // long-lived task. This also serves as a lower bound on the period between | |
| // ticks, and skews this away from being a perfect Poisson distribution. Despite | |
| // all those caveats, this is still about as good as you can do. | |
| // | |
| // Use this ticker to prevent accidental synchronization in long-lived systems. | |
| type Ticker struct { | |
| Clock clocks.Clock | |
| Expected time.Duration | |
| Min time.Duration | |
| Max time.Duration | |
| } | |
| // Validate returns an error if the Ticker configuration doesn't make sense. | |
| func (t Ticker) Validate() error { | |
| if t.Expected <= 0 { | |
| return errBadConfig | |
| } | |
| if t.Min < 0 || t.Min > t.Expected { | |
| return errBadConfig | |
| } | |
| if t.Max != 0 && (t.Max < t.Min || t.Max < t.Expected) { | |
| return errBadConfig | |
| } | |
| return nil | |
| } | |
| // NewTicker creates a new memoryless Ticker with the specified expected interval. | |
| // It uses the system clock and no bounds on the interval. | |
| func NewTicker(expected time.Duration) Ticker { | |
| return Ticker{ | |
| Expected: expected, | |
| Clock: clocks.System(), | |
| } | |
| } | |
| func (t Ticker) randomAcceptableWaitTime() time.Duration { | |
| // An initial value that is guaranteed to be outside the configured bounds. | |
| var wt time.Duration = -1 | |
| // Sample until we get a value that is within the configured bounds. | |
| // | |
| // I'm pretty sure that resampling is better than clamping here, but I | |
| // welcome any analysis either way. | |
| for wt < t.Min || (t.Max != 0 && wt > t.Max) { | |
| // To see that this is correct, remember that Duration is an alias for | |
| // int64, where the value is the number of nanoseconds. So we are | |
| // converting the int64 to a float64, scaling it, and then converting | |
| // back to an int64. | |
| wt = time.Duration(rand.ExpFloat64() * float64(t.Expected)) | |
| } | |
| return wt | |
| } | |
| // Tick creates a channel that will emit times at random intervals according | |
| // to the memoryless distribution. It is intended to be a drop-in replacement | |
| // for time.Tick(). | |
| // | |
| // The channel will be closed when the context is done. If you never cancel | |
| // the context, this function will leak a goroutine. The returned channel is | |
| // unbuffered, because it is important that the ticks never "bunch up" inside | |
| // the channel. | |
| // | |
| // If the Ticker does not Validate(), results are not guaranteed. | |
| // | |
| // This ticker is useful for tasks that you don't want to accidentally "sync | |
| // up" over time, like two different garbage collectors or flushing buffers | |
| // to disk. | |
| func (t Ticker) Tick(ctx context.Context) <-chan time.Time { | |
| c := make(chan time.Time) // It is important that this channel is unbuffered. | |
| // Until the context is done, sample a wait time and wait that long. | |
| go func() { | |
| defer close(c) | |
| for ctx.Err() == nil { | |
| duration := t.randomAcceptableWaitTime() | |
| // Wait until the duration has passed or the context is done. | |
| select { | |
| case now := <-t.Clock.After(duration): | |
| // Do a non-blocking send to the unbuffered channel. | |
| select { | |
| case c <- now: | |
| default: | |
| } | |
| case <-ctx.Done(): | |
| return | |
| } | |
| } | |
| }() | |
| return c | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package memoryless | |
| import ( | |
| "context" | |
| "math" | |
| "sort" | |
| "testing" | |
| "time" | |
| "github.com/stretchr/testify/require" | |
| "github.com/triple-pat/go/clocks" | |
| ) | |
| func TestConfigValidate(t *testing.T) { | |
| tests := []struct { | |
| name string | |
| cfg Ticker | |
| wantErr bool | |
| }{ | |
| { | |
| name: "valid basic config", | |
| cfg: Ticker{ | |
| Expected: time.Second, | |
| }, | |
| }, | |
| { | |
| name: "valid with min", | |
| cfg: Ticker{ | |
| Expected: time.Second, | |
| Min: 500 * time.Millisecond, | |
| }, | |
| }, | |
| { | |
| name: "valid with min and max", | |
| cfg: Ticker{ | |
| Expected: time.Second, | |
| Min: 500 * time.Millisecond, | |
| Max: 2 * time.Second, | |
| }, | |
| }, | |
| { | |
| name: "invalid - zero expected", | |
| cfg: Ticker{ | |
| Expected: 0, | |
| }, | |
| wantErr: true, | |
| }, | |
| { | |
| name: "invalid - negative expected", | |
| cfg: Ticker{ | |
| Expected: -time.Second, | |
| }, | |
| wantErr: true, | |
| }, | |
| { | |
| name: "invalid - min > expected", | |
| cfg: Ticker{ | |
| Expected: time.Second, | |
| Min: 2 * time.Second, | |
| }, | |
| wantErr: true, | |
| }, | |
| { | |
| name: "invalid - max < expected", | |
| cfg: Ticker{ | |
| Expected: time.Second, | |
| Min: 500 * time.Millisecond, | |
| Max: 750 * time.Millisecond, | |
| }, | |
| wantErr: true, | |
| }, | |
| } | |
| for _, tt := range tests { | |
| t.Run(tt.name, func(t *testing.T) { | |
| err := tt.cfg.Validate() | |
| if tt.wantErr { | |
| require.Error(t, err) | |
| } else { | |
| require.NoError(t, err) | |
| } | |
| }) | |
| } | |
| } | |
| // This test will spuriously fail about 1/100 times due to the 1% critical value. | |
| // Because of the inherent flakiness, we call it a long test and run it rarely. | |
| func TestWaitTimeDistribution(t *testing.T) { | |
| if testing.Short() { | |
| t.Skip("skipping statistical significance test, as it is inherently flaky because of the 1% critical value") | |
| } | |
| cfg := Ticker{ | |
| Expected: 10 * time.Millisecond, | |
| } | |
| require.NoError(t, cfg.Validate()) | |
| // Generate a moderate sample size - large enough for statistical significance | |
| // but not so large that tiny deviations cause failures | |
| const sampleSize = 10_000 | |
| intervals := make([]float64, sampleSize) | |
| // Collect intervals and convert to float64 seconds | |
| for i := range intervals { | |
| d := cfg.randomAcceptableWaitTime() | |
| intervals[i] = d.Seconds() | |
| } | |
| // Calculate mean for exponential parameter | |
| var sum float64 | |
| for _, v := range intervals { | |
| sum += v | |
| } | |
| mean := sum / float64(len(intervals)) | |
| lambda := 1.0 / mean // Rate parameter for exponential distribution | |
| // Sort intervals for K-S test | |
| sorted := make([]float64, len(intervals)) | |
| copy(sorted, intervals) | |
| sort.Float64s(sorted) | |
| // Perform Kolmogorov-Smirnov test | |
| maxD := 0.0 // Maximum difference between empirical and theoretical CDF | |
| for i, x := range sorted { | |
| // Empirical CDF: (i+1)/n | |
| empiricalCDF := float64(i+1) / float64(len(sorted)) | |
| // Theoretical CDF for exponential: 1 - e^(-λx) | |
| theoreticalCDF := 1.0 - math.Exp(-lambda*x) | |
| // Update maximum difference | |
| d := math.Abs(empiricalCDF - theoreticalCDF) | |
| maxD = math.Max(maxD, d) | |
| } | |
| // Critical value for α=0.01 (99% confidence) | |
| // From K-S test tables with a 5% tolerance factor | |
| criticalValue := 1.05 * math.Sqrt(-math.Log(0.005)/(2*float64(sampleSize))) | |
| // Test statistic should be less than critical value | |
| require.Less(t, maxD, criticalValue, | |
| "K-S test failed: maximum difference %v exceeds critical value %v", maxD, criticalValue) | |
| } | |
| func TestTickWithBlockingClock(t *testing.T) { | |
| clock := clocks.Blocking(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) | |
| ticker := Ticker{ | |
| Expected: 10 * time.Second, | |
| Min: 5 * time.Second, | |
| Max: 15 * time.Second, | |
| Clock: clock, | |
| } | |
| require.NoError(t, ticker.Validate()) | |
| ctx, cancel := context.WithCancel(t.Context()) | |
| defer cancel() | |
| ch := ticker.Tick(ctx) | |
| // Create a channel to collect timestamps and a done channel | |
| const numDurations = 100 | |
| durations := make([]time.Duration, 0, numDurations) | |
| done := make(chan struct{}) | |
| // Start goroutine to collect timestamps | |
| go func() { | |
| defer close(done) | |
| var prev time.Time | |
| for len(durations) < numDurations { | |
| tick, ok := <-ch | |
| if !ok { | |
| t.Error("channel closed before collecting enough ticks") | |
| return | |
| } | |
| if !prev.IsZero() { | |
| durations = append(durations, tick.Sub(prev)) | |
| } | |
| prev = tick | |
| } | |
| }() | |
| // Advance time in small increments until the done channel is closed. | |
| // Done as a function because "break" breaks out of the select statement, not the outer loop. | |
| func() { | |
| for { | |
| select { | |
| case <-done: | |
| return | |
| default: | |
| clock.Advance(ticker.Min / 100) | |
| } | |
| } | |
| }() | |
| // Verify timestamps are monotonic and within bounds | |
| require.Len(t, durations, numDurations, "should have collected exactly %d durations", numDurations) | |
| for i, duration := range durations { | |
| require.GreaterOrEqual(t, duration.Seconds(), ticker.Min.Seconds(), | |
| "duration %v (%d) below minimum", duration, i) | |
| // We can't verify the max here without more complex coordination between the gathering goroutine and the clock. | |
| } | |
| // Test cancellation | |
| cancel() | |
| clock.Advance(ticker.Max * 2) | |
| // Channel should be closed | |
| _, ok := <-ch | |
| require.False(t, ok, "channel should be closed after cancellation") | |
| } | |
| func TestTickNoReceiver(t *testing.T) { | |
| clock := clocks.Blocking(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) | |
| ticker := Ticker{ | |
| Expected: time.Second, | |
| Min: 500 * time.Millisecond, | |
| Max: 2 * time.Second, | |
| Clock: clock, | |
| } | |
| require.NoError(t, ticker.Validate()) | |
| // Start the Ticker. | |
| ctx, cancel := context.WithCancel(t.Context()) | |
| ch := ticker.Tick(ctx) | |
| // Give the Ticker a chance to start up before we continue. | |
| time.Sleep(time.Millisecond) | |
| // Advance time without any receiver on the channel. | |
| clock.Advance(2 * ticker.Max) | |
| // Give the Ticker a chance to react to the new time. | |
| time.Sleep(time.Millisecond) | |
| // Now try to collect a single tick, which should fail. | |
| select { | |
| case tick := <-ch: | |
| t.Errorf("received tick: %v", tick) | |
| default: | |
| // No tick available - success! | |
| } | |
| // Now verify there really was no tick written. | |
| // Cancel the context. | |
| cancel() | |
| tick, ok := <-ch | |
| require.False(t, ok, "channel should be closed after cancellation") | |
| require.True(t, tick.IsZero(), "tick should be zero time") | |
| } | |
| func TestTickerDoesNotGetStuckAfterDroppingTick(t *testing.T) { | |
| clock := clocks.Blocking(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) | |
| ticker := Ticker{ | |
| Expected: time.Millisecond, | |
| Max: time.Second, | |
| Clock: clock, | |
| } | |
| require.NoError(t, ticker.Validate()) | |
| ch := ticker.Tick(t.Context()) | |
| // Give the Ticker a chance to start up before we continue. | |
| time.Sleep(time.Millisecond) | |
| // Advance the clock. The ticker should fire, but since there's no receiver, | |
| // the tick should be dropped. (behavior verified by the TestTickNoReceiver test) | |
| clock.Advance(2 * ticker.Max) | |
| // Give the Ticker a chance to react to the new time. | |
| time.Sleep(time.Millisecond) | |
| // Now advance the clock to trigger the next tick. | |
| clock.Advance(2 * ticker.Max) | |
| // Wait for the tick to be received. | |
| tick := <-ch | |
| require.False(t, tick.IsZero(), "tick should be non-zero time") | |
| } | |
| func TestNewTicker(t *testing.T) { | |
| expected := time.Hour | |
| ticker := NewTicker(expected) | |
| require.Equal(t, expected, ticker.Expected) | |
| require.Equal(t, time.Duration(0), ticker.Min) | |
| require.Equal(t, time.Duration(0), ticker.Max) | |
| require.Equal(t, clocks.System(), ticker.Clock) | |
| // Validate should pass for default configuration | |
| require.NoError(t, ticker.Validate()) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment