golang chan
烟草的香味 人气:0前言
之前在看golang
多线程通信的时候, 看到了go 的管道. 当时就觉得这玩意很神奇, 因为之前接触过的不管是php
, java
, Python
, js
, c
等等, 都没有这玩意, 第一次见面, 难免勾起我的好奇心. 所以就想着看一看它具体是什么东西. 很明显, 管道是go
实现在语言层面的功能, 所以我以为需要去翻他的源码了. 虽然最终没有翻到C
的层次, 不过还是受益匪浅.
见真身
结构体
要想知道他是什么东西, 没什么比直接看他的定义更加直接的了. 但是其定义在哪里么? 去哪里找呢? 还记得我们是如何创建chan
的么? make
方法. 但是当我找过去的时候, 发现make
方法只是一个函数的声明.
这, 还是没有函数的具体实现啊. 汇编看一下. 编写以下内容:
package main func main() { _ = make(chan int) }
执行命令:
go tool compile -N -l -S main.go
虽然汇编咱看不懂, 但是其中有一行还是引起了我的注意.
make
调用了runtime.makechan
. 漂亮, 就找他.
找到他了, 是hchan
指针对象. 整理了一下对象的字段(不过人家自己也有注释的):
// 其内部维护了一个循环队列(数组), 用于管理发送与接收的缓存数据. type hchan struct { // 队列中元素个数 qcount uint // 队列的大小(数组长度) dataqsiz uint // 指向底层的缓存队列, 是一个可以指向任意类型的指针. buf unsafe.Pointer // 管道每个元素的大小 elemsize uint16 // 是否被关闭了 closed uint32 // 管道的元素类型 elemtype *_type // 当前可以发送的元素索引(队尾) sendx uint // 当前可以接收的元素索引(队首) recvx uint // 当前等待接收数据的 goroutine 队列 recvq waitq // 当前等待发送数据的 goroutine 队列 sendq waitq // 锁, 用来保证管道的每个操作都是原子性的. lock mutex }
可以看的出来, 管道简单说就是一个队列加一把锁.
发送数据
依旧使用刚才的方法分析, 发送数据时调用了runtime.chansend1
函数. 其实现简单易懂:
然后查看真正实现, 函数步骤如下(个人理解, 有一些 test 使用的代码被我删掉了. ):
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 异常处理, 若管道指针为空 if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } // 常量判断, 恒为 false, 应该是开发时调试用的. if debugChan { print("chansend: chan=", c, "\n") } // 常量, 恒为 false, 没看懂这个判断 if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // 若当前操作不阻塞, 且管道还没有关闭时判断 // 当前队列容量为0且没有等待接收数据的 或 当前队列容量不为0且队列已满 // 那么问题来了, 什么时候不加锁呢? select 的时候. 可以在不阻塞的时候快速返回 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } // 上锁, 保证操作的原子性 lock(&c.lock) // 若管道已经关闭, 报错 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 从接受者队列获取一个接受者, 若存在, 数据直接发送, 不走缓存, 提高效率 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 若缓存为满, 则将数据放到缓存中排队 if c.qcount < c.dataqsiz { // 取出对尾的地址 qp := chanbuf(c, c.sendx) // 将ep 的内容拷贝到 ap 地址 typedmemmove(c.elemtype, qp, ep) // 更新队尾索引 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } // 若当前不阻塞, 直接返回 if !block { unlock(&c.lock) return false } // 当走到这里, 说明数据没有成功发送, 且需要阻塞等待. // 以下代码没看懂, 不过可以肯定的是, 其操作为阻塞当前协程, 等待发送数据 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep) if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true }
虽然最终阻塞的地方没看太明白, 不过发送数据的大体流程很清楚:
- 若无需阻塞且不能发送数据, 返回失败
- 若存在接收者, 直接发送数据
- 若存在缓存, 将数据放到缓存中
- 若无需阻塞, 返回失败
- 阻塞等待发送数据
其中不加锁的操作, 在看到selectnbsend
函数的注释时如下:
// compiler implements // // select { // case c <- v: // ... foo // default: // ... bar // } // // as // // if selectnbsend(c, v) { // ... foo // } else { // ... bar // } // func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) }
看这意思, select
关键字有点类似于语法糖, 其内部会转换成调用selectnbsend
函数的简单if
判断.
接收数据
至于接收数据的方法, 其内部实现与发送大同小异. runtime.chanrecv
方法.
源码简单看了一下, 虽理解不深, 但对channel
也有了大体的认识.
上手
简单对channel
的使用总结一下.
定义
// 创建普通的管道类型, 非缓冲 a := make(chan int) // 创建缓冲区大小为10的管道 b := make(chan int, 10) // 创建只用来发送的管道 c := make(chan<- int) // 创建只用来接收的管道 d := make(<-chan int) // eg: 只用来接收的管道, 每秒一个 e := time.After(time.Second)
发送与接收
// 接收数据 a := <- ch b, ok := <- ch // 发送数据 ch <- 2
最后, 看了一圈, 感觉channel
并不是很复杂, 就是一个队列, 一端接受, 一端发送. 不过其对多协程处理做了很多优化. 与协程配合, 灵活使用的话, 应该会有不错的效果.
加载全部内容