Golang-Channel底层是怎么实现的?


引言

channel 是 golang 的最重要的一个结构,是区别于其他高级语言的最重要的特色之一,也是 goroutine 通信必须要的要素之一。下文将基于golang1.14从channel的数据结构&收、发操作的代码实现,进一步了解channel。

hchan struct

hchan 中的所有属性大致可以分为三类:

  1. buffer 相关的属性。例如 buf、dataqsiz、qcount 等。 当 channel 的缓冲区大小不为 0 时,buffer 中存放了待接收的数据。使用 ==环形队列==(ring buffer) 实现,FIFO。
  2. waitq 相关的属性,可以理解为是一个 FIFO 的标准队列。其中 recvq 中是正在等待接收数据的 goroutine,sendq 中是等待发送数据的 goroutine。waitq 使用==双向链表==实现。
  3. 其他属性,例如 lock、elemtype、closed 等。
type hchan struct {
  qcount		uint					// 队列中数据个数
  dataqsiz	uint					// channel大小
  buf				unsafe.Pointer// 存放数据的环形数组
  elemsize	uint16				// channel中数据类型的大小
  closed		uint32				// 表示channel是否关闭
  elemtype	*_type				// 元素数据类型
  sendx			uint					// buffer 中已发送的索引位置 send index
  recvx			uint					// buffer 中已接收的索引位置 receive index
  recvq			waitq					// 等待接收的 goroutine list of recv waiters
  sendq			waitq					// 等待发送的 goroutine list of send waiters
  // lock protects all fields in hchan, as well as several
  // fields in sudogs blocked on this channel.
  // Do not change another G's status while holding this lock
  // (in particular, do not ready a G), as this can deadlock
  // with stack shrinking.
  lock			mutex
}
type waitq struct {
  first *sudog
  last	*sudog
}
type sudog struct {
  g						*g
  selectdone	*uint32 // CAS to 1 to win select race (may point to stack)
  next				*sudog
  prev				*sudog
  elem				unsafe.Pointer // data element (may point to stack)
  acquiretime int64
  releasetime int64
  ticket      uint32
  parent      *sudog // semaRoot binary tree
  waitlink    *sudog // g.waiting list or semaRoot
  waittail    *sudog // semaRoot
  c           *hchan // channel
}

我们可以看到

  1. channel 其实就是一个队列加一个锁,只不过这个锁是一个轻量级锁。
  2. recvq 是读操作阻塞在 channel 的 goroutine 列表,sendq 是写操作阻塞在 channel 的 goroutine 列表。
  3. 链表的实现是 sudog,其实就是一个对 g 的结构的封装。

makechan

  1. 参数校验 2-15行
  2. 初始化hchan 17-37行
    func makechan(t *chantype, size int) *hchan {
     elem := t.elem
     // compiler checks this but be safe.
      // 元素类型大小限制,不能啥也放
     if elem.size >= 1<<16 {
         throw("makechan: invalid channel element type")
     }
      // 对齐限制
     if hchanSize%maxAlign != 0 || elem.align > maxAlign {
         throw("makechan: bad alignment")
     }
     mem, overflow := math.MulUintptr(elem.size, uintptr(size))
     if overflow || mem > maxAlloc-hchanSize || size < 0 {
         panic(plainError("makechan: size out of range"))
     }
     var c *hchan
     switch {
     case mem == 0: // 没有buffer,只分配hchan结构体
         // Queue or element size is zero.
         c = (*hchan)(mallocgc(hchanSize, nil, true))
         // Race detector uses this location for synchronization.
         c.buf = c.raceaddr()
     case elem.ptrdata == 0:
         // 元素不含指针
         // Allocate hchan and buf in one call. hchan和buffer一起分配,内存块连续
         c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
         c.buf = add(unsafe.Pointer(c), hchanSize)
     default:
         // Elements contain pointers. hchan和buffer单独分配
         c = new(hchan)
         c.buf = mallocgc(mem, elem, true)
     }
     c.elemsize = uint16(elem.size) // channel里元素的大小
     c.elemtype = elem // 表示channel里放的是啥
     c.dataqsiz = uint(size) // 数组大小
     return c
    }

chansend

chansend 函数是在编译器解析到 c <- x 这样的代码的时候插入的,本质上就是把一个用户元素投递到 hchan 的 ringbuffer 中。chansend 调用的时候,一般用户会遇到三种情况:

  1. 投递成功,非常顺利,正常返回true
  2. 投递受阻,该函数阻塞,goroutine 切走
  3. 投递失败返回false
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      // 各种前置检测
     if c == nil {
         if !block {
             return false
         }
         gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
         throw("unreachable")
     }
     // channel wasn't closed during the first observation.
     if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
         (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
         return false
     }
     var t0 int64
     if blockprofilerate > 0 {
         t0 = cputicks()
     }
     // 开始了,开始了
      // 安全第一,先锁起来
     lock(&c.lock)
      // 向已关闭的channel发东西,panic
     if c.closed != 0 {
         unlock(&c.lock)
         panic(plainError("send on closed channel"))
     }
      // 场景1:发的时候刚好有人等着收,不需要走buffer,所以性能最好
     if sg := c.recvq.dequeue(); sg != nil {
         send(c, sg, ep, func() { unlock(&c.lock) }, 3)
         return true
     }
     // 场景2:还有空间,放入buffer,索引增加
     if c.qcount < c.dataqsiz {
     // 复制,相当于 c.buf[c.sendx]
         qp := chanbuf(c, c.sendx)
     // 数据拷贝到buffer中
         typedmemmove(c.elemtype, qp, ep)
         c.sendx++
     // 环形
         if c.sendx == c.dataqsiz {
             c.sendx = 0
         }
     // 存储元素个数增加
         c.qcount++
         unlock(&c.lock)
         return true
     }
      // 场景3
     // 如果是非阻塞直接返回
     if !block {
         unlock(&c.lock)
         return false
     }
     // Block on the channel. Some receiver will complete our operation for us.
     gp := getg()
     mysg := acquireSudog()
     mysg.releasetime = 0
     if t0 != 0 {
         mysg.releasetime = -1
     }
     // No stack splits between assigning elem and enqueuing mysg
     // on gp.waiting where copystack can find it.
     mysg.elem = ep
     mysg.waitlink = nil
     mysg.g = gp
     mysg.isSelect = false
     mysg.c = c
     gp.waiting = mysg
     gp.param = nil
      // goroutine相关结构入队列,等待唤醒
     c.sendq.enqueue(mysg)
     atomic.Store8(&gp.parkingOnChan, 1)
      // 将 goroutine 转入 waiting 状态,并解锁,用户侧看就是阻塞住了。
     gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
     KeepAlive(ep)
     // someone woke us up.
     if mysg != gp.waiting {
         throw("G waiting list is corrupted")
     }
      // 资源释放
     gp.waiting = nil
     gp.activeStackChans = false
     if gp.param == nil {
         if c.closed == 0 {
             throw("chansend: spurious wakeup")
         }
         panic(plainError("send on closed channel"))
     }
     gp.param = nil
     if mysg.releasetime > 0 {
         blockevent(mysg.releasetime-t0, 2)
     }
     mysg.c = nil
     releaseSudog(mysg)
     return true
    }

golang内执行 ==<- x== 会调用chansend函数,会有三种场景:

  1. 场景一:如果有人( goroutine )等着取 channel 的元素,这种场景最快乐,直接把元素给他就完了,然后把它唤醒,hchan 本身递增下 ringbuffer 索引;举一反三:kafka也有这种高效操作。

  2. 场景二:如果 ringbuffer 还有空间,那么就把元素存着,这种也是场景的流程,存和取走的是异步流程,可以把 channel 理解成消息队列,生产者和消费者解耦;

  3. 场景三:ringbuffer 没空间,这个时候就要是否需要 block 了,一般来讲,c <- x 编译出的代码都是 block = true ,那么什么时候 chansend 的 block 参数会是 false 呢?答案是:select 的时候;

    select {
      case c <- v:
      // ... foo
      default:
      // ... bar
    }
    // 编译后
    if selectnbsend(c, v) {
      //  ... foo
    } else {
      //  ... bar
    }
    func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
      // 调用 chansend 函数,block 参数为 false;
      return chansend(c, elem, false, getcallerpc())
    }

chanrecv

chanrecv 函数是在编译器解析到 <- c 这样的代码的时候插入的,本质上就是从sender或 hchan 的 ringbuffer 中取一个元素。chanrecv 调用的时候,一般用户会遇到三种情况:

  1. 接收成功,非常顺利,正常返回元素,true
  2. 接收受阻,该函数阻塞,goroutine 切走
  3. 接收失败返回nil,false
    // <- c 对应
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
     chanrecv(c, elem, true)
    }
    // v, ok := <- c 对应
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
     _, received = chanrecv(c, elem, true)
     return
    }
    // 除了select的时候,block都是true
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
     // 先各种判断
     if c == nil {
         if !block {
             return
         }
         gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
         throw("unreachable")
     // incorrect behavior when racing with a close.
     if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
         c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
         atomic.Load(&c.closed) == 0 {
         return
     }
     var t0 int64
     if blockprofilerate > 0 {
         t0 = cputicks()
     }
      // 上锁,干活
     lock(&c.lock)
      // 关了且队列里没数据了
     if c.closed != 0 && c.qcount == 0 {
         unlock(&c.lock)
         if ep != nil {
             typedmemclr(c.elemtype, ep)
         }
         return true, false
     }
      // 场景1:你想要的时候刚好有人可以给,直接交到手里就好了
     if sg := c.sendq.dequeue(); sg != nil {
     // 如果 buffer 内没有剩余的元素,直接从sender拿数据,否则,从buffer的头部拿,并将sender的值放到buffer的尾部,拿一个立马在原位置放一个,能一定程度上保证有序性。
         recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
         return true, true
     }
     // ringbuffer内还有没拿的元素
     if c.qcount > 0 {
         // 从队列拿
         qp := chanbuf(c, c.recvx)
         if ep != nil {
             typedmemmove(c.elemtype, ep, qp)
         }
         typedmemclr(c.elemtype, qp)
     // 加索引值
         c.recvx++
         if c.recvx == c.dataqsiz {
             c.recvx = 0
         }
         c.qcount--
         unlock(&c.lock)
         return true, true
     }
     if !block {
         unlock(&c.lock)
         return false, false
     }
     // 没有人等着给,又没有存货,block住了
     gp := getg()
     mysg := acquireSudog()
     mysg.releasetime = 0
     if t0 != 0 {
         mysg.releasetime = -1
     }
     mysg.elem = ep
     mysg.waitlink = nil
     gp.waiting = mysg
     mysg.g = gp
     mysg.isSelect = false
     mysg.c = c
     gp.param = nil
      // 入队等待唤醒
     c.recvq.enqueue(mysg)
     atomic.Store8(&gp.parkingOnChan, 1)
      // goroutine切走,让出cpu
     gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
     // someone woke us up
     if mysg != gp.waiting {
         throw("G waiting list is corrupted")
     }
     gp.waiting = nil
     gp.activeStackChans = false
     if mysg.releasetime > 0 {
         blockevent(mysg.releasetime-t0, 2)
     }
     closed := gp.param == nil
     gp.param = nil
     mysg.c = nil
     releaseSudog(mysg)
     return true, !closed
    }

可以看到send和recv代码和处理情况基本一样:

  1. 如果是非阻塞模式( block=false ),并且没有任何可用元素,返回 (selected=false,received=false),这样就不会进到 select 的 case 分支;
  2. 如果是阻塞模式( block=true ),如果 chan 已经 closed 了,那么返回的是 (selected=true,received=false),说明需要进到 select 的分支,但是是没有取到元素的;
  3. 如果是阻塞模式,chan 还是正常状态,那么返回(selected=true,recived=true),说明正常取到了元素;
    select部分和send基本一致,在编译时block参数设为false,不再重复。不过recv还可以通过range的方式进行。
  • for循环
    for-range 和 chan 的结束条件只有这个 chan 被 close 了,否则一直会处于这个死循环内部,因为block参数为true。
    for m := range c {
      // ...   do something
    }
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
      // 注意了,这个 block=true,说明 chanrecv 内部是阻塞的;
      _, received = chanrecv(c, elem, true)        
      return
    }
    // 伪代码
    for (   ; ok = chanrecv2( c, ep )  ;   ) {
      // do something
    }

参考

http://legendtkl.com/2017/08/06/golang-channel-implement/

https://zhuanlan.zhihu.com/p/297053654

https://www.cyhone.com/articles/analysis-of-golang-channel/


评论
  目录