100 行写一个 go 的协程池 (任务池)

前言

go 的 goroutine 提供了一种较线程而言更廉价的方式处理并发场景, go 使用二级线程的模式, 将 goroutine 以 M:N 的形式复用到系统线程上, 节省了 cpu 调度的开销, 也避免了用户级线程(协程)进行系统调用时阻塞整个系统线程的问题。【1】

但 goroutine 太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine 显然很有价值。

本文正是针对上述情况而提供一种简单的解决方案, 编写一个协程池(任务池)来实现对 goroutine 的管控。

思路

要解决这个问题, 要思考两个问题

  • goroutine 的数量如何限制, goroutine 如何重用
  • 任务如何执行

goroutine 的数量如何限制, goroutine 如何重用

说到限制和重用, 那么最先想到的就是池化。比如 TCP 连接池, 线程池, 都是有效限制、重用资源的最好实践。所以, 我们可以创建一个 goroutine 池, 用来管理 goroutine。

任务如何执行

在使用原生 goroutine 的场景中, 运行一个任务直接启动一个 goroutine 来运行, 在池化的场景而言, 任务也是要在 goroutine 中执行, 但是任务需要任务池来放入 goroutine。

生产者消费者模型

在连接池中, 连接在使用时从池中取出, 用完后放入池中。对于 goroutine 而言, goroutine 通过语言关键字启动, 无法像连接一样操作。那么如何让 goroutine 可以执行任务, 且执行后可以重新用来执行其它任务呢?这里就需要使用生产者消费者模型了:

生产者 --(生产任务)--> 队列 --(消费任务)--> 消费者

用来执行任务的 goroutine 可以作为消费者, 操作任务池的 goroutine 作为生产者, 而队列则可以使用 go 的 buffer channel, 任务池的建模到此结束。

实现

Talk is cheap. Show me the code.

任务的定义

任务要包含需要执行的函数、以及函数要传的参数, 因为参数类型、个数不确定, 这里使用可变参数和空接口的形式

type Task struct {
    Handler func(v ...interface{})
    Params  []interface{}
}

任务池的定义

任务池的定义包括了池的容量 capacity、当前运行的 worker(goroutine)数量 runningWorkers、任务队列(channel)chTask 以及任务池的状态 status(运行中或已关闭, 用于安全关闭任务池), 最后还有一把互斥锁 sync.Mutex

type Pool struct {
    capacity       uint64
    runningWorkers uint64
    status          int64
    chTask          chan *Task
    sync.Mutex
}

任务池的构造函数:


var ErrInvalidPoolCap = errors.New("invalid pool cap")

const (
    RUNNING = 1
    STOPED = 0
)

func NewPool(capacity uint64) (*Pool, error) {
    if capacity <= 0 {
        return nil, ErrInvalidPoolCap
    }
    return &Pool{
        capacity: capacity,
        status:    RUNNING,
        // 初始化任务队列, 队列长度为容量
        chTask:    make(chan *Task, capacity),
    }, nil
}

启动 worker

新建 run() 方法作为启动 worker 的方法:

func (p *Pool) run() {
    p.runningWorkers++ // 运行中的任务加一

    go func() {
        defer func() {
            p.runningWorkers-- // worker 结束, 运行中的任务减一
        }()

        for {
            select { // 阻塞等待任务、结束信号到来
            case task, ok := <-p.chTask: // 从 channel 中消费任务
                if !ok { // 如果 channel 被关闭, 结束 worker 运行
                    return
                }
                // 执行任务
                task.Handler(task.Params...)
            }
        }
    }()
}

上述代码中, runningWorkers 的加减直接使用了自增运算, 但是考虑到启动多个 worker 时, runningWorkers 就会有数据竞争, 所以我们使用 sync.atomic 包来保证 runningWorkers 的自增操作是原子的。

对 runningWorkers 的操作进行封装:

func (p *Pool) incRunning() { // runningWorkers + 1
    atomic.AddUint64(&p.runningWorkers, 1)
}

func (p *Pool) decRunning() { // runningWorkers - 1
    atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}

func (p *Pool) GetRunningWorkers() uint64 {
    return atomic.LoadUint64(&p.runningWorkers)
}

对于 capacity 的操作无需考虑数据竞争, 因为 capacity 在初始化时已经固定。封装 GetCap() 方法:

func (p *Pool) GetCap() uint64 {
    return p.capacity
}

趁热打铁, status 的操作也加锁封装为安全操作:


func (p *Pool) setStatus(status int64) bool {
    p.Lock()
    defer p.Unlock()

    if p.status == status {
        return false
    }

    p.status = status

    return true
}

run() 方法改造:

func (p *Pool) run() {
    p.incRunning()

    go func() {
        defer func() {
            p.decRunning()
        }()

        for {
            select {
            case task, ok := <-p.chTask:
                if !ok {
                    return
                }
                task.Handler(task.Params...)
            }
        }
    }()
}

生产任务

新建 Put() 方法用来将任务放入池中:

func (p *Pool) Put(task *Task) {

    // 加锁防止启动多个 worker
    p.Lock()
    defer p.Unlock()

    if p.GetRunningWorkers() < p.GetCap() { // 如果任务池满, 则不再创建 worker
        // 创建启动一个 worker
        p.run()
    }

    // 将任务推入队列, 等待消费
    p.chTask <- task
}

任务池安全关闭

当有关闭任务池来节省 goroutine 资源的场景时, 我们需要有一个关闭任务池的方法。

直接销毁 worker 关闭 channel 并不合适, 因为此时可能还有任务在队列中没有被消费掉。要确保所有任务被安全消费后再销毁掉 worker。

首先, 在关闭任务池时, 需要先关闭掉生产任务的入口。同时, 也要考虑到任务推送到 chTask 时 status 改变的问题。改造 Put() 方法:


var ErrPoolAlreadyClosed = errors.New("pool already closed")

func (p *Pool) Put(task *Task) error {
    p.Lock()
    defer p.Unlock()
    
    if p.status == STOPED { // 如果任务池处于关闭状态, 再 put 任务会返回 ErrPoolAlreadyClosed 错误
        return ErrPoolAlreadyClosed
    }
    
    // run worker
    if p.GetRunningWorkers() < p.GetCap() {
        p.run()
    }

    // send task
    if p.status == RUNNING {
        p.chTask <- task
    }
     
    return nil
}

在 run() 方法中已经对 chTask 的关闭进行了监听, 销毁 worker 只需等待任务被消费完后关闭 chTask。Close() 方法如下:

func (p *Pool) Close() {
    p.setStatus(STOPED) // 设置 status 为已停止

    for len(p.chTask) > 0 { // 阻塞等待所有任务被 worker 消费
        time.Sleep(1e6) // 防止等待任务清空 cpu 负载突然变大, 这里小睡一下
    }

    close(p.chTask) // 关闭任务队列
}

panic handler

每个 worker 都是一个 goroutine, 如果 goroutine 中产生了 panic, 会导致整个程序崩溃。为了保证程序的安全进行, 任务池需要对每个 worker 中的 panic 进行 recover 操作, 并提供可订制的 panic handler。

更新任务池定义:

type Pool struct {
    capacity       uint64
    runningWorkers uint64
    status          int64
    chTask          chan *Task
    sync.Mutex
    PanicHandler   func(interface{})
}

更新 run() 方法:

func (p *Pool) run() {
    p.incRunning()

    go func() {
        defer func() {
            p.decRunning()
            if r := recover(); r != nil { // 恢复 panic
                if p.PanicHandler != nil { // 如果设置了 PanicHandler, 调用
                    p.PanicHandler(r)
                } else { // 默认处理
                    log.Printf("Worker panic: %s\n", r)
                }
            }
        }()

        for {
            select {
            case task, ok := <-p.chTask:
                if !ok {
                    return
                }
                task.Handler(task.Params...)
            }
        }
    }()
}

可用 worker 检查

recover 后,gorotine 退出,当池的容量为 1 时,此时会有一个问题,观察 Put() 方法:


if p.GetRunningWorkers() < p.GetCap() {
    p.run() // 此时有一个 task (上一次 Put) panic,worker 退出了
}


if p.status == RUNNING {
    p.chTask <- task // 当前的 task 推送到 chTask,但是没有一个 worker 可以消费到,deadlock!
}

感谢提出这个场景的朋友,详细参考 issue 极端情况 #4,此问题已经在 release v1.5 中修复

为了解决这个 bug,我们需要在 worker 退出时检查当前是否还有运行着的 worker,如果没有,则创建一个,保证 task 可以被正常消费,checkWorker() 方法如下:

func (p *Pool) checkWorker() {
    p.Lock()
    defer p.Unlock()

    // 当前没有 worker 且有任务存在,运行一个 worker 消费任务
    // 没有任务无需考虑 (当前 Put 不会阻塞,下次 Put 会启动 worker)
    if p.runningWorkers == 0 && len(p.chTask) > 0 {
        p.run()
    }
}

改造 run() 方法:


func (p *Pool) run() {
    p.incRunning()

    go func() {
        defer func() {
            p.decRunning()
            if r := recover(); r != nil {
                if p.PanicHandler != nil {
                    p.PanicHandler(r)
                } else {
                    log.Printf("Worker panic: %s\n", r)
                }
            }
            p.checkWorker() // worker 退出时检测是否有可运行的 worker
        }()

        for {
            select {
            case task, ok := <-p.chTask:
                if !ok {
                    return
                }
                task.Handler(task.Params...)
            }
        }
    }()
}

使用

OK, 我们的任务池就这么简单的写好了, 试试:

func main() {
    // 创建任务池
    pool, err := NewPool(10)
    if err != nil {
        panic(err)
    }

    for i := 0; i < 20; i++ {
        // 任务放入池中
        pool.Put(&Task{
            Handler: func(v ...interface{}) {
                fmt.Println(v)
            },
            Params: []interface{}{i},
        })
    }

    time.Sleep(1e9) // 等待执行
}

详细例子见 mortar/examples

benchmark

作为协程池, 性能和内存占用的指标测试肯定是少不了的, 测试数据才是最有说服力的

测试流程

100w 次执行,原子增量操作

测试任务:

var wg = sync.WaitGroup{}

var sum int64

func demoTask(v ...interface{}) {
    defer wg.Done()
    for i := 0; i < 100; i++ {
        atomic.AddInt64(&sum, 1)
    }
}

测试方法:

var runTimes = 1000000
// 原生 goroutine
func BenchmarkGoroutineTimeLifeSetTimes(b *testing.B) {

    for i := 0; i < runTimes; i++ {
        wg.Add(1)
        go demoTask2()
    }
    wg.Wait() // 等待执行完毕
}

// 使用协程池
func BenchmarkPoolTimeLifeSetTimes(b *testing.B) {
    pool, err := NewPool(20)
    if err != nil {
        b.Error(err)
    }

    task := &Task{
        Handler: demoTask2,
    }

    for i := 0; i < runTimes; i++ {
        wg.Add(1)
        pool.Put(task)
    }

    wg.Wait() // 等待执行完毕
}

对比结果

模式 操作时间消耗 ns/op 内存分配大小 B/op 内存分配次数 allocs/op
原生 goroutine (100w goroutine) 1596177880 103815552 240022
任务池开启 20 个 worker 20 goroutine) 1378909099 15312 89

使用任务池和原生 goroutine 性能相近(略好于原生)

使用任务池比直接 goroutine 内存分配节省 7000 倍左右, 内存分配次数减少 2700 倍左右

tips: 当任务为耗时任务时, 防止任务堆积(消费不过来)可以结合业务调整容量, 或根据业务控制每个任务的超时时间

源码地址

该项目的全部源码详见 mortar

参考文章:

【1】线程的 3 种实现方式

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342