[TOC]
## RWMUTEX
標準庫中的 RWMutex 是一個 reader/writer 互斥鎖。RWMutex 在某一時刻只能由任意數量的 reader 持有,或者是只被單個的 writer 持有。
讀寫鎖的設計一般滿足如下規則:
* 寫寫互斥:這個很好理解,與一般的互斥鎖語義相同;
* 讀寫互斥,包含兩部分含義,都是為了避免讀不一致的情況產生:(類似mysql的幻讀)
* ?在擁有寫鎖的時候其他協程不能獲取到讀鎖;????
* 在擁有讀鎖的時候其他協程不能獲取到寫鎖;
* 讀讀不互斥:不同的協程可以同時獲取到讀鎖,這個是提高讀性能的關鍵所在。
### 提供的方法
RWMutex 對外共提供了五個方法,分別是:
* Lock/Unlock:寫操作時調用的方法。如果鎖已經被 reader 或者 writer 持有,那么,Lock 方法會一直阻塞,直到能獲取到鎖;Unlock 則是配對的釋放鎖的方法。
* RLock/RUnlock:讀操作時調用的方法。如果鎖已經被 writer 持有的話,RLock 方法會一直阻塞,直到能獲取到鎖,否則就直接返回;而 RUnlock 是 reader 釋放鎖的方法。
* RLocker:這個方法的作用是為讀操作返回一個 Locker 接口的對象。它的 Lock 方法會調用 RWMutex 的 RLock 方法,它的 Unlock 方法會調用 RWMutex 的 RUnlock 方法。
### RWMutex 實現
通過記錄 readerCount 讀鎖的數量來進行控制,當有一個寫鎖的時候,會將讀 鎖數量設置為負數 1<<30。目的是讓新進入的讀鎖等待之前的寫鎖釋放通知讀 鎖。同樣的當有寫鎖進行搶占時,也會等待之前的讀鎖都釋放完畢,才會開始 21 進行后續的操作。 而等寫鎖釋放完之后,會將值重新加上 1<<30, 并通知剛才 新進入的讀鎖(rw.readerSem),兩者互相限制。
### RWMutex 結構體
~~~
type RWMutex struct {
w Mutex // 復用互斥鎖能力
//寫鎖信號量 當阻塞寫操作的讀操作goroutine釋放讀鎖時,通過該信號量通知阻塞的寫操作的goroutine;
writerSem uint32
// 讀鎖信號量 當寫操作goroutine釋放寫鎖時,通過該信號量通知阻塞的讀操作的goroutine
readerSem uint32
// 當前讀操作的數量,包含所有已經獲取到讀鎖或者被寫操作阻塞的等待獲取讀鎖的讀操作數量
readerCount int32
// 獲取寫鎖需要等待讀鎖釋放的數量
readerWait int32
}
~~~
RWMutex 利用 readerCount / readerWait 屬性,提供了一個非常巧妙的思路。寫操作到來的時候,會把 readerCount 的值復制給 readerWait,用來標記在當前寫操作之前的讀操作的數量。當讀鎖釋放的時候,會遞減 readerWait,當它變成 0 的時候,就代表前面的讀操作全部完成了,此時寫操作會被喚醒。
### 寫鎖
~~~
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
// 寫鎖也就是互斥鎖,復用互斥鎖的能力來解決與其他寫鎖的競爭
// 如果寫鎖已經被獲取了,其他goroutine在獲取寫鎖時會進入自旋或者休眠
rw.w.Lock()
// 將readerCount設置為負值,告訴讀鎖現在有一個正在等待運行的寫鎖(獲取互斥鎖成功)
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 獲取互斥鎖成功并不代表goroutine獲取寫鎖成功,我們默認最大有2^30的讀操作數目,減去這個最大數目
// 后仍然不為0則表示前面還有讀鎖,需要等待讀鎖釋放并更新寫操作被阻塞時等待的讀操作goroutine個數;
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
~~~
* 獲取互斥鎖,寫鎖也就是互斥鎖,這里我們復用互斥鎖mutex的加鎖能力,當互斥鎖加鎖成功后,其他寫鎖goroutine再次嘗試獲取鎖時就會進入自旋休眠等待;
* 判斷獲取寫鎖是否成功,這里有一個變量rwmutexMaxReaders = 1 << 30表示最大支持2^30個并發讀,互斥鎖加鎖成功后,假設2^30個讀操作都已經釋放了讀鎖,通過原子操作將readerCount設置為負數在加上2^30,如果此時r仍然不為0說面還有讀操作正在進行,則寫鎖需要等待,同時通過原子操作更新readerWait字段,也就是更新寫操作被阻塞時等待的讀操作goroutine個數;readerWait在上文的讀鎖釋放鎖時會進行判斷,進行遞減,當前readerWait遞減到0時就會喚醒寫鎖。
~~~
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
// 將readerCount的恢復為正數,也就是解除對讀鎖的互斥
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 如果后面還有讀操作的goroutine則需要喚醒他們
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 釋放互斥鎖,寫操作的goroutine和讀操作的goroutine同時競爭
rw.w.Unlock()
}
~~~
### 讀鎖
~~~
func (rw *RWMutex) RLock() {
// 原子操作readerCount 只要值不是負數就表示獲取讀鎖成功
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 有一個正在等待的寫鎖,為了避免饑餓后面進來的讀鎖進行阻塞等待
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
~~~
精簡了競態檢測的方法,讀鎖方法就只有兩行代碼了,邏輯如下:
使用原子操作更新readerCount,將readercount值加1,只要原子操作后值不為負數就表示加讀鎖成功,如果值為負數表示已經有寫鎖獲取互斥鎖成功,寫鎖goroutine正在等待或運行,所以為了避免饑餓后面進來的讀鎖要進行阻塞等待,調用runtime\_SemacquireMutex阻塞等待。
釋放讀鎖代碼主要分為兩部分,第一部分:
~~~
func (rw *RWMutex) RUnlock() {
// 將readerCount的值減1,如果值等于等于0直接退出即可;否則進入rUnlockSlow處理
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
~~~
readerCount的值代表當前正在執行的讀操作goroutine數量,執行遞減操作后的值大于等于0表示當前沒有異常場景或寫鎖阻塞等待,所以直接退出即可,否則需要處理這兩個邏輯:
rUnlockSlow邏輯如下:
~~~
func (rw *RWMutex) rUnlockSlow(r int32) {
// r+1等于0表示沒有加讀鎖就釋放讀鎖,異常場景要拋出異常
// r+1 == -rwmutexMaxReaders 也表示沒有加讀鎖就是釋放讀鎖
// 因為寫鎖加鎖成功后會將readerCout的值減去rwmutexMaxReaders
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 如果有寫鎖正在等待讀鎖時會更新readerWait的值,所以一步遞減rw.readerWait值
// 如果readerWait在原子操作后的值等于0了說明當前阻塞寫鎖的讀鎖都已經釋放了,需要喚醒等待的寫鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
~~~
代碼解讀:
* r+1等于0說明當前goroutine沒有加讀鎖就進行釋放讀鎖操作,屬于非法操作
* r+1 == -rwmutexMaxReaders?說明寫鎖加鎖成功了會將readerCount的減去rwmutexMaxReaders變成負數,如果此前沒有加讀鎖,那么直接釋放讀鎖就會造成這個等式成立,也屬于沒有加讀鎖就進行釋放讀鎖操作,屬于非法操作;
* readerWait代表寫操作被阻塞時讀操作的goroutine數量,如果有寫鎖正在等待時就會更新readerWait的值,讀鎖釋放鎖時需要readerWait進行遞減,如果遞減后等于0說明當前阻塞寫鎖的讀鎖都已經釋放了,需要喚醒等待的寫鎖。(看下文寫鎖的代碼就呼應上了)
### 讀寫鎖插隊的策略
假設現在有5個goroutine分別是G1、G2、G3、G4、G5,現在G1、G2獲取讀鎖成功,還沒釋放讀鎖,G3要執行寫操作,獲取寫鎖失敗就會阻塞等待,當前阻塞寫鎖的讀鎖goroutine數量為2

后續G4進來想要獲取讀鎖,這時她就會判斷如果當前有寫鎖的goroutine正在阻塞等待,為了避免寫鎖饑餓,那這個G4也會進入阻塞等待,后續G5進來想要獲取寫鎖,因為G3在占用互斥鎖,所以G5會進入自旋/休眠 阻塞等待;

現在G1、G2釋放了讀鎖,當釋放讀鎖是判斷如果阻塞寫鎖goroutine的讀鎖goroutine數量為0了并且有寫鎖等待就會喚醒正在阻塞等待的寫鎖G3,G3得到了喚醒:

G3處理完寫操作后會釋放寫鎖,這一步會同時喚醒等待的讀鎖/寫鎖的goroutine,至于G4、G5誰能先獲取鎖就看誰比較快
### 總結
* 讀寫鎖提供四種操作:讀上鎖,讀解鎖,寫上鎖,寫解鎖;加鎖規則是讀讀共享,寫寫互斥,讀寫互斥,寫讀互斥;
* 讀寫鎖中的讀鎖是一定要存在的,其目的是也是為了規避原子性問題,只有寫鎖沒有讀鎖的情況下會導致我們讀取到中間值;
* Go語言的讀寫鎖在設計上也避免了寫鎖饑餓的問題,通過字段readerCount、readerWait進行控制,當寫鎖的goroutine被阻塞時,后面進來想要獲取讀鎖的goroutine也都會被阻塞住,當寫鎖釋放時,會將后面的讀操作goroutine、寫操作的goroutine都喚醒,剩下的交給他們競爭吧;
> **讀鎖獲取鎖流程:**
* 鎖空閑時,讀鎖可以立馬被獲取
* 如果當前有寫鎖正在阻塞,那么想要獲取讀鎖的goroutine就會被休眠
>**釋放讀鎖流程:**
* 判斷是否沒加鎖就釋放,如果是就拋出異常;
* 寫鎖被讀鎖阻塞等待的場景下,會將readerWait的值進行遞減,readerWait表示阻塞寫操作goroutine的讀操作goroutine數量,當readerWait減到0時則可以喚醒被阻塞寫操作的goroutine了;
>**寫鎖獲取鎖流程**
* 寫鎖復用了mutex互斥鎖的能力,首先嘗試獲取互斥鎖,獲取互斥鎖失敗就會進入自旋/休眠;
* 獲取互斥鎖成功并不代表寫鎖加鎖成功,此時如果還有占用讀鎖的goroutine,那么就會阻塞住,否則就會加寫鎖成功
>**釋放寫鎖流程**
* 釋放寫鎖會將負值的readerCount變成正值,解除對讀鎖的互斥
* 喚醒當前阻塞住的所有讀鎖
* 釋放互斥鎖
### RWMutex 注意事項
1. RWMutex 是單寫多讀鎖,該鎖可以加多個讀鎖或者一個寫鎖
2. 讀鎖占用的情況下會阻止寫,不會阻止讀,多個 Goroutine 可以同時獲取 讀鎖
3. 寫鎖會阻止其他 Goroutine(無論讀和寫)進來,整個鎖由該 Goroutine 獨占
4. 適用于讀多寫少的場景
5. RWMutex 類型變量的零值是一個未鎖定狀態的互斥鎖
6. RWMutex 在首次被使用之后就不能再被拷貝
7. RWMutex 的讀鎖或寫鎖在未鎖定狀態,解鎖操作都會引發 panic
8. RWMutex 的一個寫鎖去鎖定臨界區的共享資源,如果臨界區的共享資源已 被(讀鎖或寫鎖)鎖定,這個寫鎖操作的 goroutine 將被阻塞直到解鎖
9. RWMutex 的讀鎖不要用于遞歸調用,比較容易產生死鎖
10. RWMutex 的鎖定狀態與特定的 goroutine 沒有關聯。一個 goroutine 可 以 RLock(Lock),另一個 goroutine 可以 RUnlock(Unlock)
11. 寫鎖被解鎖后,所有因操作鎖定讀鎖而被阻塞的 goroutine 會被喚醒,并 都可以成功鎖定讀鎖
12. 讀鎖被解鎖后,在沒有被其他讀鎖鎖定的前提下,所有因操作鎖定寫鎖而 被阻塞的 Goroutine,其中等待時間最長的一個 Goroutine 會被喚醒
- Go準備工作
- 依賴管理
- Go基礎
- 1、變量和常量
- 2、基本數據類型
- 3、運算符
- 4、流程控制
- 5、數組
- 數組聲明和初始化
- 遍歷
- 數組是值類型
- 6、切片
- 定義
- slice其他內容
- 7、map
- 8、函數
- 函數基礎
- 函數進階
- 9、指針
- 10、結構體
- 類型別名和自定義類型
- 結構體
- 11、接口
- 12、反射
- 13、并發
- 14、網絡編程
- 15、單元測試
- Go常用庫/包
- Context
- time
- strings/strconv
- file
- http
- Go常用第三方包
- Go優化
- Go問題排查
- Go框架
- 基礎知識點的思考
- 面試題
- 八股文
- 操作系統
- 整理一份資料
- interface
- array
- slice
- map
- MUTEX
- RWMUTEX
- Channel
- waitGroup
- context
- reflect
- gc
- GMP和CSP
- Select
- Docker
- 基本命令
- dockerfile
- docker-compose
- rpc和grpc
- consul和etcd
- ETCD
- consul
- gin
- 一些小點
- 樹
- K8s
- ES
- pprof
- mycat
- nginx
- 整理后的面試題
- 基礎
- Map
- Chan
- GC
- GMP
- 并發
- 內存
- 算法
- docker