# 3.6 通信原語
> 本節內容提供一個線上演講:[YouTube 在線](https://www.youtube.com/watch?v=d7fFCGGn0Wc),[Google Slides 講稿](https://changkun.de/s/chansrc/)。
Go 語言中 Channel 與 Select 語句受到 1978 年 CSP 原始理論的啟發。 在語言設計中,Goroutine 就是 CSP 理論中的并發實體, 而 Channel 則對應 CSP 中輸入輸出指令的消息信道,Select 語句則是 CSP 中守衛和選擇指令的組合。 他們的區別在于 CSP 理論中通信是隱式的,而 Go 的通信則是顯式的由程序員進行控制; CSP 理論中守衛指令只充當 Select 語句的一個分支,多個分支的 Select 語句由選擇指令進行實現。
Channel 與 Select 是 Go 語言中提供的語言級的、基于消息傳遞的同步原語。
## Channel 的本質
### Channel 底層結構
實現 Channel 的結構并不神秘,本質上就是一個`mutex`鎖加上一個環狀緩存、 一個發送方隊列和一個接收方隊列:
```
// src/runtime/chan.go
type hchan struct {
qcount uint // 隊列中的所有數據數
dataqsiz uint // 環形隊列的大小
buf unsafe.Pointer // 指向大小為 dataqsiz 的數組
elemsize uint16 // 元素大小
closed uint32 // 是否關閉
elemtype *_type // 元素類型
sendx uint // 發送索引
recvx uint // 接收索引
recvq waitq // recv 等待列表,即( <-ch )
sendq waitq // send 等待列表,即( ch<- )
lock mutex
}
type waitq struct { // 等待隊列 sudog 雙向隊列
first *sudog
last *sudog
}
```
**圖1:Channel 的結構**
其中`recvq`和`sendq`分別是`sudog`的一個鏈式隊列, 其元素是一個包含當前包含隊 Goroutine 及其要在 Channel 中發送的數據的一個封裝, 如圖 1 所示。
> 更多關于 sudog 的細節,請參考[6.8 同步原語](https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync)。
### Channel 的創建
Channel 的創建語句由編譯器完成如下翻譯工作:
1
make(chan type, n) => makechan(type, n)
將一個`make`語句轉換為`makechan`調用。 而具體的`makechan`實現的本質是根據需要創建的元素大小, 對`mallocgc`進行封裝, 因此,Channel 總是在堆上進行分配,它們會被垃圾回收器進行回收, 這也是為什么 Channel 不一定總是需要調用`close(ch)`進行顯式地關閉。
```
// src/runtime/chan.go
// 將 hchan 的大小對齊
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&7)
func makechan(t *chantype, size int) *hchan {
elem := t.elem
...
// 檢查確認 channel 的容量不會溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic("makechan: size out of range")
}
var c *hchan
switch {
case mem == 0:
// 隊列或元素大小為零
c = (*hchan)(mallocgc(hchanSize, nil, true))
...
case elem.ptrdata == 0:
// 元素不包含指針
// 在一個調用中分配 hchan 和 buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指針
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
...
return c
}
```
Channel 并不嚴格支持`int64`大小的緩沖,當`make(chan type, n)`中 n 為`int64`類型時, 運行時的實現僅僅只是將其強轉為`int`,提供了對`int`轉型是否成功的檢查:
```
// src/runtime/chan.go
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic("makechan: size out of range")
}
return makechan(t, int(size))
}
```
所以創建一個 Channel 最重要的操作就是創建`hchan`以及分配所需的`buf`大小的內存空間。
### 向 Channel 發送數據
發送數據完成的是如下的翻譯過程:
```
ch <- v => chansend1(ch, v)
```
而本質上它會去調用更為通用的`chansend`:
```
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true)
}
```
```
下面我們來關注`chansend`的具體實現的第一個部分:
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 當向 nil channel 發送數據時,會調用 gopark
// 而 gopark 會將當前的 Goroutine 休眠,從而發生死鎖崩潰
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan)
throw("unreachable")
}
...
}
```
在這個部分中,我們可以看到,如果一個 Channel 為零值(比如沒有初始化),這時候的發送操作會暫止當前的 Goroutine(`gopark`)。 而 gopark 會將當前的 Goroutine 休眠,從而發生死鎖崩潰。
現在我們來看一切已經準備就緒,開始對 Channel 加鎖:
```
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
...
lock(&c.lock)
// 持有鎖之前我們已經檢查了鎖的狀態,
// 但這個狀態可能在持有鎖之前、該檢查之后發生變化,
// 因此還需要再檢查一次 channel 的狀態
if c.closed != 0 { // 不允許向已經 close 的 channel 發送數據
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 1. channel 上有阻塞的接收方,直接發送
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) })
return true
}
// 2. 判斷 channel 中緩存是否有剩余空間
if c.qcount < c.dataqsiz {
// 有剩余空間,存入 c.buf
qp := chanbuf(c, c.sendx)
...
typedmemmove(c.elemtype, qp, ep) // 將要發送的數據拷貝到 buf 中
c.sendx++
if c.sendx == c.dataqsiz { // 如果 sendx 索引越界則設為 0
c.sendx = 0
}
c.qcount++ // 完成存入,記錄增加的數據,解鎖
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
...
}
```
到目前位置,代碼中考慮了當 Channel 上有接收方等待,可以直接將數據發送走,并返回(情況 1);或沒有接收方 但緩存中還有剩余空間來存放沒有讀取的數據(情況 2)。對于直接發送數據的情況,由`send`調用完成:
```
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
...
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf() // unlock(&c.lock)
gp.param = unsafe.Pointer(sg)
...
// 復始一個 Goroutine,放入調度隊列等待被后續調度
goready(gp) // 將 gp 作為下一個立即被執行的 Goroutine
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
... // 為了確保發送的數據能夠被立刻觀察到,需要寫屏障支持,執行寫屏障,保證代碼正確性
memmove(dst, src, t.size) // 直接寫入接收方的執行棧!
}
```
`send`操作其實是隱含了有接收方阻塞在 Channel 上,換句話說有接收方已經被暫止, 當我們發送完數據后,應該讓該接收方就緒(讓調度器繼續開始調度接收方)。
這個`send`操作其實是一種優化。原因在于,已經處于等待狀態的 Goroutine 是沒有被執行的, 因此用戶態代碼不會與當前所發生數據發生任何競爭。我們也更沒有必要冗余的將數據寫入到緩存, 再讓接收方從緩存中進行讀取。因此我們可以看到,`sendDirect`的調用, 本質上是將數據直接寫入接收方的執行棧。
最后我們來看第三種情況,如果既找不到接收方,`buf`也已經存滿, 這時我們就應該阻塞當前的 Goroutine 了:
```
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
...
// 3. 阻塞在 channel 上,等待接收方接收數據
gp := getg()
mysg := acquireSudog()
...
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock)) // 將當前的 g 從調度隊列移出
// 因為調度器在停止當前 g 的時候會記錄運行現場,當恢復阻塞的發送操作時候,會從此處繼續開始執行
...
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 { // 正常喚醒狀態,Goroutine 應該包含需要傳遞的參數,但如果沒有喚醒時的參數,且 channel 沒有被關閉,則為虛假喚醒
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
...
mysg.c = nil // 取消與之前阻塞的 channel 的關聯
releaseSudog(mysg) // 從 sudog 中移除
return true
}
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
// 具有未解鎖的指向 gp 棧的 sudog。棧的復制必須鎖住那些 sudog 的 channel
gp.activeStackChans = true
unlock((*mutex)(chanLock))
return true
}
```
簡單總結一下,發送過程包含三個步驟:
1. 持有鎖
2. 入隊,拷貝要發送的數據
3. 釋放鎖
其中第二個步驟包含三個子步驟:
1. 找到是否有正在阻塞的接收方,是則直接發送
2. 找到是否有空余的緩存,是則存入
3. 阻塞直到被喚醒
### 從 Channel 接收數據
接收數據主要是完成以下翻譯工作:
```
v <- ch => chanrecv1(ch, v)
v, ok <- ch => ok := chanrecv2(ch, v)
```
他們的本質都是調用`chanrecv`:
```
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
```
chanrecv 的具體實現如下,由于我們已經仔細分析過發送過程了, 我們不再詳細分拆下面代碼的步驟,其處理方式基本一致:
1. 上鎖
2. 從緩存中出隊,拷貝要接收的數據
3. 解鎖
其中第二個步驟包含三個子步驟:
1. 如果 Channel 已被關閉,且 Channel 沒有數據,立刻返回
2. 如果存在正在阻塞的發送方,說明緩存已滿,從緩存隊頭取一個數據,再復始一個阻塞的發送方
3. 否則,檢查緩存,如果緩存中仍有數據,則從緩存中讀取,讀取過程會將隊列中的數據拷貝一份到接收方的執行棧中
4. 沒有能接受的數據,阻塞當前的接收方 Goroutine
```
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// nil channel,同 send,會導致兩個 Goroutine 的死鎖
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan)
throw("unreachable")
}
// 快速路徑: 在不需要鎖的情況下檢查失敗的非阻塞操作
//
// 注意到 channel 不能由已關閉轉換為未關閉,則
// 失敗的條件是:1. 無 buf 時發送隊列為空 2. 有 buf 時,buf 為空
// 此處的 c.closed 必須在條件判斷之后進行驗證,
// 因為指令重排后,如果先判斷 c.closed,得出 channel 未關閉,無法判斷失敗條件中
// channel 是已關閉還是未關閉(從而需要 atomic 操作)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
...
lock(&c.lock)
// 1. channel 已經 close,且 channel 中沒有數據,則直接返回
if c.closed != 0 && c.qcount == 0 {
...
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 2. channel 上有阻塞的發送方,直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) })
return true, true
}
// 3. channel 的 buf 不空
if c.qcount > 0 {
// 直接從隊列中接收
qp := chanbuf(c, c.recvx)
...
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 4. 沒有數據可以接收,阻塞當前 Goroutine
gp := getg()
mysg := acquireSudog()
...
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive)
...
// 被喚醒
gp.waiting = nil
...
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
```
接收數據同樣包含直接往接收方的執行棧中拷貝要發送的數據,但這種情況當且僅當緩存大小為0時(即無緩沖 Channel)。
```
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
...
if ep != nil {
// 直接從對方的棧進行拷貝
recvDirect(c.elemtype, sg, ep)
}
} else {
// 從緩存隊列拷貝
qp := chanbuf(c, c.recvx)
...
// 從隊列拷貝數據到接收方
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 從發送方拷貝數據到隊列
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
...
goready(gp, skip+1)
}
```
到目前為止我們終于明白了為什么無緩沖 Channel 而言`v <- ch`happens before`ch <- v`了, 因為**無緩沖 Channel 的接收方會先從發送方棧拷貝數據后,發送方才會被放回調度隊列中,等待重新調度**。
### Channel 的關閉
關閉 Channel 主要是完成以下翻譯工作:
1
close(ch) => closechan(ch)
具體的實現中,首先對 Channel 上鎖,而后依次將阻塞在 Channel 的 g 添加到一個 gList 中,當所有的 g 均從 Channel 上移除時,可釋放鎖,并喚醒 gList 中的所有接收方和發送方:
```
func closechan(c *hchan) {
if c == nil { // close 一個空的 channel 會 panic
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 { // close 一個已經關閉的的 channel 會 panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
c.closed = 1
var glist gList
// 釋放所有的接收方
for {
sg := c.recvq.dequeue()
if sg == nil { // 隊列已空
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem) // 清零
sg.elem = nil
}
...
gp := sg.g
gp.param = nil
...
glist.push(gp)
}
// 釋放所有的發送方
for {
sg := c.sendq.dequeue()
if sg == nil { // 隊列已空
break
}
sg.elem = nil
...
gp := sg.g
gp.param = nil
...
glist.push(gp)
}
// 釋放 channel 的鎖
unlock(&c.lock)
// 就緒所有的 G
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
```
當 Channel 關閉時,我們必須讓所有阻塞的接收方重新被調度,讓所有的發送方也重新被調度,這時候 的實現先將 Goroutine 統一添加到一個列表中(需要鎖),然后逐個地進行復始(不需要鎖)。
## Select 語句的本質
### 分支的隨機化
Select 本身會被編譯為`selectgo`調用。這與普通的多個 if 分支不同。`selectgo`則用于隨機化每條分支的執行順序,普通多個 if 分支的執行順序始終是一致的。
```
type scase struct {
c *hchan // chan
elem unsafe.Pointer // 數據元素
kind uint16
...
}
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
...
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
// 替換零值的 channel
for i := range scases {
cas := &scases[i]
if cas.c == nil && cas.kind != caseDefault {
*cas = scase{}
}
}
...
// 生成隨機順序
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// 根據 channel 的地址進行堆排序,決定加鎖的順序,避免死鎖
for i := 0; i < ncases; i++ {
...
}
...
// 依次加鎖
sellock(scases, lockorder)
var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
loop:
// 1 遍歷 channel,檢查是否就緒(可發送/可接收)
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
for i := 0; i < ncases; i++ {
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
case caseNil:
continue
case caseRecv:
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
case caseSend:
...
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
case caseDefault:
dfli = casi
dfl = cas
}
}
// 存在 default 分支,直接去 retc 執行
if dfl != nil {
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
// 2 入隊所有的 channel
gp = getg()
...
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
// 在 gp.waiting 上分配 elem 和入隊 sg 之間沒有棧分段,copystack 可以在其中找到它。
sg.elem = cas.elem
...
sg.c = c
// 按鎖的順序創建等待鏈表
*nextp = sg
nextp = &sg.waitlink
switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)
case caseSend:
c.sendq.enqueue(sg)
}
}
// 等待被喚醒
gp.param = nil
// selparkcommit 根據等待列表依次解鎖
gopark(selparkcommit, nil, waitReasonSelect)
// 重新上鎖
sellock(scases, lockorder)
gp.selectDone = 0
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - 從不成功的 channel 中出隊
// 否則將它們堆到一個安靜的 channel 上并記錄所有成功的分支
// 我們按鎖的順序單向鏈接 sudog
casi = -1
cas = nil
sglist = gp.waiting
// 從 gp.waiting 取消鏈接之前清除所有的 elem
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
for _, casei := range lockorder {
k = &scases[casei]
if k.kind == caseNil {
continue
}
...
if sg == sglist {
// sg 已經被喚醒我們的 G 出隊了。
casi = int(casei)
cas = k
} else {
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
if cas == nil {
// 當一個參與在 select 語句中的 channel 被關閉時,我們可以在 gp.param == nil 時進行喚醒(所以 cas == nil)
// 最簡單的方法就是循環并重新運行該操作,然后就能看到它現在已經被關閉了
// 也許未來我們可以顯式的發送關閉信號,
// 但我們就必須區分在接收方上關閉和在發送方上關閉這兩種情況了
// 最簡單的方法是不復制代碼并重新檢查上面的代碼。
// 我們知道某些 channel 被關閉了,也知道某些可能永遠不會被重新打開,因此我們不會再次阻塞
goto loop
}
c = cas.c
...
if cas.kind == caseRecv {
recvOK = true
}
...
selunlock(scases, lockorder)
goto retc
bufrecv:
// 可以從 buf 接收
...
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
// 可以發送到 buf
...
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
recv:
// 可以從一個休眠的發送方 (sg)直接接收
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
...
recvOK = true
goto retc
rclose:
// 在已經關閉的 channel 末尾進行讀
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
...
goto retc
send:
// 可以向一個休眠的接收方 (sg) 發送
...
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
...
goto retc
retc:
...
return casi, recvOK
sclose:
// 向已關閉的 channel 進行發送
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
```
### 發送數據的分支
Select 的諸多用法其實本質上仍然是 Channel 操作,編譯器會完成如下翻譯工作:
```
select {
case c <- v:
...
default:
...
}
=>;
if selectnbsend(c, v) {
...
} else {
...
}
```
其中:
```
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
```
注意,這時`chansend`的第三個參數為`false`,這與前面的普通 Channel 發送操作不同, 說明這時 Select 的操作是非阻塞的。
我們現在來關注`chansend`中當 block 為`false`的情況:
```
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 快速路徑: 檢查不需要加鎖時失敗的非阻塞操作
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
...
lock(&c.lock)
...
}
```
這里的快速路徑是一個優化,它發生在持有 Channel 鎖之前。 這一連串檢查不需要加鎖有以下原因:
1. Channel 沒有被關閉與 Channel 是否滿的檢查沒有因果關系。換句話說,無論 Channel 是否被關閉,都不能得出 Channel 是否已滿;Channel 是否滿,也與 Channel 是否關閉無關,從而當發生指令重排時,這個檢查也不會出錯。
2. 當 Channel 已經被關閉、且緩存已滿時,發送操作一定失敗。
第二個關于 Select 的處理則是在當判斷完 Channel 是否有`buf`可緩存當前的數據后, 如果沒有讀者阻塞在 Channel 上則會立即返回失敗:
```
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)
...
// 2. 判斷 channel 中緩存是否仍然有空間剩余
if c.qcount < c.dataqsiz {
// 有空間剩余,存入 buffer
...
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
...
}
```
因此這也是為什么,我們在沒有配合 for 循環使用 Select 時,需要對發送失敗進行處理,例如:
```
func main() {
ch := make(chan interface{})
x := 1
select {
case ch <- x:
println("send success") // 如果初始化為有緩存 channel,則會發送成功
default:
println("send failed") // 此時 send failed 會被輸出
}
return
}
```
如果讀者進一步嘗試沒有 default 的例子:
```
// main.go
package main
func main() {
ch := make(chan interface{})
x := 1
select {
case ch <- x:
println("send success") // 如果初始化為有緩存 channel,則會發送成功
}
return
}
```
會發現,此時程序會發生 panic:
```
$ go run main.go
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
```
似乎與源碼中發生的行為并不一致,因為按照之前的分析,當鎖被解除后,并不會出現任何 panic。 這是為什么呢?事實上,編譯器會特殊處理**當 Select 語句只有一個分支的情況,即`select`關鍵字在只有一個分支時,沒有被翻譯成`selectgo`。**只有一個分支的情況下,`select`與`if`是沒有區別的,這種優化消除了只有一個分支情況下調用`selectgo`的性能開銷:
```
// src/cmd/compile/internal/gc/select.go
func walkselectcases(cases *Nodes) []*Node {
// 獲取 case 分支的數量
n := cases.Len()
// 優化: 沒有 case 的情況
if n == 0 {
// 翻譯為:block()
...
return
}
// 優化: 只有一個 case 的情況
if n == 1 {
// 翻譯為:if ch == nil { block() }; n;
...
return
}
// 一般情況,調用 selecggo
...
}
```
根據編譯器的代碼,我們甚至可以看到沒有分支的 Select 會被編譯成`block`的調用:
```
func block() {
gopark(nil, nil, waitReasonSelectNoCases) // forever
}
```
即讓整個 Goroutine 暫止。
### 接收數據的分支
對于接收數據而言,編譯器會將這段語法:
```
select {
case v = <-c:
...
default:
...
}
=>
if selectnbrecv(&v, c) {
...
} else {
...
}
```
而
```
select {
case v, ok = <-c:
... foo
default:
... bar
}
=>
if c != nil && selectnbrecv2(&v, &ok, c) {
... foo
} else {
... bar
}
```
其中:
```
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
selected, *received = chanrecv(c, elem, false)
return
}
```
## Channel 的無鎖實現
早在 2014 年時,Dmitry Vyukov 就已經提出實現無鎖版本的 Channel \[Vyukov, 2014a\] \[Vyukov, 2014b\], 但這提案雖然早年已經實現,但至今未被接受,其未被接收這一現實可以總結為以下三個原因。
早年的 Channel 實現基于比較交換的重試機制,換句話說:多個阻塞在同一 Channel 的 Goroutine 被喚醒時, 需要重新持有鎖,這時誰搶到鎖誰就能拿到數據。所以這些 Goroutine 被喚醒的順序不是 FIFO,而是隨機的, 最壞情況下可能存在一個 Goroutine 始終不會接受到數據。
后來 Russ Cox 希望 \[Cox, 2015\] 阻塞的 Goroutine 能夠按照 FIFO 的順序被喚醒 (雖然在語言層面上未定義多個 Goroutine 的喚醒順序),保證得到數據的公平性,參與討論的人中也表示支持這一提案。 但這一決定基本上抹殺了無鎖 Channel 的實現機制 \[Randall, 2015a\]。 這是目前未使用無鎖實現 Channel 的一個最主要的原因。
那在這個決定之前,無鎖 Channel 早就已經實現了,為什么當時沒有接受使用無鎖版本的 Channel 呢?
第一個原因是提出的無鎖 Channel 并非無等待算法,是否能有效提高 Channel 在大規模應用的性能并沒有大規模測試的強有力的證據, 支撐性能表現的只有 Dmitry Vyukov 提交的性能測試; 與此同時,運行時調度器不是 NUMA-aware 的實現,在 CPU 核心與調度器 P 數量較多時, 一個社區實現的無鎖 Channel \[OneOfOne, 2016\] 的性能測試結果 \[Gjengset, 2016\] 表明: 無鎖版本的 Channel 甚至比基于 futex 加鎖版本的 Channel 還要慢。 在后續對 Channel 性能優化的跟進中雖然沒有采用無鎖實現, 但仍然跟進了兩個小成本的優化 \[Vyukov, 2014d\]:增加不需要鎖時的快速路徑和減少互斥鎖的粒度。
第二個原因導致沒有被接受的原因則在于:無鎖版本的 Channel 可維護性大打折扣。 這里我們簡單提一個由于無鎖實現導致的維護性大打折扣的教訓 \[Randall, 2015b\]。 在早年簡化 Channel 實現的過程中,由于沒有考慮到發送數據過程中, 對要發送數據的指針進行讀取,將會與調度器對執行棧的伸縮發生競爭。這是因為 直接讀取 Channel 的數據分為兩個過程:1. 讀取發送方的值的指針 2. 拷貝到要接收的位置。 然而在 1 和 2 這兩個步驟之間,發送方的執行棧可能發生收縮,進而指針失效,成為競爭的源頭。
雖然后來有人提出使用無鎖編程的形式化驗證工具 spin \[Bell Labs, 1980\] 來讓調度器代碼與形式驗證的模型進行同步,但顯然這需要更多的工作量,并沒有人采取任何行動。
## 小結
Channel 的實現是一個典型的環形隊列加上`mutex`鎖的實現, 與 Channel 同步出現的 Select 更像是一個語法糖, 其本質仍然是一個`chansend`和`chanrecv`的兩個通用實現。 但為了支持 Select 在不同分支上的非阻塞操作,`selectgo`完成了這一需求。
考慮到整個 Channel 操作帶鎖的成本較高,官方也曾考慮過使用無鎖 Channel 的設計, 但由于年代久遠,該改進仍處于擱置狀態 \[Vyukov, 2014b\]。
## 進一步閱讀的參考文獻
* \[Vyukov, 2014a\][Dmitry Vyukov, Go channels on steroids, January 2014](https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub)
* \[Vyukov, 2014b\][Dmitry Vyukov, runtime: lock-free channels, October 2014](https://github.com/golang/go/issues/8899)
* \[Vyukov, 2014c\][Dmitry Vyukov, runtime: chans on steroids, October 2014](https://codereview.appspot.com/12544043)
* \[Vyukov, 2014d\][update on “lock-free channels”, 2015](https://groups.google.com/forum/#!msg/golang-dev/0IElw_BbTrk/cGHMdNoHGQEJ)
* \[Cox, 2015\][runtime: make sure blocked channels run operations in FIFO order](https://github.com/golang/go/issues/11506)
* \[Randall, 2015a\][Keith Randall, runtime: simplify buffered channels, 2015](https://go-review.googlesource.com/c/go/+/9345/)
* \[Randall, 2015b\][Keith Randall, runtime: simplify chan ops, take 2, 2015](https://go-review.googlesource.com/c/go/+/16740)
* \[OneOfOne, 2016\][OneOfOne, A scalable lock-free channel, 2016](https://github.com/OneOfOne/lfchan)
* \[Gjengset, 2016\][Jon Gjengset, Fix poor scalability to many (true-SMP) cores, 2016](https://github.com/OneOfOne/lfchan/issues/3)
* \[Chenebault, 2017\][Benjamin Chenebault, runtime: select is not fair](https://github.com/golang/go/issues/21806)
* \[Bell Labs, 1980\][Bell Labs, Verifying Multi-threaded Software with Spin, 1980](http://spinroot.com/spin/whatispin.html)
- 第一部分 :基礎篇
- 第1章 Go語言的前世今生
- 1.2 Go語言綜述
- 1.3 順序進程通訊
- 1.4 Plan9匯編語言
- 第2章 程序生命周期
- 2.1 從go命令談起
- 2.2 Go程序編譯流程
- 2.3 Go 程序啟動引導
- 2.4 主Goroutine的生與死
- 第3 章 語言核心
- 3.1 數組.切片與字符串
- 3.2 散列表
- 3.3 函數調用
- 3.4 延遲語句
- 3.5 恐慌與恢復內建函數
- 3.6 通信原語
- 3.7 接口
- 3.8 運行時類型系統
- 3.9 類型別名
- 3.10 進一步閱讀的參考文獻
- 第4章 錯誤
- 4.1 問題的演化
- 4.2 錯誤值檢查
- 4.3 錯誤格式與上下文
- 4.4 錯誤語義
- 4.5 錯誤處理的未來
- 4.6 進一步閱讀的參考文獻
- 第5章 同步模式
- 5.1 共享內存式同步模式
- 5.2 互斥鎖
- 5.3 原子操作
- 5.4 條件變量
- 5.5 同步組
- 5.6 緩存池
- 5.7 并發安全散列表
- 5.8 上下文
- 5.9 內存一致模型
- 5.10 進一步閱讀的文獻參考
- 第二部分 運行時篇
- 第6章 并發調度
- 6.1 隨機調度的基本概念
- 6.2 工作竊取式調度
- 6.3 MPG模型與并發調度單
- 6.4 調度循環
- 6.5 線程管理
- 6.6 信號處理機制
- 6.7 執行棧管理
- 6.8 協作與搶占
- 6.9 系統監控
- 6.10 網絡輪詢器
- 6.11 計時器
- 6.12 非均勻訪存下的調度模型
- 6.13 進一步閱讀的參考文獻
- 第7章 內存分配
- 7.1 設計原則
- 7.2 組件
- 7.3 初始化
- 7.4 大對象分配
- 7.5 小對象分配
- 7.6 微對象分配
- 7.7 頁分配器
- 7.8 內存統計
- 第8章 垃圾回收
- 8.1 垃圾回收的基本想法
- 8.2 寫屏幕技術
- 8.3 調步模型與強弱觸發邊界
- 8.4 掃描標記與標記輔助
- 8.5 免清掃式位圖技術
- 8.6 前進保障與終止檢測
- 8.7 安全點分析
- 8.8 分代假設與代際回收
- 8.9 請求假設與實務制導回收
- 8.10 終結器
- 8.11 過去,現在與未來
- 8.12 垃圾回收統一理論
- 8.13 進一步閱讀的參考文獻
- 第三部分 工具鏈篇
- 第9章 代碼分析
- 9.1 死鎖檢測
- 9.2 競爭檢測
- 9.3 性能追蹤
- 9.4 代碼測試
- 9.5 基準測試
- 9.6 運行時統計量
- 9.7 語言服務協議
- 第10章 依賴管理
- 10.1 依賴管理的難點
- 10.2 語義化版本管理
- 10.3 最小版本選擇算法
- 10.4 Vgo 與dep之爭
- 第12章 泛型
- 12.1 泛型設計的演進
- 12.2 基于合約的泛型
- 12.3 類型檢查技術
- 12.4 泛型的未來
- 12.5 進一步閱讀的的參考文獻
- 第13章 編譯技術
- 13.1 詞法與文法
- 13.2 中間表示
- 13.3 優化器
- 13.4 指針檢查器
- 13.5 逃逸分析
- 13.6 自舉
- 13.7 鏈接器
- 13.8 匯編器
- 13.9 調用規約
- 13.10 cgo與系統調用
- 結束語: Go去向何方?