[TOC]
## 隊列介紹(Queue)
進程彼此之間互相隔離,要實現進程間通信IPC(Inter-Process Communication),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
**管道存在數據不安全性,而隊列是基于管道+鎖實現的,是安全的,所以主要掌握隊列即可**
### **隊列**
Queue([maxsize]):
創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
~~~
maxsize是隊列中允許最大項數,省略則無大小限制。
但需要明確:
1、隊列內存放的是消息而非大數據
2、隊列占用的是內存空間,因而maxsize即便是無大小限制也受限于內存大小
~~~
### **主要方法介紹:**
---
* q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。
block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。
timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。
* q.get( [ block [ ,timeout ] ] )
返回隊列q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。
block用于控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常。
timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
---
* q.get_nowait( )
同q.get(False)方法。如果隊列中有項目則獲取,如果沒有也不阻塞而是直接拋出異常
* q.put_nowait( )
同q.put(False)方法。往隊列中放入數據,如果隊列已滿,則不能等待而直接拋出異常(容易丟失數據)
---
* q.qsize()
返回隊列中目前項目的正確數量,多進程模式下并不可靠。
* q.empty()
判斷隊列q是否為空,空返回True,多進程模式下并不可靠。
* q.full()
判斷隊列q是否已滿,已滿返回為True,多進程模式下并不可靠。
---
* q.close()
關閉隊列,防止隊列中加入更多數據。已入隊列但尚未寫入的數據不收影響,
如果q被垃圾收集,將自動調用此方法。
不會對正在被阻塞的get或put方法使用者返回錯誤
### **隊列的簡單使用**
一個進程不停的生成對象并加入到隊列中,另外一個進程不停的從隊列中獲取數據,如果隊列中沒有數據則阻塞,直到產生新數據
~~~
import os,time,random
from multiprocessing import Process,Queue
def inputQ(queue,i):# 向queue中輸入數據的函數
info = str('輸入進程:%s 進程ID:%s'%(i,os.getpid()))
queue.put(info)
time.sleep(random.randint(1,3))
def outputQ(queue,i):# 向queue中輸出數據的函數
info = queue.get()
print('輸出進程:%s 進程ID:%s \t[%s]'%(i,os.getpid(),info))
# print(info)
if __name__ == '__main__':
record1 = []
record2 = []
queue = Queue(3)
for i in range(5):# 輸入進程
process = Process(target=inputQ,args=(queue,i))
process.start()
record1.append(process)
for i in range(5):# 輸出進程
process = Process(target=outputQ,args=(queue,i))
process.start()
record2.append(process)
for p in record1:
p.join()
for p in record2:
p.join()
~~~
運行結果
```
輸出進程:0 進程ID:12320 [輸入進程:0 進程ID:11584]
輸出進程:1 進程ID:8852 [輸入進程:2 進程ID:3716]
輸出進程:2 進程ID:11744 [輸入進程:1 進程ID:12372]
輸出進程:3 進程ID:1252 [輸入進程:3 進程ID:8904]
輸出進程:4 進程ID:12432 [輸入進程:4 進程ID:12968]
```
## 管道(Pipe)
管道在數據管理上是不安全的,而隊列是基于管道+鎖實現了安全的數據管理的,所以管道知識了解即可
管道實例化時會產生兩個端口(左端和右端),分別都可以收發數據,\
### 簡單語法案例如下:
~~~
from multiprocessing import Pipe
l,r=Pipe() #實例化
l.send('來自:l') #從左端口發數據
print(r.recv()) #從右端口收數據
r.send('來自:r') #從右端口發數據
print(l.recv()) #從左端口收數據
l.close() #關閉左端口,不能再寫入數據
#運行結果:
來自:l
來自:r
~~~
如果端口不關閉,且管道沒有數據了,再用recv獲取就會導致程序阻塞
如果端口關閉了,且管道沒有數據了,再用recv獲取就會拋出`EOFError`異常,可以捕獲
### 多進程下語法案例:
由于管道兩端都可以結束數據,所以在多進程下,需要在不不適用關閉不使用的端口,才能在管道中沒有數據,捕獲異常以便結束
~~~
from multiprocessing import Pipe,Process
def cons(L,R):
L.close() #3.子進程不用L輸入,所以一開始就關閉
while True:
try: #子進程用R端接收
print(R.recv())
except EOFError:break
if __name__ == '__main__':
L,R=Pipe()
p1=Process(target=cons,args=(L,R)).start()
R.close() #1.主進程不使R端收發數據,所以一開始就關閉R端
L.send('來自L')
L.close() #2.主進程用L端發完后,也要關閉Ll端
~~~
必須以上端口都關閉后,才能產生我們需要的報錯,然后才能捕獲后停止程序
## 生產者消費者模型
### **為什么要使用生產者消費者模型**
生產者指的是生產數據的任務,消費者指的是處理數據的任務,在并發編程中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。
同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
### **什么是生產者和消費者模式**
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。
生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
**阻塞隊列就是用來給生產者和消費者解耦的**
### 生產者消費者模型總結
1. 程序中有兩類角色
一類負責生產數據(生產者)
一類負責處理數據(消費者)
2. 引入生產者消費者模型為了解決的問題是
平衡生產者與消費者之間的速度差
程序解開耦合
3. 如何實現生產者消費者模型
生產者<--->隊列<--->消費者
### 生產者消費者模型實現
基于隊列來實現一個生產者消費者模型
~~~
from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):#
消費者們:即吃貨們
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(name,res))
def producer(q,name,food): #生產者們:即廚師們
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('%s 生產了 %s' %(name,res))
if __name__ == '__main__':
q=Queue()
p1=Process(target=producer,args=(q,'noah','包子'))
c1=Process(target=consumer,args=(q,'bobo'))
p1.start()
c1.start()
print('主'.center(15,'-'))
~~~
執行結果
~~~
-------主-------
noah 生產了 包子0
noah 生產了 包子1
bobo 吃 包子0
noah 生產了 包子2
bobo 吃 包子1
bobo 吃 包子2
~~~
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。
解決方式是讓生產者在生產完畢后,往隊列中再發一個結束信號(None),這樣消費者在接收到結束信號后就可以break出死循環
~~~
from multiprocessing import Process, Queue
import time, random, os
def consumer(q, name):# 消費者們:即吃貨們
while True:
res = q.get()
if res is None:break
time.sleep(random.randint(1, 3))
print('%s 吃 %s' % (name, res))
def producer(q, name, food):# 生產者們:即廚師們
for i in range(3):
time.sleep(random.randint(1, 3))
res = '%s%s' % (food, i)
q.put(res)
print('%s 生產了 %s' % (name, res))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q, 'noah', '包子'))
c1 = Process(target=consumer, args=(q, 'bobo'))
p1.start()
c1.start()
print('主'.center(15, '-'))
p1.join()
q.put(None)
~~~
但是當有多個生產者和多個消費者時,幾個消費者就需要發送幾次結束信號,所以可以使用更高級的JoinableQueue隊列
## JoinableQueue隊列
像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
但從實際案例中,可以看出JoinableQueue隊列也并不會節省好多代碼,加上真實環境中消息中間件的大量應用,隊列實際使用并不多
### 語法介紹
**JoinableQueue([maxsize])**
* 參數介紹
~~~
maxsize是隊列中允許最大項數,省略則無大小限制。
~~~
### 方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
* q.task_done():
使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。
如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常
* q.join():
生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。
阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
### 基于JoinableQueue實現生產者消費者模型
~~~
from multiprocessing import Process, Queue,JoinableQueue
import time, random, os
def consumer(q, name):# 消費者們:即吃貨們
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print('%s 吃 %s' % (name, res))
q.task_done() #發送信號給隊列,已經從隊列中取走一個數據并處理完畢了
def producer(q, name, food):# 生產者們:即廚師們
for i in range(2):
time.sleep(random.randint(1, 3))
res = '%s%s' % (food, i)
q.put(res)
print('%s 生產了 %s' % (name, res))
q.join() #等到消費者把自己放入隊列中的所有的數據都取走之后,生產者才結束
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=(q, 'noah', '包子'))
p2 = Process(target=producer, args=(q, 'claire', '骨頭'))
c1 = Process(target=consumer, args=(q, 'bobo'))
c1.daemon=True #注意這個守護進程
p1.start()
p2.start()
c1.start()
print('主'.center(15, '-'))
p1.join()
p2.join()
#1、主進程等生產者p1、p2結束
#2、而p1、p2、是在消費者把所有數據都取干凈之后才會結束
#3、主進程結束后,被設置成守護進程的生產者也會結束
~~~
執行結果:
```
-------主-------
noah 生產了 包子0
noah 生產了 包子1
claire 生產了 骨頭0
bobo 吃 包子0
claire 生產了 骨頭1
bobo 吃 包子1
bobo 吃 骨頭0
bobo 吃 骨頭1
Process finished with exit code 0
```
## 進程之間的數據共享(Manager)
進程間數據是獨立的,可以借助于隊列或管道實現通信,二者都是基于消息傳遞的
雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止于此
**但進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。**
**正式環境會使用數據庫來解決現在進程之間的數據共享問題。**
Manager支持很多類型的進程方法,但常用的是list, dict兩種
~~~
from multiprocessing import Process,Manager,Lock
def wahaha(lock,i,dic):
with lock: #操作進程間共享數據一定要要加鎖
dic['count']-=1
if __name__ == '__main__':
lock=Lock()
m=Manager()
dic=m.dict({'count':5})
p_l=[]
for i in range(5):
p1=Process(target=wahaha,args=(lock,i,dic))
p_l.append(p1)
p1.start()
for p in p_l:
p.join()
print('end.....%s'%dic)
~~~
執行結果:
~~~
end.....{'count': 0}
~~~
- 基礎部分
- 基礎知識
- 變量
- 數據類型
- 數字與布爾詳解
- 列表詳解list
- 字符串詳解str
- 元組詳解tup
- 字典詳解dict
- 集合詳解set
- 運算符
- 流程控制與循環
- 字符編碼
- 編的小程序
- 三級菜單
- 斐波那契數列
- 漢諾塔
- 文件操作
- 函數相關
- 函數基礎知識
- 函數進階知識
- lambda與map-filter-reduce
- 裝飾器知識
- 生成器和迭代器
- 琢磨的小技巧
- 通過operator函數將字符串轉換回運算符
- 目錄規范
- 異常處理
- 常用模塊
- 模塊和包相關概念
- 絕對導入&相對導入
- pip使用第三方源
- time&datetime模塊
- random隨機數模塊
- os 系統交互模塊
- sys系統模塊
- shutil復制&打包模塊
- json&pickle&shelve模塊
- xml序列化模塊
- configparser配置模塊
- hashlib哈希模塊
- subprocess命令模塊
- 日志logging模塊基礎
- 日志logging模塊進階
- 日志重復輸出問題
- re正則表達式模塊
- struct字節處理模塊
- abc抽象類與多態模塊
- requests與urllib網絡訪問模塊
- 參數控制模塊1-optparse-過時
- 參數控制模塊2-argparse
- pymysql數據庫模塊
- requests網絡請求模塊
- 面向對象
- 面向對象相關概念
- 類與對象基礎操作
- 繼承-派生和組合
- 抽象類與接口
- 多態與鴨子類型
- 封裝-隱藏與擴展性
- 綁定方法與非綁定方法
- 反射-字符串映射屬性
- 類相關內置方法
- 元類自定義及單例模式
- 面向對象的軟件開發
- 網絡-并發編程
- 網絡編程SOCKET
- socket簡介和入門
- socket代碼實例
- 粘包及粘包解決辦法
- 基于UDP協議的socket
- 文件傳輸程序實戰
- socketserver并發模塊
- 多進程multiprocessing模塊
- 進程理論知識
- 多進程與守護進程
- 鎖-信號量-事件
- 隊列與生產消費模型
- 進程池Pool
- 多線程threading模塊
- 進程理論和GIL鎖
- 死鎖與遞歸鎖
- 多線程與守護線程
- 定時器-條件-隊列
- 線程池與進程池(新方法)
- 協程與IO模型
- 協程理論知識
- gevent與greenlet模塊
- 5種網絡IO模型
- 非阻塞與多路復用IO實現
- 帶著目標學python
- Pycharm基本使用
- 爬蟲
- 案例-爬mzitu美女
- 案例-爬小說
- beautifulsoup解析模塊
- etree中的xpath解析模塊
- 反爬對抗-普通驗證碼
- 反爬對抗-session登錄
- 反爬對抗-代理池
- 爬蟲技巧-線程池
- 爬蟲對抗-圖片懶加載
- selenium瀏覽器模擬