Golang-Channel底层是怎么实现的?


引言

channel 是 golang 的最重要的一个结构,是区别于其他高级语言的最重要的特色之一,也是 goroutine 通信必须要的要素之一。下文将基于golang1.14从channel的数据结构&收、发操作的代码实现,进一步了解channel。

hchan struct

hchan 中的所有属性大致可以分为三类:

  1. buffer 相关的属性。例如 buf、dataqsiz、qcount 等。 当 channel 的缓冲区大小不为 0 时,buffer 中存放了待接收的数据。使用 ==环形队列==(ring buffer) 实现,FIFO。
  2. waitq 相关的属性,可以理解为是一个 FIFO 的标准队列。其中 recvq 中是正在等待接收数据的 goroutine,sendq 中是等待发送数据的 goroutine。waitq 使用==双向链表==实现。
  3. 其他属性,例如 lock、elemtype、closed 等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type hchan struct {
qcount uint // 队列中数据个数
dataqsiz uint // channel大小
buf unsafe.Pointer// 存放数据的环形数组
elemsize uint16 // channel中数据类型的大小
closed uint32 // 表示channel是否关闭
elemtype *_type // 元素数据类型
sendx uint // buffer 中已发送的索引位置 send index
recvx uint // buffer 中已接收的索引位置 receive index
recvq waitq // 等待接收的 goroutine list of recv waiters
sendq waitq // 等待发送的 goroutine list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

type waitq struct {
first *sudog
last *sudog
}

type sudog struct {
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

我们可以看到

  1. channel 其实就是一个队列加一个锁,只不过这个锁是一个轻量级锁。
  2. recvq 是读操作阻塞在 channel 的 goroutine 列表,sendq 是写操作阻塞在 channel 的 goroutine 列表。
  3. 链表的实现是 sudog,其实就是一个对 g 的结构的封装。

makechan

  1. 参数校验 2-15行
  2. 初始化hchan 17-37行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func makechan(t *chantype, size int) *hchan {
elem := t.elem

// compiler checks this but be safe.
// 元素类型大小限制,不能啥也放
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 对齐限制
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}

mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0: // 没有buffer,只分配hchan结构体
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不含指针
// Allocate hchan and buf in one call. hchan和buffer一起分配,内存块连续
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers. hchan和buffer单独分配
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.size) // channel里元素的大小
c.elemtype = elem // 表示channel里放的是啥
c.dataqsiz = uint(size) // 数组大小

return c
}

chansend

chansend 函数是在编译器解析到 c <- x 这样的代码的时候插入的,本质上就是把一个用户元素投递到 hchan 的 ringbuffer 中。chansend 调用的时候,一般用户会遇到三种情况:

  1. 投递成功,非常顺利,正常返回true
  2. 投递受阻,该函数阻塞,goroutine 切走
  3. 投递失败返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 各种前置检测
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 开始了,开始了
// 安全第一,先锁起来
lock(&c.lock)

// 向已关闭的channel发东西,panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// 场景1:发的时候刚好有人等着收,不需要走buffer,所以性能最好
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 场景2:还有空间,放入buffer,索引增加
if c.qcount < c.dataqsiz {
// 复制,相当于 c.buf[c.sendx]
qp := chanbuf(c, c.sendx)
// 数据拷贝到buffer中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
// 环形
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 存储元素个数增加
c.qcount++
unlock(&c.lock)
return true
}
// 场景3
// 如果是非阻塞直接返回
if !block {
unlock(&c.lock)
return false
}

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// goroutine相关结构入队列,等待唤醒
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 将 goroutine 转入 waiting 状态,并解锁,用户侧看就是阻塞住了。
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 资源释放
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}

golang内执行 ==<- x== 会调用chansend函数,会有三种场景:

  1. 场景一:如果有人( goroutine )等着取 channel 的元素,这种场景最快乐,直接把元素给他就完了,然后把它唤醒,hchan 本身递增下 ringbuffer 索引;举一反三:kafka也有这种高效操作。

  2. 场景二:如果 ringbuffer 还有空间,那么就把元素存着,这种也是场景的流程,存和取走的是异步流程,可以把 channel 理解成消息队列,生产者和消费者解耦;

  3. 场景三:ringbuffer 没空间,这个时候就要是否需要 block 了,一般来讲,c <- x 编译出的代码都是 block = true ,那么什么时候 chansend 的 block 参数会是 false 呢?答案是:select 的时候;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
select {
case c <- v:
// ... foo
default:
// ... bar
}

// 编译后
if selectnbsend(c, v) {
// ... foo
} else {
// ... bar
}

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
// 调用 chansend 函数,block 参数为 false;
return chansend(c, elem, false, getcallerpc())
}

chanrecv

chanrecv 函数是在编译器解析到 <- c 这样的代码的时候插入的,本质上就是从sender或 hchan 的 ringbuffer 中取一个元素。chanrecv 调用的时候,一般用户会遇到三种情况:

  1. 接收成功,非常顺利,正常返回元素,true
  2. 接收受阻,该函数阻塞,goroutine 切走
  3. 接收失败返回nil,false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// <- c 对应
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

// v, ok := <- c 对应
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

// 除了select的时候,block都是true
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 先各种判断
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

// 上锁,干活
lock(&c.lock)

// 关了且队列里没数据了
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

// 场景1:你想要的时候刚好有人可以给,直接交到手里就好了
if sg := c.sendq.dequeue(); sg != nil {
// 如果 buffer 内没有剩余的元素,直接从sender拿数据,否则,从buffer的头部拿,并将sender的值放到buffer的尾部,拿一个立马在原位置放一个,能一定程度上保证有序性。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// ringbuffer内还有没拿的元素
if c.qcount > 0 {
// 从队列拿
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 加索引值
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// 没有人等着给,又没有存货,block住了
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 入队等待唤醒
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// goroutine切走,让出cpu
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

可以看到send和recv代码和处理情况基本一样:

  1. 如果是非阻塞模式( block=false ),并且没有任何可用元素,返回 (selected=false,received=false),这样就不会进到 select 的 case 分支;
  2. 如果是阻塞模式( block=true ),如果 chan 已经 closed 了,那么返回的是 (selected=true,received=false),说明需要进到 select 的分支,但是是没有取到元素的;
  3. 如果是阻塞模式,chan 还是正常状态,那么返回(selected=true,recived=true),说明正常取到了元素;

select部分和send基本一致,在编译时block参数设为false,不再重复。不过recv还可以通过range的方式进行。

  • for循环

    for-range 和 chan 的结束条件只有这个 chan 被 close 了,否则一直会处于这个死循环内部,因为block参数为true。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    for m := range c {
    // ... do something
    }

    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    // 注意了,这个 block=true,说明 chanrecv 内部是阻塞的;
    _, received = chanrecv(c, elem, true)
    return
    }

    // 伪代码
    for ( ; ok = chanrecv2( c, ep ) ; ) {
    // do something
    }

参考

http://legendtkl.com/2017/08/06/golang-channel-implement/

https://zhuanlan.zhihu.com/p/297053654

https://www.cyhone.com/articles/analysis-of-golang-channel/


  目录