注意
本文最后更新于 2024-03-14,文中内容可能已过时。
数据结构
channel 的底层是一个叫做 hchan 的结构体。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
type hchan struct {
//channel分为无缓冲和有缓冲两种。
//对于有缓冲的channel存储数据,借助的是如下循环数组的结构
qcount uint // 循环数组中的元素数量
dataqsiz uint // 循环数组的长度
buf unsafe.Pointer // 指向底层循环数组的指针
elemsize uint16 //能够收发元素的大小
closed uint32 //channel是否关闭的标志
elemtype *_type //channel中的元素类型
//有缓冲channel内的缓冲数组会被作为一个“环型”来使用。
//当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
sendx uint // 下一次发送数据的下标位置
recvx uint // 下一次读取数据的下标位置
//当循环数组中没有数据时,收到了接收请求,那么接收数据的变量地址将会写入读等待队列
//当循环数组中数据已满时,收到了发送请求,那么发送数据的变量地址将写入写等待队列
recvq waitq // 读等待队列
sendq waitq // 写等待队列
lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}
|
创建 channel
golang runtime 调用mallocgc()
来创建 channel,在堆上开辟内存空间,channel 本身会被 GC 自动回收。
channel 发送数据&接收数据
前置检查
步骤一:
先上锁,保证线程安全。
步骤二:
再次检查 channel 是否关闭,如果关闭则抛出 panic。如果加锁成功并且 channel 未关闭,则开始发送。
同步
g1 同步发送数据到 g2 时,检查recvq队列是否有 g2 在等待读取数据,如果有,直接 copy 数据到目标地址。
如果没有,阻塞自己(把自己打包为sudog,放在sendq队列,执行gopark()将g1阻塞,让出cpu使用权),然后激活g2(调用goready()
将等待接收的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable),g2激活从sendq队列读取g1发送的数据。
需要强调的是,通道并不提供跨 goroutine 的数据访问保护机制。如果通过通道传输数据的一份副本,那么每个 goroutine 都持有一份副本,各自对自己的副本做修改是安全的。当传输的是指向数据的指针时,如果读和写是由不同的 goroutine 完成的,那么每个 goroutine 依旧需要额外的同步操作。
异步
异步发送时,会利用hchan的buf环形数组,无论是发送数据还是读取数据都不会等待对方。然而如果发送时buf满了,或者读取时buf空了,也会作出上文的阻塞操作。
当G1向ch里发送数据时,首先会对buf加锁,然后将数据copy到buf中,然后sendx++,然后释放对buf的锁。
当G2消费ch的时候,会首先对buf加锁,然后将buf中的数据copy到task变量对应的内存里,然后recvx++,并释放锁。
可以发现整个过程,G1和G2没有共享的内存,底层是通过hchan结构体的buf,并使用copy内存的方式进行通信,最后达到了共享内存的目的,这里也体现了Go中的CSP并发模型。
优雅关闭 channel
- 关闭已经关闭的 Channel 会导致 panic。(如果channel为nil也会panic)
- 写入已经关闭的 Channel 会导致 panic。(同上)
优雅关闭 Channel 的方式就 2 种:
- context
- done channel
本节聊聊 done channel 的做法:假设有多个生产者,有多个消费者。在生产者和消费者之间增加一个额外的辅助控制channel,用来传递关闭信号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
type session struct {
done chan struct{}
doneOnce sync.Once
data chan int
}
func (sess *session) Serve() {
go sess.loopRead()
sess.loopWrite()
}
func (sess *session) loopRead() {
defer func() {
if err := recover(); err != nil {
sess.doneOnce.Do(func() { close(sess.done) })
}
}()
var err error
for {
select {
case <-sess.done:
return
default:
}
if err == io.ErrUnexpectedEOF || err == io.EOF {
goto failed
}
}
failed:
sess.doneOnce.Do(func() { close(sess.done) })
}
func (sess *session) loopWrite() {
defer func() {
if err := recover(); err != nil {
sess.doneOnce.Do(func() { close(sess.done) })
}
}()
var err error
for {
select {
case <-sess.done:
return
case sess.data <- rand.Intn(100):
}
if err != nil {
goto done
}
}
done:
if err != nil {
log("sess: loop write failed: %v, %s", err, sess)
}
}
func (sess *session) ForceClose() {
sess.doneOnce.Do(func() { close(sess.done) })
}
|
总结一下 done channel 的做法:消费者利用辅助的 done channel 发送信号,并先开始退出协程。生产者接收到 done channel 的信号,也开始退出协程。最终 data channel 无人持有,被 gc 回收关闭。
消费者侧发送关闭 done channel,由于消费者有多个,如果每一个都关闭 done channel,会导致 panic。所以这里用 doneOnce.Do() 保证只会关闭 done channel 一次。
原文地址:探究Golang channel的底层实现