Channel,即通道,衍生自Charles Antony Richard Hoare的CSP并發模型,是Go的并發原語,在Go語言中具有極其重要的地位。雖然它可用于同步內存的訪問,但更適合用于goroutine之間傳遞信息。就像我們在之前“Go的并發哲學”章節所提到的那樣,通道在任何規模的程序編碼中都非常有用,因為它足夠靈活,能夠以各種方式組合在一起。我們在隨后的“select語句”章節會進一步探討其是如何構造的。
可以把通道想象為一條河流,通道作為信息流的載體;數據可以沿著通道被傳遞,然后從下游被取出。基于這個原因,我通常用單詞“Stream”為我的通道變量命名。在使用通道時,你把一個值傳遞給chan類型的變量,然后在程序的其他位置再把這個值從通道中讀取出來。該值被傳遞的入口和出口不需要知道彼此的存在,你只需使用通道的引用進行操作就好。
建立一個通道是非常簡單的。下面這個例子展現了如何對通道聲明和如何實例化。與Go中的其他類型一樣,你可以使用 := 來簡化通道的創建。不過在程序中通常會先聲明通道,所以了解該如何將二者分成單獨的步驟是很有用的:
```
var dataStream chan interface{} // 1
dataStream = make(chan interface{}) // 2
```
1.這里聲明了一個通道。我們說該通道的“類型”是interface{}的。
2.這里我們使用內置函數make實例化通道。
這個示例定義了一個名為dataStream的通道,在該通道上可以寫入或讀取任意類型的值(因為我們使用了空的接口)。通道也可以聲明為僅支持單向數據流——即你可以定義僅支持發送或僅支持接收數據的通道。我將在本節的末尾解釋單向數據流的重要性。
要聲明一個只能被讀取的單向通道,你可以簡單的使用 <- 符號,如下所示:
```
var dataStream <-chan interface{}
dataStream := make(<-chan interface{})
```
與之相對應,要聲明一個只能被發送的單向通道,把 <-放在 chan關鍵字的右側即可:
```
var dataStream chan<- interface{}
dataStream := make(chan<- interface{})
```
你不會經常看到實例化的單向通道,但你會經常看到它們被用作函數參數和返回類型,這是非常有用的,因為Go可以在需要時將雙向通道隱式轉換為單向通道,比如這樣:
```
var receiveChan <-chan interface{}
var sendChan chan<- interface{}
dataStream := make(chan interface{})
// 這樣做是有效的
receiveChan = dataStream
sendChan = dataStream
```
要注意通道是有“類型”的。 在這個例子中,我們創建了一個接口“類型”的chan,這意味著我們可以在其上放置任何類型的數據,但是我們也可以給它一個更嚴格的類型來約束它可以傳遞的數據類型。 這是一個整數通道的例子:
```
intStream := make(chan int)
```
為了操作通道,我們再一次使用 <- 符號。我們看一個實際的例子:
```
stringStream := make(chan string)
go func() {
stringStream <- "Hello channels!" //1
}()
fmt.Println(<-stringStream) //2
```
1. 這里我們將字符串放入通道stringStream。
2. 這里我們從通道中取出字符串并打印到標準輸出流。
這會輸出:
```
Hello channels!
```
很簡單,是吧。你所需要做的只是建立一個通道變量,然后將數據傳遞給它并從中讀取數據。但是,嘗試將值寫入只讀通道或從只寫通道讀取值都是錯誤的。如果我們嘗試編譯下面的例子,Go的編譯器會報錯:
```
writeStream := make(chan<- interface{})
readStream := make(<-chan interface{})
<-writeStream
readStream <- struct{}{}
```
這會輸出:
```
invalid operation: <-writeStream (receive from send-only type chan<- interface {})
invalid operation: readStream <- struct {} literal (send to receive-only type <-chan interface {})
```
這是Go類型系統的一部分,即使在處理并發原語時也為我們保證類型安全。稍后我們會看到,這對構建易于推理的可組合邏輯程序提供了強大的保證。
回想一下,在之前我們強調過,僅簡單的定義一個goroutine并不能保證它在main goroutine退出之前運行。為此我們介紹了對sync包的各種使用案例。那么在使用通道的情況下該如何呢?看下面這個例子:
該示例之所以產生這樣的結果,是因為在Go中,通道是包含有阻塞機制的。這意味著試圖寫入已滿的通道的任何goroutine都會等待直到通道被清空,并且任何嘗試從空閑通道讀取的goroutine都將等待,直到至少有一個元素被放置 。在這個例子中,我們的fmt.Println包含一個對通道stringStream的讀取,并且將阻塞在那里,直到通道上被放置一個值。同樣,匿名goroutine試圖在stringStream上放置一個字符串,然后阻塞住等待被讀取,所以goroutine在寫入成功之前不會退出。因此,main goroutine和匿名的goroutine發生阻塞是毫無疑問的。
```
stringStream := make(chan string)
go func() {
if 0 != 1 { //1
return
}
stringStream <- "Hello channels!"
}()
fmt.Println(<-stringStream)
```
1. 在這里 我們確保通道stringStream永遠不會獲得值。
這會產生錯誤:
```
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
/tmp/babel-23079IVB/go-src-230795Jc.go:15 +0x97
exit status 2
```
main goroutine等待著stringSteam通道被放上一個值,而且由于我們的if條件,導致這不會發生。當匿名goroutine退出時,Go發現并報告死鎖。在本節后面,我將解釋如何構建我們的程序,以防止這種死鎖。與此同時,讓我們回到從通道讀取數據的討論。
< - 運算符的接收形式也可以選擇返回兩個值,如下所示:
```
stringStream := make(chan string)
go func() {
stringStream <- "Hello channels!"
}()
salutation, ok := <-stringStream //1
fmt.Printf("(%v): %v", ok, salutation)
```
1. 我們在這里接收一個字符串salutation和一個布爾值ok。
這會輸出:
```
(true): Hello channels!
```
有意思吧。那么布爾值代表什么呢?這個值是讀取操作的一個標識,用于指示讀取的通道是由過程中其他位置的寫入生成的值,還是由已關閉通道生成的默認值。 等一下; 一個已關閉的通道,那是什么?
在程序中,能夠指示還有沒有更多值將通過通道發送是非常有用的。 這有助于下游流程知道何時移動,退出,或在新的通道上重新開啟通信等。我們可以通過為每種類型提供特殊的標識符來完成此操作,但這會開發人員的工作產生巨大的重復性,如果能夠內置將產生極大的便利,因此關閉通道就像是一個萬能的哨兵,它說:“嘿,上游不會寫更多的數據啦,做你想做的事吧。”要關閉頻道,我們使用close關鍵字,就像這樣:
```
valueStream := make(chan interface{})
close(valueStream)
```
有趣的是,我們也可以從已關閉的通道讀取。 看這個例子:
```
intStream := make(chan int)
close(intStream)
integer, ok := <- intStream // 1
fmt.Printf("(%v): %v", ok, integer)
```
1. 這里我們從已關閉的通道讀取。
這會輸出:
```
(false): 0
```
注意我們在關閉通道前并沒有把任何值放入通道。即便如此我們依然可以執行讀取操作,而且盡管通道處在關閉狀態,我們依然可以無限期地在此通道上執行讀取操作。這是為了支持單個通道的上游寫入器可以被多個下游讀取器讀取(在第四章我們會看到這是一種常見的情況)。第二個返回值——即布爾值ok——表明收到的值是int的零值,而非被放入流中傳遞過來。
這為我們開辟了一些新的模式。首先是通道的range操作。與for語句一起使用的range關鍵字支持將通道作為參數,并且在通道關閉時自動結束循環。這允許對通道上的值進行簡潔的迭代。 我們來看一個例子:
```
intStream := make(chan int)
go func() {
defer close(intStream) // 1
for i := 1; i <= 5; i++ {
intStream <- i
}
}()
for integer := range intStream { // 2
fmt.Printf("%v ", integer)
}
```
1. 在這里我們在通道退出之前保證正常關閉。這是一種很常見的Go慣用法。
2. 這里對intStream進行迭代。
正如你所看到的,所有的值被打印后程序退出:
```
1 2 3 4 5
```
注意循環退出并沒有設置條件,并且range也不返回第二個布爾值。對通道進行關閉的處理被隱藏了起來,以此保證循環的簡潔。
關閉某個通道同樣可以被作為向多個goroutine同時發生消息的方式之一。如果你有多個goroutine在單個通道上等待,你可以簡單的關閉通道,而不是循環解除每一個goroutine的阻塞。由于一個已關閉的通道可以被無限次的讀取,因此其中有多少goroutine在阻塞狀態并不重要,關閉通道(以解除所有阻塞)消耗的資源又少執行的速度又快。以下是一次解除多個goroutine的示例:
```
begin := make(chan interface{})
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
<-begin //1
fmt.Printf("%v has begun\n", i)
}(i)
}
fmt.Println("Unblocking goroutines...")
close(begin) //2
wg.Wait()
```
1. 這里對begin通道進行讀取,由于通道中沒有任何值,會產生阻塞。
2. 這里我們關閉通道,這樣所有goroutine的阻塞會被解除。
你可以看到,在我們關閉begin 通道之前,沒有任何一個goroutine開始運行:
```
Unblocking goroutines...
4 has begun
2 has begun
3 has begun
0 has begun
1 has begun
```
回想一下在“sync包”中我們討論過sync.Cond實現類似功能的例子,你當然可以使用Single或者Brocast來做,不過通道是可組合的,所以這也是我最喜歡的同時解除多個goroutine阻塞的方法。
接下來,我們來討論“緩沖通道”,這種通道實在實例化時候提供可攜帶元素的容量。這意味著,即使沒有對通道進行讀取操作,goroutine仍然可以執行n次寫入,這里的n即緩沖通道的容量。下面是一個實例化的例子:
```
var dataStream chan interface{}
dataStream = make(chan interface{}, 4)
```
1. 這里我們創建一個容量為4的緩沖通道。 這意味著我們可以將4個元素放在通道上,而不管它是否被讀取(在數量達到上限之前,寫入行為都不會發生阻塞)。
我們再一次把初始化分為了兩行,這樣你可以清楚的發現,一個緩沖通道和一個非緩沖通道
在聲明上上沒有區別的(區別只在實例化部分)。有趣的地方在于,我們可以在實例化的位置對通道是否是緩沖的進行控制。這表明,通道的建立應該與goroutine緊密結合,這樣我們可以極大的提高代碼的可讀性。
無緩沖的通道也可以按緩沖通道定義:無緩沖的通道可以視作一個容量為0的緩沖通道。就像下面這樣:
```
a := make(chan int)
b := make(chan int, 0)
```
這兩個通道都是int“類型”的。請記住我們在討論“阻塞”時所代表的含義,我們說向一個已滿的通道寫入,會出現阻塞,從一個已空的通道讀取,也會出現阻塞。這里的“滿”和“空”是針對容量或緩沖區大小而言的。無緩沖的通道所擁有的容量為0,所以任何寫入行為之后它都會是滿的。一個容量為4的緩沖通道在4次寫入后會是滿的,并且會在第5次寫入時出現阻塞,因為它已經沒有其他位置可以放置第5個元素,這時它表現出的行為與無緩沖通道一樣:由此可見,緩沖通道和無緩沖通道的區別在于,通道為空和滿的前提條件是不同的。通過這種方式,緩沖通道可以在內存中構建用于并發進程通信的FIFO隊列。
為了幫助理解這一點,我們來舉例說明緩沖通道容量為4的示例中發生了什么。 首先,讓我們初始化它:
```
c := make(chan rune, 4)
```
從邏輯上講,這會創建一個帶有四個空位的緩沖區的通道:

現在,讓我們向通道寫入:
```
c <- 'A'
```
當這個通道沒有被讀取時,A字符將被放置在通道緩沖區的第一個空位中,像這樣:

隨后每次寫入緩沖通道(同樣假設沒有被讀取),將填充緩沖通道中的剩余空位,像這樣:
```
c <- 'B'
```

```
c <- 'C'
```

```
c <- 'D'
```

經過四次寫入,我們的緩沖通道已經裝滿了4個元素。如果我們再向通道中進行寫入的話:
```
c <- 'E'
```

當前的goroutine會表現為阻塞!并且goroutine將一直保持阻塞狀態,直到由其他的goroutine執行讀取操作在緩沖區中空出了位置。 讓我們看看是什么樣子的:
```
<- c
```

正如你所看到的那樣,讀取時會接收到放在通道上的第一個字符A,被阻塞的寫入阻塞解除,E被放置在緩沖區的末尾。
如果,如果緩沖通道為空且有接收器讀取,則緩沖器將被繞過,并且該值將直接從發送器傳遞到接收器(存疑)。實際上,這是透明地發生的,但值得了解。
緩沖通道在某些情況下很有用,但你應該小心使用。正如我們將在下一章中看到的那樣,緩沖通道很容易成為不成熟的優化,并且通過使用它們死鎖會變得更為隱蔽。我猜你寧愿在第一次編寫代碼時找到一個死鎖,而不是在半夜系統停機的時候。
讓我們來看看另一個更完整的代碼示例,以便更好地了解緩沖通道的工作方式:
```
var stdoutBuff bytes.Buffer //1
defer stdoutBuff.WriteTo(os.Stdout) //2
intStream := make(chan int, 4) //3
go func() {
defer close(intStream)
defer fmt.Fprintln(&stdoutBuff, "Producer Done.")
for i := 0; i < 5; i++ {
fmt.Fprintf(&stdoutBuff, "Sending: %d\n", i)
intStream <- i
}
}()
for integer := range intStream {
fmt.Fprintf(&stdoutBuff, "Received %v.\n", integer)
}
```
1. 這里我們創建一個內存緩沖區來幫助緩解輸出的不確定性。 它不會給帶來我們任何保證,但比直接寫stdout要快一些。
2. 在這里,我們確保在進程退出之前將緩沖區內容寫入標準輸出。
3. 這里我們創建一個容量為4的緩沖通道。
在這個例子中,寫入stdout的順序是不確定的,但你仍然可以大致了解匿名goroutine是如何工作的。 如果你檢查輸出結果,可以看到我們的匿名goroutine能夠將所有五個結果放在intStream中,并在主要goroutine將一個結果關閉之前退出。
```
Sending: 0
Sending: 1
Sending: 2
Sending: 3
Sending: 4
Producer Done.
Received 0
Received 1
Received 2
Received 3
Received 4
```
這是一個在正確條件下可以使用的優化示例:如果寫入通道的goroutine明確知道將會寫入多少信息,則創建相對應的緩沖通道容量會很有用,就可以盡可能快地進行讀取。 當然,這樣做是有限制的,我們將在下一章中介紹。
我們已經討論了無緩沖的頻道,緩沖頻道,雙向頻道和單向頻道。目前還沒有討論到的還有通道的默認值:nil。程序是如何處理處理nil通道的呢?首先,讓我們試著從一個nil通道中讀取:
```
var dataStream chan interface{}
<-dataStream
```
這會輸出:
```
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive (nil chan)]:
main.main()
F:/code/gospcace/src/myConcurrency/l1introduction/l01/main.go:6 +0x30
```
死鎖出現了。這說明從一個nil通道進行讀取會阻塞程序(*注意,這段代碼的前提是在main函數中執行,所以會導致死鎖。如果是運行在單個gouroutine中,那么就不會是死鎖而是阻塞*)。讓我們再試試寫入:
```
var dataStream chan interface{}
dataStream <- struct{}{}
```
這會輸出:
```
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send (nil chan)]:
main.main()
F:/code/gospcace/src/myConcurrency/l1introduction/l01/main.go:6 +0x53
```
看來對一個nil通道進行寫入操作同樣會阻塞。
我們再試試關閉操作:
```
var dataStream chan interface{}
close(dataStream)
```
這會輸出:
```
panic: close of nil channel
goroutine 1 [running]:
main.main()
F:/code/gospcace/src/myConcurrency/l1introduction/l01/main.go:6 +0x31
```
程序掛掉了,這也許是最符合你預期的結果。無論如何,請務必確保你的通道在工作前已經完成了初始化。
我們已經了解了很多與通道互動的規則。現在你已經了解了在通道上執行操作的方式和為什么這樣做的原因。 下表列舉了通道上的操作對應狀態的通道會發生什么。
***注意:表中的用詞都很簡短,為了減少不必要的歧義或混亂,并未對該表進行不必要的翻譯,此外,正如上面例子所展現的,該表的操作結果默認都是在main函數下操作。請以批判的眼光審視下表。***
:-: 
如果我們查看該表,可以察覺到在操作中可能產生問題的地方。這里有三個可能導致阻塞的操作,以及三個可能導致程序恐慌的操作。乍看之下,通道的使用上限制很多,但在檢查了這個限制產生的動機并熟悉了通道的使用后,它變得不那么可怕并開始具有很大意義。讓我們討論如何組織不同類型的通道來構筑穩健的程序。
我們應該做的第一件事是將通道置于正確的環境中,即分配通道所有權。我將所有權定義為goroutine的實例化,寫入和關閉。就像在那些沒有垃圾回收的語言中使用內存一樣,重要的是要明確哪個goroutine擁有該通道,以便從邏輯上推理我們的程序。單向通道聲明是一種工具,它可以讓我們區分哪些gouroutine擁有通道,哪些goroutine僅使用通道:通道所有者對通道具有寫入訪問權(chan或chan<- ),而通道使用者僅具有讀取權(<-chan)。一旦我們對通道權責區分,上表的結果自然就會出現。我們可以開始對擁有通道和不擁有通道的goroutine賦予不同的責任并給予對應的檢查(以增強程序和邏輯的健壯性)。
讓我們從通道的所有者說起。當一個goroutine擁有一個通道時應該:
1. 初始化該通道。
2. 執行寫入操作,或將所有權交給另一個goroutine。
3. 關閉該通道。
4. 將此前列出的三件事情封裝在一個列表中,并通過訂閱通道將其公開。
通過將這些責任分配給通道所有者,會發生一些事情:
* 因為我們是初始化頻道的人,所以我們要了解寫入nil通道會帶來死鎖的風險。
* 因為我們是初始化頻道的人,所以我們要了解關閉ni通道會帶來恐慌的風險。
* 因為我們是決定頻道何時關閉的人,所以我們要了解寫入已關閉的通道會帶來恐慌的風險。
* 因為我們是決定何時關閉頻道的人,所以我們要了解多次關閉通道會帶來恐慌的風險。
* 我們在編譯時使用類型檢查器來防止對通道進行不正確的寫入。
現在我們來看看讀取時可能發生的那些阻塞操作。 作為一個通道的消費者,我只需要擔心兩件事情:
* 通道什么時候會被關閉。
* 處理基于任何原因出現的阻塞。
解決第一個問題,通過檢查讀取操作的第二個返回值就可以。第二點很難,因為它取決于你的算法(和業務邏輯):你可能想要超時,當獲得通知時你可能想停止讀取操作,或者你可能只是滿足于在整個生命周期中產生阻塞。 重要的是,作為一個消費者,你應該明確這樣一個事實,即讀取操作可以并且必將產生阻塞。我們將在下一章中探討如何實現select語句解決這個棘手的問題。
現在,讓我們用一個例子來總結以上的思考結果。我們建立一個goroutine,它擁有一個通道,一個消費者,它會處理阻塞問題:
```
chanOwner := func() <-chan int {
resultStream := make(chan int, 5)//1
go func() {//2
defer close(resultStream)//3
for i := 0; i <= 5; i++ {
resultStream <- i
}
}()
return resultStream//4
}
resultStream := chanOwner()
for result := range resultStream {//5
fmt.Printf("Received: %d\n", result)
}
fmt.Println("Done receiving!")
```
1. 這里我們實例化一個緩沖通道。 由于我們知道我們將產生六個結果,因此我們創建了五個緩沖通道,以便該goroutine可以盡快完成操作。
2. 在這里,我們啟動一個匿名的goroutine,它在resultStream上執行寫操作。 請注意,我們是如果創建goroutines的, 它現在被封裝在函數中。
3. 這里我們確保resultStream在操作完成后關閉。作為通道所有者,這是我們的責任。
4. 我們在這里返回通道。由于返回值被聲明為只讀通道,resultStream將隱式轉換為只讀的。
5. 這里我們消費了resultStream。 作為消費者,我們只關心阻塞和通道的關閉。
這會輸出:
```
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Done receiving!
```
注意resultStream通道的生命周期如何封裝在chanOwner函數中。很明顯,寫入不會發生在nil或已關閉的頻道上,并且關閉總是會發生一次。這消除了我們之前提到的部分風險。我強烈建議你在自己的程序中盡可能做到保持通道覆蓋范圍最小,以便這些事情保持明顯。如果你將一個通道作為一個結構體的成員變量,并且有很多方法,它很快就會把你自己給繞進去(***雖然很多庫和書中都這么干,但只有這本書的作者將這一點給明確提出來了***)。
消費者功能只能讀取通道,因此只需知道應該如何處理阻塞讀取和通道關閉。 在這個小例子中,我們采取了這樣的方式:在通道關閉之前阻塞程序是完全沒問題的。
如果你設計自己的代碼時來遵循這個原則,那么對你的系統進行推理就會容易得多,而且它很可能會像你期望的那樣執行。我不能保證你永遠不會引入阻塞或恐慌,但是當你這樣遇到這樣的情況時,我認為你會發現你的通道所有權范圍要么太大,要么所有權不清晰。
通道是首先吸引我使用Go的原因之一。 結合goroutines和閉包的簡約性,編寫干凈、正確的并發代碼是比較容易的。在很多方面,通道是將goroutine綁在一起的膠水。 本節為你概述了什么是通道以及如何使用它們。當我們開始編寫通道以形成更高階的并發設計模式時,真正的樂趣就開始了。我們將在下一章中體會到這一點。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
感謝"王大錘"提出的,關于通道狀態圖片匹配問題的意見,該圖片已調整。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度