缓存击穿
高并发场景下,为降低数据库压力,通常通过localcache、redis等设置缓存。接到请求后先从缓存读,如果存在则直接返回,否则需要从数据库读取,然后写到缓存。
通常查询数据库在整个请求流程中的耗时是最长的,高并发情况下,可能存在某个请求触发了读数据库的操作但没有返回和写入缓存,在这个期间后续的请求也触发相同的读请求,导致数据库瞬间请求量暴增或者直接被打死。
分布式锁可以解决这个问题,即第一个触发的请求加锁,等到写入缓存后释放锁。其他的请求只需要在发现有锁后等待并读取缓存数据即可。golang提供了更轻量的解决方法——singleFlight
SingleFlight
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package main
import ( "errors" "fmt" "log" "strconv" "sync" "time"
"golang.org/x/sync/singleflight" )
var ( g singleflight.Group ErrCacheMiss = errors.New("cache miss") )
func main() { var wg sync.WaitGroup wg.Add(10)
for i := 0; i < 10; i++ { go func() { defer wg.Done() data, err := load("key") if err != nil { log.Print(err) return } log.Println(data) }() } wg.Wait() }
func load(key string) (string, error) { data, err := loadFromCache(key) if err != nil && err == ErrCacheMiss { v, err, _ := g.Do(key, func() (interface{}, error) { data, err := loadFromDB(key) if err != nil { return nil, err } setCache(key, data) return data, nil }) if err != nil { log.Println(err) return "", err } data = v.(string) } return data, nil }
func loadFromCache(key string) (string, error) { return "", ErrCacheMiss }
func setCache(key, data string) {}
func loadFromDB(key string) (string, error) { fmt.Println("query db") unix := strconv.Itoa(int(time.Now().UnixNano())) return unix, nil }
|
结果:
1 2 3 4 5 6 7 8 9 10 11
| query db 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100 2021/07/17 11:04:13 1626491053454483100
|
源码
Group
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| type Group struct { mu sync.Mutex m map[string]*call }
type call struct { wg sync.WaitGroup val interface{} err error
forgotten bool
dups int chans []chan<- Result }
|
Do
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
|
doCall
defer的使用值得学习
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false
defer func() { if !normalReturn && !recovered { c.err = errGoexit }
c.wg.Done() g.mu.Lock() defer g.mu.Unlock() if !c.forgotten { delete(g.m, key) }
if e, ok := c.err.(*panicError); ok { if len(c.chans) > 0 { go panic(e) select {} } else { panic(e) } } else if c.err == errGoexit { } else { for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }()
func() { defer func() { if !normalReturn { if r := recover(); r != nil { c.err = newPanicError(r) } } }()
c.val, c.err = fn() normalReturn = true }() if !normalReturn { recovered = true } }
|
DoChan
和 do 唯一的区别是 go g.doCall(c, key, fn)
,但对起了一个 goroutine 来执行,并通过 channel 来返回数据,这样外部可以自定义超时逻辑,防止因为 fn 的阻塞,导致大量请求都被阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock()
go g.doCall(c, key, fn)
return ch }
|
Forget
手动移除某个 key,让后续请求能走 doCall 的逻辑,而不是直接阻塞。
1 2 3 4 5 6 7 8
| func (g *Group) Forget(key string) { g.mu.Lock() if c, ok := g.m[key]; ok { c.forgotten = true } delete(g.m, key) g.mu.Unlock() }
|
注意
阻塞
singleflight 内部使用 waitGroup 来让同一个 key 的除了第一个请求的后续所有请求都阻塞。直到第一个请求执行 fn 返回后,其他请求才会返回。
这意味着,如果 fn 执行需要很长时间,那么后面的所有请求都会被一直阻塞。
这时候我们可以使用 DoChan 结合 ctx + select 做超时控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func loadChan(ctx context.Context,key string) (string, error) { data, err := loadFromCache(key) if err != nil && err == ErrCacheMiss { result := g.DoChan(key, func() (interface{}, error) { data, err := loadFromDB(key) if err != nil { return nil, err } setCache(key, data) return data, nil }) select { case r := <-result: return r.Val.(string), r.Err case <-ctx.Done(): return "", ctx.Err() } } return data, nil }
|
手动forget
比如1秒内有100个请求过来,正常是第一个请求能执行queryDB,后续99个都会阻塞。
增加这个 Forget 之后,每 100ms 就能有一个请求执行 queryDB,相当于是多了几次尝试的机会,相对的也给DB造成了更大的压力,需要根据具体场景进去取舍
。
1 2 3 4
| go func() { time.Sleep(100 * time.Millisecond) g.Forget(key) }()
|
原文
https://www.lixueduan.com/posts/go/singleflight/