通道非常適合在Go中構建管道,因為它們滿足了我們所有的基本要求。它們可以接收并傳遞值,它們可以在并發中安全的使用,它們可以被遍歷,而且它們被語言給予了完美的支持。讓我們用通道將之前的例子改造一下:
```
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i * multiplier:
}
}
}()
return multipliedStream
}
add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i + additive:
}
}
}()
return addedStream
}
done := make(chan interface{})
defer close(done)
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
for v := range pipeline {
fmt.Println(v)
}
```
這會輸出:
```
6
10
14
18
```
看起來我們得到了期望的輸出結果,但代價是代碼更多了。先,我們來看看我們寫的是什么。 現在有三個函數,而不是兩個。他們都看起來像是在內部開啟一個通道,并使用我們在“防止Goroutine泄漏”中建立的模式,通過一個done通道表示該通道應該退出。他們都看起來像返回通道,其中一些看起來像他們也采用了額外的通道。讓我們開始進一步分解:
```
done := make(chan interface{})
defer close(done)
```
我們的程序首先創建了done通道,并調用close通過defer延遲執行。正如前面所討論的那樣,這可以確保我們的程序干凈地退出,而不泄漏goroutines。沒有什么新鮮的。接下來,我們來看看函數generator:
```
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
// ...
intStream := generator(done, 1, 2, 3, 4)
```
generator接收整數類型的切片,構造整數類型的通道,啟動一個goroutine并返回構造的通道。然后,在創建的goroutine通道上發送切片的值。
請注意,通道上的發送與done通道上的選擇共享一條select語句。這是我們在“防止Goroutine泄漏”中建立的模式。
簡而言之,generator函數將一組離散值轉換為一個通道上的數據流。這種操作的函數我們稱之為生成器。在使用管道時,你會經常看到這一點,因為在管道開始時,你總是會有一些需要轉換為通道的數據。我們將稍微介紹一些有趣的生成器的幾個例子,但我們先來完成對這個程序的分析。 接下來,我們構建管道:
```
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
```
這與本節之前的流程相同:對于一串數字,我們將它們乘以2,加1,然后將結果乘以2。這條管道與我們前面例子中使用函數的管道相似,但它在很重要的方面有所不同。
首先,我們正在使用通道。 這是顯而易見的,因為它允許兩件事:在我們的管道的末尾,可以使用range語句來提取值,并且在每個階段我們可以安全地并發執行,因為我們的輸入和輸出在并發上下文中是安全的。
這給我們帶來了第二個區別:管道的每個階段都在同時執行。 這意味著任何階段只需要等待其輸入,并且能夠發送其輸出。 事實證明,這會產生巨大的影響,我們將在“扇出,扇入”一節中發現,但現在我們可以簡單地注意到它允許各階段相互獨立地執行一段時間。
最后,在我們的例子中,我們對這個管道進行了遍歷取值:
```
for v := range pipeline {
fmt.Println(v)
}
```
下面是一個表格,演示系統中的每個值如何進入每個通道,以及通道何時關閉。
:-: 
讓我們更仔細地研究一下這個模式來標示goroutines退出。當處理多個相互依賴的goroutines時,這種模式如何起作用? 如果我們在程序完成執行之前在完成的通道上調用close,會發生什么情況?
要回答這些問題,再來看看管道是構建的這一行:
```
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
```
管道的各階段通過兩種方式連接在一起:通過默認的done通道,和被傳遞給后續階段的通道。換句話說,multiply函數創建的通道被傳遞給add函數。讓我們重新審視前面的表格,并在完成之前,關閉done通道,看看會發生什么:
:-: 
看到關閉done通道是如何影響到管道的了么?這是通過管道每個階段的兩件事情實現的:
* 對傳入的頻道進行遍歷。當輸入通道關閉時,遍歷操作將退出。
* 發送操作與done通道共享select語句。
無論流水線階段處于等待數據通道的狀態,還是處在等待發送通道關閉的狀態,都會強制管道各階段終止。
這里有一個復發關系。在管道開始時,我們已經確定必須將傳入的切片值轉換為通道。在這個過程中有兩點必須是可搶占的:
* 在生成器通道上創建值。
* 在其頻道上發送離散值。
在我們的例子中,在生成器函數中,離散值是通過遍歷切片生成的,它足夠快,不需要被搶占。 第二個是通過我們的select語句和done通道處理的,它確保發生器即使被阻塞試圖寫入intStream也是可搶占的。
在管道的另一端,同樣我們可以確保最終階段的可搶占性。因為我們正在操作的通道在搶占時會被關閉,所以當這種情況發生時,通道將會中斷。 最后階段是可搶占的,因為我們依賴的流是可搶占的。
在管道開始和結束之間,代碼總是在一個通道上遍歷,并在包含done通道的select語句內的另一個通道上發送。
如果某個階段在傳入通道檢索到值時被阻塞,則該通道關閉時它將變為未阻塞狀態。 如果某個階段在發送值時被阻塞,則由于select語句而可搶占。
因此,我們的整個管道始終可以通過關閉done通道來搶占。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度