golang中启动一个协程不会消耗太多资源,有人认为可以不用协程池。但是当访问量增大时,可能造成内存消耗完,程序崩溃。于是写了一个协程池的Demo。
Demo中有worker和job。worker是一个协程,在worker中完成一个job。Jobs是一个channel
,使用Jobs记录job。当生成一个新任务,就发送到Jobs中。程序启动时,首先启动3个worker协程,每个协程都尝试从Jobs中接收job。如果Jobs中没有job,worker协程就等待。
基本逻辑如下:
- Jobs管道存放job,Results管道存放结果。
- 程序一启动,启动3个worker协程,等待从Jobs管道中取数据。
- 向Jobs管道中发送3个数据。
- 关闭Jobs管道。
- worker协程从Jobs管道中接收到数据以后,执行程序,把结果放到Results管道中。然后继续等待。
- 当Jobs管道中没有数据,并且Results有3个数据时。退出主程序。
代码如下:
package main
import (
"fmt"
"time"
)
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
Results <- true
}
}
}()
}
const channelLength = 3
var (
Jobs chan int
Results chan bool
)
func main() {
Jobs = make(chan int, channelLength)
Results = make(chan bool, channelLength)
// Start worker goroutines
for i:= 0; i < channelLength; i++ {
worker(i)
}
// Send to channel
time.Sleep(time.Second)
for j := 0; j < channelLength; j++ {
Jobs <- j
}
close(Jobs)
for len(Jobs) != 0 || len(Results) != channelLength {
time.Sleep(100 * time.Millisecond)
}
fmt.Println("Complete main")
}
运行结果如下:
Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started job 2
worker 2 started job 0
worker 0 started job 1
worker 0 finished job 1
Waiting for job...
worker 0 started job 0
worker 2 finished job 0
Waiting for job...
worker 2 started job 0
worker 1 finished job 2
Waiting for job...
worker 1 started job 0
Complete main
这个程序出现问题了,bug在哪里?
开始的3次,协程运行都是正常。
worker 1 started job 2
worker 2 started job 0
worker 0 started job 1
worker 0 finished job 1
worker 2 finished job 0
worker 1 finished job 2
根据设计,向Jobs管道中发送3个数据以后,就关闭了管道。此后,协程不应该再从Jobs管道中接收到数据。
for j := 0; j < channelLength; j++ {
jobs <- j
}
close(jobs)
实际运行中,协程接收完3个数据以后,worker还能不断的从Jobs管道中接收到数据。与设计不符。
worker 0 started job 0
worker 2 started job 0
worker 1 started job 0
开始以为问题出在worker()中,j := <- job
,只有当job中有返回,才会打印worker started
。但是后面的job id都是0,说明没有向jobs管道中发送新数据。
for {
fmt.Println("Waiting for job...")
select {
case j := <-Jobs :
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
Results <- true
}
}
研究向Jobs管道发送数据的代码,突发奇想,把close(Jobs)
注释掉,看看如何。
for j := 0; j < channelLength; j++ {
Jobs <- j
}
//close(Jobs)
程序居然正常了。
Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started job 0
worker 0 started job 2
worker 2 started job 1
worker 1 finished job 0
worker 0 finished job 2
Waiting for job...
Waiting for job...
worker 2 finished job 1
Waiting for job...
Complete main
原来问题出在close()
上,马上查注释。close()
是在sender
中调用,当管道中最后一个数据被接收以后,就关闭管道。此时,不能再向管道中发送数据。否则会报错panic: send on closed channel
。
使用x, ok := <-c
可以判断一个管道是否关闭,如果管道已经关闭,ok
的值为false
。
管道关闭以后,并且管道中的数据被接收完以后,居然还能从管道中接收到数据0
。于是就造成了后续协程接收到job 0
的问题。
// The close built-in function closes a channel, which must be either
// bidirectional or send-only. It should be executed only by the sender,
// never the receiver, and has the effect of shutting down the channel after
// the last sent value is received. After the last value has been received
// from a closed channel c, any receive from c will succeed without
// blocking, returning the zero value for the channel element. The form
// x, ok := <-c
// will also set ok to false for a closed channel.
func close(c chan<- Type)
如果要使用close,应该怎么做
管道不用时,close()
管道是个好习惯。此时,应该怎么解决这个问题呢?首先要在协程中检查接收到的数据,j:=<-jobs
,判断j
是否为0
。如果Jobs
中存放的是非指针数据
,不能分辨0
是真正的0值
,还是close以后接收到的0
。因此需要在Jobs
管道中存放指针。管道打开时,接收的都是非nil
指针。close以后才返回0
,也就是nil
指针。
修改程序。新生成一个机构体Job。
type Job struct {
JobId int
}
Jobs保存指向Job的指针。
Jobs chan *Job
func main() {
Jobs = make(chan *Job, channelLength)
...
for j := 0; j < channelLength; j++ {
Jobs <- &Job{JobId:j}
}
close(Jobs)
...
}
在worker协程中,从管道取出Job指针以后,判断指针是否为nil。如果为nil,说明管道已经关闭,协程退出。
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
if j == nil {
fmt.Println("Close the worker", id)
return
}
fmt.Println("worker", id, "started job", j.JobId)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j.JobId)
Results <- true
}
}
}()
}
运行结果达到预期。
Waiting for job...
Waiting for job...
Waiting for job...
worker 0 started job 0
worker 1 started job 1
worker 2 started job 2
worker 2 finished job 2
worker 0 finished job 0
Waiting for job...
Waiting for job...
Close the worker 2
Close the worker 0
worker 1 finished job 1
Waiting for job...
Close the worker 1
Complete main
附上最终的代码。
package main
import (
"fmt"
"time"
)
type Job struct {
JobId int
}
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
if j == nil {
fmt.Println("Close the worker", id)
return
}
fmt.Println("worker", id, "started job", j.JobId)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j.JobId)
Results <- true
}
}
}()
}
const channelLength = 3
var (
Jobs chan *Job
Results chan bool
)
func main() {
Jobs = make(chan *Job, channelLength)
Results = make(chan bool, channelLength)
// Start worker goroutines
for i:= 0; i < channelLength; i++ {
worker(i)
}
// Send to channel
time.Sleep(time.Second)
for j := 0; j < channelLength; j++ {
Jobs <- &Job{JobId:j}
}
close(Jobs)
for len(Jobs) != 0 || len(Results) != channelLength {
time.Sleep(100 * time.Millisecond)
}
fmt.Println("Complete main")
}