Skip to content

Instantly share code, notes, and snippets.

@emaphp
Created September 3, 2025 17:27
Show Gist options
  • Select an option

  • Save emaphp/749160832b614c0afdcfd99edbf525a3 to your computer and use it in GitHub Desktop.

Select an option

Save emaphp/749160832b614c0afdcfd99edbf525a3 to your computer and use it in GitHub Desktop.
Double buffer queue
package main
import (
"log"
"math/rand"
"os"
"slices"
"sync"
"time"
)
// A value representing a state transition that needs to be handled
type OrderedQueueState int
const (
ORDERED_QUEUE_SHUTDOWN OrderedQueueState = iota
ORDERED_QUEUE_SWITCH
ORDERED_QUEUE_FULL
)
// The type of value to store
type OrderedQueueItem int
// A subscriber exposes a method that returns the channel it will be using to read values
// It also includes a subscription ID
type OrderedQueueSubscriber interface {
SubscriptionId() int
InputCh() chan OrderedQueueItem
}
// A signal allows managing the execution of a subscriber
type OrderedQueueSignal struct {
SignalCh chan struct{}
WaitGroup *sync.WaitGroup
CloseCh chan struct{}
}
func NewOrderedQueueSignal() *OrderedQueueSignal {
var subsWg sync.WaitGroup
signalCh := make(chan struct{})
closeCh := make(chan struct{})
return &OrderedQueueSignal{
SignalCh: signalCh,
WaitGroup: &subsWg,
CloseCh: closeCh,
}
}
type OrderedQueueOpts struct {
// How often the queue needs to be flushed
Interval time.Duration
// Queue size
Size int
// If items are sorted before broadcast
Sorted bool
// The amount of goroutines used to broadcast
Workers int
}
// An OrderedQueue is a double-buffer queue that periodically flushes a partially ordered list
// to a set of subscribers
// The queue is flushed whenever the buffers gets full or the timer triggers a swap
type OrderedQueue struct {
opts *OrderedQueueOpts
mutex sync.RWMutex
buffers [2][]OrderedQueueItem
front int
back int
subscribers []OrderedQueueSubscriber
signal *OrderedQueueSignal
}
// A worker that dispatches items to a list of subscribers
type OrderedQueueDispatcher struct {
workerId int
subscribers []OrderedQueueSubscriber
}
func NewOrderedQueueDispatcher(workerId int, subscribers []OrderedQueueSubscriber) *OrderedQueueDispatcher {
return &OrderedQueueDispatcher{
workerId,
subscribers,
}
}
func (dispatcher *OrderedQueueDispatcher) Run(workerWg *sync.WaitGroup, fanout chan chan OrderedQueueItem, closeCh chan struct{}) {
defer workerWg.Done()
for {
select {
case <-closeCh:
log.Printf("worker #%d closing!", dispatcher.workerId)
return
case inputCh := <-fanout:
log.Printf("worker #%d running!", dispatcher.workerId)
for item := range inputCh {
for _, subscriber := range dispatcher.subscribers {
subscriber.InputCh() <- item
}
}
}
}
}
func NewOrderedQueue(opts *OrderedQueueOpts) *OrderedQueue {
buffers := [2][]OrderedQueueItem{
make([]OrderedQueueItem, 0, opts.Size),
make([]OrderedQueueItem, 0, opts.Size),
}
return &OrderedQueue{
opts: opts,
buffers: buffers,
front: 0,
back: 1,
signal: NewOrderedQueueSignal(),
}
}
func (queue *OrderedQueue) Signal() *OrderedQueueSignal {
return queue.signal
}
func (queue *OrderedQueue) write(item OrderedQueueItem) int {
queue.mutex.Lock()
defer queue.mutex.Unlock()
queue.buffers[queue.back] = append(queue.buffers[queue.back], item)
return len(queue.buffers[queue.back])
}
func (queue *OrderedQueue) writeUnsafe(item OrderedQueueItem) int {
queue.buffers[queue.back] = append(queue.buffers[queue.back], item)
return len(queue.buffers[queue.back])
}
func (queue *OrderedQueue) read() []OrderedQueueItem {
queue.mutex.RLock()
defer queue.mutex.RUnlock()
items := queue.buffers[queue.front][:]
if len(items) == 0 {
return nil
}
if queue.opts.Sorted {
slices.SortFunc(items, func(a, b OrderedQueueItem) int {
if a < b {
return -1
}
if a > b {
return 1
}
return 0
})
}
return items
}
func (queue *OrderedQueue) readUnsafe() []OrderedQueueItem {
items := queue.buffers[queue.front][:]
if len(items) == 0 {
return nil
}
if queue.opts.Sorted {
slices.SortFunc(items, func(a, b OrderedQueueItem) int {
if a < b {
return -1
}
if a > b {
return 1
}
return 0
})
}
return items
}
func (queue *OrderedQueue) swap() {
queue.mutex.Lock()
defer queue.mutex.Unlock()
queue.buffers[queue.front] = queue.buffers[queue.front][:0]
queue.front = (queue.front + 1) % 2
queue.back = (queue.front + 1) % 2
}
func (queue *OrderedQueue) swapUnsafe() {
queue.buffers[queue.front] = queue.buffers[queue.front][:0]
queue.front = (queue.front + 1) % 2
queue.back = (queue.front + 1) % 2
}
// Handles the state transitions
func (queue *OrderedQueue) doSwitch(runWg *sync.WaitGroup, stateCh <-chan OrderedQueueState, outputCh chan<- []OrderedQueueItem) {
defer runWg.Done()
for {
select {
case state := <-stateCh:
if state == ORDERED_QUEUE_SHUTDOWN {
queue.swapUnsafe()
items := queue.readUnsafe()
if len(items) > 0 {
outputCh <- items
}
close(outputCh)
return
}
if state == ORDERED_QUEUE_FULL {
outputCh <- queue.readUnsafe()
} else if state == ORDERED_QUEUE_SWITCH {
queue.swap()
outputCh <- queue.readUnsafe()
}
}
}
}
// It process the values to be pushed to the queue and generates the state transitions
func (queue *OrderedQueue) doRead(runWg *sync.WaitGroup, doneCh chan struct{}, stateCh chan<- OrderedQueueState, inputCh <-chan int) {
defer runWg.Done()
ticker := time.NewTicker(queue.opts.Interval)
for {
select {
case <-doneCh:
ticker.Stop()
stateCh <- ORDERED_QUEUE_SHUTDOWN
return
case <-ticker.C:
stateCh <- ORDERED_QUEUE_SWITCH
continue
case in := <-inputCh:
flush := false
queue.mutex.Lock()
total := queue.writeUnsafe(OrderedQueueItem(in))
if total >= queue.opts.Size {
queue.swapUnsafe()
flush = true
}
queue.mutex.Unlock()
if flush {
ticker.Reset(queue.opts.Interval)
stateCh <- ORDERED_QUEUE_FULL
}
}
}
}
func (queue *OrderedQueue) doDispatch(runWg *sync.WaitGroup, outputCh <-chan []OrderedQueueItem, doneCh chan struct{}) {
defer runWg.Done()
fanout := make(chan chan OrderedQueueItem)
closeCh := make(chan struct{})
// Initialize workers
var workerWg sync.WaitGroup
for i := range queue.opts.Workers {
workerWg.Add(1)
worker := NewOrderedQueueDispatcher(i+1, queue.subscribers)
go worker.Run(&workerWg, fanout, closeCh)
}
// Signal the subscribers to start reading items
for range queue.subscribers {
queue.signal.WaitGroup.Add(1)
}
close(queue.signal.SignalCh)
for {
select {
case <-doneCh:
time.Sleep(500 * time.Millisecond)
// Send any remaining values
items, ok := <-outputCh
if ok {
out := make(chan OrderedQueueItem)
fanout <- out
for _, item := range items {
out <- item
}
close(out)
}
// Let workers know they must exit
close(closeCh)
workerWg.Wait()
// Signal subscribers to end their execution
close(queue.signal.CloseCh)
queue.signal.WaitGroup.Wait()
return
case items := <-outputCh:
// Prevent workers handling nil lists
if len(items) == 0 {
continue
}
// Send a new channel for which we'll send the values
out := make(chan OrderedQueueItem)
fanout <- out
for _, item := range items {
out <- item
}
close(out)
continue
}
}
}
// Keep reading values from inputCh until doneCh signal
func (queue *OrderedQueue) Run(wg *sync.WaitGroup, inputCh <-chan int, doneCh chan struct{}) {
stateCh := make(chan OrderedQueueState)
outputCh := make(chan []OrderedQueueItem)
var runWg sync.WaitGroup
runWg.Add(1)
go queue.doDispatch(&runWg, outputCh, doneCh)
runWg.Add(1)
go queue.doSwitch(&runWg, stateCh, outputCh)
runWg.Add(1)
go queue.doRead(&runWg, doneCh, stateCh, inputCh)
go func() {
defer wg.Done()
<-doneCh
runWg.Wait()
}()
}
func (queue *OrderedQueue) AddSubscriber(subscriber OrderedQueueSubscriber) {
queue.subscribers = append(queue.subscribers, subscriber)
}
// An exmple subscriber implementation
type DummySubscriber struct {
subscriptionId int
inputCh chan OrderedQueueItem
}
func NewDummySubscriber(subscriptionId int) *DummySubscriber {
inputCh := make(chan OrderedQueueItem)
return &DummySubscriber{
subscriptionId,
inputCh,
}
}
func (s *DummySubscriber) SubscriptionId() int {
return s.subscriptionId
}
func (s *DummySubscriber) InputCh() chan OrderedQueueItem {
return s.inputCh
}
func (s *DummySubscriber) Run(signal *OrderedQueueSignal) {
<-signal.SignalCh
defer signal.WaitGroup.Done()
total := 0
for {
select {
case <-signal.CloseCh:
log.Printf("subscriber #%d closing [received: %d]", s.subscriptionId, total)
return
case msg := <-s.inputCh:
log.Printf("subscriber #%d got value %d", s.subscriptionId, msg)
total += 1
continue
}
}
}
var _ OrderedQueueSubscriber = (*DummySubscriber)(nil)
func main() {
// Log to file
logFile, err := os.OpenFile("queue.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
log.Fatalf("error opening file: %v", err)
}
defer logFile.Close()
log.SetOutput(logFile)
// Initialize queue
opts := &OrderedQueueOpts{
Interval: time.Second,
Size: 10,
Sorted: true,
Workers: 2,
}
queue := NewOrderedQueue(opts)
// Signals the end of the program
waitCh := make(chan struct{})
// Main input channel
inputCh := make(chan int)
var wg sync.WaitGroup
// The producer routine emits random values periodically
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
count := 0
limit := 1000
for {
if count >= limit {
log.Printf("producer closing. total emitted %d", count)
return
}
delay := time.Duration(rand.Intn(1000))
wait := delay * time.Millisecond
amount := 90 + rand.Intn(50)
select {
case <-waitCh:
log.Printf("producer closing. total emitted %d", count)
return
case <-time.After(wait):
for range amount {
randomNumber := rand.Intn(1000)
inputCh <- randomNumber
count += 1
}
}
}
}(&wg)
// Setup subscribers
dummy1 := NewDummySubscriber(1)
dummy2 := NewDummySubscriber(2)
queue.AddSubscriber(dummy1)
queue.AddSubscriber(dummy2)
go dummy1.Run(queue.Signal())
go dummy2.Run(queue.Signal())
wg.Add(1)
queue.Run(&wg, inputCh, waitCh)
go func() {
<-time.After(time.Second * 10)
log.Printf("sending close signal")
close(waitCh)
}()
wg.Wait()
log.Printf("finished")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment