<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # 協程(微線程,纖程) [TOC] --- ## 基礎概念 多線程和多進程的模型雖然解決了并發問題,但是系統不能無上限地增加線程。由于系統切換線程的開銷也很大,所以,一旦線程數量過多,CPU的時間就花在線程切換上了,真正運行代碼的時間就少了,結果導致性能嚴重下降。由于我們要解決的問題是CPU高速執行能力和IO設備的龜速嚴重不匹配,多線程和多進程只是解決這一問題的一種方法。 另一種解決IO問題的方法是異步IO。當代碼需要執行一個耗時的IO操作時,它只發出IO指令,并不等待IO結果,然后就去執行其他代碼了。一段時間后,當IO返回結果時,再通知CPU進行處理。 ### 同步IO模型和異步IO模型 ```python # 同步IO模型: do_some_code() f = open('/path/to/file', 'r') r = f.read() # <== 線程停在此處等待IO操作結果 # IO操作完成后線程才能繼續執行: do_some_code(r) # 異步io模型 loop = get_event_loop() # 需要一個消息循環 while True: # 主線程不斷地重復“讀取消息-處理消息”這一過程 event = loop.get_event() process_event(event) ``` 在“發出IO請求”到收到“IO完成”的這段時間里,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程并沒有休息,而是在消息循環中繼續處理其他消息。這樣,**在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數IO密集型的應用程序,使用異步IO將大大提升系統的多任務處理能力。** 協程是實現異步IO的高級形式,又稱微線程,纖程。英文名Coroutine。 子程序,或者稱為函數,在所有語言中都是層級調用,比如A調用B,B在執行過程中又調用了C,C執行完畢返回,B執行完畢返回,最后是A執行完畢。 子程序調用總是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不同。協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行。 ``` def A(): print('1') print('2') print('3') def B(): print('x') print('y') print('z') # 協程執行結果,可能如下 1 2 x y 3 z ``` 看起來A、B的執行有點像多線程,但協程的特點在于是一個線程執行,那和多線程比,協程有何優勢? 主要行效率比多線程高很多,主要表現一下兩點: 1. 不用切換線程,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。 2. 不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了。 因為協程是一個線程執行,那怎么利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。 Python對協程的支持是通過generator實現的。 在generator中,我們不但可以通過for循環來迭代,還可以不斷調用next()函數獲取由yield語句返回的下一個值。Python的yield不但可以返回一個值,它還可以接收調用者發出的參數。 ```python import inspect def consumer(): r = '' while True: # 3. 通過yield拿到消息n(最開始send進來的n為none,不返回只做啟動用) # yield關鍵字右邊可以不需要加表達式(yield默認返回None) n = yield r if not n: return # 4. 拿到n之后進行處理 print('[CONSUMER] Consuming %s...' % n) # 5. 處理完成,再下個循環又通過yield返回 r = '200 OK' def produce(c): # 1. 調用c.send(None)啟動生成器; # GEN_CREATED: 等待開始執行 print(inspect.getgeneratorstate(c)) c.send(None) n = 0 while n < 3: n = n + 1 print('[PRODUCER] Producing %s...' % n) # 2. 一旦生產了東西,通過c.send(n)切換到consumer執行 r = c.send(n) print(inspect.getgeneratorstate(c)) # 6. 得到consumer處理的結果,再通過下一個循環,繼續生產下一條消息 print('[PRODUCER] Consumer return: %s' % r) # 在close前,狀態 都是 GEN_SUSPENDED # 在yield表達式處暫停 print(inspect.getgeneratorstate(c)) # 7. produce決定不生產了,通過c.close()關閉consumer,整個過程結束。 c.close() # close后 狀態為GEN_CLOSED # 執行結束 print(inspect.getgeneratorstate(c)) c = consumer() produce(c) ``` > 整個流程無鎖,由一個線程執行,`produce`和`consumer`協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。 ### 協程生成器的基本行為 協程有四個狀態,可以使用`inspect.getgeneratorstate(...)`函數確定: GEN_CREATED # 等待開始執行 GEN_RUNNING # 解釋器正在執行(只有在多線程應用中才能看到這個狀態) GEN_SUSPENDED # 在yield表達式處暫停 GEN_CLOSED # 執行結束 ### 生成器api 1. `.send(value)`方法,生成器可以使用`.send(...)`方法發送數據,發送的數據會成為生成器函數中yield表達式的值。如上列中的n和r 2. `.throw(...)`方法,讓調用方拋出異常,在生成器中處理 3. `.close()`方法,終止生成器 ## asyncio asyncio是Python 3.4版本引入的標準庫,直接內置了對異步IO的支持,asyncio的編程模型就是一個消息循環。我們從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執行的協程扔到EventLoop中執行,就實現了異步IO。 ### @asyncio.coroutine 用asyncio提供的@asyncio.coroutine可以把一個generator標記為coroutine類型,然后在coroutine內部用yield from調用另一個coroutine實現異步操作。 ```python import asyncio import threading # @asyncio.coroutine把一個generator標記為coroutine類型 @asyncio.coroutine def baby(num): print('baby %s sleep! (%s)' % (num,threading.currentThread())) # 異步調用asyncio.sleep(2)生成器: 假設是一個耗時2秒的IO操作,在此期間,主線程并未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現并發執行。 yield from asyncio.sleep(2) print('baby %s week up! (%s)' % (num,threading.currentThread())) # 獲取EventLoop: 事件循環對象 loop = asyncio.get_event_loop() tasks = [baby(1), baby(2), baby(3)] # 把上面coroutine扔到EventLoop中執行 loop.run_until_complete(asyncio.wait(tasks)) loop.close() ''' baby 1 sleep! (<_MainThread(MainThread, started 29028)>) baby 2 sleep! (<_MainThread(MainThread, started 29028)>) baby 3 sleep! (<_MainThread(MainThread, started 29028)>) # (暫停約2秒,并且是在同一線程里面,實現了并發) baby 1 week up! (<_MainThread(MainThread, started 29028)>) baby 2 week up! (<_MainThread(MainThread, started 29028)>) baby 3 week up! (<_MainThread(MainThread, started 29028)>) 1. baby(1)執行到yield,線程不會等待asyncio.sleep(),而是直接中斷并執行下一個消息循環baby(2) 2. baby(2)執行到yield,線程不會等待asyncio.sleep(),而是直接中斷并執行下一個消息循環baby(3) 3. baby(3)執行到yield,線程不會等待asyncio.sleep(),而是直接中斷并執行消息循環baby(1),至此所有操作都是以極快的時間完成的,花費了極少時間,此時三個baby同時都在睡眠,(asyncio.sleep) 4. 等待baby(1)睡眠完成,此時幾乎同時其他baby也的睡眠也結束了,所以接著執行各個baby的打印wake up操作.結束整個消息循環,run_until_complete結束. ''' ``` **用asyncio的異步網絡連接來獲取sina、sohu和163的網站首頁** ```python import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) # 刷新底層傳輸的寫緩沖區。也就是把需要發送出去的數據,從緩沖區發送出去。沒有手工刷新,asyncio為你自動刷新了。當執行到reader.readline()時,asyncio知道應該把發送緩沖區的數據發送出去了。 yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ``` ### async/await async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換: 1. 把@asyncio.coroutine替換為async; 2. 把yield from替換為await。 使用async可以定義協程對象,使用await可以針對耗時的操作進行掛起,就像生成器里的yield一樣,函數讓出控制權。**協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行**。 ```python import asyncio import threading async def baby(num): print('baby %s sleep! (%s)' % (num,threading.currentThread())) await asyncio.sleep(1) print('baby %s week up! (%s)' % (num,threading.currentThread())) loop = asyncio.get_event_loop() # ???? 執行完的順序讓人疑惑 tasks = [baby(2), baby(1), baby(3),baby(4),baby(5)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ``` > tips: await 和 yield from 可以理解為 “不等了” (主線程是一個事件循環,執行到await,就“我不等了,您慢慢執行,我先走一步,好了再給我說”) ### 綁定回調 ```python import time import asyncio now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) return 'Done after {}s'.format(x) def callback(future): print('Callback: ', future.result()) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) loop.run_until_complete(task) print('TIME: ', now() - start) ``` **利用future對象回調別的函數** ?? future對象的特性,以下代碼不太懂 ?? ```python import asyncio import functools def callback(future, n): print('{}: future done: {}'.format(n, future.result())) async def register_callbacks(all_done): print('registering callbacks on future') # 偏函數配合回調,all_done是future對象 all_done.add_done_callback(functools.partial(callback, n=1)) all_done.add_done_callback(functools.partial(callback, n=2)) async def main(all_done): # 到此同步中斷,異步執行回調函數注冊 await register_callbacks(all_done) print('setting result of future') all_done.set_result('the result') event_loop = asyncio.get_event_loop() try: all_done = asyncio.Future() event_loop.run_until_complete(main(all_done)) finally: event_loop.close() ''' registering callbacks on future setting result of future 1: future done: the result 2: future done: the result ''' ``` ### 多線程與asyncio對比 **多線程** ```python # sinner_thread.py import threading import itertools import time import sys # 這個類定義一個可變對象,用于從外部控制線程 class Signal: go = True # 這個函數會在單獨的線程中運行,signal 參數是前邊定義的Signal類的實例 def spin(msg, signal): write, flush = sys.stdout.write, sys.stdout.flush # itertools.cycle 函數從指定的序列中反復不斷地生成元素 for char in itertools.cycle('|/-\\'): status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) # 使用退格符把光標移回行首 time.sleep(.1) # 每 0.1 秒刷新一次 if not signal.go: # 如果 go屬性不是 True,退出循環 break write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除狀態消息,把光標移回開頭 def slow_function(): # 模擬耗時操作 # 假裝等待I/O一段時間 time.sleep(20) # 調用sleep 會阻塞主線程,這么做事為了釋放GIL,創建從屬線程 return 42 # 這個函數設置從屬線程,顯示線程對象,運行耗時計算,最后殺死進程 def supervisor(): signal = Signal() spinner = threading.Thread(target=spin, args=('thinking!', signal)) print('spinner object:', spinner) # 顯示線程對象 輸出 spinner object: <Thread(Thread-1, initial)> spinner.start() # 啟動從屬進程 result = slow_function() # 運行slow_function 行數,阻塞主線程。同時從屬線程以動畫形式旋轉指針 # python 并沒有提供終止線程的API,所以若想關閉線程,必須給線程發送消息。這里我們使用signal.go 屬性:在主線程中把它設置為False后,spinner 線程會接收到,然后退出 signal.go = False spinner.join() # 等待spinner 線程結束 return result def main(): result = supervisor() print('Answer', result) if __name__ == '__main__': main() ``` **協程** ```python # spinner_asyncio.py # 通過協程以動畫的形式顯示文本式旋轉指針 import asyncio import itertools import sys async def spin(msg): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\'): # itertools.cycle 函數從指定的序列中反復不斷地生成元素 status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) # 使用退格符把光標移回行首 try: # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 這樣的休眠不會阻塞事件循環 # 除非想阻塞主線程,從而凍結事件循環或整個應用,否則不要再 asyncio 協程中使用 time.sleep().如果協程需要在一段時間內什么都不做,應該使用 yield from asyncio.sleep(DELAY) # 此處相當于另一協程 await asyncio.sleep(0.1) # 如果 spin 函數蘇醒后拋出 asyncio.CancelledError 異常,其原因是發出了取消請求 except asyncio.CancelledError as e: print(str(e)) break write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除狀態消息,把光標移回開頭 async def slow_function(): # 5 現在此函數是協程,使用休眠假裝進行I/O 操作時,使用 yield from 繼續執行事件循環 # 假裝等待I/O一段時間 await asyncio.sleep(3) # 此表達式把控制權交給主循環,在休眠結束后回復這個協程 return 42 # ?? 不能改為asynic supervisor 否則asyncio.async會報錯 ,已找到原因已被asyncio.ensure_future替代?? async def supervisor(): spinner = asyncio.ensure_future(spin('thinking!')) # asyncio.async() 函數排定協程的運行時間,使用一個 Task 對象包裝spin 協程,并立即返回 print('spinner object:', spinner) # Task 對象,輸出類似 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>> # 驅動slow_function() 函數,結束后,獲取返回值。同事事件循環繼續運行, # 因為slow_function 函數最后使用yield from asyncio.sleep(3) 表達式把控制權交給主循環 result = await slow_function() # Task 對象可以取消;取消后會在協程當前暫停的yield處拋出 asyncio.CancelledError 異常 # 協程可以捕獲這個異常,也可以延遲取消,甚至拒絕取消 spinner.cancel() return result def main(): loop = asyncio.get_event_loop() # 獲取事件循環引用 # 驅動supervisor 協程,讓它運行完畢;這個協程的返回值是這次調用的返回值 result = loop.run_until_complete(supervisor()) loop.close() print('Answer', result) if __name__ == '__main__': main() ``` **分析兩段代碼** 1. Task對象不由自己動手實例化,而是通過把協程傳給 asyncio.async(...) 函數或 loop.create_task(...) 方法獲取Task 對象已經排定了運行時間;而Thread 實例必須調用start方法,明確告知它運行 2. 在線程版supervisor函數中,slow_function 是普通的函數,由線程直接調用,而異步版的slow_function 函數是協程,由yield from 驅動。 3. 沒有API能從外部終止線程,因為線程隨時可能被中斷。而協程如果想終止任務,可以使用Task.cancel() 實例方法,在協程內部拋出CancelledError 異常。協程可以在暫停的yield 處捕獲這個異常,處理終止請求 4. supervisor 協程必須在main 函數中由loop.run_until_complete 方法執行。 5. **協程和線程相比關鍵的一個優點是**線程必須記住保留鎖,去保護程序中的重要部分,防止多步操作在執行的過程中中斷,而協程默認會做好保護,我們必須顯式產出(使用yield 或 yield from 交出控制權)才能讓程序的余下部分運行。 #### asyncio.Future:故意不阻塞 asyncio.Future 類與 concurrent.futures.Future 類的接口基本一致,不過實現方式不同,不可互換。在 concurrent.futures.Future 中,future只是調度執行某物的結果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一個協程,排定它的運行時間,然后返回一個asyncio.Task 實例(也是asyncio.Future 類的實例,因為 Task 是 Future 的子類,用于包裝協程。(在 concurrent.futures.Future 中,類似的操作是Executor.submit(...))。 與concurrent.futures.Future 類似,asyncio.Future 類也提供了: * `.done()` 返回布爾值,表示Future 是否已經執行 * `.add_done_callback()` 這個方法只有一個參數,類型是可調用對象,Future運行結束后會回調這個對象。 * `.result()` 這個方法沒有參數,因此不能指定超時時間。 如果調用 .result() 方法時期還沒有運行完畢,會拋出 asyncio.InvalidStateError 異常。 ### 協程嵌套 (協程的常用方式) ```python import asyncio import time now = lambda: time.time() async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] ''' # wait 有timeout參數 dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result()) ''' # 如果使用的是 asyncio.gather創建協程對象,那么await的返回值就是協程運行的結果。 # 使用asyncio.wait(tasks)返回的順序有點難以理解,但使用asyncio.gather(*tasks)返回值的順序就好理解得多 results = await asyncio.gather(*tasks) for result in results: print('Task ret: ', result) start = now() loop = asyncio.get_event_loop() loop.run_until_complete(main()) print('TIME: ', now() - start) ``` **拋出返回值到run_until_complete:** ```python async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] return await asyncio.gather(*tasks) start = now() loop = asyncio.get_event_loop() # 也可以直接返回到run_until_complete處理協程結果 results = loop.run_until_complete(main()) for result in results: print('Task ret: ', result) print('TIME: ', now() - start) ``` ***使用as_completed** ```python async def main(num): tasks = [] i = 1 while i <= num : tasks.append(asyncio.ensure_future(do_some_work(i))) i += 1 for task in asyncio.as_completed(tasks): result = await task print('Task ret: {}'.format(result)) start = now() loop = asyncio.get_event_loop() done = loop.run_until_complete(main(10)) print('TIME: ', now() - start) ``` ### 協程停止 future對象有幾個狀態:Pending,Running,Done,Cancelled. 創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,就需要先把task取消。可以使用asyncio.Task獲取事件循環的task ```python try: loop = asyncio.get_event_loop() loop.run_until_complete(main(5)) except KeyboardInterrupt as e: for task in asyncio.Task.all_tasks(): # 啟動事件循環之后,馬上ctrl+c,會觸發run_until_complete的執行異常 KeyBorardInterrupt。然后通過循環asyncio.Task取消future。 print(task) # 返回true或false,已執行的返回false print(task.cancel()) #loop stop之后還需要再次開啟事件循環,最后再close,不然還會拋出異常: # 拋出異常后要重新啟動循環 loop.stop() loop.run_forever() finally: loop.close() ``` **批量停止** ```python loop = asyncio.get_event_loop() task = asyncio.ensure_future(main(5)) try: loop.run_until_complete(task) except KeyboardInterrupt as e: print(asyncio.Task.all_tasks()) print('-------------------') # 批量停止,如果全部停止成功就直接返回true,與上列不同 print(asyncio.gather(*asyncio.Task.all_tasks()).cancel()) loop.stop() loop.run_forever() finally: loop.close() ``` ### 不同線程的事件循環 很多時候,我們的事件循環用于注冊協程,而有的協程需要動態的添加到事件循環中。一個簡單的方式就是使用多線程。當前線程創建一個事件循環,然后在新建一個線程,在新線程中啟動事件循環。當前線程不會被block。 ```python import asyncio from threading import Thread import time now = lambda: time.time() def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() new_loop.call_soon_threadsafe(more_work, 6) new_loop.call_soon_threadsafe(more_work, 3) # 這里的計時沒有意義,因為more_work具體的執行是在新的兩個線程里面 print('TIME: {}'.format(now() - start)) ``` 啟動上述代碼之后,當前線程不會被block,新線程中會按照順序執行call_soon_threadsafe方法注冊的more_work方法,后者因為time.sleep操作是同步阻塞的,因此運行完畢more_work需要大致6 + 3 ### 新線程協程 ```python def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_some_work(x): print('Waiting {}'.format(x)) await asyncio.sleep(x) print('Done after {}s'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop) asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop) print('TIME: {}'.format(time.time() - start)) ``` 上述的例子,主線程中創建一個new_loop,然后在另外的兩個子線程中開啟一個無限事件循環。主線程通過run_coroutine_threadsafe新注冊協程對象。這樣就能在子線程中進行事件循環的并發操作,同時主線程又不會被block。一共執行的時間大概在6s左右。 ### master-worker主從模式 對于并發任務,通常是用生成消費模型,對隊列的處理可以使用類似master-worker的方式,master主要用戶獲取隊列的msg,worker用戶處理消息。 為了簡單起見,并且協程更適合單線程的方式,我們的主線程用來監聽隊列,子線程用于處理隊列。這里使用redis的隊列。主線程中有一個是無限循環,用戶消費隊列。 ```python while True: task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) ``` ### 停止子線程 如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,會拋出KeyboardInterrupt錯誤,我們修改一下主循環: ```python try: while True: task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except KeyboardInterrupt as e: print(e) new_loop.stop() ``` 可是實際上并不好使,雖然主線程try了KeyboardInterrupt異常,但是子線程并沒有退出,為了解決這個問題,可以設置子線程為守護線程,這樣當主線程結束的時候,子線程也隨機退出。 ```python new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.setDaemon(True) # 設置子線程為守護線程 t.start() try: while True: # print('start rpop') task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except KeyboardInterrupt as e: print(e) new_loop.stop() ``` 線程停止程序的時候,主線程退出后,子線程也隨機退出才了,并且停止了子線程的協程任務。 ```python try: while True: # 用brpop方法,會block住task,如果主線程有消息,才會消費。 # 這種方式更適合隊列消費,不用上面的要停頓一秒 _, task = rcon.brpop("queue") asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except Exception as e: print('error', e) new_loop.stop() finally: pass ``` ### 協程消費 主線程用于監聽隊列,然后子線程的做事件循環的worker是一種方式。還有一種方式實現這種類似master-worker的方案。即把監聽隊列的無限循環挪進協程中。程序初始化就創建若干個協程,實現類似并行的效果。一般這個方案就可以了 ```python import time import asyncio import redis now = lambda : time.time() # 最多開多少個協程 MAX_COROUTINES = 10 def get_redis(): connection_pool = redis.ConnectionPool(host='127.0.0.1', db=1,port=6379) # connection_pool = redis.ConnectionPool(host='172.28.3.24', db=1,port=6379) return redis.Redis(connection_pool=connection_pool) rcon = get_redis() async def worker(): print('Start worker') while True: start = now() task = rcon.rpop("queue") if not task: await asyncio.sleep(1) continue print('Wait ', int(task)) await asyncio.sleep(int(task)) print('Done ', task, now() - start) def main(): i = 0 while i < MAX_COROUTINES: asyncio.ensure_future(worker()) i += 1 loop = asyncio.get_event_loop() try: loop.run_forever() except KeyboardInterrupt as e: print(asyncio.gather(*asyncio.Task.all_tasks()).cancel()) loop.stop() loop.run_forever() finally: loop.close() if __name__ == '__main__': main() ``` ## aiohttp asyncio可以實現單線程并發IO操作。如果僅用在客戶端,發揮的威力不大。如果把asyncio用在服務器端,例如Web服務器,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實現多用戶的高并發支持。 asyncio實現了TCP、UDP、SSL等協議,aiohttp則是基于asyncio實現的HTTP框架。 ### 實現web服務器 ```python import asyncio from aiohttp import web async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>') async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8')) async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello) srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv loop = asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever() ``` ## 例1,[使用asyncio 和 aiohttp 包下載國旗][1] ## 例2,[使用python-aiohttp爬取網易云音樂,今日頭條,搭建微信公眾平臺][2] [1]: https://gitee.com/nixi8/Python/tree/master/script [2]: https://github.com/SigalHu/WeiXin
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看