golang实现协程池

golang中启动一个协程不会消耗太多资源,有人认为可以不用协程池。但是当访问量增大时,可能造成内存消耗完,程序崩溃。于是写了一个协程池的Demo。

Demo中有worker和job。worker是一个协程,在worker中完成一个job。Jobs是一个channel,使用Jobs记录job。当生成一个新任务,就发送到Jobs中。程序启动时,首先启动3个worker协程,每个协程都尝试从Jobs中接收job。如果Jobs中没有job,worker协程就等待。

基本逻辑如下:

  1. Jobs管道存放job,Results管道存放结果。
  2. 程序一启动,启动3个worker协程,等待从Jobs管道中取数据。
  3. 向Jobs管道中发送3个数据。
  4. 关闭Jobs管道。
  5. worker协程从Jobs管道中接收到数据以后,执行程序,把结果放到Results管道中。然后继续等待。
  6. 当Jobs管道中没有数据,并且Results有3个数据时。退出主程序。

代码如下:

package main

import (
    "fmt"
    "time"
)

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                fmt.Println("worker", id, "started  job", j)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j)
                Results <- true
            }
        }
    }()
}

const channelLength = 3

var (
    Jobs chan int
    Results chan bool
)

func main() {
    Jobs = make(chan int, channelLength)
    Results = make(chan bool, channelLength)

    // Start worker goroutines
    for i:= 0; i < channelLength; i++ {
        worker(i)
    }

    // Send to channel
    time.Sleep(time.Second)
    for j := 0; j < channelLength; j++ {
        Jobs <- j
    }
    close(Jobs)

    for len(Jobs) != 0 || len(Results) != channelLength  {
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Println("Complete main")
}

运行结果如下:

Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started  job 2
worker 2 started  job 0
worker 0 started  job 1
worker 0 finished job 1
Waiting for job...
worker 0 started  job 0
worker 2 finished job 0
Waiting for job...
worker 2 started  job 0
worker 1 finished job 2
Waiting for job...
worker 1 started  job 0
Complete main

这个程序出现问题了,bug在哪里?

开始的3次,协程运行都是正常。

worker 1 started  job 2
worker 2 started  job 0
worker 0 started  job 1
worker 0 finished job 1
worker 2 finished job 0
worker 1 finished job 2

根据设计,向Jobs管道中发送3个数据以后,就关闭了管道。此后,协程不应该再从Jobs管道中接收到数据。

for j := 0; j < channelLength; j++ {
        jobs <- j
    }
close(jobs)

实际运行中,协程接收完3个数据以后,worker还能不断的从Jobs管道中接收到数据。与设计不符。

worker 0 started  job 0
worker 2 started  job 0
worker 1 started  job 0

开始以为问题出在worker()中,j := <- job,只有当job中有返回,才会打印worker started。但是后面的job id都是0,说明没有向jobs管道中发送新数据。

for {
            fmt.Println("Waiting for job...")
            select {
            case j := <-Jobs :
                fmt.Println("worker", id, "started  job", j)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j)
                Results <- true
            }
        }

研究向Jobs管道发送数据的代码,突发奇想,把close(Jobs)注释掉,看看如何。

for j := 0; j < channelLength; j++ {
        Jobs <- j
    }
//close(Jobs)

程序居然正常了。

Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started  job 0
worker 0 started  job 2
worker 2 started  job 1
worker 1 finished job 0
worker 0 finished job 2
Waiting for job...
Waiting for job...
worker 2 finished job 1
Waiting for job...
Complete main

原来问题出在close()上,马上查注释。close()是在sender中调用,当管道中最后一个数据被接收以后,就关闭管道。此时,不能再向管道中发送数据。否则会报错panic: send on closed channel

使用x, ok := <-c可以判断一个管道是否关闭,如果管道已经关闭,ok的值为false

管道关闭以后,并且管道中的数据被接收完以后,居然还能从管道中接收到数据0。于是就造成了后续协程接收到job 0的问题。

// The close built-in function closes a channel, which must be either
// bidirectional or send-only. It should be executed only by the sender,
// never the receiver, and has the effect of shutting down the channel after
// the last sent value is received. After the last value has been received
// from a closed channel c, any receive from c will succeed without
// blocking, returning the zero value for the channel element. The form
//  x, ok := <-c
// will also set ok to false for a closed channel.
func close(c chan<- Type)

如果要使用close,应该怎么做

管道不用时,close()管道是个好习惯。此时,应该怎么解决这个问题呢?首先要在协程中检查接收到的数据,j:=<-jobs,判断j是否为0。如果Jobs中存放的是非指针数据,不能分辨0是真正的0值,还是close以后接收到的0。因此需要在Jobs管道中存放指针。管道打开时,接收的都是非nil指针。close以后才返回0,也就是nil指针。

修改程序。新生成一个机构体Job。

type Job struct {
    JobId int
}

Jobs保存指向Job的指针。

Jobs chan *Job
func main() {
    Jobs = make(chan *Job, channelLength)
    ...
    for j := 0; j < channelLength; j++ {
        Jobs <- &Job{JobId:j}
    }
    close(Jobs)
    ...
}

在worker协程中,从管道取出Job指针以后,判断指针是否为nil。如果为nil,说明管道已经关闭,协程退出。

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                if j == nil {
                    fmt.Println("Close the worker", id)
                    return
                }
                fmt.Println("worker", id, "started  job", j.JobId)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j.JobId)
                Results <- true
            }
        }
    }()
}

运行结果达到预期。

Waiting for job...
Waiting for job...
Waiting for job...
worker 0 started  job 0
worker 1 started  job 1
worker 2 started  job 2
worker 2 finished job 2
worker 0 finished job 0
Waiting for job...
Waiting for job...
Close the worker 2
Close the worker 0
worker 1 finished job 1
Waiting for job...
Close the worker 1
Complete main

附上最终的代码。

package main

import (
    "fmt"
    "time"
)

type Job struct {
    JobId int
}

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                if j == nil {
                    fmt.Println("Close the worker", id)
                    return
                }
                fmt.Println("worker", id, "started  job", j.JobId)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j.JobId)
                Results <- true
            }
        }
    }()
}

const channelLength = 3

var (
    Jobs chan *Job
    Results chan bool
)

func main() {
    Jobs = make(chan *Job, channelLength)
    Results = make(chan bool, channelLength)

    // Start worker goroutines
    for i:= 0; i < channelLength; i++ {
        worker(i)
    }

    // Send to channel
    time.Sleep(time.Second)
    for j := 0; j < channelLength; j++ {
        Jobs <- &Job{JobId:j}
    }
    close(Jobs)

    for len(Jobs) != 0 || len(Results) != channelLength  {
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Println("Complete main")
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容

  • 原文链接:https://github.com/EasyKotlin 在常用的并发模型中,多进程、多线程、分布式是...
    JackChen1024阅读 10,689评论 3 23
  • Coroutine in Python 引言: 本文出自David Beazley 的关于协程的PPT,现在笔者将...
    LumiaXu阅读 1,589评论 4 8
  • 第一章 Nginx简介 Nginx是什么 没有听过Nginx?那么一定听过它的“同行”Apache吧!Ngi...
    JokerW阅读 32,608评论 24 1,002
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,511评论 18 139
  • 小景新透绿窗纱 映阶春色 入目无瑕 囿于幽幽屋檐下 红粉佳人 锦瑟年华 顾盼不见去时路 相思难掩 潸然泪下 ...
    万象live阅读 136评论 0 1