sync.RWMutex
介绍
读写互斥锁,可以由多个读者或单个写者持有。RWMutex的零值是未锁定的互斥锁。
结构体及其常量
1
2
3
4
5
6
7
8
9
10type RWMutex struct {
w Mutex // 互斥锁
writerSem uint32 // 写操作等待读操作完成的信号量
readerSem uint32 // 读操作等待写操作完成的信号量
readerCount int32 // 读锁计数器
readerWait int32 // 获取写锁时当前需要等待的读锁释放数量
}
// 最大只支持 1 << 30 个读锁
const rwmutexMaxReaders = 1 << 30四种方法
1
2
3
4func (rw *RWMutex) Lock // 提供写锁加锁操作
func (rw *RWMutex) RLock // 提供读锁加锁操作
func (rw *RWMutex) RUnlock // 提供读锁解锁操作
func (rw *RWMutex) Unlock // 提供写锁解锁操作读操作可并发重入,写操作是互斥的,读写锁通常用互斥锁、条件变量、信号量实现。
场景&实现
写操作互斥写操作
读写锁包含一个互斥锁,写锁定必须先获取该互斥锁,如果互斥锁已被协程A获取(或者协程A在阻塞等待读结束),意味着协程A获取了互斥锁,那么协程B只能阻塞等待该互斥锁。
写操作互斥读操作
通过RWMutex.readerCount表示读者数量,不考虑写操作的情况下,每次读写锁将该值+1,每次解除读锁定将该值-1,所以readerCount取值为[0,N],N为读者个数,实际上最大可支持2^30个并发读者。
写操作将readerCount变成负数来阻止读操作,当写锁定时将readerCount减去2^30,从而readerCount变成负值,此时再有读锁定检测到readerCount为负值,则开始阻塞等待。真实的读操作个数并不会丢失,只需要将readerCount加上2^30即可。
读操作互斥写操作
读锁定会先将readerCount加1,此时写操作来是发现读者数量不为0,会阻塞等待所有读操作结束。
写锁定不会被饿死
写操作要等待读操作结束后才可以获得锁,写操作等待期间可能还有新的读操作持续到来,如果写操作等待所有读操作结束,很可能被饿死。
通过RWMutex.readerWait解决,在写操作到来是,会把RWMutex.readerCount值拷贝到RWMutex.readerWait中,用于标记排在写操作前面的读者个数。前面的读操作结束后,除了会递减readerCount,还会递减readerWait值,当readerWait值变为0时唤醒写操作。写操作相当于把一段连续的读划分为两部分,前面的读操作结束后唤醒写操作,写操作结束后唤醒后面的读操作。
源码实现
写锁加锁 Lock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29func (rw *RWMutex) Lock() {
// 竞态检测
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 1.使用 Mutex 锁,解决与其他写者的竞争
rw.w.Lock()
// 2.判断当前是否存在读锁:先通过原子操作改变readerCount(readerCount-rwmutexMaxReaders),
// 使其变为负数,告诉 RUnLock 当前存在写锁等待;
// 然后再加回 rwmutexMaxReaders 并赋给r,若r仍然不为0, 代表当前还有读锁
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 3.如果仍然有其他 Goroutine 持有互斥锁的读锁(r != 0)
// 会先将 readerCount 的值加到 readerWait中,防止源源不断的读者进来导致写锁饿死,
// 然后该 Goroutine 会调用 sync.runtime_SemacquireMutex 进入休眠状态,
// 并等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒。
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 阻塞写锁
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
// 竞态检测
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}- 使用
sync.Mutex
中的互斥锁sync.Mutex.Lock()
先解决与其他写者的竞争问题; - 判断当前是否存在读锁:先通过原子操作改变
readerCount(readerCount-rwmutexMaxReaders)
,使其变为负数,告诉 RUnLock 当前存在写锁等待,然后再加回rwmutexMaxReaders
并赋给r,若r仍然不为0,代表当前还有读锁 - 判断是否还有其他
Goroutine
持有RWMutex
互斥锁的读锁(r != 0),如果有则会先将当前的readerCount
的数量加到readerWait
中,从而防止后面源源不断的读者请求读锁,从而进来导致写锁饿死的情况发生,然后该Goroutine
会调用sync.runtime_SemacquireMutex
进入休眠状态,并等待当前持有读锁的Goroutine
结束后释放writerSem
信号量将当前Goroutine
唤醒。
- 使用
写锁释放 UnLock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26func (rw *RWMutex) Unlock() {
// 竞态检测
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// 1.释放读锁:通过调用 atomic.AddInt32 函数将 readerCount 加上 rwmutexMaxReaders 从而变回正数;;
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 若超过读锁的最大限制, 触发panic
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 2.通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有)
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 3.调用 sync.Mutex.Unlock 方法释放写互斥锁
rw.w.Unlock()
// 是否开启竞态检测
if race.Enabled {
race.Enable()
}
}- 释放读锁:通过调用
atomic.AddInt32
函数将readerCount
加上rwmutexMaxReaders
从而变回正数; - 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有);
- 调用
sync.Mutex.Unlock()
方法释放写互斥锁。
- 释放读锁:通过调用
读锁加锁 RLock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20func (rw *RWMutex) RLock() {
// 是否开启检测race
if race.Enabled {
_ = rw.w.state
race.Disable()
}
//这里分两种情况:
// 1.此时无写锁 (readerCount + 1) > 0,那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行])
// 2.此时有写锁 (readerCount + 1) < 0,所以通过readerSem读信号量, 使读操作睡眠等待
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 当前有个写锁, 读操作需要阻塞等待写锁释放;
// 其实做的事情是将 goroutine 排到G队列的后面,挂起 goroutine
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
// 是否开启检测race
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}- 此时无写锁
(readerCount + 1) > 0
(注意,在写锁是加锁那里,我们对readerCount 进行了readerCount-rwmutexMaxReaders
处理),那么可以上读锁, 并且readerCount
原子加1(读锁可重入[只要匹配了释放次数就行]); - 此时有写锁
(readerCount + 1) < 0,
所以通过readerSem
读信号量, 使读操作睡眠等待;
- 此时无写锁
读锁释放 RUnlock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33func (rw *RWMutex) RUnlock() {
// 竞态检测
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 写锁等待状态,检查当前是否可以进行获取;
// 首先将 readerCount 减1并赋予r,然后分两种情况判断
// 1.若r大于等于0,读锁直接解锁成功,直接结束本次操作;
// 2.若r小于0,有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法;
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// r + 1 == 0 表示本来就没读锁, 直接执行RUnlock()
// r + 1 == -rwmutexMaxReaders 表示执行Lock()再执行RUnlock()
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 如果当前有写锁等待,则减少一个readerWait的数目
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 写锁前的最后一个读锁唤醒写锁执行
runtime_Semrelease(&rw.writerSem, false, 1)
}
}首先readerCount 减1,然后进行两种情况的判断:
- 若 r 大于等于0,读锁直接解锁成功,直接结束本次操作;
- 若 r 小于0, 有一个正在执行的写操作,在这时会调用
sync.RWMutex.rUnlockSlow
方法;
然后倘若上面判断 r 小于0,则进入
rUnlockSlow()
慢解锁,先进行一个判断,若有以下两种情况发生:r + 1 == 0
表示直接执行RUnlock()
,r + 1 == -rwmutexMaxReaders
表示执行Lock()
再执行RUnlock()
,这两种情况都会进行报错。如果没有上述两种情况发生,则
sync.RWMutex.rUnlockSlow
会减少获取锁的写操作等待的读操作数readerWait
,并在所有读操作都被释放之后触发写操作的信号量writerSem
,该信号量被触发时,调度器就会唤醒尝试获取写锁的Goroutine
。