一、守护进程
1、作用:可以用来监测父进程的运行状态(起辅助作用)
2、特点:
》》守护进程内无法再创建子进程
》》当主进程代码正常运行完毕后守护进程就会立即随之结束,这里注意是主进程的代码
3、实现:p.daemon=True 设置为守护进程,必须在开启(start)之前设置
4、举例:
from multiprocessing import Processimport timeimport randomclass Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name)p=Piao('egon')p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行p.start()print('主')
二、互斥锁
1、应用背景:进程之间地址空间是隔离的,数据不共享,若多个进程要共用同一数据或终端这就产生了竞争关系,竞争就带来了错乱,而将竞争变得有序化的核心就是并发变为串行,这就引入了锁的概念,将共享的数据存到硬盘中
2、作用:可以实现当多个进程要对同一块数据进行修改时,能够保证同一时间只有一个任务可以进行修改(若不加锁的话,谁抢着就可以修改的话,这样是会引起数据的错乱的,数据的错乱就会导致后续执行存在隐患)
3、特点:锁保证了数据的安全性,但是却导致效率变低并且还得自己加锁
4、代码实现:运用multiprocessing模块的Lock类
》》一定要记得开锁操作,也可以用with( with lock :)
》》实现串行也可以用join方法,但是join方法是将多个子进程所有的方法串行,这样显然是不行的5、举例:
from multiprocessing import Process,Lockimport time,json,randomdef search(): dic=json.load(open('db.txt')) print('\033[43m剩余票数%s\033[0m' %dic['count'])def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模拟读数据的网络延迟 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open('db.txt','w')) print('\033[43m购票成功\033[0m')def task(lock): #接受lock对象 search() lock.acquire() #加锁,其余进程就无法继续执行get函数,只能等当前进程执行完后才能执行 get() lock.release() #开锁,必须要有开锁这一步,否则其余进程就无法继续执行了if __name__ == '__main__': lock=Lock() #得到lock对象 for i in range(100): #模拟并发100个客户端抢票 p=Process(target=task,args=(lock,)) #传入lock对象 p.start()
三、IPC机制:进程之间的通信
1、应用背景:针对锁的效率低还要自己加锁的缺点,运用multiprocessing模块提供的基于消息的IPC机制也就是队列和管道,以实现效率高且替我们处理锁的问题
2、队列:基于管道+锁实现的,所以通常我们都用队列;将数据存放在内存中
3、队列的方法: q=Queue (3) 代表最多放3次数据,要想放第4次第5次数据必须要等队列被取出数据有剩余空间时才行
基本方法:
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的 时间,直到该队列有剩余的空间可以继续放入数据(若timeout为None,那么就会一直等着直到有剩余空间)。如果超时,会抛出Queue.Full异常。如果blocked为False,但 该Queue已满,会立即抛出Queue.Full异常。 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时 间内没有取到任何元素,会抛出Queue.Empty异常(若timeout为None且是死循环,那么就会一直等下去)。如果blocked为False,有两种情况存在,如果Queue有一个值可 用,则立即返回该值,否则,如果队列为空,则立即抛 出Queue.Empty异常. q.get_nowait():同q.get(False)q.put_nowait():同q.put(False)q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
其它方法(了解)
1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集, 将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方 法返回错误。3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。 调用q.cancel_join_thread方法可以禁止这种行为
四、生产者和消费者模型
1、生产者:即是生产数据的程序
2、消费者:即是处理数据的程序
3、模型:生产者和消费者属于不同的进程,它们之间的数据不能直接交互,所以就通过队列来存储生产者生产的数据,而消费者则直接从队列里取出数据去处理
4、举例:
from multiprocessing import Queue,Processimport time,randomdef producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='泔水%s' %i q.put(res) #往队列里放入数据 print('厨师 %s 生产了 %s' %(name,res))def consumer(name,q): while True: #因为要不断的从队列中去数据,所以要加循环 res=q.get() #从队列中取出数据,有就取,若队列为空则会等(因为是死循环) if res is None:break #对结束标志进行判断 time.sleep(random.randint(1,3)) print('%s 吃了 %s' %(name,res))if __name__ == '__main__': q=Queue() p1=Process(target=producer,args=('egon',q)) c1=Process(target=consumer,args=('alex',q)) p1.start() c1.start() p1.join() #必须等到生产者程序结束以后才能发送标志位 q.put(None) #若不加生产者的结束标识,那么消费者就会一直处在接受数据的状态下,这样主进程就不会结束了