令牌桶


原理

  1. 以v的速度向桶内放置令牌,桶的容量为b,如果桶满了多余令牌就会被丢弃。
  2. 请求到达时,我们向桶内获取令牌,如果令牌足够,则请求通过。
  3. 如果桶内令牌不够,则这个请求会被缓存等待令牌足够是转发,或者被直接丢掉。
  4. 桶的容量b,可以应对突发的流量。

rate库

  1. 基本结构体

    type Limit float64
    type Limiter struct {
     limit     Limit      // 每秒放入的令牌数量
     burst     int        // 桶大小
     mu        sync.Mutex // 锁
     tokens    float64    // 剩余令牌数量
     last      time.Time  // 上一次取走tokens的时间
     lastEvent time.Time  // 最近限流事件的时间
    }
    // 令牌发放之后,会存储在 Reservation 预约对象中
    type Reservation struct {
     ok        bool      // 是否满足条件分配了token
     lim       *Limiter  // 发送令牌的限流器
     tokens    int       // 发送 token 令牌的数量
     timeToAct time.Time // 满足令牌发放的时间
     limit     Limit     // 令牌发放速度
    }
  2. 方法

    // 构建一个限流器,r 是每秒放入的令牌数量,b 是桶的大小
    func NewLimiter(r Limit, b int) *Limiter
    // 分别返回 b 和 r 的值
    func (lim *Limiter) Burst() int
    func (lim *Limiter) Limit() Limit
    // token 3种消费方法
    func (lim *Limiter) Allow() bool // Allow(now, 1)
    // 表示截止到now这个时间点,是否存在n个token
    func (lim *Limiter) AllowN(now time.Time, n int) bool
    func (lim *Limiter) Reserve() *Reservation // ReserveN(now, 1)
    // ReserveN和AllowN类似,表示截止到now这个时间点,是否存在n个token,无论Token是否充足,都会返回Reservation对象
    func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
    func (lim *Limiter) Wait(ctx context.Context) (err error) // WaitN(ctx, 1)
    // 如果存在n个令牌就直接转发,不存在就阻塞,直至满足条件,传入的ctx的Deadline就是等待的Deadline
    func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
    // 动态流控
    // 设置桶的大小
    func (lim *Limiter) SetBurst(newBurst int)
    // 流控最后的更新时间
    func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
    // token 生产速率
    func (lim *Limiter) SetLimit(newLimit Limit)
    // 流控最后的更新时间
    func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
    // 如果不想等待,可以调用Cancel()方法,该方法会将Token归还。
    func (r *Reservation) Cancel()
    func (r *Reservation) CancelAt(now time.Time)
    // 返回需要等待的时间。如果等待时间为0,则说明不用等待
    func (r *Reservation) Delay() time.Duration
    func (r *Reservation) DelayFrom(now time.Time) time.Duration
    func (r *Reservation) OK() bool
  3. 使用

    // 对ip进行限流
    var limiters = &sync.Map{}
    func rateLimiter(r rate.Limit, b int, ip string, t time.Duration) bool {
     l, _ := limiters.LoadOrStore(ip, rate.NewLimiter(r, b))
     ctx, cancel := context.WithTimeout(context.Background(), t)
     defer cancel()
     if err := l.(*rate.Limiter).Wait(ctx); err != nil {
         return false
     }
     return true
    }

评论
  目录