前言
工作以来做项目大部分的使用的轮子(第三方库)都是一些大牛写好开源出来的,自己只是拼拼凑凑利用现有的轮子完成工作就算完事了。现在我也来造个小轮子吧,不过这个轮子是在写测试程序和分析程序时提取的,并没有用在线上项目中。这里记录下来,也看看有没有人用得上。
简介
grqueue是goroutine queue是缩写,实际是一个利用goroutine实现的一个同步队列,用于程序中可同步任务的并发执行,可以减少等待时间。比如批量日志分析,并发请求等等。
原理及实现
qrqueue的原理很简单,利用channel将需要执行的task存起来,通过go routine不断从channel中取出task执行,再利用官方sync包的WaitGroup等待执行完毕。具体routine的数量和channel的容量可以由使用者自定义,使用者也可以设置回调函数用于处理每个task结束后和所有task结束时需要处理的事务。简单画了个图
实现代码也很简单,一百行不到,具体可以看看代码和注释,各位发现有问题或者可改进的点欢迎拍砖评论。
package grqueue
import (
"sync"
)
type GoroutineQueue struct {
Number int //并发执行的任务个数
Total int //总任务数
tasks chan func() interface{}
task_end_callback func(result interface{})
finish_callback func()
wg sync.WaitGroup
}
func NewGoroutineQueue(number int, total int) *GoroutineQueue {
queue := &GoroutineQueue{
tasks: make(chan func() interface{}, total),
Number: number,
Total: total}
return queue
}
//开始执行task
func (queue *GoroutineQueue) Start() {
defer close(queue.tasks)
//加锁,锁的数量是tasks的数量
queue.wg.Add(len(queue.tasks))
for i := 0; i < queue.Number; i++ {
//分number个routine执行work
go queue.work()
}
//等待routine执行完毕
queue.wg.Wait()
//所有task完毕,若finish回调函数存在则执行则回调
if queue.finish_callback != nil {
queue.finish_callback()
}
}
func (queue *GoroutineQueue) work() {
for {
//不断取出task执行,直到chan关闭
task, ok := <-queue.tasks
if !ok {
break
}
res := task()
//完成一个task立即回调
if queue.task_end_callback != nil {
queue.task_end_callback(res)
}
//每执行完一个task,解锁一次
wg.Done()
}
}
func (queue *GoroutineQueue) AddTask(task func() interface{}) {
queue.tasks <- task
}
func (queue *GoroutineQueue) SetFinishCallback(callback func()) {
queue.finish_callback = callback
}
func (queue *GoroutineQueue) SetTaskEndCallback(callback func(result interface{})) {
queue.task_end_callback = callback
}
使用
代码已放github,直接使用直接go get
就可以了
go get github.com/yaodd/grqueue
使用示例在github的README有写,这里就不重复叙述了。
以上
--------------2018-03-23更新-------------
实际项目中应用发现wg不应该作为全局变量使用,而是应该作为GoroutineQueue成员变量使用,已修正。