当前位置:嗨网首页>书籍在线阅读

02-runner

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

7.1 runner

runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用 runner 包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。这个程序可能会作为cron作业执行,或者在基于定时任务的云环境(如iron.io)里执行。

让我们来看一下 runner 包里的runner.go代码文件,如代码清单7-1所示。

代码清单7-1  runner /runner.go

 01 // Gabriel Aszalos协助完成了这个示例
 02 // runner包管理处理任务的运行和生命周期
 03 package runner
 04
 05 import (
 06   "errors"
 07   "os"
 08   "os/signal"
 09   "time"
 10 )
 11
 12 // Runner在给定的超时时间内执行一组任务,
 13 // 并且在操作系统发送中断信号时结束这些任务
 14 type Runner struct {
 15   // interrupt通道报告从操作系统
 16   // 发送的信号
 17   interrupt chan os.Signal
 18
 19   // complete通道报告处理任务已经完成
 20   complete chan error
 21
 22   // timeout报告处理任务已经超时
 23   timeout <-chan time.Time
 24
 25   // tasks持有一组以索引顺序依次执行的
 26   // 函数
 27   tasks []func(int)
 28 }
 29
 30 // ErrTimeout会在任务执行超时时返回
 31 var ErrTimeout = errors.New("received timeout")
 32
 33 // ErrInterrupt会在接收到操作系统的事件时返回
 34 var ErrInterrupt = errors.New("received interrupt")
 35
 36 // New返回一个新的准备使用的Runner
 37 func New(d time.Duration) *Runner {
 38   return &Runner{
 39     interrupt: make(chan os.Signal, 1),
 40     complete: make(chan error),
 41     timeout:  time.After(d),
 42   }
 43 }
 44
 45 // Add将一个任务附加到Runner上。这个任务是一个
 46 // 接收一个int类型的ID作为参数的函数
 47 func (r *Runner) Add(tasks ...func(int)) {
 48   r.tasks = append(r.tasks, tasks...)
 49 }
 50
 51 // Start执行所有任务,并监视通道事件
 52 func (r *Runner) Start() error {
 53   // 我们希望接收所有中断信号
 54   signal.Notify(r.interrupt, os.Interrupt)
 55
 56   // 用不同的goroutine执行不同的任务
 57   go func() {
 58     r.complete <- r.run()
 59   }()
 60
 61   select {
 62   // 当任务处理完成时发出的信号
 63   case err := <-r.complete:
 64     return err
 65
 66   // 当任务处理程序运行超时时发出的信号
 67   case <-r.timeout:
 68     return ErrTimeout
 69   }
 70 }
 71
 72 // run执行每一个已注册的任务
 73 func (r *Runner) run() error {
 74   for id, task := range r.tasks {
 75     // 检测操作系统的中断信号
 76     if r.gotInterrupt() {
 77       return ErrInterrupt
 78     }
 79
 80     // 执行已注册的任务
 81     task(id)
 82   }
 83
 84   return nil
 85 }
 86
 87 // gotInterrupt验证是否接收到了中断信号
 88 func (r *Runner) gotInterrupt() bool {
 89   select {
 90   // 当中断事件被触发时发出的信号
 91   case <-r.interrupt:
 92     // 停止接收后续的任何信号
 93     signal.Stop(r.interrupt)
 95     return true
 96
 97   // 继续正常运行
 98   default:
 99     return false
100   }
101 }

代码清单7-1 中的程序展示了依据调度运行的无人值守的面向任务的程序,及其所使用的并发模式。在设计上,可支持以下终止点:

  • 程序可以在分配的时间内完成工作,正常终止;
  • 程序没有及时完成工作,“自杀”;
  • 接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作。

让我们走查一遍代码,看看每个终止点是如何实现的,如代码清单7-2所示。

代码清单7-2  runner /runner.go:第12行到第28行

12 // Runner在给定的超时时间内执行一组任务,  
13 // 并且在操作系统发送中断信号时结束这些任务
14 type Runner struct {
15   // interrupt通道报告从操作系统
16   // 发送的信号
17   interrupt chan os.Signal
18
19   // complete通道报告处理任务已经完成
20   complete chan error
21
22   // timeout报告处理任务已经超时
23   timeout <-chan time.Time
24
25   // tasks持有一组以索引顺序依次执行的
26   // 函数
27   tasks []func(int)
28 }

代码清单7-2从第14行声明 Runner 结构开始。这个类型声明了3个通道,用来辅助管理程序的生命周期,以及用来表示顺序执行的不同任务的函数切片。

第17行的 interrupt 通道收发 os.Signal 接口类型的值,用来从主机操作系统接收中断事件。 os.Signal 接口的声明如代码清单7-3所示。

代码清单7-3 golang.org/pkg/os/#Signal

// Signal用来描述操作系统发送的信号。其底层实现通常会
// 依赖操作系统的具体实现:在UNIX系统上是
// syscall.Signal 
type Signal interface {
  String() string
  Signal()//用来区分其他Stringer
}

代码清单7-3展示了 os.Signal 接口的声明。这个接口抽象了不同操作系统上捕获和报告信号事件的具体实现。

第二个字段被命名为 complete ,是一个收发 error 接口类型值的通道,如代码清单7-4所示。

代码清单7-4  runner /runner.go:第19行到第20行

19   // complete通道报告处理任务已经完成
20   complete chan error

这个通道被命名为 complete ,因为它被执行任务的goroutine用来发送任务已经完成的信号。如果执行任务时发生了错误,会通过这个通道发回一个 error 接口类型的值。如果没有发生错误,会通过这个通道发回一个 nil 值作为 error 接口值。

第三个字段被命名为 timeout ,接收 time.Time 值,如代码清单7-5所示。

代码清单7-5  runner /runner.go:第22行到第23行

22   // timeout报告处理任务已经超时
23   timeout <-chan time.Time

这个通道用来管理执行任务的时间。如果从这个通道接收到一个 time.Time 的值,这个程序就会试图清理状态并停止工作。

最后一个字段被命名为 tasks ,是一个函数值的切片,如代码清单7-6所示。

代码清单7-6  runner /runner.go:第25行到第27行

25   // tasks持有一组以索引顺序依次执行的
26   // 函数
27   tasks []func(int)

这些函数值代表一个接一个顺序执行的函数。会有一个与 main 函数分离的goroutine来执行这些函数。

现在已经声明了 Runner 类型,接下来看一下两个 error 接口变量,这两个变量分别代表不同的错误值,如代码清单7-7所示。

代码清单7-7  runner /runner.go:第30行到第34行

30 // ErrTimeout会在任务执行超时时返回
31 var ErrTimeout = errors.New("received timeout")
32
33 // ErrInterrupt会在接收到操作系统的事件时返回
34 var ErrInterrupt = errors.New("received interrupt")

第一个 error 接口变量名为 ErrTimeout 。这个错误值会在收到超时事件时,由 Start 方法返回。第二个 error 接口变量名为 ErrInterrupt 。这个错误值会在收到操作系统的中断事件时,由 Start 方法返回。

现在我们来看一下用户如何创建一个 Runner 类型的值,如代码清单7-8所示。

代码清单7-8  runner /runner.go:第36行到第43行

36 // New返回一个新的准备使用的Runner
37 func New(d time.Duration) *Runner {
38   return &Runner{
39     interrupt: make(chan os.Signal, 1),
40     complete: make(chan error),
41     timeout:  time.After(d),
42   }
43 }

代码清单7-8 展示了名为 New 的工厂函数。这个函数接收一个 time.Duration 类型的值,并返回 Runner 类型的指针。这个函数会创建一个 Runner 类型的值,并初始化每个通道字段。因为 task 字段的零值是 nil ,已经满足初始化的要求,所以没有被明确初始化。每个通道字段都有独立的初始化过程,让我们探究一下每个字段的初始化细节。

通道 interrupt 被初始化为缓冲区容量为1的通道。这可以保证通道至少能接收一个来自语言运行时的 os.Signal 值,确保语言运行时发送这个事件的时候不会被阻塞。如果goroutine没有准备好接收这个值,这个值就会被丢弃。例如,如果用户反复敲 Ctrl+C组合键,程序只会在这个通道的缓冲区可用的时候接收事件,其余的所有事件都会被丢弃。

通道 complete 被初始化为无缓冲的通道。当执行任务的goroutine完成时,会向这个通道发送一个 error 类型的值或者 nil 值。之后就会等待 main 函数接收这个值。一旦 main 接收了这个 error 值,goroutine就可以安全地终止了。

最后一个通道 timeout 是用 time 包的 After 函数初始化的。 After 函数返回一个 time.Time 类型的通道。语言运行时会在指定的 duration 时间到期之后,向这个通道发送一个 time.Time 的值。

现在知道了如何创建并初始化一个 Runner 值,我们再来看一下与 Runner 类型关联的方法。第一个方法 Add 用来增加一个要执行的任务函数,如代码清单7-9所示。

代码清单7-9  runner /runner.go:第45行到第49行

45 // Add将一个任务附加到Runner上。这个任务是一个
46 // 接收一个int类型的ID作为参数的函数
47 func (r *Runner) Add(tasks ...func(int)) {
48   r.tasks = append(r.tasks, tasks...)
49 }

代码清单7-9展示了 Add 方法,这个方法接收一个名为 tasks 的可变参数。 可变参数 可以接受任意数量的值作为传入参数。这个例子里,这些传入的值必须是一个接收一个整数且什么都不返回的函数。函数执行时的参数 tasks 是一个存储所有这些传入函数值的切片。

现在让我们来看一下 run 方法,如代码清单7-10所示。

代码清单7-10  runner /runner.go:第72行到第85行

72 // run执行每一个已注册的任务
73 func (r *Runner) run() error {
74   for id, task := range r.tasks {
75     // 检测操作系统的中断信号
76     if r.gotInterrupt() {
77       return ErrInterrupt
78     }
79
80     // 执行已注册的任务
81     task(id)
82   }
83
84   return nil
85 }

代码清单7-10的第73行的 run 方法会迭代 tasks 切片,并按顺序执行每个函数。函数会在第81行被执行。在执行之前,会在第76行调用 gotInterrupt 方法来检查是否有要从操作系统接收的事件。

代码清单7-11中的方法 gotInterrupt 展示了带 default 分支的 select 语句的经典用法。

代码清单7-11  runner /runner.go:第87行到第101行

 87 // gotInterrupt验证是否接收到了中断信号
 88 func (r *Runner) gotInterrupt() bool {
 89   select {
 90   // 当中断事件被触发时发出的信号
 91   case <-r.interrupt:
 92     // 停止接收后续的任何信号
 93     signal.Stop(r.interrupt)
 95     return true
 96
 97   // 继续正常运行
 98   default:
 99     return false
100   }
101 }

在第91行,代码试图从 interrupt 通道去接收信号。一般来说, select 语句在没有任何要接收的数据时会阻塞,不过有了第98行的 default 分支就不会阻塞了。 default 分支会将接收 interrupt 通道的阻塞调用转变为非阻塞的。如果 interrupt 通道有中断信号需要接收,就会接收并处理这个中断。如果没有需要接收的信号,就会执行 default 分支。

当收到中断信号后,代码会通过在第93行调用 Stop 方法来停止接收之后的所有事件。之后函数返回 true 。如果没有收到中断信号,在第99行该方法会返回 false 。本质上, gotInterrupt 方法会让goroutine检查中断信号,如果没有发出中断信号,就继续处理工作。

这个包里的最后一个方法名为 Start ,如代码清单7-12所示。

代码清单7-12  runner /runner.go:第51行到第70行

51 // Start执行所有任务,并监视通道事件
52 func (r *Runner) Start() error {
53   // 我们希望接收所有中断信号
54   signal.Notify(r.interrupt, os.Interrupt)
55
56   // 用不同的goroutine执行不同的任务
57   go func() {
58     r.complete <- r.run()
59   }()
60
61   select {
62   // 当任务处理完成时发出的信号
63   case err := <-r.complete:
64     return err
65
66   // 当任务处理程序运行超时时发出的信号
67   case <-r.timeout:
68     return ErrTimeout
69   }
70 }

方法 Start 实现了程序的主流程。在代码清单7-12的第52行, Start 设置了 gotInterrupt 方法要从操作系统接收的中断信号。在第56行到第59行,声明了一个匿名函数,并单独启动goroutine来执行。这个goroutine会执行一系列被赋予的任务。在第58行,在goroutine的内部调用了 run 方法,并将这个方法返回的 error 接口值发送到 complete 通道。一旦 error 接口的值被接收,该goroutine就会通过通道将这个值返回给调用者。

创建goroutine后, Start 进入一个 select 语句,阻塞等待两个事件中的任意一个。如果从 complete 通道接收到 error 接口值,那么该goroutine要么在规定的时间内完成了分配的工作,要么收到了操作系统的中断信号。无论哪种情况,收到的 error 接口值都会被返回,随后方法终止。如果从 timeout 通道接收到 time.Time 值,就表示goroutine没有在规定的时间内完成工作。这种情况下,程序会返回 ErrTimeout 变量。

现在看过了 runner 包的代码,并了解了代码是如何工作的,让我们看一下main.go代码文件中的测试程序,如代码清单7-13所示。

代码清单7-13  runner /main/main.go

01 // 这个示例程序演示如何使用通道来监视
02 // 程序运行的时间,以在程序运行时间过长
03 // 时如何终止程序
03 package main
04
05 import (
06   "log"
07   "time"
08
09   "github.com/goinaction/code/chapter7/patterns/runner"
10 )
11
12 // timeout规定了必须在多少秒内处理完成
13 const timeout = 3 * time.Second
14
15 // main是程序的入口
16 func main() {
17   log.Println("Starting work.")
18
19   // 为本次执行分配超时时间
20   r := runner.New(timeout)
21
22   // 加入要执行的任务
23   r.Add(createTask(), createTask(), createTask())
24
25   // 执行任务并处理结果
26   if err := r.Start(); err != nil {
27     switch err {
28     case runner.ErrTimeout:
29       log.Println("Terminating due to timeout.")
30       os.Exit(1)
31     case runner.ErrInterrupt:
32       log.Println("Terminating due to interrupt.")
33       os.Exit(2)
34     }
35   }
36
37   log.Println("Process ended.")
38 }
39
40 // createTask返回一个根据id 
41 // 休眠指定秒数的示例任务
42 func createTask() func(int) {
43   return func(id int) {
44     log.Printf("Processor - Task #%d.", id)
45     time.Sleep(time.Duration(id) * time.Second)
46   }
47 }

代码清单7-13的第16行是 main 函数。在第20行,使用 timeout 作为超时时间传给 New 函数,并返回了一个指向 Runner 类型的指针。之后在第23行,使用 createTask 函数创建了几个任务,并被加入 Runner 里。在第42行声明了 createTask 函数。这个函数创建的任务只是休眠了一段时间,用来模拟正在进行工作。增加完任务后,在第26行调用了 Start 方法, main 函数会等待 Start 方法的返回。

Start 返回时,会检查其返回的 error 接口值,并存入 err 变量。如果确实发生了错误,代码会根据 err 变量的值来判断方法是由于超时终止的,还是由于收到了中断信号终止。如果没有错误,任务就是按时执行完成的。如果执行超时,程序就会用错误码1终止。如果接收到中断信号,程序就会用错误码2终止。其他情况下,程序会使用错误码0正常终止。