Go Channel
以下代码为 golang 1.24.5
chan
chan 底层 结构
const (
maxAlign = 8 // 最大对齐字节数(通常与架构相关)
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) // hchan 结构体的大小(含对齐填充)
debugChan = false // 调试标志,是否开启 channel 调试
)
// Go 语言运行时中 channel 的底层结构
type hchan struct {
qcount uint // 队列中当前的元素数量
dataqsiz uint // 环形缓冲区的大小(也就是 buf 中元素的总数)
buf unsafe.Pointer // 指向实际数据缓冲区(数组,元素个数为 dataqsiz)
elemsize uint16 // 每个元素的大小(字节)
synctest bool // 是否用于 sync 包的测试中(true 表示在 synctest 模式中创建)
closed uint32 // 标志位,表示 channel 是否已关闭
timer *timer // 用于超时操作的定时器指针(如 select 的超时)
elemtype *_type // 指向元素类型的类型描述符
sendx uint // 当前发送的索引(用于环形缓冲区)
recvx uint // 当前接收的索引(用于环形缓冲区)
recvq waitq // 等待接收的 goroutine 队列(recv 阻塞队列)
sendq waitq // 等待发送的 goroutine 队列(send 阻塞队列)
// lock 保护 hchan 中的所有字段,
// 以及阻塞在该 channel 上的 sudog 的多个字段。
// ⚠️ 在持有该锁时不要修改其他 G 的状态(尤其是不要唤醒 G),
// 否则可能与栈收缩操作发生死锁。
lock mutex
}
// 等待队列结构,用于接收和发送阻塞的 goroutine 链表
type waitq struct {
first *sudog // 队首 goroutine
last *sudog // 队尾 goroutine
}
先来解释 maxAlign = 8 和 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
这两句的意思是 将 hchanSize 向上对齐为 8 的整数倍
hchanSize = size + (-size & (align - 1))
size = 3
align = 8
8 - 1 = 7
=> 二进制: 0000 0111
-3 的二进制表示(假设是 int32)是:
原码:0000 0011 (3)
反码:1111 1100
补码:1111 1101 (即 -3)
所以 -3 的补码是:1111 1101
-3: 1111 1101
7: 0000 0111
--------------
0000 0101 => 5
3+5=8
初始化 chan
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// compiler checks this but be safe.
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0: // 无缓存的情况
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case !elem.Pointers(): // chan 类型是指针
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // 申请 连续内存
c.buf = add(unsafe.Pointer(c), hchanSize) // 将缓存区指针指向缓存区
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true) // 单独分配缓存区内存
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
if getg().syncGroup != nil {
c.synctest = true
}
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}
写 chan
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, sys.GetCallerPC())
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2) // 向 nil chan 写数据会阻塞当前 goroutine
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
if c.synctest && getg().syncGroup == nil {
panic(plainError("send on synctest channel from outside bubble"))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) { // golang 对单字(word-sized)的 observation 是原子操作
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock) // 加锁
if c.closed != 0 { // 向已经关闭的 chan 写会 panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil { // 如果找到一个只在等待的 g,直接将数据给 g 而不是将数据存到buf
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3) // send 函数除开会复制数据还会调用 goready
return true
}
if c.qcount < c.dataqsiz { // buf 未满
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 获取缓存区中待写入的指针
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep) // 写入数据
c.sendx++ // 带写入指针++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block { // 非阻塞直接返回失败
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 进发送队列等待
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
reason := waitReasonChanSend
if c.synctest {
reason = waitReasonSynctestChanSend
}
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2) // 阻塞当前 goroutine,由于 chan 的锁没有释放,下一个 goroutine 写数据也会阻塞到获取锁的位置
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
总结一下写 chan
- 阻塞写 nil chan 会阻塞
package main
func main() {
var ch chan int
ch <- 1
}
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send (nil chan)]:
main.main()
/tmp/sandbox1677365408/prog.go:5 +0x1c
- 向已经关闭的 chan 写会 panic
package main
func main() {
ch := make(chan int)
close(ch)
ch <- 1
}
panic: send on closed channel
goroutine 1 [running]:
main.main()
/tmp/sandbox2376909824/prog.go:6 +0x37
有等待读者,写 chan 会直接写入等待读者,否则写入 buf
阻塞写 满 chan 会阻塞,并且后续 goroutine 也会阻塞(如果其中未有读者)
读 chan
// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2) // 阻塞读 nil chan 会阻塞
throw("unreachable")
}
if c.synctest && getg().syncGroup == nil {
panic(plainError("receive on synctest channel from outside bubble"))
}
if c.timer != nil {
c.timer.maybeRunChan()
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock) // 加锁
if c.closed != 0 {
if c.qcount == 0 { // chan 已经关闭,而且缓存区没有数据
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil { // 如果发送队列有 goroutine,直接接收数据
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
if c.qcount > 0 { // 缓存区有数据
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block { // 非阻塞直接返回
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 当前 goroutine 进接收等待队列
if c.timer != nil {
blockTimerChan(c)
}
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
reason := waitReasonChanReceive
if c.synctest {
reason = waitReasonSynctestChanReceive
}
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2) // 阻塞当前 goroutine
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg) // 释放 mysg 到 pool
return true, success
}
总结一下读 chan
- 阻塞读 nil chan 会阻塞
package main
func main() {
var ch chan int
<-ch
}
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive (nil chan)]:
main.main()
/tmp/sandbox565923349/prog.go:5 +0x17
- 向已经关闭的 chan 读不会 panic,并且能正常读
package main
func main() {
ch := make(chan int)
go func() { ch <- 1; close(ch) }()
print(<-ch)
}
1
有等待写者,读 chan 会直接读取等待写者,否则读 buf
阻塞读 空 chan 会阻塞,并且后续 goroutine 也会阻塞(如果其中未有写者)