<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # IO IO指的就是輸入輸出。一般涉及到數據交換的地方都需要IO接口,比如說磁盤或者網絡。 IO 編程中,Stream(流)是一個很重要的概念,可以把流想象成一個水管,數據就是水管里的水,但是只能單向流動。 Input Stream 就是數據從外面(磁盤、網絡)流進內存,Output Stream 就是數據從內存流到外面去。 我們知道CPU和內存的速度遠遠高于外設的速度,也就是說在IO編程里面,存在速度嚴重不匹配的情況: 怎么辦呢? - 讓CPU等著,也就是程序暫停執行等待,這種模式稱為同步IO - 另一種方法就是CPU不等待,然后去干別的去了,于是后續代碼可以立刻執行,這種模式稱為異步IO 使用異步 IO 來編寫程序性能會遠遠高于同步 IO,但是異步 IO 的缺點是編程模型復雜。 比如你得知道什么時候通知你 “漢堡做好了”,而通知你的方法也各不相同。 如果是服務員跑過來找到你,這是回調模式 如果服務員發短信通知你,你就得不停地檢查手機,這是輪詢模式。 總之,異步 IO 的復雜度遠遠高于同步 IO。 ## 文件讀寫 ### 文件讀 現代操作系統其實不允許普通的程序直接操作磁盤,實際上磁盤上的讀寫文件的功能都是由操作系統提供,而讀寫文件就是請求操作系統打開一個文件對象(也就是文件描述符),然后通過這個接口進行文件的讀寫 ```python try: f = open('/path/to/file', 'r') print(f.read()) finally: if f: f.close() ``` 文件讀寫可能產生IOError,不管是否出錯,都會執行 `f.close()` 不過這么寫還是比較繁瑣,可以處用 `with` 語句 ```python with open('path/to/file', 'r') as f: print (f.read()) ``` 不過調用 `f.read()`有個巨大的缺點,它需要一次性將文件全部讀到內存,如果文件比較大的話,內存就爆了。 可以使用 `read(size)` 來限定每次讀取size個字節的內容 也可以使用 `readline()` 來讀取一行內容,并且按行返回 list 總結一下,如果文件很小,read() 一次性讀取最方便;如果不能確定文件大小,反復調用 read(size) 比較保險;如果是配置文件,調用 readlines() 最方便: ```python for line in f.readlines(): print(line.strip()) # 把末尾的'\n'刪掉 ``` 如果要讀取非UTF-8編碼的文本文件,可以將 `encoding` 參數傳入進去。 ```python >>> f = open('/Users/michael/gbk.txt', 'r', encoding='gbk') >>> f.read() '測試' ``` 如果有編碼不規范的,可以會遇到 `UnicodeDecodeError` 可以直接忽略錯誤 ```python >>> f = open('/Users/michael/gbk.txt', 'r', encoding='gbk', errors='ignore') ``` ### 文件寫 如果要寫文件 ```python with open('/Users/michael/test.txt', 'w') as f: f.write('Hello, world!') ``` 如果我們希望追加到文件末尾怎么辦?可以傳入'a' 以追加(append)模式寫入。 要寫入特定編碼的文本文件,請給 open() 函數傳入 encoding 參數,將字符串自動轉換成指定編碼。 ## 文件和目錄 如果要使用Python對文件進行刪除、新建等,可以使用Python內置的 `os`模塊 ```python import os # 詳細系統信息 os.uname() # 操作系統類型 os.name # 環境變量 os.environ # 獲取環境變量的值 os.environ.get('key') ``` ### 目錄操作 > 操作文件和目錄的函數一部分放在 os 模塊中,一部分放在 os.path 模塊中 查看路徑: ```python >>> os.path.abspath('.') ``` 新建一個目錄和刪除一個目錄 ```python >>> os.path.join('/Users/michael', 'testdir') '/Users/michael/testdir' # 然后創建一個目錄: >>> os.mkdir('/Users/michael/testdir') # 刪掉一個目錄: >>> os.rmdir('/Users/michael/testdir') ``` 拆分路徑 ```python >>> os.path.split('/Users/michael/testdir/file.txt') ``` os.path.splitext() 可以直接讓你得到文件擴展名 ```python >>> os.path.splitext('/path/to/file.txt') ('/path/to/file', '.txt') ``` 注意這些合并、拆分路徑的函數并**不**要求目錄和文件要真實存在,它們只對字符串進行操作 如果要列出當前目錄下的所有目錄 ```python >>> [x for x in os.listdir('.') if os.path.isdir(x)] ``` 要列出所有的.py 文件,也只需一行代碼: ```python >>> [x for x in os.listdir('.') if os.path.isfile(x) and os.path.splitext(x)[1]=='.py' ``` ### 文件操作 重命名 ```python >>> os.rename('test.txt', 'test.py') ``` 刪掉文件 ```python >>> os.remove('test.py') ``` 復制文件需要使用 `shutil` 模塊 ## StringIO 和 Bytes IO ### StringIO 除了file之外,內存的字節流、網絡流等都可以使用open()返回一個有read()方法的對象。 在Python中統程為 file-like Object StringIO 就是在內存中創建的 file-like Object,常用作臨時緩沖。 也就是在內存中讀寫 str。 要把 str 寫入 StringIO,我們需要先創建一個 StringIO,然后,像文件一樣寫入即可 ```python >>> from io import StringIO >>> f = StringIO() >>> f.write('hello') 5 >>> print(f.getvalue()) hello ``` getvalue() 方法用于獲得寫入后的 str 要讀取 StringIO ,可以用一個 str初始化 StringIO,然后像讀文件一樣讀 ```python >>> from io import StringIO >>> f = StringIO('Hello!\nHi!\nGoodbye!') >>> while True: ... s = f.readline() ... if s == '': ... break ... print(s.strip()) ... Hello! Hi! Goodbye! ``` ### BytesIO 如果要操作二進制數據,就需要使用 BytesIO。 BytesIO 實現了在內存中讀寫 bytes,我們創建一個 BytesIO,然后寫入一些 bytes: ```python >>> from io import BytesIO >>> f = BytesIO() >>> f.write('中文'.encode('utf-8')) 6 >>> print(f.getvalue()) b'\xe4\xb8\xad\xe6\x96\x87' ``` 請注意,寫入的不是 str,而是經過 UTF-8 編碼的 bytes。 ```python >>> from io import BytesIO >>> f = BytesIO(b'\xe4\xb8\xad\xe6\x96\x87') >>> f.read() b'\xe4\xb8\xad\xe6\x96\x87' ``` 和 StringIO 類似,可以用一個 bytes 初始化 BytesIO,然后,像讀文件一樣讀取: ## 序列化 在程序運行過程中,所有的變量都在內存中,一旦掉點,就內存就會被回收。 比如說定義 `d = dict(name='Bob', age=20, score=88)`,如果把name 修改為了 'Bill',但是沒有保存到磁盤上,下次重新運行,則變量又被初始化為 'Bob' 我們把變量從內存中變成可存儲或者傳輸的過程稱為序列化,Python中稱為 `pickling` 其他語言中也稱之為 serialization,marshalling,flattening 等等, 反過來,把變量內容從序列化的對象中重新讀到內存稱為反序列化。 ### pickle 如何把對象序列化并寫入文件呢? ```python >>> import pickle >>> d = dict(name='Bob', age=20, score=88) >>> pickle.dumps(d) ``` 這樣把任意對象序列化成一個 bytes,然后,就可以把這個 bytes 寫入文件 或者可以直接寫入一個 file-like Object中 ```python >>> f = open('dump.txt', 'wb') >>> pickle.dump(d, f) >>> f.close() ``` 如何反序列化呢? ```python >>> f = open('dump.txt', 'rb') >>> d = pickle.load(f) >>> f.close() >>> d {'age': 20, 'score': 88, 'name': 'Bob'} ``` 但是這樣以二進制的方法進行存儲,與其他的語言甚至其他版本的Python都不兼容。 所以我們可以考慮使用JSON這樣比較標準的格式。 ### JSON JSON與Python內置的數據類型對應如下: | JSON 類型 | Python 類型 | | :--------- | :----------- | | {} | dict | | [] | list | | "string" | str | | 1234.56 | int 或 float | | true/false | True/False | | null | None | 如何把一個Python對象序列化為JSON ```python >>> import json >>> d = dict(name='Bob', age=20, score=88) >>> json.dumps(d) ``` 如果要把JSON反序列化為Python對象 可以使用 `loads()` ```python >>> json_str = '{"age": 20, "score": 88, "name": "Bob"}' >>> json.loads(json_str) ``` 剛剛我們只是試驗了將dict對象直接序列化為JSON的 {} 對于普通的類,我們直接使用 `json.dumps(s)` 會得到一個 `TypeError` 因為 Student不是一個可以序列化的json對象 我們還需要寫一個轉換函數再把函數傳入到 `dumps()`里面。 ```python def student2dict(std): return { 'name': std.name, 'age': std.age, 'score': std.score } ``` 也就是說,實例首先把轉換成 `dict` ,然后序列化為JSON ```python >>> print(json.dumps(s, default=student2dict)) ``` 其實我們可以偷一個懶,把任意的class實例都變為 dict 因為通常 class 的實例都有一個__dict__屬性,它就是一個 dict,用來存儲實例變量 ```python print (json.dumps(s, default=lambda obj: obj.__dict__)) ``` 如果我們要把 JSON 反序列化為一個 Student 對象實例,loads() 方法首先轉換出一個 dict 對象,然后,我們傳入的 object_hook 函數負責把 dict 轉換為 Student 實例: ```python def dict2student(d): return Student(d['name'], d['age'], d['score']) ``` 所以重載代碼為: ```python >>> json_str = '{"age": 20, "score": 88, "name": "Bob"}' >>> print(json.loads(json_str, object_hook=dict2student)) <__main__.Student object at 0x10cd3c190> ``` # 異步IO 在一個線程中,CPU執行速度非常快,但是IO操作又特別的慢,如果遇到了IO操作,CPU就必須停下來等待。 也就是說在 IO 操作的過程中,當前線程被掛起,而其他需要 CPU 執行的代碼就無法被當前線程執行了。 這樣我們就必須多開幾個線程或者進程來解決這個問題了,每個用戶都會分配到一個線程,如果遇到IO導致線程被掛起,其他用戶的線程不受影響。 這樣雖然解決了并發問題,但是首先系統不能無上限的增加線程,而且系統切換線程的開銷也非常大,所以一旦線程數量過多,CPU的時間就花在了線程切換上了, 另外一種解決思路是異步IO。 當代碼需要執行一個耗時的 IO 操作時,它只發出 IO 指令,并不等待 IO 結果,然后就去執行其他代碼了。一段時間后,當 IO 返回結果時,再通知 CPU 進行處理。 異步 IO 模型需要一個消息循環,在消息循環中,主線程不斷地重復 “讀取消息 - 處理消息” 這一過程: ```python loop = get_event_loop() while True: event = loop.get_event() process_event(event) ``` 消息模型其實早在應用在桌面應用程序中了。一個 GUI 程序的主線程就負責不停地讀取消息并處理消息。所有的鍵盤、鼠標等消息都被發送到 GUI 程序的消息隊列中,然后由 GUI 程序的主線程處理。由于 GUI 線程處理鍵盤、鼠標等消息的速度非常快,所以用戶感覺不到延遲。 但是在某些時候,GUI 線程在一個消息處理的過程中遇到了問題,導致一次消息處理時間過長,此時,用戶會感覺到整個 GUI 程序停止響應了,敲鍵盤、點鼠標都沒有反應。所以在消息模型中,處理一個消息必須非常迅速,否則,主線程將無法及時處理消息隊列中的其他消息,導致程序看上去停止響應。 那么消息模型是如何解決同步 IO 必須等待 IO 操作這一問題的呢? 當遇到 IO 操作時,代碼只負責發出 IO 請求,不等待 IO 結果,然后直接結束本輪消息處理,進入下一輪消息處理過程。 當 IO 操作完成后,將收到一條 “IO 完成” 的消息,處理該消息時就可以直接獲取 IO 操作結果。 在 “發出 IO 請求” 到收到 “IO 完成” 的這段時間里,同步 IO 模型下,主線程只能掛起,但異步 IO 模型下,主線程并沒有休息,而是在消息循環中繼續處理其他消息。 這樣,在異步 IO 模型下,一個線程就可以同時處理多個 IO 請求,并且沒有切換線程的操作。對于大多數 IO 密集型的應用程序,使用異步 IO 將大大提升系統的多任務處理能力。 ## 協程 在學習異步 IO 模型前,我們先來了解協程。 協程,又稱微線程,纖程。英文名 Coroutine 我們知道函數(也叫子程序)是通過棧來實現的,一個線程就執行一個子程序,所有語言都是層級調用的。比如 A 調用 B,B 在執行過程中又調用了 C,C 執行完畢返回,B 執行完畢返回,最后是 A 執行完畢 子程序調用總是一個入口,一次返回,調用順序是明確的 而協程在執行過程中,可中斷,然后執行別的子程序,適當的時候再返回 注意,在一個子程序中中斷,去執行其他子程序,不是函數調用,有點類似 CPU 的中斷 ```python def A(): print('1') print('2') print('3') def B(): print('x') print('y') print('z') ``` 假設由協程執行,在執行 A 的過程中,可以隨時中斷,去執行 B,B 也可能在執行過程中中斷再去執行 A 看起來 A、B 的執行有點像多線程,但協程的特點在于是一個線程執行 與多線程相比,子程序切換不是線程切換,而是由程序自身控制,所以沒有線程切換的開銷,和多線程相比,線程數量越多,協程性能越高。 而且協程還不需要多線程的 **鎖機制**。因為只有一個線程,不存在同時寫沖突。在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。 因為協程是一個線程執行,那怎么利用多核 CPU 呢? 最簡單的方法是多進程 + 協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能 ## 使用 yield實現協程 Python 對協程的支持是通過 generator 實現的。 在 generator 中,我們不但可以通過 for 循環來迭代,還可以不斷調用 next() 函數獲取由 yield 語句返回的下一個值。 但是 Python 的 yield 不但可以返回一個值,它還可以接收調用者發出的參數 比如說: 傳統的生產者 - 消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。 如果改用協程,生產者生產消息后,直接通過 yield 跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產 ```python def produce(c): # 啟動生成器 c.send(None) n = 0 while n < 5: # 生產的東西 n = n + 1 print('[PRODUCER] Producing %s...' % n) # 切換到consumer執行 r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) # 關閉consumer c.close() def consumer(): r = '' while True: # 通過 yield 拿到消息,處理完畢又返回結果。 n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) # 返回結果 r = '200 OK' c = consumer() produce(c) ``` 執行結果: ```python [PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK ``` 需要先理解如下幾點: - 例子中的 c.send(None),其功能類似于 next(c) - - n = yield r,這里是一條語句,賦值語句先計算 = 右邊,由于右邊是 yield 語句,所以 yield 語句執行完以后,進入暫停,而賦值語句在下一次啟動生成器的時候首先被執行 - send 在接受 None 參數的情況下,等同于 next(generator) 的功能,但 send 同時也可接收其他參數,比如例子中的 c.send(n),要理解這種用法 比如說定義 ```python >>> def num(): a = yield 1 while True: a = yield a >>> c = num() >>> c.send(None) 1 >>> c.send(5) 5 >>> c.send(100) 100 ``` 首先使用 c.send(None),返回生成器的第一個值,a = yield 1 ,也就是 1(但此時,并未執行賦值語句) 接著我們使用了 c.send(5),再次啟動生成器,并同時傳入了一個參數 5,再次啟動生成的時候,從上次 yield 語句斷掉的地方開始執行,即 a 的賦值語句,由于我們傳入了一個參數 5,所以 a 被賦值為 5 接著程序進入 whlie 循環,當程序執行到 a = yield a,同理,先返回生成器的值 5,下次啟動生成器的時候,再執行賦值語句,以此類推... > 但注意,在一個生成器函數未啟動之前,是不能傳遞值進去。也就是說在使用 c.send(n) 之前,必須先使用 c.send(None) 或者 next(c) 來返回生成器的第一個值。 下面我們來看之前的例子 consumer是一個 generator , 把一個 consumer 傳入 producer 里面之后, - 首先調用 `c.send(None)` 啟動生成器,n = yield r,此時 r 為空,n 還未賦值,然后生成器暫停,等待下一次啟動 - 生成器返回空值后進入暫停,produce(c) 接著往下運行,進入 While 循環,此時 n 為 1,所以打印:[PRODUCER] Producing 1... - 運行到 r = c.send(1),再次啟動生成器,并傳入了參數 1,而生成器從上次 n 的賦值語句開始執行,n 被賦值為 1,n 存在,if 語句不執行,然后打印: ```python [CONSUMER] Consuming 1... ``` 接著 r 被賦值為'200 OK',然后又進入循環,執行 n = yield r,返回生成器的第二個值,'200 OK',然后生成器進入暫停,等待下一次啟動。 也就是一旦生產了東西,通過 `c.send(n)`切換到 consumer執行 - consumer通過 yield 拿到消息,處理之后,又通過 yield 把結果傳回 - producer 拿到 consumer 處理的結果,繼續生產下一條消息,生成器返回'200 OK' 進入暫停后,produce(c) 往下運行,進入 r 的賦值語句,r 被賦值為'200 OK',接著往下運行,打印: ```python [PRODUCER] Consumer return: 200 OK ``` - producer決定不生產了,通過 `c.close()` 關閉 consumer 整個過程無鎖,由一個線程執行,produce 和 consumer 協作完成任務,所以稱為 “協程”,而非線程的搶占式多任務。 ## asyncio 在 Python3.4 中,協程都是通過使用 yield from 和 asyncio模塊中的 @asyncio.coroutine 來實現的。asyncio 專門被用來實現異步 IO 操作 ### yield from yield 在生成器中有中斷的功能,可以傳出值,也可以從函數外部接收值,而 yield from 的實現就是簡化了 yield 操作。 ```python def generator_1(titles): yield titles def generator_2(titles): yield from titles titles = ['Python','Java','C++'] for title in generator_1(titles): print('生成器1:',title) for title in generator_2(titles): print('生成器2:',title) ``` 執行結果: ```python 生成器1: ['Python', 'Java', 'C++'] 生成器2: Python 生成器2: Java 生成器2: C++ ``` yield titles 返回了 titles 完整列表,而 yield from titles 實際等價于: ```python for title in titles: # 等價于yield from titles yield title  ``` 而且yield from 還省去了很多異常處理。 ```python def generator_1(): total = 0 while True: x = yield print('加',x) if not x: break total += x return total def generator_2(): # 委托生成器 while True: total = yield from generator_1() # 子生成器 print('加和總數是:',total) def main(): # 調用方 g1 = generator_1() g1.send(None) g1.send(2) g1.send(3) g1.send(None) # g2 = generator_2() # g2.send(None) # g2.send(2) # g2.send(3) # g2.send(None) main() ``` - 【子生成器】:yield from 后的 generator_1 () 生成器函數是子生成 器 - 【委托生成器】:generator_2 () 是程序中的委托生成器,它負責委托子 生成器完成具體任務。 - 【調用方】:main () 是程序中的調用方,負責調用委托生成器。 **yield from 在其中還有一個關鍵的作用是:建立調用方和子生成器的通道,** - 在上述代碼中 main() 每一次在調用 send(value) 時,value 不是傳遞給了委托生成器 generator_2 (),而是借助 yield from 傳遞給了子生成器 generator_1 () 中的 yield - 同理,子生成器中的數據也是通過 yield 直接發送到調用方 main () 中。 - 之后我們的代碼都依據調用方-子生成器-委托生成器的規范形式書寫。 ## 結合 @asyncio.coroutine 實現協程 在協程中,只要是和 IO 任務類似的、耗費時間的任務都需要使用 yield from 來進行中斷,達到異步功能! ```python # 使用同步方式編寫異步功能 import time import asyncio @asyncio.coroutine # 標志協程的裝飾器 def taskIO_1(): print('開始運行IO任務1...') yield from asyncio.sleep(2) # 假設該任務耗時2s print('IO任務1已完成,耗時2s') return taskIO_1.__name__ @asyncio.coroutine # 標志協程的裝飾器 def taskIO_2(): print('開始運行IO任務2...') yield from asyncio.sleep(3) # 假設該任務耗時3s print('IO任務2已完成,耗時3s') return taskIO_2.__name__ @asyncio.coroutine # 標志協程的裝飾器 def main(): # 調用方 tasks = [taskIO_1(), taskIO_2()] # 把所有任務添加到task中 done,pending = yield from asyncio.wait(tasks) # 子生成器 for r in done: # done和pending都是一個任務,所以返回結果需要逐個調用result() print('協程無序返回值:'+r.result()) if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # 創建一個事件循環對象loop try: loop.run_until_complete(main()) # 完成事件循環,直到最后一個任務結束 finally: loop.close() # 結束事件循環 print('所有IO任務總耗時%.5f秒' % float(time.time()-start)) ``` 執行結果如下 ```python 開始運行IO任務1... 開始運行IO任務2... IO任務1已完成,耗時2s IO任務2已完成,耗時3s 協程無序返回值:taskIO_2 協程無序返回值:taskIO_1 所有IO任務總耗時3.00209秒 ``` asyncio 的編程模型就是一個消息循環。我們從 asyncio 模塊中直接獲取一個 EventLoop 的引用,然后把需要執行的協程扔到 EventLoop 中執行,就實現了異步 IO。 【使用方法】: @asyncio.coroutine裝飾器是協程函數的標志,我們需要在每一個任務函數前加這個裝飾器,并在函數中使用 yield from。 在同步 IO 任務的代碼中使用的 time.sleep(2) 來假設任務執行了 2 秒。但在協程中 yield from 后面必須是子生成器函數,而 time.sleep() 并不是生成器,所以這里需要使用內置模塊提供的生成器函數 asyncio.sleep()。 【功能】:通過使用協程,極大增加了多任務執行效率,最后消耗的時間是任務隊列中耗時最多的時間。上述例子中的總耗時 3 秒就是 taskIO_2() 的耗時時間。 【執行過程】: 上面代碼先通過 get_event_loop()獲取了一個標準事件循環 loop (因為是一個,所以協程是單線程) 然后,我們通過 run_until_complete(main()) 來運行協程 此處把調用方協程 main () 作為參數,調用方負責調用其他委托生成器,run_until_complete 的特點就像該函數的名字,直到循環事件的所有事件都處理完才能完整結束。 進入調用方協程,我們把多個任務 [taskIO_1() 和 taskIO_2()] 放到一個 task 列表中,可理解為打包任務。 現在,我們使用 asyncio.wait(tasks) 來獲取一個 awaitable objects 即可等待對象的集合 (此處的 aws 是協程的列表),并發運行傳入的 aws,同時通過 yield from 返回一個包含 (done, pending) 的元組,done 表示已完成的任務列表,pending 表示未完成的任務列表; 如果使用 asyncio.as_completed(tasks) 則會按完成順序生成協程的迭代器 (常用于 for 循環中),因此當你用它迭代時,會盡快得到每個可用的結果。【此外,當輪詢到某個事件時 (如 taskIO_1 ()),直到遇到該任務中的 yield from 中斷,開始處理下一個事件 (如 taskIO_2 ())),當 yield from 后面的子生成器完成任務時,該事件才再次被喚醒】 因為 done 里面有我們需要的返回結果,但它目前還是個任務列表,所以要取出返回的結果值,我們遍歷它并逐個調用 result() 取出結果即可。 > 注:對于 asyncio.wait() 和 asyncio.as_completed() 返回的結果均是先完成的任務結果排在前面,所以此時打印出的結果不一定和原始順序相同,但使用 gather() 的話可以得到原始順序的結果集, 最后我們通過 loop.close() 關閉事件循環。 綜上所述:協程的完整實現是靠①事件循環+②協程。 ```python import asyncio @asyncio.coroutine def hello(): print("Hello world!") # 異步調用asyncio.sleep(1): r = yield from asyncio.sleep(1) print("Hello again!") # 獲取EventLoop: loop = asyncio.get_event_loop() # 執行coroutine loop.run_until_complete(hello()) loop.close() ``` @asyncio.coroutine 把一個 generator 標記為 coroutine 類型,然后,我們就把這個 coroutine 扔到 EventLoop 中執行。 hello() 會首先打印出 Hello world!,然后,yield from 語法可以讓我們方便地調用另一個 generator。 由于 asyncio.sleep() 也是一個 coroutine,所以線程不會等待 asyncio.sleep(),而是直接中斷并執行下一個消息循環。當 asyncio.sleep() 返回時,線程就可以從 yield from 拿到返回值(此處是 None),然后接著執行下一行語句。 把 asyncio.sleep(1) 看成是一個耗時 1 秒的 IO 操作,在此期間,主線程并未等待,而是去執行 EventLoop 中其他可以執行的 coroutine 了,因此可以實現并發執行。 我們用 Task 封裝兩個 coroutine ```python import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) yield from asyncio.sleep(1) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ``` 執行過程 ```python Hello world! (<_MainThread(MainThread, started 140735195337472)>) Hello world! (<_MainThread(MainThread, started 140735195337472)>) (暫停約1秒) Hello again! (<_MainThread(MainThread, started 140735195337472)>) Hello again! (<_MainThread(MainThread, started 140735195337472)>) ``` 由打印的當前線程名稱可以看出,兩個 coroutine 是由同一個線程并發執行的。 如果把 asyncio.sleep() 換成真正的 IO 操作,則多個 coroutine 就可以由一個線程并發執行。 比如獲取 sina、 sohu和163 ```python import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ``` 執行結果如下: ```python wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段時間) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0 ... ``` ## async + await實現協程 在 Python 3.4 中,我們發現很容易將協程和生成器混淆 在 Python 3.5 開始引入了新的語法 async 和 await,以簡化并更好地標識異步 IO 要使用新的語法,只需要做兩步簡單的替換: - 把 @asyncio.coroutine 替換為 async; - 把 yield from 替換為 await。 ```python import time import asyncio async def taskIO_1(): print('開始運行IO任務1...') await asyncio.sleep(2) # 假設該任務耗時2s print('IO任務1已完成,耗時2s') return taskIO_1.__name__ async def taskIO_2(): print('開始運行IO任務2...') await asyncio.sleep(3) # 假設該任務耗時3s print('IO任務2已完成,耗時3s') return taskIO_2.__name__ async def main(): # 調用方 tasks = [taskIO_1(), taskIO_2()] # 把所有任務添加到task中 done,pending = await asyncio.wait(tasks) # 子生成器 for r in done: # done和pending都是一個任務,所以返回結果需要逐個調用result() print('協程無序返回值:'+r.result()) if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # 創建一個事件循環對象loop try: loop.run_until_complete(main()) # 完成事件循環,直到最后一個任務結束 finally: loop.close() # 結束事件循環 print('所有IO任務總耗時%.5f秒' % float(time.time()-start)) ``` ## aiohttp asyncio 可以實現單線程并發 IO 操作。如果僅用在客戶端,發揮的威力不大。 如果把 asyncio 用在服務器端,例如 Web 服務器,由于 HTTP 連接就是 IO 操作,因此可以用單線程 +coroutine 實現多用戶的高并發支持。 asyncio 實現了 TCP、UDP、SSL 等協議,aiohttp 則是基于 asyncio 實現的 HTTP 框架。 我們先安裝 aiohttp: pip install aiohttp 然后編寫一個 HTTP 服務器,分別處理以下 URL: / - 首頁返回 b'<h1>Index</h1>'; /hello/{name} - 根據 URL 參數返回文本 hello, %s!。 代碼如下: ```python # coding: utf-8 import asyncio from aiohttp import web async def index(request): return web.Response(body='<h1>Index</h1>'.encode(), content_type='text/html') async def init(loop): app = web.Application() app.router.add_route('GET','/', index) srv = await loop.create_server(app._make_handler(),'127.0.0.1', 9000) logging.info('server started at http://127.0.0.1:9000...') return srv init() ``` ## 總結 【引出問題】: 同步編程的并發性不高 多進程編程效率受 CPU 核數限制,當任務數量遠大于 CPU 核數時,執行效率會降低。 多線程編程需要線程之間的通信,而且需要鎖機制來防止共享變量被不同線程亂改,而且由于 Python 中的 GIL (全局解釋器鎖),所以實際上也無法做到真正的并行。 【產生需求】: 可不可以采用同步的方式來編寫異步功能代碼? 能不能只用一個單線程就能做到不同任務間的切換?這樣就沒有了線程切換的時間消耗,也不用使用鎖機制來削弱多任務并發效率! 對于 IO 密集型任務,可否有更高的處理方式來節省 CPU 等待時間? 此外,多進程和多線程是內核級別的程序,而協程是函數級別的程序,是可以通過程序員進行調用的。以下是協程特性的總結: |協程|屬性| |:--|:--| |所需線程|單線程| |編程方式|同步| |實現效果|異步| |是否需要鎖機制|否| |程序級別|函數級| |實現機制|事件循環+協程| |總耗時|最耗時事件的時間| |應用場景|IO 密集型任務等| 另外tqdm 是一個用來生成進度條的優秀的庫。這個協程就像 asyncio.wait 一樣工作,不過會顯示一個代表完成度的進度條 ```python async def wait_with_progress(coros): for f in tqdm.tqdm(asyncio.as_completed(coros), total=len(coros)): await f ``` # 進程與線程 現代操作系統都是支持多任務的。 什么是多任務?比如說操作系統可以一邊開著瀏覽器進行上網,也可以一般打開Word 即使過去的單核 CPU,也可以執行多任務。由于 CPU 執行代碼都是順序執行的,那么,單核 CPU 是怎么執行多任務的呢 答案就是操作系統輪流讓各個任務交替執行,由于 CPU 的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。 真正的并行執行多任務只能在多核 CPU 上實現,但是,由于任務數量遠遠多于 CPU 的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行 對操作系統而言,一個任務就是一個進程。 有些進程還不止同時干一件事,比如 Word,它可以同時進行打字、拼寫檢查、打印等事情。在一個進程內部,要同時干多件事,就需要同時運行多個 “子任務”,我們把進程內的這些 “子任務” 稱為線程(Thread)。 由于每個進程至少要干一件事,所以,一個進程至少有一個線程, 如果我們要同時執行多個任務怎么辦? - 一種是啟動多個進程,每個進程雖然只有一個線程,但多個進程可以一塊執行多個任務。 - 還有一種方法是啟動一個進程,在一個進程內啟動多個線程,這樣,多個線程也可以一塊執行多個任務。 - 當然還有第三種方法,就是啟動多個進程,每個進程再啟動多個線程,這樣同時執行的任務就更多了,當然這種模型更復雜,實際很少采用。 總結一下,多任務有3種方式 - 多進程模式; - 多線程模式; - 多進程 + 多線程模式。 Python 既支持多進程,又支持多線程,我們會討論如何編寫這兩種多任務程序。 線程是最小的執行單元,而進程由至少一個線程組成。如何調度進程和線程,完全由操作系統決定,程序自己不能決定什么時候執行,執行多長時間。 多進程和多線程的程序涉及到同步、數據共享的問題,編寫起來更復雜。 ## 多進程與多線程對比 要實現多任務,通常我們會設計 Master-Worker 模式,Master 負責分配任務,Worker 負責執行任務,因此,多任務環境下,通常是一個 Master,多個 Worker。 - 如果用多進程實現 Master-Worker,主進程就是 Master,其他進程就是 Worker。 - 如果用多線程實現 Master-Worker,主線程就是 Master,其他線程就是 Worker。 多進程最大的優點在于穩定性高,一個子進程崩潰了,不會影響其他的主進程或者子進程。 當然主進程掛了所有進程就全掛了,但是 Master 進程只負責分配任務,掛掉的概率低。著名的 Apache 最早就是采用多進程模式。 但是缺點在于創建進程的代價大,在Linux上可以使用fork調用,在 Windows 下創建進程開銷巨大。另外,操作系統能同時運行的進程數也是有限的,在內存和 CPU 的限制下,如果有幾千個進程同時運行,操作系統連調度都會成問題。 多線程模式通常比多進程快一點,但是也快不到哪去,而且,多線程模式致命的缺點就是任何一個線程掛掉都可能直接造成整個進程崩潰,因為所有線程共享進程的內存。在 Windows 上,如果一個線程執行的代碼出了問題,你經常可以看到這樣的提示:“該程序執行了非法操作,即將關閉”,其實往往是某個線程出了問題,但是操作系統會強制結束整個進程。 在 Windows 下,多線程的效率比多進程要高,所以微軟的 IIS 服務器默認采用多線程模式。由于多線程存在穩定性的問題,IIS 的穩定性就不如 Apache。為了緩解這個問題,IIS 和 Apache 現在又有多進程 + 多線程的混合模式,真是把問題越搞越復雜。 而且不管是多線程還是多進程,數量一多,效率肯定上不去。 魚尾紋切換作業是有代價的,比如從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫保存現場),然后,打開數學課本、找出圓規直尺(這叫準備新環境),才能開始做數學作業。操作系統在切換進程或者線程時也是一樣的,它需要先保存當前執行的現場環境(CPU 寄存器狀態、內存頁等),然后,把新任務的執行環境準備好(恢復上次的寄存器狀態,切換內存頁等),才能開始執行。這個切換過程雖然很快,但是也需要耗費時間。如果有幾千個任務同時進行,操作系統可能就主要忙著切換任務,根本沒有多少時間去執行任務了,這種情況最常見的就是硬盤狂響,點窗口無反應,系統處于假死狀態。 所以,多任務一旦多到一個限度,就會消耗掉系統所有的資源,結果效率急劇下降,所有任務都做不好。 考慮到 CPU 和 IO 之間巨大的速度差異,一個任務在執行的過程中大部分時間都在等待 IO 操作,單進程單線程模型會導致別的任務無法并行執行,因此,我們才需要多進程模型或者多線程模型來支持多任務并發執行。 現代操作系統對 IO 操作已經做了巨大的改進,最大的特點就是支持異步 IO。如果充分利用操作系統提供的異步 IO 支持,就可以用單進程單線程模型來執行多任務,這種全新的模型稱為事件驅動模型,Nginx 就是支持異步 IO 的 Web 服務器,它在單核 CPU 上采用單進程模型就可以高效地支持多任務。 在多核 CPU 上,可以運行多個進程(數量與 CPU 核心數相同),充分利用多核 CPU。由于系統總的進程數量十分有限,因此操作系統調度非常高效。用異步 IO 編程模型來實現多任務是一個主要的趨勢。 對應到 Python 語言,單線程的異步編程模型稱為協程 ## 多進程 Linux操作系統使用 `fork()`這個系統調用來創建子進程。 `fork()`非常的特殊,普通的函數調用,調用一次,返回一次,但是 fork() 調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內返回。 子進程永遠返回 0,而父進程返回子進程的 ID。 這樣做的理由是,一個父進程可以 fork 出很多子進程,所以,父進程要記下每個子進程的 ID,而子進程只需要調用 getppid() 就可以拿到父進程的 ID。 如何使用Python來創建子進程呢? ```python import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid)) ``` 運行結果如下: ```python Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876. ``` 由于 Windows 沒有 fork 調用,上面的代碼在 Windows 上無法運行。 有了 fork 調用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的 Apache 服務器就是由父進程監聽端口,每當有新的 http 請求時,就 fork 出子進程來處理新的 http 請求。 難道在 Windows 上無法用 Python 編寫多進程的程序? 由于 Python 是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing 模塊就是跨平臺版本的多進程模塊 multiprocessing 模塊提供了一個 Process 類來代表一個進程對象 ```python from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.') ``` 創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動,這樣創建進程比fork()還要簡單。 join()方法可以等待子進程結束后再繼續運行。 如果要啟動大量的子進程,可以用進程池的方式批量創建子進程: ```python from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') ``` task 0,1,2,3 是立刻執行的,而 task 4 要等待前面某個 task 完成后才執行,這是因為 Pool 的默認大小為CPU的核數,也就是 4,因此,最多同時執行 4 個進程。這是 Pool 有意設計的限制,并不是操作系統的限制 如果改成: ```python p = Pool(5) ``` 就可以同時跑 5 個進程。 執行結果如下: ```python Parent process 669. Waiting for all subprocesses done... Run task 0 (671)... Run task 1 (672)... Run task 2 (673)... Run task 3 (674)... Task 2 runs 0.14 seconds. Run task 4 (673)... Task 1 runs 0.27 seconds. Task 3 runs 0.86 seconds. Task 0 runs 1.41 seconds. Task 4 runs 1.91 seconds. All subprocesses done. ``` ### 子進程 很多時候,子進程并不是自身,而是一個外部進程。我們創建了子進程后,還需要控制子進程的輸入和輸出。 subprocess 模塊可以讓我們非常方便地啟動一個子進程,然后控制其輸入和輸出。 ```python import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) print('Exit code:', r) ``` 這個演示了如何在 Python 代碼中運行命令 nslookup www.python.org,這和命令行直接運行的效果是一樣。 如果子進程還需要輸入,則可以通過 communicate() 方法輸入: ```python import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('utf-8')) print('Exit code:', p.returncode) ``` 相當于在命令行執行命令 nslookup,然后手動輸入: ```python set q=mx python.org exit ``` ### 進程間通信 Process 之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python 的 multiprocessing 模塊包裝了底層的機制,提供了 Queue、Pipes 等多種方式來交換數據。 我們以 Queue 為例,在父進程中創建兩個子進程,一個往 Queue 里寫數據,一個從 Queue 里讀數據: ```python from multiprocessing import Process, Queue import os, time, random # 寫數據進程執行的代碼: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 讀數據進程執行的代碼: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__=='__main__': # 父進程創建Queue,并傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子進程pw,寫入: pw.start() # 啟動子進程pr,讀取: pr.start() # 等待pw結束: pw.join() # pr進程里是死循環,無法等待其結束,只能強行終止: pr.terminate() ``` 運行結果如下: ```python Process to write: 50563 Put A to queue... Process to read: 50564 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue. ``` 在 Unix/Linux 下,multiprocessing 模塊封裝了 fork() 調用,使我們不需要關注 fork() 的細節。由于 Windows 沒有 fork 調用,因此,multiprocessing 需要 “模擬” 出 fork 的效果,父進程所有 Python 對象都必須通過 pickle 序列化再傳到子進程去,所以,如果 multiprocessing 在 Windows 下調用失敗了,要先考慮是不是 pickle 失敗了。 ## 多線程 多任務可以由多進程完成,也可以由一個進程內的多線程完成。 線程是操作系統直接支持的執行單元,因此,高級語言通常都內置多線程的支持,Python 也不例外,并且,Python 的線程是真正的 Posix Thread,而不是模擬出來的線程。 Python 的標準庫提供了兩個模塊:_thread 和 threading,_thread 是低級模塊,threading 是高級模塊,對_thread 進行了封裝。絕大多數情況下,我們只需要使用 threading 這個高級模塊。 啟動一個線程就是把一個函數傳入并創建 Thread 實例,然后調用 start() 開始執行: ```python import time, threading # 新線程執行的代碼: def loop(): print('thread %s is running...' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('thread %s ended.' % threading.current_thread().name) print('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread') t.start() t.join() print('thread %s ended.' % threading.current_thread().name) ``` 結果為: ```python thread MainThread is running... thread LoopThread is running... thread LoopThread >>> 1 thread LoopThread >>> 2 thread LoopThread >>> 3 thread LoopThread >>> 4 thread LoopThread >>> 5 thread LoopThread ended. thread MainThread ended. ``` 由于任何進程默認就會啟動一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程 Python 的 threading 模塊有個 current_thread() 函數,它永遠返回當前線程的實例。 主線程實例的名字叫 MainThread,子線程的名字在創建時指定,我們用 LoopThread 命名子線程。名字僅僅在打印時用來顯示,完全沒有其他意義,如果不起名字 Python 就自動給線程命名為 Thread-1,Thread-2…… ### Lock 多線程和多進程最大的區別在于,多進程,同樣一個變量各自有一份拷貝在每個進程中。 而多線程,所有變量都由所有線程共享。所以,任何一個變量都可以被任何一個線程修改,因此,線程之間共享數據最大的危險在于多個線程同時改一個變量,把內容給改亂了。 比如 ```python import time, threading # 假定這是你的銀行存款: balance = 0 def change_it(n): # 先存后取,結果應該為0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(100000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance) ``` 我們定義一個共享變量balance,初始值為0. 然后啟動兩個線程,先存后取,理論上應該一直為0. 但是由于線程的調度由操作系統決定,當 t1、t2 交替執行時,只要循環次數足夠多,balance 的結果就不一定是 0 了。 原因是即使是簡單的一條語句在 CPU 執行時是若干條語句 ```python balance = balance + n ``` 也分兩步: - 計算 balance + n,存入臨時變量中; - 將臨時變量的值賦給 balance。 也就是可以看成: ```python x = balance + n balance = x ``` 而x就是局部變量,每個線程各有自己的x 如果操作系統以下面的順序執行 t1、t2: ```python 初始值 balance = 0 t1: x1 = balance + 5 # x1 = 0 + 5 = 5 t2: x2 = balance + 8 # x2 = 0 + 8 = 8 t2: balance = x2 # balance = 8 t1: balance = x1 # balance = 5 t1: x1 = balance - 5 # x1 = 5 - 5 = 0 t1: balance = x1 # balance = 0 t2: x2 = balance - 8 # x2 = 0 - 8 = -8 t2: balance = x2 # balance = -8 結果 balance = -8 ``` 這是因為修改balance需要多條語句,執行這幾條語句,線程可能中斷,從而導致多個線程把同一個對象改亂了。 如果要確保 balance 計算正確,需要給 change_it() 上一把鎖。 當某個線程開始執行 change_it() 時,我們說,該線程因為獲得了鎖,因此其他線程不能同時執行 change_it(),只能等待,直到鎖被釋放后,獲得該鎖以后才能改。由于鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以,不會造成修改的沖突。創建一個鎖就是通過 threading.Lock() 來實現: ```python balance = 0 lock = threading.Lock() def run_thread(n): for i in range(100000): # 先要獲取鎖: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了一定要釋放鎖: lock.release() ``` 當多個線程同時執行 lock.acquire() 時,只有一個線程能成功地獲取鎖,然后繼續執行代碼,其他線程就繼續等待直到獲得鎖為止。 獲得鎖的線程用完后一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠等待下去,成為死線程。所以我們用 try...finally 來確保鎖一定會被釋放。 鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行,壞處當然也很多,首先是阻止了多線程并發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了。 其次,由于可以存在多個鎖,不同的線程持有不同的鎖,并試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。 ### 多核CPU 要想把 N 核 CPU 的核心全部跑滿,就必須啟動 N 個死循環線程。 現在啟動一個與CPU核心數量相同的N個線程 ```python import threading, multiprocessing def loop(): x = 0 while True: x = x ^ 1 for i in range(multiprocessing.cpu_count()): t = threading.Thread(target=loop) t.start() ``` 但是4 核 CPU 上可以監控到 CPU 占用率僅有 102%,也就是僅使用了一核。 但是用 C、C++ 或 Java 來改寫相同的死循環,直接可以把全部核心跑滿,4 核就跑到 400%,8 核就跑到 800%,為什么 Python 不行呢? 因為 Python 的線程雖然是真正的線程,但解釋器執行代碼時,有一個 GIL 鎖:Global Interpreter Lock,任何 Python 線程執行前,必須先獲得 GIL 鎖,然后,每執行 100 條字節碼,解釋器就自動釋放 GIL 鎖,讓別的線程有機會執行。這個 GIL 全局鎖實際上把所有線程的執行代碼都給上了鎖,所以,多線程在 Python 中只能交替執行,即使 100 個線程跑在 100 核 CPU 上,也只能用到 1 個核。 GIL 是 Python 解釋器設計的歷史遺留問題,通常我們用的解釋器是官方實現的 CPython,要真正利用多核,除非重寫一個不帶 GIL 的解釋器。 所以,在 Python 中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過多線程利用多核,那只能通過 C 擴展來實現,不過這樣就失去了 Python 簡單易用的特點。 不過,也不用過于擔心,Python 雖然不能利用多線程實現多核任務,但可以通過多進程實現多核任務。多個 Python 進程有各自獨立的 GIL 鎖,互不影響。 ## ThreadLocal 在多線程環境下,每個線程都有自己的數據。一個線程使用自己的局部變量比使用全局變量好,因為局部變量只有線程自己能看見,不會影響其他線程,而全局變量的修改必須加鎖。 但是局部變量也有問題,就是在函數調用的時候,傳遞起來很麻煩: ```python def process_student(name): std = Student(name) # std是局部變量,但是每個函數都要用它,因此必須傳進去: do_task_1(std) do_task_2(std) def do_task_1(std): do_subtask_1(std) do_subtask_2(std) def do_task_2(std): do_subtask_2(std) do_subtask_2(std) ``` 每個函數如果要使用局部變量,則需要一層一層的傳進去。 我們也可以用一個全局 dict 存放所有的 Student 對象,然后以 thread 自身作為 key 獲得線程對應的 Student 對象 ```python global_dict = {} def std_thread(name): std = Student(name) # 把std放到全局變量global_dict中: global_dict[threading.current_thread()] = std do_task_1() do_task_2() def do_task_1(): # 不傳入std,而是根據當前線程查找: std = global_dict[threading.current_thread()] ... def do_task_2(): # 任何函數都可以查找出當前線程的std變量: std = global_dict[threading.current_thread()] ... ``` 這樣寫理論上是可行的,但是代碼可讀性比較差。 所以ThreadLocal 應運而生,ThreadLocal 幫你查找 dict ```python import threading # 創建全局ThreadLocal對象: local_school = threading.local() def process_student(): # 獲取當前線程關聯的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): # 綁定ThreadLocal的student: local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join() ``` 執行結果: ```python Hello, Alice (in Thread-A) Hello, Bob (in Thread-B) ``` 全局變量 local_school 就是一個 ThreadLocal 對象,每個 Thread 對它都可以讀寫 student 屬性,但互不影響。 可以把 local_school 看成全局變量,但每個屬性如 local_school.student 都是線程的局部變量,可以任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal 內部會處理。 可以理解為全局變量 local_school 是一個 dict,不但可以用 local_school.student,還可以綁定其他變量,如 local_school.teacher 等等。 ThreadLocal 最常用的地方就是為每個線程綁定一個數據庫連接,HTTP 請求,用戶身份信息等,這樣一個線程的所有調用到的處理函數都可以非常方便地訪問這些資源。 > ThreadLocal 解決了參數在一個線程中各個函數之間互相傳遞的問題。 ## 分布式進程 在 Thread 和 Process 中,應當優選 Process,因為 Process 更穩定,而且,Process 可以分布到多臺機器上,而 Thread 最多只能分布到同一臺機器的多個 CPU 上。 Python 的 multiprocessing 模塊不但支持多進程,其中 managers 子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。 舉個例子:如果我們已經有一個通過 Queue 通信的多進程程序在同一臺機器上運行,現在,由于處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩臺機器上。怎么用分布式進程實現? 原有的 Queue 可以繼續使用,但是,通過 managers 模塊把 Queue 通過網絡暴露出去,就可以讓其他機器的進程訪問 Queue 了。 我們先看服務進程,服務進程負責啟動 Queue,把 Queue 注冊到網絡上,然后往 Queue 里面寫入任務: ```python import random, time, queue from multiprocessing.managers import BaseManager # 發送任務的隊列: task_queue = queue.Queue() # 接收結果的隊列: result_queue = queue.Queue() # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass # 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 綁定端口5000, 設置驗證碼'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 啟動Queue: manager.start() # 獲得通過網絡訪問的Queue對象: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result隊列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 關閉: manager.shutdown() print('master exit.') ``` 請注意,當我們在一臺機器上寫多進程程序時,創建的 Queue 可以直接拿來用,但是,在分布式多進程環境下,添加任務到 Queue 不可以直接對原始的 task_queue 進行操作,必須通過 manager.get_task_queue() 獲得的 Queue 接口添加,否則那樣就繞過了 QueueManager 的封裝, 然后,在另一臺機器上啟動任務進程(本機上啟動也可以): ```python import time, sys, queue from multiprocessing.managers import BaseManager # 創建類似的QueueManager: class QueueManager(BaseManager): pass # 由于這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 連接到服務器,也就是運行task_master.py的機器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和驗證碼注意保持與task_master.py設置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 從網絡連接: m.connect() # 獲取Queue的對象: task = m.get_task_queue() result = m.get_result_queue() # 從task隊列取任務,并把結果寫入result隊列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 處理結束: print('worker exit.') ``` 任務進程要通過網絡連接到服務進程,所以要指定服務進程的 IP。 現在,可以試試分布式進程的工作效果了。先啟動 task_master.py 服務進程: ```python $ python3 task_master.py Put task 3411... Put task 1605... Put task 1398... Put task 4729... Put task 5300... Put task 7471... Put task 68... Put task 4219... Put task 339... Put task 7866... Try get results... ``` task_master.py 進程發送完任務后,開始等待 result 隊列的結果。現在啟動 task_worker.py 進程: ```python $ python3 task_worker.py Connect to server 127.0.0.1... run task 3411 * 3411... run task 1605 * 1605... run task 1398 * 1398... run task 4729 * 4729... run task 5300 * 5300... run task 7471 * 7471... run task 68 * 68... run task 4219 * 4219... run task 339 * 339... run task 7866 * 7866... worker exit. ``` task_worker.py 進程結束,在 task_master.py 進程中會繼續打印出結果: ```python Result: 3411 * 3411 = 11634921 Result: 1605 * 1605 = 2576025 Result: 1398 * 1398 = 1954404 Result: 4729 * 4729 = 22363441 Result: 5300 * 5300 = 28090000 Result: 7471 * 7471 = 55815841 Result: 68 * 68 = 4624 Result: 4219 * 4219 = 17799961 Result: 339 * 339 = 114921 Result: 7866 * 7866 = 61873956 ``` Master/Worker 模型有什么用? 其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個 worker,就可以把任務分布到幾臺甚至幾十臺機器上,比如把計算 n*n 的代碼換成發送郵件,就實現了郵件隊列的異步發送。 Queue 對象存儲在哪?注意到 task_worker.py 中根本沒有創建 Queue 的代碼,所以,Queue 對象存儲在 task_master.py 進程中: ```python ┌─────────────────────────────────────────┐ ┌──────────────────────────────────────┐ │task_master.py │ │ │task_worker.py │ │ │ │ │ │ task = manager.get_task_queue() │ │ │ task = manager.get_task_queue() │ │ result = manager.get_result_queue() │ │ result = manager.get_result_queue() │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ │ ┌─────────────────────────────────┐ │ │ │ │ │ │QueueManager │ │ │ │ │ │ │ │ ┌────────────┐ ┌──────────────┐ │ │ │ │ │ │ │ │ task_queue │ │ result_queue │ │<───┼──┼──┼──────────────┘ │ │ │ └────────────┘ └──────────────┘ │ │ │ │ │ └─────────────────────────────────┘ │ │ │ │ └─────────────────────────────────────────┘ └──────────────────────────────────────┘ │ Network ``` 而 Queue 之所以能通過網絡訪問,就是通過 QueueManager 實現的。由于 QueueManager 管理的不止一個 Queue,所以,要給每個 Queue 的網絡調用接口起個名字,比如 get_task_queue。 authkey 有什么用?這是為了保證兩臺機器正常通信,不被其他機器惡意干擾。如果 task_worker.py 的 authkey 和 task_master.py 的 authkey 不一致,肯定連接不上。 注意 Queue 的作用是用來傳遞任務和接收結果,每個任務的描述數據量要盡量小。比如發送一個處理日志文件的任務,就不要發送幾百兆的日志文件本身,而是發送日志文件存放的完整路徑,由 Worker 進程再去共享的磁盤上讀取文件。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看