首页>>后端>>Golang->源码分析go channel以及相关的操作

源码分析go channel以及相关的操作

时间:2023-11-30 本站 点击:0

go version: 1.17

本文从源码层面分析channel是如何创建、发送、接收、关闭的。

找到源码位置

packagemainfuncmain(){ch:=make(chanint)ch<-1<-chselect{casech<-1:default:}select{case<-ch:default:}close(ch)}

查看汇编代码: go tool compile -S -l -N main.go 输出为(省略了不必要的代码):

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)

可以得出:

makechan : channel的创建

chansend1: channel的阻塞发送

chanrecv1: channel的阻塞接收

selectnbsend: channel的无阻塞发送

selectnbrecv: channel的无阻塞接收

closechan: channel的关闭

channel结构

typehchanstruct{qcountuint//目前循环队列里数据个数dataqsizuint//这个循环队列的大小bufunsafe.Pointer//为指针,指向一个大小固定的数组,用来存放channel的数据elemsizeuint16//channel中元素的大小closeduint32//channel是否关闭,1为关闭elemtype*_type//channel中元素的类型sendxuint//发送数据的索引recvxuint//接收数据的索引recvqwaitq//等待接收的队列,里面放的是goroutnesendqwaitq//等待发送的队列,里面放的是goroutne//lockprotectsallfieldsinhchan,aswellasseveral//fieldsinsudogsblockedonthischannel.////DonotchangeanotherG'sstatuswhileholdingthislock//(inparticular,donotreadyaG),asthiscandeadlock//withstackshrinking.lockmutex//此channel的互斥锁}

sendqrecvq的数据结构如下所示:

typewaitqstruct{first*sudoglast*sudog}

创建channel

const(maxAlign=8hchanSize=unsafe.Sizeof(hchan{})+uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)))funcmakechan(t*chantype,sizeint)*hchan{elem:=t.elem(...)//这里判断elem.size*size是否越界。返回值:需要申请的空间大小以及是否越界mem,overflow:=math.MulUintptr(elem.size,uintptr(size))ifoverflow||mem>maxAlloc-hchanSize||size<0{panic(plainError("makechan:sizeoutofrange"))}//HchandoesnotcontainpointersinterestingforGCwhenelementsstoredinbufdonotcontainpointers.//bufpointsintothesameallocation,elemtypeispersistent.//SudoG'sarereferencedfromtheirowningthreadsotheycan'tbecollected.//TODO(dvyukov,rlh):Rethinkwhencollectorcanmoveallocatedobjects.varc*hchanswitch{//如果无缓存,则不需要创建bufcasemem==0://Queueorelementsizeiszero.c=(*hchan)(mallocgc(hchanSize,nil,true))//Racedetectorusesthislocationforsynchronization.c.buf=c.raceaddr()caseelem.ptrdata==0://元素不包含指针,则给hchan和buf分配一块公用的空间,并且buf紧挨着hchanc=(*hchan)(mallocgc(hchanSize+mem,nil,true))c.buf=add(unsafe.Pointer(c),hchanSize)default://元素包含指针,hchan和buf各自单独分配空间c=new(hchan)c.buf=mallocgc(mem,elem,true)}c.elemsize=uint16(elem.size)c.elemtype=elemc.dataqsiz=uint(size)lockInit(&c.lock,lockRankHchan)ifdebugChan{print("makechan:chan=",c,";elemsize=",elem.size,";dataqsiz=",size,"\n")}returnc}

hchanSize: 意思就是按照maxAlign倍数对齐,大于等于size(hchan),需要申请的最小空间 比如size(hchan)为5字节,maxAlign为8字节,转换成公式为(5 + uintptr( (-5) & (8 - 1)))=8,只需要申请8字节,用来存放hchan

总结流程

发送数据

阻塞发送

funcchansend1(c*hchan,elemunsafe.Pointer){chansend(c,elem,true,getcallerpc())}

无阻塞发送

funcselectnbsend(c*hchan,elemunsafe.Pointer)(selectedbool){returnchansend(c,elem,false,getcallerpc())}

可以看出来这两个函数都是调用chansend,只不过block参数一个为true,一个为false而已. 所以直接分析chansend即可:

funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{ifc==nil{if!block{returnfalse}gopark(nil,nil,waitReasonChanSendNilChan,traceEvGoStop,2)throw("unreachable")}(...)}

逻辑如下 如果是channel空的

对于非阻塞,直接返回false

对于阻塞,会调用gopark挂起,不会返回

funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{(...)if!block&&c.closed==0&&full(c){returnfalse}(...)}

funcfull(c*hchan)bool{//c.dataqsizisimmutable(neverwrittenafterthechanneliscreated)//soitissafetoreadatanytimeduringchanneloperation.ifc.dataqsiz==0{//Assumesthatapointerreadisrelaxed-atomic.returnc.recvq.first==nil}//Assumesthatauintreadisrelaxed-atomic.returnc.qcount==c.dataqsiz}

逻辑如下 对于非阻塞,并且channel没有关闭 满足以下两种情况会直接返回false

没有缓冲区,并且当前没有接受者

有缓冲区,并且满了

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)0

看下send函数

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)1

逻辑如下 recvq中有接收者

则直接把数据拷贝到接收者存数据的地方

唤醒接收者的goroutine. 继续往下看:

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)2

逻辑如下 如果缓冲区没满

则先获得缓冲区中要放数据的位置

将要发送的数据拷贝到缓冲区

更新sendx

channel中元素数量+1

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)3

逻辑如下 缓冲区满了

非阻塞直接返回false

阻塞的新建sudog,并放入sendq

总结流程

接收数据

阻塞接收

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)4

无阻塞接收

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)5

可以看到,最终都是调用chanrecv,所以直接分析chanrecv

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)6

逻辑如下

channel为nil a. 非阻塞直接返回 b. 阻塞则等待

非阻塞并且 && (channel无缓冲|| channel无数据) a. channel已关闭,返回false,false b. channel未关闭,返回true,false

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)7

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)8

逻辑如下

再次判断channel已关闭,并且channel中无数据,则直接返回false,false

sendq里有等待的sender a. channel无缓冲区,直接将sender的数据copy到ep b. channel有缓冲区,将缓冲区到数据copy到ep,再将sender的数据copy到缓冲区

唤起sender到goroutine

(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)9

逻辑如下 缓冲区有数据

直接将缓冲区数据copy到ep

清空当前缓冲区刚刚读数据的地方

更新c.recvx

c.qcount--

typehchanstruct{qcountuint//目前循环队列里数据个数dataqsizuint//这个循环队列的大小bufunsafe.Pointer//为指针,指向一个大小固定的数组,用来存放channel的数据elemsizeuint16//channel中元素的大小closeduint32//channel是否关闭,1为关闭elemtype*_type//channel中元素的类型sendxuint//发送数据的索引recvxuint//接收数据的索引recvqwaitq//等待接收的队列,里面放的是goroutnesendqwaitq//等待发送的队列,里面放的是goroutne//lockprotectsallfieldsinhchan,aswellasseveral//fieldsinsudogsblockedonthischannel.////DonotchangeanotherG'sstatuswhileholdingthislock//(inparticular,donotreadyaG),asthiscandeadlock//withstackshrinking.lockmutex//此channel的互斥锁}0

逻辑如下

缓冲区满了,无阻塞channel直接返回

阻塞channel的话 a. 新建sudog b. 将sudog放入recvq c. 调用gopark,等待

总结流程

关闭channel

typehchanstruct{qcountuint//目前循环队列里数据个数dataqsizuint//这个循环队列的大小bufunsafe.Pointer//为指针,指向一个大小固定的数组,用来存放channel的数据elemsizeuint16//channel中元素的大小closeduint32//channel是否关闭,1为关闭elemtype*_type//channel中元素的类型sendxuint//发送数据的索引recvxuint//接收数据的索引recvqwaitq//等待接收的队列,里面放的是goroutnesendqwaitq//等待发送的队列,里面放的是goroutne//lockprotectsallfieldsinhchan,aswellasseveral//fieldsinsudogsblockedonthischannel.////DonotchangeanotherG'sstatuswhileholdingthislock//(inparticular,donotreadyaG),asthiscandeadlock//withstackshrinking.lockmutex//此channel的互斥锁}1


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Golang/4567.html