Golang-信号量semaphore


semaphore

带权重的信号量在Golang中提供了一种灵活的机制,用于管理对共享资源的并发访问。通过合理使用带权重的信号量,可以更好地平衡并发访问和资源使用,从而提高程序的性能和稳定性

数据结构

// 一个 watier 就表示一个请求,其中n表示这次请求的资源数量
type waiter struct {
    n     int64
    ready chan<- struct{} // Closed when semaphore acquired.
}

type Weighted struct {
  // 最大资源数量,取走时会减少,释放时会增加
    size    int64
  // 记录当前已使用资源数,值范围[0 - size]
    cur     int64
    mu      sync.Mutex
  // 当前处于等待休眠的请求者goroutine,每个请求者请求的资源数量可能不一样,只有在请求时,可用资源数量不足时请求者才会进入请求链表,每个请求者表示一个goroutine
    waiters list.List
}

方法列表

// 创建一类资源,参数 n 资源表示最大可用资源总个数
func NewWeighted(n int64) *Weighted
// 获取指定个数的资源,如果当前没有空闲资源可用,当前请求者goroutine将陷入休眠状态
// 可以一次获取多个资源,如果没有足够多的资源,调用者就会被阻塞。它的第一个参数是 Context,这就意味着,你可以通过 Context 增加超时或者 cancel 的机制。如果是正常获取了资源,就返回 nil;否则,就返回 ctx.Err(),信号量不改变
func (s *Weighted) Acquire(ctx context.Context, n int64) error
// 释放资源
func (s *Weighted) Release(n int64)
// 同 Acquire 一样,但当无空闲资源将直接返回false,而不阻塞
func (s *Weighted) TryAcquire(n int64) bool

Acquire 和 TryAcquire

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
    // 如果恰好有足够的资源,也没有排队等待获取资源的goroutine,
    // 将cur加上n后直接返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
      s.cur += n
      s.mu.Unlock()
      return nil
    }
  
    // 请求的资源数大于能提供的最大的资源数
    // 这个任务处理不了,走错误处理逻辑
    if n > s.size {
      s.mu.Unlock()
      // 依赖ctx的状态返回,否则一直等待
      <-ctx.Done()
      return ctx.Err()
    }
    // 现存资源不够, 需要把调用者加入到等待队列中
    // 创建了一个ready chan,以便被通知唤醒
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()
  

    // 等待
    select {
    case <-ctx.Done(): // context的Done被关闭
      err := ctx.Err()
      s.mu.Lock()
      select {
      case <-ready: // 如果被唤醒了,忽略ctx的状态
        err = nil
      default: // 通知waiter
        isFront := s.waiters.Front() == elem
        // 已经通过ctx退出了,删除
        s.waiters.Remove(elem)
        // 如果当前元素正好位于链表最前面,且还存在可用的资源,就通知其它waiters
        if isFront && s.size > s.cur {
          s.notifyWaiters()
        }
      }
      s.mu.Unlock()
      return err
    case <-ready: // 等待者被唤醒了
      return nil
    }
  }

func (s *Weighted) TryAcquire(n int64) bool {
    s.mu.Lock()
    success := s.size-s.cur >= n && s.waiters.Len() == 0
    if success {
        s.cur += n
    }
    s.mu.Unlock()
    return success
}

func (s *Weighted) notifyWaiters() {
    for {
      next := s.waiters.Front()
      if next == nil {
        break // 没有等待者了,直接返回
      }
  
      w := next.Value.(waiter)
      if s.size-s.cur < w.n {
        // 如果现有资源不够队列头调用者请求的资源数,就退出所有等待者会继续等待
        // 这里还是按照先入先出的方式处理是为了避免饥饿
        break
      }
      s.cur += w.n
      s.waiters.Remove(next)
      close(w.ready)
    }
  }

Release

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    // 将当前计数值减去释放的资源数n
    s.cur -= n
    if s.cur < 0 {
      s.mu.Unlock()
      panic("semaphore: released more than held")
    }
    // 尝试唤醒等待队列中的调用者,看是否有足够的资源被获取
    s.notifyWaiters()
    s.mu.Unlock()
}

评论
  目录