## 問題
You want to implement concurrency using generators (coroutines) as an alternative tosystem threads. This is sometimes known as user-level threading or green threading.
## 解決方案
To implement your own concurrency using generators, you first need a fundamentalinsight concerning generator functions and the yield statement. Specifically, the fun‐damental behavior of yield is that it causes a generator to suspend its execution. Bysuspending execution, it is possible to write a scheduler that treats generators as a kindof “task” and alternates their execution using a kind of cooperative task switching.To illustrate this idea, consider the following two generator functions using a simpleyield:
# Two simple generator functionsdef countdown(n):
> while n > 0:print(‘T-minus', n)yieldn -= 1> print(‘Blastoff!')
def countup(n):
x = 0while x < n:
> print(‘Counting up', x)yieldx += 1
These functions probably look a bit funny using yield all by itself. However, considerthe following code that implements a simple task scheduler:
from collections import deque
class TaskScheduler:def __init__(self):self._task_queue = deque()def new_task(self, task):
‘''Admit a newly started task to the scheduler
‘''self._task_queue.append(task)
def run(self):
‘''Run until there are no more tasks‘''while self._task_queue:
> > task = self._task_queue.popleft()try:
> > # Run until the next yield statementnext(task)self._task_queue.append(task)
except StopIteration:# Generator is no longer executingpass
# Example usesched = TaskScheduler()sched.new_task(countdown(10))sched.new_task(countdown(5))sched.new_task(countup(15))sched.run()
In this code, the TaskScheduler class runs a collection of generators in a round-robinmanner—each one running until they reach a yield statement. For the sample, theoutput will be as follows:
T-minus 10T-minus 5Counting up 0T-minus 9T-minus 4Counting up 1T-minus 8T-minus 3Counting up 2T-minus 7T-minus 2...
At this point, you’ve essentially implemented the tiny core of an “operating system” ifyou will. Generator functions are the tasks and the yield statement is how tasks signalthat they want to suspend. The scheduler simply cycles over the tasks until none are leftexecuting.In practice, you probably wouldn’t use generators to implement concurrency for some‐thing as simple as shown. Instead, you might use generators to replace the use of threadswhen implementing actors (see Recipe 12.10) or network servers.
The following code illustrates the use of generators to implement a thread-free versionof actors:
from collections import deque
class ActorScheduler:def __init__(self):self._actors = { } # Mapping of names to actorsself._msg_queue = deque() # Message queuedef new_actor(self, name, actor):‘''Admit a newly started actor to the scheduler and give it a name‘''self._msg_queue.append((actor,None))self._actors[name] = actordef send(self, name, msg):
‘''Send a message to a named actor‘''actor = self._actors.get(name)if actor:
> self._msg_queue.append((actor,msg))
def run(self):
‘''Run as long as there are pending messages.‘''while self._msg_queue:
> > actor, msg = self._msg_queue.popleft()try:
> > actor.send(msg)
except StopIteration:pass
# Example useif __name__ == ‘__main__':
> def printer():while True:msg = yieldprint(‘Got:', msg)def counter(sched):while True:> # Receive the current countn = yieldif n == 0:
> > break
> # Send to the printer tasksched.send(‘printer', n)# Send the next count to the counter task (recursive)
> sched.send(‘counter', n-1)
> sched = ActorScheduler()# Create the initial actorssched.new_actor(‘printer', printer())sched.new_actor(‘counter', counter(sched))
> # Send an initial message to the counter to initiatesched.send(‘counter', 10000)sched.run()
The execution of this code might take a bit of study, but the key is the queue of pendingmessages. Essentially, the scheduler runs as long as there are messages to deliver. Aremarkable feature is that the counter generator sends messages to itself and ends upin a recursive cycle not bound by Python’s recursion limit.Here is an advanced example showing the use of generators to implement a concurrentnetwork application:
from collections import dequefrom select import select
# This class represents a generic yield event in the schedulerclass YieldEvent:
> def handle_yield(self, sched, task):passdef handle_resume(self, sched, task):pass
# Task Schedulerclass Scheduler:
> def __init__(self):self._numtasks = 0 # Total num of tasksself._ready = deque() # Tasks ready to runself._read_waiting = {} # Tasks waiting to readself._write_waiting = {} # Tasks waiting to write> # Poll for I/O events and restart waiting tasksdef _iopoll(self):
> > rset,wset,eset = select(self._read_waiting,self._write_waiting,[])for r in rset:evt, task = self._read_waiting.pop(r)evt.handle_resume(self, task)for w in wset:evt, task = self._write_waiting.pop(w)evt.handle_resume(self, task)
def new(self,task):> ‘''Add a newly started task to the scheduler‘'‘
> self._ready.append((task, None))self._numtasks += 1
def add_ready(self, task, msg=None):‘''Append an already started task to the ready queue.msg is what to send into the task when it resumes.‘''self._ready.append((task, msg))> # Add a task to the reading setdef _read_wait(self, fileno, evt, task):
> > self._read_waiting[fileno] = (evt, task)
> # Add a task to the write setdef _write_wait(self, fileno, evt, task):
> > self._write_waiting[fileno] = (evt, task)
def run(self):> ‘''Run the task scheduler until there are no tasks‘''while self._numtasks:
> > if not self._ready:self._iopoll()> > task, msg = self._ready.popleft()try:
> > > > > > # Run the coroutine to the next yieldr = task.send(msg)if isinstance(r, YieldEvent):
> > > > r.handle_yield(self, task)
else:raise RuntimeError(‘unrecognized yield event')
except StopIteration:self._numtasks -= 1
# Example implementation of coroutine-based socket I/Oclass ReadSocket(YieldEvent):
> def __init__(self, sock, nbytes):self.sock = sockself.nbytes = nbytesdef handle_yield(self, sched, task):sched._read_wait(self.sock.fileno(), self, task)def handle_resume(self, sched, task):data = self.sock.recv(self.nbytes)sched.add_ready(task, data)
class WriteSocket(YieldEvent):def __init__(self, sock, data):self.sock = sockself.data = data
def handle_yield(self, sched, task):
> sched._write_wait(self.sock.fileno(), self, task)
def handle_resume(self, sched, task):nsent = self.sock.send(self.data)sched.add_ready(task, nsent)class AcceptSocket(YieldEvent):def __init__(self, sock):self.sock = sockdef handle_yield(self, sched, task):sched._read_wait(self.sock.fileno(), self, task)def handle_resume(self, sched, task):r = self.sock.accept()sched.add_ready(task, r)
# Wrapper around a socket object for use with yieldclass Socket(object):
> def __init__(self, sock):self._sock = sockdef recv(self, maxbytes):return ReadSocket(self._sock, maxbytes)def send(self, data):return WriteSocket(self._sock, data)def accept(self):return AcceptSocket(self._sock)def __getattr__(self, name):return getattr(self._sock, name)
if __name__ == ‘__main__':
from socket import socket, AF_INET, SOCK_STREAMimport time
# Example of a function involving generators. This should# be called using line = yield from readline(sock)def readline(sock):
> > chars = []while True:
> > > > c = yield sock.recv(1)if not c:
> > > break
> > chars.append(c)if c == b'n':
> > > break
> return b'‘.join(chars)
# Echo server using generatorsclass EchoServer:
> def __init__(self,addr,sched):self.sched = schedsched.new(self.server_loop(addr))def server_loop(self,addr):> s = Socket(socket(AF_INET,SOCK_STREAM))
> s.bind(addr)s.listen(5)while True:
> > c,a = yield s.accept()print(‘Got connection from ‘, a)self.sched.new(self.client_handler(Socket(c)))
def client_handler(self,client):while True:> line = yield from readline(client)if not line:
> > break
> line = b'GOT:' + linewhile line:
> > nsent = yield client.send(line)line = line[nsent:]
> client.close()print(‘Client closed')
sched = Scheduler()EchoServer((‘',16000),sched)sched.run()
This code will undoubtedly require a certain amount of careful study. However, it isessentially implementing a small operating system. There is a queue of tasks ready torun and there are waiting areas for tasks sleeping for I/O. Much of the scheduler involvesmoving tasks between the ready queue and the I/O waiting area.
## 討論
When building generator-based concurrency frameworks, it is most common to workwith the more general form of yield:
def some_generator():...result = yield data...
Functions that use yield in this manner are more generally referred to as “coroutines.”Within a scheduler, the yield statement gets handled in a loop as follows:
f = some_generator()
# Initial result. Is None to start since nothing has been computedresult = Nonewhile True:
> try:data = f.send(result)result = ... do some calculation ...except StopIteration:break
The logic concerning the result is a bit convoluted. However, the value passed to send()defines what gets returned when the yield statement wakes back up. So, if a yield isgoing to return a result in response to data that was previously yielded, it gets returnedon the next send() operation. If a generator function has just started, sending in a valueof None simply makes it advance to the first yield statement.In addition to sending in values, it is also possible to execute a close() method on agenerator. This causes a silent GeneratorExit exception to be raised at the yield state‐ment, which stops execution. If desired, a generator can catch this exception and per‐form cleanup actions. It’s also possible to use the throw() method of a generator to raisean arbitrary execution at the yield statement. A task scheduler might use this to com‐municate errors into running generators.The yield from statement used in the last example is used to implement coroutinesthat serve as subroutines or procedures to be called from other generators. Essentially,control transparently transfers to the new function. Unlike normal generators, a func‐tion that is called using yield from can return a value that becomes the result of theyield from statement. More information about yield from can be found in PEP 380.Finally, if programming with generators, it is important to stress that there are somemajor limitations. In particular, you get none of the benefits that threads provide. Forinstance, if you execute any code that is CPU bound or which blocks for I/O, it willsuspend the entire task scheduler until the completion of that operation. To work aroundthis, your only real option is to delegate the operation to a separate thread or processwhere it can run independently. Another limitation is that most Python libraries havenot been written to work well with generator-based threading. If you take this approach,you may find that you need to write replacements for many standard library functions.As basic background on coroutines and the techniques utilized in this recipe, see PEP342 and “A Curious Course on Coroutines and Concurrency”.PEP 3156 also has a modern take on asynchronous I/O involving coroutines. In practice,it is extremelyunlikely that you will write a low-level coroutine scheduler yourself.However, ideas surrounding coroutines are the basis for many popular libraries, in‐cluding gevent, greenlet, Stackless Python, and similar projects.
- Copyright
- 前言
- 第一章:數據結構和算法
- 1.1 解壓序列賦值給多個變量
- 1.2 解壓可迭代對象賦值給多個變量
- 1.3 保留最后N個元素
- 1.4 查找最大或最小的N個元素
- 1.5 實現一個優先級隊列
- 1.6 字典中的鍵映射多個值
- 1.7 字典排序
- 1.8 字典的運算
- 1.9 查找兩字典的相同點
- 1.10 刪除序列相同元素并保持順序
- 1.11 命名切片
- 1.12 序列中出現次數最多的元素
- 1.13 通過某個關鍵字排序一個字典列表
- 1.14 排序不支持原生比較的對象
- 1.15 通過某個字段將記錄分組
- 1.16 過濾序列元素
- 1.17 從字典中提取子集
- 1.18 映射名稱到序列元素
- 1.19 轉換并同時計算數據
- 1.20 合并多個字典或映射
- 第二章:字符串和文本
- 2.1 使用多個界定符分割字符串
- 2.2 字符串開頭或結尾匹配
- 2.3 用Shell通配符匹配字符串
- 2.4 字符串匹配和搜索
- 2.5 字符串搜索和替換
- 2.6 字符串忽略大小寫的搜索替換
- 2.7 最短匹配模式
- 2.8 多行匹配模式
- 2.9 將Unicode文本標準化
- 2.10 在正則式中使用Unicode
- 2.11 刪除字符串中不需要的字符
- 2.12 審查清理文本字符串
- 2.13 字符串對齊
- 2.14 合并拼接字符串
- 2.15 字符串中插入變量
- 2.16 以指定列寬格式化字符串
- 2.17 在字符串中處理html和xml
- 2.18 字符串令牌解析
- 2.19 實現一個簡單的遞歸下降分析器
- 2.20 字節字符串上的字符串操作
- 第三章:數字日期和時間
- 3.1 數字的四舍五入
- 3.2 執行精確的浮點數運算
- 3.3 數字的格式化輸出
- 3.4 二八十六進制整數
- 3.5 字節到大整數的打包與解包
- 3.6 復數的數學運算
- 3.7 無窮大與NaN
- 3.8 分數運算
- 3.9 大型數組運算
- 3.10 矩陣與線性代數運算
- 3.11 隨機選擇
- 3.12 基本的日期與時間轉換
- 3.13 計算最后一個周五的日期
- 3.14 計算當前月份的日期范圍
- 3.15 字符串轉換為日期
- 3.16 結合時區的日期操作
- 第四章:迭代器與生成器
- 4.1 手動遍歷迭代器
- 4.2 代理迭代
- 4.3 使用生成器創建新的迭代模式
- 4.4 實現迭代器協議
- 4.5 反向迭代
- 4.6 帶有外部狀態的生成器函數
- 4.7 迭代器切片
- 4.8 跳過可迭代對象的開始部分
- 4.9 排列組合的迭代
- 4.10 序列上索引值迭代
- 4.11 同時迭代多個序列
- 4.12 不同集合上元素的迭代
- 4.13 創建數據處理管道
- 4.14 展開嵌套的序列
- 4.15 順序迭代合并后的排序迭代對象
- 4.16 迭代器代替while無限循環
- 第五章:文件與IO
- 5.1 讀寫文本數據
- 5.2 打印輸出至文件中
- 5.3 使用其他分隔符或行終止符打印
- 5.4 讀寫字節數據
- 5.5 文件不存在才能寫入
- 5.6 字符串的I/O操作
- 5.7 讀寫壓縮文件
- 5.8 固定大小記錄的文件迭代
- 5.9 讀取二進制數據到可變緩沖區中
- 5.10 內存映射的二進制文件
- 5.11 文件路徑名的操作
- 5.12 測試文件是否存在
- 5.13 獲取文件夾中的文件列表
- 5.14 忽略文件名編碼
- 5.15 打印不合法的文件名
- 5.16 增加或改變已打開文件的編碼
- 5.17 將字節寫入文本文件
- 5.18 將文件描述符包裝成文件對象
- 5.19 創建臨時文件和文件夾
- 5.20 與串行端口的數據通信
- 5.21 序列化Python對象
- 第六章:數據編碼和處理
- 6.1 讀寫CSV數據
- 6.2 讀寫JSON數據
- 6.3 解析簡單的XML數據
- 6.4 增量式解析大型XML文件
- 6.5 將字典轉換為XML
- 6.6 解析和修改XML
- 6.7 利用命名空間解析XML文檔
- 6.8 與關系型數據庫的交互
- 6.9 編碼和解碼十六進制數
- 6.10 編碼解碼Base64數據
- 6.11 讀寫二進制數組數據
- 6.12 讀取嵌套和可變長二進制數據
- 6.13 數據的累加與統計操作
- 第七章:函數
- 7.1 可接受任意數量參數的函數
- 7.2 只接受關鍵字參數的函數
- 7.3 給函數參數增加元信息
- 7.4 返回多個值的函數
- 7.5 定義有默認參數的函數
- 7.6 定義匿名或內聯函數
- 7.7 匿名函數捕獲變量值
- 7.8 減少可調用對象的參數個數
- 7.9 將單方法的類轉換為函數
- 7.10 帶額外狀態信息的回調函數
- 7.11 內聯回調函數
- 7.12 訪問閉包中定義的變量
- 第八章:類與對象
- 8.1 改變對象的字符串顯示
- 8.2 自定義字符串的格式化
- 8.3 讓對象支持上下文管理協議
- 8.4 創建大量對象時節省內存方法
- 8.5 在類中封裝屬性名
- 8.6 創建可管理的屬性
- 8.7 調用父類方法
- 8.8 子類中擴展property
- 8.9 創建新的類或實例屬性
- 8.10 使用延遲計算屬性
- 8.11 簡化數據結構的初始化
- 8.12 定義接口或者抽象基類
- 8.13 實現數據模型的類型約束
- 8.14 實現自定義容器
- 8.15 屬性的代理訪問
- 8.16 在類中定義多個構造器
- 8.17 創建不調用init方法的實例
- 8.18 利用Mixins擴展類功能
- 8.19 實現狀態對象或者狀態機
- 8.20 通過字符串調用對象方法
- 8.21 實現訪問者模式
- 8.22 不用遞歸實現訪問者模式
- 8.23 循環引用數據結構的內存管理
- 8.24 讓類支持比較操作
- 8.25 創建緩存實例
- 第九章:元編程
- 9.1 在函數上添加包裝器
- 9.2 創建裝飾器時保留函數元信息
- 9.3 解除一個裝飾器
- 9.4 定義一個帶參數的裝飾器
- 9.5 可自定義屬性的裝飾器
- 9.6 帶可選參數的裝飾器
- 9.7 利用裝飾器強制函數上的類型檢查
- 9.8 將裝飾器定義為類的一部分
- 9.9 將裝飾器定義為類
- 9.10 為類和靜態方法提供裝飾器
- 9.11 裝飾器為被包裝函數增加參數
- 9.12 使用裝飾器擴充類的功能
- 9.13 使用元類控制實例的創建
- 9.14 捕獲類的屬性定義順序
- 9.15 定義有可選參數的元類
- 9.16 *args和**kwargs的強制參數簽名
- 9.17 在類上強制使用編程規約
- 9.18 以編程方式定義類
- 9.19 在定義的時候初始化類的成員
- 9.20 利用函數注解實現方法重載
- 9.21 避免重復的屬性方法
- 9.22 定義上下文管理器的簡單方法
- 9.23 在局部變量域中執行代碼
- 9.24 解析與分析Python源碼
- 9.25 拆解Python字節碼
- 第十章:模塊與包
- 10.1 構建一個模塊的層級包
- 10.2 控制模塊被全部導入的內容
- 10.3 使用相對路徑名導入包中子模塊
- 10.4 將模塊分割成多個文件
- 10.5 利用命名空間導入目錄分散的代碼
- 10.6 重新加載模塊
- 10.7 運行目錄或壓縮文件
- 10.8 讀取位于包中的數據文件
- 10.9 將文件夾加入到sys.path
- 10.10 通過字符串名導入模塊
- 10.11 通過導入鉤子遠程加載模塊
- 10.12 導入模塊的同時修改模塊
- 10.13 安裝私有的包
- 10.14 創建新的Python環境
- 10.15 分發包
- 第十一章:網絡與Web編程
- 11.1 作為客戶端與HTTP服務交互
- 11.2 創建TCP服務器
- 11.3 創建UDP服務器
- 11.4 通過CIDR地址生成對應的IP地址集
- 11.5 生成一個簡單的REST接口
- 11.6 通過XML-RPC實現簡單的遠程調用
- 11.7 在不同的Python解釋器之間交互
- 11.8 實現遠程方法調用
- 11.9 簡單的客戶端認證
- 11.10 在網絡服務中加入SSL
- 11.11 進程間傳遞Socket文件描述符
- 11.12 理解事件驅動的IO
- 11.13 發送與接收大型數組
- 第十二章:并發編程
- 12.1 啟動與停止線程
- 12.2 判斷線程是否已經啟動
- 12.3 線程間的通信
- 12.4 給關鍵部分加鎖
- 12.5 防止死鎖的加鎖機制
- 12.6 保存線程的狀態信息
- 12.7 創建一個線程池
- 12.8 簡單的并行編程
- 12.9 Python的全局鎖問題
- 12.10 定義一個Actor任務
- 12.11 實現消息發布/訂閱模型
- 12.12 使用生成器代替線程
- 12.13 多個線程隊列輪詢
- 12.14 在Unix系統上面啟動守護進程
- 第十三章:腳本編程與系統管理
- 13.1 通過重定向/管道/文件接受輸入
- 13.2 終止程序并給出錯誤信息
- 13.3 解析命令行選項
- 13.4 運行時彈出密碼輸入提示
- 13.5 獲取終端的大小
- 13.6 執行外部命令并獲取它的輸出
- 13.7 復制或者移動文件和目錄
- 13.8 創建和解壓壓縮文件
- 13.9 通過文件名查找文件
- 13.10 讀取配置文件
- 13.11 給簡單腳本增加日志功能
- 13.12 給內庫增加日志功能
- 13.13 記錄程序執行的時間
- 13.14 限制內存和CPU的使用量
- 13.15 啟動一個WEB瀏覽器
- 第十四章:測試調試和異常
- 14.1 測試輸出到標準輸出上
- 14.2 在單元測試中給對象打補丁
- 14.3 在單元測試中測試異常情況
- 14.4 將測試輸出用日志記錄到文件中
- 14.5 忽略或者期望測試失敗
- 14.6 處理多個異常
- 14.7 捕獲所有異常
- 14.8 創建自定義異常
- 14.9 捕獲異常后拋出另外的異常
- 14.10 重新拋出最后的異常
- 14.11 輸出警告信息
- 14.12 調試基本的程序崩潰錯誤
- 14.13 給你的程序做基準測試
- 14.14 讓你的程序跑的更快
- 第十五章:C語言擴展
- 15.1 使用ctypes訪問C代碼
- 15.2 簡單的C擴展模塊
- 15.3 一個操作數組的擴展函數
- 15.4 在C擴展模塊中操作隱形指針
- 15.5 從擴張模塊中定義和導出C的API
- 15.6 從C語言中調用Python代碼
- 15.7 從C擴展中釋放全局鎖
- 15.8 C和Python中的線程混用
- 15.9 用WSIG包裝C代碼
- 15.10 用Cython包裝C代碼
- 15.11 用Cython寫高性能的數組操作
- 15.12 將函數指針轉換為可調用對象
- 15.13 傳遞NULL結尾的字符串給C函數庫
- 15.14 傳遞Unicode字符串給C函數庫
- 15.15 C字符串轉換為Python字符串
- 15.16 不確定編碼格式的C字符串
- 15.17 傳遞文件名給C擴展
- 15.18 傳遞已打開的文件給C擴展
- 15.19 從C語言中讀取類文件對象
- 15.20 處理C語言中的可迭代對象
- 15.21 診斷分析代碼錯誤
- 附錄A
- 關于譯者
- Roadmap