队列(进程通信ipc)
队列主要用于解决进程间通信的问题,队列底层就是通过管道和锁的方式实现的。
代码示例:
from multiprocessing import Queue
import time
q=Queue(3) # 指定队列的长度
#队列相关的操作方法
# put,get,put_nowait,get_nowait,full,empty
q.put(3) # 向队列中存放数据,可以是任何类型的数据
q.put(3)
q.put(3)
print(q.full()) # 如果队列满,则返回 True, 否则返回 False
print(q.get()) # 依次取出数据
print(q.get())
print(q.get())
print(q.empty()) # 如果队列为空,则返回True,否则返回 False
主要方法
- q.put(): 用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
- q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常
- q.get_nowait():同q.get(False)
- q.put_nowait():同q.put(False)
- q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目
- q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
- q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
线程Queue
同进程队列一样,线程也有对于的方法,叫做线程Queue.
import queue
q=queue.Queue(3) # 队列:先进先出,指定队列的大小
q.put(1) # 向队列中放入数据
q.put(2)
q.put(3)
print(q.get()) # 从队列中取出数据
# q.put(4) # 当队列满后会等待有空闲位置时再放入
# q.put_nowait(4) # 立即放入数据,不等待,如果队列已经满,则会报错。
q.put(4,block=False) # 与put_nowait()方法一样,设置不等待,直接放入
q.put(4,block=True,timeout=3) # 等待,且超时时间为3s
优先级队列:
import queue
q=queue.PriorityQueue(3) # 优先级队列
q.put((10,'a')) # 指定优先级,数字越小,优先级越高
q.put((-3,'b'))
q.put((100,'c'))
print(q.get())
print(q.get())
print(q.get())
# 输出结果:
(-3, 'b')
(10, 'a')
(100, 'c')
堆栈,后进先出:
import queue
q=queue.LifoQueue(3) # 堆栈:后进先出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
输出:
3
2
1
生产者和消费者模型
为了避免死锁问题,能够解耦合,定义了生产者消费者模型。生产者只需要创造数据,然后将数据放入队列,消费者则从队列中取出数据,对数据进行消费。
下面是使用多进程实现了简单的生产者和消费者模型:
from multiprocessing import Process,Queue
import random
import time
def producer(name,food,q):
for i in range(10):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print("厨师[%s]生产了<%s>" %(name,res))
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('吃货[%s]吃了<%s>' %(name,res))
if __name__=='__main__':
q=Queue()
p1=Process(target=producer,args=('andy','包子',q))
c1=Process(target=consumer,args=('bob',q))
p1.start()
c1.start()
print('主进程')
在实际的应用中,可能会有多个生产者和消费者,而且我们必须保证在生产者已经生产完数据,并且消费者消费完数据后程序正常退出,所以这里需要使用到JoinableQueue
模块。
from multiprocessing import Process,JoinableQueue # 导入可以使用join方法的模块
import random
import time
def producer(name,food,q):
for i in range(3):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print("厨师[%s]生产了<%s>" %(name,res))
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('吃货[%s]吃了<%s>' %(name,res))
q.task_done() # 通过使用队列的task_done方法,通知每一次从队列取出的信息
if __name__=='__main__':
q=JoinableQueue()
p1=Process(target=producer,args=('andy','包子',q))
p2=Process(target=producer,args=('Tom','包子',q))
c1=Process(target=consumer,args=('bob',q))
c2=Process(target=consumer,args=('Lucy',q))
c3=Process(target=consumer,args=('David',q))
c1.daemon=True # 设置为守护进程,当主进程运行完毕时,此子进程也退出
c2.daemon=True
c3.daemon=True
p1.start()
p2.start()
c1.start()
c2.start()
c3.start()
p1.join() # 等待生产子进程运行结束
p2.join()
q.join() # 等待队列为空 后结束主进程
print('主进程')
说明:
- JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
- q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
- q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止