Golang-RWMutex


sync.RWMutex

介绍

读写互斥锁,可以由多个读者或单个写者持有。RWMutex的零值是未锁定的互斥锁。

  1. 结构体及其常量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    type RWMutex struct {
    w Mutex // 互斥锁
    writerSem uint32 // 写操作等待读操作完成的信号量
    readerSem uint32 // 读操作等待写操作完成的信号量
    readerCount int32 // 读锁计数器
    readerWait int32 // 获取写锁时当前需要等待的读锁释放数量
    }

    // 最大只支持 1 << 30 个读锁
    const rwmutexMaxReaders = 1 << 30
  2. 四种方法

    1
    2
    3
    4
    func (rw *RWMutex) Lock         // 提供写锁加锁操作
    func (rw *RWMutex) RLock // 提供读锁加锁操作
    func (rw *RWMutex) RUnlock // 提供读锁解锁操作
    func (rw *RWMutex) Unlock // 提供写锁解锁操作

    读操作可并发重入,写操作是互斥的,读写锁通常用互斥锁、条件变量、信号量实现。

场景&实现

  1. 写操作互斥写操作

    读写锁包含一个互斥锁,写锁定必须先获取该互斥锁,如果互斥锁已被协程A获取(或者协程A在阻塞等待读结束),意味着协程A获取了互斥锁,那么协程B只能阻塞等待该互斥锁。

  2. 写操作互斥读操作

    通过RWMutex.readerCount表示读者数量,不考虑写操作的情况下,每次读写锁将该值+1,每次解除读锁定将该值-1,所以readerCount取值为[0,N],N为读者个数,实际上最大可支持2^30个并发读者。

    写操作将readerCount变成负数来阻止读操作,当写锁定时将readerCount减去2^30,从而readerCount变成负值,此时再有读锁定检测到readerCount为负值,则开始阻塞等待。真实的读操作个数并不会丢失,只需要将readerCount加上2^30即可。

  3. 读操作互斥写操作

    读锁定会先将readerCount加1,此时写操作来是发现读者数量不为0,会阻塞等待所有读操作结束。

  4. 写锁定不会被饿死

    写操作要等待读操作结束后才可以获得锁,写操作等待期间可能还有新的读操作持续到来,如果写操作等待所有读操作结束,很可能被饿死。

    通过RWMutex.readerWait解决,在写操作到来是,会把RWMutex.readerCount值拷贝到RWMutex.readerWait中,用于标记排在写操作前面的读者个数。前面的读操作结束后,除了会递减readerCount,还会递减readerWait值,当readerWait值变为0时唤醒写操作。写操作相当于把一段连续的读划分为两部分,前面的读操作结束后唤醒写操作,写操作结束后唤醒后面的读操作。

源码实现

  1. 写锁加锁 Lock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    func (rw *RWMutex) Lock() {
    // 竞态检测
    if race.Enabled {
    _ = rw.w.state
    race.Disable()
    }
    // 1.使用 Mutex 锁,解决与其他写者的竞争
    rw.w.Lock()

    // 2.判断当前是否存在读锁:先通过原子操作改变readerCount(readerCount-rwmutexMaxReaders),
    // 使其变为负数,告诉 RUnLock 当前存在写锁等待;
    // 然后再加回 rwmutexMaxReaders 并赋给r,若r仍然不为0, 代表当前还有读锁
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

    // 3.如果仍然有其他 Goroutine 持有互斥锁的读锁(r != 0)
    // 会先将 readerCount 的值加到 readerWait中,防止源源不断的读者进来导致写锁饿死,
    // 然后该 Goroutine 会调用 sync.runtime_SemacquireMutex 进入休眠状态,
    // 并等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒。
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    // 阻塞写锁
    runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    // 竞态检测
    if race.Enabled {
    race.Enable()
    race.Acquire(unsafe.Pointer(&rw.readerSem))
    race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
    }
    1. 使用 sync.Mutex 中的互斥锁 sync.Mutex.Lock() 先解决与其他写者的竞争问题;
    2. 判断当前是否存在读锁:先通过原子操作改变readerCount(readerCount-rwmutexMaxReaders),使其变为负数,告诉 RUnLock 当前存在写锁等待,然后再加回rwmutexMaxReaders 并赋给r,若r仍然不为0,代表当前还有读锁
    3. 判断是否还有其他 Goroutine 持有RWMutex互斥锁的读锁(r != 0),如果有则会先将当前的 readerCount 的数量加到 readerWait中,从而防止后面源源不断的读者请求读锁,从而进来导致写锁饿死的情况发生,然后该 Goroutine 会调用 sync.runtime_SemacquireMutex 进入休眠状态,并等待当前持有读锁的 Goroutine 结束后释放 writerSem 信号量将当前 Goroutine 唤醒。
  2. 写锁释放 UnLock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func (rw *RWMutex) Unlock() {
    // 竞态检测
    if race.Enabled {
    _ = rw.w.state
    race.Release(unsafe.Pointer(&rw.readerSem))
    race.Disable()
    }
    // 1.释放读锁:通过调用 atomic.AddInt32 函数将 readerCount 加上 rwmutexMaxReaders 从而变回正数;;
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    // 若超过读锁的最大限制, 触发panic
    if r >= rwmutexMaxReaders {
    race.Enable()
    throw("sync: Unlock of unlocked RWMutex")
    }
    // 2.通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有)
    for i := 0; i < int(r); i++ {
    runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 3.调用 sync.Mutex.Unlock 方法释放写互斥锁
    rw.w.Unlock()

    // 是否开启竞态检测
    if race.Enabled {
    race.Enable()
    }
    }
    1. 释放读锁:通过调用 atomic.AddInt32 函数将 readerCount 加上 rwmutexMaxReaders 从而变回正数;
    2. 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有);
    3. 调用 sync.Mutex.Unlock() 方法释放写互斥锁。
  3. 读锁加锁 RLock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    func (rw *RWMutex) RLock() {
    // 是否开启检测race
    if race.Enabled {
    _ = rw.w.state
    race.Disable()
    }
    //这里分两种情况:
    // 1.此时无写锁 (readerCount + 1) > 0,那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行])
    // 2.此时有写锁 (readerCount + 1) < 0,所以通过readerSem读信号量, 使读操作睡眠等待
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // 当前有个写锁, 读操作需要阻塞等待写锁释放;
    // 其实做的事情是将 goroutine 排到G队列的后面,挂起 goroutine
    runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    // 是否开启检测race
    if race.Enabled {
    race.Enable()
    race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
    }
    1. 此时无写锁 (readerCount + 1) > 0(注意,在写锁是加锁那里,我们对readerCount 进行了readerCount-rwmutexMaxReaders处理),那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行]);
    2. 此时有写锁 (readerCount + 1) < 0,所以通过readerSem读信号量, 使读操作睡眠等待;
  4. 读锁释放 RUnlock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    func (rw *RWMutex) RUnlock() {
    // 竞态检测
    if race.Enabled {
    _ = rw.w.state
    race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
    race.Disable()
    }
    // 写锁等待状态,检查当前是否可以进行获取;
    // 首先将 readerCount 减1并赋予r,然后分两种情况判断
    // 1.若r大于等于0,读锁直接解锁成功,直接结束本次操作;
    // 2.若r小于0,有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法;
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
    // Outlined slow-path to allow the fast-path to be inlined
    rw.rUnlockSlow(r)
    }
    if race.Enabled {
    race.Enable()
    }
    }

    func (rw *RWMutex) rUnlockSlow(r int32) {
    // r + 1 == 0 表示本来就没读锁, 直接执行RUnlock()
    // r + 1 == -rwmutexMaxReaders 表示执行Lock()再执行RUnlock()
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
    race.Enable()
    throw("sync: RUnlock of unlocked RWMutex")
    }
    // 如果当前有写锁等待,则减少一个readerWait的数目
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
    // 写锁前的最后一个读锁唤醒写锁执行
    runtime_Semrelease(&rw.writerSem, false, 1)
    }
    }
    1. 首先readerCount 减1,然后进行两种情况的判断:

      1. 若 r 大于等于0,读锁直接解锁成功,直接结束本次操作;
      2. 若 r 小于0, 有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法;
    2. 然后倘若上面判断 r 小于0,则进入 rUnlockSlow() 慢解锁,先进行一个判断,若有以下两种情况发生:r + 1 == 0表示直接执行RUnlock()r + 1 == -rwmutexMaxReaders表示执行 Lock() 再执行 RUnlock(),这两种情况都会进行报错。

    3. 如果没有上述两种情况发生,则sync.RWMutex.rUnlockSlow 会减少获取锁的写操作等待的读操作数 readerWait ,并在所有读操作都被释放之后触发写操作的信号量 writerSem,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine


  目录