心跳是并發進程向外界發出信號的一種方式。命名者從人體解剖學中受到啟發,使用心跳一詞表示被觀察者的生命體征。心跳在Go語言出現前就已被廣泛使用。
在并發中使用心跳是有原因的。心跳能夠讓我們更加深入的了解系統,并且在系統存在不確定性的時候對其測試。
我們將在本節中討論兩種不同類型的心跳:
* 以固定時間間隔產生的心跳。
* 在工作單元開始時產生的心跳。
固定時間間隔產生的心跳對于并發來說很有用,它可能在等待處理某個工作單元執行某個任務時發生。由于你不知道這項工作什么時候會進行,所以你的goroutine可能會持續等待。心跳是一種向監聽者發出信號的方式,即一切都很好,當前靜默是正常的。
以下代碼演示了會產生心跳的goroutine:
```
doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) {
heartbeat := make(chan interface{}) //1
results := make(chan time.Time)
go func() {
defer close(heartbeat)
defer close(results)
pulse := time.Tick(pulseInterval) //2
workGen := time.Tick(2 * pulseInterval) //3
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default: //4
}
}
sendResult := func(r time.Time) {
for {
select {
case <-done:
return
case <-pulse: //5
sendPulse()
case results <- r:
return
}
}
}
for {
select {
case <-done:
return
case <-pulse: //5
sendPulse()
case r := <-workGen:
sendResult(r)
}
}
}()
return heartbeat, results
}
```
1. 在這里,我們設置了一個發送心跳信號的通道。doWork會返回該通道。
2. 我們按傳入的pulseInterval值定時發送心跳,每次心跳都意味著可以從該通道上讀取到內容。
3. 這只是用來模擬進入的工作的另一處代碼。我們選擇一個比pulseInterval更長的持續時間,以便我們可以看到來自goroutine的心跳。
4. 請注意,我們包含一個default子句。我們必須考慮如果沒有人接受到心跳的情況。從goroutine發出的結果是至關重要的,但心跳不是。
5. 就像done通道,無論何時執行發送或接收,你都需要考慮心跳發送的情況。
請注意,由于我們可能在等待輸入時發送多個pulse,或者在等待發送結果時發送多個pulse,所有select語句都需要在for循環內。 目前看起來不錯; 我們如何利用這個函數并消費它發出的事件? 讓我們來看看:
```
done := make(chan interface{})
time.AfterFunc(10*time.Second, func() { close(done) }) //1
const timeout = 2 * time.Second //2
heartbeat, results := doWork(done, timeout/2) //3
for {
select {
case _, ok := <-heartbeat: //4
if ok == false {
return
}
fmt.Println("pulse")
case r, ok := <-results: //5
if ok == false {
return
}
fmt.Printf("results %v\n", r.Second())
case <-time.After(timeout): //6
return
}
}
```
1. 我們設置done通道并在10秒后關閉它。
2. 我們在這里設定超時時間 我們將用它將心跳間隔與超時時間相耦合。
3. 我們向dowork傳入超時時間的一半。
4. 我們將hearbeat的讀取放入select語句中。每間隔 timeout/2 獲取一次來自心跳通道的消息。如果我們沒有收到消息,那就說明該goroutine存在問題。
5. 我們從result通道獲取數據,沒有什么特別的。
6. 如果我們沒有收到心跳或result,程序就會超時結束。
這會輸出:
```
pulse
pulse
results 52
pulse
pulse
results 54
pulse
pulse
results 56
pulse
pulse
results 58
pulse
```
和預期的一樣,每次從result中接收到信息,都會收到兩次心跳。
我們可能會使用這樣的功能來收集系統的統計參數,當你的goroutine沒有像預期那樣運行,那么基于固定時間的心跳信號的作用會非常明顯。
考慮下一個例子。 我們將在兩次迭代后停止goroutine來模擬循環中斷,然后不關閉任何一個通道;
```
doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) {
heartbeat := make(chan interface{})
results := make(chan time.Time)
go func() {
pulse := time.Tick(pulseInterval)
workGen := time.Tick(2 * pulseInterval)
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default:
}
}
sendResult := func(r time.Time) {
for {
select {
case <-pulse:
sendPulse()
case results <- r:
return
}
}
}
for i := 0; i < 2; i++ { //1
select {
case <-done:
return
case <-pulse:
sendPulse()
case r := <-workGen:
sendResult(r)
}
}
}()
return heartbeat, results
}
done := make(chan interface{})
time.AfterFunc(10*time.Second, func() { close(done) })
const timeout = 2 * time.Second
heartbeat, results := doWork(done, timeout/2)
for {
select {
case _, ok := <-heartbeat:
if ok == false {
return
}
fmt.Println("pulse")
case r, ok := <-results:
if ok == false {
return
}
fmt.Printf("results %v\n", r)
case <-time.After(timeout):
fmt.Println("worker goroutine is not healthy!")
return
}
}
```
1. 這里我們簡單模擬循環中斷。前面的例子中,未收到通知會無限循環。這里我們只循環兩次。
這會輸出:
```
pulse
pulse
worker goroutine is not healthy!
```
效果很不錯。在兩秒鐘之內,我們的系統意識到goroutine未能正確讀取,并且打破了for-select循環。通過使用心跳,我們已經成功地避免了死鎖,并且不必通過依賴較長的超時而保持穩定性。 我們將在“Goroutines異常行為修復”中進一步理解這個概念。
另外請注意,心跳會幫助處理相反的情況:它讓我們知道長時間運行的goroutine依然存在,但花了一段時間才產生一個值并發送至通道。
接下來讓我們看看另一個場景:在工作單元開始時產生的心跳。這對測試非常有用。下面是個例子:
```
doWork := func(done <-chan interface{}) (<-chan interface{}, <-chan int) {
heartbeatStream := make(chan interface{}, 1) //1
workStream := make(chan int)
go func() {
defer close(heartbeatStream)
defer close(workStream)
for i := 0; i < 10; i++ {
select { //2
case heartbeatStream <- struct{}{}:
default: //3
}
select {
case <-done:
return
case workStream <- rand.Intn(10):
}
}
}()
return heartbeatStream, workStream
}
done := make(chan interface{})
defer close(done)
heartbeat, results := doWork(done)
for {
select {
case _, ok := <-heartbeat:
if ok {
fmt.Println("pulse")
} else {
return
}
case r, ok := <-results:
if ok {
fmt.Printf("results %v\n", r)
} else {
return
}
}
}
```
1. 這里我們用一個緩沖區創建心跳通道。這確保即使沒有人及時監聽發送,也總會發送至少一個pulse。
2. 在這里,我們為心跳設置了一個單獨的select塊。我們不希望將它與發送結果一起包含在同一個select塊中,因為如果接收器未準備好,它們將接收到一個pulse,而result的當前值將會丟失。我們也沒有為done通道提供case語句,因為我們有一個default可以處理這種情況。
3. 我們再次處理如果沒有人監聽到心頭。因為我們的心跳通道是用緩沖區創建的,如果有人在監聽,但沒有及時處理第一個心跳,仍會被通知。
這會輸出:
```
pulse
results 1
pulse
results 7
pulse
results 7
pulse
results 9
pulse
results 1
pulse
results 8
pulse
results 5
pulse
results 0
pulse
results 6
pulse
results 0
```
如預期一致,每個結果都會有一個心跳。
至于測試的編寫。考慮下面的代碼:
```
func DoWork( done <-chan interface {}, nums ...int ) (<-chan interface{}, <-chan int) {
heartbeat := make(chan interface{}, 1)
intStream := make(chan int)
go func () {
defer close(heartbeat)
defer close(intStream)
time.Sleep(2*time.Second) // 1
for _, n := range nums {
select {
case heartbeat <- struct{}{}:
default:
}
select {
case <-done:
return
case intStream <- n:
}
}
}()
return heartbeat, intStream
}
```
1. 我們在goroutine開始工作之前模擬延遲。在實踐中,延遲可以由各種各樣的原因導致,例如CPU負載,磁盤爭用,網絡延遲和bug。
DoWork函數是一個相當簡單的生成器,它將傳入的數字轉換為它返回通道上的數據流。我們來試試這個函數。下面提供了一個測試的反例:
```
func TestDoWork_GeneratesAllNumbers(t *testing.T) {
done := make(chan interface{})
defer close(done)
intSlice := []int{0, 1, 2, 3, 5}
_, results := DoWork(done, intSlice...)
for i, expected := range intSlice {
select {
case r := <-results:
if r != expected {
t.Errorf(
"index %v: expected %v, but received %v,", i,
expected, r,
)
}
case <-time.After(1 * time.Second): // 1
t.Fatal("test timed out")
}
}
}
```
1. 在這里,我們設置超時,以防止goroutine出現問題導致死鎖。
運行結果為:
```
go test ./bad_concurrent_test.go
--- FAIL: TestDoWork_GeneratesAllNumbers (1.00s) bad_concurrent_test.go:46: test timed out
FAIL
FAIL command-line-arguments 1.002s
```
這個測試之所以不好,是因為它的不確定性。如果移除time.Sleep情況會變得更糟:這個測試會有時通過,有時失敗。
我們之前提到過程中的外部因素可能會導致goroutine花費更長的時間才能完成第一次迭代。關鍵在于我們不能保證在超時之前第一次迭代會完成,所以我們開始考慮:這時候超時會有多大意義?我們可以增加超時時間,但這意味著測試時失敗也需要很長時間,從而減慢我們的測試效率。
這種情況很可怕,項目組甚至會對測試的正確性及必要性產生懷疑。
幸運的是這種情況并非無解。這是一個正確的測試:
```
func TestDoWork_GeneratesAllNumbers(t *testing.T) {
done := make(chan interface{})
defer close(done)
intSlice := []int{0, 1, 2, 3, 5}
heartbeat, results := DoWork(done, intSlice...)
<-heartbeat //1
i := 0
for r := range results {
if expected := intSlice[i]; r != expected {
t.Errorf("index %v: expected %v, but received %v,", i, expected, r)
}
i++
}
}
```
1. 在這里,我們等待goroutine發出信號表示它正在開始處理迭代。 運行此測試會產生以下輸出
```
ok command-line-arguments 2.002s
```
使用心跳我們可以安全地編寫該測試,而不會超時。運行的唯一風險是我們的一次迭代花費了過多的時間。 如果這對我們很重要,我們可以利用更安全的、基于間隔的心跳。
以下是使用基于間隔的心跳的測試示例:
```
func DoWork(done <-chan interface{}, pulseInterval time.Duration, nums ...int) (<-chan interface{}, <-chan int) {
heartbeat := make(chan interface{}, 1)
intStream := make(chan int)
go func() {
defer close(heartbeat)
defer close(intStream)
time.Sleep(2 * time.Second)
pulse := time.Tick(pulseInterval)
numLoop: //2
for _, n := range nums {
for { //1
select {
case <-done:
return
case <-pulse:
select {
case heartbeat <- struct{}{}: default:
}
case intStream <- n:
continue numLoop //3
}
}
}
}()
return heartbeat, intStream
}
func TestDoWork_GeneratesAllNumbers(t *testing.T) {
done := make(chan interface{})
defer close(done)
intSlice := []int{0, 1, 2, 3, 5}
const timeout = 2 * time.Second
heartbeat, results := DoWork(done, timeout/2, intSlice...)
<-heartbeat //4
i := 0
for {
select {
case r, ok := <-results:
if ok == false {
return
} else if expected := intSlice[i]; r != expected {
t.Errorf(
"index %v: expected %v, but received %v,", i,
expected, r,
)
}
i++
case <-heartbeat: //5
case <-time.After(timeout):
t.Fatal("test timed out")
}
}
}
```
1. 我們需要兩個循環:一個用來覆蓋我們的數字列表,并且這個內部循環會運行直到intStream上的數字成功發送。
2. 我們在這里使用一個標簽來使內部循環繼續更簡單一些。
3. 這里我們繼續執行外部循環。
4. 我們仍然等待第一次心跳出現,表明我們已經進入了goroutine的循環。
5. 我們在這里獲取心跳以實現超時。
運行此測試會輸出:
```
ok command-line-arguments 3.002s
```
你可能已經注意到這個版本的邏輯有點混亂。如果你確信goroutine的循環在啟動后不會停止執行,我建議只阻塞第一次心跳,然后進入循環語句。你可以編寫單獨的測試,專門來測試如未能關閉通道,循環迭代耗時過長以及其他與時間相關的情況。
在編寫并發代碼時,心跳不是絕對必要的,但本節將展示其的實用性。對于任何需要測試的長期運行的goroutines,我強烈推薦這種模式。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度