# 協程(微線程,纖程)
[TOC]
---
## 基礎概念
多線程和多進程的模型雖然解決了并發問題,但是系統不能無上限地增加線程。由于系統切換線程的開銷也很大,所以,一旦線程數量過多,CPU的時間就花在線程切換上了,真正運行代碼的時間就少了,結果導致性能嚴重下降。由于我們要解決的問題是CPU高速執行能力和IO設備的龜速嚴重不匹配,多線程和多進程只是解決這一問題的一種方法。
另一種解決IO問題的方法是異步IO。當代碼需要執行一個耗時的IO操作時,它只發出IO指令,并不等待IO結果,然后就去執行其他代碼了。一段時間后,當IO返回結果時,再通知CPU進行處理。
### 同步IO模型和異步IO模型
```python
# 同步IO模型:
do_some_code()
f = open('/path/to/file', 'r')
r = f.read() # <== 線程停在此處等待IO操作結果
# IO操作完成后線程才能繼續執行:
do_some_code(r)
# 異步io模型
loop = get_event_loop()
# 需要一個消息循環
while True:
# 主線程不斷地重復“讀取消息-處理消息”這一過程
event = loop.get_event()
process_event(event)
```
在“發出IO請求”到收到“IO完成”的這段時間里,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程并沒有休息,而是在消息循環中繼續處理其他消息。這樣,**在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數IO密集型的應用程序,使用異步IO將大大提升系統的多任務處理能力。**
協程是實現異步IO的高級形式,又稱微線程,纖程。英文名Coroutine。
子程序,或者稱為函數,在所有語言中都是層級調用,比如A調用B,B在執行過程中又調用了C,C執行完畢返回,B執行完畢返回,最后是A執行完畢。
子程序調用總是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不同。協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行。
```
def A():
print('1')
print('2')
print('3')
def B():
print('x')
print('y')
print('z')
# 協程執行結果,可能如下
1
2
x
y
3
z
```
看起來A、B的執行有點像多線程,但協程的特點在于是一個線程執行,那和多線程比,協程有何優勢?
主要行效率比多線程高很多,主要表現一下兩點:
1. 不用切換線程,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。
2. 不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了。
因為協程是一個線程執行,那怎么利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。
Python對協程的支持是通過generator實現的。
在generator中,我們不但可以通過for循環來迭代,還可以不斷調用next()函數獲取由yield語句返回的下一個值。Python的yield不但可以返回一個值,它還可以接收調用者發出的參數。
```python
import inspect
def consumer():
r = ''
while True:
# 3. 通過yield拿到消息n(最開始send進來的n為none,不返回只做啟動用)
# yield關鍵字右邊可以不需要加表達式(yield默認返回None)
n = yield r
if not n:
return
# 4. 拿到n之后進行處理
print('[CONSUMER] Consuming %s...' % n)
# 5. 處理完成,再下個循環又通過yield返回
r = '200 OK'
def produce(c):
# 1. 調用c.send(None)啟動生成器;
# GEN_CREATED: 等待開始執行
print(inspect.getgeneratorstate(c))
c.send(None)
n = 0
while n < 3:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
# 2. 一旦生產了東西,通過c.send(n)切換到consumer執行
r = c.send(n)
print(inspect.getgeneratorstate(c))
# 6. 得到consumer處理的結果,再通過下一個循環,繼續生產下一條消息
print('[PRODUCER] Consumer return: %s' % r)
# 在close前,狀態 都是 GEN_SUSPENDED # 在yield表達式處暫停
print(inspect.getgeneratorstate(c))
# 7. produce決定不生產了,通過c.close()關閉consumer,整個過程結束。
c.close()
# close后 狀態為GEN_CLOSED # 執行結束
print(inspect.getgeneratorstate(c))
c = consumer()
produce(c)
```
> 整個流程無鎖,由一個線程執行,`produce`和`consumer`協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。
### 協程生成器的基本行為
協程有四個狀態,可以使用`inspect.getgeneratorstate(...)`函數確定:
GEN_CREATED # 等待開始執行
GEN_RUNNING # 解釋器正在執行(只有在多線程應用中才能看到這個狀態)
GEN_SUSPENDED # 在yield表達式處暫停
GEN_CLOSED # 執行結束
### 生成器api
1. `.send(value)`方法,生成器可以使用`.send(...)`方法發送數據,發送的數據會成為生成器函數中yield表達式的值。如上列中的n和r
2. `.throw(...)`方法,讓調用方拋出異常,在生成器中處理
3. `.close()`方法,終止生成器
## asyncio
asyncio是Python 3.4版本引入的標準庫,直接內置了對異步IO的支持,asyncio的編程模型就是一個消息循環。我們從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執行的協程扔到EventLoop中執行,就實現了異步IO。
### @asyncio.coroutine
用asyncio提供的@asyncio.coroutine可以把一個generator標記為coroutine類型,然后在coroutine內部用yield from調用另一個coroutine實現異步操作。
```python
import asyncio
import threading
# @asyncio.coroutine把一個generator標記為coroutine類型
@asyncio.coroutine
def baby(num):
print('baby %s sleep! (%s)' % (num,threading.currentThread()))
# 異步調用asyncio.sleep(2)生成器: 假設是一個耗時2秒的IO操作,在此期間,主線程并未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現并發執行。
yield from asyncio.sleep(2)
print('baby %s week up! (%s)' % (num,threading.currentThread()))
# 獲取EventLoop: 事件循環對象
loop = asyncio.get_event_loop()
tasks = [baby(1), baby(2), baby(3)]
# 把上面coroutine扔到EventLoop中執行
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
'''
baby 1 sleep! (<_MainThread(MainThread, started 29028)>)
baby 2 sleep! (<_MainThread(MainThread, started 29028)>)
baby 3 sleep! (<_MainThread(MainThread, started 29028)>)
# (暫停約2秒,并且是在同一線程里面,實現了并發)
baby 1 week up! (<_MainThread(MainThread, started 29028)>)
baby 2 week up! (<_MainThread(MainThread, started 29028)>)
baby 3 week up! (<_MainThread(MainThread, started 29028)>)
1. baby(1)執行到yield,線程不會等待asyncio.sleep(),而是直接中斷并執行下一個消息循環baby(2)
2. baby(2)執行到yield,線程不會等待asyncio.sleep(),而是直接中斷并執行下一個消息循環baby(3)
3. baby(3)執行到yield,線程不會等待asyncio.sleep(),而是直接中斷并執行消息循環baby(1),至此所有操作都是以極快的時間完成的,花費了極少時間,此時三個baby同時都在睡眠,(asyncio.sleep)
4. 等待baby(1)睡眠完成,此時幾乎同時其他baby也的睡眠也結束了,所以接著執行各個baby的打印wake up操作.結束整個消息循環,run_until_complete結束.
'''
```
**用asyncio的異步網絡連接來獲取sina、sohu和163的網站首頁**
```python
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
# 刷新底層傳輸的寫緩沖區。也就是把需要發送出去的數據,從緩沖區發送出去。沒有手工刷新,asyncio為你自動刷新了。當執行到reader.readline()時,asyncio知道應該把發送緩沖區的數據發送出去了。
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```
### async/await
async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:
1. 把@asyncio.coroutine替換為async;
2. 把yield from替換為await。
使用async可以定義協程對象,使用await可以針對耗時的操作進行掛起,就像生成器里的yield一樣,函數讓出控制權。**協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行**。
```python
import asyncio
import threading
async def baby(num):
print('baby %s sleep! (%s)' % (num,threading.currentThread()))
await asyncio.sleep(1)
print('baby %s week up! (%s)' % (num,threading.currentThread()))
loop = asyncio.get_event_loop()
# ???? 執行完的順序讓人疑惑
tasks = [baby(2), baby(1), baby(3),baby(4),baby(5)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```
> tips: await 和 yield from 可以理解為 “不等了” (主線程是一個事件循環,執行到await,就“我不等了,您慢慢執行,我先走一步,好了再給我說”)
### 綁定回調
```python
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)
def callback(future):
print('Callback: ', future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)
print('TIME: ', now() - start)
```
**利用future對象回調別的函數**
?? future對象的特性,以下代碼不太懂 ??
```python
import asyncio
import functools
def callback(future, n):
print('{}: future done: {}'.format(n, future.result()))
async def register_callbacks(all_done):
print('registering callbacks on future')
# 偏函數配合回調,all_done是future對象
all_done.add_done_callback(functools.partial(callback, n=1))
all_done.add_done_callback(functools.partial(callback, n=2))
async def main(all_done):
# 到此同步中斷,異步執行回調函數注冊
await register_callbacks(all_done)
print('setting result of future')
all_done.set_result('the result')
event_loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
event_loop.run_until_complete(main(all_done))
finally:
event_loop.close()
'''
registering callbacks on future
setting result of future
1: future done: the result
2: future done: the result
'''
```
### 多線程與asyncio對比
**多線程**
```python
# sinner_thread.py
import threading
import itertools
import time
import sys
# 這個類定義一個可變對象,用于從外部控制線程
class Signal:
go = True
# 這個函數會在單獨的線程中運行,signal 參數是前邊定義的Signal類的實例
def spin(msg, signal):
write, flush = sys.stdout.write, sys.stdout.flush
# itertools.cycle 函數從指定的序列中反復不斷地生成元素
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status)) # 使用退格符把光標移回行首
time.sleep(.1) # 每 0.1 秒刷新一次
if not signal.go: # 如果 go屬性不是 True,退出循環
break
write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除狀態消息,把光標移回開頭
def slow_function(): # 模擬耗時操作
# 假裝等待I/O一段時間
time.sleep(20) # 調用sleep 會阻塞主線程,這么做事為了釋放GIL,創建從屬線程
return 42
# 這個函數設置從屬線程,顯示線程對象,運行耗時計算,最后殺死進程
def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner) # 顯示線程對象 輸出 spinner object: <Thread(Thread-1, initial)>
spinner.start() # 啟動從屬進程
result = slow_function() # 運行slow_function 行數,阻塞主線程。同時從屬線程以動畫形式旋轉指針
# python 并沒有提供終止線程的API,所以若想關閉線程,必須給線程發送消息。這里我們使用signal.go 屬性:在主線程中把它設置為False后,spinner 線程會接收到,然后退出
signal.go = False
spinner.join() # 等待spinner 線程結束
return result
def main():
result = supervisor()
print('Answer', result)
if __name__ == '__main__':
main()
```
**協程**
```python
# spinner_asyncio.py
# 通過協程以動畫的形式顯示文本式旋轉指針
import asyncio
import itertools
import sys
async def spin(msg):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'): # itertools.cycle 函數從指定的序列中反復不斷地生成元素
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status)) # 使用退格符把光標移回行首
try:
# 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 這樣的休眠不會阻塞事件循環
# 除非想阻塞主線程,從而凍結事件循環或整個應用,否則不要再 asyncio 協程中使用 time.sleep().如果協程需要在一段時間內什么都不做,應該使用 yield from asyncio.sleep(DELAY)
# 此處相當于另一協程
await asyncio.sleep(0.1)
# 如果 spin 函數蘇醒后拋出 asyncio.CancelledError 異常,其原因是發出了取消請求
except asyncio.CancelledError as e:
print(str(e))
break
write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除狀態消息,把光標移回開頭
async def slow_function(): # 5 現在此函數是協程,使用休眠假裝進行I/O 操作時,使用 yield from 繼續執行事件循環
# 假裝等待I/O一段時間
await asyncio.sleep(3) # 此表達式把控制權交給主循環,在休眠結束后回復這個協程
return 42
# ?? 不能改為asynic supervisor 否則asyncio.async會報錯 ,已找到原因已被asyncio.ensure_future替代??
async def supervisor():
spinner = asyncio.ensure_future(spin('thinking!')) # asyncio.async() 函數排定協程的運行時間,使用一個 Task 對象包裝spin 協程,并立即返回
print('spinner object:', spinner) # Task 對象,輸出類似 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
# 驅動slow_function() 函數,結束后,獲取返回值。同事事件循環繼續運行,
# 因為slow_function 函數最后使用yield from asyncio.sleep(3) 表達式把控制權交給主循環
result = await slow_function()
# Task 對象可以取消;取消后會在協程當前暫停的yield處拋出 asyncio.CancelledError 異常
# 協程可以捕獲這個異常,也可以延遲取消,甚至拒絕取消
spinner.cancel()
return result
def main():
loop = asyncio.get_event_loop() # 獲取事件循環引用
# 驅動supervisor 協程,讓它運行完畢;這個協程的返回值是這次調用的返回值
result = loop.run_until_complete(supervisor())
loop.close()
print('Answer', result)
if __name__ == '__main__':
main()
```
**分析兩段代碼**
1. Task對象不由自己動手實例化,而是通過把協程傳給 asyncio.async(...) 函數或 loop.create_task(...) 方法獲取Task 對象已經排定了運行時間;而Thread 實例必須調用start方法,明確告知它運行
2. 在線程版supervisor函數中,slow_function 是普通的函數,由線程直接調用,而異步版的slow_function 函數是協程,由yield from 驅動。
3. 沒有API能從外部終止線程,因為線程隨時可能被中斷。而協程如果想終止任務,可以使用Task.cancel() 實例方法,在協程內部拋出CancelledError 異常。協程可以在暫停的yield 處捕獲這個異常,處理終止請求
4. supervisor 協程必須在main 函數中由loop.run_until_complete 方法執行。
5. **協程和線程相比關鍵的一個優點是**線程必須記住保留鎖,去保護程序中的重要部分,防止多步操作在執行的過程中中斷,而協程默認會做好保護,我們必須顯式產出(使用yield 或 yield from 交出控制權)才能讓程序的余下部分運行。
#### asyncio.Future:故意不阻塞
asyncio.Future 類與 concurrent.futures.Future 類的接口基本一致,不過實現方式不同,不可互換。在 concurrent.futures.Future 中,future只是調度執行某物的結果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一個協程,排定它的運行時間,然后返回一個asyncio.Task 實例(也是asyncio.Future 類的實例,因為 Task 是 Future 的子類,用于包裝協程。(在 concurrent.futures.Future 中,類似的操作是Executor.submit(...))。
與concurrent.futures.Future 類似,asyncio.Future 類也提供了:
* `.done()` 返回布爾值,表示Future 是否已經執行
* `.add_done_callback()` 這個方法只有一個參數,類型是可調用對象,Future運行結束后會回調這個對象。
* `.result()` 這個方法沒有參數,因此不能指定超時時間。 如果調用 .result() 方法時期還沒有運行完畢,會拋出 asyncio.InvalidStateError 異常。
### 協程嵌套 (協程的常用方式)
```python
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
'''
# wait 有timeout參數
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
'''
# 如果使用的是 asyncio.gather創建協程對象,那么await的返回值就是協程運行的結果。
# 使用asyncio.wait(tasks)返回的順序有點難以理解,但使用asyncio.gather(*tasks)返回值的順序就好理解得多
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('TIME: ', now() - start)
```
**拋出返回值到run_until_complete:**
```python
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
return await asyncio.gather(*tasks)
start = now()
loop = asyncio.get_event_loop()
# 也可以直接返回到run_until_complete處理協程結果
results = loop.run_until_complete(main())
for result in results:
print('Task ret: ', result)
print('TIME: ', now() - start)
```
***使用as_completed**
```python
async def main(num):
tasks = []
i = 1
while i <= num :
tasks.append(asyncio.ensure_future(do_some_work(i)))
i += 1
for task in asyncio.as_completed(tasks):
result = await task
print('Task ret: {}'.format(result))
start = now()
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main(10))
print('TIME: ', now() - start)
```
### 協程停止
future對象有幾個狀態:Pending,Running,Done,Cancelled. 創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,就需要先把task取消。可以使用asyncio.Task獲取事件循環的task
```python
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main(5))
except KeyboardInterrupt as e:
for task in asyncio.Task.all_tasks():
# 啟動事件循環之后,馬上ctrl+c,會觸發run_until_complete的執行異常 KeyBorardInterrupt。然后通過循環asyncio.Task取消future。
print(task)
# 返回true或false,已執行的返回false
print(task.cancel())
#loop stop之后還需要再次開啟事件循環,最后再close,不然還會拋出異常:
# 拋出異常后要重新啟動循環
loop.stop()
loop.run_forever()
finally:
loop.close()
```
**批量停止**
```python
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main(5))
try:
loop.run_until_complete(task)
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
print('-------------------')
# 批量停止,如果全部停止成功就直接返回true,與上列不同
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
```
### 不同線程的事件循環
很多時候,我們的事件循環用于注冊協程,而有的協程需要動態的添加到事件循環中。一個簡單的方式就是使用多線程。當前線程創建一個事件循環,然后在新建一個線程,在新線程中啟動事件循環。當前線程不會被block。
```python
import asyncio
from threading import Thread
import time
now = lambda: time.time()
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
def more_work(x):
print('More work {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
# 這里的計時沒有意義,因為more_work具體的執行是在新的兩個線程里面
print('TIME: {}'.format(now() - start))
```
啟動上述代碼之后,當前線程不會被block,新線程中會按照順序執行call_soon_threadsafe方法注冊的more_work方法,后者因為time.sleep操作是同步阻塞的,因此運行完畢more_work需要大致6 + 3
### 新線程協程
```python
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_some_work(x):
print('Waiting {}'.format(x))
await asyncio.sleep(x)
print('Done after {}s'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
print('TIME: {}'.format(time.time() - start))
```
上述的例子,主線程中創建一個new_loop,然后在另外的兩個子線程中開啟一個無限事件循環。主線程通過run_coroutine_threadsafe新注冊協程對象。這樣就能在子線程中進行事件循環的并發操作,同時主線程又不會被block。一共執行的時間大概在6s左右。
### master-worker主從模式
對于并發任務,通常是用生成消費模型,對隊列的處理可以使用類似master-worker的方式,master主要用戶獲取隊列的msg,worker用戶處理消息。
為了簡單起見,并且協程更適合單線程的方式,我們的主線程用來監聽隊列,子線程用于處理隊列。這里使用redis的隊列。主線程中有一個是無限循環,用戶消費隊列。
```python
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
```
### 停止子線程
如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,會拋出KeyboardInterrupt錯誤,我們修改一下主循環:
```python
try:
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
print(e)
new_loop.stop()
```
可是實際上并不好使,雖然主線程try了KeyboardInterrupt異常,但是子線程并沒有退出,為了解決這個問題,可以設置子線程為守護線程,這樣當主線程結束的時候,子線程也隨機退出。
```python
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True) # 設置子線程為守護線程
t.start()
try:
while True:
# print('start rpop')
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
print(e)
new_loop.stop()
```
線程停止程序的時候,主線程退出后,子線程也隨機退出才了,并且停止了子線程的協程任務。
```python
try:
while True:
# 用brpop方法,會block住task,如果主線程有消息,才會消費。
# 這種方式更適合隊列消費,不用上面的要停頓一秒
_, task = rcon.brpop("queue")
asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:
print('error', e)
new_loop.stop()
finally:
pass
```
### 協程消費
主線程用于監聽隊列,然后子線程的做事件循環的worker是一種方式。還有一種方式實現這種類似master-worker的方案。即把監聽隊列的無限循環挪進協程中。程序初始化就創建若干個協程,實現類似并行的效果。一般這個方案就可以了
```python
import time
import asyncio
import redis
now = lambda : time.time()
# 最多開多少個協程
MAX_COROUTINES = 10
def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=1,port=6379)
# connection_pool = redis.ConnectionPool(host='172.28.3.24', db=1,port=6379)
return redis.Redis(connection_pool=connection_pool)
rcon = get_redis()
async def worker():
print('Start worker')
while True:
start = now()
task = rcon.rpop("queue")
if not task:
await asyncio.sleep(1)
continue
print('Wait ', int(task))
await asyncio.sleep(int(task))
print('Done ', task, now() - start)
def main():
i = 0
while i < MAX_COROUTINES:
asyncio.ensure_future(worker())
i += 1
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except KeyboardInterrupt as e:
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
if __name__ == '__main__':
main()
```
## aiohttp
asyncio可以實現單線程并發IO操作。如果僅用在客戶端,發揮的威力不大。如果把asyncio用在服務器端,例如Web服務器,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實現多用戶的高并發支持。
asyncio實現了TCP、UDP、SSL等協議,aiohttp則是基于asyncio實現的HTTP框架。
### 實現web服務器
```python
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
```
## 例1,[使用asyncio 和 aiohttp 包下載國旗][1]
## 例2,[使用python-aiohttp爬取網易云音樂,今日頭條,搭建微信公眾平臺][2]
[1]: https://gitee.com/nixi8/Python/tree/master/script
[2]: https://github.com/SigalHu/WeiXin