<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # 5.2 互斥鎖 ``` type Mutex struct { state int32 // 表示 mutex 鎖當前的狀態 sema uint32 // 信號量,用于喚醒 goroutine } ``` ## 狀態 ``` const ( mutexLocked = 1 &lt;&lt; iota // 互斥鎖已鎖住 mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 ) ``` Mutex 可能處于兩種不同的模式:正常模式和饑餓模式。 在正常模式中,等待者按照 FIFO 的順序排隊獲取鎖,但是一個被喚醒的等待者有時候并不能獲取 mutex, 它還需要和新到來的 goroutine 們競爭 mutex 的使用權。 新到來的 goroutine 存在一個優勢,它們已經在 CPU 上運行且它們數量很多, 因此一個被喚醒的等待者有很大的概率獲取不到鎖,在這種情況下它處在等待隊列的前面。 如果一個 goroutine 等待 mutex 釋放的時間超過 1ms,它就會將 mutex 切換到饑餓模式 在饑餓模式中,mutex 的所有權直接從解鎖的 goroutine 遞交到等待隊列中排在最前方的 goroutine。 新到達的 goroutine 們不要嘗試去獲取 mutex,即使它看起來是在解鎖狀態,也不要試圖自旋, 而是排到等待隊列的尾部。 如果一個等待者獲得 mutex 的所有權,并且看到以下兩種情況中的任一種: 1. 它是等待隊列中的最后一個, 2. 它等待的時間少于 1ms,它便將 mutex 切換回正常操作模式 正常模式下的性能會更好,因為一個 goroutine 能在即使有很多阻塞的等待者時多次連續的獲得一個 mutex,饑餓模式的重要性則在于避免了病態情況下的尾部延遲。 ## 加鎖 Lock 對申請鎖的情況分為三種: 1. 無沖突,通過 CAS 操作把當前狀態設置為加鎖狀態 2. 有沖突,開始自旋,并等待鎖釋放,如果其他 goroutine 在這段時間內釋放該鎖,直接獲得該鎖;如果沒有釋放則為下一種情況 3. 有沖突,且已經過了自旋階段,通過調用 semrelease 讓 goroutine 進入等待狀態 ``` func (m *Mutex) Lock() { // 快速路徑: 抓取并鎖上未鎖住狀態的互斥鎖 if atomic.CompareAndSwapInt32(&amp;m.state, 0, mutexLocked) { (...) return } m.lockSlow() } func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&amp;(mutexLocked|mutexStarving) == mutexLocked &amp;&amp; runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke &amp;&amp; old&amp;mutexWoken == 0 &amp;&amp; old&gt;&gt;mutexWaiterShift != 0 &amp;&amp; atomic.CompareAndSwapInt32(&amp;m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. if old&amp;mutexStarving == 0 { new |= mutexLocked } if old&amp;(mutexLocked|mutexStarving) != 0 { new += 1 &lt;&lt; mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. if starving &amp;&amp; old&amp;mutexLocked != 0 { new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&amp;mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &amp;^= mutexWoken } if atomic.CompareAndSwapInt32(&amp;m.state, old, new) { if old&amp;(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&amp;m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime &gt; starvationThresholdNs old = m.state if old&amp;mutexStarving != 0 { (...) delta := int32(mutexLocked - 1&lt;&lt;mutexWaiterShift) if !starving || old&gt;&gt;mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&amp;m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } (...) } ``` ## 解鎖 ``` func (m *Mutex) Unlock() { (...) // Fast path: drop lock bit. new := atomic.AddInt32(&amp;m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { (...) if new&amp;mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. // 如果沒有等待著,或者已經存在一個 goroutine 被喚醒或者得到鎖, // 或處于饑餓模式,無需喚醒任何等待狀態的 goroutine if old&gt;&gt;mutexWaiterShift == 0 || old&amp;(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1&lt;&lt;mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&amp;m.state, old, new) { // 喚醒一個阻塞的 goroutine,但不是喚醒第一個等待者 runtime_Semrelease(&amp;m.sema, false, 1) return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. // 饑餓模式: 直接將 mutex 所有權交給等待隊列最前端的 goroutine runtime_Semrelease(&amp;m.sema, true, 1) } } ``` ## 例:并發安全的單例模式 利用原子操作和互斥鎖我們可以輕松實現一個非常簡單的并發安全單例模式,即 sync.Once。 sync.Once 用來保證絕對一次執行的對象,例如可在單例的初始化中使用。 它內部的結構也相對簡單: ``` // Once 對象可以保證一個動作的絕對一次執行。 type Once struct { // done 表明某個動作是否被執行 // 由于其使用頻繁(熱路徑),故將其放在結構體的最上方 // 熱路徑在每個調用點進行內嵌 // 將 done 放在第一位,在某些架構下(amd64/x86)能獲得更加緊湊的指令, // 而在其他架構下能更少的指令(用于計算其偏移量)。 done uint32 m Mutex } ``` 注意,這個結構在 Go 1.13 中得到了重新調整,在其之前`done`字段在`m`之后。 源碼也非常簡單: ``` // Do 當且僅當第一次調用時,f 會被執行。換句話說,給定 // var once Once // 如果 once.Do(f) 被多次調用則只有第一次會調用 f,即使每次提供的 f 不同。 // 每次執行必須新建一個 Once 實例。 // // Do 用于變量的一次初始化,由于 f 是無參數的,因此有必要使用函數字面量來捕獲參數: // config.once.Do(func() { config.init(filename) }) // // 因為該調用無返回值,因此如果 f 調用了 Do,則會導致死鎖。 // // 如果 f 發生 panic,則 Do 認為 f 已經返回;之后的調用也不會調用 f。 // func (o *Once) Do(f func()) { // 原子讀取 Once 內部的 done 屬性,是否為 0,是則進入慢速路徑,否則直接調用 if atomic.LoadUint32(&amp;o.done) == 0 { o.doSlow(f) } } func (o *Once) doSlow(f func()) { // 注意,我們只使用原子讀讀取了 o.done 的值,這是最快速的路徑執行原子操作,即 fast-path // 但當我們需要確保在并發狀態下,是不是有多個人讀到 0,因此必須加鎖,這個操作相對昂貴,即 slow-path o.m.Lock() defer o.m.Unlock() // 正好我們有一個并發的 goroutine 讀到了 0,那么立即執行 f 并在結束時候調用原子寫,將 o.done 修改為 1 if o.done == 0 { defer atomic.StoreUint32(&amp;o.done, 1) f() } // 當 o.done 為 0 的 goroutine 解鎖后,其他人會繼續加鎖,這時會發現 o.done 已經為了 1 ,于是 f 已經不用在繼續執行了 } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看