简易协程池

前言

Go 的 goroutine 提供了一种较线程而言更廉价的方式处理并发场景, go 使用二级线程的模式, 将 goroutine 以 M:N 的形式复用到系统线程上, 节省了 cpu 调度的开销, 也避免了用户级线程(协程)进行系统调用时阻塞整个系统线程的问题。

但在实际的开发中如果待执行的tasks数量过多,可能带来的问题就是goroutine数量激增进而导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine 显然很有价值。所以可以通过一个goroutine池来对所有的tasks进行排队执行,类似下面就是一个简单的goroutine池,通过定量的goroutine来进行任务的消费。设计下面的协程池有三个重点问题是需要重点考虑到的
1.如何确保任务全部都被消费完?
2.子goroutine发生panic如何处理?
3.子goroutine如果全部发生panic了怎么办?放任死锁吗?

这里并没考虑子goroutine执行时间太长的问题,主要原因我们可以自定义子goroutine的数量,我更希望的是任务能够执行完全而不是任务什么时候执行完,任务执行时间太长应该是业务侧的问题。

代码展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import (
"sync"
"sync/atomic"

"git.code.oa.com/trpc-go/trpc-go/errs"
"git.code.oa.com/trpc-go/trpc-go/log"
)

//自定义简单协程池 统一管理并发协程
//整体思路:定量的协程从同一个任务队列循环消费任务
//缺点:目前功能对协程超时情况存在缺陷 初始化函数中没有对初始化数量进行校验
type Pool struct {
TaskQueue chan func() error //任务队列
WorkerNumber int //初始化worker数量
TaskNumber int //待消费任务数量
ResultQuene chan error //返回结果队列
FinishCallback func() //全部任务消费完成的回调函数
runningWorkers int64 //运行中的worker数
sync.Mutex
// PanicHandler func(interface{}) //协程panic的处理函数 防止协程panic后导致主协程崩溃
}

//初始化 根据自定义需求设定worker数量以及任务队列长度
func (self *Pool) Init(WorkerNumber int, TaskNumber int) {
self.WorkerNumber = WorkerNumber
self.TaskNumber = TaskNumber
self.TaskQueue = make(chan func() error, TaskNumber)
self.ResultQuene = make(chan error, TaskNumber)
}

//开启worker的消费动作
func (self *Pool) Start() {
//默认开启 WorkerNumber个goruntine
for i := 0; i < self.WorkerNumber; i++ {
self.runGroutine()
}

//获取每个任务的处理结果
for j := 0; j < self.TaskNumber; j++ {
err, ok := <-self.ResultQuene
//这里可以通过自定义决定是否上报日志
if !ok {
break
} else {
if err != nil {
log.Error(err.Error())
}
}
}

//结束时的回调函数 可以做一些通知操作等等
if self.FinishCallback != nil {
self.FinishCallback()
}
}

//运行一个groutine 开始消费任务
func (self *Pool) runGroutine() { // runningWorkers + 1
self.incRunningWorkers() //worker运行ing数量原子自增1
go func() {
defer func() {
self.decRunningWorkers() //worker运行ing数量原子自减1
if r := recover(); r != nil {
// if self.PanicHandler != nil {
// self.PanicHandler(r)
// } else {
// log.Printf("Worker panic: %s\n", r)
// }
err := errs.New(Panic_Sub_Goroutine, "子协程panic") //子协程panic是会导致主协程挂掉的,这一步也是必须进行捕获处理
self.ResultQuene <- err
}
self.checkWorker() // 兜底机制,避免worker全部panic后没有worker消费队列中的数据,理论上这一步非常重要!否则有可能出现死锁状态
}()
for {
select {
case task, ok := <-self.TaskQueue:
if !ok {
//break 这里用break导致线上遇到了一个cpu被打满的bug 下一篇文章中进行说明
return
}
err := task()
self.ResultQuene <- err
}
}
}()
}

func (self *Pool) incRunningWorkers() { // runningWorkers + 1
atomic.AddInt64(&self.runningWorkers, 1)
}

func (self *Pool) decRunningWorkers() { // runningWorkers - 1
atomic.AddInt64(&self.runningWorkers, -1)
}

func (self *Pool) GetRunningWorkers() int64 {
return atomic.LoadInt64(&self.runningWorkers)
}

func (self *Pool) checkWorker() {
self.Lock()
defer self.Unlock()

if self.GetRunningWorkers() == 0 && len(self.TaskQueue) > 0 {
self.runGroutine()
}
}

//关闭通道避免内存泄漏
func (self *Pool) Stop() {
close(self.TaskQueue)
close(self.ResultQuene)
}

//任务入队
func (self *Pool) AddTask(task func() error) {
self.TaskQueue <- task
}

//执行结束后的回调
func (self *Pool) SetFinishCallback(fun func()) {
self.FinishCallback = fun
}

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ctx := context.Background()
ids := make([]int32, 0)
var pool comm.Pool
pool.Init(50, len(ids))
count := int32(0)
for _, id := range ids {
idValue := id
pool.AddTask(func() error {
return func(innerId int32) error {
err := DoSomething(innerId)
if err != nil {
log.ErrorContextf(ctx, "DoSomething failed, id:%d err:%s", innerId, err.Error())
return nil
}
//以上执行无问题,完成数就+1
atomic.AddInt32(&count, 1)
return nil
}(int32(idValue))
})
}
//回调
pool.SetFinishCallback(func() {
//自定义通知操作
fmt.Println("all done")
})
//开始执行
pool.Start()
//关闭资源
pool.Stop()

总结

池化管理自己的goroutine是非常有必要的,可以帮助我们提升开发效率降低异常风险!所以赶紧把你的goroutine管理起来吧