进程间通信叫做 IPC (Inter-Process Communication)
进程间通信通过multiprocessing中的Queue(队列)和Pipe(管道)模块来实现.
multiprocessing.Queue 队列
队列是先进先出的,multiprocessing模块里的Queue是一个多进程安全的队列对象,几乎就是queue.Queue的克隆.
from multiprocessing import Queue
if __name__ == '__main__':
q = Queue(5) # 实例化队列对象,参数表示队列的容纳元素的多少.省略则表示无大小限制.
q.put(1) # 向队列中增加一个元素,如果此时超过队列容量,默认会阻塞
q.get() # 从队列中取出一个元素,如果此时队列为空,默认会阻塞
q.full() # 判断队列是否已满
q.empty() # 判断队列是否已空
将队列对象作为参数传入各个进程,即可实现进程间通信.
这里有一篇介绍Queue的文章,官方文档在此
put和get方法还有各自的put_nowait和get_nowait方法,等于各自的方法里设置false,这样就不会阻塞
但是空的时候get_nowait会报错,而队列满的时候,put_nowait会报错并失去put的值.这样可以捕获异常然后进行后续处理,就不用一直阻塞在那里等待队列.
在一般的数据处理中,生成数据的速度一般比处理比较快,比如在爬虫的实际运行环境中,一般采取多线程进行爬取数据,但是对数据的处理比较慢,就可以将爬取的数据暂时放入一个队列中,然后慢慢处理.或者在数据彼此不相关的情况下,可以增加处理者的线程,同时处理多个数据:
import time,random
from multiprocessing import Process
from multiprocessing import Queue
def producer(name,food, q):
for i in range(10):
time.sleep(random.randint(1,2))
f = '{}生产的{}号{}'.format(name,i, food)
q.put(f)
print('{}生产了{}号{}'.format(name,i, food))
def consumer(name,q):
while True:
try:
k = q.get(timeout=5)
if k is not None:
print(' {}吃了{}'.format(name,k))
time.sleep(2)
else:
print('{}没东西吃了'.format(name))
break
except Exception:
print(' {}等了这么久也没东西吃,看来是没了'.format(name))
break
if __name__ == '__main__':
queue = Queue()
p1 = Process(target=producer, args=('吉祥', '包子',queue,))
p2 = Process(target=producer, args=('红宝石', '蛋糕',queue,))
p3 = Process(target=producer, args=('五芳斋', '粽子',queue,))
p1.start()
p2.start()
p3.start()
c1 = Process(target=consumer,args=('Jenny',queue,))
c2 = Process(target=consumer,args=('Cony',queue,))
c1.start()
c2.start()
p1.join()
p2.join()
queue.put(None)
这个模型初步建立,用到的一个关键是,要在主进程内跟踪所以数据生产者的状态,保证全部生产完毕之后,给队列一个生产完毕的信号.目前的主要问题是,消费者很难判断生产是否结束,现在的做法是向队列里放一个None,子进程拿到None后正常退出,然后使用了一个长时间的超时来判断结束并退出,有没有更好的解决办法呢?
JoinableQueue,每次从队列里获取一个数据的时候,需要向队列提交一个回执,叫做q.taskdone().在每次向JoinableQueue put数据的时候,会有个计数器去加1,提交taskdone的时候,这个计数器会-1.这个兑队列本身也可以.join()来感知这个队列中的数据全部被执行完毕,这样就可以通过这个队列对象来
import time,random
from multiprocessing import Process
from multiprocessing import JoinableQueue
def producer(name,food, q):
for i in range(10):
time.sleep(random.randint(1,2))
f = '{}生产的{}号{}'.format(name,i, food)
q.put(f)
print('{}生产了{}号{}'.format(name,i, food))
q.join() # 阻塞,直到一个队列中的所有数据全部处理完毕,等于一直在等着消费者拿了包子吃完才结束
def consumer(name,q):
while True:
food = q.get()
if food is None:
print('%s获取到了一个空' %name)
break
print('\033[31m%s消费了%s\033[0m' %(name,food))
time.sleep(random.randint(1,3))
q.task_done() # 减少一个计数,get方法并不减少计数,put方法会增加计数.这样就会和生产者进程最后同步,大家一起结束.
if __name__ == '__main__':
queue = JoinableQueue()
p1 = Process(target=producer, args=('吉祥', '包子',queue,))
p2 = Process(target=producer, args=('红宝石', '蛋糕',queue,))
p3 = Process(target=producer, args=('五芳斋', '粽子',queue,))
p1.start()
p2.start()
p3.start()
c1 = Process(target=consumer,args=('Jenny',queue,))
c2 = Process(target=consumer,args=('Cony',queue,))
c1.daemon = True # 设置为守护进程,主进程代码执行完毕就结束
c2.daemon = True # 设置为守护进程,主进程代码执行完毕就结束
c1.start()
c2.start()
p1.join()
p2.join()
这里的流程是这样: 主进程启动各个子进程后,就通过最后两行join等待所有生产者结束,然后每个生产者,又通过队列等待消费者结束.而消费者又是守护进程,会在主进程结束的时候结束.
等到消费者吃光了所有队列里的东西,并且task_done通知队列全部活干完了,生产者就会从阻塞的地方结束,此时主进程等到了生产者结束,也执行到了最后的代码,然后消费者守护进程结束,就在工作全部处理完毕的情况下,关闭了所有进程.这样,可以增加任意多的消费者和生产者,由于队列是多线程安全的(自带锁),这些消费者和生产者互相不冲突.
# 任意多个生产者消费者的模型
import time, random
from multiprocessing import Process
from multiprocessing import JoinableQueue
def producer(name, food, q):
for k in range(10):
time.sleep(random.randint(1, 2))
f = '{}生产的{}号{}'.format(name, k, food)
q.put(f)
print('{}生产了{}号{}'.format(name, k, food))
q.join() # 阻塞,直到一个队列中的所有数据全部处理完毕,等于一直在等着消费者拿了包子吃完才结束
def consumer(name, q):
while True:
food = q.get()
print('\033[31m%s消费了%s\033[0m' % (name, food))
time.sleep(random.randint(1, 3))
q.task_done() # 减少一个计数,get方法并不减少计数,put方法会增加计数.这样就会和生产者进程最后同步,大家一起结束.
if __name__ == '__main__':
queue = JoinableQueue()
p_list = []
for i in range(10):
p = Process(target=producer, args=(str(i), '包子', queue,))
p_list.append(p)
p.start()
for j in range(5):
c = Process(target=consumer, args=(str(j), queue,))
c.daemon = True
c.start()
[i.join() for i in p_list]
multiprocessing.Pipe 管道
管道也是一种组件,可以理解为一个双向的管道,有两个端点,每个端点都有发送和接受方法,从一端send东西过去,另外一端就可以recv进来.如果recv收不到,就会阻塞.
Pipe实例化会得到两个对象,是一个全双工通信管道的两端,假如两个端点为A和B,注意,A的send需要由B的recv来接收,A的recv收到的东西,是B发送的内容.
这里有一个问题就是,如果知道信息全部传送完毕.实际上,管道的引用计数是操作系统控制的,当一个管道双向全部被关闭的时候,这个管道就会关闭,再对这个管道进行操作,就会触发EOFError异常,通过捕捉这个异常,就可以知道管道是否传输完毕.
如果在主进程和多个进程间通信,需要把管道的两端都传递给所有进程,然后只用一端发的进程,就把另外一端close掉,主进程也关闭不用的一端,就像是全双工关闭了一条线路.等通信完毕之后,各自再关闭另外一端,管道就中断,引用计数变成了0,可以捕捉异常了.这里关键是理解,要用的端点,就是插在那一段数据上的管子的一头,另外一头不用了,就关闭,只用插在自己身上的那个端点.
from multiprocessing import Process, Pipe
def senp(sendp, recvp):
recvp.close()
while True:
try:
msg = sendp.recv()
print(msg)
except EOFError:
sendp.close()
break
if __name__ == '__main__':
sendp, recvp = Pipe()
Process(target=senp, args=(sendp, recvp,)).start()
sendp.close()
for i in range(20):
recvp.send('hello')
recvp.close()
其实可以理解为,主进程一根管子分支了很多根插到子进程上,子进程关闭recvp,实际是表示自己不使用这一端,主进程关闭sendp表示不使用这一头,但是其他头依然有引用.最后当主进程关闭自己的recvp这一头,所有的sendp各自被子进程关闭之后,管道的一端被所有进程完全关闭,就会引发EOF错误,表面管道通信已经结束,这个时候就退出通信即可.
将管道作用于生产者消费者模型:
from multiprocessing import Process, Pipe
def producer(con, pro, name):
con.close()
for i in range(10):
pro.send('{}的{}号产品'.format(name, i))
pro.close()
def consumer(con, pro, name):
pro.close()
while True:
try:
msg = con.recv()
print('{}消费了{}'.format(name, msg))
except EOFError:
con.close()
break
if __name__ == '__main__':
con_p, pro_p = Pipe()
for j in range(1):
Process(target=producer, args=(con_p, pro_p, '{}'.format(j))).start()
for k in range(3):
Process(target=consumer, args=(con_p, pro_p, '{}'.format(k))).start()
con_p.close() # 在主进程里创建的管道,分别交给各个子进程,在子进程里都close之后,主进程也需要关闭,所有进程里都需要关闭所使用的管道
pro_p.close() # 所有进程内关闭管道
管道实际上是通过socket来通信的IPC,管道并不是进程安全的,如果想要安全,必须要给管道加锁.进程安全的是multiprocessing.Queue.
所以尽量使用Queue.
应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
进程间数据共享
队列和管道,是进程间互相发送消息的渠道.如果所有的进程需要共享数据,一般需要通过锁来保证数据安全性.
展望未来,基于消息传递的并发编程是大势所趋,即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。
多进程其实是不推荐使用数据共享的,如果要共享,就要通过一些能够在进程之间传递的数据结构.比如multiprocessing.Manager模块.
from multiprocessing import Process, Manager
def minus(dic):
dic['count'] -= 1
if __name__ == '__main__':
m = Manager()
dic = m.dict({'count': 100}) # 这个字典会变成可用于数据共享的字典
p_list = []
for i in range(70):
p =Process(target=minus, args=(dic,))
p_list.append(p)
p.start()
[i.join() for i in p_list]
print(dic)
结果运行可以发现,有的时候结果会出现不等于50的情况,说明manager支持的数据类型里,有一部分是多进程不安全的.只要加个锁就可以了.
from multiprocessing import Process, Manager,Lock
def minus(dic,lock):
lock.acquire()
dic['count'] -= 1
lock.release()
if __name__ == '__main__':
m = Manager()
lock = Lock()
dic = m.dict({'count': 100}) # 这个字典会变成可用于数据共享的字典
p_list = []
for i in range(70):
p =Process(target=minus, args=(dic,lock))
p_list.append(p)
p.start()
[i.join() for i in p_list]
print(dic)
列出几篇文章来供多进程通信参考:
理解Python并发编程一篇就够了 - 线程篇
【Multiprocessing系列】共享资源.其中的其他文章也可以看看.
Python multiprocessing 模块解析 (2) – managers 初探轮廓
2023年11月15日后记。最后这几个链接里有两个已经消失了。第一个的博主还在更新博客。