當你編寫一個程序時,你可能不會坐下來寫一個長函數——至少我希望你不要! 你會以函數,結構體,方法等形式構造抽象。為什么要這樣做? 部分是為了抽象出無關的細節,另一部分是為了能夠在不影響其他區域的情況下處理某個代碼區域。你沒有必要改變整個系統,當發現自己必須調整多個區域才能做出一個合乎邏輯的改變時,說明該系統的抽象實在是很糟糕。
pipline,又名管道,或者叫流水線,是你可以用來在系統中形成抽象的另一種工具。特別是當你的程序需要處理流或批處理數據時,它是一個非常強大的工具。管道這個詞被認為是在1856年首次使用的,指將液體從一個地方輸送到另一個地方的一系列管道。計算機科學借用了這個術語,因為我們也在從一個地方向另一個地方傳輸某些東西:數據。管道是個系統,它將一系列數據輸入,執行操作并將數據傳回。我們稱這些操作都是管道的一個階段。
通過使用管道,你可以分離每個階段的關注點,這提供了許多好處。你可以獨立于彼此修改模塊,你可以混合搭配模塊的組合方式,而無需修改模塊,你可以讓每個模塊同時處理到上游或下游模塊,并且可以扇出或限制你的部分管道。 我們將在"扇出,扇入”一節中介紹扇出,我們將在第5章介紹速率限制。你現在不必擔心這些奇怪的術語; 讓我們從最簡單的開始,嘗試構建一個管道。
如前所述,一個階段只是類似于執行將數據輸入,對其進行轉換并將數據發回這樣的功能。 這是一個可以被視為管道某階段的例子:
```
multiply := func(values []int, multiplier int) []int {
multipliedValues := make([]int, len(values))
for i, v := range values {
multipliedValues[i] = v * multiplier
}
return multipliedValues
}
```
這個函數用取整數切片,循環遍歷它們,然后返回一個新的切片。看起來很無聊的功能,對吧? 讓我們創建管道的另一個階段:
```
add := func(values []int, additive int) []int {
addedValues := make([]int, len(values))
for i, v := range values {
addedValues[i] = v + additive
}
return addedValues
}
```
跟上個函數類似,只不過把乘法變成了加法。接下來,讓我們嘗試將它們合并:
```
ints := []int{1, 2, 3, 4}
for _, v := range add(multiply(ints, 2), 1) {
fmt.Println(v)
}
```
這會輸出:
```
3
5
7
9
```
看看我們是如何將他們結合起來的。這些函數就像你每天工作使用的函數一樣,但是我們可以將它們組合起來形成一個流水線。 那么我們怎么定義管道的“階段”呢?
* 一個階段消耗并返回相同的類型。
* 一個階段必須通過語言來體現,以便它可以被傳遞。 Go的函數已經實現并很好地適用于這個目的。
那些熟悉函數式編程的人可能會點頭并思考像高階函數和monad這樣的術語。 事實上,管道的各階段與函數式編程密切相關,可以被認為是monad的一個子集。 我不會在這里明確地討論Monad或函數式編程,但它們本身就是有趣的主題,在嘗試理解管道時,對這兩個主題的工作知識雖然沒有必要,但是對于加強理解是有用的。
在這里,我們的add和multiply滿足管道的階段屬性:它們都消耗int切片并返回int切片,并且因為Go支持函數傳遞,所以我們可以傳遞add和multiply。 這些屬性很有趣:在不改變階段本身的情況下,我們可以很容易地將我們的階段結合到更高層次。
例如,如果我們現在想為管道添加一個額外的階段:乘以2,只需將我們以前的管道包裝在一個新的階段內,就像這樣:
```
ints := []int{1, 2, 3, 4}
for _, v := range multiply(add(multiply(ints, 2), 1), 2) {
fmt.Println(v)
}
```
這會輸出:
```
6
10
14
18
```
注意我們是如何做到這一點的。也許你已經開始看到使用管道模式的好處了。 當然,我們也可以在程序上編寫此代碼:
```
ints := []int{1, 2, 3, 4}
for _, v := range ints {
fmt.Println(2 * (v*2 + 1))
}
```
雖然這看起來簡單得多,但正如我們接下來會看到的,程序在處理數據流時不會提供與管道相同的好處。
請注意每個階段如何獲取數據切片并返回數據的。這些階段的行為我們稱為批處理。這意味著它們一次性對大塊數據進行操作,而不是一次一個單獨進行。還有另一種類型的管道,模塊每次僅接收和返回單個元素。
批處理和流處理各有優點和缺點,我們稍微討論一下。現在,請注意,為使原始數據保持不變,每個階段都必須創建一個等長的新切片來存儲其計算結果。這意味著我們的程序在任何時候的內存占用量都是我們發送到管道開始處的切片大小的兩倍。 讓我們轉換為面向流操作,看看會有什么效果:
```
multiply := func(value, multiplier int) int {
return value * multiplier
}
add := func(value, additive int) int {
return value + additive
}
ints := []int{1, 2, 3, 4}
for _, v := range ints {
fmt.Println(multiply(add(multiply(v, 2), 1), 2))
}
```
這會輸出:
```
6
10
14
18
```
每個階段都接收并返回一個值,程序的內存占用空間將回落到管道輸入數據的大小。但是我們不得不將管道放入到for循環的體內,這讓我們的操作變“重”了。這不僅限制了我們對管道的重復使用,而且我們稍后會在本節中看到,這也限制了我們的擴展能力。 實際上,我們因為循環而在每次迭代中實例化我們的管道。盡管進行函數調用耗費的資源很少,但函數的調用次數確實增加了。那么如果涉及到并發性又如何?我之前說過,使用管道的好處之一是能夠同時處理數據的各個階段,并且我提到了一些關于扇出的內容。這些我們在接下來會進一步了解到。
我會擴展add和multiply來介紹這些概念。現在開始學習在Go中構建管道的最佳實踐的時候了,先從并發原語通道開始。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度