导语 | 在日常开发中,select 语句被高频使用,但目前全网系统分析 select 在编译期和运行时完整底层原理的资料非常匮乏。本文基于 Go 1.18.1 版本的源码,讲解 select 访问 Channel 在编译期和运行时的底层原理,重点分析两个函数:编译器优化用到的 src/cmd/compile/internal/walk/select.go 中的 walkSelectCases() 函数,以及多 case 情况下运行时用到的 runtime.selectgo() 函数。希望能帮助到各位开发者。
结论先行
select {
case <-chan1:
	// If a value is successfully received from chan1, run this case's statements.
case chan2 <- 1:
	// If 1 is successfully sent to chan2, run this case's statements.
default:
	// If none of the cases above can proceed, run the default branch.
}
package main

// main runs a select statement with no cases. It blocks the only goroutine,
// so the runtime reports a deadlock.
func main() {
	select {}
}
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:
...
package main

import (
	"fmt"
)

// main runs a select whose cases can never proceed and which has no default
// branch, so the only goroutine blocks and the runtime reports a deadlock.
func main() {
	ch1 := make(chan int, 1)
	ch2 := make(chan int)
	select {
	case <-ch1:
		// Receive from the buffered channel: the buffer is empty and there is
		// no sender, so this case blocks.
		fmt.Println("Received from ch")
	case i := <-ch2:
		// Receive from the unbuffered channel: there is no sender, so this
		// case blocks.
		fmt.Printf("i is: %d", i)
	}
}
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
...
package main

import (
	"fmt"
)

// main shows that a select with a default branch does not block. ch1 is
// empty and has no sender, so the receive case cannot proceed and the default
// branch runs instead.
func main() {
	ch1 := make(chan int, 1)
	select {
	case <-ch1:
		// Receive from the buffered channel: the buffer is empty and there is
		// no sender, so this case is not ready.
		fmt.Println("Received from ch")
	default:
		fmt.Println("this is default")
	}
}
this is default
package main

import (
	"fmt"
)

// main shows that a ready case is preferred over default. ch1 already holds a
// buffered value, so the receive case proceeds and default is skipped.
func main() {
	ch1 := make(chan int, 1)
	ch1 <- 10
	select {
	case <-ch1:
		// ch1 holds a buffered value, so this case is ready.
		fmt.Println("Received from ch1")
	default:
		fmt.Println("this is default")
	}
}
Received from ch1
package main

import (
	"fmt"
)

// main shows that when several cases are ready at once, select picks one of
// them at random (the runtime shuffles its poll order). Default runs only
// when no case is ready, so it never runs here.
func main() {
	ch := make(chan int, 1)
	ch <- 10
	select {
	case val := <-ch:
		fmt.Println("Received from ch1, val =", val)
	case val := <-ch:
		fmt.Println("Received from ch2, val =", val)
	case val := <-ch:
		fmt.Println("Received from ch3, val =", val)
	default:
		fmt.Println("Run in default")
	}
}
Received from ch2, val = 10
select在编译期和运行时的执行过程
type scase struct {c *hchan // case中使用的chanelem unsafe.Pointer // 指向case包含数据的指针}
// walkStmt lowers a statement node (most cases are elided here).
// For an OSELECT node it hands the *ir.SelectStmt to walkSelect and returns it.
func walkStmt(n ir.Node) ir.Node {
	......
	switch n.Op() {
	......
	case ir.OSELECT:
		n := n.(*ir.SelectStmt)
		// select statements are lowered by walkSelect.
		walkSelect(n)
		return n
	......
	}
	......
}
func walkSelect(sel *ir.SelectStmt) {lno := ir.SetPos(sel)if sel.Walked() {base.Fatalf("double walkSelect")}sel.SetWalked(true)init := ir.TakeInit(sel)// 编译器在中间代码生成期间会根据select中case的不同对控制语句进行优化init = append(init, walkSelectCases(sel.Cases)...)sel.Cases = nilsel.Compiled = initwalkStmtList(sel.Compiled)base.Pos = lno}
func walkSelectCases(cases []*ir.CommClause) []ir.Node {ncas := len(cases)sellineno := base.Pos// 编译器优化: select 没有case时if ncas == 0 {return []ir.Node{mkcallstmt("block")}}// 编译器优化: select只有一个case时if ncas == 1 {......}......}
select {} // a select with no cases: compiled into a call to runtime.block, which parks the goroutine forever
func walkSelectCases(cases []*ir.CommClause) []ir.Node {ncas := len(cases)sellineno := base.Pos// 编译器优化: select没有case时if ncas == 0 {return []ir.Node{mkcallstmt("block")}}......}
// src/runtime/select.gofunc block() {gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // forever}
ch := make(chan struct{})
// A select with a single receive case.
select {
case data := <-ch:
	fmt.Printf("ch data: %v\n", data)
}
// What the compiler turns a select with a single receive case into:
// a plain blocking receive followed by the case body.
data := <-ch
fmt.Printf("ch data: %v\n", data)
// src/cmd/compile/internal/walk/select.gofunc walkSelectCases(cases []*ir.CommClause) []ir.Node {......// 编译器优化: select只有一个case时if ncas == 1 {cas := cases[0] // 获取第一个也是唯一的一个caseir.SetPos(cas)l := cas.Init()if cas.Comm != nil { // case类型不是default:n := cas.Comm // 获取case的条件语句l = append(l, ir.TakeInit(n)...)switch n.Op() { // 检查case对channel的操作类型:读或写default: // 如果case既不是读,也不是写channel,则直接报错base.Fatalf("select %v", n.Op())case ir.OSEND:// 如果对chan操作是写入类型,编译器无须做任何转换,直接是 chan <- datacase ir.OSELRECV2:// 如果对chan操作是接收类型, 完整形式为:data, ok := <- chanr := n.(*ir.AssignListStmt)// 如果具体是<- chan这种形式,即接收字段 data和ok为空,则直接转成 <- chanif ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {n = r.Rhs[0]break}// 否则,是 data, ok := <- chan 这种形式r.SetOp(ir.OAS2RECV)}// 把编译器处理后的case语句条件加入待执行语句列表l = append(l, n)}// 把case条件后要执行的语句体加入待执行语句列表l = append(l, cas.Body...)// 默认加入break类型语句,跳出select-case语句体l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))return l}......}
package main

import (
	"fmt"
)

// main performs a non-blocking send. ch is unbuffered and has no receiver,
// so the send case cannot proceed and the default branch runs.
func main() {
	ch := make(chan int)
	select {
	case ch <- 1:
		fmt.Println("run case 1")
	default:
		fmt.Println("run default")
	}
}
// Roughly how the compiler rewrites a select with one send case plus default:
// a non-blocking send whose result picks the case body or the default body.
if selectnbsend(ch, 1) {
	fmt.Println("run case 1")
} else {
	fmt.Println("run default")
}
func walkSelectCases(cases []*ir.CommClause) []ir.Node {......// 编译器优化: case 有两个case,一个是普通的channel操作,一个是defaultif ncas == 2 && dflt != nil {// 获取非default的casecas := cases[0]if cas == dflt {cas = cases[1]}n := cas.Commir.SetPos(n)r := ir.NewIfStmt(base.Pos, nil, nil, nil)r.SetInit(cas.Init())var cond ir.Nodeswitch n.Op() {default:base.Fatalf("select %v", n.Op())case ir.OSEND:// 如果该case是对channel的写入操作,则调用运行时的selectnbsend 函数n := n.(*ir.SendStmt)ch := n.Chancond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)case ir.OSELRECV2:// 如果该case是对channel的读取操作,会调用运行时的selectnbrecv 函数n := n.(*ir.AssignListStmt)recv := n.Rhs[0].(*ir.UnaryExpr)ch := recv.Xelem := n.Lhs[0]if ir.IsBlank(elem) {elem = typecheck.NodNil()}cond = typecheck.Temp(types.Types[types.TBOOL])fn := chanfn("selectnbrecv", 2, ch.Type())call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})r.PtrInit().Append(typecheck.Stmt(as))}r.Cond = typecheck.Expr(cond)r.Body = cas.Body// 将default语句放入if语句的else分支r.Else = append(dflt.Init(), dflt.Body...)return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}}......}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {return chanrecv(c, elem, false)}func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {return chansend(c, elem, false, getcallerpc())}5)当select有多个channel的case
package main

import (
	"fmt"
)

// main runs a select with two channel cases and no default. This is the
// general multi-case form that the compiler lowers to runtime.selectgo.
// Nothing else sends on ch2 or receives from ch1, so the select blocks
// forever.
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	select {
	case ch1 <- 1:
		fmt.Println("run case 1")
	case data := <-ch2:
		fmt.Printf("run case 2, data is: %d", data)
	}
}
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	......
	// From here on: the general case with several channel cases.

	// ncas counts all cases; the default case (if any) is not passed to the
	// runtime, so it is excluded.
	if dflt != nil {
		ncas--
	}
	// casorder[i] will hold the clause whose scase is stored in selv[i].
	casorder := make([]*ir.CommClause, ncas)
	// nsends counts send cases, nrecvs counts receive cases.
	nsends, nrecvs := 0, 0

	// init collects the statements generated for the whole select.
	var init []ir.Node

	base.Pos = sellineno
	// selv is a temporary [ncas]scase array (scasetype describes scase, with
	// fields c and elem), assigned its zero value here.
	selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
	init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))

	// order is a temporary [2*ncas]uint16 array. Pointers into selv and order
	// are passed to runtime.selectgo: selv holds the cases, while order is
	// scratch space selectgo uses to build the poll order and the lock order.
	order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))
	......
	// Pass 1: fill one scase in selv for every non-default case.
	for _, cas := range cases {
		ir.SetPos(cas)
		init = append(init, ir.TakeInit(cas)...)

		n := cas.Comm
		if n == nil { // default case: it gets no scase
			continue
		}

		var i int
		var c, elem ir.Node
		// Extract the channel and element, and choose the slot i in selv.
		switch n.Op() {
		default:
			base.Fatalf("select %v", n.Op())
		case ir.OSEND:
			n := n.(*ir.SendStmt)
			i = nsends // send cases fill slots 0, 1, 2, ... from the front
			nsends++
			c = n.Chan
			elem = n.Value
		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			nrecvs++
			i = ncas - nrecvs // receive cases fill slots ncas-1, ncas-2, ... from the back
			recv := n.Rhs[0].(*ir.UnaryExpr)
			c = recv.X
			elem = n.Lhs[0]
		}

		// Sends end up in selv[0:nsends] and receives in selv[nsends:ncas];
		// casorder mirrors that layout.
		casorder[i] = cas

		// setField emits the assignment selv[i].<f> = val.
		setField := func(f string, val ir.Node) {
			r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
			init = append(init, typecheck.Stmt(r))
		}

		// Store the channel, as an unsafe.Pointer, in selv[i].c.
		c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
		setField("c", c)
		// Store the element pointer in selv[i].elem unless it is blank; in
		// that case elem keeps the zero value selv was initialized with.
		if !ir.IsBlank(elem) {
			elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
			setField("elem", elem)
		}
		......
	}
	// Every non-default case must have been classified as a send or a
	// receive; anything else is a fatal compiler error.
	if nsends+nrecvs != ncas {
		base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
	}

	// Run the select.
	base.Pos = sellineno
	// chosen and recvOK receive selectgo's two results: the index of the
	// selected case, and, for a receive case, whether a value was received.
	chosen := typecheck.Temp(types.Types[types.TINT])
	recvOK := typecheck.Temp(types.Types[types.TBOOL])
	r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
	r.Lhs = []ir.Node{chosen, recvOK}
	// chosen, recvOK = selectgo(&selv[0], &order[0], pc0, nsends, nrecvs, block),
	// where block is true exactly when the select has no default case.
	fn := typecheck.LookupRuntime("selectgo")
	var fnInit ir.Nodes
	r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
	init = append(init, fnInit...)
	init = append(init, typecheck.Stmt(r))

	// selv and order are not used after selectgo returns: mark them dead
	// (OVARKILL).
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
	......
	// dispatch emits `if cond { [ok = recvOK]; body; break }` for one case.
	dispatch := func(cond ir.Node, cas *ir.CommClause) {
		cond = typecheck.Expr(cond)
		cond = typecheck.DefaultLit(cond, nil)

		r := ir.NewIfStmt(base.Pos, cond, nil, nil)

		// For a receive case with a non-blank ok variable, assign recvOK to it.
		if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[1]) {
				x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
				r.Body.Append(typecheck.Stmt(x))
			}
		}

		r.Body.Append(cas.Body.Take()...)
		r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		init = append(init, r)
	}

	// The default case runs when selectgo returns chosen < 0.
	if dflt != nil {
		ir.SetPos(dflt)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
	}
	// Every other case runs when chosen equals its slot index in selv.
	for i, cas := range casorder {
		ir.SetPos(cas)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
	}

	return init
}
// cas0 指向一个类型为 [ncases]scase 的数组// order0 是一个指向[2*ncases]uint16,数组中的值都是 0// 返回值有两个, chosen 和 recvOK,分别表示选中的case的序号,和对接收操作是否接收成功的布尔值func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {......// 为了将scase分配到栈上,这里直接给cas1分配了64KB大小的数组,同理, 给order1分配了128KB大小的数组cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))// ncases个数是发送chan个数nsends加上接收chan个数nrecvsncases := nsends + nrecvs// scases切片是上面分配cas1数组的前ncases个元素scases := cas1[:ncases:ncases]// 顺序列表pollorder是order1数组的前ncases个元素pollorder := order1[:ncases:ncases]// 加锁列表lockorder是order1数组的第二批ncase个元素lockorder := order1[ncases:][:ncases:ncases]......// 生成排列顺序norder := 0for i := range scases {cas := &scases[i]// 处理case中channel为空的情况if cas.c == nil {cas.elem = nil // 将elem置空,便于GCcontinue}// 通过fastrandn函数引入随机性,确定pollorder列表中case的随机顺序索引j := fastrandn(uint32(norder + 1))pollorder[norder] = pollorder[j]pollorder[j] = uint16(i)norder++}pollorder = pollorder[:norder]lockorder = lockorder[:norder]// 根据chan地址确定lockorder加锁排序列表的顺序// 通过简单的堆排序,以nlogn时间复杂度完成排序for i := range lockorder {j := i// Start with the pollorder to permute cases on the same channel.c := scases[pollorder[i]].cfor j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {k := (j - 1) / 2lockorder[j] = lockorder[k]j = k}lockorder[j] = pollorder[i]}for i := len(lockorder) - 1; i >= 0; i-- {o := lockorder[i]c := scases[o].clockorder[i] = lockorder[0]j := 0for {k := j*2 + 1if k >= i {break}if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {k++}if c.sortkey() < scases[lockorder[k]].c.sortkey() {lockorder[j] = lockorder[k]j = kcontinue}break}lockorder[j] = o}......}
func sellock(scases []scase, lockorder []uint16) {var c *hchanfor _, o := range lockorder {c0 := scases[o].cif c0 != c {c = c0lock(&c.lock)}}}
func selunlock(scases []scase, lockorder []uint16) {for i := len(lockorder) - 1; i >= 0; i-- {c := scases[lockorder[i]].cif i > 0 && c == scases[lockorder[i-1]].c {continue}unlock(&c.lock)}}
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {......sellock(scases, lockorder)......// 阶段一: 查找可以处理的channelvar casi intvar cas *scasevar caseSuccess boolvar caseReleaseTime int64 = -1var recvOK boolfor _, casei := range pollorder {casi = int(casei) // case的索引cas = &scases[casi] // 当前的casec = cas.cif casi >= nsends { // 处理接收channel的casesg = c.sendq.dequeue()if sg != nil { // 如果当前channel的sendq上有等待的goroutine,就会跳到 recv标签并从缓冲区读取数据后将等待goroutine中的数据放入到缓冲区中相同的位置;goto recv}if c.qcount > 0 { //如果当前channel的缓冲区不为空,就会跳到bufrecv标签处从缓冲区获取数据;goto bufrecv}if c.closed != 0 { //如果当前channel已经被关闭,就会跳到rclose做一些清除的收尾工作;goto rclose}} else { // 处理发送channel的case......if c.closed != 0 { // 如果当前channel已经被关闭就会直接跳到sclose标签,触发 panic 尝试中止程序;goto sclose}sg = c.recvq.dequeue()if sg != nil { // 如果当前channel的recvq上有等待的goroutine,就会跳到 send标签向channel发送数据;goto send}if c.qcount < c.dataqsiz { // 如果当前channel的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;goto bufsend}}}if !block { // 如果是非阻塞,即包含default分支,会解锁所有 Channel 并返回selunlock(scases, lockorder)casi = -1goto retc}......}
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {......// 阶段2: 将当前goroutine根据需要挂在chan的sendq和recvq上gp = getg()if gp.waiting != nil {throw("gp.waiting != nil")}nextp = &gp.waitingfor _, casei := range lockorder {casi = int(casei)cas = &scases[casi]c = cas.c// 获取sudog,将当前goroutine绑定到sudog上sg := acquireSudog()sg.g = gpsg.isSelect = truesg.elem = cas.elemsg.releasetime = 0if t0 != 0 {sg.releasetime = -1}sg.c = c*nextp = sgnextp = &sg.waitlink// 加入相应等待队列if casi < nsends {c.sendq.enqueue(sg)} else {c.recvq.enqueue(sg)}}......// 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nilgp.param = nil......// 挂起当前goroutinegopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)......}
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {......// 加锁所有的channelsellock(scases, lockorder)gp.selectDone = 0// param 存放唤醒 goroutine 的 sudog,如果是关闭操作唤醒的,那么就为 nilsg = (*sudog)(gp.param)gp.param = nilcasi = -1cas = nilcaseSuccess = false// 当前goroutine 的 waiting 链表按照lockorder顺序存放着case的sudogsglist = gp.waiting// 在从 gp.waiting 取消case的sudog链接之前清除所有元素,便于GCfor sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {sg1.isSelect = falsesg1.elem = nilsg1.c = nil}// 清楚当前goroutine的waiting链表,因为被sg代表的协程唤醒了gp.waiting = nilfor _, casei := range lockorder {k = &scases[casei]// 如果相等说明,goroutine是被当前case的channel收发操作唤醒的if sg == sglist {// sg唤醒了当前goroutine, 则当前G已经从sg的队列中出队,这里不需要再次出队casi = int(casei)cas = kcaseSuccess = sglist.successif sglist.releasetime > 0 {caseReleaseTime = sglist.releasetime}} else {// 不是此case唤醒当前goroutine, 将goroutine从此case的发送队列或接收队列出队c = k.cif int(casei) < nsends {c.sendq.dequeueSudoG(sglist)} else {c.recvq.dequeueSudoG(sglist)}}// 释放当前case的sudog,然后处理下一个case的sudogsgnext = sglist.waitlinksglist.waitlink = nilreleaseSudog(sglist)sglist = sgnext}......}
bufrecv:
	// Receive from the channel's buffer.
	......
	recvOK = true
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// Send into the channel's buffer.
	......
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
	// Receive from the parked sender sg dequeued in pass 1; the callback
	// unlocks all channels.
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	......
	recvOK = true
	goto retc

rclose:
	// Receive on a closed channel: unlock, report recvOK = false, and clear
	// the receiver's destination so it observes the element type's zero value.
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	......
	goto retc

send:
	// Send directly to the parked receiver sg dequeued in pass 1; the
	// callback unlocks all channels.
	......
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncsend: cas0=", cas0, " c=", c, "\n")
	}
	goto retc

retc:
	// Return from selectgo.
	if caseReleaseTime > 0 {
		blockevent(caseReleaseTime-t0, 1)
	}
	return casi, recvOK

sclose:
	// Send on a closed channel: unlock and panic.
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
总结
推荐阅读