sync.Cond
实现了一个条件变量,用于等待一个或一组goroutines满足条件后唤醒的场景。每个
Cond关联一个
Locker通常是一个
*Mutex或
RWMutex`根据需求初始化不同的锁。
基本用法
老规矩正式剖析源码前,先来看看sync.Cond
如何使用。比如我们实现一个FIFO
的队列
package main
import (
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"time"
)
type FIFO struct {
lock sync.Mutex
cond *sync.Cond
queue []int
}
type Queue interface {
Pop() int
Offer(num int) error
}
func (f *FIFO) Offer(num int) error {
f.lock.Lock()
defer f.lock.Unlock()
f.queue = append(f.queue, num)
f.cond.Broadcast()
return nil
}
func (f *FIFO) Pop() int {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
f.cond.Wait()
}
item := f.queue[0]
f.queue = f.queue[1:]
return item
}
}
func main() {
l := sync.Mutex{}
fifo := &FIFO{
lock: l,
cond: sync.NewCond(&l),
queue: []int{},
}
go func() {
for {
fifo.Offer(rand.Int())
}
}()
time.Sleep(time.Second)
go func() {
for {
fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))
}
}()
go func() {
for {
fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))
}
}()
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
我们定一个FIFO
队列有Offer
和Pop
两个操作,我们起一个gorountine
不断向队列投放数据,另外两个gorountine
不断取拿数据。
-
Pop
操作会判断如果队列里没有数据len(f.queue) == 0
则调用f.cond.Wait()
将goroutine
挂起。 - 等到
Offer
操作投放数据成功,里面调用f.cond.Broadcast()
来唤醒所有挂起在这个mutex
上的goroutine
。当然sync.Cond
也提供了一个Signal()
,有点儿类似Java中的notify()
和notifyAll()
的意思 主要是唤醒一个和唤醒全部的区别。
总结一下sync.Mutex
的大致用法
- 首先声明一个
mutex
,这里sync.Mutex
/sync.RWMutex
可根据实际情况选用 - 调用
sync.NewCond(l Locker) *Cond
使用1中的mutex
作为入参 注意 这里传入的是指针 为了避免c.L.Lock()
、c.L.Unlock()
调用频繁复制锁 导致死锁 - 根据业务条件 满足则调用
cond.Wait()
挂起goroutine
-
cond.Broadcast()
唤起所有挂起的gorotune
另一个方法cond.Signal()
唤醒一个最先挂起的goroutine
需要注意的是cond.wait()
的使用需要参照如下模版 具体为啥我们后续分析
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
源码分析
数据结构
分析具体方法前,我们先来了解下sync.Cond
的数据结构。具体源码如下:
type Cond struct {
noCopy noCopy // Cond使用后不允许拷贝
// L is held while observing or changing the condition
L Locker
//通知列表调用wait()方法的goroutine会被放到notifyList中
notify notifyList
checker copyChecker //检查Cond实例是否被复制
}
noCopy
之前讲过 不清楚的可以看下《你真的了解mutex吗》,除此之外,Locker
是我们刚刚谈到的mutex
,copyChecker
是用来检查Cond实例是否被复制的,就有一个方法 :
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
大致意思是说,初始type copyChecker uintptr
默认为0,当第一次调用check()
会将copyChecker
自身的地址复制给自己,至于为什么uintptr(*c) != uintptr(unsafe.Pointer(c))
会被调用2次,因为期间goroutine
可能已经改变copyChecker
。二次调用如果不相等,则说明sync.Cond
被复制,重新分配了内存地址。
sync.Cond
比较有意思的是notifyList
type notifyList struct {
// wait is the ticket number of the next waiter. It is atomically
// incremented outside the lock.
wait uint32 // 等待goroutine操作的数量
// notify is the ticket number of the next waiter to be notified. It can
// be read outside the lock, but is only written to with lock held.
//
// Both wait & notify can wrap around, and such cases will be correctly
// handled as long as their "unwrapped" difference is bounded by 2^31.
// For this not to be the case, we'd need to have 2^31+ goroutines
// blocked on the same condvar, which is currently not possible.
notify uint32 // 唤醒goroutine操作的数量
// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}
包含了3类字段:
-
wait
和notify
两个无符号整型,分别表示了Wait()
操作的次数和goroutine
被唤醒的次数,wait
应该是恒大于等于notify
-
lock mutex
这个跟sync.Mutex
我们分析信号量阻塞队列时semaRoot
里的mutex
一样,并不是Go
提供开发者使用的sync.Mutex
,而是系统内部运行时实现的一个简单版本的互斥锁。 -
head
和tail
看名字,我们就能脑补出跟链表很像 没错这里就是维护了阻塞在当前sync.Cond
上的goroutine
构成的链表
整体来讲sync.Cond
大体结构为:
操作方法
Wait()操作
func (c *Cond) Wait() {
//1. 检查cond是否被拷贝
c.checker.check()
//2. notifyList.wait+1
t := runtime_notifyListAdd(&c.notify)
//3. 释放锁 让出资源给其他goroutine
c.L.Unlock()
//4. 挂起goroutine
runtime_notifyListWait(&c.notify, t)
//5. 尝试获得锁
c.L.Lock()
}
从Wait()
方法源码很容易看出它的操作大概分了5步:
- 调用
copyChecker.check()
保证sync.Cond
不会被拷贝 - 每次调用
Wait()
会将sync.Cond.notifyList.wait
属性进行加一操作,这也是它完成FIFO
的基石,根据wait
来判断`goroutine1等待的顺序
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
- 调用
c.L.Unlock()
释放锁,因为当前goroutine
即将被gopark
,让出锁给其他goroutine
避免死锁 - 调用
runtime_notifyListWait(&c.notify, t)
可能稍微复杂一点儿
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// 如果已经被唤醒 则立即返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
// Enqueue itself.
s := acquireSudog()
s.g = getg()
// 把等待递增序号赋值给s.ticket 为FIFO打基础
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
// 将当前goroutine插入到notifyList链表中
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 最终调用gopark挂起当前goroutine
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
// goroutine被唤醒后释放sudog
releaseSudog(s)
}
主要完成两个任务:
- 将当前goroutine插入到notifyList链表中
- 调用gopark将当前goroutine挂起
- 当其他goroutine调用了
Signal
或Broadcast
方法,当前goroutine
被唤醒后 再次尝试获得锁
Signal操作
Signal
唤醒一个等待时间最长的goroutine
,调用时不要求持有锁。
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
具体实现也不复杂,先判断sync.Cond
是否被复制,然后调用runtime_notifyListNotifyOne
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// wait==notify 说明没有等待的goroutine了
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
// 锁下二次检查
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// 更新下一个需要被唤醒的ticket number
atomic.Store(&l.notify, t+1)
// Try to find the g that needs to be notified.
// If it hasn't made it to the list yet we won't find it,
// but it won't park itself once it sees the new notify number.
//
// This scan looks linear but essentially always stops quickly.
// Because g's queue separately from taking numbers,
// there may be minor reorderings in the list, but we
// expect the g we're looking for to be near the front.
// The g has others in front of it on the list only to the
// extent that it lost the race, so the iteration will not
// be too long. This applies even when the g is missing:
// it hasn't yet gotten to sleep and has lost the race to
// the (few) other g's that we find on the list.
//这里是FIFO实现的核心 其实就是遍历链表 sudog.ticket查找指定需要唤醒的节点
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
主要逻辑:
- 判断是否存在等待需要被唤醒的goroutine 没有直接返回
- 递增
notify
属性,因为是根据notify
和sudog.ticket
匹配来查找需要唤醒的goroutine
,因为其是递增生成的,故而有了FIFO
语义。 - 遍历notifyList持有的链表,从
head
开始依据next
指针依次遍历。这个过程是线性的,故而时间复杂度为O(n),不过官方说法这个过程实际比较快This scan looks linear but essentially always stops quickly.
有个小细节:还记得我们Wait()
操作中,wait
属性原子更新和goroutine插入等待链表是两个单独的步骤,所以存在竞争的情况下,链表中的节点可能会轻微的乱序产生。但是不要担心,因为ticket是原子递增的 所以唤醒顺序不会乱。
Broadcast操作
Broadcast()
与Singal()
区别主要是它可以唤醒全部等待的goroutine
,并直接将wait
属性的值赋值给notify
。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// Fast-path 无等待goroutine直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
// 直接更新notify=wait
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// 依次调用goready唤醒goroutine
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
逻辑比较简单不再赘述
总结
-
sync.Cond
一旦创建使用 不允许被拷贝,由noCopy
和copyChecker
来限制保护。 -
Wait()
操作先是递增notifyList.wait
属性 然后将goroutine
封装进sudog
,将notifyList.wait
赋值给sudog.ticket
,然后将sudog
插入notifyList
链表中 -
Singal()
实际是按照notifyList.notify
跟notifyList
链表中节点的ticket
匹配 来确定唤醒的goroutine,因为notifyList.notify
和notifyList.wait
都是原子递增的,故而有了FIFO
的语义 -
Broadcast()
相对简单 就是唤醒全部等待的goroutine