<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                鎖是一種同步機制,用于在多任務環境中限制資源的訪問,以滿足互斥需求。 go源碼sync包中經常用于同步操作的方式: * 原子操作 * 互斥鎖 * 讀寫鎖 * waitgroup 我們著重來分析下互斥鎖和讀寫鎖. 互斥鎖: 下面是互斥鎖的數據結構: ~~~go // A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 // 互斥鎖上鎖狀態枚舉值如下所示 sema uint32 // 信號量,向處于Gwaitting的G發送信號 } const ( mutexLocked = 1 << iota // 值為1,表示在state中由低向高第1位,意義:鎖是否可用,0可用,1不可用,鎖定中 mutexWoken // 值為2,表示在state中由低向高第2位,意義:mutex是否被喚醒 mutexStarving // 當前的互斥鎖進入饑餓狀態; mutexWaiterShift = iota //值為2,表示state中統計阻塞在此mutex上goroutine的數目需要位移的偏移量 starvationThresholdNs = 1e6 ~~~ state和sema兩個加起來只占 8 字節空間的結構體表示了 Go 語言中的互斥鎖。 互斥鎖的狀態比較復雜,如下圖所示,最低三位分別表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用來表示當前有多少個 Goroutine 等待互斥鎖的釋放. [![](https://github.com/KeKe-Li/data-structures-questions/raw/master/src/images/138.jpg)](https://github.com/KeKe-Li/data-structures-questions/blob/master/src/images/138.jpg) 在默認情況下,互斥鎖的所有狀態位都是 0,int32 中的不同位分別表示了不同的狀態: * mutexLocked 表示互斥鎖的鎖定狀態; * mutexWoken 表示從正常模式被從喚醒; * mutexStarving 當前的互斥鎖進入饑餓狀態; * waitersCount 當前互斥鎖上等待的 Goroutine 個數; sync.Mutex 有兩種模式,正常模式和饑餓模式。 在正常模式下,鎖的等待者會按照先進先出的順序獲取鎖。 但是剛被喚起的`Goroutine`與新創建的`Goroutine`競爭時,大概率會獲取不到鎖,為了減少這種情況的出現,一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當前互斥鎖切換饑餓模式,防止部分 Goroutine 被餓死。 饑餓模式是在 Go 語言 1.9 版本引入的優化的,引入的目的是保證互斥鎖的公平性(Fairness)。 在饑餓模式中,互斥鎖會直接交給等待隊列最前面的 Goroutine。新的 Goroutine 在該狀態下不能獲取鎖、也不會進入自旋狀態,它們只會在隊列的末尾等待。 如果一個 Goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當前的互斥鎖就會被切換回正常模式。 相比于饑餓模式,正常模式下的互斥鎖能夠提供更好地性能,饑餓模式的能避免 Goroutine 由于陷入等待無法獲取鎖而造成的高尾延時。 互斥鎖的加鎖是靠 sync.Mutex.Lock 方法完成的, 當鎖的狀態是 0 時,將`mutexLocked`位置成 1: ~~~go // Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() } ~~~ 如果互斥鎖的狀態不是 0 時就會調用`sync.Mutex.lockSlow`嘗試通過自旋(Spinnig)等方式等待鎖的釋放, 這個方法是一個非常大 for 循環,它獲取鎖的過程: 1. 判斷當前 Goroutine 能否進入自旋; 2. 通過自旋等待互斥鎖的釋放; 3. 計算互斥鎖的最新狀態; 4. 更新互斥鎖的狀態并獲取鎖; 那么互斥鎖是如何判斷當前 Goroutine 能否進入自旋等互斥鎖的釋放,是通過它的lockSlow方法, 由于自旋是一種多線程同步機制,所以呢當前的進程在進入自旋的過程中會一直保持對 CPU 的占用,持續檢查某個條件是否為真。 通常在多核的 CPU 上,自旋可以避免 Goroutine 的切換,使用得當會對性能帶來很大的增益,但是往往使用的不得當就會拖慢整個程序. 所以 Goroutine 進入自旋的條件非常苛刻: * 互斥鎖只有在普通模式才能進入自旋; * `runtime.sync_runtime_canSpin`需要返回 true: a. 需要運行在多 CPU 的機器上; b. 當前的Goroutine 為了獲取該鎖進入自旋的次數小于四次; c. 當前機器上至少存在一個正在運行的處理器 P 并且處理的運行隊列為空; 一旦當前 Goroutine 能夠進入自旋就會調用`runtime.sync_runtime_doSpin`和`runtime.procyield`并執行 30 次的 PAUSE 指令,該指令只會占用 CPU 并消耗 CPU 時間. 處理了自旋相關的特殊邏輯之后,互斥鎖會根據上下文計算當前互斥鎖最新的狀態。 通過幾個不同的條件分別會更新 state 字段中存儲的不同信息,`mutexLocked`、`mutexStarving`、`mutexWoken`和`mutexWaiterShift`: ~~~go new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { new &^= mutexWoken } ~~~ 計算了新的互斥鎖狀態之后,就會使用 CAS 函數 sync/atomic.CompareAndSwapInt32 更新該狀態: ~~~go if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // 通過 CAS 函數獲取了鎖 } ... runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } } ~~~ 如果我們沒有通過 CAS 獲得鎖,會調用`runtime.sync_runtime_SemacquireMutex`使用信號量保證資源不會被兩個 Goroutine 獲取。 `runtime.sync_runtime_SemacquireMutex`會在方法中不斷調用嘗試獲取鎖并休眠當前 Goroutine 等待信號量的釋放,一旦當前 Goroutine 可以獲取信號量,它就會立刻返回,`sync.Mutex.Lock`方法的剩余代碼也會繼續執行。 在正常模式下,這段代碼會設置喚醒和饑餓標記、重置迭代次數并重新執行獲取鎖的循環. 在饑餓模式下,當前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當前 Goroutine,互斥鎖還會從饑餓模式中退出. 互斥鎖的解鎖過程`sync.Mutex.Unlock`與加鎖過程相比就很簡單,該過程會先使用`sync/atomic.AddInt32`函數快速解鎖,這時會發生下面的兩種情況: * 如果該函數返回的新狀態等于 0,當前 Goroutine 就成功解鎖了互斥鎖; * 如果該函數返回的新狀態不等于 0,這段代碼會調用`sync.Mutex.unlockSlow`方法開始慢速解鎖: ~~~go func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } ~~~ `sync.Mutex.unlockSlow`方法首先會校驗鎖狀態的合法性, 如果當前互斥鎖已經被解鎖過了就會直接拋出異常`sync: unlock of unlocked mutex`中止當前程序。 在正常情況下會根據當前互斥鎖的狀態,分別處理正常模式和饑餓模式下的互斥鎖. ~~~go func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter, and yield // our time slice so that the next waiter can start to run immediately. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. runtime_Semrelease(&m.sema, true, 1) } } ~~~ 在正常模式下,這段代碼會分別處理以下兩種情況處理: * 如果互斥鎖不存在等待者或者互斥鎖的`mutexLocked`、`mutexStarving`、`mutexWoken`狀態不都為 0,那么當前方法就可以直接返回,不需要喚醒其他等待者; * 如果互斥鎖存在等待者,會通過`sync.runtime_Semrelease`喚醒等待者并移交鎖的所有權; 在饑餓模式下,上述代碼會直接調用`sync.runtime_Semrelease`方法將當前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖,在這時互斥鎖還不會退出饑餓狀態; 互斥鎖的加鎖過程比較復雜,它涉及自旋、信號量以及調度等概念: * 如果互斥鎖處于初始化狀態,就會直接通過置位 mutexLocked 加鎖; * 如果互斥鎖處于 mutexLocked 并且在普通模式下工作,就會進入自旋,執行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放; * 如果當前 Goroutine 等待鎖的時間超過了 1ms,互斥鎖就會切換到饑餓模式; * 互斥鎖在正常情況下會通過`runtime.sync_runtime_SemacquireMutex`函數將嘗試獲取鎖的 Goroutine 切換至休眠狀態,等待鎖的持有者喚醒當前 Goroutine; * 如果當前 Goroutine 是互斥鎖上的最后一個等待的協程或者等待的時間小于 1ms,當前 Goroutine 會將互斥鎖切換回正常模式; 互斥鎖的解鎖過程與之相比就比較簡單,其代碼行數不多、邏輯清晰,也比較容易理解: * 當互斥鎖已經被解鎖時,那么調用`sync.Mutex.Unlock`會直接拋出異常; * 當互斥鎖處于饑餓模式時,會直接將鎖的所有權交給隊列中的下一個等待者,等待者會負責設置`mutexLocked`標志位; * 當互斥鎖處于普通模式時,如果沒有 Goroutine 等待鎖的釋放或者已經有被喚醒的 Goroutine 獲得了鎖,就會直接返回;在其他情況下會通過`sync.runtime_Semrelease`喚醒對應的 Goroutine. 讀寫鎖: 讀寫互斥鎖`sync.RWMutex`是細粒度的互斥鎖,它不限制資源的并發讀,但是讀寫、寫寫操作無法并行執行。 sync.RWMutex 中總共包含5 個字段: ~~~go type RWMutex struct { w Mutex // 復用互斥鎖提供的能力 writerSem uint32 // 寫等待讀 readerSem uint32 // 讀等待寫 readerCount int32 // 存儲了當前正在執行的讀操作的數量 readerWait int32 // 當寫操作被阻塞時等待的讀操作個數 } ~~~ 我們從寫鎖開始分析: 當我們想要獲取寫鎖時,需要調用`sync.RWMutex.Lock`方法: ~~~go func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } } ~~~ * 這里調用結構體持有的`sync.Mutex`的`sync.Mutex.Lock`方法阻塞后續的寫操作; 因為互斥鎖已經被獲取,其他 Goroutine 在獲取寫鎖時就會進入自旋或者休眠; * 調用`sync/atomic.AddInt32`方法阻塞后續的讀操作: 如果仍然有其他 Goroutine 持有互斥鎖的讀鎖`(r != 0)`,該 Goroutine 會調用`runtime.sync_runtime_SemacquireMutex`進入休眠狀態等待所有讀鎖所有者執行結束后釋放`writerSem`信號量將當前協程喚醒。 寫鎖的釋放會調用`sync.RWMutex.Unlock`方法: ~~~go func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() if race.Enabled { race.Enable() } } ~~~ 解鎖與加鎖的過程正好相反,寫鎖的釋放分為以下幾個步驟: 1. 調用`sync/atomic.AddInt32`函數將`readerCount`變回正數,釋放讀鎖; 2. 通過 for 循環觸發所有由于獲取讀鎖而陷入等待的 Goroutine: 3. 調用`sync.Mutex.Unlock`方法釋放寫鎖; 獲取寫鎖時會先阻塞寫鎖的獲取,后阻塞讀鎖的獲取,這種策略能夠保證讀操作不會被連續的寫操作餓死。 接著是讀鎖: 讀鎖的加鎖方法`sync.RWMutex.RLock`就比較簡單了,該方法會通過`sync/atomic.AddInt32`將`readerCount`加一: ~~~go func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state race.Disable() } if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } } ~~~ 如果`RLock`該方法返回負數,其他 Goroutine 獲得了寫鎖,當前 Goroutine 就會調用`runtime.sync_runtime_SemacquireMutex`陷入休眠等待鎖的釋放; 如果`RLock`該方法的結果為非負數,沒有 Goroutine 獲得寫鎖,當前方法就會成功返回. 當 Goroutine 想要釋放讀鎖時,會調用如下所示的`RUnlock`方法: ~~~go func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } } ~~~ 該方法會先減少正在讀資源的`readerCount`整數,根據`sync/atomic.AddInt32`的返回值不同會分別進行處理: * 如果返回值大于等于零,表示讀鎖直接解鎖成功. * 如果返回值小于零 ,表示有一個正在執行的寫操作,在這時會調用`rUnlockSlow`方法. ~~~go func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } } ~~~ `rUnlockSlow`該方法會減少獲取鎖的寫操作等待的讀操作數`readerWait`并在所有讀操作都被釋放之后觸發寫操作的信號量,`writerSem`,該信號量被觸發時,調度器就會喚醒嘗試獲取寫鎖的 Goroutine。 其實讀寫互斥鎖(sync.RWMutex),雖然提供的功能非常復雜,不過因為它是在互斥鎖( sync.Mutex)的基礎上,所以整體的實現上會簡單很多。 因此呢: * 調用`sync.RWMutex.Lock`嘗試獲取寫鎖時; 每次`sync.RWMutex.RUnlock`都會將`readerCount`其減一,當它歸零時該 Goroutine 就會獲得寫鎖, 將`readerCount`減少`rwmutexMaxReaders`個數以阻塞后續的讀操作. * 調用`sync.RWMutex.Unlock`釋放寫鎖時,會先通知所有的讀操作,然后才會釋放持有的互斥鎖; 讀寫互斥鎖在互斥鎖之上提供了額外的更細粒度的控制,能夠在讀操作遠遠多于寫操作時提升性能。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看