Skip to content

Instantly share code, notes, and snippets.

@mtio
Last active May 2, 2017 14:42
Show Gist options
  • Select an option

  • Save mtio/e5a9c4a039358c3d1c1ef26a35318451 to your computer and use it in GitHub Desktop.

Select an option

Save mtio/e5a9c4a039358c3d1c1ef26a35318451 to your computer and use it in GitHub Desktop.
package main
import (
"container/heap"
"fmt"
"time"
"math/rand"
)
func main() {
work := make(chan Request)
bal := new(Balancer)
bal.done = make(chan *Worker)
for i := 0; i < 10; i++ {
worker := new(Worker)
worker.requests = make(chan Request)
worker.index = i
bal.pool = append(bal.pool, worker)
}
go requester(work)
bal.balance(work)
}
// requester produces an endless stream of requests. On each iteration it
// sleeps for 10ms, sends a Request carrying doWork and a fresh reply channel
// on work, and starts a goroutine that prints the reply once it arrives.
func requester(work chan Request) {
	for {
		// Throttle the producer (fake load).
		time.Sleep(10 * time.Millisecond)

		reply := make(chan string)
		work <- Request{fn: doWork, response: reply}

		// Wait for the answer in the background so the next request can be
		// issued without waiting for this one to finish.
		go func() {
			fmt.Println(<-reply)
		}()
	}
}
// balance is the balancer's event loop. It never returns: each iteration
// waits for either a worker reporting completion on b.done or a new request
// arriving on work, and handles whichever is ready.
func (b *Balancer) balance(work chan Request) {
	for {
		select {
		case finished := <-b.done:
			// A worker finished a request; update its bookkeeping.
			b.completed(finished)
		case incoming := <-work:
			// A new request arrived; hand it to a worker.
			b.dispatch(incoming)
		}
	}
}
// dispatch hands req to the worker that heap.Pop returns from the pool (the
// one ordered first by Pool.Less), bumps that worker's pending count, and
// puts the worker back on the heap.
func (b *Balancer) dispatch(req Request) {
	target := heap.Pop(&b.pool).(*Worker)

	// Start a goroutine running the worker before sending: the send below
	// blocks until something receives from target.requests.
	go target.work(b.done)
	target.requests <- req

	// Account for the new request, then re-insert the worker so the heap
	// reflects its updated load.
	target.pending++
	heap.Push(&b.pool, target)
}
// completed records that w has finished one request and restores the heap
// ordering for its new, lower load.
//
// The worker is repositioned with heap.Fix, which container/heap documents as
// equivalent to, but less expensive than, Remove followed by Push. w.index is
// trusted only if the pool slot it names really holds w; otherwise the pool is
// scanned for w, so a stale index can never cause a different worker to be
// removed and w to be inserted twice.
func (b *Balancer) completed(w *Worker) {
	// One fewer in the queue.
	w.pending--

	pos := w.index
	if pos < 0 || pos >= len(b.pool) || b.pool[pos] != w {
		// The recorded index does not point at w; find its real position.
		pos = -1
		for i, candidate := range b.pool {
			if candidate == w {
				pos = i
				break
			}
		}
	}

	if pos < 0 {
		// w is not in the pool at all; add it so it can be dispatched to.
		heap.Push(&b.pool, w)
		return
	}

	// Move w to the place its new pending count deserves.
	heap.Fix(&b.pool, pos)
}
// work serves exactly one request for this worker and then returns: it
// receives a Request from w.requests, calls the request's fn with a copy of
// the worker, sends the result on the request's response channel, and finally
// reports the worker on done.
//
// dispatch starts a new goroutine running work for every request it hands
// out. Looping here forever would therefore leave one additional goroutine
// parked on w.requests after each request served, growing without bound;
// returning after a single request lets each goroutine exit when it is done.
func (w *Worker) work(done chan *Worker) {
	req := <-w.requests        // get Request from balancer
	req.response <- req.fn(*w) // call fn and send result
	done <- w                  // we've finished this request
}
// doWork simulates a unit of work. It sleeps for a random whole number of
// milliseconds in [0, 1000) and then returns a string reporting the index and
// pending count of the Worker value it was given.
func doWork(worker Worker) string {
	delay := time.Duration(rand.Intn(1000)) * time.Millisecond
	time.Sleep(delay)

	idx, queued := worker.index, worker.pending
	return fmt.Sprintf("index: %d pending:%d", idx, queued)
}
// Request is one unit of work sent by the requester to the balancer and
// passed on to a worker.
type Request struct {
// fn is the operation to run; work calls it with a copy of the Worker
// that is executing the request.
fn func(worker Worker) string
// response is the channel on which the string returned by fn is sent.
response chan string
}
// Worker is one entry in the balancer's pool. The pool is ordered by pending.
type Worker struct {
requests chan Request // work to do; dispatch sends on it (main creates it unbuffered)
pending int // count of pending tasks: incremented by dispatch, decremented by completed
index int // index in heap; set in main and read by completed to locate the worker
}
// Balancer distributes incoming requests across a pool of workers.
type Balancer struct {
// pool holds the workers as a heap ordered by Pool.Less.
pool Pool
// done is where a worker sends itself after finishing a request; balance
// receives from it and calls completed.
done chan *Worker
}
// Pool is a collection of workers ordered by their pending request count. It
// provides the Len/Less/Swap/Push/Pop method set that container/heap operates
// on, and it keeps every worker's index field equal to that worker's current
// position in the slice so that a worker can later be located by index.
type Pool []*Worker

// Len reports the number of workers in the pool.
func (p Pool) Len() int { return len(p) }

// Less reports whether the worker at i has fewer pending requests than the
// worker at j, which puts the least-loaded worker first.
func (p Pool) Less(i, j int) bool { return p[i].pending < p[j].pending }

// Swap exchanges the workers at positions i and j and records each worker's
// new position in its index field. Without this bookkeeping the index stored
// on a worker would go stale as soon as the heap reorders its elements.
func (p Pool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p[i].index = i
	p[j].index = j
}

// Push appends x, which must be a *Worker, to the end of the pool and sets
// the worker's index to that final position.
func (p *Pool) Push(x interface{}) {
	w := x.(*Worker)
	w.index = len(*p)
	*p = append(*p, w)
}

// Pop removes the last worker from the pool and returns it. The returned
// worker's index field is left unchanged; Push assigns a fresh one when the
// worker is added again.
func (p *Pool) Pop() interface{} {
	old := *p
	n := len(old)
	w := old[n-1]
	old[n-1] = nil // don't keep the removed worker reachable via the backing array
	*p = old[:n-1]
	return w
}
@mtio
Copy link
Author

mtio commented May 2, 2017

Updated with fixes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment