## 問題
You have heard about packages based on “event-driven” or “asynchronous” I/O, butyou’re not entirely sure what it means, how it actually works under the covers, or howit might impact your program if you use it.
## 解決方案
At a fundamental level, event-driven I/O is a technique that takes basic I/O operations(e.g., reads and writes) and converts them into events that must be handled by yourprogram. For example, whenever data was received on a socket, it turns into a “receive”event that is handled by some sort of callback method or function that you supply torespond to it. As a possible starting point, an event-driven framework might start witha base class that implements a series of basic event handler methods like this:
class EventHandler:def fileno(self):‘Return the associated file descriptor'raise NotImplemented(‘must implement')def wants_to_receive(self):‘Return True if receiving is allowed'return Falsedef handle_receive(self):‘Perform the receive operation'passdef wants_to_send(self):‘Return True if sending is requested'return Falsedef handle_send(self):‘Send outgoing data'pass
Instances of this class then get plugged into an event loop that looks like this:
import select
def event_loop(handlers):while True:
wants_recv = [h for h in handlers if h.wants_to_receive()]wants_send = [h for h in handlers if h.wants_to_send()]can_recv, can_send, _ = select.select(wants_recv, wants_send, [])for h in can_recv:
> h.handle_receive()
for h in can_send:h.handle_send()
That’s it! The key to the event loop is the select() call, which polls file descriptors foractivity. Prior to calling select(), the event loop simply queries all of the handlers tosee which ones want to receive or send. It then supplies the resulting lists to select().As a result, select() returns the list of objects that are ready to receive or send. Thecorresponding handle_receive() or handle_send() methods are triggered.To write applications, specific instances of EventHandler classes are created. For ex‐ample, here are two simple handlers that illustrate two UDP-based network services:
import socketimport time
class UDPServer(EventHandler):def __init__(self, address):self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)self.sock.bind(address)def fileno(self):return self.sock.fileno()def wants_to_receive(self):return Trueclass UDPTimeServer(UDPServer):def handle_receive(self):msg, addr = self.sock.recvfrom(1)self.sock.sendto(time.ctime().encode(‘ascii'), addr)class UDPEchoServer(UDPServer):def handle_receive(self):msg, addr = self.sock.recvfrom(8192)self.sock.sendto(msg, addr)if __name__ == ‘__main__':handlers = [ UDPTimeServer((‘',14000)), UDPEchoServer((‘',15000)) ]event_loop(handlers)
To test this code, you can try connecting to it from another Python interpreter:
>>> from socket import *
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'',('localhost',14000))
0
>>> s.recvfrom(128)
(b'Tue Sep 18 14:29:23 2012', ('127.0.0.1', 14000))
>>> s.sendto(b'Hello',('localhost',15000))
5
>>> s.recvfrom(128)
(b'Hello', ('127.0.0.1', 15000))
>>>
Implementing a TCP server is somewhat more complex, since each client involves theinstantiation of a new handler object. Here is an example of a TCP echo client.
class TCPServer(EventHandler):def __init__(self, address, client_handler, handler_list):self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)self.sock.bind(address)self.sock.listen(1)self.client_handler = client_handlerself.handler_list = handler_listdef fileno(self):return self.sock.fileno()def wants_to_receive(self):return Truedef handle_receive(self):client, addr = self.sock.accept()# Add the client to the event loop's handler listself.handler_list.append(self.client_handler(client, self.handler_list))class TCPClient(EventHandler):def __init__(self, sock, handler_list):self.sock = sockself.handler_list = handler_listself.outgoing = bytearray()def fileno(self):return self.sock.fileno()def close(self):self.sock.close()# Remove myself from the event loop's handler listself.handler_list.remove(self)def wants_to_send(self):return True if self.outgoing else Falsedef handle_send(self):nsent = self.sock.send(self.outgoing)self.outgoing = self.outgoing[nsent:]class TCPEchoClient(TCPClient):def wants_to_receive(self):return Truedef handle_receive(self):
data = self.sock.recv(8192)if not data:
> self.close()
else:self.outgoing.extend(data)if __name__ == ‘__main__':handlers = []handlers.append(TCPServer((‘',16000), TCPEchoClient, handlers))event_loop(handlers)
The key to the TCP example is the addition and removal of clients from the handler list.On each connection, a new handler is created for the client and added to the list. Whenthe connection is closed, each client must take care to remove themselves from the list.If you run this program and try connecting with Telnet or some similar tool, you’ll seeit echoing received data back to you. It should easily handle multiple clients.
## 討論
Virtually all event-driven frameworks operate in a manner that is similar to that shownin the solution. The actual implementation details and overall software architecturemight vary greatly, but at the core, there is a polling loop that checks sockets for activityand which performs operations in response.One potential benefit of event-driven I/O is that it can handle a very large number ofsimultaneous connections without ever using threads or processes. That is, the select() call (or equivalent) can be used to monitor hundreds or thousands of socketsand respond to events occuring on any of them. Events are handled one at a time by theevent loop, without the need for any other concurrency primitives.The downside to event-driven I/O is that there is no true concurrency involved. If anyof the event handler methods blocks or performs a long-running calculation, it blocksthe progress of everything. There is also the problem of calling out to library functionsthat aren’t written in an event-driven style. There is always the risk that some librarycall will block, causing the event loop to stall.Problems with blocking or long-running calculations can be solved by sending the workout to a separate thread or process. However, coordinating threads and processes withan event loop is tricky. Here is an example of code that will do it using the concurrent.futures module:
from concurrent.futures import ThreadPoolExecutorimport os
class ThreadPoolHandler(EventHandler):def __init__(self, nworkers):if os.name == ‘posix':self.signal_done_sock, self.done_sock = socket.socketpair()else:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.bind((‘127.0.0.1', 0))server.listen(1)self.signal_done_sock = socket.socket(socket.AF_INET,
> socket.SOCK_STREAM)
self.signal_done_sock.connect(server.getsockname())self.done_sock, _ = server.accept()server.close()
self.pending = []self.pool = ThreadPoolExecutor(nworkers)
def fileno(self):return self.done_sock.fileno()
# Callback that executes when the thread is donedef _complete(self, callback, r):
> self.pending.append((callback, r.result()))self.signal_done_sock.send(b'x')
# Run a function in a thread pooldef run(self, func, args=(), kwargs={},*,callback):
> r = self.pool.submit(func, [*](#)args, [**](#)kwargs)r.add_done_callback(lambda r: self._complete(callback, r))
def wants_to_receive(self):return True
# Run callback functions of completed workdef handle_receive(self):
> > # Invoke all pending callback functionsfor callback, result in self.pending:
> > callback(result)self.done_sock.recv(1)
> self.pending = []
In this code, the run() method is used to submit work to the pool along with a callbackfunction that should be triggered upon completion. The actual work is then submittedto a ThreadPoolExecutor instance. However, a really tricky problem concerns the co‐ordination of the computed result and the event loop. To do this, a pair of sockets arecreated under the covers and used as a kind of signaling mechanism. When work iscompleted by the thread pool, it executes the _complete() method in the class. Thismethod queues up the pending callback and result before writing a byte of data on oneof these sockets. The fileno() method is programmed to return the other socket. Thus,when this byte is written, it will signal to the event loop that something has happened.The handle_receive() method, when triggered, will then execute all of the callbackfunctions for previously submitted work. Frankly, it’s enough to make one’s head spin.Here is a simple server that shows how to use the thread pool to carry out a long-runningcalculation:
# A really bad Fibonacci implementationdef fib(n):
> if n < 2:return 1else:return fib(n - 1) + fib(n - 2)
class UDPFibServer(UDPServer):def handle_receive(self):msg, addr = self.sock.recvfrom(128)n = int(msg)pool.run(fib, (n,), callback=lambda r: self.respond(r, addr))def respond(self, result, addr):self.sock.sendto(str(result).encode(‘ascii'), addr)if __name__ == ‘__main__':pool = ThreadPoolHandler(16)handlers = [ pool, UDPFibServer((‘',16000))]event_loop(handlers)
To try this server, simply run it and try some experiments with another Python program:
from socket import *sock = socket(AF_INET, SOCK_DGRAM)for x in range(40):
> sock.sendto(str(x).encode(‘ascii'), (‘localhost', 16000))resp = sock.recvfrom(8192)print(resp[0])
You should be able to run this program repeatedly from many different windows andhave it operate without stalling other programs, even though it gets slower and sloweras the numbers get larger.Having gone through this recipe, should you use its code? Probably not. Instead, youshould look for a more fully developed framework that accomplishes the same task.However, if you understand the basic concepts presented here, you’ll understand thecore techniques used to make such frameworks operate. As an alternative to callback-based programming, event-driven code will sometimes use coroutines. See Recipe 12.12for an example.
- Copyright
- 前言
- 第一章:數據結構和算法
- 1.1 解壓序列賦值給多個變量
- 1.2 解壓可迭代對象賦值給多個變量
- 1.3 保留最后N個元素
- 1.4 查找最大或最小的N個元素
- 1.5 實現一個優先級隊列
- 1.6 字典中的鍵映射多個值
- 1.7 字典排序
- 1.8 字典的運算
- 1.9 查找兩字典的相同點
- 1.10 刪除序列相同元素并保持順序
- 1.11 命名切片
- 1.12 序列中出現次數最多的元素
- 1.13 通過某個關鍵字排序一個字典列表
- 1.14 排序不支持原生比較的對象
- 1.15 通過某個字段將記錄分組
- 1.16 過濾序列元素
- 1.17 從字典中提取子集
- 1.18 映射名稱到序列元素
- 1.19 轉換并同時計算數據
- 1.20 合并多個字典或映射
- 第二章:字符串和文本
- 2.1 使用多個界定符分割字符串
- 2.2 字符串開頭或結尾匹配
- 2.3 用Shell通配符匹配字符串
- 2.4 字符串匹配和搜索
- 2.5 字符串搜索和替換
- 2.6 字符串忽略大小寫的搜索替換
- 2.7 最短匹配模式
- 2.8 多行匹配模式
- 2.9 將Unicode文本標準化
- 2.10 在正則式中使用Unicode
- 2.11 刪除字符串中不需要的字符
- 2.12 審查清理文本字符串
- 2.13 字符串對齊
- 2.14 合并拼接字符串
- 2.15 字符串中插入變量
- 2.16 以指定列寬格式化字符串
- 2.17 在字符串中處理html和xml
- 2.18 字符串令牌解析
- 2.19 實現一個簡單的遞歸下降分析器
- 2.20 字節字符串上的字符串操作
- 第三章:數字日期和時間
- 3.1 數字的四舍五入
- 3.2 執行精確的浮點數運算
- 3.3 數字的格式化輸出
- 3.4 二八十六進制整數
- 3.5 字節到大整數的打包與解包
- 3.6 復數的數學運算
- 3.7 無窮大與NaN
- 3.8 分數運算
- 3.9 大型數組運算
- 3.10 矩陣與線性代數運算
- 3.11 隨機選擇
- 3.12 基本的日期與時間轉換
- 3.13 計算最后一個周五的日期
- 3.14 計算當前月份的日期范圍
- 3.15 字符串轉換為日期
- 3.16 結合時區的日期操作
- 第四章:迭代器與生成器
- 4.1 手動遍歷迭代器
- 4.2 代理迭代
- 4.3 使用生成器創建新的迭代模式
- 4.4 實現迭代器協議
- 4.5 反向迭代
- 4.6 帶有外部狀態的生成器函數
- 4.7 迭代器切片
- 4.8 跳過可迭代對象的開始部分
- 4.9 排列組合的迭代
- 4.10 序列上索引值迭代
- 4.11 同時迭代多個序列
- 4.12 不同集合上元素的迭代
- 4.13 創建數據處理管道
- 4.14 展開嵌套的序列
- 4.15 順序迭代合并后的排序迭代對象
- 4.16 迭代器代替while無限循環
- 第五章:文件與IO
- 5.1 讀寫文本數據
- 5.2 打印輸出至文件中
- 5.3 使用其他分隔符或行終止符打印
- 5.4 讀寫字節數據
- 5.5 文件不存在才能寫入
- 5.6 字符串的I/O操作
- 5.7 讀寫壓縮文件
- 5.8 固定大小記錄的文件迭代
- 5.9 讀取二進制數據到可變緩沖區中
- 5.10 內存映射的二進制文件
- 5.11 文件路徑名的操作
- 5.12 測試文件是否存在
- 5.13 獲取文件夾中的文件列表
- 5.14 忽略文件名編碼
- 5.15 打印不合法的文件名
- 5.16 增加或改變已打開文件的編碼
- 5.17 將字節寫入文本文件
- 5.18 將文件描述符包裝成文件對象
- 5.19 創建臨時文件和文件夾
- 5.20 與串行端口的數據通信
- 5.21 序列化Python對象
- 第六章:數據編碼和處理
- 6.1 讀寫CSV數據
- 6.2 讀寫JSON數據
- 6.3 解析簡單的XML數據
- 6.4 增量式解析大型XML文件
- 6.5 將字典轉換為XML
- 6.6 解析和修改XML
- 6.7 利用命名空間解析XML文檔
- 6.8 與關系型數據庫的交互
- 6.9 編碼和解碼十六進制數
- 6.10 編碼解碼Base64數據
- 6.11 讀寫二進制數組數據
- 6.12 讀取嵌套和可變長二進制數據
- 6.13 數據的累加與統計操作
- 第七章:函數
- 7.1 可接受任意數量參數的函數
- 7.2 只接受關鍵字參數的函數
- 7.3 給函數參數增加元信息
- 7.4 返回多個值的函數
- 7.5 定義有默認參數的函數
- 7.6 定義匿名或內聯函數
- 7.7 匿名函數捕獲變量值
- 7.8 減少可調用對象的參數個數
- 7.9 將單方法的類轉換為函數
- 7.10 帶額外狀態信息的回調函數
- 7.11 內聯回調函數
- 7.12 訪問閉包中定義的變量
- 第八章:類與對象
- 8.1 改變對象的字符串顯示
- 8.2 自定義字符串的格式化
- 8.3 讓對象支持上下文管理協議
- 8.4 創建大量對象時節省內存方法
- 8.5 在類中封裝屬性名
- 8.6 創建可管理的屬性
- 8.7 調用父類方法
- 8.8 子類中擴展property
- 8.9 創建新的類或實例屬性
- 8.10 使用延遲計算屬性
- 8.11 簡化數據結構的初始化
- 8.12 定義接口或者抽象基類
- 8.13 實現數據模型的類型約束
- 8.14 實現自定義容器
- 8.15 屬性的代理訪問
- 8.16 在類中定義多個構造器
- 8.17 創建不調用init方法的實例
- 8.18 利用Mixins擴展類功能
- 8.19 實現狀態對象或者狀態機
- 8.20 通過字符串調用對象方法
- 8.21 實現訪問者模式
- 8.22 不用遞歸實現訪問者模式
- 8.23 循環引用數據結構的內存管理
- 8.24 讓類支持比較操作
- 8.25 創建緩存實例
- 第九章:元編程
- 9.1 在函數上添加包裝器
- 9.2 創建裝飾器時保留函數元信息
- 9.3 解除一個裝飾器
- 9.4 定義一個帶參數的裝飾器
- 9.5 可自定義屬性的裝飾器
- 9.6 帶可選參數的裝飾器
- 9.7 利用裝飾器強制函數上的類型檢查
- 9.8 將裝飾器定義為類的一部分
- 9.9 將裝飾器定義為類
- 9.10 為類和靜態方法提供裝飾器
- 9.11 裝飾器為被包裝函數增加參數
- 9.12 使用裝飾器擴充類的功能
- 9.13 使用元類控制實例的創建
- 9.14 捕獲類的屬性定義順序
- 9.15 定義有可選參數的元類
- 9.16 *args和**kwargs的強制參數簽名
- 9.17 在類上強制使用編程規約
- 9.18 以編程方式定義類
- 9.19 在定義的時候初始化類的成員
- 9.20 利用函數注解實現方法重載
- 9.21 避免重復的屬性方法
- 9.22 定義上下文管理器的簡單方法
- 9.23 在局部變量域中執行代碼
- 9.24 解析與分析Python源碼
- 9.25 拆解Python字節碼
- 第十章:模塊與包
- 10.1 構建一個模塊的層級包
- 10.2 控制模塊被全部導入的內容
- 10.3 使用相對路徑名導入包中子模塊
- 10.4 將模塊分割成多個文件
- 10.5 利用命名空間導入目錄分散的代碼
- 10.6 重新加載模塊
- 10.7 運行目錄或壓縮文件
- 10.8 讀取位于包中的數據文件
- 10.9 將文件夾加入到sys.path
- 10.10 通過字符串名導入模塊
- 10.11 通過導入鉤子遠程加載模塊
- 10.12 導入模塊的同時修改模塊
- 10.13 安裝私有的包
- 10.14 創建新的Python環境
- 10.15 分發包
- 第十一章:網絡與Web編程
- 11.1 作為客戶端與HTTP服務交互
- 11.2 創建TCP服務器
- 11.3 創建UDP服務器
- 11.4 通過CIDR地址生成對應的IP地址集
- 11.5 生成一個簡單的REST接口
- 11.6 通過XML-RPC實現簡單的遠程調用
- 11.7 在不同的Python解釋器之間交互
- 11.8 實現遠程方法調用
- 11.9 簡單的客戶端認證
- 11.10 在網絡服務中加入SSL
- 11.11 進程間傳遞Socket文件描述符
- 11.12 理解事件驅動的IO
- 11.13 發送與接收大型數組
- 第十二章:并發編程
- 12.1 啟動與停止線程
- 12.2 判斷線程是否已經啟動
- 12.3 線程間的通信
- 12.4 給關鍵部分加鎖
- 12.5 防止死鎖的加鎖機制
- 12.6 保存線程的狀態信息
- 12.7 創建一個線程池
- 12.8 簡單的并行編程
- 12.9 Python的全局鎖問題
- 12.10 定義一個Actor任務
- 12.11 實現消息發布/訂閱模型
- 12.12 使用生成器代替線程
- 12.13 多個線程隊列輪詢
- 12.14 在Unix系統上面啟動守護進程
- 第十三章:腳本編程與系統管理
- 13.1 通過重定向/管道/文件接受輸入
- 13.2 終止程序并給出錯誤信息
- 13.3 解析命令行選項
- 13.4 運行時彈出密碼輸入提示
- 13.5 獲取終端的大小
- 13.6 執行外部命令并獲取它的輸出
- 13.7 復制或者移動文件和目錄
- 13.8 創建和解壓壓縮文件
- 13.9 通過文件名查找文件
- 13.10 讀取配置文件
- 13.11 給簡單腳本增加日志功能
- 13.12 給內庫增加日志功能
- 13.13 記錄程序執行的時間
- 13.14 限制內存和CPU的使用量
- 13.15 啟動一個WEB瀏覽器
- 第十四章:測試調試和異常
- 14.1 測試輸出到標準輸出上
- 14.2 在單元測試中給對象打補丁
- 14.3 在單元測試中測試異常情況
- 14.4 將測試輸出用日志記錄到文件中
- 14.5 忽略或者期望測試失敗
- 14.6 處理多個異常
- 14.7 捕獲所有異常
- 14.8 創建自定義異常
- 14.9 捕獲異常后拋出另外的異常
- 14.10 重新拋出最后的異常
- 14.11 輸出警告信息
- 14.12 調試基本的程序崩潰錯誤
- 14.13 給你的程序做基準測試
- 14.14 讓你的程序跑的更快
- 第十五章:C語言擴展
- 15.1 使用ctypes訪問C代碼
- 15.2 簡單的C擴展模塊
- 15.3 一個操作數組的擴展函數
- 15.4 在C擴展模塊中操作隱形指針
- 15.5 從擴張模塊中定義和導出C的API
- 15.6 從C語言中調用Python代碼
- 15.7 從C擴展中釋放全局鎖
- 15.8 C和Python中的線程混用
- 15.9 用WSIG包裝C代碼
- 15.10 用Cython包裝C代碼
- 15.11 用Cython寫高性能的數組操作
- 15.12 將函數指針轉換為可調用對象
- 15.13 傳遞NULL結尾的字符串給C函數庫
- 15.14 傳遞Unicode字符串給C函數庫
- 15.15 C字符串轉換為Python字符串
- 15.16 不確定編碼格式的C字符串
- 15.17 傳遞文件名給C擴展
- 15.18 傳遞已打開的文件給C擴展
- 15.19 從C語言中讀取類文件對象
- 15.20 處理C語言中的可迭代對象
- 15.21 診斷分析代碼錯誤
- 附錄A
- 關于譯者
- Roadmap