Golang-sync-SingleFlight


缓存击穿

  高并发场景下,为降低数据库压力,通常通过localcache、redis等设置缓存。接到请求后先从缓存读,如果存在则直接返回,否则需要从数据库读取,然后写到缓存。

  通常查询数据库在整个请求流程中的耗时是最长的,高并发情况下,可能存在某个请求触发了读数据库的操作但没有返回和写入缓存,在这个期间后续的请求也触发相同的读请求,导致数据库瞬间请求量暴增或者直接被打死。

  分布式锁可以解决这个问题,即第一个触发的请求加锁,等到写入缓存后释放锁。其他的请求只需要在发现有锁后等待并读取缓存数据即可。golang提供了更轻量的解决方法——singleFlight

SingleFlight

package main

import (
    "errors"
    "fmt"
    "log"
    "strconv"
    "sync"
    "time"

    "golang.org/x/sync/singleflight"
)

var (
    g            singleflight.Group
    ErrCacheMiss = errors.New("cache miss")
)

func main() {
    var wg sync.WaitGroup
    wg.Add(10)

    // 模拟10个并发
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            data, err := load("key")
            if err != nil {
                log.Print(err)
                return
            }
            log.Println(data)
        }()
    }
    wg.Wait()
}

// 获取数据
func load(key string) (string, error) {
    data, err := loadFromCache(key)
    if err != nil && err == ErrCacheMiss {
        // 利用 singleflight 来归并请求
        v, err, _ := g.Do(key, func() (interface{}, error) {
            data, err := loadFromDB(key)
            if err != nil {
                return nil, err
            }
            setCache(key, data)
            return data, nil
        })
        if err != nil {
            log.Println(err)
            return "", err
        }
        data = v.(string)
    }
    return data, nil
}

// getDataFromCache 模拟从cache中获取值 cache miss
func loadFromCache(key string) (string, error) {
    return "", ErrCacheMiss
}

// setCache 写入缓存
func setCache(key, data string) {}

// getDataFromDB 模拟从数据库中获取值
func loadFromDB(key string) (string, error) {
    fmt.Println("query db")
    unix := strconv.Itoa(int(time.Now().UnixNano()))
    return unix, nil
}

结果:

query db
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100

源码

Group

type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized,map的key就是唯一标识
}

type call struct {
    wg sync.WaitGroup
    // 函数返回值和err信息
    val interface{}
    err error

    // 是否调用了 forget 方法
    forgotten bool

    // 记录这个 key 被分享了多少次
    dups  int
    chans []chan<- Result
}

Do

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil { // 懒加载
        g.m = make(map[string]*call)
    }
    // 先判断 key 是否已经存在
    if c, ok := g.m[key]; ok { // 存在则说明有其他请求在同步执行,本次请求只需要等待即可
        c.dups++
        g.mu.Unlock()
        c.wg.Wait() // / 等待最先进来的那个请求执行完成,因为需要完成后才能获取到结果,这里用 wg 来阻塞,避免了手动写一个循环等待的逻辑
        // 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做
        if e, ok := c.err.(*panicError); ok {
            panic(e)
        } else if c.err == errGoexit {
            runtime.Goexit()
        }
        // 最后直接从 call 对象中取出数据并返回
        return c.val, c.err, true
    }
    // 如果 key 不存在则会走到这里 new 一个 call 并执行
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c // 注意 这里在 Unlock 之前就把 call 写到 m 中了,所以 这部分逻辑只有第一次请求会执行
    g.mu.Unlock()
    
     // 然后我们调用 doCall 去执行
    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

doCall

defer的使用值得学习

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    // 首先这两个 bool 用于标记是否正常返回或者触发了 recover
    normalReturn := false
    recovered := false

    defer func() {
        // 如果既没有正常执行完毕,又没有 recover 那就说明需要直接退出了
        if !normalReturn && !recovered {
            c.err = errGoexit
        }

        c.wg.Done() // 这里 done 之后前面的所有 wait 都会返回了
        g.mu.Lock()
        defer g.mu.Unlock()
        // forgotten 默认值就是 false,所以默认就会调用 delete 移除掉 m 中的 key
        if !c.forgotten { // 然后这里也很巧妙,前面先调用了 done,于是所有等待的请求都返回了,那么这个c也没有用了,所以直接 delete 把这个 key 删掉,让后续的请求能再次触发 doCall,而不是直接从 m 中获取结果返回。
            delete(g.m, key)
        }

        if e, ok := c.err.(*panicError); ok {
            // 如果返回的是 panic 错误,为了避免 channel 死锁,我们需要确保这个 panic 无法被恢复
            if len(c.chans) > 0 {
                go panic(e)
                select {} // Keep this goroutine around so that it will appear in the crash dump.
            } else {
                panic(e)
            }
        } else if c.err == errGoexit {
            // 如果是exitError就直接退出
        } else {
            // 这里就是正常逻辑了,往 channel 里写入数据
            for _, ch := range c.chans {
                ch <- Result{c.val, c.err, c.dups > 0}
            }
        }
    }()

    func() { // 使用匿名函数,保证下面的 defer 能在上一个defer之前执行
        defer func() {
            // 如果不是正常退出那肯定是 panic 了
            if !normalReturn {
                 // 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误后面在上层重新 panic
                if r := recover(); r != nil {
                    c.err = newPanicError(r)
                }
            }
        }()

        c.val, c.err = fn()
        // 如果我们传入的 fn 正常执行了 normalReturn 肯定会被修改为 true
        // 所以 defer 里可以通过这个标记来判定是否 panic 了
        normalReturn = true
    }()
    
    // 如果 normalReturn 为 false 就表示,我们的 fn panic 了
    // 如果执行到了这一步,也说明我们的 fn  也被 recover 住了,不是直接 runtime exit
    if !normalReturn {
        recovered = true
    }
}

DoChan

和 do 唯一的区别是 go g.doCall(c, key, fn),但对起了一个 goroutine 来执行,并通过 channel 来返回数据,这样外部可以自定义超时逻辑,防止因为 fn 的阻塞,导致大量请求都被阻塞。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    go g.doCall(c, key, fn)

    return ch
}

Forget

手动移除某个 key,让后续请求能走 doCall 的逻辑,而不是直接阻塞。

func (g *Group) Forget(key string) {
    g.mu.Lock()
    if c, ok := g.m[key]; ok {
        c.forgotten = true
    }
    delete(g.m, key)
    g.mu.Unlock()
}

注意

阻塞

singleflight 内部使用 waitGroup 来让同一个 key 的除了第一个请求的后续所有请求都阻塞。直到第一个请求执行 fn 返回后,其他请求才会返回。

这意味着,如果 fn 执行需要很长时间,那么后面的所有请求都会被一直阻塞。

这时候我们可以使用 DoChan 结合 ctx + select 做超时控制

func loadChan(ctx context.Context,key string) (string, error) {
    data, err := loadFromCache(key)
    if err != nil && err == ErrCacheMiss {
        // 使用 DoChan 结合 select 做超时控制
        result := g.DoChan(key, func() (interface{}, error) {
            data, err := loadFromDB(key)
            if err != nil {
                return nil, err
            }
            setCache(key, data)
            return data, nil
        })
        select {
        case r := <-result:
            return r.Val.(string), r.Err
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }
    return data, nil
}

手动forget

比如1秒内有100个请求过来,正常是第一个请求能执行queryDB,后续99个都会阻塞。

增加这个 Forget 之后,每 100ms 就能有一个请求执行 queryDB,相当于是多了几次尝试的机会,相对的也给DB造成了更大的压力,需要根据具体场景进去取舍

go func() {
       time.Sleep(100 * time.Millisecond)
       g.Forget(key)
   }()

原文

https://www.lixueduan.com/posts/go/singleflight/


评论
  目录