Skip to content

Instantly share code, notes, and snippets.

@cghiban
Last active March 2, 2026 23:04
Show Gist options
  • Select an option

  • Save cghiban/da14da1aefdf8b86526884f0a54d1f85 to your computer and use it in GitHub Desktop.

Select an option

Save cghiban/da14da1aefdf8b86526884f0a54d1f85 to your computer and use it in GitHub Desktop.
accumulator-batcher pattern
package main
import (
"context"
"fmt"
"math/rand"
"os/signal"
"syscall"
"time"
)
// processor is a callback that receives one accumulated batch of items of
// type T.
type processor[T any] func([]T)
// produce launches a background goroutine that endlessly feeds ch with
// random integers in [0, 100), rendered as decimal strings. Between sends it
// sleeps for a random duration below 50ms.
//
// produce itself returns immediately. The goroutine it starts has no stop
// condition: it runs (or blocks on a send) until the process exits.
func produce(ch chan<- string) {
	emit := func() {
		for {
			n := rand.Intn(100)
			ch <- fmt.Sprintf("%d", n)

			pause := time.Duration(rand.Intn(50)) * time.Millisecond
			time.Sleep(pause)
		}
	}
	go emit()
}
// consume accumulates values received from ch and hands them to f in batches.
//
// The current batch is flushed every 500ms, and one final time when ctx is
// cancelled or ch is closed. Empty batches are never passed to f.
//
// The slice handed to f is reused for the next batch, so f must not retain it
// after returning; copy it if the data has to outlive the call.
//
// consume blocks until ctx is done or ch is closed.
func consume[T any](ctx context.Context, ch <-chan T, f processor[T]) {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	var batch []T

	// flush passes the pending items to f, skipping the call when there is
	// nothing to deliver.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		f(batch)
		batch = batch[:0] // keep the backing array for the next batch
	}

	for {
		select {
		case v, ok := <-ch:
			if !ok {
				// A closed channel is always ready and yields zero values;
				// without this check the loop would spin appending them.
				flush()
				return
			}
			batch = append(batch, v)
			// A size-based flush (calling flush once len(batch) reaches a
			// cap) could be added here, independent of the timer.
		case <-ticker.C:
			flush()
		case <-ctx.Done():
			fmt.Printf("Done: %s\n", ctx.Err())
			flush()
			return
		}
	}
}
// main wires a random-string producer to a batching consumer and runs until
// the process receives SIGINT or SIGTERM. On shutdown it waits for the
// consumer to return, so the consumer's final flush is not cut off by
// process exit.
func main() {
	ctx, cancel := signal.NotifyContext(context.Background(),
		syscall.SIGINT,
		syscall.SIGTERM,
	)
	defer cancel()

	batcher := func(data []string) {
		// do something with the accumulated data
		fmt.Printf("%s: @%v\n", time.Now().Format(time.RFC3339), data)
	}

	in := make(chan string)

	// Start the consumer first so the producer's sends have a receiver.
	// consumed is closed once consume has returned.
	consumed := make(chan struct{})
	go func() {
		defer close(consumed)
		consume(ctx, in, batcher)
	}()

	// produce starts its own goroutine, so it is called directly.
	produce(in)

	<-ctx.Done()
	// Returning from main ends the process immediately; wait for the
	// consumer to finish handling the cancellation first.
	<-consumed
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment