Golang-信号量semaphore


semaphore

带权重的信号量在Golang中提供了一种灵活的机制,用于管理对共享资源的并发访问。通过合理使用带权重的信号量,可以更好地平衡并发访问和资源使用,从而提高程序的性能和稳定性

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 一个 watier 就表示一个请求,其中n表示这次请求的资源数量
type waiter struct {
n int64
ready chan<- struct{} // Closed when semaphore acquired.
}

type Weighted struct {
// 最大资源数量,取走时会减少,释放时会增加
size int64
// 记录当前已使用资源数,值范围[0 - size]
cur int64
mu sync.Mutex
// 当前处于等待休眠的请求者goroutine,每个请求者请求的资源数量可能不一样,只有在请求时,可用资源数量不足时请求者才会进入请求链表,每个请求者表示一个goroutine
waiters list.List
}

方法列表

1
2
3
4
5
6
7
8
9
// 创建一类资源,参数 n 资源表示最大可用资源总个数
func NewWeighted(n int64) *Weighted
// 获取指定个数的资源,如果当前没有空闲资源可用,当前请求者goroutine将陷入休眠状态
// 可以一次获取多个资源,如果没有足够多的资源,调用者就会被阻塞。它的第一个参数是 Context,这就意味着,你可以通过 Context 增加超时或者 cancel 的机制。如果是正常获取了资源,就返回 nil;否则,就返回 ctx.Err(),信号量不改变
func (s *Weighted) Acquire(ctx context.Context, n int64) error
// 释放资源
func (s *Weighted) Release(n int64)
// 同 Acquire 一样,但当无空闲资源将直接返回false,而不阻塞
func (s *Weighted) TryAcquire(n int64) bool

Acquire 和 TryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
// 如果恰好有足够的资源,也没有排队等待获取资源的goroutine,
// 将cur加上n后直接返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}

// 请求的资源数大于能提供的最大的资源数
// 这个任务处理不了,走错误处理逻辑
if n > s.size {
s.mu.Unlock()
// 依赖ctx的状态返回,否则一直等待
<-ctx.Done()
return ctx.Err()
}
// 现存资源不够, 需要把调用者加入到等待队列中
// 创建了一个ready chan,以便被通知唤醒
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()


// 等待
select {
case <-ctx.Done(): // context的Done被关闭
err := ctx.Err()
s.mu.Lock()
select {
case <-ready: // 如果被唤醒了,忽略ctx的状态
err = nil
default: // 通知waiter
isFront := s.waiters.Front() == elem
// 已经通过ctx退出了,删除
s.waiters.Remove(elem)
// 如果当前元素正好位于链表最前面,且还存在可用的资源,就通知其它waiters
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready: // 等待者被唤醒了
return nil
}
}

func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}

func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // 没有等待者了,直接返回
}

w := next.Value.(waiter)
if s.size-s.cur < w.n {
// 如果现有资源不够队列头调用者请求的资源数,就退出所有等待者会继续等待
// 这里还是按照先入先出的方式处理是为了避免饥饿
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}

Release

1
2
3
4
5
6
7
8
9
10
11
12
func (s *Weighted) Release(n int64) {
s.mu.Lock()
// 将当前计数值减去释放的资源数n
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
// 尝试唤醒等待队列中的调用者,看是否有足够的资源被获取
s.notifyWaiters()
s.mu.Unlock()
}

  目录