Created
March 11, 2026 11:26
-
-
Save Ichi-1/ecf03690ca93fa5e180b3d2d9aa350d7 to your computer and use it in GitHub Desktop.
Worker Pool (KPSN Live Coding)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "errors" | |
| "sync" | |
| ) | |
| var ( | |
| ErrQueueFull = errors.New("task queue is full") | |
| ErrStopped = errors.New("worker pool is stopped") | |
| ) | |
| type WorkerPool struct { | |
| taskChan chan func() // Очередь для входящих задач | |
| stopped bool // Флаг остановки WP | |
| stopChan chan struct{} // Сигнал Hard Stop | |
| drainChan chan struct{} // Сигнал Graceful Stop | |
| wg sync.WaitGroup | |
| mu sync.Mutex | |
| } | |
| func NewWorkerPool(workerLimit int) *WorkerPool { | |
| wp := &WorkerPool{ | |
| taskChan: make(chan func(), 10), | |
| stopChan: make(chan struct{}), | |
| drainChan: make(chan struct{}), | |
| } | |
| wp.wg.Add(workerLimit) | |
| for range workerLimit { | |
| go wp.worker() | |
| } | |
| return wp | |
| } | |
| // worker Реализует основноой цикл вокрера в 3х кейсах | |
| // 1. Normal Mode: обработка задачи из очереди <-wp.taskChan | |
| // 2. Обработка сигнала Hard Stop <-wp.stopChan, выходим и не дорабатываем задачи из очереди | |
| // 3. Обработка сигнала Graceful Stop <-wp.drainChan, вычитваем задачи из очереди | |
| func (wp *WorkerPool) worker() { | |
| defer wp.wg.Done() | |
| for { | |
| select { | |
| // Normal Mode | |
| case task := <-wp.taskChan: | |
| task() | |
| // Hard Stop | |
| case <-wp.stopChan: | |
| return | |
| // Graceful Stop | |
| case <-wp.drainChan: | |
| for { | |
| select { | |
| case task := <-wp.taskChan: | |
| task() | |
| default: | |
| return | |
| } | |
| } | |
| } | |
| } | |
| } | |
| // Submit Неблокирующая операция добавления задачи в очередь | |
| func (wp *WorkerPool) Submit(task func()) error { | |
| wp.mu.Lock() | |
| if wp.stopped { | |
| wp.mu.Unlock() | |
| return ErrStopped | |
| } | |
| wp.mu.Unlock() | |
| select { | |
| case wp.taskChan <- task: | |
| return nil | |
| default: | |
| return ErrQueueFull | |
| } | |
| } | |
| // SubmitWait - добавить таску в воркер пул и дождаться окончания ее выполнения если был вызван метод Stop. | |
| // Проверяем только сигнал Hard Stop <-stopChan | |
| // При StopWait (<-drainChan) задача гарантированно выполнится — ждём done. | |
| func (wp *WorkerPool) SubmitWait(task func()) error { | |
| wp.mu.Lock() | |
| if wp.stopped { | |
| wp.mu.Unlock() | |
| return ErrStopped | |
| } | |
| wp.mu.Unlock() | |
| var ( | |
| started = make(chan struct{}) | |
| done = make(chan struct{}) | |
| wrappedTask = func() { | |
| close(started) | |
| task() | |
| close(done) | |
| } | |
| ) | |
| if err := wp.Submit(wrappedTask); err != nil { | |
| return err | |
| } | |
| // При получении сигнала об остановке WP, ждем завершения начатой таски | |
| // Если таска не началась, то возвращаем ошибку ErrStopped и оставляем ее в очереди | |
| select { | |
| case <-done: | |
| return nil | |
| case <-wp.stopChan: | |
| select { | |
| case <-started: | |
| <-done | |
| return nil | |
| default: | |
| return ErrStopped | |
| } | |
| } | |
| } | |
| // Stop Отправка сигнала Hard Stop в воркеры | |
| func (wp *WorkerPool) Stop() error { | |
| wp.mu.Lock() | |
| if wp.stopped { | |
| wp.mu.Unlock() | |
| return ErrStopped | |
| } | |
| wp.stopped = true | |
| close(wp.stopChan) | |
| wp.mu.Unlock() | |
| wp.wg.Wait() | |
| return nil | |
| } | |
| // StopWait Отправка сигнала Graceful Stop в воркеры | |
| func (wp *WorkerPool) StopWait() error { | |
| wp.mu.Lock() | |
| if wp.stopped { | |
| wp.mu.Unlock() | |
| return ErrStopped | |
| } | |
| wp.stopped = true | |
| close(wp.drainChan) | |
| wp.mu.Unlock() | |
| wp.wg.Wait() | |
| return nil | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Отказ от закрытия канала данных: Канал taskChan не закрывается никогда. Управление жизненным циклом перенесено на независимые сигнальные каналы: stopChan (Hard Stop) и drainChan (Graceful Stop). Это полностью исключает Data Race при конкурентном Submit.
Гарантии Graceful Stop (StopWait):
При вызове StopWait() воркеры перехватывают сигнал через select и переходят в drain-режим, неблокирующим образом вычитывают остатки из taskChan
Load Shedding в SubmitWait: