[TOC]

Concurrency is the art of making a computer do (or appear to do) multiple things at once. Historically, this meant inviting the processor to switch between different tasks many times per second. In modern systems, it can also literally mean doing two or more things simultaneously on separate processor cores.

Concurrency is not inherently an object-oriented topic, but Python's concurrent systems are built on top of the object-oriented constructs we've covered throughout the book. This chapter will introduce you to the following topics:

- Threads
- Multiprocessing
- Futures
- AsyncIO

Concurrency is complicated. The basic concepts are fairly simple, but the bugs that can occur are notoriously difficult to track down. However, for many projects, concurrency is the only way to get the performance we need. Imagine if a web server couldn't respond to a user's request until the previous one was completed! We won't be going into all the details of just how hard it is (another full book would be required), but we'll see how to do basic concurrency in Python, and some of the most common pitfalls to avoid.

## Threads

Most often, concurrency is created so that work can continue happening while the program is waiting for I/O to happen. For example, a server can start processing a new network request while it waits for data from a previous request to arrive. An interactive program might render an animation or perform a calculation while waiting for the user to press a key. Bear in mind that while a person can type more than 500 characters per minute, a computer can perform billions of instructions per second. Thus, a ton of processing can happen between individual key presses, even when typing quickly.

It's theoretically possible to manage all this switching between activities within your program, but it would be virtually impossible to get right. Instead, we can rely on Python and the operating system to take care of the tricky switching part, while we create objects that appear to be running independently, but simultaneously. These objects are called threads; in Python they have a very simple API. Let's take a look at a basic example:

```
from threading import Thread

class InputReader(Thread):
    def run(self):
        self.line_of_text = input()

print("Enter some text and press enter: ")
thread = InputReader()
thread.start()

count = result = 1
while thread.is_alive():
    result = count * count
    count += 1

print("calculated squares up to {0} * {0} = {1}".format(
    count, result))
print("while you typed '{}'".format(thread.line_of_text))
```

This example runs two threads. Can you see them? Every program has one thread, called the main thread. The code that executes from the beginning is happening in this thread. The second thread, more obviously, exists as the InputReader class. To construct a thread, we must extend the Thread class and implement the run method. Any code inside the run method (or that is called from within that method) is executed in a separate thread.
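As an aside, subclassing isn't the only option: the standard library's Thread constructor also accepts a target callable that is run in the new thread. Here is a minimal sketch of that style; the read_input function and results dict are invented for illustration:

```
from threading import Thread

results = {}

def read_input():
    # Runs in the new thread; only this thread writes to the dict,
    # and the main thread reads it after join(), so this is safe.
    results["line"] = input()

thread = Thread(target=read_input)
print("Enter some text and press enter: ")
thread.start()
thread.join()  # wait for the thread to finish
print("you typed '{}'".format(results["line"]))
```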
The new thread doesn't start running until we call the start() method on the object. In this case, the thread immediately pauses to wait for input from the keyboard. In the meantime, the original thread continues executing at the point start was called. It starts calculating squares inside a while loop. The condition in the while loop checks whether the InputReader thread has exited its run method yet; once it does, it outputs some summary information to the screen.

If we run the example and type the string "hello world", the output looks as follows:

```
Enter some text and press enter: hello world
calculated squares up to 1044477 * 1044477 = 1090930114576
while you typed 'hello world'
```

You will, of course, calculate more or fewer squares while typing the string, as the numbers are related both to our relative typing speeds and to the processor speeds of the computers we are running.

A thread only starts running in concurrent mode when we call the start method. If we want to take out the concurrent call to see how it compares, we can call thread.run() in the place that we originally called thread.start(). The output is telling:

```
Enter some text and press enter: hello world
calculated squares up to 1 * 1 = 1
while you typed 'hello world'
```

In this case, the thread never becomes alive and the while loop never executes. We wasted a lot of CPU power sitting idle while we were typing.

There are a lot of different patterns for using threads effectively. We won't be covering all of them, but we will look at a common one so we can learn about the join method. Let's check the current temperature in the capital city of every province in Canada:

```
from threading import Thread
import json
from urllib.request import urlopen
import time

CITIES = [
    'Edmonton', 'Victoria', 'Winnipeg', 'Fredericton',
    "St. John's", 'Halifax', 'Toronto', 'Charlottetown',
    'Quebec City', 'Regina'
]

class TempGetter(Thread):
    def __init__(self, city):
        super().__init__()
        self.city = city

    def run(self):
        url_template = (
            'http://api.openweathermap.org/data/2.5/'
            'weather?q={},CA&units=metric')
        response = urlopen(url_template.format(self.city))
        data = json.loads(response.read().decode())
        self.temperature = data['main']['temp']

threads = [TempGetter(c) for c in CITIES]
start = time.time()
for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

for thread in threads:
    print(
        "it is {0.temperature:.0f}°C in {0.city}".format(thread))
print(
    "Got {} temps in {} seconds".format(
        len(threads), time.time() - start))
```

This code constructs 10 threads before starting them. Notice how we can override the constructor to pass data into them, remembering to call super to ensure the Thread is properly initialized.
Pay attention to this: the new thread isn't running yet, so the \_\_init\_\_ method is still executing from inside the main thread. Data we construct in one thread is accessible from other running threads.

After the 10 threads have been started, we loop over them again, calling the join() method on each. This method essentially says "wait for the thread to complete before doing anything". We call this ten times in sequence; the for loop won't exit until all ten threads have completed.

At this point, we can print the temperature that was stored on each thread object. Notice once again that we can access data that was constructed within the thread from the main thread. In threads, all state is shared by default.

Executing this code on my 100 mbit connection takes about two tenths of a second:

```
it is 5°C in Edmonton
it is 11°C in Victoria
it is 0°C in Winnipeg
it is -10°C in Fredericton
it is -12°C in St. John's
it is -8°C in Halifax
it is -6°C in Toronto
it is -13°C in Charlottetown
it is -12°C in Quebec City
it is 2°C in Regina
Got 10 temps in 0.18970298767089844 seconds
```

If we run this code in a single thread (by changing the start() call to run() and commenting out the join() call), it takes closer to 2 seconds because each 0.2 second request has to complete before the next one begins. This speedup of 10 times shows just how useful concurrent programming can be.

### The many problems with threads

Threads can be useful, especially in other programming languages, but modern Python programmers tend to avoid them for several reasons. As we'll see, there are other ways to do concurrent programming that are receiving more attention from the Python developers. Let's discuss some of these pitfalls before moving on to more salient topics.

#### Shared memory

The main problem with threads is also their primary advantage. Threads have access to all the memory and thus all the variables in the program. This can too easily cause inconsistencies in the program state. Have you ever encountered a room where a single light has two switches and two different people turn them on at the same time? Each person (thread) expects their action to turn the lamp (a variable) on, but the resulting value (the lamp is off) is inconsistent with those expectations. Now imagine if those two threads were transferring funds between bank accounts or managing the cruise control in a vehicle.

The solution to this problem in threaded programming is to "synchronize" access to any code that reads or writes a shared variable. There are a few different ways to do this, but we won't go into them here so we can focus on more Pythonic constructs. The synchronization solution works, but it is way too easy to forget to apply it. Worse, bugs due to inappropriate use of synchronization are really hard to track down because the order in which threads perform operations is inconsistent. We can't easily reproduce the error. Usually, it is safest to force communication between threads to happen using a lightweight data structure that already uses locks appropriately. Python offers the queue.Queue class to do this; its functionality is basically the same as the multiprocessing.Queue that we will discuss in the next section.
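As a hedged sketch of that advice (the worker function and the None stop signal are invented for illustration), two threads can hand work off through a queue.Queue without any explicit locks:

```
from queue import Queue
from threading import Thread

line_queue = Queue()  # Queue does its own locking internally

def worker():
    while True:
        line = line_queue.get()
        if line is None:  # our invented stop signal
            break
        print("processed: {}".format(line.upper()))

thread = Thread(target=worker)
thread.start()
for text in ("spam", "eggs"):
    line_queue.put(text)
line_queue.put(None)
thread.join()
```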
In some cases, these disadvantages might be outweighed by the one advantage of allowing shared memory: it's fast. If multiple threads need access to a huge data structure, shared memory can provide that access quickly. However, this advantage is usually nullified by the fact that, in Python, it is impossible for two threads running on different CPU cores to be performing calculations at exactly the same time. This brings us to our second problem with threads.

#### The global interpreter lock

In order to efficiently manage memory, garbage collection, and calls to machine code in libraries, Python has a utility called the global interpreter lock, or GIL. It's impossible to turn off, and it means that threads are useless in Python for one thing that they excel at in other languages: parallel processing. The GIL's primary effect, for our purposes, is to prevent any two threads from doing work at the exact same time, even if they have work to do. In this case, "doing work" means using the CPU, so it's perfectly OK for multiple threads to access the disk or network; the GIL is released as soon as the thread starts to wait for something.

The GIL is quite highly disparaged, mostly by people who don't understand what it is or all the benefits it brings to Python. It would definitely be nice if our language didn't have this restriction, but the Python reference developers have determined that, for now at least, it brings more value than it costs. It makes the reference implementation easier to maintain and develop, and during the single-core processor days when Python was originally developed, it actually made the interpreter faster. The net result of the GIL, however, is that it limits the benefits that threads bring us, without alleviating the costs.

> While the GIL is a problem in the reference implementation of Python that most people use, it has been solved in some of the nonstandard implementations such as IronPython and Jython. Unfortunately, at the time of publication, none of these support Python 3.

### Thread overhead

One final limitation of threads as compared to the asynchronous system we will be discussing later is the cost of maintaining the thread. Each thread takes up a certain amount of memory (both in the Python process and the operating system kernel) to record the state of that thread. Switching between the threads also uses a (small) amount of CPU time. This work happens seamlessly without any extra coding (we just have to call start() and the rest is taken care of), but the work still has to happen somewhere.

This can be alleviated somewhat by structuring our workload so that threads can be reused to perform multiple jobs. Python provides a ThreadPool feature to handle this. It is shipped as part of the multiprocessing library and behaves identically to the ProcessPool that we will discuss shortly, so let's defer discussion until the next section.
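For the impatient, here is a minimal sketch of that API; the fetch_length function is a made-up stand-in for an I/O-bound job, and the map method will be explained properly in the pools section:

```
from multiprocessing.pool import ThreadPool

def fetch_length(word):
    # Stand-in for an I/O-bound task such as a network request.
    return len(word)

# Four worker threads are reused across all the submitted jobs.
with ThreadPool(4) as pool:
    print(pool.map(fetch_length, ["spam", "eggs", "lumberjack"]))
```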
## Multiprocessing

The multiprocessing API was originally designed to mimic the thread API. However, it has evolved, and in recent versions of Python 3 it supports more features more robustly. The multiprocessing library is designed for when CPU-intensive jobs need to happen in parallel and multiple cores are available (given that a four-core Raspberry Pi can currently be purchased for $35, there are usually multiple cores available). Multiprocessing is not useful when the processes spend a majority of their time waiting on I/O (for example, network, disk, database, or keyboard), but it is the way to go for parallel computation.

The multiprocessing module spins up new operating system processes to do the work. On Windows machines, this is a relatively expensive operation; on Linux, processes are implemented in the kernel the same way threads are, so the overhead is limited to the cost of running separate Python interpreters in each process.

Let's try to parallelize a compute-heavy operation using similar constructs to those provided by the threading API:

```
from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        for i in range(200000000):
            pass

if __name__ == '__main__':
    procs = [MuchCPU() for f in range(cpu_count())]
    t = time.time()
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print('work took {} seconds'.format(time.time() - t))
```

This example just ties up the CPU for 200 million iterations. You may not consider this to be useful work, but it's a cold day and I appreciate the heat my laptop generates under such load.

The API should be familiar; we implement a subclass of Process (instead of Thread) and implement a run method. This method prints out the process ID (a unique number the operating system assigns to each process on the machine) before doing some intense (if misguided) work.

Pay special attention to the if \_\_name\_\_ == '\_\_main\_\_': guard around the module-level code, which prevents it from running if the module is being imported, rather than run as a program. This is good practice in general, but when using multiprocessing on some operating systems, it is essential. Behind the scenes, multiprocessing may have to import the module inside the new process in order to execute the run() method. If we allowed the entire module to execute at that point, it would start creating new processes recursively until the operating system ran out of resources.
We construct one process for each processor core on our machine, then start and join each of those processes. On my 2014 era quad-core laptop, the output looks like this:

```
6987
6988
6989
6990
work took 12.96659541130066 seconds
```

The first four lines are the process ID that was printed inside each MuchCPU instance. The last line shows that the 200 million iterations can run in about 13 seconds on my machine. During that 13 seconds, my process monitor indicated that all four of my cores were running at 100 percent.

If we subclass threading.Thread instead of multiprocessing.Process in MuchCPU, the output looks like this:

```
7235
7235
7235
7235
work took 28.577413082122803 seconds
```

This time, the four threads are running inside the same process and take close to three times as long to run. This is the cost of the global interpreter lock; in other languages or implementations of Python, the threaded version would run at least as fast as the multiprocessing version. We might expect it to be four times as long, but remember that many other programs are running on my laptop. In the multiprocessing version, these programs also need a share of the four CPUs. In the threading version, those programs can use the other three CPUs instead.

### Multiprocessing pools

In general, there is no reason to have more processes than there are processors on the computer. There are a few reasons for this:

- Only cpu\_count() processes can run simultaneously
- Each process consumes resources with a full copy of the Python interpreter
- Communication between processes is expensive
- Creating processes takes a nonzero amount of time

Given these constraints, it makes sense to create at most cpu\_count() processes when the program starts and then have them execute tasks as needed. It is not difficult to implement a basic series of communicating processes that does this, but it can be tricky to debug, test, and get right. Of course, Python being Python, we don't have to do all this work because the Python developers have already done it for us in the form of multiprocessing pools.

The primary advantage of pools is that they abstract away the overhead of figuring out what code is executing in the main process and which code is running in the subprocess.
As with the threading API that multiprocessing mimics, it can often be hard to remember who is executing what. The pool abstraction restricts the number of places that code in different processes interact with each other, making it much easier to keep track of.

Pools also seamlessly hide the process of passing data between processes. Using a pool looks much like a function call; you pass data into a function, it is executed in another process or processes, and when the work is done, a value is returned. It is important to understand that under the hood, a lot of work is being done to support this: objects in one process are being pickled and passed into a pipe. Another process retrieves data from the pipe and unpickles it. Work is done in the subprocess and a result is produced. The result is pickled and passed into a pipe. Eventually, the original process unpickles it and returns it.

All this pickling and passing data into pipes takes time and memory. Therefore, it is ideal to keep the amount and size of data passed into and returned from the pool to a minimum, and it is only advantageous to use the pool if a lot of processing has to be done on the data in question.

Armed with this knowledge, the code to make all this machinery work is surprisingly simple. Let's look at the problem of calculating all the prime factors of a list of random numbers. This is a common and expensive part of a variety of cryptography algorithms (not to mention attacks on those algorithms!). It requires years of processing power to crack the extremely large numbers used to secure your bank accounts. The following implementation, while readable, is not at all efficient, but that's OK because we want to see it using lots of CPU time:

```
import random
from multiprocessing.pool import Pool

def prime_factor(value):
    factors = []
    for divisor in range(2, value-1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
    else:
        factors = [value]
    return factors

if __name__ == '__main__':
    pool = Pool()

    to_factor = [
        random.randint(100000, 50000000) for i in range(20)
    ]
    results = pool.map(prime_factor, to_factor)
    for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))
```

Let's focus on the parallel processing aspects, as the brute force recursive algorithm for calculating factors is pretty clear. We first construct a multiprocessing pool instance. By default, this pool creates a separate process for each of the CPU cores in the machine it is running on.

The map method accepts a function and an iterable. The pool pickles each of the values in the iterable and passes it into an available process, which executes the function on it. When that process is finished doing its work, it pickles the resulting list of factors and passes it back to the pool. Once all the pools are finished processing work (which could take some time), the results list is passed back to the original process, which has been waiting patiently for all this work to complete.

It is often more useful to use the similar map\_async method, which returns immediately even though the processes are still working. In that case, the results variable would not be a list of values, but a promise to return a list of values later by calling results.get(). This promise object also has methods like ready() and wait(), which allow us to check whether all the results are in yet.

Alternatively, if we don't know all the values we want to get results for in advance, we can use the apply\_async method to queue up a single job. If the pool has a process that isn't already working, it will start immediately; otherwise, it will hold onto the task until there is a free process available.

Pools can also be closed, which refuses to take any further tasks but processes everything currently in the queue, or terminated, which goes one step further and refuses to start any jobs still on the queue, although any jobs currently running are still permitted to complete.
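Here is a minimal sketch of those asynchronous variants (map\_async and apply\_async), using a made-up cube function so the snippet stands alone:

```
from multiprocessing.pool import Pool

def cube(value):
    return value ** 3

if __name__ == '__main__':
    with Pool() as pool:
        promise = pool.map_async(cube, range(10))
        # The main process is free to do other work here...
        promise.wait()        # block until every job has finished
        print(promise.get())  # [0, 1, 8, ..., 729]

        single = pool.apply_async(cube, (42,))
        print(single.get())   # 74088
```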
### Queues

If we need more control over communication between processes, we can use a Queue. Queue data structures are useful for sending messages from one process into one or more other processes. Any picklable object can be sent into a Queue, but remember that pickling can be a costly operation, so keep such objects small. To illustrate queues, let's build a little search engine for text content that stores all relevant entries in memory.

This is not the most sensible way to build a text-based search engine, but I have used this pattern to query numerical data that needed to use CPU-intensive processes to construct a chart that was then rendered to the user.

This particular search engine scans all files in the current directory in parallel. A process is constructed for each core on the CPU. Each of these is instructed to load some of the files into memory. Let's look at the function that does the loading and searching:

```
def search(paths, query_q, results_q):
    lines = []
    for path in paths:
        lines.extend(l.strip() for l in path.open())

    query = query_q.get()
    while query:
        results_q.put([l for l in lines if query in l])
        query = query_q.get()
```

Remember, this function is run in a different process (in fact, it is run in cpu\_count() different processes) from the main thread. It is passed a list of path.path objects and two multiprocessing.Queue objects; one for incoming queries and one to send outgoing results.
These queues have a similar interface to the Queue class we discussed in Chapter 6, Python Data Structures. However, they are doing extra work to pickle the data in the queue and pass it into the subprocess over a pipe. These two queues are set up in the main process and passed through the pipes into the search function inside the child processes.

The search code is pretty dumb, both in terms of efficiency and of capabilities; it loops over every line stored in memory and puts the matching ones in a list. The list is placed on a queue and passed back to the main process.

Let's look at the main process, which sets up these queues:

```
if __name__ == '__main__':
    from multiprocessing import Process, Queue, cpu_count
    from path import path
    cpus = cpu_count()
    pathnames = [f for f in path('.').listdir() if f.isfile()]
    paths = [pathnames[i::cpus] for i in range(cpus)]
    query_queues = [Queue() for p in range(cpus)]
    results_queue = Queue()

    search_procs = [
        Process(target=search, args=(p, q, results_queue))
        for p, q in zip(paths, query_queues)
    ]
    for proc in search_procs:
        proc.start()
```

For easier description, let's assume cpu\_count is four. Notice how the import statements are placed inside the if guard? This is a small optimization that prevents them from being imported in each subprocess (where they aren't needed) on certain operating systems. We list all the paths in the current directory and then split the list into four approximately equal parts. We also construct a list of four Queue objects to send data into each subprocess. Finally, we construct a single results queue; this is passed into all four of the subprocesses. Each of them can put data into the queue and it will be aggregated in the main process.

Now let's look at the code that makes a search actually happen:

```
    for q in query_queues:
        q.put("def")
        q.put(None)  # Signal process termination

    for i in range(cpus):
        for match in results_queue.get():
            print(match)

    for proc in search_procs:
        proc.join()
```

This code performs a single search for "def" (because it's a common phrase in a directory full of Python files!). In a more production-ready system, we would probably hook a socket up to this search code. In that case, we'd have to change the inter-process protocol so that the message coming back on the return queue contained enough information to identify which of many queries the results were attached to.

This use of queues is actually a local version of what could become a distributed system. Imagine if the searches were being sent out to multiple computers and then recombined. We won't discuss it here, but the multiprocessing module includes a manager class that can take a lot of the boilerplate out of the preceding code. There is even a version of the multiprocessing.Manager that can manage subprocesses on remote systems to construct a rudimentary distributed application. Check the Python multiprocessing documentation if you are interested in pursuing this further.
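As a tiny taste of what that manager API looks like (a hedged sketch only; the record function is invented for illustration), a Manager hands out proxy objects that multiple processes can safely share:

```
from multiprocessing import Manager, Process

def record(shared, key, value):
    # The dict proxy forwards this write to the manager's process.
    shared[key] = value

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.dict()
        procs = [
            Process(target=record, args=(shared, i, i * i))
            for i in range(4)]
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
        print(dict(shared))  # {0: 0, 1: 1, 2: 4, 3: 9}
```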
### The problems with multiprocessing

As threads do, multiprocessing also has problems, some of which we have already discussed. There is no best way to do concurrency; this is especially true in Python. We always need to examine the parallel problem to figure out which of the many available solutions is the best one for that problem. Sometimes, there is no best solution.

In the case of multiprocessing, the primary drawback is that sharing data between processes is very costly. As we have discussed, all communication between processes, whether by queues, pipes, or a more implicit mechanism, requires pickling the objects. Excessive pickling quickly dominates processing time. Multiprocessing works best when relatively small objects are passed between processes and a tremendous amount of work needs to be done on each one. On the other hand, if no communication between processes is required, there may not be any point in using the module at all; we can spin up four separate Python processes and use them independently.

The other major problem with multiprocessing is that, like threads, it can be hard to tell which process a variable or method is being accessed in. In multiprocessing, if you access a variable from another process it will usually overwrite the variable in the currently running process while the other process keeps the old value. This is really confusing to maintain, so don't do it.

## Futures

Let's start looking at a more asynchronous way of doing concurrency. Futures wrap either multiprocessing or threading depending on what kind of concurrency we need (tending towards I/O versus tending towards CPU). They don't completely solve the problem of accidentally altering shared state, but they allow us to structure our code such that it is easier to track down when we do so. Futures provide distinct boundaries between the different threads or processes. Similar to the multiprocessing pool, they are useful for "call and answer" type interactions in which processing can happen in another thread and then at some point in the future (they are aptly named, after all), you can ask it for the result. It's really just a wrapper around multiprocessing pools and thread pools, but it provides a cleaner API and encourages nicer code.

A future is an object that basically wraps a function call. That function call is run in the background in a thread or process. The future object has methods to check if the future has completed and to get the results after it has completed.
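Before the larger example, here is a minimal sketch of that API (the compute function is invented for illustration):

```
from concurrent.futures import ThreadPoolExecutor

def compute(x):
    return x * x

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(compute, 6)
    print(future.done())    # probably False; it runs in the background
    print(future.result())  # blocks until the call completes: 36
    print(future.done())    # True
```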
Let's do another file search example. In the last section, we implemented a version of the unix grep command. This time, let's do a simple version of the find command. The example will search the entire filesystem for paths that contain a given string of characters:

```
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from os.path import sep as pathsep
from collections import deque

def find_files(path, query_string):
    subdirs = []
    for p in path.iterdir():
        full_path = str(p.absolute())
        if p.is_dir() and not p.is_symlink():
            subdirs.append(p)
        if query_string in full_path:
            print(full_path)

    return subdirs

query = '.py'
futures = deque()
basedir = Path(pathsep).absolute()

with ThreadPoolExecutor(max_workers=10) as executor:
    futures.append(
        executor.submit(find_files, basedir, query))
    while futures:
        future = futures.popleft()
        if future.exception():
            continue
        elif future.done():
            subdirs = future.result()
            for subdir in subdirs:
                futures.append(executor.submit(
                    find_files, subdir, query))
        else:
            futures.append(future)
```

This code consists of a function named find\_files that is run in a separate thread (or process, if we used ProcessPoolExecutor). There isn't anything particularly special about this function, but note how it does not access any global variables. All interaction with the external environment is passed into the function or returned from it. This is not a technical requirement, but it is the best way to keep your brain inside your skull when programming with futures.

> Accessing outside variables without proper synchronization results in something called a race condition. For example, imagine two concurrent writes trying to increment an integer counter. They start at the same time and both read the value as 5. Then they both increment the value and write back the result as 6. But if two processes are trying to increment a variable, the expected result would be that it gets incremented by two, so the result should be 7. Modern wisdom is that the easiest way to avoid doing this is to keep as much state as possible private and share it through known-safe constructs, such as queues.

We set up a couple of variables before we get started; we'll be searching for all files that contain the characters '.py' for this example. We have a queue of futures that we'll discuss shortly. The basedir variable points to the root of the filesystem: '/' on Unix machines and probably C:\\ on Windows.

First, let's have a short course on search theory. This algorithm implements breadth first search in parallel. Rather than recursively searching every directory using a depth first search, it adds all the subdirectories in the current folder to the queue, then all the subdirectories of each of those folders and so on.
The meat of the program is known as an event loop. We can construct a ThreadPoolExecutor as a context manager so that it is automatically cleaned up and its threads closed when it is done. It requires a max\_workers argument to indicate the number of threads running at a time; if more than this many jobs are submitted, it queues up the rest until a worker thread becomes available. When using ProcessPoolExecutor, this is normally constrained to the number of CPUs on the machine, but with threads, it can be much higher, depending how many are waiting on I/O at a time. Each thread takes up a certain amount of memory, so it shouldn't be too high; it doesn't take all that many threads before the speed of the disk, rather than the number of parallel requests, is the bottleneck.

Once the executor has been constructed, we submit a job to it using the root directory. The submit() method immediately returns a Future object, which promises to give us a result eventually. The future is placed on the queue. The loop then repeatedly removes the first future from the queue and inspects it. If it is still running, it gets added back to the end of the queue. Otherwise, we check if the function raised an exception with a call to future.exception(). If it did, we just ignore it (it's usually a permission error, although a real app would need to be more careful about what the exception was). If we didn't check this exception here, it would be raised when we called result() and could be handled through the normal try...except mechanism.

Assuming no exception occurred, we can call result() to get the return value of the function call. Since the function returns a list of subdirectories that are not symbolic links (my lazy way of preventing an infinite loop), result() returns the same thing. These new subdirectories are submitted to the executor and the resulting futures are tossed onto the queue to have their contents searched in a later iteration.

So that's all that is required to develop a future-based I/O-bound application. Under the hood, it's using the same thread or process APIs we've already discussed, but it provides a more understandable interface and makes it easier to see the boundaries between concurrently running functions (just don't try to access global variables from inside the future!).

## AsyncIO

AsyncIO is the current state of the art in Python concurrent programming. It combines the concept of futures and an event loop with the coroutines we discussed in Chapter 9, The Iterator Pattern. The result is about as elegant and easy to understand as it is possible to get when writing concurrent code, though that isn't saying a lot!
AsyncIO can be used for a few different concurrent tasks, but it was specifically designed for network I/O. Most networking applications, especially on the server side, spend a lot of time waiting for data to come in from the network. This can be solved by handling each client in a separate thread, but threads use up memory and other resources. AsyncIO uses coroutines instead of threads.

The library also provides its own event loop, obviating the need for the several lines long while loop in the previous example. However, event loops come with a cost. When we run code in an async task on the event loop, that code must return immediately, blocking neither on I/O nor on long-running calculations. This is a minor thing when writing our own code, but it means that any standard library or third-party functions that block on I/O have to have non-blocking versions created.

AsyncIO solves this by creating a set of coroutines that use the yield from syntax to return control to the event loop immediately. The event loop takes care of checking whether the blocking call has completed and performing any subsequent tasks, just like we did manually in the previous section.

### AsyncIO in action

A canonical example of a blocking function is the time.sleep call. Let's use the asynchronous version of this call to illustrate the basics of an AsyncIO event loop:

```
import asyncio
import random

@asyncio.coroutine
def random_sleep(counter):
    delay = random.random() * 5
    print("{} sleeps for {:.2f} seconds".format(counter, delay))
    yield from asyncio.sleep(delay)
    print("{} awakens".format(counter))

@asyncio.coroutine
def five_sleepers():
    print("Creating five tasks")
    tasks = [
        asyncio.async(random_sleep(i)) for i in range(5)]
    print("Sleeping after starting five tasks")
    yield from asyncio.sleep(2)
    print("Waking and waiting for five tasks")
    yield from asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(five_sleepers())
print("Done five tasks")
```

This is a fairly basic example, but it covers several features of AsyncIO programming. It is easiest to understand in the order that it executes, which is more or less bottom to top.

The second to last line gets the event loop and instructs it to run a future until it is finished. The future in question is named five\_sleepers. Once that future has done its work, the loop will exit and our code will terminate. As asynchronous programmers, we don't need to know too much about what happens inside that run\_until\_complete call, but be aware that a lot is going on. It's a souped-up coroutine version of the futures loop we wrote earlier in this chapter that knows how to deal with iteration, exceptions, function returns, parallel calls, and more.
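(A side note that goes beyond this example: Python 3.5 introduced the async def and await keywords, which replace the decorator and yield from syntax shown here. A sketch of the equivalent random\_sleep in that newer syntax:)

```
import asyncio
import random

# The same coroutine in the async/await syntax available from
# Python 3.5 onwards; the behavior is unchanged.
async def random_sleep(counter):
    delay = random.random() * 5
    print("{} sleeps for {:.2f} seconds".format(counter, delay))
    await asyncio.sleep(delay)
    print("{} awakens".format(counter))
```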
Now look a little more closely at that five\_sleepers future. Ignore the decorator for a few paragraphs; we'll get back to it. The coroutine first constructs five instances of the random\_sleep future. The resulting futures are wrapped in an asyncio.async task, which adds them to the loop's task queue so they can execute concurrently when control is returned to the event loop.

That control is returned whenever we call yield from. In this case, we call yield from asyncio.sleep to pause execution of this coroutine for two seconds. During this break, the event loop executes the tasks that it has queued up; namely, the five random\_sleep futures. These coroutines each print a starting message, then send control back to the event loop for a specific amount of time. If any of the sleep calls inside random\_sleep are shorter than two seconds, the event loop passes control back into the relevant future, which prints its awakening message before returning. When the sleep call inside five\_sleepers wakes up, it executes up to the next yield from call, which waits for the remaining random\_sleep tasks to complete. When all the sleep calls have finished executing, the random\_sleep tasks return, which removes them from the event queue. Once all five of those are completed, the asyncio.wait call and then the five\_sleepers method also return. Finally, since the event queue is now empty, the run\_until\_complete call is able to terminate and the program ends.

The asyncio.coroutine decorator mostly just documents that this coroutine is meant to be used as a future in an event loop. In this case, the program would run just fine without the decorator. However, the asyncio.coroutine decorator can also be used to wrap a normal function (one that doesn't yield) so that it can be treated as a future. In this case, the entire function executes before returning control to the event loop; the decorator just forces the function to fulfill the coroutine API so the event loop knows how to handle it.

### Reading an AsyncIO future

An AsyncIO coroutine executes each line in order until it encounters a yield from statement, at which point it returns control to the event loop. The event loop then executes any other tasks that are ready to run, including the one that the original coroutine was waiting on. Whenever that child task completes, the event loop sends the result back into the coroutine so that it can pick up executing until it encounters another yield from statement or returns. This allows us to write code that executes synchronously until we explicitly need to wait for something.
This removes the nondeterministic behavior of threads, so we don't need to worry nearly so much about shared state.

> It's still a good idea to avoid accessing shared state from inside a coroutine. It makes your code much easier to reason about. More importantly, even though an ideal world might have all asynchronous execution happen inside coroutines, the reality is that some futures are executed behind the scenes inside threads or processes. Stick to a "share nothing" philosophy to avoid a ton of difficult bugs.

In addition, AsyncIO allows us to collect logical sections of code together inside a single coroutine, even if we are waiting for other work elsewhere. As a specific instance, even though the yield from asyncio.sleep call in the random\_sleep coroutine is allowing a ton of stuff to happen inside the event loop, the coroutine itself looks like it's doing everything in order. This ability to read related pieces of asynchronous code without worrying about the machinery that waits for tasks to complete is the primary benefit of the AsyncIO module.

### AsyncIO for networking

AsyncIO was specifically designed for use with network sockets, so let's implement a DNS server. More accurately, let's implement one extremely basic feature of a DNS server.

The domain name system's basic purpose is to translate domain names, such as www.amazon.com, into IP addresses, such as 72.21.206.6. It has to be able to perform many types of queries and know how to contact other DNS servers if it doesn't have the answer required. We won't be implementing any of this, but the following example is able to respond directly to a standard DNS query to look up IPs for my three most recent employers:

```
import asyncio
from contextlib import suppress

ip_map = {
    b'facebook.com.': '173.252.120.6',
    b'yougov.com.': '213.52.133.246',
    b'wipo.int.': '193.5.93.80'
}

def lookup_dns(data):
    domain = b''
    pointer, part_length = 13, data[12]
    while part_length:
        domain += data[pointer:pointer+part_length] + b'.'
        pointer += part_length + 1
        part_length = data[pointer - 1]

    ip = ip_map.get(domain, '127.0.0.1')

    return domain, ip

def create_response(data, ip):
    ba = bytearray
    packet = ba(data[:2]) + ba([129, 128]) + data[4:6] * 2
    packet += ba(4) + data[12:]
    packet += ba([192, 12, 0, 1, 0, 1, 0, 0, 0, 60, 0, 4])
    for x in ip.split('.'):
        packet.append(int(x))
    return packet

class DNSProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        print("Received request from {}".format(addr[0]))
        domain, ip = lookup_dns(data)
        print("Sending IP {} for {} to {}".format(
            domain.decode(), ip, addr[0]))
        self.transport.sendto(
            create_response(data, ip), addr)

loop = asyncio.get_event_loop()
transport, protocol = loop.run_until_complete(
    loop.create_datagram_endpoint(
        DNSProtocol, local_addr=('127.0.0.1', 4343)))

print("DNS Server running")

with suppress(KeyboardInterrupt):
    loop.run_forever()

transport.close()
loop.close()
```
This example sets up a dictionary that dumbly maps a few domains to IPv4 addresses. It is followed by two functions that extract information from a binary DNS query packet and construct the response. We won't be discussing these; if you want to know more about DNS, read RFCs ("request for comment", the format for defining most Internet protocols) 1034 and 1035.

You can test this service by running the following command in another terminal:

```
nslookup -port=4343 facebook.com localhost
```

Let's get on with the entrée. AsyncIO networking revolves around the intimately linked concepts of transports and protocols. A protocol is a class that has specific methods that are called when relevant events happen. Since DNS runs on top of UDP (the User Datagram Protocol), we build our protocol class as a subclass of DatagramProtocol. This class has a variety of events that it can respond to; we are specifically interested in the initial connection occurring (solely so we can store the transport for future use) and the datagram\_received event. For DNS, each received datagram must be parsed and responded to, at which point the interaction is over.

So, when a datagram is received, we process the packet, look up the IP, and construct a response using the functions we aren't talking about (they're black sheep in the family). Then we instruct the underlying transport to send the resulting packet back to the requesting client using its sendto method.

The transport essentially represents a communication stream. In this case, it abstracts away all the fuss of sending and receiving data on a UDP socket on an event loop. There are similar transports for interacting with TCP sockets and subprocesses, for example.

The UDP transport is constructed by calling the loop's create\_datagram\_endpoint coroutine. This constructs the appropriate UDP socket and starts listening on it. We pass it the address that the socket needs to listen on, and importantly, the protocol class we created so that the transport knows what to call when it receives data.

Since the process of initializing a socket takes a non-trivial amount of time and would block the event loop, the create\_datagram\_endpoint function is a coroutine. In our example, we don't really need to do anything while we wait for this initialization, so we wrap the call in loop.run\_until\_complete. The event loop takes care of managing the future, and when it's complete, it returns a tuple of two values: the newly initialized transport and the protocol object that was constructed from the class we passed in.
Behind the scenes, the transport has set up a task on the event loop that is listening for incoming UDP connections. All we have to do, then, is start the event loop running with the call to loop.run\_forever() so that the task can process these packets. When the packets arrive, they are processed on the protocol and everything just works.

The only other major thing to pay attention to is that transports (and, indeed, event loops) are supposed to be closed when we are finished with them. In this case, the code runs just fine without the two calls to close(), but if we were constructing transports on the fly (or just doing proper error handling!), we'd need to be quite a bit more conscious of it.

You may have been dismayed to see how much boilerplate is required in setting up a protocol class and underlying transport. AsyncIO provides an abstraction on top of these two key concepts, called streams. We'll see an example of streams in the TCP server in the next example.

### Using executors to wrap blocking code

AsyncIO provides its own version of the futures library to allow us to run code in a separate thread or process when there isn't an appropriate non-blocking call to be made. This essentially allows us to combine threads and processes with the asynchronous model. One of the more useful applications of this feature is to get the best of both worlds when an application has bursts of I/O-bound and CPU-bound activity. The I/O-bound portions can happen in the event loop while the CPU-intensive work can be spun off to a different process.
You may have been dismayed to see how much boilerplate is required in setting up a protocol class and underlying transport. AsyncIO provides an abstraction on top of these two key concepts called streams. We'll see an example of streams in the TCP server in the next example.

### Using executors to wrap blocking code

AsyncIO provides its own version of the futures library to allow us to run code in a separate thread or process when there isn't an appropriate non-blocking call to be made. This essentially allows us to combine threads and processes with the asynchronous model. One of the more useful applications of this feature is to get the best of both worlds when an application has bursts of I/O-bound and CPU-bound activity. The I/O-bound portions can happen in the event loop while the CPU-intensive work can be spun off to a different process. To illustrate this, let's implement "sorting as a service" using AsyncIO:

```
import asyncio
import json
from concurrent.futures import ProcessPoolExecutor


def sort_in_process(data):
    # Gnome sort: walk forward while ordered, swap and step back otherwise
    nums = json.loads(data.decode())
    curr = 1
    while curr < len(nums):
        if nums[curr] >= nums[curr - 1]:
            curr += 1
        else:
            nums[curr], nums[curr - 1] = nums[curr - 1], nums[curr]
            if curr > 1:
                curr -= 1
    return json.dumps(nums).encode()


@asyncio.coroutine
def sort_request(reader, writer):
    print("Received connection")
    length = yield from reader.read(8)
    data = yield from reader.readexactly(
        int.from_bytes(length, 'big'))
    result = yield from asyncio.get_event_loop().run_in_executor(
        None, sort_in_process, data)
    print("Sorted list")
    writer.write(result)
    writer.close()
    print("Connection closed")


loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
server = loop.run_until_complete(
    asyncio.start_server(sort_request, '127.0.0.1', 2015))
print("Sort Service running")

loop.run_forever()
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
```

This is an example of good code implementing some really stupid ideas. The whole idea of sort as a service is pretty ridiculous. Using our own sorting algorithm instead of calling Python's sorted is even worse. The algorithm we used is called gnome sort, or in some cases, "stupid sort". It is a slow sort algorithm implemented in pure Python. We defined our own protocol instead of using one of the many perfectly suitable application protocols that exist in the wild. Even the idea of using multiprocessing for parallelism might be suspect here; we still end up passing all the data into and out of the subprocesses. Sometimes, it's important to take a step back from the program you are writing and ask yourself if you are trying to meet the right goals.

But let's look at some of the smart features of this design. First, we are passing bytes into and out of the subprocess. This is a lot smarter than decoding the JSON in the main process. It means the (relatively expensive) decoding can happen on a different CPU. Also, pickled JSON strings are generally smaller than pickled lists, so less data is passing between processes.

Second, the two methods are very linear; it looks like code is being executed one line after another. Of course, in AsyncIO, this is an illusion, but we don't have to worry about shared memory or concurrency primitives.
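To see the wire protocol from the client's side, here is a hypothetical companion client: it writes an 8-byte big-endian length prefix followed by the JSON-encoded list, then reads until the server closes the connection. It uses AsyncIO's stream-based open\_connection helper, which belongs to the streams machinery discussed in the next section; the sort\_client name and the sample list are made up for illustration.

```
import asyncio
import json


@asyncio.coroutine
def sort_client(nums):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 2015)
    data = json.dumps(nums).encode()
    writer.write(len(data).to_bytes(8, 'big'))  # 8-byte length prefix
    writer.write(data)
    result = yield from reader.read()  # the server closes when done
    writer.close()
    return json.loads(result.decode())


loop = asyncio.get_event_loop()
print(loop.run_until_complete(sort_client([6, 2, 9, 1, 5])))
loop.close()
```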
### Streams

The previous example should look familiar by now, as it has a similar boilerplate to other AsyncIO programs. However, there are a few differences. You'll notice we called start\_server instead of create\_server. This method hooks into AsyncIO's streams instead of using the underlying transport/protocol code. Instead of passing in a protocol class, we can pass in a normal coroutine, which receives reader and writer parameters. These both represent streams of bytes that can be read from and written to like files or sockets. Second, because this is a TCP server instead of UDP, there is some socket cleanup required when the program finishes. This cleanup is a blocking call, so we have to run the wait\_closed coroutine on the event loop.

Streams are fairly simple to understand. Reading is a potentially blocking call, so we have to call it with yield from. Writing doesn't block; it just puts the data on a queue, which AsyncIO sends out in the background.

Our code inside the sort\_request method makes two read requests. First, it reads 8 bytes from the wire and converts them to an integer using big-endian notation. This integer represents the number of bytes of data the client intends to send. So, in the next call, to readexactly, it reads that many bytes. The difference between read and readexactly is that the former will read up to the requested number of bytes, while the latter will buffer reads until it receives all of them, or until the connection closes.

#### Executors

Now let's look at the executor code. We import the exact same ProcessPoolExecutor that we used in the previous section. Notice that we don't need a special AsyncIO version of it. The event loop has a handy run\_in\_executor coroutine that we can use to run futures on. By default, the loop runs code in a ThreadPoolExecutor, but we can pass in a different executor if we wish. Or, as we did in this example, we can set a different default when we set up the event loop by calling loop.set\_default\_executor().

As you probably recall from the previous section, there is not a lot of boilerplate for using futures with an executor. However, when we use them with AsyncIO, there is none at all! The coroutine automatically wraps the function call in a future and submits it to the executor. Our code blocks until the future completes, while the event loop continues processing other connections, tasks, or futures. When the future is done, the coroutine wakes up and continues on to write the data back to the client.
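The wrapping that run\_in\_executor performs is easier to see in isolation. The following standalone sketch (not from the book's example) hands a deliberately blocking function to an explicit ThreadPoolExecutor; passing None instead would use whatever default executor the loop has been given:

```
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io():
    # Stands in for any call we can't await, such as legacy file access
    time.sleep(1)
    return "done"


loop = asyncio.get_event_loop()
future = loop.run_in_executor(ThreadPoolExecutor(max_workers=2), blocking_io)
# The returned future can be awaited with yield from, or driven directly:
print(loop.run_until_complete(future))
loop.close()
```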
You may be wondering if, instead of running multiple processes inside an event loop, it might be better to run multiple event loops in different processes. The answer is: "maybe". However, depending on the exact problem space, we are probably better off running independent copies of a program with a single event loop than trying to coordinate everything with a master multiprocessing process.

We've hit most of the high points of AsyncIO in this section, and the chapter has covered many other concurrency primitives. Concurrency is a hard problem to solve, and no one solution fits all use cases. The most important part of designing a concurrent system is deciding which of the available tools is the correct one to use for the problem. We have seen advantages and disadvantages of several concurrent systems, and now have some insights into which are the better choices for different types of requirements.

## Case study

## Summary