Skip to content

Instantly share code, notes, and snippets.

@pboothe
Created January 18, 2026 23:15
Show Gist options
  • Select an option

  • Save pboothe/1acb4f33d5158754011d881151fdeada to your computer and use it in GitHub Desktop.

Select an option

Save pboothe/1acb4f33d5158754011d881151fdeada to your computer and use it in GitHub Desktop.
// Package memoryless provides a Ticker that ticks according to the memoryless (Poisson) distribution.
//
// Use this ticker instead of time.Ticker to generate events with a Poisson distribution and avoid pathological cases.
//
// See https://en.wikipedia.org/wiki/Poisson_distribution for more information as well as the acronym P.A.S.T.A.
package memoryless
import (
"context"
"errors"
"math/rand"
"time"
"github.com/triple-pat/go/clocks"
)
var errBadConfig = errors.New("bad memoryless config")
// Ticker is the configuration for the memoryless ticker.
//
// Expected is the expected interval between ticks.
// Min is the minimum interval between ticks.
// Max is the maximum interval between ticks.
//
// If Max is not set, there is no upper bound on the interval.
//
// Technically, the use of Min and Max prevents the result from being a true
// Poisson distribution, but it can be operationally useful to have guarantees
// that this ticker will not produce events too frequently or too seldom. The
// interaction between math and the real world is full of tradeoffs.
//
// Speaking of tradeoffs, the other tradeoff here is that the ticker will never
// send a tick if there isn't a listener waiting to receive it. This means that
// if the receiver is doing a long-lived task after each tick, it is impossible
// for the ticker to send a tick with a period shorter than the duration of the
// long-lived task. This also serves as a lower bound on the period between
// ticks, and skews this away from being a perfect Poisson distribution. Despite
// all those caveats, this is still about as good as you can do.
//
// Use this ticker to prevent accidental synchronization in long-lived systems.
type Ticker struct {
Clock clocks.Clock
Expected time.Duration
Min time.Duration
Max time.Duration
}
// Validate returns an error if the Ticker configuration doesn't make sense.
func (t Ticker) Validate() error {
if t.Expected <= 0 {
return errBadConfig
}
if t.Min < 0 || t.Min > t.Expected {
return errBadConfig
}
if t.Max != 0 && (t.Max < t.Min || t.Max < t.Expected) {
return errBadConfig
}
return nil
}
// NewTicker creates a new memoryless Ticker with the specified expected interval.
// It uses the system clock and no bounds on the interval.
func NewTicker(expected time.Duration) Ticker {
return Ticker{
Expected: expected,
Clock: clocks.System(),
}
}
func (t Ticker) randomAcceptableWaitTime() time.Duration {
// An initial value that is guaranteed to be outside the configured bounds.
var wt time.Duration = -1
// Sample until we get a value that is within the configured bounds.
//
// I'm pretty sure that resampling is better than clamping here, but I
// welcome any analysis either way.
for wt < t.Min || (t.Max != 0 && wt > t.Max) {
// To see that this is correct, remember that Duration is an alias for
// int64, where the value is the number of nanoseconds. So we are
// converting the int64 to a float64, scaling it, and then converting
// back to an int64.
wt = time.Duration(rand.ExpFloat64() * float64(t.Expected))
}
return wt
}
// Tick creates a channel that will emit times at random intervals according
// to the memoryless distribution. It is intended to be a drop-in replacement
// for time.Tick().
//
// The channel will be closed when the context is done. If you never cancel
// the context, this function will leak a goroutine. The returned channel is
// unbuffered, because it is important that the ticks never "bunch up" inside
// the channel.
//
// If the Ticker does not Validate(), results are not guaranteed.
//
// This ticker is useful for tasks that you don't want to accidentally "sync
// up" over time, like two different garbage collectors or flushing buffers
// to disk.
func (t Ticker) Tick(ctx context.Context) <-chan time.Time {
c := make(chan time.Time) // It is important that this channel is unbuffered.
// Until the context is done, sample a wait time and wait that long.
go func() {
defer close(c)
for ctx.Err() == nil {
duration := t.randomAcceptableWaitTime()
// Wait until the duration has passed or the context is done.
select {
case now := <-t.Clock.After(duration):
// Do a non-blocking send to the unbuffered channel.
select {
case c <- now:
default:
}
case <-ctx.Done():
return
}
}
}()
return c
}
package memoryless
import (
"context"
"math"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/triple-pat/go/clocks"
)
func TestConfigValidate(t *testing.T) {
tests := []struct {
name string
cfg Ticker
wantErr bool
}{
{
name: "valid basic config",
cfg: Ticker{
Expected: time.Second,
},
},
{
name: "valid with min",
cfg: Ticker{
Expected: time.Second,
Min: 500 * time.Millisecond,
},
},
{
name: "valid with min and max",
cfg: Ticker{
Expected: time.Second,
Min: 500 * time.Millisecond,
Max: 2 * time.Second,
},
},
{
name: "invalid - zero expected",
cfg: Ticker{
Expected: 0,
},
wantErr: true,
},
{
name: "invalid - negative expected",
cfg: Ticker{
Expected: -time.Second,
},
wantErr: true,
},
{
name: "invalid - min > expected",
cfg: Ticker{
Expected: time.Second,
Min: 2 * time.Second,
},
wantErr: true,
},
{
name: "invalid - max < expected",
cfg: Ticker{
Expected: time.Second,
Min: 500 * time.Millisecond,
Max: 750 * time.Millisecond,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
// This test will spuriously fail about 1/100 times due to the 1% critical value.
// Because of the inherent flakiness, we call it a long test and run it rarely.
func TestWaitTimeDistribution(t *testing.T) {
if testing.Short() {
t.Skip("skipping statistical significance test, as it is inherently flaky because of the 1% critical value")
}
cfg := Ticker{
Expected: 10 * time.Millisecond,
}
require.NoError(t, cfg.Validate())
// Generate a moderate sample size - large enough for statistical significance
// but not so large that tiny deviations cause failures
const sampleSize = 10_000
intervals := make([]float64, sampleSize)
// Collect intervals and convert to float64 seconds
for i := range intervals {
d := cfg.randomAcceptableWaitTime()
intervals[i] = d.Seconds()
}
// Calculate mean for exponential parameter
var sum float64
for _, v := range intervals {
sum += v
}
mean := sum / float64(len(intervals))
lambda := 1.0 / mean // Rate parameter for exponential distribution
// Sort intervals for K-S test
sorted := make([]float64, len(intervals))
copy(sorted, intervals)
sort.Float64s(sorted)
// Perform Kolmogorov-Smirnov test
maxD := 0.0 // Maximum difference between empirical and theoretical CDF
for i, x := range sorted {
// Empirical CDF: (i+1)/n
empiricalCDF := float64(i+1) / float64(len(sorted))
// Theoretical CDF for exponential: 1 - e^(-λx)
theoreticalCDF := 1.0 - math.Exp(-lambda*x)
// Update maximum difference
d := math.Abs(empiricalCDF - theoreticalCDF)
maxD = math.Max(maxD, d)
}
// Critical value for α=0.01 (99% confidence)
// From K-S test tables with a 5% tolerance factor
criticalValue := 1.05 * math.Sqrt(-math.Log(0.005)/(2*float64(sampleSize)))
// Test statistic should be less than critical value
require.Less(t, maxD, criticalValue,
"K-S test failed: maximum difference %v exceeds critical value %v", maxD, criticalValue)
}
func TestTickWithBlockingClock(t *testing.T) {
clock := clocks.Blocking(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
ticker := Ticker{
Expected: 10 * time.Second,
Min: 5 * time.Second,
Max: 15 * time.Second,
Clock: clock,
}
require.NoError(t, ticker.Validate())
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
ch := ticker.Tick(ctx)
// Create a channel to collect timestamps and a done channel
const numDurations = 100
durations := make([]time.Duration, 0, numDurations)
done := make(chan struct{})
// Start goroutine to collect timestamps
go func() {
defer close(done)
var prev time.Time
for len(durations) < numDurations {
tick, ok := <-ch
if !ok {
t.Error("channel closed before collecting enough ticks")
return
}
if !prev.IsZero() {
durations = append(durations, tick.Sub(prev))
}
prev = tick
}
}()
// Advance time in small increments until the done channel is closed.
// Done as a function because "break" breaks out of the select statement, not the outer loop.
func() {
for {
select {
case <-done:
return
default:
clock.Advance(ticker.Min / 100)
}
}
}()
// Verify timestamps are monotonic and within bounds
require.Len(t, durations, numDurations, "should have collected exactly %d durations", numDurations)
for i, duration := range durations {
require.GreaterOrEqual(t, duration.Seconds(), ticker.Min.Seconds(),
"duration %v (%d) below minimum", duration, i)
// We can't verify the max here without more complex coordination between the gathering goroutine and the clock.
}
// Test cancellation
cancel()
clock.Advance(ticker.Max * 2)
// Channel should be closed
_, ok := <-ch
require.False(t, ok, "channel should be closed after cancellation")
}
func TestTickNoReceiver(t *testing.T) {
clock := clocks.Blocking(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
ticker := Ticker{
Expected: time.Second,
Min: 500 * time.Millisecond,
Max: 2 * time.Second,
Clock: clock,
}
require.NoError(t, ticker.Validate())
// Start the Ticker.
ctx, cancel := context.WithCancel(t.Context())
ch := ticker.Tick(ctx)
// Give the Ticker a chance to start up before we continue.
time.Sleep(time.Millisecond)
// Advance time without any receiver on the channel.
clock.Advance(2 * ticker.Max)
// Give the Ticker a chance to react to the new time.
time.Sleep(time.Millisecond)
// Now try to collect a single tick, which should fail.
select {
case tick := <-ch:
t.Errorf("received tick: %v", tick)
default:
// No tick available - success!
}
// Now verify there really was no tick written.
// Cancel the context.
cancel()
tick, ok := <-ch
require.False(t, ok, "channel should be closed after cancellation")
require.True(t, tick.IsZero(), "tick should be zero time")
}
func TestTickerDoesNotGetStuckAfterDroppingTick(t *testing.T) {
clock := clocks.Blocking(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
ticker := Ticker{
Expected: time.Millisecond,
Max: time.Second,
Clock: clock,
}
require.NoError(t, ticker.Validate())
ch := ticker.Tick(t.Context())
// Give the Ticker a chance to start up before we continue.
time.Sleep(time.Millisecond)
// Advance the clock. The ticker should fire, but since there's no receiver,
// the tick should be dropped. (behavior verified by the TestTickNoReceiver test)
clock.Advance(2 * ticker.Max)
// Give the Ticker a chance to react to the new time.
time.Sleep(time.Millisecond)
// Now advance the clock to trigger the next tick.
clock.Advance(2 * ticker.Max)
// Wait for the tick to be received.
tick := <-ch
require.False(t, tick.IsZero(), "tick should be non-zero time")
}
func TestNewTicker(t *testing.T) {
expected := time.Hour
ticker := NewTicker(expected)
require.Equal(t, expected, ticker.Expected)
require.Equal(t, time.Duration(0), ticker.Min)
require.Equal(t, time.Duration(0), ticker.Max)
require.Equal(t, clocks.System(), ticker.Clock)
// Validate should pass for default configuration
require.NoError(t, ticker.Validate())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment