python 之 併發編程(守護進程、互斥鎖、IPC通信機制)

9.5 守護進程

主進程創建守護進程

  其一:守護進程會在主進程代碼執行結束后就立即終止

  其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

  • p.daemon:默認值為False,如果設為True,代表p為後台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置

from multiprocessing import Process
​
def task(name):
    print('%s is running' % name)
    time.sleep(3)
​
if __name__ == '__main__':
    obj = Process(target=task, args=('egon',))
    obj.daemon=True         #設置obj為守護進程,並且父進程代碼執行結束,obj即終止運行
    obj.start()             # 發送信號給操作系統
    print('')

9.6 互斥鎖

互斥鎖用來將併發編程串行,犧牲了效率而保證了數據安全

強調:必須是lock.acquire()一次,然後 lock.release()釋放一次,才能繼續lock.acquire(),不能連續的lock.acquire()

互斥鎖和 join的區別:

二者的原理都是一樣,都是將併發變成串行,從而保證有序

區別一:join是按照人為指定的順序執行,而互斥鎖是進程平等地競爭,誰先搶到誰執行,一個人拿到鎖,其餘人都等待

from multiprocessing import Process,Lock
import random
​
mutex=Lock()
def task1(lock):
    lock.acquire() 
    print('task1:名字是egon')
    print('task1:性別是male')
    lock.release()
    
def task2(lock):
    lock.acquire()
    print('task2:名字是alex')
    print('task2:性別是male')
    lock.release()
    
def task3(lock):
    lock.acquire()
    print('task3:名字是lxx')
    print('task3:性別是female')
    lock.release()
    
if __name__ == '__main__':
    p1=Process(target=task1,args=(mutex,))
    p2=Process(target=task2,args=(mutex,))
    p3=Process(target=task3,args=(mutex,))
    p1.start()# p1.start()
    p2.start()# p1.join()
    p3.start()# p2.start()
             # p2.join()
             # p3.start()
             # p3.join()

9.61 模擬搶票

互斥鎖和 join的區別二:

互斥鎖可以讓一部分代碼(修改共享數據的代碼)串行,而join只能將代碼整體串行

import json
import time
import random
import os
from multiprocessing import Process,Lock
​
mutex=Lock()
def search():
    time.sleep(random.randint(1,3))
    with open('db.json','r',encoding='utf-8') as f:
        dic=json.load(f)
        print('%s 剩餘票數:%s' %(os.getpid(),dic['count']))
​
def get():
    with open('db.json','r',encoding='utf-8') as f:
        dic=json.load(f)
    if dic['count'] > 0:
        dic['count']-=1
        with open('db.json','w',encoding='utf-8') as f:
            json.dump(dic,f)
        print('%s 購票成功' %os.getpid())
​
def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()
​
if __name__ == '__main__':
    for i in range(10):
        p=Process(target=task,args=(mutex,))
        p.start()
        # p.join()

9.7 IPC通信機制

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

進程之間通信必須找到一種介質,該介質必須滿足: 1、是所有進程共享的 2、必須是內存空間 附加:幫我們自動處理好鎖的問題

from multiprocessing import Process,Manager,Lock
import time
​
mutex=Lock()
def task(dic,lock):
    lock.acquire()
    temp=dic['num']
    time.sleep(0.1)
    dic['num']=temp-1
    lock.release()
​
if __name__ == '__main__':
    m=Manager()
    dic=m.dict({'num':10})
    l=[]
    for i in range(10):
        p=Process(target=task,args=(dic,mutex))
        l.append(p)
        p.start()
    for p in l:
        p.join()
    print(dic)      #{'num': 0}

9.71創建隊列的類Queue

底層就是以管道和鎖定的方式實現:

隊列 (管道+鎖) :1、共享的空間 2、是內存空間 3、自動幫我們處理好鎖定問題

  • Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

  • maxsize是隊列中允許最大項數,省略則無大小限制。

from multiprocessing import Queue
q=Queue(3)      #maxsize=3
q.put('first')
q.put({'second':None})
q.put('')
​
# q.put(4) #阻塞
print(q.get())  #first
print(q.get())  #{'second': None}
print(q.get())  #

強調: 1、隊列用來存成進程之間溝通的消息,數據量不應該過大 2、maxsize的值超過的內存限制就變得毫無意義

了解:block=True(默認值)、timeout

q=Queue(1)
q.put('first',block=False)      #q.put方法用以插入數據到隊列中
q.put('fourth',block=False/True)#queue.Full/一直等
​
q.put('first',block=True)
q.put('fourth',block=True,timeout=3)#等3秒后報錯queue.Full
​
q.get(block=False)#q.get方法可以從隊列讀取並且刪除一個元素
q.get(block=False)#queue.Empty
​
q.get(block=True)
q.get(block=True,timeout=2)#等2秒后報錯queue.Empty
点赞

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *