那么你已經建立了一條管道。 數據在你的系統中歡暢地流動,并在莫連接在一起的各個階段發生變化。 它就像一條美麗的溪流; 一個美麗的,緩慢的溪流,哦,我的上帝為什么這需要這么久?
有時候,管道中的各個階段可能在計算上特別耗費資源。當發生這種情況時,管道中的上游階段可能會在等待完成時被阻塞。不僅如此,管道本身可能需要很長時間才能整體執行。 我們如何解決這個問題?
管道的一個有趣屬性是它的各個階段相互獨立,方便組合。你可以多次重復使用管道的各個階段。因此,在多個goroutine上重用管道的單個階段實現并行化,將有助于提高管道的性能。
事實上,這種模式被稱為扇入扇出。
扇出(Fan-out)是一個術語,用于描述啟動多個goroutines以處理來自管道的輸入的過程,并且扇入(fan-in)是描述將多個結果組合到一個通道中的過程的術語。
那么在什么情況下適用于這種模式呢?如果出現以下兩種情況,你就可以考慮這么干了:
* 不依賴模塊之前的計算結果。
* 運行需要很長時間。
運行的獨立性是非常重要的,因為你無法保證各階段的并發程序以何種順序運行,也無法保證其返回的順序。
我們來看一個例子。在下面的例子中,構建了一個尋找素數的方法。我們將使用在“管道”中的經驗,創建各個階段,并將它們拼接在一起:
```
rand := func() interface{} { return rand.Intn(50000000) }
done := make(chan interface{})
defer close(done)
start := time.Now()
randIntStream := toInt(done, repeatFn(done, rand))
fmt.Println("Primes:")
for prime := range take(done, primeFinder(done, randIntStream), 10) {
fmt.Printf("\t%d\n", prime)
}
fmt.Printf("Search took: %v", time.Since(start))
```
這會輸出:
```
Primes:
24941317
36122539
6410693
10128161
25511527
2107939
14004383
7190363
45931967
2393161
Search took: 23.437511647s
```
我們生成一串隨機數,最大值為50000000,將數據流轉換為整數流,然后將其傳入primeFinder。pri meFinder會嘗試將輸入流提供的數字除以比它小的每個數字。如果不成功,會將該值傳遞到下一個階段。當然,這個方法很低效,但它符合我們程序運行時間較長的要求。
在我們的for循環中,搜索找到的素數,在進入時將它們打印出來,并且take在找到10個素數后關閉管道。然后,我們打印出搜索需要多長時間,完成的通道被延遲聲明關閉,管道停止 。
為了避免結果中出現重復,我們可以把已找到的素數緩存起來,但為了簡單起見,我們將忽略這些。
你可以看到大概需要23秒才能找到10個素數,這實在是有點慢。通常遇到這種情況,我們首先看一下算法本身,也許是拿一本算法書籍,然后看看我們是否能在哪個階段改進。但是,由于目的是通過扇出來解決該問題,所以算法我們暫時先不去管它。
我們的程序現在有兩個階段:生成隨機數和篩選素數。在更大的程序中,你的管道可能由更多的階段組成,那我們該對什么樣的階段使用扇出模式進行改進?請記住我們之前提出的標準:執行順序的獨立性和執行時間。我們的隨機數生成器肯定是與順序無關的,但運行起來并不需要很長的時間。PrimeFinder階段也是順序無關的,因為我們采用的算法效率非常低下,它需要很長時間才能運行完成。因此,我們可以把關注點放在PrimeFinder身上。
為此,我們可以將其操作拆散,就像這樣:
```
numFinders := runtime.NumCPU()
finders := make([]<-chan int, numFinders)
for i := 0; i < numFinders; i++ {
finders[i] = primeFinder(done, randIntStream)
}
```
在我的電腦上,runtime.NumCPU()返回8,在生產中,我們可能會做一些經驗性的測試來確定CPU的最佳數量,但在這里我們將保持簡單,并且假設只有一個findPrimes階段的CPU會被占用。
這就好像一個班級的作業,原本由1位老師批改,現在變成了8位老師同時批改。
接下來我們遇到的問題是,如何將結果匯總到一起。為此,我們開始考慮使用扇入(fan-in)。
正如我們前面所提到的,扇入意味著將多個數據流復用或合并成一個流。 這樣做相對簡單:
```
fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} { // 1
var wg sync.WaitGroup // 2
multiplexedStream := make(chan interface{})
multiplex := func(c <-chan interface{}) { // 3
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
// 從所有的通道中取數據
wg.Add(len(channels)) // 4
for _, c := range channels {
go multiplex(c)
}
// 等待所有數據匯總完畢
go func() { // 5
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
```
1. 一如既往,我們使用done通道來關閉衍生的goroutine,并接收接口類型的通道切片來匯總數據。
2. 這里我們使用sync.WaitGroup以等待全部通道讀取完成。
3. 我們在這里建立函數multiplex,它會讀取傳入的通道,并把該通道的值放入multiplexedStream。
4. 這里增加等待計數。
5. 這里我們建立一個goroutine等待匯總完畢。***這樣函數塊可以快速return,不必等待wg.Wait()。這種用法不多見,但在這里很符合場景需求。***
簡而言之,扇入涉及讀取多路復用通道,然后為每個傳入通道啟動一個goroutine,以及在傳入通道全部關閉時關閉復用通道。由于我們要創建一個等待N個其他goroutine完成的goroutine,因此創建sync.WaitGroup來協調處理是有意義的。multiplex還通知WaitGroup它已執行完成。
額外提醒,在對返回結果的順序有要求的情況下扇入扇出可能工作的不是很好。我們沒有做任何事情來保證從randIntStream中讀取數據的順序。稍后,我們將看一個維護順序的例子。
讓我們把所有這些改進放在一起,看看運行時長是否有所減少:
```
done := make(chan interface{})
defer close(done)
start := time.Now()
rand := func() interface{} { return rand.Intn(50000000) }
randIntStream := toInt(done, repeatFn(done, rand))
numFinders := runtime.NumCPU()
fmt.Printf("Spinning up %d prime finders.\n", numFinders)
finders := make([]<-chan interface{}, numFinders)
fmt.Println("Primes:")
for i := 0; i < numFinders; i++ {
finders[i] = primeFinder(done, randIntStream)
}
for prime := range take(done, fanIn(done, finders...), 10) {
fmt.Printf("\t%d\n", prime)
}
fmt.Printf("Search took: %v", time.Since(start))
```
這會輸出:
```
Spinning up 8 prime finders. Primes:
6410693
24941317
10128161
36122539
25511527
2107939
14004383
7190363
2393161
45931967
Search took: 5.438491216s
```
最大降幅23秒,這簡直是個壯舉。運用扇入扇出可以在不大幅改變程序結構的前提下將運行時間縮短了大約78%。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度