# 同步,第 3 部分:使用互斥鎖和信號量
> 原文:<https://github.com/angrave/SystemProgramming/wiki/Synchronization%2C-Part-3%3A-Working-with-Mutexes-And-Semaphores>
## 線程安全棧
## 什么是原子操作?
用維基百科來解釋
> 如果系統的其余部分看起來是即時發生的,則操作(或一組操作)是原子的或不可中斷的。沒有鎖,只有簡單的 CPU 指令(“從內存中讀取這個字節”)是原子的(不可分割的)。在單 CPU 系統上,可以暫時禁用中斷(因此不會中斷一系列操作)但實際上通過使用同步原語(通常是互斥鎖)來實現原子性。
增加變量(`i++`)是 _ 而不是 _ 原子,因為它需要三個不同的步驟:將位模式從存儲器復制到 CPU 中;使用 CPU 的寄存器執行計算;將位模式復制回內存。在此遞增序列期間,另一個線程或進程仍然可以讀取舊值,并且當遞增序列完成時,對同一存儲器的其他寫入也將被覆蓋。
## 如何使用互斥鎖使我的數據結構線程安全?
注意,這只是一個介紹 - 編寫高性能的線程安全數據結構需要它自己的書!這是一個非線程安全的簡單數據結構(棧):
```c
// A simple fixed-sized stack (version 1)
#define STACK_SIZE 20
int count;
double values[STACK_SIZE];
void push(double v) {
values[count++] = v;
}
double pop() {
return values[--count];
}
int is_empty() {
return count == 0;
}
```
棧的版本 1 不是線程安全的,因為如果兩個線程同時調用 push 或 pop,則結果或棧可能不一致。例如,假設兩個線程同時調用 pop,則兩個線程可以讀取相同的值,兩者都可以讀取原始計數值。
為了將其轉換為線程安全的數據結構,我們需要識別代碼的 _ 關鍵部分 _,即代碼的哪個部分一次只能有一個線程。在上面的例子中,`push`,`pop`和`is_empty`函數訪問相同的變量(即存儲器)和棧的所有關鍵部分。
當`push`(和`pop`)正在執行時,數據結構是不一致的狀態(例如,計數可能尚未寫入,因此可能仍包含原始值)。通過使用互斥鎖包裝這些方法,我們可以確保一次只有一個線程可以更新(或讀取)棧。
候選“解決方案”如下所示。這是對的嗎?如果沒有,它將如何失敗?
```c
// An attempt at a thread-safe stack (version 2)
#define STACK_SIZE 20
int count;
double values[STACK_SIZE];
pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
void push(double v) {
pthread_mutex_lock(&m1);
values[count++] = v;
pthread_mutex_unlock(&m1);
}
double pop() {
pthread_mutex_lock(&m2);
double v = values[--count];
pthread_mutex_unlock(&m2);
return v;
}
int is_empty() {
pthread_mutex_lock(&m1);
return count == 0;
pthread_mutex_unlock(&m1);
}
```
上面的代碼('版本 2')包含至少一個錯誤。花點時間看看你是否可以得到錯誤并找出后果。
如果三個同時調用`push()`,則`m1`確保只有一個線程處理棧(兩個線程需要等到第一個線程完成(調用解鎖)),然后第二個線程將被允許繼續進入臨界區,最后第二個線程完成后,第三個線程將被允許繼續。
類似的參數適用于`pop`的并發調用(同時調用)。但是,版本 2 不會阻止 push 和 pop 同時運行,因為`push`和`pop`使用兩個不同的互斥鎖。
在這種情況下,修復很簡單 - 對推送和彈出功能使用相同的互斥鎖。
代碼有第二個錯誤;比較后`is_empty`返回,不會解鎖互斥鎖。但是,錯誤不會立即被發現。例如,假設一個線程調用`is_empty`,第二個線程稍后調用`push`。這個線程會神秘地停止。使用調試器可以發現線程卡在`push`方法內的 lock()方法中,因為鎖定從未被先前的`is_empty`調用解鎖。因此,在一個線程中的疏忽導致在任意其他線程中很晚的問題。
更好的版本如下所示 -
```c
// An attempt at a thread-safe stack (version 3)
int count;
double values[count];
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
void push(double v) {
pthread_mutex_lock(&m);
values[count++] = v;
pthread_mutex_unlock(&m);
}
double pop() {
pthread_mutex_lock(&m);
double v = values[--count];
pthread_mutex_unlock(&m);
return v;
}
int is_empty() {
pthread_mutex_lock(&m);
int result= count == 0;
pthread_mutex_unlock(&m);
return result;
}
```
版本 3 是線程安全的(我們已確保所有關鍵部分的互斥)但是有兩點需要注意:
* `is_empty`是線程安全的,但其結果可能已經過時,即在線程獲得結果時棧可能不再為空!
* 沒有防止下溢(在空棧上彈出)或溢出(推入已經滿的棧)的保護
后一點可以使用計數信號量來修復。
該實現假設一個棧。更通用的版本可能包含互斥鎖作為內存結構的一部分,并使用 pthread_mutex_init 初始化互斥鎖。例如,
```c
// Support for multiple stacks (each one has a mutex)
typedef struct stack {
int count;
pthread_mutex_t m;
double *values;
} stack_t;
stack_t* stack_create(int capacity) {
stack_t *result = malloc(sizeof(stack_t));
result->count = 0;
result->values = malloc(sizeof(double) * capacity);
pthread_mutex_init(&result->m, NULL);
return result;
}
void stack_destroy(stack_t *s) {
free(s->values);
pthread_mutex_destroy(&s->m);
free(s);
}
// Warning no underflow or overflow checks!
void push(stack_t *s, double v) {
pthread_mutex_lock(&s->m);
s->values[(s->count)++] = v;
pthread_mutex_unlock(&s->m); }
double pop(stack_t *s) {
pthread_mutex_lock(&s->m);
double v = s->values[--(s->count)];
pthread_mutex_unlock(&s->m);
return v;
}
int is_empty(stack_t *s) {
pthread_mutex_lock(&s->m);
int result = s->count == 0;
pthread_mutex_unlock(&s->m);
return result;
}
```
使用示例:
```c
int main() {
stack_t *s1 = stack_create(10 /* Max capacity*/);
stack_t *s2 = stack_create(10);
push(s1, 3.141);
push(s2, pop(s1));
stack_destroy(s2);
stack_destroy(s1);
}
```
## 棧信號量
## 如果棧為空或已滿,我如何強制我的線程等待?
使用計數信號量!使用計數信號量來跟蹤剩余的空格數和另一個信號量來跟蹤棧中的項目數。我們將這兩個信號量稱為“sremain”和“sitems”。記住,如果信號量的計數已減少到零(由另一個調用 sem_post 的線程),`sem_wait`將等待。
```c
// Sketch #1
sem_t sitems;
sem_t sremain;
void stack_init(){
sem_init(&sitems, 0, 0);
sem_init(&sremain, 0, 10);
}
double pop() {
// Wait until there's at least one item
sem_wait(&sitems);
...
void push(double v) {
// Wait until there's at least one space
sem_wait(&sremain);
...
```
草圖#2 過早地實現了`post`。另一個在推送中等待的線程可能會錯誤地嘗試寫入完整棧(類似地,在 pop()中等待的線程被允許過早地繼續)。
```c
// Sketch #2 (Error!)
double pop() {
// Wait until there's at least one item
sem_wait(&sitems);
sem_post(&sremain); // error! wakes up pushing() thread too early
return values[--count];
}
void push(double v) {
// Wait until there's at least one space
sem_wait(&sremain);
sem_post(&sitems); // error! wakes up a popping() thread too early
values[count++] = v;
}
```
Sketch 3 實現了正確的信號量邏輯,但是你能發現錯誤嗎?
```c
// Sketch #3 (Error!)
double pop() {
// Wait until there's at least one item
sem_wait(&sitems);
double v= values[--count];
sem_post(&sremain);
return v;
}
void push(double v) {
// Wait until there's at least one space
sem_wait(&sremain);
values[count++] = v;
sem_post(&sitems);
}
```
Sketch 3 使用信號量正確執行緩沖區已滿和緩沖區空條件。然而,沒有 _ 互斥 _:兩個線程可能同時位于 _ 臨界區 _ 中,這會破壞數據結構(或最不會導致數據丟失)。解決方法是在臨界區周圍包含一個互斥鎖:
```c
// Simple single stack - see above example on how to convert this into a multiple stacks.
// Also a robust POSIX implementation would check for EINTR and error codes of sem_wait.
// PTHREAD_MUTEX_INITIALIZER for statics (use pthread_mutex_init() for stack/heap memory)
pthread_mutex_t m= PTHREAD_MUTEX_INITIALIZER;
int count = 0;
double values[10];
sem_t sitems, sremain;
void init() {
sem_init(&sitems, 0, 0);
sem_init(&sremains, 0, 10); // 10 spaces
}
double pop() {
// Wait until there's at least one item
sem_wait(&sitems);
pthread_mutex_lock(&m); // CRITICAL SECTION
double v= values[--count];
pthread_mutex_unlock(&m);
sem_post(&sremain); // Hey world, there's at least one space
return v;
}
void push(double v) {
// Wait until there's at least one space
sem_wait(&sremain);
pthread_mutex_lock(&m); // CRITICAL SECTION
values[count++] = v;
pthread_mutex_unlock(&m);
sem_post(&sitems); // Hey world, there's at least one item
}
// Note a robust solution will need to check sem_wait's result for EINTR (more about this later)
```
## 常見的 Mutex 陷阱是什么?
* 鎖定/解鎖錯誤的互斥鎖(由于愚蠢的錯字)
* 沒有解鎖互斥鎖(因為在錯誤情況下提前返回)
* 資源泄漏(不調用`pthread_mutex_destroy`)
* 使用未初始化的互斥鎖(或使用已經被破壞的互斥鎖)
* 在線程上鎖定互斥鎖兩次(不先解鎖)
* 死鎖和優先級倒置(稍后我們將討論這些)
- UIUC CS241 系統編程中文講義
- 0. 簡介
- #Informal 詞匯表
- #Piazza:何時以及如何尋求幫助
- 編程技巧,第 1 部分
- 系統編程短篇小說和歌曲
- 1.學習 C
- C 編程,第 1 部分:簡介
- C 編程,第 2 部分:文本輸入和輸出
- C 編程,第 3 部分:常見問題
- C 編程,第 4 部分:字符串和結構
- C 編程,第 5 部分:調試
- C 編程,復習題
- 2.進程
- 進程,第 1 部分:簡介
- 分叉,第 1 部分:簡介
- 分叉,第 2 部分:Fork,Exec,等等
- 進程控制,第 1 部分:使用信號等待宏
- 進程復習題
- 3.內存和分配器
- 內存,第 1 部分:堆內存簡介
- 內存,第 2 部分:實現內存分配器
- 內存,第 3 部分:粉碎堆棧示例
- 內存復習題
- 4.介紹 Pthreads
- Pthreads,第 1 部分:簡介
- Pthreads,第 2 部分:實踐中的用法
- Pthreads,第 3 部分:并行問題(獎金)
- Pthread 復習題
- 5.同步
- 同步,第 1 部分:互斥鎖
- 同步,第 2 部分:計算信號量
- 同步,第 3 部分:使用互斥鎖和信號量
- 同步,第 4 部分:臨界區問題
- 同步,第 5 部分:條件變量
- 同步,第 6 部分:實現障礙
- 同步,第 7 部分:讀者編寫器問題
- 同步,第 8 部分:環形緩沖區示例
- 同步復習題
- 6.死鎖
- 死鎖,第 1 部分:資源分配圖
- 死鎖,第 2 部分:死鎖條件
- 死鎖,第 3 部分:餐飲哲學家
- 死鎖復習題
- 7.進程間通信&amp;調度
- 虛擬內存,第 1 部分:虛擬內存簡介
- 管道,第 1 部分:管道介紹
- 管道,第 2 部分:管道編程秘密
- 文件,第 1 部分:使用文件
- 調度,第 1 部分:調度過程
- 調度,第 2 部分:調度過程:算法
- IPC 復習題
- 8.網絡
- POSIX,第 1 部分:錯誤處理
- 網絡,第 1 部分:簡介
- 網絡,第 2 部分:使用 getaddrinfo
- 網絡,第 3 部分:構建一個簡單的 TCP 客戶端
- 網絡,第 4 部分:構建一個簡單的 TCP 服務器
- 網絡,第 5 部分:關閉端口,重用端口和其他技巧
- 網絡,第 6 部分:創建 UDP 服務器
- 網絡,第 7 部分:非阻塞 I O,select()和 epoll
- RPC,第 1 部分:遠程過程調用簡介
- 網絡復習題
- 9.文件系統
- 文件系統,第 1 部分:簡介
- 文件系統,第 2 部分:文件是 inode(其他一切只是數據...)
- 文件系統,第 3 部分:權限
- 文件系統,第 4 部分:使用目錄
- 文件系統,第 5 部分:虛擬文件系統
- 文件系統,第 6 部分:內存映射文件和共享內存
- 文件系統,第 7 部分:可擴展且可靠的文件系統
- 文件系統,第 8 部分:從 Android 設備中刪除預裝的惡意軟件
- 文件系統,第 9 部分:磁盤塊示例
- 文件系統復習題
- 10.信號
- 過程控制,第 1 部分:使用信號等待宏
- 信號,第 2 部分:待處理的信號和信號掩碼
- 信號,第 3 部分:提高信號
- 信號,第 4 部分:信號
- 信號復習題
- 考試練習題
- 考試主題
- C 編程:復習題
- 多線程編程:復習題
- 同步概念:復習題
- 記憶:復習題
- 管道:復習題
- 文件系統:復習題
- 網絡:復習題
- 信號:復習題
- 系統編程笑話