令牌桶


原理

  1. 以v的速度向桶内放置令牌,桶的容量为b,如果桶满了多余令牌就会被丢弃。
  2. 请求到达时,我们向桶内获取令牌,如果令牌足够,则请求通过。
  3. 如果桶内令牌不够,则这个请求会被缓存等待令牌足够是转发,或者被直接丢掉。
  4. 桶的容量b,可以应对突发的流量。

rate库

  1. 基本结构体

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    type Limit float64

    type Limiter struct {
    limit Limit // 每秒放入的令牌数量
    burst int // 桶大小
    mu sync.Mutex // 锁
    tokens float64 // 剩余令牌数量
    last time.Time // 上一次取走tokens的时间
    lastEvent time.Time // 最近限流事件的时间
    }

    // 令牌发放之后,会存储在 Reservation 预约对象中
    type Reservation struct {
    ok bool // 是否满足条件分配了token
    lim *Limiter // 发送令牌的限流器
    tokens int // 发送 token 令牌的数量
    timeToAct time.Time // 满足令牌发放的时间
    limit Limit // 令牌发放速度
    }
  2. 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // 构建一个限流器,r 是每秒放入的令牌数量,b 是桶的大小
    func NewLimiter(r Limit, b int) *Limiter

    // 分别返回 b 和 r 的值
    func (lim *Limiter) Burst() int
    func (lim *Limiter) Limit() Limit

    // token 3种消费方法
    func (lim *Limiter) Allow() bool // Allow(now, 1)
    // 表示截止到now这个时间点,是否存在n个token
    func (lim *Limiter) AllowN(now time.Time, n int) bool

    func (lim *Limiter) Reserve() *Reservation // ReserveN(now, 1)
    // ReserveN和AllowN类似,表示截止到now这个时间点,是否存在n个token,无论Token是否充足,都会返回Reservation对象
    func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

    func (lim *Limiter) Wait(ctx context.Context) (err error) // WaitN(ctx, 1)
    // 如果存在n个令牌就直接转发,不存在就阻塞,直至满足条件,传入的ctx的Deadline就是等待的Deadline
    func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

    // 动态流控
    // 设置桶的大小
    func (lim *Limiter) SetBurst(newBurst int)
    // 流控最后的更新时间
    func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
    // token 生产速率
    func (lim *Limiter) SetLimit(newLimit Limit)
    // 流控最后的更新时间
    func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
    1
    2
    3
    4
    5
    6
    7
    // 如果不想等待,可以调用Cancel()方法,该方法会将Token归还。
    func (r *Reservation) Cancel()
    func (r *Reservation) CancelAt(now time.Time)
    // 返回需要等待的时间。如果等待时间为0,则说明不用等待
    func (r *Reservation) Delay() time.Duration
    func (r *Reservation) DelayFrom(now time.Time) time.Duration
    func (r *Reservation) OK() bool
  3. 使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 对ip进行限流
    var limiters = &sync.Map{}

    func rateLimiter(r rate.Limit, b int, ip string, t time.Duration) bool {
    l, _ := limiters.LoadOrStore(ip, rate.NewLimiter(r, b))
    ctx, cancel := context.WithTimeout(context.Background(), t)
    defer cancel()

    if err := l.(*rate.Limiter).Wait(ctx); err != nil {
    return false
    }
    return true
    }

  目录