<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                心跳是并發進程向外界發出信號的一種方式。命名者從人體解剖學中受到啟發,使用心跳一詞表示被觀察者的生命體征。心跳在Go語言出現前就已被廣泛使用。 在并發中使用心跳是有原因的。心跳能夠讓我們更加深入的了解系統,并且在系統存在不確定性的時候對其測試。 我們將在本節中討論兩種不同類型的心跳: * 以固定時間間隔產生的心跳。 * 在工作單元開始時產生的心跳。 固定時間間隔產生的心跳對于并發來說很有用,它可能在等待處理某個工作單元執行某個任務時發生。由于你不知道這項工作什么時候會進行,所以你的goroutine可能會持續等待。心跳是一種向監聽者發出信號的方式,即一切都很好,當前靜默是正常的。 以下代碼演示了會產生心跳的goroutine: ``` doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) { heartbeat := make(chan interface{}) //1 results := make(chan time.Time) go func() { defer close(heartbeat) defer close(results) pulse := time.Tick(pulseInterval) //2 workGen := time.Tick(2 * pulseInterval) //3 sendPulse := func() { select { case heartbeat <- struct{}{}: default: //4 } } sendResult := func(r time.Time) { for { select { case <-done: return case <-pulse: //5 sendPulse() case results <- r: return } } } for { select { case <-done: return case <-pulse: //5 sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat, results } ``` 1. 在這里,我們設置了一個發送心跳信號的通道。doWork會返回該通道。 2. 我們按傳入的pulseInterval值定時發送心跳,每次心跳都意味著可以從該通道上讀取到內容。 3. 這只是用來模擬進入的工作的另一處代碼。我們選擇一個比pulseInterval更長的持續時間,以便我們可以看到來自goroutine的心跳。 4. 請注意,我們包含一個default子句。我們必須考慮如果沒有人接受到心跳的情況。從goroutine發出的結果是至關重要的,但心跳不是。 5. 就像done通道,無論何時執行發送或接收,你都需要考慮心跳發送的情況。 請注意,由于我們可能在等待輸入時發送多個pulse,或者在等待發送結果時發送多個pulse,所有select語句都需要在for循環內。 目前看起來不錯; 我們如何利用這個函數并消費它發出的事件? 讓我們來看看: ``` done := make(chan interface{}) time.AfterFunc(10*time.Second, func() { close(done) }) //1 const timeout = 2 * time.Second //2 heartbeat, results := doWork(done, timeout/2) //3 for { select { case _, ok := <-heartbeat: //4 if ok == false { return } fmt.Println("pulse") case r, ok := <-results: //5 if ok == false { return } fmt.Printf("results %v\n", r.Second()) case <-time.After(timeout): //6 return } } ``` 1. 我們設置done通道并在10秒后關閉它。 2. 我們在這里設定超時時間 我們將用它將心跳間隔與超時時間相耦合。 3. 我們向dowork傳入超時時間的一半。 4. 我們將hearbeat的讀取放入select語句中。每間隔 timeout/2 獲取一次來自心跳通道的消息。如果我們沒有收到消息,那就說明該goroutine存在問題。 5. 我們從result通道獲取數據,沒有什么特別的。 6. 如果我們沒有收到心跳或result,程序就會超時結束。 這會輸出: ``` pulse pulse results 52 pulse pulse results 54 pulse pulse results 56 pulse pulse results 58 pulse ``` 和預期的一樣,每次從result中接收到信息,都會收到兩次心跳。 我們可能會使用這樣的功能來收集系統的統計參數,當你的goroutine沒有像預期那樣運行,那么基于固定時間的心跳信號的作用會非常明顯。 考慮下一個例子。 我們將在兩次迭代后停止goroutine來模擬循環中斷,然后不關閉任何一個通道; ``` doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) { heartbeat := make(chan interface{}) results := make(chan time.Time) go func() { pulse := time.Tick(pulseInterval) workGen := time.Tick(2 * pulseInterval) sendPulse := func() { select { case heartbeat <- struct{}{}: default: } } sendResult := func(r time.Time) { for { select { case <-pulse: sendPulse() case results <- r: return } } } for i := 0; i < 2; i++ { //1 select { case <-done: return case <-pulse: sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat, results } done := make(chan interface{}) time.AfterFunc(10*time.Second, func() { close(done) }) const timeout = 2 * time.Second heartbeat, results := doWork(done, timeout/2) for { select { case _, ok := <-heartbeat: if ok == false { return } fmt.Println("pulse") case r, ok := <-results: if ok == false { return } fmt.Printf("results %v\n", r) case <-time.After(timeout): fmt.Println("worker goroutine is not healthy!") return } } ``` 1. 這里我們簡單模擬循環中斷。前面的例子中,未收到通知會無限循環。這里我們只循環兩次。 這會輸出: ``` pulse pulse worker goroutine is not healthy! ``` 效果很不錯。在兩秒鐘之內,我們的系統意識到goroutine未能正確讀取,并且打破了for-select循環。通過使用心跳,我們已經成功地避免了死鎖,并且不必通過依賴較長的超時而保持穩定性。 我們將在“Goroutines異常行為修復”中進一步理解這個概念。 另外請注意,心跳會幫助處理相反的情況:它讓我們知道長時間運行的goroutine依然存在,但花了一段時間才產生一個值并發送至通道。 接下來讓我們看看另一個場景:在工作單元開始時產生的心跳。這對測試非常有用。下面是個例子: ``` doWork := func(done <-chan interface{}) (<-chan interface{}, <-chan int) { heartbeatStream := make(chan interface{}, 1) //1 workStream := make(chan int) go func() { defer close(heartbeatStream) defer close(workStream) for i := 0; i < 10; i++ { select { //2 case heartbeatStream <- struct{}{}: default: //3 } select { case <-done: return case workStream <- rand.Intn(10): } } }() return heartbeatStream, workStream } done := make(chan interface{}) defer close(done) heartbeat, results := doWork(done) for { select { case _, ok := <-heartbeat: if ok { fmt.Println("pulse") } else { return } case r, ok := <-results: if ok { fmt.Printf("results %v\n", r) } else { return } } } ``` 1. 這里我們用一個緩沖區創建心跳通道。這確保即使沒有人及時監聽發送,也總會發送至少一個pulse。 2. 在這里,我們為心跳設置了一個單獨的select塊。我們不希望將它與發送結果一起包含在同一個select塊中,因為如果接收器未準備好,它們將接收到一個pulse,而result的當前值將會丟失。我們也沒有為done通道提供case語句,因為我們有一個default可以處理這種情況。 3. 我們再次處理如果沒有人監聽到心頭。因為我們的心跳通道是用緩沖區創建的,如果有人在監聽,但沒有及時處理第一個心跳,仍會被通知。 這會輸出: ``` pulse results 1 pulse results 7 pulse results 7 pulse results 9 pulse results 1 pulse results 8 pulse results 5 pulse results 0 pulse results 6 pulse results 0 ``` 如預期一致,每個結果都會有一個心跳。 至于測試的編寫。考慮下面的代碼: ``` func DoWork( done <-chan interface {}, nums ...int ) (<-chan interface{}, <-chan int) { heartbeat := make(chan interface{}, 1) intStream := make(chan int) go func () { defer close(heartbeat) defer close(intStream) time.Sleep(2*time.Second) // 1 for _, n := range nums { select { case heartbeat <- struct{}{}: default: } select { case <-done: return case intStream <- n: } } }() return heartbeat, intStream } ``` 1. 我們在goroutine開始工作之前模擬延遲。在實踐中,延遲可以由各種各樣的原因導致,例如CPU負載,磁盤爭用,網絡延遲和bug。 DoWork函數是一個相當簡單的生成器,它將傳入的數字轉換為它返回通道上的數據流。我們來試試這個函數。下面提供了一個測試的反例: ``` func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} _, results := DoWork(done, intSlice...) for i, expected := range intSlice { select { case r := <-results: if r != expected { t.Errorf( "index %v: expected %v, but received %v,", i, expected, r, ) } case <-time.After(1 * time.Second): // 1 t.Fatal("test timed out") } } } ``` 1. 在這里,我們設置超時,以防止goroutine出現問題導致死鎖。 運行結果為: ``` go test ./bad_concurrent_test.go --- FAIL: TestDoWork_GeneratesAllNumbers (1.00s) bad_concurrent_test.go:46: test timed out FAIL FAIL command-line-arguments 1.002s ``` 這個測試之所以不好,是因為它的不確定性。如果移除time.Sleep情況會變得更糟:這個測試會有時通過,有時失敗。 我們之前提到過程中的外部因素可能會導致goroutine花費更長的時間才能完成第一次迭代。關鍵在于我們不能保證在超時之前第一次迭代會完成,所以我們開始考慮:這時候超時會有多大意義?我們可以增加超時時間,但這意味著測試時失敗也需要很長時間,從而減慢我們的測試效率。 這種情況很可怕,項目組甚至會對測試的正確性及必要性產生懷疑。 幸運的是這種情況并非無解。這是一個正確的測試: ``` func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} heartbeat, results := DoWork(done, intSlice...) <-heartbeat //1 i := 0 for r := range results { if expected := intSlice[i]; r != expected { t.Errorf("index %v: expected %v, but received %v,", i, expected, r) } i++ } } ``` 1. 在這里,我們等待goroutine發出信號表示它正在開始處理迭代。 運行此測試會產生以下輸出 ``` ok command-line-arguments 2.002s ``` 使用心跳我們可以安全地編寫該測試,而不會超時。運行的唯一風險是我們的一次迭代花費了過多的時間。 如果這對我們很重要,我們可以利用更安全的、基于間隔的心跳。 以下是使用基于間隔的心跳的測試示例: ``` func DoWork(done <-chan interface{}, pulseInterval time.Duration, nums ...int) (<-chan interface{}, <-chan int) { heartbeat := make(chan interface{}, 1) intStream := make(chan int) go func() { defer close(heartbeat) defer close(intStream) time.Sleep(2 * time.Second) pulse := time.Tick(pulseInterval) numLoop: //2 for _, n := range nums { for { //1 select { case <-done: return case <-pulse: select { case heartbeat <- struct{}{}: default: } case intStream <- n: continue numLoop //3 } } } }() return heartbeat, intStream } func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} const timeout = 2 * time.Second heartbeat, results := DoWork(done, timeout/2, intSlice...) <-heartbeat //4 i := 0 for { select { case r, ok := <-results: if ok == false { return } else if expected := intSlice[i]; r != expected { t.Errorf( "index %v: expected %v, but received %v,", i, expected, r, ) } i++ case <-heartbeat: //5 case <-time.After(timeout): t.Fatal("test timed out") } } } ``` 1. 我們需要兩個循環:一個用來覆蓋我們的數字列表,并且這個內部循環會運行直到intStream上的數字成功發送。 2. 我們在這里使用一個標簽來使內部循環繼續更簡單一些。 3. 這里我們繼續執行外部循環。 4. 我們仍然等待第一次心跳出現,表明我們已經進入了goroutine的循環。 5. 我們在這里獲取心跳以實現超時。 運行此測試會輸出: ``` ok command-line-arguments 3.002s ``` 你可能已經注意到這個版本的邏輯有點混亂。如果你確信goroutine的循環在啟動后不會停止執行,我建議只阻塞第一次心跳,然后進入循環語句。你可以編寫單獨的測試,專門來測試如未能關閉通道,循環迭代耗時過長以及其他與時間相關的情況。 在編寫并發代碼時,心跳不是絕對必要的,但本節將展示其的實用性。對于任何需要測試的長期運行的goroutines,我強烈推薦這種模式。 * * * * * 學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看