在諸如守護進程這樣的長期進程中,擁有一組長生命周期的goroutines非常普遍。這些goroutines通常被阻塞,等待被某種方式喚醒以繼續工作。有時候,這些例程依賴于你沒有很好控制的資源。也許一個goroutine會接收到Web服務中希望獲取數據的請求,或者它正在監視一個臨時文件。 如果程序處理不夠健壯,goroutine會很容易陷入一個糟糕的狀態。在長期運行的過程中,如果能創建一種機制來確保goroutine的健康狀況良好,并在健康狀況不佳時重新啟動,那么我們的項目想必能活得久一點。 我們將在本節討論對goroutines異常行為進行修復的話題。
我們將使用心跳來檢查正在監測的goroutine的活躍程度。心跳的類型將取決于你想要監控的內容,但是如果你的goroutine可能會產生活鎖,請確保心跳包含某種信息,以表明該goroutine不僅沒死掉,而且還可以正常執行任務。在本節中,為了簡單起見,我們只會考慮goroutines是活的還是死的。
下面這段代碼建立一個管理者監視一個goroutine的健康狀況,以及它的子例程。如果例程變得不健康,管理者將重新啟動子例程。為此,它需要引用一個可以啟動goroutine的函數。讓我們看看管理程序是什么樣子的:
```
type startGoroutineFn func(done <-chan interface{},
pulseInterval time.Duration) (heartbeat <-chan interface{}) //1
newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { //2
return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {
heartbeat := make(chan interface{})
go func() {
defer close(heartbeat)
var wardDone chan interface{}
var wardHeartbeat <-chan interface{}
startWard := func() { //3
wardDone = make(chan interface{}) //4
wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) //5
}
startWard()
pulse := time.Tick(pulseInterval)
monitorLoop:
for { //6
timeoutSignal := time.After(timeout)
for {
select {
case <-pulse:
select {
case heartbeat <- struct{}{}:
default:
}
case <-wardHeartbeat: //7
continue monitorLoop
case <-timeoutSignal: //8
log.Println("steward: ward unhealthy; restarting")
close(wardDone)
startWard()
continue monitorLoop
case <-done:
return
}
}
}
}()
return heartbeat
}
}
```
1. 這里我們定義一個可以監控和重新啟動的goroutine的函數簽名。 我們看到熟悉的done通道,以及熟悉的心跳模式寫法。
2. 在這里我們設置了超時時間,并使用函數startGoroutine來啟動它正在監控的goroutine。有趣的是,監控器本身返回一個startGoroutineFn,表示監控器自身也是可監控的。
3. 在這里我們定義一個閉包,它以同樣的的方式來啟動我們正在監視的goroutine。
4. 這是我們創建一個新通道,我們會將其傳遞給監控通道,以響應發出的停止信號。
5. 在這里,我們開啟對目標goroutine的監控。如果監控器停止工作,或者監控器想要停止被監控區域,我們希望監控者也停止,因此我們將兩個done通道都包含在邏輯中。我們傳入的心跳間隔是超時時間的一半,但正如我們在“心跳”中討論的那樣,這可以調整。
6. 這是我們的內部循環,它確保監控者可以發出自己的心跳。
7. 在這里我們如果接收到監控者的心跳,就會知道它還處于正常工作狀態,程序會繼續監測循環。
8. 這里如果我們發現監控者超時,我們要求監控者停下來,并開始一個新的goroutine。然后開始新的監測。
我們的for循環有點雜亂,但如果你閱讀過前面的章節,熟悉其中的模式,那么理解起來會相對簡單。 接下來讓我們試試看如果監控一個行為異常的goroutine,會發生什么:
```
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} {
log.Println("ward: Hello, I'm irresponsible!")
go func() {
<-done // 1
log.Println("ward: I am halting.")
}()
return nil
}
doWorkWithSteward := newSteward(4*time.Second, doWork) // 2
done := make(chan interface{})
time.AfterFunc(9*time.Second, func() { // 3
log.Println("main: halting steward and ward.")
close(done)
})
for range doWorkWithSteward(done, 4*time.Second) { // 4
}
log.Println("Done")
```
1. 可以看到這個goroutine什么都沒干,持續阻塞等待被取消,它同樣不會發出任何表明自己正常信號。
2. 這里開始建立被監控的例程,其4秒后會超時。
3. 這里我們9秒后向done通道發出信號停止整個程序。
4. 最后,我們啟動監控器并在其心跳范圍內防止示例停止。
這會輸出:
```
18:28:07 ward: Hello, I'm irresponsible!
18:28:11 steward: ward unhealthy; restarting 18:28:11 ward: Hello, I'm irresponsible!
18:28:11 ward: I am halting.
18:28:15 steward: ward unhealthy; restarting 18:28:15 ward: Hello, I'm irresponsible!
18:28:15 ward: I am halting.
18:28:16 main: halting steward and ward.
18:28:16 ward: I am halting.
18:28:16 Done
```
看起來工作正常。我們的監控器比較簡單,除了取消操作和心跳所需信息之外不接收也不返回任何參數。我們可以用閉包強化一下:
```
doWorkFn := func(done <-chan interface{}, intList ...int) (startGoroutineFn, <-chan interface{}) {//1
intChanStream := make(chan (<-chan interface{}))//2
intStream := bridge(done, intChanStream)
doWork := func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {//3
intStream := make(chan interface{})//4
heartbeat := make(chan interface{})
go func() {
defer close(intStream)
select {
case intChanStream <- intStream://5
case <-done:
return
}
pulse := time.Tick(pulseInterval)
for {
valueLoop:
for _, intVal := range intList {
if intVal < 0 {
log.Printf("negative value: %v\n", intVal)//6
return
}
for {
select {
case <-pulse:
select {
case heartbeat <- struct{}{}: default:
}
case intStream <- intVal:
continue valueLoop
case <-done:
return
}
}
}
}
}()
return heartbeat
}
return doWork, intStream
}
```
1. 我們將監控器關閉的內容放入返回值,并返回所有監控器用來交流數據的通道。
2. 我們建立通道的通道,這是我們在前面章節中"bridge"模式的應用。
3. 這里我們建立閉包控制監控器的啟動和關閉。
4. 這是各通道與監控器交互數據的實例。
5. 這里我們向起數據交互作用的通道傳入數據。
6. 這里我們返回負數并從goroutine返回以模擬不正常的工作狀態。
由于我們可能會啟動監控器的多個副本,因此我們使用"bridge"模式來幫助向doWorkFn的調用者呈現單個不間斷的通道。通過這樣的方式,我們的監控器可以簡單地通過組成模式而變得任意復雜。讓我們看看如何調用:
```
log.SetFlags(log.Ltime | log.LUTC)
log.SetOutput(os.Stdout)
done := make(chan interface{})
defer close(done)
doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5) //1
doWorkWithSteward := newSteward(1*time.Millisecond, doWork) //2
doWorkWithSteward(done, 1*time.Hour) //3
for intVal := range take(done, intStream, 6) { //4
fmt.Printf("Received: %v\n", intVal)
}
```
1. 這里我們調用該函數,它會將傳入的不定長整數參數轉換為可通信的流。
2. 在這里,我們創建了一個檢查doWork關閉的監視器。我們預計這里會極快的進入失敗流程,所以將監控時間設置為一毫秒。
3. 我們通知 steward 開啟監測。
4. 最后,我們使用該管道,并從intStream中取出前六個值。
這會輸出:
```
Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting Received: 1
23:25:33 negative value: -1
Received: 2
```
我們可以看到監控器發現錯誤并重啟。你可能還會注意到我們只接收到了1和2,這證明了重啟功能正常。如果你的系統對重復值很敏感,一定要考慮對其進行處理。你也可以考慮在一定次數的失敗后退出。比如在這樣的位置:
```
valueLoop:
for _, intVal := range intList {
// ...
}
```
稍作修改:
```
valueLoop:
for {
intVal := intList[0]
intList = intList[1:]
// ...
}
```
盡管我們依然停留在返回的無效負數上,盡管我們的監控器將繼續失敗,但這會記錄在重新啟動前的位置,你可以在這個思路上擴展。
使用這樣的方式可以確保你的系統保持健康,此外,相信系統崩潰的減少也能大幅度降低開發過程中猝死的幾率。
愿諸君健康工作,準點下班。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度