RWMutex
基于go 1.13源码
总的来说读写锁就是利用互斥锁和CAS维护2个关于读锁的变量以及runtime的2个信号量,来实现「当存在读锁时,读操作可以继续加锁,写操作会堵塞;当存在写锁时,读/写操作均会堵塞」的机制。
下面是具体分析,略去竞态分析和runtime的信号量P/V操作的代码。
结构
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
方法
-
RLock(获取读锁)
每当有协程需要获取读锁的时候,就将readerCount + 1
当readerCount + 1之后的值 < 0的时候(即有写操作在持有锁),那么将会调用runtime_Semacquire方法,等待写锁释放
当readerCount + 1之后的值 >0(说明只有读操作持有锁),锁可以返回。
-
RUnlock
func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } } func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
CAS操作 readerCount-1,(判断是否没有获取读锁就想去释放读锁或者获取写锁释放读锁,直接抛异常。)如果小于0,证明确实有协程正在想要获取写锁,那么就需要CAS操作我们因Lock操作时写入的readerWait,将其值 -1,
当readerWait减到0的时候就证明没有人正在持有读锁了,就通过信号量writerSem的变化告知刚才等待的协程(想要获取写锁的协程)
当readerWait大于0时,结束流程。
-
Lock
// First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) }
首先操作最前面说的互斥锁(成员变量w)加锁,目的就是处理多个写锁并发的情况,因为我们知道写锁只有一把。
然后如果写锁w可以正常获取,就执行atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 是将当前的readerCount减去一个非常大的值rwmutexMaxReaders,在加上rwmutexMaxReaders作为r,如果r不为0(表示原来有协程已经获取到了读锁)并且将readerWait加上readerCount(表示需要等待readerCount这么多个读锁进行解锁),如果满足上述条件证明原来有读锁,所以暂时没有办法获取到写锁,所以调用runtime_Semacquire进行等待,等待的信号量为writerSem
-
Unlock
func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() if race.Enabled { race.Enable() } }
写锁释放需要恢复readerCount,还记得上锁的时候减了一个很大的数,这个时候要加回来了。当然加完之后如果>=rwmutexMaxReaders本身,是因为没有获取写锁的时候就开始想着释放写锁了。
然后for循环就是为了通知所有在我们RLock方法中看到的,当有因为持有写锁所以等待的那些协程,通过信号量readerSem告诉他们可以动了。
最后别忘记还有一个互斥锁需要释放,让别的协程也可以开始抢写锁了。