Created
September 3, 2025 17:27
-
-
Save emaphp/749160832b614c0afdcfd99edbf525a3 to your computer and use it in GitHub Desktop.
Double buffer queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "log" | |
| "math/rand" | |
| "os" | |
| "slices" | |
| "sync" | |
| "time" | |
| ) | |
// OrderedQueueState identifies a state transition that the queue has to
// handle.
type OrderedQueueState int

// State transitions emitted by doRead and consumed by doSwitch.
const (
	// ORDERED_QUEUE_SHUTDOWN is sent once the done channel fires; the
	// remaining items are flushed and the output channel is closed.
	ORDERED_QUEUE_SHUTDOWN OrderedQueueState = iota
	// ORDERED_QUEUE_SWITCH is sent on every tick of the flush timer.
	ORDERED_QUEUE_SWITCH
	// ORDERED_QUEUE_FULL is sent after the back buffer reached the
	// configured size and the buffers were swapped.
	ORDERED_QUEUE_FULL
)
// OrderedQueueItem is the type of the values stored in the queue.
type OrderedQueueItem int
| // A subscriber exposes a method that returns the channel it will be using to read values | |
| // It also includes a subscription ID | |
| type OrderedQueueSubscriber interface { | |
| SubscriptionId() int | |
| InputCh() chan OrderedQueueItem | |
| } | |
// OrderedQueueSignal bundles the primitives used to manage the execution of
// the subscribers.
type OrderedQueueSignal struct {
	// SignalCh is closed by the queue to tell subscribers to start reading.
	SignalCh chan struct{}
	// WaitGroup counts the subscribers; the queue waits on it at shutdown.
	WaitGroup *sync.WaitGroup
	// CloseCh is closed by the queue to tell subscribers to stop.
	CloseCh chan struct{}
}
| func NewOrderedQueueSignal() *OrderedQueueSignal { | |
| var subsWg sync.WaitGroup | |
| signalCh := make(chan struct{}) | |
| closeCh := make(chan struct{}) | |
| return &OrderedQueueSignal{ | |
| SignalCh: signalCh, | |
| WaitGroup: &subsWg, | |
| CloseCh: closeCh, | |
| } | |
| } | |
// OrderedQueueOpts holds the configuration of an OrderedQueue.
type OrderedQueueOpts struct {
	// Interval is how often the queue is flushed by the timer.
	Interval time.Duration
	// Size is the capacity of each buffer; reaching it triggers a flush.
	Size int
	// Sorted reports whether items are sorted before being broadcast.
	Sorted bool
	// Workers is the number of goroutines used to broadcast.
	Workers int
}
| // An OrderedQueue is a double-buffer queue that periodically flushes a partially ordered list | |
| // to a set of subscribers | |
| // The queue is flushed whenever the buffers gets full or the timer triggers a swap | |
| type OrderedQueue struct { | |
| opts *OrderedQueueOpts | |
| mutex sync.RWMutex | |
| buffers [2][]OrderedQueueItem | |
| front int | |
| back int | |
| subscribers []OrderedQueueSubscriber | |
| signal *OrderedQueueSignal | |
| } | |
| // A worker that dispatches items to a list of subscribers | |
| type OrderedQueueDispatcher struct { | |
| workerId int | |
| subscribers []OrderedQueueSubscriber | |
| } | |
| func NewOrderedQueueDispatcher(workerId int, subscribers []OrderedQueueSubscriber) *OrderedQueueDispatcher { | |
| return &OrderedQueueDispatcher{ | |
| workerId, | |
| subscribers, | |
| } | |
| } | |
| func (dispatcher *OrderedQueueDispatcher) Run(workerWg *sync.WaitGroup, fanout chan chan OrderedQueueItem, closeCh chan struct{}) { | |
| defer workerWg.Done() | |
| for { | |
| select { | |
| case <-closeCh: | |
| log.Printf("worker #%d closing!", dispatcher.workerId) | |
| return | |
| case inputCh := <-fanout: | |
| log.Printf("worker #%d running!", dispatcher.workerId) | |
| for item := range inputCh { | |
| for _, subscriber := range dispatcher.subscribers { | |
| subscriber.InputCh() <- item | |
| } | |
| } | |
| } | |
| } | |
| } | |
| func NewOrderedQueue(opts *OrderedQueueOpts) *OrderedQueue { | |
| buffers := [2][]OrderedQueueItem{ | |
| make([]OrderedQueueItem, 0, opts.Size), | |
| make([]OrderedQueueItem, 0, opts.Size), | |
| } | |
| return &OrderedQueue{ | |
| opts: opts, | |
| buffers: buffers, | |
| front: 0, | |
| back: 1, | |
| signal: NewOrderedQueueSignal(), | |
| } | |
| } | |
| func (queue *OrderedQueue) Signal() *OrderedQueueSignal { | |
| return queue.signal | |
| } | |
| func (queue *OrderedQueue) write(item OrderedQueueItem) int { | |
| queue.mutex.Lock() | |
| defer queue.mutex.Unlock() | |
| queue.buffers[queue.back] = append(queue.buffers[queue.back], item) | |
| return len(queue.buffers[queue.back]) | |
| } | |
| func (queue *OrderedQueue) writeUnsafe(item OrderedQueueItem) int { | |
| queue.buffers[queue.back] = append(queue.buffers[queue.back], item) | |
| return len(queue.buffers[queue.back]) | |
| } | |
| func (queue *OrderedQueue) read() []OrderedQueueItem { | |
| queue.mutex.RLock() | |
| defer queue.mutex.RUnlock() | |
| items := queue.buffers[queue.front][:] | |
| if len(items) == 0 { | |
| return nil | |
| } | |
| if queue.opts.Sorted { | |
| slices.SortFunc(items, func(a, b OrderedQueueItem) int { | |
| if a < b { | |
| return -1 | |
| } | |
| if a > b { | |
| return 1 | |
| } | |
| return 0 | |
| }) | |
| } | |
| return items | |
| } | |
| func (queue *OrderedQueue) readUnsafe() []OrderedQueueItem { | |
| items := queue.buffers[queue.front][:] | |
| if len(items) == 0 { | |
| return nil | |
| } | |
| if queue.opts.Sorted { | |
| slices.SortFunc(items, func(a, b OrderedQueueItem) int { | |
| if a < b { | |
| return -1 | |
| } | |
| if a > b { | |
| return 1 | |
| } | |
| return 0 | |
| }) | |
| } | |
| return items | |
| } | |
| func (queue *OrderedQueue) swap() { | |
| queue.mutex.Lock() | |
| defer queue.mutex.Unlock() | |
| queue.buffers[queue.front] = queue.buffers[queue.front][:0] | |
| queue.front = (queue.front + 1) % 2 | |
| queue.back = (queue.front + 1) % 2 | |
| } | |
| func (queue *OrderedQueue) swapUnsafe() { | |
| queue.buffers[queue.front] = queue.buffers[queue.front][:0] | |
| queue.front = (queue.front + 1) % 2 | |
| queue.back = (queue.front + 1) % 2 | |
| } | |
| // Handles the state transitions | |
| func (queue *OrderedQueue) doSwitch(runWg *sync.WaitGroup, stateCh <-chan OrderedQueueState, outputCh chan<- []OrderedQueueItem) { | |
| defer runWg.Done() | |
| for { | |
| select { | |
| case state := <-stateCh: | |
| if state == ORDERED_QUEUE_SHUTDOWN { | |
| queue.swapUnsafe() | |
| items := queue.readUnsafe() | |
| if len(items) > 0 { | |
| outputCh <- items | |
| } | |
| close(outputCh) | |
| return | |
| } | |
| if state == ORDERED_QUEUE_FULL { | |
| outputCh <- queue.readUnsafe() | |
| } else if state == ORDERED_QUEUE_SWITCH { | |
| queue.swap() | |
| outputCh <- queue.readUnsafe() | |
| } | |
| } | |
| } | |
| } | |
| // It process the values to be pushed to the queue and generates the state transitions | |
| func (queue *OrderedQueue) doRead(runWg *sync.WaitGroup, doneCh chan struct{}, stateCh chan<- OrderedQueueState, inputCh <-chan int) { | |
| defer runWg.Done() | |
| ticker := time.NewTicker(queue.opts.Interval) | |
| for { | |
| select { | |
| case <-doneCh: | |
| ticker.Stop() | |
| stateCh <- ORDERED_QUEUE_SHUTDOWN | |
| return | |
| case <-ticker.C: | |
| stateCh <- ORDERED_QUEUE_SWITCH | |
| continue | |
| case in := <-inputCh: | |
| flush := false | |
| queue.mutex.Lock() | |
| total := queue.writeUnsafe(OrderedQueueItem(in)) | |
| if total >= queue.opts.Size { | |
| queue.swapUnsafe() | |
| flush = true | |
| } | |
| queue.mutex.Unlock() | |
| if flush { | |
| ticker.Reset(queue.opts.Interval) | |
| stateCh <- ORDERED_QUEUE_FULL | |
| } | |
| } | |
| } | |
| } | |
| func (queue *OrderedQueue) doDispatch(runWg *sync.WaitGroup, outputCh <-chan []OrderedQueueItem, doneCh chan struct{}) { | |
| defer runWg.Done() | |
| fanout := make(chan chan OrderedQueueItem) | |
| closeCh := make(chan struct{}) | |
| // Initialize workers | |
| var workerWg sync.WaitGroup | |
| for i := range queue.opts.Workers { | |
| workerWg.Add(1) | |
| worker := NewOrderedQueueDispatcher(i+1, queue.subscribers) | |
| go worker.Run(&workerWg, fanout, closeCh) | |
| } | |
| // Signal the subscribers to start reading items | |
| for range queue.subscribers { | |
| queue.signal.WaitGroup.Add(1) | |
| } | |
| close(queue.signal.SignalCh) | |
| for { | |
| select { | |
| case <-doneCh: | |
| time.Sleep(500 * time.Millisecond) | |
| // Send any remaining values | |
| items, ok := <-outputCh | |
| if ok { | |
| out := make(chan OrderedQueueItem) | |
| fanout <- out | |
| for _, item := range items { | |
| out <- item | |
| } | |
| close(out) | |
| } | |
| // Let workers know they must exit | |
| close(closeCh) | |
| workerWg.Wait() | |
| // Signal subscribers to end their execution | |
| close(queue.signal.CloseCh) | |
| queue.signal.WaitGroup.Wait() | |
| return | |
| case items := <-outputCh: | |
| // Prevent workers handling nil lists | |
| if len(items) == 0 { | |
| continue | |
| } | |
| // Send a new channel for which we'll send the values | |
| out := make(chan OrderedQueueItem) | |
| fanout <- out | |
| for _, item := range items { | |
| out <- item | |
| } | |
| close(out) | |
| continue | |
| } | |
| } | |
| } | |
| // Keep reading values from inputCh until doneCh signal | |
| func (queue *OrderedQueue) Run(wg *sync.WaitGroup, inputCh <-chan int, doneCh chan struct{}) { | |
| stateCh := make(chan OrderedQueueState) | |
| outputCh := make(chan []OrderedQueueItem) | |
| var runWg sync.WaitGroup | |
| runWg.Add(1) | |
| go queue.doDispatch(&runWg, outputCh, doneCh) | |
| runWg.Add(1) | |
| go queue.doSwitch(&runWg, stateCh, outputCh) | |
| runWg.Add(1) | |
| go queue.doRead(&runWg, doneCh, stateCh, inputCh) | |
| go func() { | |
| defer wg.Done() | |
| <-doneCh | |
| runWg.Wait() | |
| }() | |
| } | |
| func (queue *OrderedQueue) AddSubscriber(subscriber OrderedQueueSubscriber) { | |
| queue.subscribers = append(queue.subscribers, subscriber) | |
| } | |
| // An exmple subscriber implementation | |
| type DummySubscriber struct { | |
| subscriptionId int | |
| inputCh chan OrderedQueueItem | |
| } | |
| func NewDummySubscriber(subscriptionId int) *DummySubscriber { | |
| inputCh := make(chan OrderedQueueItem) | |
| return &DummySubscriber{ | |
| subscriptionId, | |
| inputCh, | |
| } | |
| } | |
| func (s *DummySubscriber) SubscriptionId() int { | |
| return s.subscriptionId | |
| } | |
| func (s *DummySubscriber) InputCh() chan OrderedQueueItem { | |
| return s.inputCh | |
| } | |
| func (s *DummySubscriber) Run(signal *OrderedQueueSignal) { | |
| <-signal.SignalCh | |
| defer signal.WaitGroup.Done() | |
| total := 0 | |
| for { | |
| select { | |
| case <-signal.CloseCh: | |
| log.Printf("subscriber #%d closing [received: %d]", s.subscriptionId, total) | |
| return | |
| case msg := <-s.inputCh: | |
| log.Printf("subscriber #%d got value %d", s.subscriptionId, msg) | |
| total += 1 | |
| continue | |
| } | |
| } | |
| } | |
| var _ OrderedQueueSubscriber = (*DummySubscriber)(nil) | |
| func main() { | |
| // Log to file | |
| logFile, err := os.OpenFile("queue.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) | |
| if err != nil { | |
| log.Fatalf("error opening file: %v", err) | |
| } | |
| defer logFile.Close() | |
| log.SetOutput(logFile) | |
| // Initialize queue | |
| opts := &OrderedQueueOpts{ | |
| Interval: time.Second, | |
| Size: 10, | |
| Sorted: true, | |
| Workers: 2, | |
| } | |
| queue := NewOrderedQueue(opts) | |
| // Signals the end of the program | |
| waitCh := make(chan struct{}) | |
| // Main input channel | |
| inputCh := make(chan int) | |
| var wg sync.WaitGroup | |
| // The producer routine emits random values periodically | |
| wg.Add(1) | |
| go func(wg *sync.WaitGroup) { | |
| defer wg.Done() | |
| count := 0 | |
| limit := 1000 | |
| for { | |
| if count >= limit { | |
| log.Printf("producer closing. total emitted %d", count) | |
| return | |
| } | |
| delay := time.Duration(rand.Intn(1000)) | |
| wait := delay * time.Millisecond | |
| amount := 90 + rand.Intn(50) | |
| select { | |
| case <-waitCh: | |
| log.Printf("producer closing. total emitted %d", count) | |
| return | |
| case <-time.After(wait): | |
| for range amount { | |
| randomNumber := rand.Intn(1000) | |
| inputCh <- randomNumber | |
| count += 1 | |
| } | |
| } | |
| } | |
| }(&wg) | |
| // Setup subscribers | |
| dummy1 := NewDummySubscriber(1) | |
| dummy2 := NewDummySubscriber(2) | |
| queue.AddSubscriber(dummy1) | |
| queue.AddSubscriber(dummy2) | |
| go dummy1.Run(queue.Signal()) | |
| go dummy2.Run(queue.Signal()) | |
| wg.Add(1) | |
| queue.Run(&wg, inputCh, waitCh) | |
| go func() { | |
| <-time.After(time.Second * 10) | |
| log.Printf("sending close signal") | |
| close(waitCh) | |
| }() | |
| wg.Wait() | |
| log.Printf("finished") | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment