<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # 6.11 計時器 > 本節內容提供一個線上演講:[YouTube 在線](https://www.youtube.com/watch?v=XJx0eTP-y9I),[Google Slides 講稿](https://changkun.de/s/timer114/)。 time 是一個很有意思的包,除去需要獲取當前時間的 Now 這一平淡無奇、直接對系統調用進行 封裝(`runtime·nanotime`)的函數外,其中最有意思的莫過于它所提供的 Timer 和 Ticker 了。 他們的實現,驅動了諸如`time.After`,`time.AfterFunc`,`time.Tick`,`time.Sleep`等方法。 本節我們便來仔細了解一下 Timer 的實現機制。 TODO: Timer 最近增加了幾次更新,修復了一些性能問題,本文需要更新: * [https://go-review.googlesource.com/c/go/+/214299/](https://go-review.googlesource.com/c/go/+/214299/) * [https://go-review.googlesource.com/c/go/+/214185/](https://go-review.googlesource.com/c/go/+/214185/) * [https://go-review.googlesource.com/c/go/+/215722](https://go-review.googlesource.com/c/go/+/215722) * [https://go-review.googlesource.com/c/go/+/221077](https://go-review.googlesource.com/c/go/+/221077) 如果用戶代碼啟動和停止很多計時器(例如 context.WithTimeout)則將穩定使用內存中殘留的已經停止的計時器,而不是已經從計時器堆中刪除的計時器。,第一個 CL 解決了這個問題,會在計時器總數超過 1/4 堆計時器時候,刪除所有已經刪除的計時器。 第二個 CL 嘗試解決這個問題:空閑的 P 在偷取計時器時會與正在運行的 P 發生鎖的競爭。方法是僅在下一個計時器準備運行或有一些計時器需要運行時才獲得計時器鎖定。 但是這個方案有帶來了新的問題:導致越來越多的已經刪除的計時器積累,從而沒有就緒任何計時器。一個解決方案是在沒有獲取鎖的情況下檢查否有很多已經刪除的計時器。 Timer 和 Ticker 所有的功能核心自然由運行時機制來驅動。當創建一個 Timer 時候: ``` type Timer struct { C &lt;-chan Time r runtimeTimer } func NewTimer(d Duration) *Timer { c := make(chan Time, 1) t := &amp;Timer{ C: c, r: runtimeTimer{ when: when(d), // when 僅僅只是將事件觸發的 walltime 轉換為 int64 f: sendTime, arg: c, }, } startTimer(&amp;t.r) return t } func (t *Timer) Stop() bool { (...) return stopTimer(&amp;t.r) } func (t *Timer) Reset(d Duration) bool { (...) w := when(d) active := stopTimer(&amp;t.r) resetTimer(&amp;t.r, w) return active } ``` Ticker 與 Timer 的本質區別僅僅在與 Ticker 多設置了 period 字段: ``` type Ticker struct { C &lt;-chan Time // 當事件觸發時,Timer 會向此通道發送觸發的時間 r runtimeTimer } func NewTicker(d Duration) *Ticker { (...) c := make(chan Time, 1) t := &amp;Ticker{ C: c, r: runtimeTimer{ when: when(d), period: int64(d), f: sendTime, arg: c, }, } startTimer(&amp;t.r) return t } func (t *Ticker) Stop() { stopTimer(&amp;t.r) } func Tick(d Duration) &lt;-chan Time { (...) return NewTicker(d).C } ``` Timer 同時為 After、AfterFunc 提供了支持,代碼相對簡單,我們不在此贅述。 直接進入 runtimeTimer 結構。runtime 對外暴露的 timer 所有功能如下: ``` type runtimeTimer struct { pp uintptr // timer 所在的 P 的指針 // 當時間為 when 時,喚醒 timer,當時間為 when+period, ... (period &gt; 0) // 時,均在 timer Goroutine 中調用 f(arg, now),從而 f 必須具有良好的行為(不會阻塞) when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr nextwhen int64 status uint32 } func startTimer(*runtimeTimer) func stopTimer(*runtimeTimer) bool func resetTimer(*runtimeTimer, int64) ``` 可見,timer 返回的 channel 會被用戶代碼的 Goroutine 持有,為了使 channel 能正常 進行消息通信,每當 timer 被喚醒時,timer 自建的 Goroutine 會單獨向 channel 發送 當前時間`Now()`: ``` func sendTime(c interface{}, seq uintptr) { select { case c.(chan Time) &lt;- Now(): default: } } ``` ## 6.11.1 Timer 狀態機 早在 Go 1.10 以前,所有的 timer 均在一個全局的四叉小頂堆中進行維護,顯然并發性能是 不夠的,隨后到了 Go 1.10 時,將堆的數量擴充到了 64 個,但仍然需要在喚醒 timer 時, 頻繁的將 M 和 P 進行解綁(`timerproc`),性能依然不夠出眾。而到 Go 1.14 時, Go 運行時中的 timer 使用 netpoll 進行驅動,每個 timer 堆均附著在 P 上,形成一個 局部的 timer 堆,消除了喚醒一個 timer 時進行 M/P 切換的開銷,大幅削減了鎖的競爭, 與 nginx 中 timer 的實現方式非常相似。 在 P 結構的定義中,存在兩個有關 timer 的字段: ``` type p struct { (...) // timers 字段的鎖。我們通常在 P 運行時訪問 timers,但 scheduler 仍可以 // 在不同的 P 上進行訪問。 timersLock mutex // 某段時間需要進行的動作。用于實現 time 包。 timers []*timer // 在 P 堆中 timerModifiedEarlier timers 的數量。 // 僅當持有 timersLock 或者當 timer 狀態轉換為 timerModifying 時才可以修改 adjustTimers uint32 (...) } ``` **在 P 中建立一個局部的 timers 堆,用于在調度器進入調度循環時,快速確定是否需要就緒一個 timer。** `p.timers`中的 timer 以四叉小頂堆(最早發生的 timer 維護堆頂)的形式進行維護, 每當新增一個 timer 時,都會通過`siftupTimer`和`siftdownTimer`維護堆中元素 的順序以滿足最小堆的條件。 ``` // 堆維護算法 // 當想 timer 數組末尾 append 一個 timer 后,通過 siftupTimer 進行重整 func siftupTimer(t []*timer, i int) bool { if i &gt;= len(t) { return false } (...) // 對 i 位置的 timer 根據 when 字段進行調整 return true } func siftdownTimer(t []*timer, i int) bool { n := len(t) if i &gt;= n { return false } (...) // 對 i 位置的 timer 根據 when 字段進行調整 return true } ``` 我們知道,當調度器完成對一個 G 的調度后,會重新進入調度循環(`runtime.schedule`)。 timer 作為一個對時間敏感的功能,同網絡數據的拉取操作一樣,可運行的 timer 也在此進行 檢查(`runtime.checkTimers`),如果有 timer 可以運行,則直接調度該 G。 其中 checkTimer 會調整 timers 數組中 timer 的順序(`runtime.adjusttimers`), 然后運行需要執行的 timer(`runtime.runtimer`)。 如果此時沒有直接可用的 timer,且當前 P 的 G 隊列已空,則`runtime.findrunnable`便會進行任務偷取工作(`runtime.runqsteal`),對應到 timer,操作仍可以被抽象為`runtime.checkTimers`。 在這個過程中要小心當 P 被回收時,需要將局部的 P 進行刪除,或者轉移到其他 P 上, 由`runtime.moveTimers`實現。 一個 Timer 具有十種狀態,他們之間的狀態轉換圖如圖 1 所示。 ![](https://golang.design/under-the-hood/assets/timers.png)**圖 1: 計時器狀態機** 總結來說: 1. 一個 Timer 的標準生命周期為:`NoStatus -> Waiting -> Running -> NoStatus` 2. 當人為的對 Timer 進行刪除時:`NoStatus -> Waiting -> Deleted -> Removing -> Removed` 3. 當人為的對 Timer 進行修改時:`NoStatus -> Waiting -> Modifying -> ModifiedEarlier/ModifiedLater -> Moving -> Waiting -> Running -> NoStatus` 4. 當人為的對 Timer 進行重置時:`NoStatus -> Waiting -> Deleted -> Removing -> Removed -> Waiting -> Running -> NoStatus` ### Timer 的啟動(startTimer) 啟動一個 timer 的操作非常直觀: 當 timer 分配好后,會通過 addtimer 將 timer 添加到創建 timer 的 P 上。 ``` func startTimer(t *timer) { (...) addtimer(t) } func addtimer(t *timer) { (...) if t.status != timerNoStatus { panic(...) } t.status = timerWaiting addInitializedTimer(t) } // 將 timer 初始化到當前的 P 上 func addInitializedTimer(t *timer) { when := t.when pp := getg().m.p.ptr() lock(&amp;pp.timersLock) ok := cleantimers(pp) &amp;&amp; doaddtimer(pp, t) unlock(&amp;pp.timersLock) if !ok { panic(...) } wakeNetPoller(when) } ``` 在添加數組之前,需要確保不會對其他的 timer 產生影響,存在兩種狀態轉換: 1. `timerDeleted`\->`timerRemoving`\->`timerRemoved` 2. `timerModifiedEarlier/timerModifiedLater`\->`timerMoving`\->`timerWaiting` ``` // 此時已持有 timersLock func cleantimers(pp *p) bool { for { if len(pp.timers) == 0 { return true } t := pp.timers[0] // 堆頂,when 最小,最早發生的 timer (...) switch s := atomic.Load(&amp;t.status); s { case timerDeleted: // timerDeleted --&gt; timerRemoving --&gt; 從堆中刪除 timer --&gt; timerRemoved if !atomic.Cas(&amp;t.status, s, timerRemoving) { continue } if !dodeltimer0(pp) { return false } if !atomic.Cas(&amp;t.status, timerRemoving, timerRemoved) { return false } case timerModifiedEarlier, timerModifiedLater: // timerMoving --&gt; 調整 timer 的時間 --&gt; timerWaiting // 此時 timer 被調整為更早或更晚,將原先的 timer 進行刪除,再重新添加 if !atomic.Cas(&amp;t.status, s, timerMoving) { continue } t.when = t.nextwhen if !dodeltimer0(pp) { return false } if !doaddtimer(pp, t) { return false } if s == timerModifiedEarlier { atomic.Xadd(&amp;pp.adjustTimers, -1) } if !atomic.Cas(&amp;t.status, timerMoving, timerWaiting) { return false } default: // 無需調整 return true } } } ``` 然后將 timer 鏈接到當前的 P 上,在直接 append 到數組中: ``` // 將 timer 添加到當前 P 的堆上 // 此時已持有 timersLock func doaddtimer(pp *p, t *timer) bool { (...) t.pp.set(pp) i := len(pp.timers) pp.timers = append(pp.timers, t) return siftupTimer(pp.timers, i) } ``` 處理掉已經過期的 timer: ``` func dodeltimer0(pp *p) bool { t := pp.timers[0] t.pp = 0 (...) last := len(pp.timers) - 1 if last &gt; 0 { // 將堆頂元素放到數組最后,并將其標記為 nil,并隨后進入 timerRemoved 狀態 // 要么等待 GC 進行回收,要么被 reset 復用 pp.timers[0] = pp.timers[last] } pp.timers[last] = nil pp.timers = pp.timers[:last] ok := true if last &gt; 0 { // 對堆進行重新維護 ok = siftdownTimer(pp.timers, 0) } return ok } ``` 至于`wakeNetPoller(when)`,我們展示先專注于查看 timer 的邏輯,最后再來留意為什么 需要使用 poller。 ### Timer 的終止(stopTimer) 停止一個 timer 的操作自然是嘗試將 timer 從它所在的堆中刪除,包含四種類型的狀態轉換: 1. timerWaiting/timerModifiedLater –> timerDeleted 2. timerModifiedEarlier –> timerModifying –> timerDeleted 3. timerDeleted/timerRemoving/timerRemoved 狀態保持 4. timerRunning/timerMoving 等待下一次狀態檢查 ``` func stopTimer(t *timer) bool { return deltimer(t) } func deltimer(t *timer) bool { for { switch s := atomic.Load(&amp;t.status); s { case timerWaiting, timerModifiedLater: // timerWaiting/timerModifiedLater --&gt; timerDeleted if atomic.Cas(&amp;t.status, s, timerDeleted) { // timer 尚未運行 return true } case timerModifiedEarlier: // timerModifiedEarlier --&gt; timerModifying --&gt; timerDeleted tpp := t.pp.ptr() if atomic.Cas(&amp;t.status, s, timerModifying) { atomic.Xadd(&amp;tpp.adjustTimers, -1) if !atomic.Cas(&amp;t.status, timerModifying, timerDeleted) { panic(...) } // timer 尚未運行 return true } case timerDeleted, timerRemoving, timerRemoved: // Timer 已經運行 return false case timerRunning, timerMoving: // timer 正在運行或被其他 P 移動,等待完成 osyield() case timerNoStatus: // 刪除一個從未被添加或者已經運行的 timer return false case timerModifying: // 并發調用 deltimer 和 modtimer panic(...) default: panic(...) } } } ``` ### Timer 的重置(resetTimer) 重置 timer 可以是將 timer 的時間提前或者延后,為了保證程序的正確性,從而引入了 timerModifiedEarlier 和 timerModifiedLater 兩種狀態。 當進行 timer 的 ``` func resetTimer(t *timer, when int64) { (...) resettimer(t, when) } func resettimer(t *timer, when int64) { (...) if when &lt; 0 { when = maxWhen } for { switch s { case timerNoStatus, timerRemoved: // 復用一個為初始化或者已經運行完畢或者已經刪除的 timer // timerNoStatus/timerRemoved --&gt; timerWaiting atomic.Store(&amp;t.status, timerWaiting) t.when = when addInitializedTimer(t) return case timerDeleted: if atomic.Cas(&amp;t.status, s, timerModifying) { t.nextwhen = when newStatus := uint32(timerModifiedLater) if when &lt; t.when { newStatus = timerModifiedEarlier atomic.Xadd(&amp;t.pp.ptr().adjustTimers, 1) } if !atomic.Cas(&amp;t.status, timerModifying, newStatus) { panic(...) } if newStatus == timerModifiedEarlier { wakeNetPoller(when) } return } case timerRemoving: // Wait for the removal to complete. osyield() case timerRunning: // Even though the timer should not be active, // we can see timerRunning if the timer function // permits some other goroutine to call resettimer. // Wait until the run is complete. osyield() case timerWaiting, timerModifying, timerModifiedEarlier, timerModifiedLater, timerMoving: // Called resettimer on active timer. panic(...) default: panic(...) } } } ``` ### Timer 的執行(runtimer) 我們還沒有分析到一個 timer 究竟如何被喚醒,但我們不妨先查看當 timer 被喚醒后會做什么。 ``` //go:systemstack func runtimer(pp *p, now int64) int64 { for { t := pp.timers[0] (...) switch s := atomic.Load(&amp;t.status); s { case timerWaiting: if t.when &gt; now { // Not ready to run. return t.when } if !atomic.Cas(&amp;t.status, s, timerRunning) { continue } // runOneTimer 可能臨時解鎖 pp.timersLock runOneTimer(pp, t, now) return 0 case timerDeleted: if !atomic.Cas(&amp;t.status, s, timerRemoving) { continue } if !dodeltimer0(pp) { panic(...) } if !atomic.Cas(&amp;t.status, timerRemoving, timerRemoved) { panic(...) } if len(pp.timers) == 0 { return -1 } case timerModifiedEarlier, timerModifiedLater: if !atomic.Cas(&amp;t.status, s, timerMoving) { continue } t.when = t.nextwhen if !dodeltimer0(pp) { panic(...) } if !doaddtimer(pp, t) { panic(...) } if s == timerModifiedEarlier { atomic.Xadd(&amp;pp.adjustTimers, -1) } if !atomic.Cas(&amp;t.status, timerMoving, timerWaiting) { panic(...) } case timerModifying: // 等待修改完成 osyield() // usleep(1) case timerNoStatus, timerRemoved,timerRunning, timerRemoving, timerMoving: // 這些狀態的 timer 是不應該被觀察到的 panic(...) default: panic(...) } } } //go:systemstack func runOneTimer(pp *p, t *timer, now int64) { (...) f := t.f arg := t.arg seq := t.seq // 如果是 period &gt; 0 則說明此時 timer 為 ticker,需要再次觸發 if t.period &gt; 0 { // 放入堆中并調整觸發時間 delta := t.when - now t.when += t.period * (1 + -delta/t.period) if !siftdownTimer(pp.timers, 0) { panic(...) } if !atomic.Cas(&amp;t.status, timerRunning, timerWaiting) { panic(...) } } else { // 否則為一次性 timer // 從堆中移除 if !dodeltimer0(pp) { panic(...) } if !atomic.Cas(&amp;t.status, timerRunning, timerNoStatus) { panic(...) } } (...) unlock(&amp;pp.timersLock) f(arg, seq) // 觸發 sendTime 信號 通知用戶 Goroutine lock(&amp;pp.timersLock) (...) } ``` ## 6.11.2 Timer 的觸發 ### 從調度循環中直接觸發 前面我們已經簡單的提到了 Timer 會在調度循環中進行例行檢查,或者是通過`wakeNetPoller`來強制觸發 timer。 在調度循環中: ``` func schedule() { _g_ := getg() (...) top: pp := _g_.m.p.ptr() (...) checkTimers(pp, 0) (...) execute(gp, inheritTime) } //go:yeswritebarrierrec func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { lock(&amp;pp.timersLock) adjusttimers(pp) rnow = now if len(pp.timers) &gt; 0 { if rnow == 0 { rnow = nanotime() } for len(pp.timers) &gt; 0 { // Note that runtimer may temporarily unlock // pp.timersLock. if tw := runtimer(pp, rnow); tw != 0 { if tw &gt; 0 { pollUntil = tw } break } ran = true } } unlock(&amp;pp.timersLock) return rnow, pollUntil, ran } func adjusttimers(pp *p) { if len(pp.timers) == 0 { return } if atomic.Load(&amp;pp.adjustTimers) == 0 { return } var moved []*timer for i := 0; i &lt; len(pp.timers); i++ { t := pp.timers[i] if t.pp.ptr() != pp { throw("adjusttimers: bad p") } switch s := atomic.Load(&amp;t.status); s { case timerDeleted: if atomic.Cas(&amp;t.status, s, timerRemoving) { if !dodeltimer(pp, i) { panic(...) } if !atomic.Cas(&amp;t.status, timerRemoving, timerRemoved) { panic(...) } // Look at this heap position again. i-- } case timerModifiedEarlier, timerModifiedLater: if atomic.Cas(&amp;t.status, s, timerMoving) { // Now we can change the when field. t.when = t.nextwhen // Take t off the heap, and hold onto it. // We don't add it back yet because the // heap manipulation could cause our // loop to skip some other timer. if !dodeltimer(pp, i) { panic(...) } moved = append(moved, t) if s == timerModifiedEarlier { if n := atomic.Xadd(&amp;pp.adjustTimers, -1); int32(n) &lt;= 0 { addAdjustedTimers(pp, moved) return } } } case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving: panic(...) case timerWaiting: // OK, nothing to do. case timerModifying: // Check again after modification is complete. osyield() i-- default: panic(...) } } if len(moved) &gt; 0 { addAdjustedTimers(pp, moved) } } func addAdjustedTimers(pp *p, moved []*timer) { for _, t := range moved { if !doaddtimer(pp, t) { panic(...) } if !atomic.Cas(&amp;t.status, timerMoving, timerWaiting) { panic(...) } } } ``` 與調度器調度 Goroutine 的機制相同,如果一個 P 中沒有了 timer,同樣會嘗試從其他 的 P 中偷取一半的 timer: ``` func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() (...) now, pollUntil, _ := checkTimers(_p_, 0) (...) // Poll 網絡,優先級比從其他 P 中偷要高。 // 在我們嘗試去其他 P 偷之前,這個 netpoll 只是一個優化。 // 如果沒有 waiter 或 netpoll 中的線程已被阻塞,則可以安全地跳過它。 // 如果有任何類型的邏輯競爭與被阻塞的線程(例如它已經從 netpoll 返回,但尚未設置 lastpoll) // 該線程無論如何都將阻塞 netpoll。 if netpollinited() &amp;&amp; atomic.Load(&amp;netpollWaiters) &gt; 0 &amp;&amp; atomic.Load64(&amp;sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // 無阻塞 gp := list.pop() injectglist(&amp;list) casgstatus(gp, _Gwaiting, _Grunnable) (...) return gp, false } } // 從其他 P 中偷 timer ranTimer := false (...) for i := 0; i &lt; 4; i++ { // 隨機偷 for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { (...) // Consider stealing timers from p2. // This call to checkTimers is the only place where // we hold a lock on a different P's timers. // Lock contention can be a problem here, so avoid // grabbing the lock if p2 is running and not marked // for preemption. If p2 is running and not being // preempted we assume it will handle its own timers. if i &gt; 2 &amp;&amp; shouldStealTimers(p2) { tnow, w, ran := checkTimers(p2, now) now = tnow if w != 0 &amp;&amp; (pollUntil == 0 || w &lt; pollUntil) { pollUntil = w } if ran { // Running the timers may have // made an arbitrary number of G's // ready and added them to this P's // local run queue. That invalidates // the assumption of runqsteal // that is always has room to add // stolen G's. So check now if there // is a local G to run. if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } ranTimer = true } } } } if ranTimer { // 執行完一個 timer 后可能存在已經就緒的 Goroutine goto top } stop: // 沒有任何 timer (...) delta := int64(-1) if pollUntil != 0 { // checkTimers ensures that polluntil &gt; now. delta = pollUntil - now } (...) // poll 網絡 // 和上面重新找 runqueue 的邏輯類似 if netpollinited() &amp;&amp; (atomic.Load(&amp;netpollWaiters) &gt; 0 || pollUntil != 0) &amp;&amp; atomic.Xchg64(&amp;sched.lastpoll, 0) != 0 { atomic.Store64(&amp;sched.pollUntil, uint64(pollUntil)) (...) list := netpoll(delta) // block until new work is available atomic.Store64(&amp;sched.pollUntil, 0) atomic.Store64(&amp;sched.lastpoll, uint64(nanotime())) (...) lock(&amp;sched.lock) _p_ = pidleget() unlock(&amp;sched.lock) if _p_ == nil { injectglist(&amp;list) } else { acquirep(_p_) if !list.empty() { gp := list.pop() injectglist(&amp;list) casgstatus(gp, _Gwaiting, _Grunnable) (...) return gp, false } (...) goto top } } else if pollUntil != 0 &amp;&amp; netpollinited() { pollerPollUntil := int64(atomic.Load64(&amp;sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil &gt; pollUntil { netpollBreak() } } // 真的什么都沒找到 // park 當前的 m stopm() goto top } func shouldStealTimers(p2 *p) bool { if p2.status != _Prunning { return true } mp := p2.m.ptr() if mp == nil || mp.locks &gt; 0 { return false } gp := mp.curg if gp == nil || gp.atomicstatus != _Grunning || !gp.preempt { return false } return true } ``` 當一個 P 因為某種原因被銷毀時,還需要考慮 timer 的轉移: ``` func (pp *p) destroy() { (...) if len(pp.timers) &gt; 0 { plocal := getg().m.p.ptr() // The world is stopped, but we acquire timersLock to // protect against sysmon calling timeSleepUntil. // This is the only case where we hold the timersLock of // more than one P, so there are no deadlock concerns. lock(&amp;plocal.timersLock) lock(&amp;pp.timersLock) moveTimers(plocal, pp.timers) pp.timers = nil pp.adjustTimers = 0 unlock(&amp;pp.timersLock) unlock(&amp;plocal.timersLock) } (...) } func moveTimers(pp *p, timers []*timer) { for _, t := range timers { loop: for { switch s := atomic.Load(&amp;t.status); s { case timerWaiting: t.pp = 0 if !doaddtimer(pp, t) { panic(...) } break loop case timerModifiedEarlier, timerModifiedLater: if !atomic.Cas(&amp;t.status, s, timerMoving) { continue } t.when = t.nextwhen t.pp = 0 if !doaddtimer(pp, t) { panic(...) } if !atomic.Cas(&amp;t.status, timerMoving, timerWaiting) { panic(...) } break loop case timerDeleted: if !atomic.Cas(&amp;t.status, s, timerRemoved) { continue } t.pp = 0 // We no longer need this timer in the heap. break loop case timerModifying: // Loop until the modification is complete. osyield() case timerNoStatus, timerRemoved: // We should not see these status values in a timers heap. panic(...) case timerRunning, timerRemoving, timerMoving: // Some other P thinks it owns this timer, // which should not happen. panic(...) default: panic(...) } } } } ``` 在調度循環中,我們可以看到,netpoller 的作用在于喚醒喚醒調度循環,每當一個 timer 被設置后 當通過 wakeNetPoller 喚醒 netpoller 時,都能快速讓調度循環進入 timer 的檢查過程, 從而高效的觸發設置的 timer。 ``` func wakeNetPoller(when int64) { if atomic.Load64(&amp;sched.lastpoll) == 0 { pollerPollUntil := int64(atomic.Load64(&amp;sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil &gt; when { netpollBreak() } } } ``` ### 從系統監控中觸發 與 Goroutine 調度完全一樣,系統監控也負責 netpoller 的觸發,并在必要時啟動 M 來執行需要的 timer 或獲取網絡數據。 ``` //go:nowritebarrierrec func sysmon() { (...) checkdead() (...) for { (...) now := nanotime() next := timeSleepUntil() // 如果在 STW,則暫時休眠 if debug.schedtrace &lt;= 0 &amp;&amp; (sched.gcwaiting != 0 || atomic.Load(&amp;sched.npidle) == uint32(gomaxprocs)) { lock(&amp;sched.lock) if atomic.Load(&amp;sched.gcwaiting) != 0 || atomic.Load(&amp;sched.npidle) == uint32(gomaxprocs) { if next &gt; now { (...) notetsleep(&amp;sched.sysmonnote, sleep) (...) now = nanotime() next = timeSleepUntil() (...) } (...) } unlock(&amp;sched.lock) } (...) // 如果超過 10ms 沒有 poll,則 poll 一下網絡 lastpoll := int64(atomic.Load64(&amp;sched.lastpoll)) if netpollinited() &amp;&amp; lastpoll != 0 &amp;&amp; lastpoll+10*1000*1000 &lt; now { atomic.Cas64(&amp;sched.lastpoll, uint64(lastpoll), uint64(now)) list := netpoll(0) // 非阻塞,返回 Goroutine 列表 if !list.empty() { // 需要在插入 g 列表前減少空閑鎖住的 m 的數量(假裝有一個正在運行) // 否則會導致這些情況: // injectglist 會綁定所有的 p,但是在它開始 M 運行 P 之前,另一個 M 從 syscall 返回, // 完成運行它的 G ,注意這時候沒有 work 要做,且沒有其他正在運行 M 的死鎖報告。 incidlelocked(-1) injectglist(&amp;list) incidlelocked(1) } } if next &lt; now { // There are timers that should have already run, // perhaps because there is an unpreemptible P. // Try to start an M to run them. startm(nil, false) } (...) } } ``` 在死鎖檢查中,可以對 timers 進行一次檢查,并根據 timer 的狀態選擇喚醒 M 來執行: ``` func checkdead() { (...) // Maybe jump time forward for playground. _p_ := timejump() if _p_ != nil { for pp := &amp;sched.pidle; *pp != 0; pp = &amp;(*pp).ptr().link { if (*pp).ptr() == _p_ { *pp = _p_.link break } } mp := mget() if mp == nil { // There should always be a free M since // nothing is running. throw("checkdead: no m for timer") } mp.nextp.set(_p_) notewakeup(&amp;mp.park) return } // There are no goroutines running, so we can look at the P's. for _, _p_ := range allp { if len(_p_.timers) &gt; 0 { return } } (...) } func timejump() *p { (...) // Nothing is running, so we can look at all the P's. // Determine a timer bucket with minimum when. var ( minT *timer minWhen int64 minP *p ) for _, pp := range allp { (...) if len(pp.timers) == 0 { continue } c := pp.adjustTimers for _, t := range pp.timers { switch s := atomic.Load(&amp;t.status); s { case timerWaiting: if minT == nil || t.when &lt; minWhen { minT = t minWhen = t.when minP = pp } case timerModifiedEarlier, timerModifiedLater: if minT == nil || t.nextwhen &lt; minWhen { minT = t minWhen = t.nextwhen minP = pp } if s == timerModifiedEarlier { c-- } case timerRunning, timerModifying, timerMoving: panic(...) } // The timers are sorted, so we only have to check // the first timer for each P, unless there are // some timerModifiedEarlier timers. The number // of timerModifiedEarlier timers is in the adjustTimers // field, used to initialize c, above. if c == 0 { break } } } (...) return minP } ``` ## 小結 Timer 的實現已經經歷了好幾次大幅度的優化。如今的 Timer 生存在 P 中,每當進入調度循環時, 都會對 Timer 進行檢查,從而快速的啟動那些對時間敏感的 Goroutine, 這一思路也同樣得益于 netpoller,通過系統事件來喚醒那些對有效性極度敏感的任務。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看