我們在之前的章節列舉了管道的各種優點,但有時候,盡管管道沒有準備好,我們的程序依然還是要干活的,這種處理方式,被稱為“隊列”。
這意味著,一旦管道的某個階段完成了工作,將其存儲在內存中的臨時位置,以便其他階段可以稍后檢索它,而你無需保持其引用。在“Channels”章節,我們討論了帶緩沖的通道,你可以把它視作隊列的一種。
雖然在系統中引入隊列功能非常有用,但它通常是優化程序時希望采用的最后一種技術之一。過早地添加隊列會隱藏同步問題,例如死鎖和活鎖,并且,隨著程序不斷重構,你可能會發現需要更多或更少的隊列。
那么使用隊列有什么好處呢?隊列通常用來嘗試解決性能問題。隊列幾乎不會減少程序的總運行時間,它只會讓程序的行為有所不同。
讓我們看個簡單的管道例子:
```
done := make(chan interface{})
defer close(done)
zeros := take(done, 3, repeat(done, 0))
short := sleep(done, 1*time.Second, zeros)
long := sleep(done, 4*time.Second, short)
pipeline := long
```
這個管道鏈共有4個階段:
1. 間隔0s,不間斷生成數據流。
2. 在接收到3條數據后取消前置操作。
3. 休眠1秒,短耗時階段。
4. 休眠3秒,長耗時階段。
我們假設階段1和階段2是即時的,那么需要關注的是休眠如何影響管道的運行時間。

你可以看到,這個管道耗時13秒。短耗時階段花費了大約9秒。
如果我們給管道加入緩存會怎么樣?讓我們試試在長短耗時階段之間添加個緩沖:
```
done := make(chan interface{})
defer close(done)
zeros := take(done, 3, repeat(done, 0))
short := sleep(done, 1*time.Second, zeros)
buffer := buffer(done, 2, short) // Buffers sends from short by 2
long := sleep(done, 4*time.Second, short)
pipeline := long
```


整個管道依然是13秒,但短耗時階段時長降低到了3秒,看來加入緩存是有效的。但是如果整個管道仍然需要13秒來執行,這對我們有什么幫助?
我們來看看下面這個操作:
```
p := processRequest(done, acceptConnection(done, httpHandler))
```
這條管道會持續運行直到被取消,并且在取消之前會持續接受連接。在這期間,你肯定不希望處理連接的processRequest因acceptConnection接受連接而阻塞,你會希望processRequest是持續可用的,否則程序的用戶可能會發現連接請求被拒絕。
因此,隊列的價值并不是減少了某個階段的運行時間,而是減少了它處于阻塞狀態的時間。 這可以讓程序繼續工作。 在這個例子中,用戶可能會在他們的請求中感受到延遲,但不會被拒絕服務。
通過這種方式,隊列的真正用途是將操作流程分離,以便一個階段的運行時間不會影響另一個階段的運行時間。以這種方式解耦來改變整個系統的運行時行為,這取決于你的程序,產生的結果可能是好的也可能是不好的。
接下來我們回到關于隊列的討論。 隊列應該放在哪里? 緩沖區大小應該是多少? 這些問題的答案取決于管道的性質。
我們首先分析隊列提高系統整體性能適用于哪些情況:
* 如果某個階段執行批處理能夠節省時間。
* 如果推遲某個階段產生結果可以在程序中循環執行。
適用于第一種情況的一個例子是,將輸入緩沖到內存而非硬盤中。實際上bufio包就是這么干的。下面這個例子比較了使用緩沖與非緩沖進行寫操作:
```
func BenchmarkUnbufferedWrite(b *testing.B) {
performWrite(b, tmpFileOrFatal())
}
func BenchmarkBufferedWrite(b *testing.B) {
bufferredFile := bufio.NewWriter(tmpFileOrFatal())
performWrite(b, bufio.NewWriter(bufferredFile))
}
func tmpFileOrFatal() *os.File {
file, err := ioutil.TempFile("", "tmp")
if err != nil {
log.Fatal("error: %v", err)
}
return file
}
func performWrite(b *testing.B, writer io.Writer) {
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for bt := range take(done, repeat(done, byte(0)), b.N) {
writer.Write([]byte{bt.(byte)})
}
}
```
執行命令行:
```
go test -bench=. src/concurrency-patterns-in-go/queuing/buffering_test.go
```
這會輸出:

如預期的那樣,有緩沖的寫入比無緩沖更快。這是因為在bufio.Writer中,寫入操作在內部緩沖區中進入隊列,直到已經積累了足夠長的數據塊,該塊才被寫出。這個過程通常稱為分塊。
分塊速度更快,因為bytes.Buffer必須增加其分配的內存以容納存儲的字節數據。出于各種原因,內存擴張操作代價高昂; 因此,我們需要增長的時間越少,整個系統的整體效率就越高。 于是,隊列提高了整個系統的性能。
這只是一個簡單的內存分塊示例,但是你可能會頻繁地進行分塊。通常,執行任何操作都存在開銷,分塊可能會提高系統性能。例如打開數據庫事務,計算消息校驗和以及分配內存連續空間。
除了分塊之外,如果程序算法支持向后查找或排序優化,隊列也可以起到幫助作用。
第二種情況,某個階段的延遲執行導致更多的數據進入管道,但沒有被發現,這更加致命,因為它可能導致上游系統的崩潰。
這個想法通常被稱為負反饋循環,甚至是死亡螺旋。 這是因為管道與上游系統之間存在經常性關系;上游系統提交新請求的速度在某種程度上與管道的有效性有關。
如果管道的效率降低到某個臨界閾值以下,則管道上游的系統開始增加其對管道的輸入,這導致管道損失更多效率,并且死亡螺旋開始。 如果沒有安全防護,該系統將無法恢復。
通過在管道入口處引入隊列,你可以延遲請求來打破反饋循環。從調用者的角度來看,請求似乎正在處理中,但需要很長時間。只要調用者不超時,管道將保持穩定。如果調用方超時,則需要確保你在出列時支持安全檢查。如果不這樣做,可能會無意中通過處理無效請求創建了另一個負饋循環,從而降低管道的效率。
>如果你曾嘗試過一些熱門的新系統(例如,新游戲服務器,用于產品發布的網站等),并且盡管開發人員盡了最大的努力,但該網站一直處于不穩定狀態,恭喜!你可能目睹了一個負反饋循環。
>開發團隊總是在嘗試不同的解決方案,直到有人意識到他們需要一個隊列,并且匆忙地將其實現。
>然后客戶開始抱怨排隊時間。
從以上的例子中,我們可以看到一種模式慢慢浮出水面,隊列應該滿足以下情況:
* 在管道的入口處。
* 在某個階段進行批處理會更高效。
您可能會試圖在其他地方添加隊列,例如,某個階段會執行密集計算。要避免這種誘惑!正如我們所知道的那樣,只有少數情況下,對立會減少管道的運行時間。為排除干擾而嘗試隊列會產生災難性后果。
為了理解為什么,我們必須討論管道的吞吐量。別擔心,這并不困難,這也將幫助我們回答關于如何確定隊列應該多大的問題。
在隊列理論中,有一條定律(進行足夠的抽樣)可以預測管道的吞吐量。這就是所謂的"最小原則",你只需要知道幾點就可以理解和利用它。
我們以代數的方式定義“最小原則”,它通常表示為:L = λW,其中
* L = 系統中的平均單位數。
* λ = 單位的平均到達率。
* W = 單位在系統中花費的平均時間。
這個等式僅適用于所謂的穩定系統。 在一條管道中,一個穩定的系統就是數據進入管道或入口的速率等于它退出系統或出口的速率。 如果進入速率超過出口速度,那么你的系統就不穩定,并且已經進入死亡螺旋。 如果入口速率小于出口速率,則系統仍然不穩定,因為你的資源沒有被完全利用。這不是世界上最糟糕的情況,但是如果發現資源利用嚴重不足(例如,集群或數據中心),也許你會關心這一點。
假設我們的管道是穩定的。如果我們想要減少單位花費在系統中的平均時間n,只有一個選擇:減少系統中平均單位數:L/n = λW / n。 如果提高出口率,我們只能減少系統中的平均單位數量。還要注意,如果我們將隊列添加到階段,我們增加L,這會增加單位的到達率(nL = nλ* W)或增加單位在系統中的平均時間(nL = λ* nW)。通過最小原則,我們可以證明,隊列對于減少系統花費的時間幫助不大。
同時請注意,由于我們正在觀察整個管道,因此將W減少n倍將分布在我們管道的所有階段。在我們的案例中,最小原則應該是這樣定義的:
```
L = λΣiWi
```
不分青紅皂白的優化,可能導致你的管道完全被最慢的執行階段影響。
這個原則可以幫助我們分析管道的各個階段。假設我們的管道有三個階段。
讓我們嘗試確定管道每秒可以處理多少個請求。假設我們在管道上啟用了采樣,發現1個請求(r)需要約1秒才能通過管道。讓我們向公式放入這些數字:
```
3r = λr/s * 1s
3r/s = λr/s
λr/s = 3r/s
```
我們將L設置為3,因為我們管道中的每個階段都在處理請求。然后我們將W設置為1秒,做一個小代數計算,瞧!在這個管道中,我們每秒可以處理三個請求。
假設采樣表明請求需要1 ms來處理。 我們的隊列需要處理每秒100000次請求的大小是多少?
```
Lr-3r = 100,000r/s * 0.0001s
Lr-3r = 10r
Lr = 7r
```
我們的管道有三個階段,所以我們將L遞減3。將λ設置為100000 r/s,我們發現如果想要處理很多請求,我們的隊列應該有7個容量。請記住, 如果增加隊列的大小,它需要更長的時間才能完成! 你實際上在延遲降低系統利用率。
但這個公式也存在缺陷,它無法觀察對失敗的處理。請記住,如果由于某種原因管道發生混亂,你將丟失隊列中的所有請求。為了緩解這種情況,你可以將隊列大小保持為零,也可以將其移至持久隊列中,該隊列是一個持續存在的隊列,可以在需要時再讀取。
隊列在你的系統中可能很有用,但由于它的復雜性,它通常是我建議實現的最后優化手段之一。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度