设计理念
执行业务处理的 goroutine 不要通过共享内存的方式通信,而是要通过 Channel 通信的方式分享数据。
Channel 类型和基本并发原语是有竞争关系的,它应用于并发场景,涉及到 goroutine 之间的通讯,可以提供并发的保护,等等。
应用场景五种类型
- 数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
- 数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
- 信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine 。
- 任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能。
- 锁:利用 Channel 也可以实现互斥锁的机制。
- 控制并发执行的goroutine数量,例如令牌桶。
基本用法
Channel 分为只能接收、只能发送、既可以接收又可以发送三种类型。下面是它的语法定义:
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .
相应地,Channel 的正确语法如下:
chan string // 可以发送接收string
chan<- struct{} // 只能发送struct{}
<-chan int // 只能从chan接收int
通过 make,我们可以初始化一个 chan,未初始化的 chan 的零值是 nil。你可以设置它的容量,比如下面的 chan 的容量是 9527,我们把这样的 chan 叫做 buffered chan;如果没有设置,它的容量是 0,我们把这样的 chan 叫做 unbuffered chan。
make(chan int, 9527)
如果 chan 中还有数据,那么,从这个 chan 接收数据的时候就不会阻塞,如果 chan 还未满(“满”指达到其容量),给它发送数据也不会阻塞,否则就会阻塞。unbuffered chan 只有读写都准备好之后才不会阻塞,这也是很多使用 unbuffered chan 时的常见 Bug。
nil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞。
for + select + case + chan
func main() {
var ch = make(chan int, 10)
for i := 0; i < 10; i++ {
select {
case ch <- i:
case v := <-ch:
fmt.Println(v)
}
}
}
使用for + select循环监控多个chan的情况,一旦发送成功 or 接收数据成功则会完成一次循环
for chan
for v := range ch {
fmt.Println(v)
}
可以一直从ch中等待读取数据,直到ch关闭才会退出循环
实现原理
- qcount:代表 chan 中已经接收但还没被取走的元素的个数。内建函数 len 可以返回这个字段的值。- dataqsiz:队列的大小。chan 使用一个循环队列来存放元素,循环队列很适合这种生产者 - 消费者的场景(我很好奇为什么这个字段省略 size 中的 e)。
- buf:存放元素的循环队列的 buffer。
- elemtype 和 elemsize:chan 中元素的类型和 size。因为 chan 一旦声明,它的元素类型是固定的,即普通类型或者指针类型,所以元素大小也是固定的。
- sendx:处理发送数据的指针在 buf 中的位置。一旦接收了新的数据,指针就会加上 elemsize,移向下一个位置。buf 的总大小是 elemsize 的整数倍,而且 buf 是一个循环列表。
- recvx:处理接收请求时的指针在 buf 中的位置。一旦取出数据,此指针会移动到下一个位置。
- recvq:chan 是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到 recvq 队列中。
- sendq:如果生产者因为 buf 满了而阻塞,会被加入到 sendq 队列中。
send、recv、close流程
本质是 "值的拷贝"
发送操作步骤
- 在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止多个线程并发修改数据
- 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine
- 如果recvq为空,则判断缓冲区是否可写,可写则从当前goroutine复制数据到缓冲区中
- 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据
- 写入完成释放锁.
发送数据的过程中包含几个会触发 Goroutine 调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度;
- 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权;
读取/接收操作步骤
- 在接收数据的逻辑执行之前会先为当前 Channel 加锁,防止多个线程并发修改数据;
- 如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
- 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
- 如果 Channel 的 sendq 队列中存在挂起的 Goroutine, 如果不存在缓冲区,则将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中; 如果存在缓冲区; 将队列头中的数据拷贝到接收方的内存地址; 然后将发送队列下一个的数据拷贝到缓冲区中,释放一个阻塞的发送方;
- 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据;
- 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;
- 读取完成后释放锁.
总结一下从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时.
关闭流程
- 加锁.
- 接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表
- 解锁.
- 最后,再将所有的 sudog 全都唤醒.
- 对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有发送者的情况下,不能贸然关闭 channel. 所以最好是由唯一的channel发送者去执行关闭操作.
关闭后
关闭流程仅仅是做一些标记和通知操作, 实际上没有回收掉这个chan, chan依然是可读的, 当读到第二个字段为false/nil时, 代表chanel已经关闭, 通道没有数据. 对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收
一个泄漏资源例子
func process(timeout time.Duration) bool {
ch := make(chan bool)
go func() {
// 模拟处理耗时的业务
time.Sleep((timeout + time.Second))
ch <- true // block
fmt.Println("exit goroutine")
}()
select {
case result := <-ch:
return result
case <-time.After(timeout):
return false
}
}
在这个例子中,process 函数会启动一个 goroutine,去处理需要长时间处理的业务,处理完之后,会发送 true 到 chan 中,目的是通知其它等待的 goroutine,可以继续处理了。
主 goroutine 接收到任务处理完成的通知,或者超时后就返回了。
如果发生超时,process 函数就返回了,这就会导致 unbuffered 的 chan 从来就没有被读取。我们知道,unbuffered chan 必须等 reader 和 writer 都准备好了才能交流,否则就会阻塞。超时导致未读,结果就是子 goroutine 就阻塞在第 7 行永远结束不了,进而导致 goroutine 泄漏。
解决这个 Bug 的办法很简单,就是将 unbuffered chan 改成容量为 1 的 chan,这样第 7 行就不会被阻塞了。
chan 与 传统并发原语选择
- 共享资源的并发访问使用传统并发原语;
- 复杂的任务编排和消息传递使用 Channel;
- 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
- 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
- 需要和 Select 语句结合,使用 Channel;
- 需要和超时配合时,使用 Channel 和 Context。
panic情况汇总
文章来源
<<极客时间>>Go 并发编程实战课13讲