引言
channel 是 golang 的最重要的一个结构,是区别于其他高级语言的最重要的特色之一,也是 goroutine 通信必须要的要素之一。下文将基于golang1.14从channel的数据结构&收、发操作的代码实现,进一步了解channel。
hchan struct
hchan 中的所有属性大致可以分为三类:
- buffer 相关的属性。例如 buf、dataqsiz、qcount 等。 当 channel 的缓冲区大小不为 0 时,buffer 中存放了待接收的数据。使用 ==环形队列==(ring buffer) 实现,FIFO。
- waitq 相关的属性,可以理解为是一个 FIFO 的标准队列。其中 recvq 中是正在等待接收数据的 goroutine,sendq 中是等待发送数据的 goroutine。waitq 使用==双向链表==实现。
- 其他属性,例如 lock、elemtype、closed 等。
type hchan struct {
qcount uint // 队列中数据个数
dataqsiz uint // channel大小
buf unsafe.Pointer// 存放数据的环形数组
elemsize uint16 // channel中数据类型的大小
closed uint32 // 表示channel是否关闭
elemtype *_type // 元素数据类型
sendx uint // buffer 中已发送的索引位置 send index
recvx uint // buffer 中已接收的索引位置 receive index
recvq waitq // 等待接收的 goroutine list of recv waiters
sendq waitq // 等待发送的 goroutine list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
我们可以看到
- channel 其实就是一个队列加一个锁,只不过这个锁是一个轻量级锁。
- recvq 是读操作阻塞在 channel 的 goroutine 列表,sendq 是写操作阻塞在 channel 的 goroutine 列表。
- 链表的实现是 sudog,其实就是一个对 g 的结构的封装。
makechan
- 参数校验 2-15行
- 初始化hchan 17-37行
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. // 元素类型大小限制,不能啥也放 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } // 对齐限制 if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // 没有buffer,只分配hchan结构体 // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // 元素不含指针 // Allocate hchan and buf in one call. hchan和buffer一起分配,内存块连续 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. hchan和buffer单独分配 c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) // channel里元素的大小 c.elemtype = elem // 表示channel里放的是啥 c.dataqsiz = uint(size) // 数组大小 return c }
chansend
chansend 函数是在编译器解析到 c <- x
这样的代码的时候插入的,本质上就是把一个用户元素投递到 hchan 的 ringbuffer 中。chansend 调用的时候,一般用户会遇到三种情况:
- 投递成功,非常顺利,正常返回true
- 投递受阻,该函数阻塞,goroutine 切走
- 投递失败返回false
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 各种前置检测 if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } // channel wasn't closed during the first observation. if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 开始了,开始了 // 安全第一,先锁起来 lock(&c.lock) // 向已关闭的channel发东西,panic if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 场景1:发的时候刚好有人等着收,不需要走buffer,所以性能最好 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 场景2:还有空间,放入buffer,索引增加 if c.qcount < c.dataqsiz { // 复制,相当于 c.buf[c.sendx] qp := chanbuf(c, c.sendx) // 数据拷贝到buffer中 typedmemmove(c.elemtype, qp, ep) c.sendx++ // 环形 if c.sendx == c.dataqsiz { c.sendx = 0 } // 存储元素个数增加 c.qcount++ unlock(&c.lock) return true } // 场景3 // 如果是非阻塞直接返回 if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // goroutine相关结构入队列,等待唤醒 c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) // 将 goroutine 转入 waiting 状态,并解锁,用户侧看就是阻塞住了。 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } // 资源释放 gp.waiting = nil gp.activeStackChans = false if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true }
golang内执行 ==<- x== 会调用chansend函数,会有三种场景:
场景一:如果有人( goroutine )等着取 channel 的元素,这种场景最快乐,直接把元素给他就完了,然后把它唤醒,hchan 本身递增下 ringbuffer 索引;举一反三:kafka也有这种高效操作。
场景二:如果 ringbuffer 还有空间,那么就把元素存着,这种也是场景的流程,存和取走的是异步流程,可以把 channel 理解成消息队列,生产者和消费者解耦;
场景三:ringbuffer 没空间,这个时候就要是否需要 block 了,一般来讲,
c <- x
编译出的代码都是block = true
,那么什么时候 chansend 的 block 参数会是 false 呢?答案是:select 的时候;select { case c <- v: // ... foo default: // ... bar } // 编译后 if selectnbsend(c, v) { // ... foo } else { // ... bar } func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { // 调用 chansend 函数,block 参数为 false; return chansend(c, elem, false, getcallerpc()) }
chanrecv
chanrecv 函数是在编译器解析到 <- c
这样的代码的时候插入的,本质上就是从sender或 hchan 的 ringbuffer 中取一个元素。chanrecv 调用的时候,一般用户会遇到三种情况:
- 接收成功,非常顺利,正常返回元素,true
- 接收受阻,该函数阻塞,goroutine 切走
- 接收失败返回nil,false
// <- c 对应 func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } // v, ok := <- c 对应 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return } // 除了select的时候,block都是true func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 先各种判断 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") // incorrect behavior when racing with a close. if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 上锁,干活 lock(&c.lock) // 关了且队列里没数据了 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 场景1:你想要的时候刚好有人可以给,直接交到手里就好了 if sg := c.sendq.dequeue(); sg != nil { // 如果 buffer 内没有剩余的元素,直接从sender拿数据,否则,从buffer的头部拿,并将sender的值放到buffer的尾部,拿一个立马在原位置放一个,能一定程度上保证有序性。 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // ringbuffer内还有没拿的元素 if c.qcount > 0 { // 从队列拿 qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 加索引值 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // 没有人等着给,又没有存货,block住了 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 入队等待唤醒 c.recvq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) // goroutine切走,让出cpu gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
可以看到send和recv代码和处理情况基本一样:
- 如果是非阻塞模式( block=false ),并且没有任何可用元素,返回 (selected=false,received=false),这样就不会进到 select 的 case 分支;
- 如果是阻塞模式( block=true ),如果 chan 已经 closed 了,那么返回的是 (selected=true,received=false),说明需要进到 select 的分支,但是是没有取到元素的;
- 如果是阻塞模式,chan 还是正常状态,那么返回(selected=true,recived=true),说明正常取到了元素;
select部分和send基本一致,在编译时block参数设为false,不再重复。不过recv还可以通过range的方式进行。
- for循环
for-range
和 chan 的结束条件只有这个 chan 被 close 了,否则一直会处于这个死循环内部,因为block参数为true。for m := range c { // ... do something } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { // 注意了,这个 block=true,说明 chanrecv 内部是阻塞的; _, received = chanrecv(c, elem, true) return } // 伪代码 for ( ; ok = chanrecv2( c, ep ) ; ) { // do something }
参考
http://legendtkl.com/2017/08/06/golang-channel-implement/