Golang-sync-SingleFlight


缓存击穿

  高并发场景下,为降低数据库压力,通常通过localcache、redis等设置缓存。接到请求后先从缓存读,如果存在则直接返回,否则需要从数据库读取,然后写到缓存。

  通常查询数据库在整个请求流程中的耗时是最长的,高并发情况下,可能存在某个请求触发了读数据库的操作但没有返回和写入缓存,在这个期间后续的请求也触发相同的读请求,导致数据库瞬间请求量暴增或者直接被打死。

  分布式锁可以解决这个问题,即第一个触发的请求加锁,等到写入缓存后释放锁。其他的请求只需要在发现有锁后等待并读取缓存数据即可。golang提供了更轻量的解决方法——singleFlight

SingleFlight

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main

import (
"errors"
"fmt"
"log"
"strconv"
"sync"
"time"

"golang.org/x/sync/singleflight"
)

var (
g singleflight.Group
ErrCacheMiss = errors.New("cache miss")
)

func main() {
var wg sync.WaitGroup
wg.Add(10)

// 模拟10个并发
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
data, err := load("key")
if err != nil {
log.Print(err)
return
}
log.Println(data)
}()
}
wg.Wait()
}

// 获取数据
func load(key string) (string, error) {
data, err := loadFromCache(key)
if err != nil && err == ErrCacheMiss {
// 利用 singleflight 来归并请求
v, err, _ := g.Do(key, func() (interface{}, error) {
data, err := loadFromDB(key)
if err != nil {
return nil, err
}
setCache(key, data)
return data, nil
})
if err != nil {
log.Println(err)
return "", err
}
data = v.(string)
}
return data, nil
}

// getDataFromCache 模拟从cache中获取值 cache miss
func loadFromCache(key string) (string, error) {
return "", ErrCacheMiss
}

// setCache 写入缓存
func setCache(key, data string) {}

// getDataFromDB 模拟从数据库中获取值
func loadFromDB(key string) (string, error) {
fmt.Println("query db")
unix := strconv.Itoa(int(time.Now().UnixNano()))
return unix, nil
}

结果:

1
2
3
4
5
6
7
8
9
10
11
query db
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100
2021/07/17 11:04:13 1626491053454483100

源码

Group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized,map的key就是唯一标识
}

type call struct {
wg sync.WaitGroup
// 函数返回值和err信息
val interface{}
err error

// 是否调用了 forget 方法
forgotten bool

// 记录这个 key 被分享了多少次
dups int
chans []chan<- Result
}

Do

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil { // 懒加载
g.m = make(map[string]*call)
}
// 先判断 key 是否已经存在
if c, ok := g.m[key]; ok { // 存在则说明有其他请求在同步执行,本次请求只需要等待即可
c.dups++
g.mu.Unlock()
c.wg.Wait() // / 等待最先进来的那个请求执行完成,因为需要完成后才能获取到结果,这里用 wg 来阻塞,避免了手动写一个循环等待的逻辑
// 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
// 最后直接从 call 对象中取出数据并返回
return c.val, c.err, true
}
// 如果 key 不存在则会走到这里 new 一个 call 并执行
c := new(call)
c.wg.Add(1)
g.m[key] = c // 注意 这里在 Unlock 之前就把 call 写到 m 中了,所以 这部分逻辑只有第一次请求会执行
g.mu.Unlock()

// 然后我们调用 doCall 去执行
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}

doCall

defer的使用值得学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
// 首先这两个 bool 用于标记是否正常返回或者触发了 recover
normalReturn := false
recovered := false

defer func() {
// 如果既没有正常执行完毕,又没有 recover 那就说明需要直接退出了
if !normalReturn && !recovered {
c.err = errGoexit
}

c.wg.Done() // 这里 done 之后前面的所有 wait 都会返回了
g.mu.Lock()
defer g.mu.Unlock()
// forgotten 默认值就是 false,所以默认就会调用 delete 移除掉 m 中的 key
if !c.forgotten { // 然后这里也很巧妙,前面先调用了 done,于是所有等待的请求都返回了,那么这个c也没有用了,所以直接 delete 把这个 key 删掉,让后续的请求能再次触发 doCall,而不是直接从 m 中获取结果返回。
delete(g.m, key)
}

if e, ok := c.err.(*panicError); ok {
// 如果返回的是 panic 错误,为了避免 channel 死锁,我们需要确保这个 panic 无法被恢复
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// 如果是exitError就直接退出
} else {
// 这里就是正常逻辑了,往 channel 里写入数据
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()

func() { // 使用匿名函数,保证下面的 defer 能在上一个defer之前执行
defer func() {
// 如果不是正常退出那肯定是 panic 了
if !normalReturn {
// 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误后面在上层重新 panic
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()

c.val, c.err = fn()
// 如果我们传入的 fn 正常执行了 normalReturn 肯定会被修改为 true
// 所以 defer 里可以通过这个标记来判定是否 panic 了
normalReturn = true
}()

// 如果 normalReturn 为 false 就表示,我们的 fn panic 了
// 如果执行到了这一步,也说明我们的 fn 也被 recover 住了,不是直接 runtime exit
if !normalReturn {
recovered = true
}
}

DoChan

和 do 唯一的区别是 go g.doCall(c, key, fn),但对起了一个 goroutine 来执行,并通过 channel 来返回数据,这样外部可以自定义超时逻辑,防止因为 fn 的阻塞,导致大量请求都被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

go g.doCall(c, key, fn)

return ch
}

Forget

手动移除某个 key,让后续请求能走 doCall 的逻辑,而不是直接阻塞。

1
2
3
4
5
6
7
8
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}

注意

阻塞

singleflight 内部使用 waitGroup 来让同一个 key 的除了第一个请求的后续所有请求都阻塞。直到第一个请求执行 fn 返回后,其他请求才会返回。

这意味着,如果 fn 执行需要很长时间,那么后面的所有请求都会被一直阻塞。

这时候我们可以使用 DoChan 结合 ctx + select 做超时控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func loadChan(ctx context.Context,key string) (string, error) {
data, err := loadFromCache(key)
if err != nil && err == ErrCacheMiss {
// 使用 DoChan 结合 select 做超时控制
result := g.DoChan(key, func() (interface{}, error) {
data, err := loadFromDB(key)
if err != nil {
return nil, err
}
setCache(key, data)
return data, nil
})
select {
case r := <-result:
return r.Val.(string), r.Err
case <-ctx.Done():
return "", ctx.Err()
}
}
return data, nil
}

手动forget

比如1秒内有100个请求过来,正常是第一个请求能执行queryDB,后续99个都会阻塞。

增加这个 Forget 之后,每 100ms 就能有一个请求执行 queryDB,相当于是多了几次尝试的机会,相对的也给DB造成了更大的压力,需要根据具体场景进去取舍

1
2
3
4
go func() {
time.Sleep(100 * time.Millisecond)
g.Forget(key)
}()

原文

https://www.lixueduan.com/posts/go/singleflight/


  目录