Go 语言作为一个原生支持用户态进程(Goroutine)的语言,当提到并发编程、多线程编程时,往往都离不开锁这一概念。锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等问题。
基本原语
Mutex
Go 语言的 sync.Mutex
由两个字段 state
和 sema
组成。其中 state
表示当前互斥锁的状态,而 sema
是用于控制锁状态的信号量。
type Mutex struct {
state int32
sema uint32
}
状态
互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:
在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:
- mutexLocked — 表示互斥锁的锁定状态;
- mutexWoken — 表示从正常模式被从唤醒;
- mutexStarving — 当前的互斥锁进入饥饿状态;
- waitersCount — 当前互斥锁上等待的 Goroutine 个数;
正常模式和饥饿模式
sync.Mutex
有两种模式 — 正常模式和饥饿模式
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
饥饿模式是在 Go 语言在 1.9 中通过提交 sync: make Mutex more fair 引入的优化,引入的目的是保证互斥锁的公平性。
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
加锁和解锁
互斥锁的加锁是靠 sync.Mutex.Lock
完成的,解锁过程是靠sync.Mutex.Unlock
来完成。
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,会通过置位
mutexLocked
加锁; - 如果互斥锁处于
mutexLocked
状态并且在普通模式下工作,会进入自旋,执行 30 次PAUSE
指令消耗 CPU 时间等待锁的释放; - 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过
runtime.sync_runtime_SemacquireMutex
将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒; - 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,调用
sync.Mutex.Unlock
会直接抛出异常; - 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked
标志位; - 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过
sync.runtime_Semrelease
唤醒对应的 Goroutine;
RWMutex
读写互斥锁 sync.RWMutex
是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
读 | 写 | |
---|---|---|
读 | Y | N |
写 | N | N |
结构体
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
- w — 复用互斥锁提供的能力;
- writerSem 和 readerSem — 分别用于写等待读和读等待写:
- readerCount 存储了当前正在执行的读操作数量;
- readerWait 表示当写操作被阻塞时等待的读操作个数;
- 写操作使用
sync.RWMutex.Lock
和sync.RWMutex.Unlock
方法; - 读操作使用
sync.RWMutex.RLock
和sync.RWMutex.RUnlock
方法;
虽然读写互斥锁sync.RWMutex
提供的功能比较复杂,但是因为它建立在 [sync.Mutex
] 上,所以实现会简单很多。我们总结一下读锁和写锁的关系:
- 调用
sync.RWMutex.Lock
尝试获取写锁时;- 每次
sync.RWMutex.RUnlock
都会将readerCount
其减一,当它归零时该 Goroutine 会获得写锁; - 将
readerCount
减少rwmutexMaxReaders
个数以阻塞后续的读操作;
- 每次
- 调用
sync.RWMutex.Unlock
释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。
WaitGroup
sync.WaitGroup
可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
我们可以通过 sync.WaitGroup
将原本顺序执行的代码在多个 Goroutine 中并发执行,加快程序处理的速度。
结构体
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
-
noCopy
— 保证sync.WaitGroup
不会被开发者通过再赋值的方式拷贝; -
state1
— 存储着状态和信号量;
sync.noCopy
是一个特殊的私有结构体,tools/go/analysis/passes/copylock
包中的分析器会在编译期间检查被拷贝的变量中是否包含 [sync.noCopy
]或者实现了 Lock
和 Unlock
方法,如果包含该结构体或者实现了对应的方法就会报出以下错误:
func main() {
wg := sync.WaitGroup{}
yawg := wg
fmt.Println(wg, yawg)
}
$ go vet proc.go
./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup
./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroup
这段代码会因为变量赋值或者调用函数时发生值拷贝导致分析器报错。
接口
sync.WaitGroup
对外暴露了三个方法 — sync.WaitGroup.Add
、sync.WaitGroup.Wait
和 sync.WaitGroup.Done
,其中的 sync.WaitGroup.Done
只是向 sync.WaitGroup.Add
方法传入了 -1。
通过对 sync.WaitGroup
的分析和研究,我们能够得出以下结论:
小结
-
sync.WaitGroup
必须在sync.WaitGroup.Wait
方法返回之后才能被重新使用; -
sync.WaitGroup.Done
只是对sync.WaitGroup.Add
方法的简单封装,我们可以向sync.WaitGroup.Add
方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine; - 可以同时有多个 Goroutine 等待当前
sync.WaitGroup
计数器的归零,这些 Goroutine 会被同时唤醒;
Once
Go 语言标准库中 sync.Once
可以保证在 Go 程序运行期间的某段代码只会执行一次。在运行如下所示的代码时,我们会看到如下所示的运行结果:
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}
$ go run main.go
only once
结构体
type Once struct {
done uint32
m Mutex
}
作为用于保证函数执行次数的 sync.Once
结构体,它使用互斥锁和 sync/atomic
包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。在使用该结构体时,我们也需要注意以下的问题:
-
sync.Once.Do
方法中传入的函数只会被执行一次,哪怕函数中发生了panic
; - 两次调用
sync.Once.Do
方法传入不同的函数只会执行第一次调传入的函数;
Cond
Go 语言标准库中还包含条件变量 sync.Cond
,它可以让一组的 Goroutine 都在满足特定条件时被唤醒。每一个 sync.Cond
结构体在初始化时都需要传入一个互斥锁,我们可以通过下面的例子了解它的使用方法:
var status int64
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
...
listen
上述代码同时运行了 11 个 Goroutine,这 11 个 Goroutine 分别做了不同事情:
- 10 个 Goroutine 通过
sync.Cond.Wait
等待特定条件的满足; - 1 个 Goroutine 会调用
sync.Cond.Broadcast
唤醒所有陷入等待的 Goroutine;
调用 sync.Cond.Broadcast
方法后,上述代码会打印出 10 次 “listen” 并结束调用。
结构体
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
- noCopy — 用于保证结构体不会在编译期间拷贝;
- copyChecker — 用于禁止运行期间发生的拷贝;
- L — 用于保护内部的 notify 字段,Locker 接口类型的变量;
- notify — 一个 Goroutine 的链表,它是实现同步机制的核心结构;
结构
sync.Cond
对外暴露的 sync.Cond.Wait
方法会将当前 Goroutine 陷入休眠状态
sync.Cond.Signal
和 sync.Cond.Broadcast
就是用来唤醒陷入休眠的 Goroutine 的方法
-
sync.Cond.Signal
方法会唤醒队列最前面的 Goroutine; -
sync.Cond.Broadcast
方法会唤醒队列中全部的 Goroutine;
Goroutine 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能 Goroutine 需要等待调度器的调度。
在一般情况下,我们都会先调用 sync.Cond.Wait
陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用 sync.Cond.Signal
或者 sync.Cond.Broadcast
唤醒一个或者全部的 Goroutine。
小结
sync.Cond
不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {}
进行忙碌等待相比,sync.Cond
能够让出处理器的使用权,提高 CPU 的利用率。使用时我们也需要注意以下问题:
-
sync.Cond.Wait
在调用之前一定要使用获取互斥锁,否则会触发程序崩溃; -
sync.Cond.Signal
唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine; -
sync.Cond.Broadcast
会按照一定顺序广播通知等待的全部 Goroutine;
扩展原语
ErrGroup
golang/sync/errgroup.Group
为我们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,我们可以使用如下所示的方式并行获取网页的数据:
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
golang/sync/errgroup.Group.Go
方法能够创建一个 Goroutine 并在其中执行传入的函数,而 golang/sync/errgroup.Group.Wait
会等待所有 Goroutine 全部返回,该方法的不同返回结果也有不同的含义:
- 如果返回错误 — 这一组 Goroutine 最少返回一个错误;
- 如果返回空值 — 所有 Goroutine 都成功执行;
结构体
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
-
cancel
— 创建context.Context
时返回的取消函数,用于在多个 Goroutine 之间同步取消信号; -
wg
— 用于等待一组 Goroutine 完成子任务的同步原语; -
errOnce
— 用于保证只接收一个子任务返回的错误;
小结
golang/sync/errgroup.Group
的实现没有涉及底层和运行时包中的 API,它只是对基本同步语义进行了封装以提供更加复杂的功能。我们在使用时也需要注意下面几个问题:
-
golang/sync/errgroup.Group
在出现错误或者等待结束后会调用context.Context
的cancel
方法同步取消信号; - 只有第一个出现的错误才会被返回,剩余的错误会被直接丢弃;
Semaphore
信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。
- 每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时重新加回来;
- 当遇到计数器大于信号量大小时,会进入休眠等待其他线程释放信号;
Go 语言的扩展包中就提供了带权重的信号量 golang/sync/semaphore.Weighted
,我们可以按照不同的权重对资源的访问进行管理,这个结构体对外也只暴露了四个方法:
-
golang/sync/semaphore.NewWeighted
用于创建新的信号量; -
golang/sync/semaphore.Weighted.Acquire
阻塞地获取指定权重的资源,如果当前没有空闲资源,会陷入休眠等待; -
golang/sync/semaphore.Weighted.TryAcquire
非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回false
; -
golang/sync/semaphore.Weighted.Release
用于释放指定权重的资源;
SingleFlight
golang/sync/singleflight.Group
是 Go 语言扩展包中提供了另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求。一个比较常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。
但是 golang/sync/singleflight.Group
能有效地解决这个问题,它能够限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
小结
本文介绍了 Go 语言标准库中提供的基本原语以及扩展包中的扩展原语,这些并发编程的原语能够帮助我们更好地利用 Go 语言的特性构建高吞吐量、低延时的服务、解决并发带来的问题,希望能对你有所帮助。