原理
- 以v的速度向桶内放置令牌,桶的容量为b,如果桶满了多余令牌就会被丢弃。
- 请求到达时,我们向桶内获取令牌,如果令牌足够,则请求通过。
- 如果桶内令牌不够,则这个请求会被缓存等待令牌足够是转发,或者被直接丢掉。
- 桶的容量b,可以应对突发的流量。
rate库
基本结构体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19type Limit float64
type Limiter struct {
limit Limit // 每秒放入的令牌数量
burst int // 桶大小
mu sync.Mutex // 锁
tokens float64 // 剩余令牌数量
last time.Time // 上一次取走tokens的时间
lastEvent time.Time // 最近限流事件的时间
}
// 令牌发放之后,会存储在 Reservation 预约对象中
type Reservation struct {
ok bool // 是否满足条件分配了token
lim *Limiter // 发送令牌的限流器
tokens int // 发送 token 令牌的数量
timeToAct time.Time // 满足令牌发放的时间
limit Limit // 令牌发放速度
}方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29// 构建一个限流器,r 是每秒放入的令牌数量,b 是桶的大小
func NewLimiter(r Limit, b int) *Limiter
// 分别返回 b 和 r 的值
func (lim *Limiter) Burst() int
func (lim *Limiter) Limit() Limit
// token 3种消费方法
func (lim *Limiter) Allow() bool // Allow(now, 1)
// 表示截止到now这个时间点,是否存在n个token
func (lim *Limiter) AllowN(now time.Time, n int) bool
func (lim *Limiter) Reserve() *Reservation // ReserveN(now, 1)
// ReserveN和AllowN类似,表示截止到now这个时间点,是否存在n个token,无论Token是否充足,都会返回Reservation对象
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
func (lim *Limiter) Wait(ctx context.Context) (err error) // WaitN(ctx, 1)
// 如果存在n个令牌就直接转发,不存在就阻塞,直至满足条件,传入的ctx的Deadline就是等待的Deadline
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
// 动态流控
// 设置桶的大小
func (lim *Limiter) SetBurst(newBurst int)
// 流控最后的更新时间
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
// token 生产速率
func (lim *Limiter) SetLimit(newLimit Limit)
// 流控最后的更新时间
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)1
2
3
4
5
6
7// 如果不想等待,可以调用Cancel()方法,该方法会将Token归还。
func (r *Reservation) Cancel()
func (r *Reservation) CancelAt(now time.Time)
// 返回需要等待的时间。如果等待时间为0,则说明不用等待
func (r *Reservation) Delay() time.Duration
func (r *Reservation) DelayFrom(now time.Time) time.Duration
func (r *Reservation) OK() bool使用
1
2
3
4
5
6
7
8
9
10
11
12
13// 对ip进行限流
var limiters = &sync.Map{}
func rateLimiter(r rate.Limit, b int, ip string, t time.Duration) bool {
l, _ := limiters.LoadOrStore(ip, rate.NewLimiter(r, b))
ctx, cancel := context.WithTimeout(context.Background(), t)
defer cancel()
if err := l.(*rate.Limiter).Wait(ctx); err != nil {
return false
}
return true
}