Skip to content

Instantly share code, notes, and snippets.

@Ichi-1
Created March 11, 2026 11:26
Show Gist options
  • Select an option

  • Save Ichi-1/ecf03690ca93fa5e180b3d2d9aa350d7 to your computer and use it in GitHub Desktop.

Select an option

Save Ichi-1/ecf03690ca93fa5e180b3d2d9aa350d7 to your computer and use it in GitHub Desktop.
Worker Pool (KPSN Live Coding)
package main
import (
"errors"
"sync"
)
var (
ErrQueueFull = errors.New("task queue is full")
ErrStopped = errors.New("worker pool is stopped")
)
type WorkerPool struct {
taskChan chan func() // Очередь для входящих задач
stopped bool // Флаг остановки WP
stopChan chan struct{} // Сигнал Hard Stop
drainChan chan struct{} // Сигнал Graceful Stop
wg sync.WaitGroup
mu sync.Mutex
}
func NewWorkerPool(workerLimit int) *WorkerPool {
wp := &WorkerPool{
taskChan: make(chan func(), 10),
stopChan: make(chan struct{}),
drainChan: make(chan struct{}),
}
wp.wg.Add(workerLimit)
for range workerLimit {
go wp.worker()
}
return wp
}
// worker Реализует основноой цикл вокрера в 3х кейсах
// 1. Normal Mode: обработка задачи из очереди <-wp.taskChan
// 2. Обработка сигнала Hard Stop <-wp.stopChan, выходим и не дорабатываем задачи из очереди
// 3. Обработка сигнала Graceful Stop <-wp.drainChan, вычитваем задачи из очереди
func (wp *WorkerPool) worker() {
defer wp.wg.Done()
for {
select {
// Normal Mode
case task := <-wp.taskChan:
task()
// Hard Stop
case <-wp.stopChan:
return
// Graceful Stop
case <-wp.drainChan:
for {
select {
case task := <-wp.taskChan:
task()
default:
return
}
}
}
}
}
// Submit Неблокирующая операция добавления задачи в очередь
func (wp *WorkerPool) Submit(task func()) error {
wp.mu.Lock()
if wp.stopped {
wp.mu.Unlock()
return ErrStopped
}
wp.mu.Unlock()
select {
case wp.taskChan <- task:
return nil
default:
return ErrQueueFull
}
}
// SubmitWait - добавить таску в воркер пул и дождаться окончания ее выполнения если был вызван метод Stop.
// Проверяем только сигнал Hard Stop <-stopChan
// При StopWait (<-drainChan) задача гарантированно выполнится — ждём done.
func (wp *WorkerPool) SubmitWait(task func()) error {
wp.mu.Lock()
if wp.stopped {
wp.mu.Unlock()
return ErrStopped
}
wp.mu.Unlock()
var (
started = make(chan struct{})
done = make(chan struct{})
wrappedTask = func() {
close(started)
task()
close(done)
}
)
if err := wp.Submit(wrappedTask); err != nil {
return err
}
// При получении сигнала об остановке WP, ждем завершения начатой таски
// Если таска не началась, то возвращаем ошибку ErrStopped и оставляем ее в очереди
select {
case <-done:
return nil
case <-wp.stopChan:
select {
case <-started:
<-done
return nil
default:
return ErrStopped
}
}
}
// Stop Отправка сигнала Hard Stop в воркеры
func (wp *WorkerPool) Stop() error {
wp.mu.Lock()
if wp.stopped {
wp.mu.Unlock()
return ErrStopped
}
wp.stopped = true
close(wp.stopChan)
wp.mu.Unlock()
wp.wg.Wait()
return nil
}
// StopWait Отправка сигнала Graceful Stop в воркеры
func (wp *WorkerPool) StopWait() error {
wp.mu.Lock()
if wp.stopped {
wp.mu.Unlock()
return ErrStopped
}
wp.stopped = true
close(wp.drainChan)
wp.mu.Unlock()
wp.wg.Wait()
return nil
}
@Ichi-1
Copy link
Author

Ichi-1 commented Mar 11, 2026

  1. Отказ от закрытия канала данных: Канал taskChan не закрывается никогда. Управление жизненным циклом перенесено на независимые сигнальные каналы: stopChan (Hard Stop) и drainChan (Graceful Stop). Это полностью исключает Data Race при конкурентном Submit.

  2. Гарантии Graceful Stop (StopWait):
    При вызове StopWait() воркеры перехватывают сигнал через select и переходят в drain-режим, неблокирующим образом вычитывают остатки из taskChan

  3. Load Shedding в SubmitWait:

  • Метод реализован через паттерн Task Wrapping (оборачивание задачи с каналами started и done).
  • Дополнительно реализован паттерн Fail-Fast (Load Shedding): если очередь пула переполнена, SubmitWait немедленно отбрасывает задачу, не блокируя вызывающую горутину.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment