最新的 C++ 迭代(稱為 C++11,在去年通過了國際標準化組織 (ISO) 的審批)形式化了一組新庫和一些保留字以處理并發。許多開發者以前都在 C++ 中使用過并發功能,但都是通過第三方的庫,即,通常直接公開 OS API。
Herb Sutter 在 2004 年 12 月宣告“免費的性能午餐”結束,因為禁止 CPU 制造商通過物理能耗和增加碳排放量來生產更快的 CPU。由此進入了當前主流的多核時代,一種新的實現,而 C++(標準組件)為適應此類變化取得了重要的飛躍。
本文下面的內容將分成兩節,另外還有一些小節。第一節,從并行執行開始,介紹允許應用程序并行運行獨立或半獨立活動的技術。第二節,從同步并發執行開始,探討同步機制,這些活動通過同步方式處理數據,以避免出現爭用情況。
本文基于即將推出的 Visual C++ 版本(現在稱為 Visual C++ 11)中包括的功能。當前版本 (Visual C++ 2010) 中已提供其中部分功能。盡管本文不提供關于為并行算法建模的指南,也不提供關于所有可用選項的詳盡文檔,但卻全面介紹了新的 C++11 并發功能,內容豐富詳實。
### 并行執行
當您對數據建模和設計算法時,很自然地就會按照具有一定順序的步驟指定這些建模和設計過程。只要性能位于可接受的范圍內,這就是最值得推薦的方案,因為它通常更易于理解,而這符合維護代碼的要求。
當性能成為令人擔憂的問題時,為處理這種情況通常都會先嘗試優化序列算法以減少使用的 CPU 循環。這種做法始終可行,直到無法再進行優化或難以優化。這時就需要將連續的一系列步驟拆分為同時進行的多項活動。
在第一節中,您將了解到以下內容:
- **異步任務:** 一小部分原始算法,僅通過它們生成或使用的數據進行鏈接。
- **線程:** 運行時環境管理的執行單元。它們與任務相關,因為任務在某種程度上在線程上運行。
- **線程內部:** 線程綁定變量、線程傳播的異常等等
### 異步任務
在本文隨附的代碼中,您將找到一個名為“順序案列”的項目,如**圖 1** 所示。
圖 1 順序案例代碼
~~~
int a, b, c;
int calculateA()
{
return a+a*b;
}
int calculateB()
{
return a*(a+a*(a+1));
}
int calculateC()
{
return b*(b+1)-b;
}
int main(int argc, char *argv[])
{
getUserData(); // initializes a and b
c = calculateA() * (calculateB() + calculateC());
showResult();
}
~~~
主函數向用戶請求一些數據,然后將該數據提交給三個函數: calculateA、calculateB 和 calculateC。稍后將組合這些結果,以便為用戶生成一些輸出信息。
隨附材料中計算函數的編碼方式在每個函數中引入了 1 到 3 秒的隨機延遲。由于這些步驟是按順序執行的,因此只要輸入數據,就會產生一個在最糟糕情況下為 9 秒的總體執行時間。您可以通過按 F5 運行本示例來嘗試此代碼。
因此,我需要修改執行序列和查找并發執行步驟。由于這些函數都是獨立的,因此我可以使用異步函數并行執行它們:
~~~
int main(int argc, char *argv[])
{
? getUserData();
? future<int> f1 = async(calculateB), f2 = async(calculateC);
? c = (calculateA() + f1.get()) * f2.get();
? showResult();
}
~~~
在這里我引入了兩個概念: async 和 future,在 <future> 標頭和 std 命名空間中都有定義。前者接收函數、lambda 或函數對象(即算符)并返回 future。您可以將 future 的概念理解為事件結果的占位符。什么結果?異步調用函數返回的結果。
在某些時候,我將需要這些并行運行函數的結果。對每個 future 調用 get 方法會阻止執行,直到值可用。
您可以通過運行隨附示例中的 AsyncTasks 項目來測試修改后的代碼,并將其與順序案例進行比較。經過此修改后最糟情況下的延遲大約為 3 秒,與順序版本的 9 秒相比有很大進步。
此輕型編程模型將開發者從創建線程的任務中解放出來。然而,您可以指定線程策略,但這里我不介紹此內容。
### 線程
前面一節介紹的異步任務模型在某些指定的應用場景中可能已經足夠了,但如果您需要進一步處理和控制線程的執行,那么 C++11 還提供了線程類,該類在 <thread> 標頭中聲明并位于 std 命名空間中。
盡管編程模型更為復雜,但線程可以提供更好的同步和協調方法,以允許它們執行其他線程并等待既定的時間長度,或直到其他線程完成后再繼續。
在以下隨附代碼的“線程”項目中提供的示例中,我讓 lambda 函數(為其賦予了整數參數)將其小于 100,000 的倍數顯示到控制臺:
~~~
auto multiple_finder = [](int n) {
? for (int i = 0; i < 100000; i++)
??? if (i%n==0)
????? cout << i << " is a multiple of " << n << endl;
};
int main(int argc, char *argv[])
{
? thread th(multiple_finder, 23456);
? multiple_finder(34567);
? th.join();
}
~~~
正如您將在后面的示例中看到的,我視情況將 lambda 傳遞給線程;一個函數或算符就已足夠。
在主函數中,我使用不同的參數在兩個線程中運行此函數。看一下生成的結果(因為運行時機不同,運行產生的結果也不同):
~~~
0 is a multiple of 23456
0 is a multiple of 34567
23456 is a multiple of 23456
34567 is a multiple of 34567
46912 is a multiple of 23456
69134 is a multiple of 34567
70368 is a multiple of 23456
93824 is a multiple of 23456
~~~
我可以使用線程實現前面一節中有關異步任務的示例。為此,我需要引入 promise 的概念。可以將 promise 理解為一個用于放置可用結果的接收器。將結果放置在其中后又從哪個位置提取該結果呢?每個 promise 都有一個關聯的 future。
圖 2中顯示的、示例代碼的 Promise 項目中提供的代碼將三個線程(而非任務)與 promise 關聯并讓每個線程調用 calculate 函數。將這些細節與輕型 AsyncTasks 版本比較。
圖 2 關聯 Future 和 Promise
~~~
typedef int (*calculate)(void);
void func2promise(calculate f, promise<int> &p)
{
p.set_value(f());
}
int main(int argc, char *argv[])
{
getUserData();
promise<int> p1, p2;
future<int> f1 = p1.get_future(), f2 = p2.get_future();
thread t1(&func2promise, calculateB, std::ref(p1)),
t2(&func2promise, calculateC, std::ref(p2));
c = (calculateA() + f1.get()) * f2.get();
t1.join(); t2.join();
showResult();
}
~~~
### 線程綁定變量和異常
在 C++ 中,您可以定義全局變量,它的范圍綁定到整個應用程序,包括線程。但相對于線程,現在有方法定義這些全局變量,以便每個線程保有自己的副本。此概念稱為線程本地存儲,其聲明如下:
~~~
thread_local int subtotal = 0;
~~~
如果聲明在函數范圍內完成,則只有該函數能夠看到變量,但每個線程將繼續維護自己的靜態副本。也就是說,每個線程的變量的值在函數調用之間將得到保持。
盡管 thread_local 在 Visual C++ 11 中不可用,但可以使用非標準的 Microsoft 擴展對它進行模擬:
~~~
#define??thread_local?__declspec(thread)
~~~
如果線程內引發異常將會發生什么?有時候可以在線程內的調用堆棧中捕獲和處理異常。但如果線程不處理異常,則需要采用一種方法將異常傳輸到發起方線程。C++11 引入了此類機制。
在圖 3 中,在隨附代碼的項目 ThreadInternals 中提供了一個 sum_until_element_with_threshold 函數,該函數遍歷矢量直至找到特定元素,而在此過程中它會對所有元素求和。如果總和超過閾值,則引發異常。
圖 3 線程本地存儲和線程異常
~~~
thread_local unsigned sum_total = 0;
void sum_until_element_with_threshold(unsigned element,
unsigned threshold, exception_ptr& pExc)
{
try{
find_if_not(begin(v), end(v), [=](const unsigned i) -> bool {
bool ret = (i!=element);
sum_total+= i;
if (sum_total>threshold)
throw runtime_error("Sum exceeded threshold.");
return ret;
});
cout << "(Thread #" << this_thread::get_id() << ") " <<
"Sum of elements until " << element << " is found: " << sum_total << endl;
} catch (...) {
pExc = current_exception();
}
}
~~~
如果發生該情況,將通過 current_exception 將異常捕獲到 exception_ptr 中。
主函數對 sum_until_element_with_threshold 觸發線程,同時使用其他參數調用該相同的函數。當兩個調用(一個在主線程中,另一個在從主線程觸發的線程中)都完成后,將對其相應的 exception_ptrs 進行分析:
~~~
const unsigned THRESHOLD = 100000;
vector<unsigned> v;
int main(int argc, char *argv[])
{
exception_ptr pExc1, pExc2;
scramble_vector(1000);
thread th(sum_until_element_with_threshold, 0, THRESHOLD, ref(pExc1));
sum_until_element_with_threshold(100, THRESHOLD, ref(pExc2));
th.join();
dealWithExceptionIfAny(pExc1);
dealWithExceptionIfAny(pExc2);
}
~~~
如果其中任何 exception_ptrs 進行了初始化(即,表明出現某些異常),將使用 rethrow_exception 再次觸發它們的異常:
~~~
void dealWithExceptionIfAny(exception_ptr pExc)
{
try
{
if (!(pExc==exception_ptr()))
rethrow_exception(pExc);
} catch (const exception& exc) {
cout << "(Main thread) Exception received from thread: " <<
exc.what() << endl;
}
}
~~~
當第二個線程中的總和超過其閾值時,我們將獲得以下執行結果:
~~~
(Thread #10164) Sum of elements until 0 is found: 94574
(Main thread) Exception received from thread: Sum exceeded threshold.
~~~
### 同步并發執行
最好能夠將所有應用程序拆分為 100% 獨立的異步任務組。但實際上這幾乎是不可能的,因為各方并發處理的數據都至少具有一定的依賴關系。本節介紹可避免發生爭用情況的新 C++11 技術。
您將了解到以下信息:
- **原子類型:** 與基元數據類型相似,但允許進行線程安全修改。
- **互斥和鎖定:** 允許我們定義線程安全臨界區的元素。
- **條件變量:** 一種在滿足某條件之前停止執行線程的方法。
### 原子類型
<atomic> 標頭引入了一系列可通過連鎖操作實現的基元類型(atomic_char、atomic_int 等等)。因此,這些類型等同于它們的不帶 atomic_ 前綴的同音詞,但不同的是這些類型的所有賦值運算符(==、++、--、+=、*= 等等)均不受爭用情況的影響。因此,在為這些數據類型賦值期間,其他線程無法在我們完成賦值操作之前中斷并更改值。
在下面的示例中,有兩個并行線程(其中一個是主線程)在相同矢量中查找不同的元素:
~~~
atomic_uint total_iterations;
vector<unsigned> v;
int main(int argc, char *argv[])
{
total_iterations = 0;
scramble_vector(1000);
thread th(find_element, 0);
find_element(100);
th.join();
cout << total_iterations << " total iterations." << endl;
}
~~~
當找到每個元素后,會顯示來自線程內部的消息,告知在矢量(或迭代)中的哪個位置找到了該元素。
~~~
void find_element(unsigned element)
{
unsigned iterations = 0;
find_if(begin(v), end(v), [=, &iterations](const unsigned i) -> bool {
++iterations;
return (i==element);
});
total_iterations+= iterations;
cout << "Thread #" << this_thread::get_id() << ": found after " <<
iterations << " iterations." << endl;
}
~~~
還有一個常用變量 total_iterations,它使用兩個線程都應用的總迭代次數進行更新。因此,total_iterations 必須為原子以防止兩個線程同時對其進行更新。在前面的示例中,即使您不需要在 find_element 中顯示部分數量的迭代,您仍然在該本地變量(而非 total_iterations)中累積迭代,以避免爭用原子變量。
您可以在隨附代碼下載的“原子”項目中找到上述示例。運行該示例,可獲得下面的結果:
~~~
Thread #8064: found after 168 iterations.
Thread #6948: found after 395 iterations.
563 total iterations.
~~~
### 互斥和鎖定
前面一節介紹了在對基元類型進行寫訪問時發生互斥的特殊情況。<mutex> 標頭定義了一系列用于定義臨界區的可鎖定類。這樣,您就可以定義互斥以在一系列函數或方法中建立臨界區,在這種情況下,一次只能有一個線程可以通過成功鎖定系列互斥來訪問此系列中的任何成員。
嘗試鎖定互斥的線程可以保持阻止狀態直到互斥可用,或直接放棄嘗試。在這兩種極端的做法之間,還可以使 timed_mutex 類保持阻止狀態一段時間,然后再放棄嘗試。允許鎖定將嘗試停止幫助防止死鎖。
鎖定的互斥必須明確解鎖后,其他線程才能對其進行鎖定。無法解鎖可能會導致不確定的應用程序行為,繼而容易出錯,這與忘記釋放動態內存相似。忘記釋放鎖定實際上更嚴重,因為它可能意味著如果其他代碼繼續等待該鎖定,那么應用程序將再也無法正常運行。幸運的是,C++11 還提供鎖定類。雖然針對互斥執行鎖定,但其析構函數確保鎖定后還會解鎖。
代碼下載的“互斥”項目中提供的以下代碼定義有關互斥 mx 的臨界區:
~~~
mutex mx;
void funcA();
void funcB();
int main()
{
thread th(funcA)
funcB();
th.join();
}
~~~
此互斥用于保證兩個函數(funcA 和 funcB)可以并行運行,而不會在臨界區中同時出現。
如果需要,函數 funcA 將等待進入臨界區。為了實現此過程,您只需要最簡單的鎖定機制,即 lock_guard:
~~~
void funcA()
{
for (int i = 0; i<3; ++i)
{
this_thread::sleep_for(chrono::seconds(1));
cout << this_thread::get_id() << ": locking with wait... "
<< endl;
lock_guard<mutex> lg(mx);
...
// Do something in the critical region.
cout << this_thread::get_id() << ": releasing lock." << endl;
}
}
~~~
這樣定義后,funcA 應訪問臨界區三次。而函數 funcB 將嘗試鎖定,但如果互斥到那時已鎖定,則 funcB 將等待幾秒,然后再次嘗試訪問臨界區。它使用的機制是 unique_lock,另外還有策略 try_to_lock_t,如**圖 4** 所示。
圖 4 鎖定與等待
~~~
void funcB()
{
int successful_attempts = 0;
for (int i = 0; i<5; ++i)
{
unique_lock<mutex> ul(mx, try_to_lock_t());
if (ul)
{
++successful_attempts;
cout << this_thread::get_id() << ": lock attempt successful." <<
endl;
...
// Do something in the critical region
cout << this_thread::get_id() << ": releasing lock." << endl;
} else {
cout << this_thread::get_id() <<
": lock attempt unsuccessful.
Hibernating..." << endl;
this_thread::sleep_for(chrono::seconds(1));
}
}
cout << this_thread::get_id() << ": " << successful_attempts
<< " successful attempts." << endl;
}
~~~
這樣定義后,funcB 將最多五次嘗試進入臨界區。**圖 5** 顯示執行的結果。五次嘗試中,funcB 只能進入臨界區兩次。
圖 5 執行示例項目互斥
~~~
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcA: releasing lock.
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcA: releasing lock.
funcB: 2 successful attempts.
funcA: locking with wait ...
funcA: lock secured ...
funcA: releasing lock.
~~~
### 條件變量
標頭 <condition_variable> 指出了本文最后的內容,這些內容是當線程之間的協調受制于事件時所出現的各種情況的基礎。
在代碼下載的 CondVar 項目中提供的以下示例中,producer 函數推送隊列中的元素:
~~~
mutex mq;
condition_variable cv;
queue<int> q;
void producer()
{
for (int i = 0;i<3;++i)
{
...
// Produce element
cout << "Producer: element " << i << " queued." << endl;
mq.lock(); q.push(i); mq.unlock();
cv.
notify_all();
}
}
~~~
標準隊列不是線程安全的,所以您必須確保排隊時沒有其他人正在使用它(即,consumer 沒有彈出任何元素)。
consumer 函數嘗試在可用時從隊列中獲取元素,或者它只是針對條件變量等待一會兒,然后再重新嘗試;在連續兩次嘗試失敗后,consumer 結束(參見圖 6)。
圖 6 通過條件變量喚醒線程
~~~
void consumer()
{
unique_lock<mutex> l(m);
int failed_attempts = 0;
while (true)
{
mq.lock();
if (q.size())
{
int elem = q.front();
q.pop();
mq.unlock();
failed_attempts = 0;
cout << "Consumer: fetching " << elem << " from queue." << endl;
...
// Consume elem
} else {
mq.unlock();
if (++failed_attempts>1)
{
cout << "Consumer: too many failed attempts -> Exiting." << endl;
break;
} else {
cout << "Consumer: queue not ready -> going to sleep." << endl;
cv.wait_for(l, chrono::seconds(5));
}
}
}
}
~~~
每次新的元素可用時,producer 都會通過 notify_all 喚醒 consumer。這樣,producer 可以在元素準備就緒的情況下避免 consumer 在整個間隔期內都處于睡眠狀態。
圖 7顯示相關運行的結果。
圖 7 使用條件變量進行同步
~~~
Consumer: queue not ready -> going to sleep.
Producer: element 0 queued.
Consumer: fetching 0 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 1 queued.
Consumer: fetching 1 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 2 queued.
Producer: element 3 queued.
Consumer: fetching 2 from queue.
Producer: element 4 queued.
Consumer: fetching 3 from queue.
Consumer: fetching 4 from queue.
Consumer: queue not ready -> going to sleep.
Consumer: two consecutive failed attempts -> Exiting.
~~~
### 整體概覽
綜上所述,本文展示了 C++11 中所引入機制的概念性全景圖,這些機制允許在多核環境為主流的時代中并行執行任務。
異步任務允許輕型編程模型并行化執行。可以通過關聯的 future 檢索每項任務的結果。
線程可以提供比任務更多的粒度,盡管它們更大一些,并且所提供的機制可保持靜態變量的單獨副本并在線程之間傳輸異常。
在對同一數據執行并行線程時,C++11 可提供資源以避免發生爭用情況。原子類型通過受信任的方式來確保一次只有一個線程修改數據。
互斥可幫助我們定義代碼中的臨界區,即防止線程同時訪問的區域。鎖定可包裝互斥,以嘗試在前者的生命周期內解鎖后者。
最后,條件變量提高了線程同步的效率,因為某些線程可以等待其他線程通知的事件。
趕緊下載VS11體驗吧
[http://www.microsoft.com/click/services/Redirect2.ashx?CR_CC=200098144](http://www.microsoft.com/click/services/Redirect2.ashx?CR_CC=200098144)
[](http://blog.csdn.net/yincheng01/article/details/7495227)?
- 前言
- Visual Studio 11開發指南(1) Visual Studio 11簡介與新特性
- Visual Studio 11開發指南(2) Visual Studio 11放棄宏處理
- Visual Studio 11開發指南(3)Visual Studio 11開發SharePoint 2011程序
- Visual Studio 11開發指南(4)Visual Studio 11編程語言發展
- Visual Studio 11開發指南(5)Visual Studio 11 IDE增強
- Visual Studio 11開發指南(6)Visual Studio 11平臺改進
- Visual Studio 11開發指南(7)NET 4.5的改善
- Visual Studio 11開發指南(8)Visual C++ 11新特色
- Visual Studio 11開發指南(9)Visual C++ 新功能體驗
- Visual Studio 11開發指南(10)Visual C++11 IDE 新功能體驗
- Visual Studio 11開發指南(11)Visual Studio 11調試游戲
- Visual Studio 11開發指南(12)Visual Studio 11可視化多核多線程編程的行為
- Visual Studio 11開發指南(13)C++11語言新特性
- Visual Studio 11開發指南(14)C++11---C++/ CX設計
- Visual Studio 11開發指南(15)C++11單元測試
- Visual Studio 11開發指南(16)C++11更新-多線程和異步操作管理
- Visual Studio 11開發指南(17)C++11更新- Lambda表達式
- Visual Studio 11開發指南(18)C++11更新-自動矢量器使用
- Visual Studio 11開發指南(19)C++11更新-并行模式庫和代理庫
- 在 C++ 中使用 PPL 進行異步編程
- 基于VisualStudio11開發Windows8的Metro sample講解(1)MessageBox
- Visual C++ 11 中新的并發功能
- 基于Windows8與Visual Studio2012開發內核隱藏注冊表
- 基于VC++2012在Windows8上實現文件隱藏
- 實現諾基亞 lumia Windows phone 的手機通話記錄截取
- 最短代碼實現windows8下的下載器-下載安裝執行一體化
- 用Visual studio2012在Windows8上開發內核驅動監視線程創建
- 用Visual studio2012在Windows8上開發內核驅動監視進程創建
- 基于Windows8與Visual Studio2012實現殺毒通用模塊
- 用Visual studio2012在Windows8上開發內核中隱藏進程
- 用Visual studio11在Windows8上開發內核枚舉注冊表
- 用Visual studio11在Windows8上開發內核驅動隱藏注冊表
- 用Visual studio11在Windows8上開發驅動實現注冊表監控和過濾
- 用Visual studio11在Windows8上開發驅動實現內存填0殺進程
- 【CSDN2012年度博客之星】喜歡本博客的讀者,投票贈送《visual C++2010開發權威指南》電子稿--感謝支持 ~(截至到2012年12月30日)
- 今天在清華圖書館看到我的杰作,感慨萬千,而我要歸零一切 !
- use Visual studio2012 developing kernel driver monitor thread creation on Windows8
- To kernel driver monitoring process developed in Windows8 create using Visual studio2012
- Under Windows8 kernel mode development NDIS application-NDIS Filter explain
- use Visual studio2012 development kernel to hidden process on Windows8