[TOC]
## MUTEX
### 互斥鎖的實現機制
是 臨界區
在并發編程中,如果程序中的一部分會被并發訪問或修改,為了避免并發訪問導致的意想不到的結果,這部分程序需要被保護起來,這部分被保護起來的程序,就叫做臨界區。
### 對外暴漏的方法和結構
Locker接口:
~~~
type Locker interface {
Lock()
Unlock()
}
~~~
Mutex 就實現了這個接口,Lock請求鎖,Unlock釋放鎖
~~~
type Mutex struct {
state int32 //鎖狀態,保護四部分含義
sema uint32 //信號量,用于阻塞等待或者喚醒
}
~~~

**Locked**:表示該 mutex 是否被鎖定,0 表示沒有,1 表示處于鎖定狀態;
**Woken**:表示是否有協程被喚醒,0 表示沒有,1 表示有協程處于喚醒狀態,并且在加鎖過程中;
**Starving**:Go1.9 版本之后引入,表示 mutex 是否處于饑餓狀態,0 表示沒有,1 表示有協程處于饑餓狀態;
**Waiter**: 等待鎖的協程數量。
#### 方法解析
~~~
const (
// mutex is locked ,在低位,值 1
mutexLocked = 1 << iota
//標識有協程被喚醒,處于 state 中的第二個 bit 位,值 2
mutexWoken
//標識 mutex 處于饑餓模式,處于 state 中的第三個 bit 位,值 4
mutexStarving
// 值 3,state 值通過右移三位可以得到 waiter 的數量
// 同理,state += 1 << mutexWaiterShift,可以累加 waiter 的數量
mutexWaiterShift = iota
// 標識協程處于饑餓狀態的最長阻塞時間,當前被設置為 1ms
starvationThresholdNs = 1e6
)
~~~
#### Lock
~~~
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex. //運氣好,直接加鎖成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
//內聯,加鎖失敗,就得去自旋競爭或者饑餓模式下競爭
m.lockSlow()
}
~~~
~~~
func (m *Mutex) lockSlow() {
var waitStartTime int64
// 標識是否處于饑餓模式
starving := false
// 喚醒標記
awoke := false
// 自旋次數
iter := 0
old := m.state
for {
// 非饑餓模式下,開啟自旋操作
// 從 runtime_canSpin(iter) 的實現中(runtime/proc.sync_runtime_canSpin)可以知道,
// 如果 iter 的值大于 4,將返回 false
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果沒有其他 waiter 被喚醒,那么將當前協程置為喚醒狀態,同時 CAS 更新 mutex 的 Woken 位
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 開啟自旋
runtime_doSpin()
iter++
// 重新檢查 state 的值
old = m.state
continue
}
new := old
// 非饑餓狀態
if old&mutexStarving == 0 {
// 當前協程可以直接加鎖
new |= mutexLocked
}
// mutex 已經被鎖住或者處于饑餓模式
// 那么當前協程不能獲取到鎖,將會進入等待狀態
if old&(mutexLocked|mutexStarving) != 0 {
// waiter 數量加 1,當前協程處于等待狀態
new += 1 << mutexWaiterShift
}
// 當前協程處于饑餓狀態并且 mutex 依然被鎖住,那么設置 mutex 為饑餓模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除喚醒標記
// &^ 與非操作,mutexWoken: 10 -> 01
// 此操作之后,new 的 Locked 位值是 1,如果能夠成功寫入到 m.state 字段,那么當前協程獲取鎖成功
new &^= mutexWoken
}
// CAS 設置新狀態成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 舊的鎖狀態已經被釋放并且處于非饑餓狀態
// 這個時候當前協程正常請求到了鎖,就可以直接返回了
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 處理當前協程的饑餓狀態
// 如果之前已經處于等待狀態了(已經在隊列里面),那么將其加入到隊列頭部,從而可以被高優喚醒
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
// 阻塞開始時間
waitStartTime = runtime_nanotime()
}
// P 操作,阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 喚醒之后,如果當前協程等待超過 1ms,那么標識當前協程處于饑餓狀態
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// mutex 已經處于饑餓模式
if old&mutexStarving != 0 {
// 1. 如果當前協程被喚醒但是 mutex 還是處于鎖住狀態
// 那么 mutex 處于非法狀態
//
// 2. 或者如果此時 waiter 數量是 0,并且 mutex 未被鎖住
// 代表當前協程沒有在 waiters 中,但是卻想要獲取到鎖,那么 mutex 狀態非法
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// delta 代表加鎖并且將 waiter 數量減 1 兩步操作
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 非饑餓狀態 或者 當前只剩下一個 waiter 了(就是當前協程本身)
if !starving || old>>mutexWaiterShift == 1 {
// 那么 mutex 退出饑餓模式
delta -= mutexStarving
}
// 設置新的狀態
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
~~~
### 加鎖過程:
(1)協程調用Lock()函數。
(2)判斷這個鎖是否未初始化,如果該鎖沒有初始化,則對該鎖進行初始化設置,然后獲取鎖,Lock()函數返回。如果該鎖已被初始化過,則進入下一步操作。
(3)判斷鎖與協程的當前狀態,如果該鎖處于加鎖狀態且當前協程不處于饑餓狀態,則嘗試獲取該鎖的協程會進入自旋狀態(最多自旋4次)。
(4)自旋完成后,計算當前鎖的狀態,協程嘗試獲取鎖,如果獲取成功,Lock()函數返回。如果獲取失敗,則進入下一步操作。
(5)自旋完成后嘗試獲取鎖失敗,該協程進入休眠狀態,協程進入阻塞狀態,并不斷嘗試獲取信號。
(6)休眠協程獲取到信號(有其他協程釋放了鎖),被喚醒,此時go會判斷當前協程是否處于饑餓狀態(如果協程從調用Lock()函數起,超過1ms后都沒有成功獲取到鎖,則該協程進入饑餓狀態)。如果該協程處于饑餓狀態,不會啟動自旋過程,而是再次被阻塞,一旦有其他的協程釋放了鎖,那么該饑餓協程就會被喚醒并直接獲取鎖,Lock()函數返回。如果該協程處于正常狀態,則進入下一步操作。
(7)如果協程被判定處于正常狀態,即滿足再次啟動自旋條件(協程從調用Lock()函數起到現在被喚醒的總時間不超過1ms),則重置自旋次數,再次啟動自旋,嘗試獲取鎖。如果自旋結束后,該協程還是沒有獲取到鎖,則回到步驟(3),開始新一輪循環。
在加鎖過程中有兩種情況:
* Mutex 處于非饑餓模式:這個時候如果協程加鎖不成功不會立刻進入阻塞隊列,而是判斷自己是否滿足自旋的條件,如果滿足,則啟動自旋,在自旋過程中嘗試獲取鎖。
* Mutex 處于饑餓模式:如果一個協程阻塞等待的時間過長(超過 1ms),那么 mutex 會被標記為饑餓模式,此時協程搶鎖過程中不會開啟自旋,而是一旦有協程釋放了鎖,那么一定會喚醒協程,被喚醒的協程將會成功獲取到鎖。
> 非饑餓模式下,協程會開啟自旋操作。自旋能夠避免協程切換,使當前自旋的協程有機會更快獲取到鎖。自旋對應 CPU 的 PAUSE 指令,相當于 CPU 進行空轉。
由于自旋操作會很大程度上給 CPU 帶來一定的壓力,因此自旋不能無限制進行下去,所有在這里,會通過?`sync_runtime_canSpin(int)`?判斷能否進入自旋,
>允許自旋的條件如下:
* 自旋次數要不超過 4 次,這個是根據入參確定的;
* CPU 的核數要大于 1,否則自旋沒有意義,因為沒有其他的協程能夠在當前協程自旋期間獲取到時間片而釋放鎖,自旋只會白白浪費時間;
* gomaxprocs 要大于 1,也就是 GMP 中的 P (Processor);
* 至少有一個本地的 P 隊列,并且其可運行的 G 隊列為空。
#### UnLock
~~~
// 解鎖操作
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// mutexLocked 位設置為 0,解鎖
new := atomic.AddInt32(&m.state, -mutexLocked)
// 如果此時 state 值不是 0,代表其他位不是 0(或者出現異常使用導致 mutexLocked 位也不是 0)
// 此時需要進一步做一些其他操作,比如喚醒等待中的協程等
if new != 0 {
m.unlockSlow(new)
}
}
~~~
解鎖操作會根據 Mutex.state 的狀態來判斷需不需要去喚醒其他等待中的協程。
~~~
func (m *Mutex) unlockSlow(new int32) {
// new - state 字段原子減 1 之后的值,如果之前是處于加鎖狀態,那么此時 new 的末位應該是 0
// 此時 new+mutexLocked 正常情況下會將 new 末位變成 1
// 那么如果和 mutexLocked 做與運算之后的結果是 0,代表 new 值非法,解鎖了一個未加鎖的 mutex
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果不是處于饑餓狀態
if new&mutexStarving == 0 {
old := new
for {
// old>>mutexWaiterShift == 0 代表沒有等待加鎖的協程了,自然不需要執行喚醒操作
// old&mutexLocked != 0 代表已經有協程加鎖成功,此時沒有必要再喚醒一個協程(因為它不可能加鎖成功)
// old&mutexWoken != 0 代表已經有協程被喚醒并且在加鎖過程中,此時不需要再執行喚醒操作了
// old&mutexStarving != 0 代表已經進入了饑餓狀態,
// 以上四種情況,皆不需要執行喚醒操作
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 喚醒一個等待中的協程,將 state woken 位置為 1
// old - 1<<mutexWaiterShift waiter 數量減 1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饑餓模式
// 將 mutex 的擁有權轉移給下一個 waiter,并且交出 CPU 時間片,從而能夠讓下一個 waiter 立刻開始執行
runtime_Semrelease(&m.sema, true, 1)
}
}
~~~
饑餓模式的最大等待時間閾值設置成了 1 毫秒,一旦等待者等待的時間超過了這個閾值,Mutex 的處理就有可能進入饑餓模式,優先讓等待者先獲取到鎖,新來的讓一下,給之前的一些機會。
通過加入饑餓模式,可以避免把機會全都留給新來的 goroutine,保證了請求鎖的goroutine 獲取鎖的公平性。
### 解鎖過程
1、先判斷當前是否加鎖了,如果沒加鎖就解鎖,直接報錯
2、分兩種情況:
>1、正常模式下,判斷是否需要喚醒g,如果不需要,就直接返回,如果需要就把waiter-1操作
>2、饑餓模式,直接把鎖交給等待隊列頭部g,讓他運行
#### 注意事項
1、**Lock() / Unlock() 方法一定要成對出現**
2、**小心拷貝 Mutex**,拷貝的話會帶上之前的mutex的數據,導致出現問題
3、**Lock() / Unlock() 盡量在同一個協程里面**,遵循“**誰申請,誰釋放**”
### 饑餓模式和正常模式
> Mutex 可能處于兩種操作模式下:正常模式和饑餓模式
#### 正常模式(非公平鎖)
正常模式下waiter 都是進入先入先出隊列,被喚醒的 waiter 并不會直接持有鎖,而是要和新來的 goroutine 進行競爭。新來的 goroutine 有先天的優勢,它們正在 CPU 中運行,可能它們的數量還不少,所以,在高并發情況下,被喚醒的 waiter 可能比較悲劇地獲取不到鎖,這時,它會被插入到隊列的前面。
##### 正常鎖-->饑餓鎖觸發條件:
>如果 waiter 獲取不到鎖的時間超過閾值 1 毫秒,那么,這個 Mutex 就進入到了饑餓模式。
#### 饑餓模式(公平鎖)
Mutex 的擁有者將直接把鎖交給隊列最前面的 waiter。新來的 goroutine不會嘗試獲取鎖,即使看起來鎖沒有被持有,它也不會去搶,也不會 spin,它會乖乖地加入到等待隊列的尾部。
##### 饑餓-->正常 轉換:
>* 此 waiter 已經是隊列中的最后一個 waiter 了,沒有其它的等待鎖的 goroutine 了;
>* 此 waiter 的等待時間小于 1 毫秒。
### 總結
正常模式擁有更好的性能,因為即使有等待搶鎖的 waiter,goroutine 也可以連續多次獲取到鎖。
饑餓模式是對公平性和性能的一種平衡,它避免了某些 goroutine 長時間的等待鎖。在饑餓模式下,優先對待的是那些一直在等待的 waiter。
- Go準備工作
- 依賴管理
- Go基礎
- 1、變量和常量
- 2、基本數據類型
- 3、運算符
- 4、流程控制
- 5、數組
- 數組聲明和初始化
- 遍歷
- 數組是值類型
- 6、切片
- 定義
- slice其他內容
- 7、map
- 8、函數
- 函數基礎
- 函數進階
- 9、指針
- 10、結構體
- 類型別名和自定義類型
- 結構體
- 11、接口
- 12、反射
- 13、并發
- 14、網絡編程
- 15、單元測試
- Go常用庫/包
- Context
- time
- strings/strconv
- file
- http
- Go常用第三方包
- Go優化
- Go問題排查
- Go框架
- 基礎知識點的思考
- 面試題
- 八股文
- 操作系統
- 整理一份資料
- interface
- array
- slice
- map
- MUTEX
- RWMUTEX
- Channel
- waitGroup
- context
- reflect
- gc
- GMP和CSP
- Select
- Docker
- 基本命令
- dockerfile
- docker-compose
- rpc和grpc
- consul和etcd
- ETCD
- consul
- gin
- 一些小點
- 樹
- K8s
- ES
- pprof
- mycat
- nginx
- 整理后的面試題
- 基礎
- Map
- Chan
- GC
- GMP
- 并發
- 內存
- 算法
- docker