Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save billiepander/e9be00da431e86453713550129532151 to your computer and use it in GitHub Desktop.

Select an option

Save billiepander/e9be00da431e86453713550129532151 to your computer and use it in GitHub Desktop.
代码来源于微信公众号flysnow_org,添加了对于[部分]返回结果的处理
package main
import (
"errors"
"os"
"os/signal"
"time"
"log"
)
var ErrTimeOut = errors.New("执行者执行超时")
var ErrInterrupt = errors.New("执行者被中断")
//一个执行者,可以执行任何任务,但是这些任务是限制完成的,//该执行者可以通过发送终止信号终止它
type Runner struct {
tasks []func(int) //要执行的任务
complete chan error //用于通知任务全部完成
timeout <-chan time.Time //这些任务在多久内完成
interrupt chan os.Signal //可以控制强制终止的信号
}
func New(tm time.Duration) *Runner {
return &Runner{
complete: make(chan error),
timeout: time.After(tm), //After waits for the duration to elapse and then sends the current time on the returned channel.
interrupt: make(chan os.Signal, 1),
}
}
//将需要执行的任务,添加到Runner里
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
//执行任务,执行的过程中接收到中断信号时,返回中断错误//如果任务全部执行完,还没有接收到中断信号,则返回nil
func (r *Runner) run() error {
for id, task := range r.tasks {
if r.isInterrupt() {
return ErrInterrupt
}
task(id) // 第n个task睡眠n秒
}
return nil
}
//检查是否接收到了中断信号
func (r *Runner) isInterrupt() bool {
select {
case <-r.interrupt:
signal.Stop(r.interrupt) // r.interrupt channel不再接受系统的signal
return true
default:
return false
}
}
//开始执行所有任务,并且监视通道事件
func (r *Runner) Start() error {
//希望接收哪些系统信号
signal.Notify(r.interrupt, os.Interrupt, os.Kill) // relay incoming signals to r.interrupt
go func() {
r.complete <- r.run()
}()
select { // 柱塞直到报错或者完成
case err := <-r.complete:
return err
case <-r.timeout: // here it return ErrTimeOut
return ErrTimeOut
}
}
func main() {
log.Println("...开始执行任务...")
timeout := 3 * time.Second // (*)
r := New(timeout)
results := make(chan int, 3)
r.Add(createTask(results), createTask(results), createTask(results))
if err:=r.Start();err!=nil{
switch err {
case ErrTimeOut:
log.Println(err)
close(results)
for completed_task_id := range results{
log.Printf("Timeout前部分完成的有%d", completed_task_id)
}
os.Exit(1)
case ErrInterrupt:
log.Println(err)
close(results)
for completed_task_id := range results{
log.Printf("Interrupt前部分完成的有%d", completed_task_id)
}
os.Exit(2)
}
}
log.Println("...任务执行结束...")
close(results)
for completed_task_id := range results{
log.Printf("全部完成了%d", completed_task_id)
}
}
func createTask(results chan int) func(int) {
return func(id int) {
log.Printf("正在执行任务%d", id)
time.Sleep(time.Duration(id)* time.Second)
log.Printf("任务%d已执行完毕", id)
results <- id
}
}
/*
超时的示例输出:
2017/04/25 10:55:40 ...开始执行任务...
2017/04/25 10:55:40 正在执行任务0
2017/04/25 10:55:40 任务0已执行完毕
2017/04/25 10:55:40 正在执行任务1
2017/04/25 10:55:41 任务1已执行完毕
2017/04/25 10:55:41 正在执行任务2
2017/04/25 10:55:43 执行者执行超时
2017/04/25 10:55:43 任务2已执行完毕
2017/04/25 10:55:43 Timeout前部分完成的有0
2017/04/25 10:55:43 Timeout前部分完成的有1
exit status 1
*/
/*
被外界中断的示例输出:
2017/04/25 10:55:20 ...开始执行任务...
2017/04/25 10:55:20 正在执行任务0
2017/04/25 10:55:20 任务0已执行完毕
2017/04/25 10:55:20 正在执行任务1
2017/04/25 10:55:21 任务1已执行完毕
2017/04/25 10:55:21 执行者被中断
2017/04/25 10:55:21 Interrupt前部分完成的有0
2017/04/25 10:55:21 Interrupt前部分完成的有1
exit status 2
*/
/*
全部完成的示例输出[将(*)处的数值改大]:
2017/04/25 11:02:10 ...开始执行任务...
2017/04/25 11:02:10 正在执行任务0
2017/04/25 11:02:10 任务0已执行完毕
2017/04/25 11:02:10 正在执行任务1
2017/04/25 11:02:11 任务1已执行完毕
2017/04/25 11:02:11 正在执行任务2
2017/04/25 11:02:13 任务2已执行完毕
2017/04/25 11:02:13 ...任务执行结束...
2017/04/25 11:02:13 全部完成了0
2017/04/25 11:02:13 全部完成了1
2017/04/25 11:02:13 全部完成了2
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment