# 并發編程
[TOC]
## 進程 vs. 線程
對于操作系統來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,打開一個記事本就啟動了一個記事本進程,打開兩個記事本就啟動了兩個記事本進程,打開一個Word就啟動了一個Word進程。有些進程還不止同時干一件事,比如Word,它可以同時進行打字、拼寫檢查、打印等事情。在一個進程內部,要同時干多件事,就需要同時運行多個“子任務”,我們把進程內的這些“子任務”稱為線程(Thread)。
真正的并行執行多任務只能在多核CPU上實現,但是,由于任務數量遠遠多于CPU的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。
由于每個進程至少要干一件事,所以,一個進程至少有一個線程。當然,像Word這種復雜的進程可以有多個線程,多個線程可以同時執行,多線程的執行方式和多進程是一樣的,也是由操作系統在多個線程之間快速切換,讓每個線程都短暫地交替運行,看起來就像同時執行一樣。當然,真正地同時執行多線程也是需要多核CPU才可能實現。
進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一個程序的執行實例就是一個進程。
Python支持的并發分為多線程并發與多進程并發。概念上來說,多進程并發即運行多個獨立的程序,優勢在于并發處理的任務都由操作系統管理,不足之處在于程序與各進程之間的通信和數據共享不方便;多線程并發則由程序員管理并發處理的任務,這種并發方式可以方便地在線程間共享數據(前提是不能互斥)。Python對多線程和多進程的支持都比一般編程語言更高級,最小化了需要我們完成的工作。
多進程模式最大的優點就是穩定性高,因為一個子進程崩潰了,不會影響主進程和其他子進程。(當然主進程掛了所有進程就全掛了,但是Master進程只負責分配任務,掛掉的概率低)著名的Apache最早就是采用多進程模式。
多進程模式的缺點是創建進程的代價大,在Unix/Linux系統下,用fork調用還行,在Windows下創建進程開銷巨大。另外,操作系統能同時運行的進程數也是有限的,在內存和CPU的限制下,如果有幾千個進程同時運行,操作系統連調度都會成問題。
多線程模式致命的缺點就是任何一個線程掛掉都可能直接造成整個進程崩潰,因為所有線程共享進程的內存。在Windows上,如果一個線程執行的代碼出了問題,你經常可以看到這樣的提示:“該程序執行了非法操作,即將關閉”,其實往往是某個線程出了問題,但是操作系統會強制結束整個進程。
### 如何選擇
1. 在CPU密集型任務下,多進程更快,或者說效果更好;
2. 在IO密集型下,多線程能有效提高效率。
### 計算密集型 vs. IO密集型
計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如各種循環處理,計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等于CPU的核心數。
涉及到網絡、磁盤IO的任務都是IO密集型任務,如文件處理、網絡爬蟲等,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低于CPU和內存的速度)。對于IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少,因此,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,完全無法提升運行效率。對于IO密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。
## python的GUI?
## 線程池/進程池
標準庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持。
### 使用submit來操作線程池/進程池
#### 線程池
```python
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
# 創建一個最大可容納2個task的線程池
pool = ThreadPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello")) # 往線程池里面加入一個task,返回future對象,對于Future對象可以簡單地理解為一個在未來完成的操作。
future2 = pool.submit(return_future_result, ("world")) # 往線程池里面加入一個task
print(future1.done()) # 判斷task1是否結束,此時因為time.sleep(2)的原因還沒結束;
time.sleep(3)
print(future2.done()) # 判斷task2是否結束,此時過了三秒,task1和task2都已結束
print(future1.result()) # 查看task1返回的結果,此時是直接返回結果hello,不用再等待2秒
print(future2.result()) # 查看task2返回的結果,此時是直接返回結果world,不用再等待2秒
```
```
[root@izbp11qoru4kuokur8uyr9z ~]# ps -eLf | grep python
root 10849 10736 10849 2 3 14:53 pts/0 00:00:00 python _code2.py
root 10849 10736 10850 0 3 14:53 pts/0 00:00:00 python _code2.py
root 10849 10736 10851 0 3 14:53 pts/0 00:00:00 python _code2.py
```
#### 進程池
```python
# 雖然內部千差萬別,但外部的api接口都一樣
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
time.sleep(10)
return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(11)
print(future2.done())
print(future1.result())
print(future2.result())
```
```
[root@izbp11qoru4kuokur8uyr9z ~]# ps -eLf | grep python
root 10989 10736 10989 1 3 15:16 pts/0 00:00:00 python _code2.py
root 10989 10736 10992 0 3 15:16 pts/0 00:00:00 python _code2.py
root 10989 10736 10993 0 3 15:16 pts/0 00:00:00 python _code2.py
root 10990 10989 10990 0 1 15:16 pts/0 00:00:00 python _code2.py
root 10991 10989 10991 0 1 15:16 pts/0 00:00:00 python _code2.py
```
### 使用map/wait來操作線程池/進程池
#### map
```python
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
### submit版本
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
# future_to_url : {<Future at 0x229c7018c88 state=running>: 'http://httpbin.org', <Future at 0x229c719ff98 state=running>: 'http://example.com/', <Future at 0x229c71bfb70 state=running>: 'https://api.github.com/'}
# as_completed:扔進一組future實例的迭代,誰先完成返回誰
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result() # b'<!doctype html>\n<html>\n<head>\n ```` '
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
'''
# 不按順序返回,誰先結束返回誰
'http://example.com/' page is 1270 bytes
'http://httpbin.org' page is 8344 bytes
'https://api.github.com/' page is 2039 bytes
'''
# map版本,代碼更簡潔
# 使用with語句會調用executor.__exit__方法,__exit__方法接著會調用executor.shutdown(wait=True)方法,
# 然后就會在所有線程都執行完畢前阻塞線程
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# zip函數把兩個可迭代的對象實例打包成一一對應的元祖
# map 與內置map方法類似,不過load_url函數會在多個線程中并發調用;
for url, data in zip(URLS, executor.map(load_url, URLS)):
print('%r page is %d bytes' % (url, len(data)))
'''
map是按照URLS列表元素的順序返回的:
'http://httpbin.org' page is 8344 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
'''
```
#### wait
wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。使用wait方法的一個優勢就是獲得更大的自由度,它接收三個參數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默認設置為ALL_COMPLETED。
```python
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_random_secs, x))
# 如果采用默認的ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務都完成。
# print(wait(futures))
# 如果采用FIRST_COMPLETED參數,程序并不會等到線程池里面所有的任務都完成。
print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
```
### futures的其他方法
`Future.cancel()` : 可以終止某個線程和進程的任務
`Future.cancelled()` : 判斷是否真的結束了任務
`Future.running()` : 判斷是否還在運行
`Future.done()` : 判斷是正常執行完畢的。
`Future.result(timeout=None)` : 查看線程或進程結果,其中timeout針對result結果做超時的控制。
`Future.add_done_callback(fn)` : 線程取消或者結束時會回調,fn有個參數代表當前的future對象
submit函數返回future對象,future提供了跟蹤任務執行狀態的方法。比如判斷任務是否執行中future.running(),判斷任務是否執行完成future.done()等等。
as_completed方法傳入futures迭代器和timeout兩個參數
默認timeout=None,阻塞等待任務執行完成,并返回執行完成的future對象迭代器,迭代器是通過yield實現的。
timeout>0,等待timeout時間,如果timeout時間到仍有任務未能完成,不再執行并拋出異常TimeoutError
其他請參考:
https://docs.python.org/3/library/concurrent.futures.html#future-objects
### 實例分析進程池與線程池之間的差異
```python
# 計算網頁大小,進程池方式
import requests
from concurrent.futures import ProcessPoolExecutor
import time,os
def get(url):
print('%s GET %s' %(os.getpid(),url))
response=requests.get(url)
time.sleep(3)
if response.status_code == 200:
return {'url':url,'text':response.text}
def parse(obj):
res=obj.result()
print('[%s] <%s> (%s)' % (os.getpid(), res['url'], len(res['text'])))
if __name__ == '__main__':
urls = [
'https://www.python.org',
'https://www.baidu.com',
'https://www.jd.com',
'https://www.tmall.com',
]
# 不用with的寫法
# t = ProcessPoolExecutor(2)
# for url in urls:
# t.submit(get,url).add_done_callback(parse)
# t.shutdown(wait=True)
# t.__exit__方法會調用t.shutdown(wait=True)方法,
with ProcessPoolExecutor(max_workers=4) as t:
# 開一個進程池t,然后去并發下載網絡數據,
# 下載完畢后,由于主進程、子進程不是同一個進程空間,所以在解析數據時候,在主進程中add_done_callback去解析
[t.submit(get,url).add_done_callback(parse) for url in urls]
print('主',os.getpid())
'''
output:
25896:是當前進程 29384,28224,22544,2404是開的進程池
29384 GET https://www.python.org
28224 GET https://www.baidu.com
22544 GET https://www.jd.com
2404 GET https://www.tmall.com
[25896] <https://www.jd.com> (122591)
[25896] <https://www.tmall.com> (230427)
[25896] <https://www.baidu.com> (2443)
[25896] <https://www.python.org> (48837)
主 25896
'''
# 計算網頁大小,線程池方式
import requests
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time,os
def get(url):
print('pid:%s %s GET %s' % (os.getpid(),current_thread().getName(),url))
response=requests.get(url)
time.sleep(3)
if response.status_code == 200:
return {'url':url,'text':response.text}
def parse(obj):
res=obj.result()
print('pid:%s [%s] <%s> (%s)' % (os.getpid(), current_thread().getName(), res['url'], len(res['text'])))
if __name__ == '__main__':
urls = [
'https://www.python.org',
'https://www.baidu.com',
'https://www.jd.com',
'https://www.tmall.com',
]
with ThreadPoolExecutor(max_workers=4) as t:
[t.submit(get,url).add_done_callback(parse) for url in urls]
print('主',current_thread().getName(),os.getpid())
'''
# 主線程、子線程是同一個進程空間,所以在解析數據時候,可能主線程、子線程都會解析
pid:26788 ThreadPoolExecutor-0_0 GET https://www.python.org
pid:26788 ThreadPoolExecutor-0_1 GET https://www.baidu.com
pid:26788 ThreadPoolExecutor-0_2 GET https://www.jd.com
pid:26788 ThreadPoolExecutor-0_3 GET https://www.tmall.com
pid:26788 [ThreadPoolExecutor-0_2] <https://www.jd.com> (122591)
pid:26788 [ThreadPoolExecutor-0_3] <https://www.tmall.com> (230425)
pid:26788 [ThreadPoolExecutor-0_1] <https://www.baidu.com> (2443)
pid:26788 [ThreadPoolExecutor-0_0] <https://www.python.org> (48837)
主 MainThread 26788
'''
```
### 注意事項:
1. 線程池或者進程池不能嵌套
```python
def wait_on_future():
# 會造成死鎖的狀態
f = executor.submit(pow, 5, 2)
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
```
## 異步IO
考慮到CPU和IO之間巨大的速度差異,一個任務在執行的過程中大部分時間都在等待IO操作,單進程單線程模型會導致別的任務無法并行執行,因此,我們才需要多進程模型或者多線程模型來支持多任務并發執行。
現代操作系統對IO操作已經做了巨大的改進,最大的特點就是支持異步IO。如果充分利用操作系統提供的異步IO支持,就可以用單進程單線程模型來執行多任務,這種全新的模型稱為事件驅動模型,Nginx就是支持異步IO的Web服務器,它在單核CPU上采用單進程模型就可以高效地支持多任務。在多核CPU上,可以運行多個進程(數量與CPU核心數相同),充分利用多核CPU。由于系統總的進程數量十分有限,因此操作系統調度非常高效。用異步IO編程模型來實現多任務是一個主要的趨勢。
對應到Python語言,單線程的異步編程模型稱為協程,有了協程的支持,就可以基于事件驅動編寫高效的多任務程序。