python 之 併發編程(生產者消費者模型、守護進程的應用)

9.8 生產者消費者模型

該模型中包含兩類重要的角色:

1、生產者:將負責造數據的任務比喻為生產者 2、消費者:接收生產者造出的數據來做進一步的處理的被比喻成消費者

實現生產者消費者模型三要素:1、生產者 2、消費者 3、隊列

什麼時候用該模型:

程序中出現明顯的兩類任何,一類任務是負責生產,另外一類任務是負責處理生產的數據的

該模型的好處:

1、實現了生產者與消費者解耦和

2、平衡了生產力與消費力,即生產者可以一直不停地生產,消費者可以不停地處理,因為二者不再直接溝通的,而是跟隊列溝通,從而提高程序整體處理數據的速度

import time
import random
from multiprocessing import Process,Queue
def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res))
​
def producer(name,q,food):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m生產者者===》%s 生產了 %s\033[0m' %(name,res))
​
if __name__ == '__main__':
    q=Queue()                                       #1、共享的盆
   
    p1=Process(target=producer,args=('egon',q,'包子'))  #2、生產者們
    p2=Process(target=producer,args=('劉清政',q,'泔水'))
    p3=Process(target=producer,args=('楊軍',q,'米飯'))
​
    c1=Process(target=consumer,args=('alex',q))         #3、消費者們
    c2=Process(target=consumer,args=('xxx',q))
​
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

9.81 守護進程的應用

問題:消費者c1和c2在取空了q之後,則一直處於死循環中且卡在q.get()這一步

解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break退出死循環

import time
import random
from multiprocessing import Process,Queue
​
def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res))
​
def producer(name,q,food):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m生產者者===》%s 生產了 %s\033[0m' %(name,res))
    #q.put(None)
if __name__ == '__main__':
    #1、共享的盆
    q=Queue()
    #2、生產者們
    p1=Process(target=producer,args=('egon',q,'包子'))
    p2=Process(target=producer,args=('劉清政',q,'泔水'))
    p3=Process(target=producer,args=('楊軍',q,'米飯'))
    #3、消費者們
    c1=Process(target=consumer,args=('alex',q))
    c2=Process(target=consumer,args=('梁書東',q))
​
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
​
    p1.join()# 在生產者生產完畢后,往隊列的末尾添加一個結束信號None
    p2.join()
    p3.join()
    # 有幾個消費者就應該放幾個結束信號
    q.put(None)#隊列是共享的,主進程同樣可以往隊列里放None
    q.put(None)

升級版:設置守護進程,向隊列發送結束信號,解決管道取空阻塞問題

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

  • maxsize是隊列中允許最大項數,省略則無大小限制

  • q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常

  • q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

import time
import random
from multiprocessing import Process,JoinableQueue
​
def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res))
        q.task_done()   #向q.join()發送一次信號,證明一個數據已經被取走了
def producer(name,q,food):
..........
if __name__ == '__main__':
    #1、共享的盆
    q=JoinableQueue()
    #2、生產者們
    p1=Process(target=producer,args=('egon',q,'包子'))
    p2=Process(target=producer,args=('劉清政',q,'泔水'))
    p3=Process(target=producer,args=('楊軍',q,'米飯'))
    #3、消費者們
    c1=Process(target=consumer,args=('alex',q))
    c2=Process(target=consumer,args=('梁書東',q))
    c1.daemon=True
    c2.daemon=True
​
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
​
    p1.join()# 確定生產者確確實實已經生產完畢
    p2.join()
    p3.join()
    # 在生產者生產完畢后,拿到隊列中元素的總個數,然後直到元素總數變為0,q.join()這一行代碼才算運行完畢
    q.join()
    #q.join()一旦結束就意味着隊列確實被取空,消費者已經確確實實把數據都取乾淨了
    print('主進程結束')
点赞

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *