Golang-RWMutex


sync.RWMutex

介绍

读写互斥锁,可以由多个读者或单个写者持有。RWMutex的零值是未锁定的互斥锁。

  1. 结构体及其常量

    type RWMutex struct {
     w           Mutex  // 互斥锁
     writerSem   uint32 // 写操作等待读操作完成的信号量
     readerSem   uint32 // 读操作等待写操作完成的信号量
     readerCount int32  // 读锁计数器
     readerWait  int32  // 获取写锁时当前需要等待的读锁释放数量
    }
    // 最大只支持 1 << 30 个读锁
    const rwmutexMaxReaders = 1 << 30
  2. 四种方法

    func (rw *RWMutex) Lock         // 提供写锁加锁操作
    func (rw *RWMutex) RLock        // 提供读锁加锁操作
    func (rw *RWMutex) RUnlock      // 提供读锁解锁操作
    func (rw *RWMutex) Unlock       // 提供写锁解锁操作

    读操作可并发重入,写操作是互斥的,读写锁通常用互斥锁、条件变量、信号量实现。

场景&实现

  1. 写操作互斥写操作

    读写锁包含一个互斥锁,写锁定必须先获取该互斥锁,如果互斥锁已被协程A获取(或者协程A在阻塞等待读结束),意味着协程A获取了互斥锁,那么协程B只能阻塞等待该互斥锁。

  2. 写操作互斥读操作

    通过RWMutex.readerCount表示读者数量,不考虑写操作的情况下,每次读写锁将该值+1,每次解除读锁定将该值-1,所以readerCount取值为[0,N],N为读者个数,实际上最大可支持2^30个并发读者。

    写操作将readerCount变成负数来阻止读操作,当写锁定时将readerCount减去2^30,从而readerCount变成负值,此时再有读锁定检测到readerCount为负值,则开始阻塞等待。真实的读操作个数并不会丢失,只需要将readerCount加上2^30即可。

  3. 读操作互斥写操作

    读锁定会先将readerCount加1,此时写操作来是发现读者数量不为0,会阻塞等待所有读操作结束。

  4. 写锁定不会被饿死

    写操作要等待读操作结束后才可以获得锁,写操作等待期间可能还有新的读操作持续到来,如果写操作等待所有读操作结束,很可能被饿死。

    通过RWMutex.readerWait解决,在写操作到来是,会把RWMutex.readerCount值拷贝到RWMutex.readerWait中,用于标记排在写操作前面的读者个数。前面的读操作结束后,除了会递减readerCount,还会递减readerWait值,当readerWait值变为0时唤醒写操作。写操作相当于把一段连续的读划分为两部分,前面的读操作结束后唤醒写操作,写操作结束后唤醒后面的读操作。

源码实现

  1. 写锁加锁 Lock()

    func (rw *RWMutex) Lock() {
     // 竞态检测
     if race.Enabled {
         _ = rw.w.state
         race.Disable()
     }
     // 1.使用 Mutex 锁,解决与其他写者的竞争
     rw.w.Lock()
     // 2.判断当前是否存在读锁:先通过原子操作改变readerCount(readerCount-rwmutexMaxReaders),
     // 使其变为负数,告诉 RUnLock 当前存在写锁等待;
     // 然后再加回 rwmutexMaxReaders 并赋给r,若r仍然不为0, 代表当前还有读锁
     r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
     // 3.如果仍然有其他 Goroutine 持有互斥锁的读锁(r != 0)
     // 会先将 readerCount 的值加到 readerWait中,防止源源不断的读者进来导致写锁饿死,
     // 然后该 Goroutine 会调用 sync.runtime_SemacquireMutex 进入休眠状态,
     // 并等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒。
     if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
         // 阻塞写锁
         runtime_SemacquireMutex(&rw.writerSem, false, 0)
     }
     // 竞态检测
     if race.Enabled {
         race.Enable()
         race.Acquire(unsafe.Pointer(&rw.readerSem))
         race.Acquire(unsafe.Pointer(&rw.writerSem))
     }
    }
    1. 使用 sync.Mutex 中的互斥锁 sync.Mutex.Lock() 先解决与其他写者的竞争问题;
    2. 判断当前是否存在读锁:先通过原子操作改变readerCount(readerCount-rwmutexMaxReaders),使其变为负数,告诉 RUnLock 当前存在写锁等待,然后再加回rwmutexMaxReaders 并赋给r,若r仍然不为0,代表当前还有读锁
    3. 判断是否还有其他 Goroutine 持有RWMutex互斥锁的读锁(r != 0),如果有则会先将当前的 readerCount 的数量加到 readerWait中,从而防止后面源源不断的读者请求读锁,从而进来导致写锁饿死的情况发生,然后该 Goroutine 会调用 sync.runtime_SemacquireMutex 进入休眠状态,并等待当前持有读锁的 Goroutine 结束后释放 writerSem 信号量将当前 Goroutine 唤醒。
  2. 写锁释放 UnLock()

    func (rw *RWMutex) Unlock() {
     // 竞态检测
     if race.Enabled {
         _ = rw.w.state
         race.Release(unsafe.Pointer(&rw.readerSem))
         race.Disable()
     }
     // 1.释放读锁:通过调用 atomic.AddInt32 函数将 readerCount 加上 rwmutexMaxReaders 从而变回正数;;
     r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
     // 若超过读锁的最大限制, 触发panic
     if r >= rwmutexMaxReaders {
         race.Enable()
         throw("sync: Unlock of unlocked RWMutex")
     }
     // 2.通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有)
     for i := 0; i < int(r); i++ {
         runtime_Semrelease(&rw.readerSem, false, 0)
     }
     // 3.调用 sync.Mutex.Unlock 方法释放写互斥锁
     rw.w.Unlock()
     // 是否开启竞态检测
     if race.Enabled {
         race.Enable()
     }
    }
    1. 释放读锁:通过调用 atomic.AddInt32 函数将 readerCount 加上 rwmutexMaxReaders 从而变回正数;
    2. 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有);
    3. 调用 sync.Mutex.Unlock() 方法释放写互斥锁。
  3. 读锁加锁 RLock()

    func (rw *RWMutex) RLock() {
     // 是否开启检测race
     if race.Enabled {
         _ = rw.w.state
         race.Disable()
     }
     //这里分两种情况:
     // 1.此时无写锁 (readerCount + 1) > 0,那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行])
     // 2.此时有写锁 (readerCount + 1) < 0,所以通过readerSem读信号量, 使读操作睡眠等待
     if atomic.AddInt32(&rw.readerCount, 1) < 0 {
         // 当前有个写锁, 读操作需要阻塞等待写锁释放;
         // 其实做的事情是将 goroutine 排到G队列的后面,挂起 goroutine
         runtime_SemacquireMutex(&rw.readerSem, false, 0)
     }
     // 是否开启检测race
     if race.Enabled {
         race.Enable()
         race.Acquire(unsafe.Pointer(&rw.readerSem))
     }
    }
    1. 此时无写锁 (readerCount + 1) > 0(注意,在写锁是加锁那里,我们对readerCount 进行了readerCount-rwmutexMaxReaders处理),那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行]);
    2. 此时有写锁 (readerCount + 1) < 0,所以通过readerSem读信号量, 使读操作睡眠等待;
  4. 读锁释放 RUnlock()

    func (rw *RWMutex) RUnlock() {
     // 竞态检测
     if race.Enabled {
         _ = rw.w.state
         race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
         race.Disable()
     }
     // 写锁等待状态,检查当前是否可以进行获取;
     // 首先将 readerCount 减1并赋予r,然后分两种情况判断
     //  1.若r大于等于0,读锁直接解锁成功,直接结束本次操作;
     //  2.若r小于0,有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法;
     if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
         // Outlined slow-path to allow the fast-path to be inlined
         rw.rUnlockSlow(r)
     }
     if race.Enabled {
         race.Enable()
     }
    }
    func (rw *RWMutex) rUnlockSlow(r int32) {
     // r + 1 == 0 表示本来就没读锁, 直接执行RUnlock()
     // r + 1 == -rwmutexMaxReaders 表示执行Lock()再执行RUnlock()
     if r+1 == 0 || r+1 == -rwmutexMaxReaders {
         race.Enable()
         throw("sync: RUnlock of unlocked RWMutex")
     }
     // 如果当前有写锁等待,则减少一个readerWait的数目
     if atomic.AddInt32(&rw.readerWait, -1) == 0 {
         // 写锁前的最后一个读锁唤醒写锁执行
         runtime_Semrelease(&rw.writerSem, false, 1)
     }
    }
    1. 首先readerCount 减1,然后进行两种情况的判断:

      1. 若 r 大于等于0,读锁直接解锁成功,直接结束本次操作;
      2. 若 r 小于0, 有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法;
    2. 然后倘若上面判断 r 小于0,则进入 rUnlockSlow() 慢解锁,先进行一个判断,若有以下两种情况发生:r + 1 == 0表示直接执行RUnlock()r + 1 == -rwmutexMaxReaders表示执行 Lock() 再执行 RUnlock(),这两种情况都会进行报错。

    3. 如果没有上述两种情况发生,则sync.RWMutex.rUnlockSlow 会减少获取锁的写操作等待的读操作数 readerWait ,并在所有读操作都被释放之后触发写操作的信号量 writerSem,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine


评论
  目录