要讓Python程序實現多進程(multiprocessing),我們先了解操作系統的相關知識。
Unix/Linux操作系統提供了一個`fork()`系統調用,它非常特殊。普通的函數調用,調用一次,返回一次,但是`fork()`調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內返回。
子進程永遠返回`0`,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調用`getppid()`就可以拿到父進程的ID。
Python的`os`模塊封裝了常見的系統調用,其中就包括`fork`,可以在Python程序中輕松創建子進程:
~~~
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
~~~
運行結果如下:
~~~
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
~~~
由于Windows沒有`fork`調用,上面的代碼在Windows上無法運行。由于Mac系統是基于BSD(Unix的一種)內核,所以,在Mac下運行是沒有問題的,推薦大家用Mac學Python!
有了`fork`調用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的Apache服務器就是由父進程監聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。
### multiprocessing
如果你打算編寫多進程的服務程序,Unix/Linux無疑是正確的選擇。由于Windows沒有`fork`調用,難道在Windows上無法用Python編寫多進程的程序?
由于Python是跨平臺的,自然也應該提供一個跨平臺的多進程支持。`multiprocessing`模塊就是跨平臺版本的多進程模塊。
`multiprocessing`模塊提供了一個`Process`類來代表一個進程對象,下面的例子演示了啟動一個子進程并等待其結束:
~~~
from multiprocessing import Process
import os
# 子進程要執行的代碼
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
~~~
執行結果如下:
~~~
Parent process 928.
Process will start.
Run child process test (929)...
Process end.
~~~
創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個`Process`實例,用`start()`方法啟動,這樣創建進程比`fork()`還要簡單。
`join()`方法可以等待子進程結束后再繼續往下運行,通常用于進程間的同步。
### Pool
如果要啟動大量的子進程,可以用進程池的方式批量創建子進程:
~~~
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
~~~
執行結果如下:
~~~
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
~~~
代碼解讀:
對`Pool`對象調用`join()`方法會等待所有子進程執行完畢,調用`join()`之前必須先調用`close()`,調用`close()`之后就不能繼續添加新的`Process`了。
請注意輸出的結果,task `0`,`1`,`2`,`3`是立刻執行的,而task `4`要等待前面某個task完成后才執行,這是因為`Pool`的默認大小在我的電腦上是4,因此,最多同時執行4個進程。這是`Pool`有意設計的限制,并不是操作系統的限制。如果改成:
~~~
p = Pool(5)
~~~
就可以同時跑5個進程。
由于`Pool`的默認大小是CPU的核數,如果你不幸擁有8核CPU,你要提交至少9個子進程才能看到上面的等待效果。
### 子進程
很多時候,子進程并不是自身,而是一個外部進程。我們創建了子進程后,還需要控制子進程的輸入和輸出。
`subprocess`模塊可以讓我們非常方便地啟動一個子進程,然后控制其輸入和輸出。
下面的例子演示了如何在Python代碼中運行命令`nslookup www.python.org`,這和命令行直接運行的效果是一樣的:
~~~
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
~~~
運行結果:
~~~
$ nslookup www.python.org
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
www.python.org canonical name = python.map.fastly.net.
Name: python.map.fastly.net
Address: 199.27.79.223
Exit code: 0
~~~
如果子進程還需要輸入,則可以通過`communicate()`方法輸入:
~~~
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
~~~
上面的代碼相當于在命令行執行命令`nslookup`,然后手動輸入:
~~~
set q=mx
python.org
exit
~~~
運行結果如下:
~~~
$ nslookup
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
python.org mail exchanger = 50 mail.python.org.
Authoritative answers can be found from:
mail.python.org internet address = 82.94.164.166
mail.python.org has AAAA address 2001:888:2000:d::a6
Exit code: 0
~~~
### 進程間通信
`Process`之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python的`multiprocessing`模塊包裝了底層的機制,提供了`Queue`、`Pipes`等多種方式來交換數據。
我們以`Queue`為例,在父進程中創建兩個子進程,一個往`Queue`里寫數據,一個從`Queue`里讀數據:
~~~
from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父進程創建Queue,并傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw,寫入:
pw.start()
# 啟動子進程pr,讀取:
pr.start()
# 等待pw結束:
pw.join()
# pr進程里是死循環,無法等待其結束,只能強行終止:
pr.terminate()
~~~
運行結果如下:
~~~
Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
~~~
在Unix/Linux下,`multiprocessing`模塊封裝了`fork()`調用,使我們不需要關注`fork()`的細節。由于Windows沒有`fork`調用,因此,`multiprocessing`需要“模擬”出`fork`的效果,父進程所有Python對象都必須通過pickle序列化再傳到子進程去,所有,如果`multiprocessing`在Windows下調用失敗了,要先考慮是不是pickle失敗了。
### 小結
在Unix/Linux下,可以使用`fork()`調用實現多進程。
要實現跨平臺的多進程,可以使用`multiprocessing`模塊。
進程間通信是通過`Queue`、`Pipes`等實現的。
### 參考源碼
[do_folk.py](https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/do_folk.py)
[multi_processing.py](https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/multi_processing.py)
[pooled_processing.py](https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/pooled_processing.py)
[do_subprocess.py](https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/do_subprocess.py)
[do_queue.py](https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/do_queue.py)
- 關于
- Python簡介
- 安裝Python
- Python解釋器
- 第一個Python程序
- 使用文本編輯器
- Python代碼運行助手
- 輸入和輸出
- Python基礎
- 數據類型和變量
- 字符串和編碼
- 使用list和tuple
- 條件判斷
- 循環
- 使用dict和set
- 函數
- 調用函數
- 定義函數
- 函數的參數
- 遞歸函數
- 高級特性
- 切片
- 迭代
- 列表生成式
- 生成器
- 迭代器
- 函數式編程
- 高階函數
- 返回函數
- 匿名函數
- 裝飾器
- 偏函數
- 模塊
- 使用模塊
- 安裝第三方模塊
- 面向對象編程
- 類和實例
- 訪問限制
- 繼承和多態
- 獲取對象信息
- 實例屬性和類屬性
- 面向對象高級編程
- 使用slots
- 使用@property
- 多重繼承
- 定制類
- 使用枚舉類
- 使用元類
- 錯誤、調試和測試
- 錯誤處理
- 調試
- 單元測試
- 文檔測試
- IO編程
- 文件讀寫
- StringIO和BytesIO
- 操作文件和目錄
- 序列化
- 進程和線程
- 多進程
- 多線程
- ThreadLocal
- 進程 vs. 線程
- 分布式進程
- 正則表達式
- 常用內建模塊
- datetime
- collections
- base64
- struct
- hashlib
- itertools
- XML
- HTMLParser
- urllib
- 常用第三方模塊
- PIL
- virtualenv
- 圖形界面
- 網絡編程
- TCP/IP簡介
- TCP編程
- UDP編程
- 電子郵件
- SMTP發送郵件
- POP3收取郵件
- 訪問數據庫
- 使用SQLite
- 使用MySQL
- 使用SQLAlchemy
- Web開發
- HTTP協議簡介
- HTML簡介
- WSGI接口
- 使用Web框架
- 使用模板
- 異步IO
- 協程
- asyncio
- aiohttp
- 實戰
- Day 1 - 搭建開發環境
- Day 2 - 編寫Web App骨架
- Day 3 - 編寫ORM
- Day 4 - 編寫Model
- Day 5 - 編寫Web框架
- Day 6 - 編寫配置文件
- Day 7 - 編寫MVC
- Day 8 - 構建前端
- Day 9 - 編寫API
- Day 10 - 用戶注冊和登錄
- Day 11 - 編寫日志創建頁
- Day 12 - 編寫日志列表頁
- Day 13 - 提升開發效率
- Day 14 - 完成Web App
- Day 15 - 部署Web App
- Day 16 - 編寫移動App
- FAQ
- 期末總結