sync.RWMutex
介绍
读写互斥锁,可以由多个读者或单个写者持有。RWMutex的零值是未锁定的互斥锁。
结构体及其常量
type RWMutex struct { w Mutex // 互斥锁 writerSem uint32 // 写操作等待读操作完成的信号量 readerSem uint32 // 读操作等待写操作完成的信号量 readerCount int32 // 读锁计数器 readerWait int32 // 获取写锁时当前需要等待的读锁释放数量 } // 最大只支持 1 << 30 个读锁 const rwmutexMaxReaders = 1 << 30
四种方法
func (rw *RWMutex) Lock // 提供写锁加锁操作 func (rw *RWMutex) RLock // 提供读锁加锁操作 func (rw *RWMutex) RUnlock // 提供读锁解锁操作 func (rw *RWMutex) Unlock // 提供写锁解锁操作
读操作可并发重入,写操作是互斥的,读写锁通常用互斥锁、条件变量、信号量实现。
场景&实现
写操作互斥写操作
读写锁包含一个互斥锁,写锁定必须先获取该互斥锁,如果互斥锁已被协程A获取(或者协程A在阻塞等待读结束),意味着协程A获取了互斥锁,那么协程B只能阻塞等待该互斥锁。
写操作互斥读操作
通过RWMutex.readerCount表示读者数量,不考虑写操作的情况下,每次读写锁将该值+1,每次解除读锁定将该值-1,所以readerCount取值为[0,N],N为读者个数,实际上最大可支持2^30个并发读者。
写操作将readerCount变成负数来阻止读操作,当写锁定时将readerCount减去2^30,从而readerCount变成负值,此时再有读锁定检测到readerCount为负值,则开始阻塞等待。真实的读操作个数并不会丢失,只需要将readerCount加上2^30即可。
读操作互斥写操作
读锁定会先将readerCount加1,此时写操作来是发现读者数量不为0,会阻塞等待所有读操作结束。
写锁定不会被饿死
写操作要等待读操作结束后才可以获得锁,写操作等待期间可能还有新的读操作持续到来,如果写操作等待所有读操作结束,很可能被饿死。
通过RWMutex.readerWait解决,在写操作到来是,会把RWMutex.readerCount值拷贝到RWMutex.readerWait中,用于标记排在写操作前面的读者个数。前面的读操作结束后,除了会递减readerCount,还会递减readerWait值,当readerWait值变为0时唤醒写操作。写操作相当于把一段连续的读划分为两部分,前面的读操作结束后唤醒写操作,写操作结束后唤醒后面的读操作。
源码实现
写锁加锁 Lock()
func (rw *RWMutex) Lock() { // 竞态检测 if race.Enabled { _ = rw.w.state race.Disable() } // 1.使用 Mutex 锁,解决与其他写者的竞争 rw.w.Lock() // 2.判断当前是否存在读锁:先通过原子操作改变readerCount(readerCount-rwmutexMaxReaders), // 使其变为负数,告诉 RUnLock 当前存在写锁等待; // 然后再加回 rwmutexMaxReaders 并赋给r,若r仍然不为0, 代表当前还有读锁 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 3.如果仍然有其他 Goroutine 持有互斥锁的读锁(r != 0) // 会先将 readerCount 的值加到 readerWait中,防止源源不断的读者进来导致写锁饿死, // 然后该 Goroutine 会调用 sync.runtime_SemacquireMutex 进入休眠状态, // 并等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒。 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // 阻塞写锁 runtime_SemacquireMutex(&rw.writerSem, false, 0) } // 竞态检测 if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } }
- 使用
sync.Mutex
中的互斥锁sync.Mutex.Lock()
先解决与其他写者的竞争问题; - 判断当前是否存在读锁:先通过原子操作改变
readerCount(readerCount-rwmutexMaxReaders)
,使其变为负数,告诉 RUnLock 当前存在写锁等待,然后再加回rwmutexMaxReaders
并赋给r,若r仍然不为0,代表当前还有读锁 - 判断是否还有其他
Goroutine
持有RWMutex
互斥锁的读锁(r != 0),如果有则会先将当前的readerCount
的数量加到readerWait
中,从而防止后面源源不断的读者请求读锁,从而进来导致写锁饿死的情况发生,然后该Goroutine
会调用sync.runtime_SemacquireMutex
进入休眠状态,并等待当前持有读锁的Goroutine
结束后释放writerSem
信号量将当前Goroutine
唤醒。
- 使用
写锁释放 UnLock()
func (rw *RWMutex) Unlock() { // 竞态检测 if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // 1.释放读锁:通过调用 atomic.AddInt32 函数将 readerCount 加上 rwmutexMaxReaders 从而变回正数;; r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // 若超过读锁的最大限制, 触发panic if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } // 2.通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有) for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // 3.调用 sync.Mutex.Unlock 方法释放写互斥锁 rw.w.Unlock() // 是否开启竞态检测 if race.Enabled { race.Enable() } }
- 释放读锁:通过调用
atomic.AddInt32
函数将readerCount
加上rwmutexMaxReaders
从而变回正数; - 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有);
- 调用
sync.Mutex.Unlock()
方法释放写互斥锁。
- 释放读锁:通过调用
读锁加锁 RLock()
func (rw *RWMutex) RLock() { // 是否开启检测race if race.Enabled { _ = rw.w.state race.Disable() } //这里分两种情况: // 1.此时无写锁 (readerCount + 1) > 0,那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行]) // 2.此时有写锁 (readerCount + 1) < 0,所以通过readerSem读信号量, 使读操作睡眠等待 if atomic.AddInt32(&rw.readerCount, 1) < 0 { // 当前有个写锁, 读操作需要阻塞等待写锁释放; // 其实做的事情是将 goroutine 排到G队列的后面,挂起 goroutine runtime_SemacquireMutex(&rw.readerSem, false, 0) } // 是否开启检测race if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }
- 此时无写锁
(readerCount + 1) > 0
(注意,在写锁是加锁那里,我们对readerCount 进行了readerCount-rwmutexMaxReaders
处理),那么可以上读锁, 并且readerCount
原子加1(读锁可重入[只要匹配了释放次数就行]); - 此时有写锁
(readerCount + 1) < 0,
所以通过readerSem
读信号量, 使读操作睡眠等待;
- 此时无写锁
读锁释放 RUnlock()
func (rw *RWMutex) RUnlock() { // 竞态检测 if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } // 写锁等待状态,检查当前是否可以进行获取; // 首先将 readerCount 减1并赋予r,然后分两种情况判断 // 1.若r大于等于0,读锁直接解锁成功,直接结束本次操作; // 2.若r小于0,有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法; if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } } func (rw *RWMutex) rUnlockSlow(r int32) { // r + 1 == 0 表示本来就没读锁, 直接执行RUnlock() // r + 1 == -rwmutexMaxReaders 表示执行Lock()再执行RUnlock() if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // 如果当前有写锁等待,则减少一个readerWait的数目 if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 写锁前的最后一个读锁唤醒写锁执行 runtime_Semrelease(&rw.writerSem, false, 1) } }
首先readerCount 减1,然后进行两种情况的判断:
- 若 r 大于等于0,读锁直接解锁成功,直接结束本次操作;
- 若 r 小于0, 有一个正在执行的写操作,在这时会调用
sync.RWMutex.rUnlockSlow
方法;
然后倘若上面判断 r 小于0,则进入
rUnlockSlow()
慢解锁,先进行一个判断,若有以下两种情况发生:r + 1 == 0
表示直接执行RUnlock()
,r + 1 == -rwmutexMaxReaders
表示执行Lock()
再执行RUnlock()
,这两种情况都会进行报错。如果没有上述两种情况发生,则
sync.RWMutex.rUnlockSlow
会减少获取锁的写操作等待的读操作数readerWait
,并在所有读操作都被释放之后触发写操作的信号量writerSem
,该信号量被触发时,调度器就会唤醒尝试获取写锁的Goroutine
。