<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # 5.4 條件變量 sync.Cond 在生產者消費者模型中非常典型,帶有互斥鎖的隊列當元素滿時, 如果生產在向隊列插入元素時將隊列鎖住,會產生既不能讀,也不能寫的情況。 sync.Cond 就解決了這個問題。 ``` func main() { cond := sync.NewCond(new(sync.Mutex)) condition := 0 // 消費者 go func() { for { // 消費者開始消費時,鎖住 cond.L.Lock() // 如果沒有可消費的值,則等待 for condition == 0 { cond.Wait() } // 消費 condition-- fmt.Printf("Consumer: %d\n", condition) // 喚醒一個生產者 cond.Signal() // 解鎖 cond.L.Unlock() } }() // 生產者 for { // 生產者開始生產 cond.L.Lock() // 當生產太多時,等待消費者消費 for condition == 100 { cond.Wait() } // 生產 condition++ fmt.Printf("Producer: %d\n", condition) // 通知消費者可以開始消費了 cond.Signal() // 解鎖 cond.L.Unlock() } } ``` 我們來看一看內部的實現原理。 ## 結構 sync.Cond 的內部結構包含一個鎖(Locker)、通知列表(notifyList)以及一個復制檢查器 copyChecker。 ``` type Locker interface { Lock() Unlock() } type Cond struct { L Locker notify notifyList checker copyChecker } func NewCond(l Locker) *Cond { return &amp;Cond{L: l} } ``` L 的類型為 Locker 因此可以包含任何實現了 Lock 和 Unlock 的鎖,這包括 Mutex 和 RWMutex。 ## copyChecker copyChecker 非常簡單,它實現了一個`check()`方法,這個方法以 copyChecker 的指針作為 reciever, 因為 copyChecker 在一個 Cond 中并非指針,因此當 Cond 發生拷貝行為后,這個 reciever 會 發生變化,從而檢測到拷貝行為,使用 panic 以警示用戶: ``` // copyChecker 保存指向自身的指針來檢測對象的復制行為。 type copyChecker uintptr func (c *copyChecker) check() { if uintptr(*c) != uintptr(unsafe.Pointer(c)) &amp;&amp; !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &amp;&amp; uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } } ``` ## Wait / Signal / Broadcast Wait/Signal/Broadcast 都是由通知列表來實現的,撇開 copyChecker, Wait 無非就是向 notifyList 注冊一個通知,而后阻塞到被通知, Signal 則負責通知一個在 notifyList 注冊過的 waiter 發出通知, Broadcast 更是直接粗暴的向所有人都發出通知。 ``` // Wait 原子式的 unlock c.L, 并暫停執行調用的 goroutine。 // 在稍后執行后,Wait 會在返回前 lock c.L. 與其他系統不同, // 除非被 Broadcast 或 Signal 喚醒,否則等待無法返回。 // // 因為等待第一次 resume 時 c.L 沒有被鎖定,所以當 Wait 返回時, // 調用者通常不能認為條件為真。相反,調用者應該在循環中使用 Wait(): // // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&amp;c.notify) c.L.Unlock() runtime_notifyListWait(&amp;c.notify, t) c.L.Lock() } // Signal 喚醒一個等待 c 的 goroutine(如果存在) // // 在調用時它可以(不必須)持有一個 c.L func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&amp;c.notify) } // Broadcast 喚醒等待 c 的所有 goroutine // // 調用時它可以(不必須)持久有個 c.L func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&amp;c.notify) } ``` 那么它的核心實現其實就落到了 notifyList 上。 ## notifyList notifyList 結構本質上是一個隊列: ``` // notifyList 基于 ticket 實現通知列表 type notifyList struct { // wait 為下一個 waiter 的 ticket 編號 // 在沒有 lock 的情況下原子自增 wait uint32 // notify 是下一個被通知的 waiter 的 ticket 編號 // 它可以在沒有 lock 的情況下進行讀取,但只有在持有 lock 的情況下才能進行寫 // // wait 和 notify 會產生 wrap around,只要它們 "unwrapped" // 的差別小于 2^31,這種情況可以被正確處理。對于 wrap around 的情況而言, // 我們需要超過 2^31+ 個 goroutine 阻塞在相同的 condvar 上,這是不可能的。 // notify uint32 // waiter 列表. lock mutex head *sudog tail *sudog } ``` 當一個 Cond 調用 Wait 方法時候,向 wait 字段加 1,并返回一個 ticket 編號: ``` // notifyListAdd 將調用者添加到通知列表,以便接收通知。 // 調用者最終必須調用 notifyListWait 等待這樣的通知,并傳遞返回的 ticket 編號。 //go:linkname notifyListAdd sync.runtime_notifyListAdd func notifyListAdd(l *notifyList) uint32 { // 這可以并發調用,例如,當在 read 模式下保持 RWMutex 時從 sync.Cond.Wait 調用時。 return atomic.Xadd(&amp;l.wait, 1) - 1 } ``` 而后使用這個 ticket 編號來等待通知,這個過程會將等待通知的 goroutine 進行停泊,進入等待狀態, 并將其 M 與 P 解綁,從而將 G 從 M 身上剝離,放入等待隊列 sudog 中: ``` // notifyListWait 等待通知。如果在調用 notifyListAdd 后發送了一個,則立即返回。否則,它會阻塞。 //go:linkname notifyListWait sync.runtime_notifyListWait func notifyListWait(l *notifyList, t uint32) { lock(&amp;l.lock) // 如果 ticket 編號對應的 goroutine 已經被通知到,則立刻返回 if less(t, l.notify) { unlock(&amp;l.lock) return } s := acquireSudog() s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64(0) if blockprofilerate &gt; 0 { t0 = cputicks() s.releasetime = -1 } if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s // 將 M/P/G 解綁,并將 G 調整為等待狀態,放入 sudog 等待隊列中 goparkunlock(&amp;l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) if t0 != 0 { blockevent(s.releasetime-t0, 2) } releaseSudog(s) } // 將當前 goroutine 置于等待狀態并解鎖 lock。 // 通過調用 goready(gp) 可讓 goroutine 再次 runnable func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) { gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip) } ``` 當調用 Signal 時,會有一個在等待的 goroutine 被通知到,具體過程就是從 sudog 列表中找到 要通知的 goroutine,而后將其`goready`來等待調度循環將其調度: ``` // notifyListNotifyOne 通知列表中的一個條目 //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne func notifyListNotifyOne(l *notifyList) { // Fast-path: 如果上次通知后沒有新的 waiter // 則無需加鎖 if atomic.Load(&amp;l.wait) == atomic.Load(&amp;l.notify) { return } lock(&amp;l.lock) // slow-path 的二次檢查 t := l.notify if t == atomic.Load(&amp;l.wait) { unlock(&amp;l.lock) return } // 更新下一個需要喚醒的 ticket 編號 atomic.Store(&amp;l.notify, t+1) // 嘗試找到需要被通知的 g // 如果目前還沒來得及入隊,是無法找到的 // 但是,當它看到通知編號已經發生改變是不會被 park 的 // // 這個查找過程看起來是線性復雜度,但實際上很快就停了 // 因為 g 的隊列與獲取編號不同,因而隊列中會出現少量重排,但我們希望找到靠前的 g // 而 g 只有在不再 race 后才會排在靠前的位置,因此這個迭代也不會太久, // 同時,即便找不到 g,這個情況也成立: // 它還沒有休眠,并且已經失去了我們在隊列上找到的(少數)其他 g 的 race。 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&amp;l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&amp;l.lock) } func readyWithTime(s *sudog, traceskip int) { if s.releasetime != 0 { s.releasetime = cputicks() } goready(s.g, traceskip) } ``` 如果是全員通知,基本類似: ``` // notifyListNotifyAll 通知列表里的所有人 //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll func notifyListNotifyAll(l *notifyList) { // Fast-path: 如果上次通知后沒有新的 waiter // 則無需加鎖 if atomic.Load(&amp;l.wait) == atomic.Load(&amp;l.notify) { return } // 從列表中取一個,保存到局部變量,waiter 則可以在無鎖的情況下 ready lock(&amp;l.lock) s := l.head l.head = nil l.tail = nil // 更新要通知的下一個 ticket。 // 可以將它設置為等待的當前值,因為任何以前的 waiter 已經在列表中, // 或者會他們在嘗試將自己添加到列表時已經收到通知。 atomic.Store(&amp;l.notify, atomic.Load(&amp;l.wait)) unlock(&amp;l.lock) // 遍歷整個本地列表,并 ready 所有的 waiter for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } } ``` 比較簡單,不再贅述。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看