# 6.11 計時器
> 本節內容提供一個線上演講:[YouTube 在線](https://www.youtube.com/watch?v=XJx0eTP-y9I),[Google Slides 講稿](https://changkun.de/s/timer114/)。
time 是一個很有意思的包,除去需要獲取當前時間的 Now 這一平淡無奇、直接對系統調用進行 封裝(`runtime·nanotime`)的函數外,其中最有意思的莫過于它所提供的 Timer 和 Ticker 了。 他們的實現,驅動了諸如`time.After`,`time.AfterFunc`,`time.Tick`,`time.Sleep`等方法。 本節我們便來仔細了解一下 Timer 的實現機制。
TODO: Timer 最近增加了幾次更新,修復了一些性能問題,本文需要更新:
* [https://go-review.googlesource.com/c/go/+/214299/](https://go-review.googlesource.com/c/go/+/214299/)
* [https://go-review.googlesource.com/c/go/+/214185/](https://go-review.googlesource.com/c/go/+/214185/)
* [https://go-review.googlesource.com/c/go/+/215722](https://go-review.googlesource.com/c/go/+/215722)
* [https://go-review.googlesource.com/c/go/+/221077](https://go-review.googlesource.com/c/go/+/221077)
如果用戶代碼啟動和停止很多計時器(例如 context.WithTimeout)則將穩定使用內存中殘留的已經停止的計時器,而不是已經從計時器堆中刪除的計時器。,第一個 CL 解決了這個問題,會在計時器總數超過 1/4 堆計時器時候,刪除所有已經刪除的計時器。 第二個 CL 嘗試解決這個問題:空閑的 P 在偷取計時器時會與正在運行的 P 發生鎖的競爭。方法是僅在下一個計時器準備運行或有一些計時器需要運行時才獲得計時器鎖定。 但是這個方案有帶來了新的問題:導致越來越多的已經刪除的計時器積累,從而沒有就緒任何計時器。一個解決方案是在沒有獲取鎖的情況下檢查否有很多已經刪除的計時器。
Timer 和 Ticker 所有的功能核心自然由運行時機制來驅動。當創建一個 Timer 時候:
```
type Timer struct {
C <-chan Time
r runtimeTimer
}
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d), // when 僅僅只是將事件觸發的 walltime 轉換為 int64
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
func (t *Timer) Stop() bool {
(...)
return stopTimer(&t.r)
}
func (t *Timer) Reset(d Duration) bool {
(...)
w := when(d)
active := stopTimer(&t.r)
resetTimer(&t.r, w)
return active
}
```
Ticker 與 Timer 的本質區別僅僅在與 Ticker 多設置了 period 字段:
```
type Ticker struct {
C <-chan Time // 當事件觸發時,Timer 會向此通道發送觸發的時間
r runtimeTimer
}
func NewTicker(d Duration) *Ticker {
(...)
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
func (t *Ticker) Stop() {
stopTimer(&t.r)
}
func Tick(d Duration) <-chan Time {
(...)
return NewTicker(d).C
}
```
Timer 同時為 After、AfterFunc 提供了支持,代碼相對簡單,我們不在此贅述。 直接進入 runtimeTimer 結構。runtime 對外暴露的 timer 所有功能如下:
```
type runtimeTimer struct {
pp uintptr // timer 所在的 P 的指針
// 當時間為 when 時,喚醒 timer,當時間為 when+period, ... (period > 0)
// 時,均在 timer Goroutine 中調用 f(arg, now),從而 f 必須具有良好的行為(不會阻塞)
when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
nextwhen int64
status uint32
}
func startTimer(*runtimeTimer)
func stopTimer(*runtimeTimer) bool
func resetTimer(*runtimeTimer, int64)
```
可見,timer 返回的 channel 會被用戶代碼的 Goroutine 持有,為了使 channel 能正常 進行消息通信,每當 timer 被喚醒時,timer 自建的 Goroutine 會單獨向 channel 發送 當前時間`Now()`:
```
func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) <- Now():
default:
}
}
```
## 6.11.1 Timer 狀態機
早在 Go 1.10 以前,所有的 timer 均在一個全局的四叉小頂堆中進行維護,顯然并發性能是 不夠的,隨后到了 Go 1.10 時,將堆的數量擴充到了 64 個,但仍然需要在喚醒 timer 時, 頻繁的將 M 和 P 進行解綁(`timerproc`),性能依然不夠出眾。而到 Go 1.14 時, Go 運行時中的 timer 使用 netpoll 進行驅動,每個 timer 堆均附著在 P 上,形成一個 局部的 timer 堆,消除了喚醒一個 timer 時進行 M/P 切換的開銷,大幅削減了鎖的競爭, 與 nginx 中 timer 的實現方式非常相似。
在 P 結構的定義中,存在兩個有關 timer 的字段:
```
type p struct {
(...)
// timers 字段的鎖。我們通常在 P 運行時訪問 timers,但 scheduler 仍可以
// 在不同的 P 上進行訪問。
timersLock mutex
// 某段時間需要進行的動作。用于實現 time 包。
timers []*timer
// 在 P 堆中 timerModifiedEarlier timers 的數量。
// 僅當持有 timersLock 或者當 timer 狀態轉換為 timerModifying 時才可以修改
adjustTimers uint32
(...)
}
```
**在 P 中建立一個局部的 timers 堆,用于在調度器進入調度循環時,快速確定是否需要就緒一個 timer。**
`p.timers`中的 timer 以四叉小頂堆(最早發生的 timer 維護堆頂)的形式進行維護, 每當新增一個 timer 時,都會通過`siftupTimer`和`siftdownTimer`維護堆中元素 的順序以滿足最小堆的條件。
```
// 堆維護算法
// 當想 timer 數組末尾 append 一個 timer 后,通過 siftupTimer 進行重整
func siftupTimer(t []*timer, i int) bool {
if i >= len(t) {
return false
}
(...) // 對 i 位置的 timer 根據 when 字段進行調整
return true
}
func siftdownTimer(t []*timer, i int) bool {
n := len(t)
if i >= n {
return false
}
(...) // 對 i 位置的 timer 根據 when 字段進行調整
return true
}
```
我們知道,當調度器完成對一個 G 的調度后,會重新進入調度循環(`runtime.schedule`)。 timer 作為一個對時間敏感的功能,同網絡數據的拉取操作一樣,可運行的 timer 也在此進行 檢查(`runtime.checkTimers`),如果有 timer 可以運行,則直接調度該 G。 其中 checkTimer 會調整 timers 數組中 timer 的順序(`runtime.adjusttimers`), 然后運行需要執行的 timer(`runtime.runtimer`)。
如果此時沒有直接可用的 timer,且當前 P 的 G 隊列已空,則`runtime.findrunnable`便會進行任務偷取工作(`runtime.runqsteal`),對應到 timer,操作仍可以被抽象為`runtime.checkTimers`。
在這個過程中要小心當 P 被回收時,需要將局部的 P 進行刪除,或者轉移到其他 P 上, 由`runtime.moveTimers`實現。
一個 Timer 具有十種狀態,他們之間的狀態轉換圖如圖 1 所示。
**圖 1: 計時器狀態機**
總結來說:
1. 一個 Timer 的標準生命周期為:`NoStatus -> Waiting -> Running -> NoStatus`
2. 當人為的對 Timer 進行刪除時:`NoStatus -> Waiting -> Deleted -> Removing -> Removed`
3. 當人為的對 Timer 進行修改時:`NoStatus -> Waiting -> Modifying -> ModifiedEarlier/ModifiedLater -> Moving -> Waiting -> Running -> NoStatus`
4. 當人為的對 Timer 進行重置時:`NoStatus -> Waiting -> Deleted -> Removing -> Removed -> Waiting -> Running -> NoStatus`
### Timer 的啟動(startTimer)
啟動一個 timer 的操作非常直觀: 當 timer 分配好后,會通過 addtimer 將 timer 添加到創建 timer 的 P 上。
```
func startTimer(t *timer) {
(...)
addtimer(t)
}
func addtimer(t *timer) {
(...)
if t.status != timerNoStatus {
panic(...)
}
t.status = timerWaiting
addInitializedTimer(t)
}
// 將 timer 初始化到當前的 P 上
func addInitializedTimer(t *timer) {
when := t.when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
ok := cleantimers(pp) && doaddtimer(pp, t)
unlock(&pp.timersLock)
if !ok {
panic(...)
}
wakeNetPoller(when)
}
```
在添加數組之前,需要確保不會對其他的 timer 產生影響,存在兩種狀態轉換:
1. `timerDeleted`\->`timerRemoving`\->`timerRemoved`
2. `timerModifiedEarlier/timerModifiedLater`\->`timerMoving`\->`timerWaiting`
```
// 此時已持有 timersLock
func cleantimers(pp *p) bool {
for {
if len(pp.timers) == 0 {
return true
}
t := pp.timers[0] // 堆頂,when 最小,最早發生的 timer
(...)
switch s := atomic.Load(&t.status); s {
case timerDeleted:
// timerDeleted --> timerRemoving --> 從堆中刪除 timer --> timerRemoved
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
if !dodeltimer0(pp) {
return false
}
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
return false
}
case timerModifiedEarlier, timerModifiedLater:
// timerMoving --> 調整 timer 的時間 --> timerWaiting
// 此時 timer 被調整為更早或更晚,將原先的 timer 進行刪除,再重新添加
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
t.when = t.nextwhen
if !dodeltimer0(pp) {
return false
}
if !doaddtimer(pp, t) {
return false
}
if s == timerModifiedEarlier {
atomic.Xadd(&pp.adjustTimers, -1)
}
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
return false
}
default:
// 無需調整
return true
}
}
}
```
然后將 timer 鏈接到當前的 P 上,在直接 append 到數組中:
```
// 將 timer 添加到當前 P 的堆上
// 此時已持有 timersLock
func doaddtimer(pp *p, t *timer) bool {
(...)
t.pp.set(pp)
i := len(pp.timers)
pp.timers = append(pp.timers, t)
return siftupTimer(pp.timers, i)
}
```
處理掉已經過期的 timer:
```
func dodeltimer0(pp *p) bool {
t := pp.timers[0]
t.pp = 0
(...)
last := len(pp.timers) - 1
if last > 0 {
// 將堆頂元素放到數組最后,并將其標記為 nil,并隨后進入 timerRemoved 狀態
// 要么等待 GC 進行回收,要么被 reset 復用
pp.timers[0] = pp.timers[last]
}
pp.timers[last] = nil
pp.timers = pp.timers[:last]
ok := true
if last > 0 {
// 對堆進行重新維護
ok = siftdownTimer(pp.timers, 0)
}
return ok
}
```
至于`wakeNetPoller(when)`,我們展示先專注于查看 timer 的邏輯,最后再來留意為什么 需要使用 poller。
### Timer 的終止(stopTimer)
停止一個 timer 的操作自然是嘗試將 timer 從它所在的堆中刪除,包含四種類型的狀態轉換:
1. timerWaiting/timerModifiedLater –> timerDeleted
2. timerModifiedEarlier –> timerModifying –> timerDeleted
3. timerDeleted/timerRemoving/timerRemoved 狀態保持
4. timerRunning/timerMoving 等待下一次狀態檢查
```
func stopTimer(t *timer) bool {
return deltimer(t)
}
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
// timerWaiting/timerModifiedLater --> timerDeleted
if atomic.Cas(&t.status, s, timerDeleted) {
// timer 尚未運行
return true
}
case timerModifiedEarlier:
// timerModifiedEarlier --> timerModifying --> timerDeleted
tpp := t.pp.ptr()
if atomic.Cas(&t.status, s, timerModifying) {
atomic.Xadd(&tpp.adjustTimers, -1)
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
panic(...)
}
// timer 尚未運行
return true
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer 已經運行
return false
case timerRunning, timerMoving:
// timer 正在運行或被其他 P 移動,等待完成
osyield()
case timerNoStatus:
// 刪除一個從未被添加或者已經運行的 timer
return false
case timerModifying:
// 并發調用 deltimer 和 modtimer
panic(...)
default:
panic(...)
}
}
}
```
### Timer 的重置(resetTimer)
重置 timer 可以是將 timer 的時間提前或者延后,為了保證程序的正確性,從而引入了 timerModifiedEarlier 和 timerModifiedLater 兩種狀態。
當進行 timer 的
```
func resetTimer(t *timer, when int64) {
(...)
resettimer(t, when)
}
func resettimer(t *timer, when int64) {
(...)
if when < 0 {
when = maxWhen
}
for {
switch s {
case timerNoStatus, timerRemoved:
// 復用一個為初始化或者已經運行完畢或者已經刪除的 timer
// timerNoStatus/timerRemoved --> timerWaiting
atomic.Store(&t.status, timerWaiting)
t.when = when
addInitializedTimer(t)
return
case timerDeleted:
if atomic.Cas(&t.status, s, timerModifying) {
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
atomic.Xadd(&t.pp.ptr().adjustTimers, 1)
}
if !atomic.Cas(&t.status, timerModifying, newStatus) {
panic(...)
}
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
return
}
case timerRemoving:
// Wait for the removal to complete.
osyield()
case timerRunning:
// Even though the timer should not be active,
// we can see timerRunning if the timer function
// permits some other goroutine to call resettimer.
// Wait until the run is complete.
osyield()
case timerWaiting, timerModifying, timerModifiedEarlier, timerModifiedLater, timerMoving:
// Called resettimer on active timer.
panic(...)
default:
panic(...)
}
}
}
```
### Timer 的執行(runtimer)
我們還沒有分析到一個 timer 究竟如何被喚醒,但我們不妨先查看當 timer 被喚醒后會做什么。
```
//go:systemstack
func runtimer(pp *p, now int64) int64 {
for {
t := pp.timers[0]
(...)
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {
// Not ready to run.
return t.when
}
if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
// runOneTimer 可能臨時解鎖 pp.timersLock
runOneTimer(pp, t, now)
return 0
case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
if !dodeltimer0(pp) {
panic(...)
}
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
panic(...)
}
if len(pp.timers) == 0 {
return -1
}
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
t.when = t.nextwhen
if !dodeltimer0(pp) {
panic(...)
}
if !doaddtimer(pp, t) {
panic(...)
}
if s == timerModifiedEarlier {
atomic.Xadd(&pp.adjustTimers, -1)
}
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
panic(...)
}
case timerModifying:
// 等待修改完成
osyield() // usleep(1)
case timerNoStatus, timerRemoved,timerRunning, timerRemoving, timerMoving:
// 這些狀態的 timer 是不應該被觀察到的
panic(...)
default:
panic(...)
}
}
}
//go:systemstack
func runOneTimer(pp *p, t *timer, now int64) {
(...)
f := t.f
arg := t.arg
seq := t.seq
// 如果是 period > 0 則說明此時 timer 為 ticker,需要再次觸發
if t.period > 0 {
// 放入堆中并調整觸發時間
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
if !siftdownTimer(pp.timers, 0) {
panic(...)
}
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
panic(...)
}
} else { // 否則為一次性 timer
// 從堆中移除
if !dodeltimer0(pp) {
panic(...)
}
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
panic(...)
}
}
(...)
unlock(&pp.timersLock)
f(arg, seq) // 觸發 sendTime 信號 通知用戶 Goroutine
lock(&pp.timersLock)
(...)
}
```
## 6.11.2 Timer 的觸發
### 從調度循環中直接觸發
前面我們已經簡單的提到了 Timer 會在調度循環中進行例行檢查,或者是通過`wakeNetPoller`來強制觸發 timer。 在調度循環中:
```
func schedule() {
_g_ := getg()
(...)
top:
pp := _g_.m.p.ptr()
(...)
checkTimers(pp, 0)
(...)
execute(gp, inheritTime)
}
//go:yeswritebarrierrec
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
lock(&pp.timersLock)
adjusttimers(pp)
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
if tw := runtimer(pp, rnow); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
func adjusttimers(pp *p) {
if len(pp.timers) == 0 {
return
}
if atomic.Load(&pp.adjustTimers) == 0 {
return
}
var moved []*timer
for i := 0; i < len(pp.timers); i++ {
t := pp.timers[i]
if t.pp.ptr() != pp {
throw("adjusttimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted:
if atomic.Cas(&t.status, s, timerRemoving) {
if !dodeltimer(pp, i) {
panic(...)
}
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
panic(...)
}
// Look at this heap position again.
i--
}
case timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, s, timerMoving) {
// Now we can change the when field.
t.when = t.nextwhen
// Take t off the heap, and hold onto it.
// We don't add it back yet because the
// heap manipulation could cause our
// loop to skip some other timer.
if !dodeltimer(pp, i) {
panic(...)
}
moved = append(moved, t)
if s == timerModifiedEarlier {
if n := atomic.Xadd(&pp.adjustTimers, -1); int32(n) <= 0 {
addAdjustedTimers(pp, moved)
return
}
}
}
case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving:
panic(...)
case timerWaiting:
// OK, nothing to do.
case timerModifying:
// Check again after modification is complete.
osyield()
i--
default:
panic(...)
}
}
if len(moved) > 0 {
addAdjustedTimers(pp, moved)
}
}
func addAdjustedTimers(pp *p, moved []*timer) {
for _, t := range moved {
if !doaddtimer(pp, t) {
panic(...)
}
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
panic(...)
}
}
}
```
與調度器調度 Goroutine 的機制相同,如果一個 P 中沒有了 timer,同樣會嘗試從其他 的 P 中偷取一半的 timer:
```
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
(...)
now, pollUntil, _ := checkTimers(_p_, 0)
(...)
// Poll 網絡,優先級比從其他 P 中偷要高。
// 在我們嘗試去其他 P 偷之前,這個 netpoll 只是一個優化。
// 如果沒有 waiter 或 netpoll 中的線程已被阻塞,則可以安全地跳過它。
// 如果有任何類型的邏輯競爭與被阻塞的線程(例如它已經從 netpoll 返回,但尚未設置 lastpoll)
// 該線程無論如何都將阻塞 netpoll。
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // 無阻塞
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
(...)
return gp, false
}
}
// 從其他 P 中偷 timer
ranTimer := false
(...)
for i := 0; i < 4; i++ {
// 隨機偷
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
(...)
// Consider stealing timers from p2.
// This call to checkTimers is the only place where
// we hold a lock on a different P's timers.
// Lock contention can be a problem here, so avoid
// grabbing the lock if p2 is running and not marked
// for preemption. If p2 is running and not being
// preempted we assume it will handle its own timers.
if i > 2 && shouldStealTimers(p2) {
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
if ran {
// Running the timers may have
// made an arbitrary number of G's
// ready and added them to this P's
// local run queue. That invalidates
// the assumption of runqsteal
// that is always has room to add
// stolen G's. So check now if there
// is a local G to run.
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
ranTimer = true
}
}
}
}
if ranTimer {
// 執行完一個 timer 后可能存在已經就緒的 Goroutine
goto top
}
stop:
// 沒有任何 timer
(...)
delta := int64(-1)
if pollUntil != 0 {
// checkTimers ensures that polluntil > now.
delta = pollUntil - now
}
(...)
// poll 網絡
// 和上面重新找 runqueue 的邏輯類似
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
(...)
list := netpoll(delta) // block until new work is available
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
(...)
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ == nil {
injectglist(&list)
} else {
acquirep(_p_)
if !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
(...)
return gp, false
}
(...)
goto top
}
} else if pollUntil != 0 && netpollinited() {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
netpollBreak()
}
}
// 真的什么都沒找到
// park 當前的 m
stopm()
goto top
}
func shouldStealTimers(p2 *p) bool {
if p2.status != _Prunning {
return true
}
mp := p2.m.ptr()
if mp == nil || mp.locks > 0 {
return false
}
gp := mp.curg
if gp == nil || gp.atomicstatus != _Grunning || !gp.preempt {
return false
}
return true
}
```
當一個 P 因為某種原因被銷毀時,還需要考慮 timer 的轉移:
```
func (pp *p) destroy() {
(...)
if len(pp.timers) > 0 {
plocal := getg().m.p.ptr()
// The world is stopped, but we acquire timersLock to
// protect against sysmon calling timeSleepUntil.
// This is the only case where we hold the timersLock of
// more than one P, so there are no deadlock concerns.
lock(&plocal.timersLock)
lock(&pp.timersLock)
moveTimers(plocal, pp.timers)
pp.timers = nil
pp.adjustTimers = 0
unlock(&pp.timersLock)
unlock(&plocal.timersLock)
}
(...)
}
func moveTimers(pp *p, timers []*timer) {
for _, t := range timers {
loop:
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting:
t.pp = 0
if !doaddtimer(pp, t) {
panic(...)
}
break loop
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
t.when = t.nextwhen
t.pp = 0
if !doaddtimer(pp, t) {
panic(...)
}
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
panic(...)
}
break loop
case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoved) {
continue
}
t.pp = 0
// We no longer need this timer in the heap.
break loop
case timerModifying:
// Loop until the modification is complete.
osyield()
case timerNoStatus, timerRemoved:
// We should not see these status values in a timers heap.
panic(...)
case timerRunning, timerRemoving, timerMoving:
// Some other P thinks it owns this timer,
// which should not happen.
panic(...)
default:
panic(...)
}
}
}
}
```
在調度循環中,我們可以看到,netpoller 的作用在于喚醒喚醒調度循環,每當一個 timer 被設置后 當通過 wakeNetPoller 喚醒 netpoller 時,都能快速讓調度循環進入 timer 的檢查過程, 從而高效的觸發設置的 timer。
```
func wakeNetPoller(when int64) {
if atomic.Load64(&sched.lastpoll) == 0 {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > when {
netpollBreak()
}
}
}
```
### 從系統監控中觸發
與 Goroutine 調度完全一樣,系統監控也負責 netpoller 的觸發,并在必要時啟動 M 來執行需要的 timer 或獲取網絡數據。
```
//go:nowritebarrierrec
func sysmon() {
(...)
checkdead()
(...)
for {
(...)
now := nanotime()
next := timeSleepUntil()
// 如果在 STW,則暫時休眠
if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
lock(&sched.lock)
if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
if next > now {
(...)
notetsleep(&sched.sysmonnote, sleep)
(...)
now = nanotime()
next = timeSleepUntil()
(...)
}
(...)
}
unlock(&sched.lock)
}
(...)
// 如果超過 10ms 沒有 poll,則 poll 一下網絡
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // 非阻塞,返回 Goroutine 列表
if !list.empty() {
// 需要在插入 g 列表前減少空閑鎖住的 m 的數量(假裝有一個正在運行)
// 否則會導致這些情況:
// injectglist 會綁定所有的 p,但是在它開始 M 運行 P 之前,另一個 M 從 syscall 返回,
// 完成運行它的 G ,注意這時候沒有 work 要做,且沒有其他正在運行 M 的死鎖報告。
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
if next < now {
// There are timers that should have already run,
// perhaps because there is an unpreemptible P.
// Try to start an M to run them.
startm(nil, false)
}
(...)
}
}
```
在死鎖檢查中,可以對 timers 進行一次檢查,并根據 timer 的狀態選擇喚醒 M 來執行:
```
func checkdead() {
(...)
// Maybe jump time forward for playground.
_p_ := timejump()
if _p_ != nil {
for pp := &sched.pidle; *pp != 0; pp = &(*pp).ptr().link {
if (*pp).ptr() == _p_ {
*pp = _p_.link
break
}
}
mp := mget()
if mp == nil {
// There should always be a free M since
// nothing is running.
throw("checkdead: no m for timer")
}
mp.nextp.set(_p_)
notewakeup(&mp.park)
return
}
// There are no goroutines running, so we can look at the P's.
for _, _p_ := range allp {
if len(_p_.timers) > 0 {
return
}
}
(...)
}
func timejump() *p {
(...)
// Nothing is running, so we can look at all the P's.
// Determine a timer bucket with minimum when.
var (
minT *timer
minWhen int64
minP *p
)
for _, pp := range allp {
(...)
if len(pp.timers) == 0 {
continue
}
c := pp.adjustTimers
for _, t := range pp.timers {
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if minT == nil || t.when < minWhen {
minT = t
minWhen = t.when
minP = pp
}
case timerModifiedEarlier, timerModifiedLater:
if minT == nil || t.nextwhen < minWhen {
minT = t
minWhen = t.nextwhen
minP = pp
}
if s == timerModifiedEarlier {
c--
}
case timerRunning, timerModifying, timerMoving:
panic(...)
}
// The timers are sorted, so we only have to check
// the first timer for each P, unless there are
// some timerModifiedEarlier timers. The number
// of timerModifiedEarlier timers is in the adjustTimers
// field, used to initialize c, above.
if c == 0 {
break
}
}
}
(...)
return minP
}
```
## 小結
Timer 的實現已經經歷了好幾次大幅度的優化。如今的 Timer 生存在 P 中,每當進入調度循環時, 都會對 Timer 進行檢查,從而快速的啟動那些對時間敏感的 Goroutine, 這一思路也同樣得益于 netpoller,通過系統事件來喚醒那些對有效性極度敏感的任務。
- 第一部分 :基礎篇
- 第1章 Go語言的前世今生
- 1.2 Go語言綜述
- 1.3 順序進程通訊
- 1.4 Plan9匯編語言
- 第2章 程序生命周期
- 2.1 從go命令談起
- 2.2 Go程序編譯流程
- 2.3 Go 程序啟動引導
- 2.4 主Goroutine的生與死
- 第3 章 語言核心
- 3.1 數組.切片與字符串
- 3.2 散列表
- 3.3 函數調用
- 3.4 延遲語句
- 3.5 恐慌與恢復內建函數
- 3.6 通信原語
- 3.7 接口
- 3.8 運行時類型系統
- 3.9 類型別名
- 3.10 進一步閱讀的參考文獻
- 第4章 錯誤
- 4.1 問題的演化
- 4.2 錯誤值檢查
- 4.3 錯誤格式與上下文
- 4.4 錯誤語義
- 4.5 錯誤處理的未來
- 4.6 進一步閱讀的參考文獻
- 第5章 同步模式
- 5.1 共享內存式同步模式
- 5.2 互斥鎖
- 5.3 原子操作
- 5.4 條件變量
- 5.5 同步組
- 5.6 緩存池
- 5.7 并發安全散列表
- 5.8 上下文
- 5.9 內存一致模型
- 5.10 進一步閱讀的文獻參考
- 第二部分 運行時篇
- 第6章 并發調度
- 6.1 隨機調度的基本概念
- 6.2 工作竊取式調度
- 6.3 MPG模型與并發調度單
- 6.4 調度循環
- 6.5 線程管理
- 6.6 信號處理機制
- 6.7 執行棧管理
- 6.8 協作與搶占
- 6.9 系統監控
- 6.10 網絡輪詢器
- 6.11 計時器
- 6.12 非均勻訪存下的調度模型
- 6.13 進一步閱讀的參考文獻
- 第7章 內存分配
- 7.1 設計原則
- 7.2 組件
- 7.3 初始化
- 7.4 大對象分配
- 7.5 小對象分配
- 7.6 微對象分配
- 7.7 頁分配器
- 7.8 內存統計
- 第8章 垃圾回收
- 8.1 垃圾回收的基本想法
- 8.2 寫屏幕技術
- 8.3 調步模型與強弱觸發邊界
- 8.4 掃描標記與標記輔助
- 8.5 免清掃式位圖技術
- 8.6 前進保障與終止檢測
- 8.7 安全點分析
- 8.8 分代假設與代際回收
- 8.9 請求假設與實務制導回收
- 8.10 終結器
- 8.11 過去,現在與未來
- 8.12 垃圾回收統一理論
- 8.13 進一步閱讀的參考文獻
- 第三部分 工具鏈篇
- 第9章 代碼分析
- 9.1 死鎖檢測
- 9.2 競爭檢測
- 9.3 性能追蹤
- 9.4 代碼測試
- 9.5 基準測試
- 9.6 運行時統計量
- 9.7 語言服務協議
- 第10章 依賴管理
- 10.1 依賴管理的難點
- 10.2 語義化版本管理
- 10.3 最小版本選擇算法
- 10.4 Vgo 與dep之爭
- 第12章 泛型
- 12.1 泛型設計的演進
- 12.2 基于合約的泛型
- 12.3 類型檢查技術
- 12.4 泛型的未來
- 12.5 進一步閱讀的的參考文獻
- 第13章 編譯技術
- 13.1 詞法與文法
- 13.2 中間表示
- 13.3 優化器
- 13.4 指針檢查器
- 13.5 逃逸分析
- 13.6 自舉
- 13.7 鏈接器
- 13.8 匯編器
- 13.9 調用規約
- 13.10 cgo與系統調用
- 結束語: Go去向何方?