multiprocess模块
先看一个简单的启动新的进程的程序
import os
from multiprocessing import Process
import time
def func():
print(12345)
time.sleep(1)
print(54321)
print('子进程{} 的父进程是{}'.format(os.getpid(),os.getppid()))
if __name__ == '__main__':
p = Process(target=func) # 注册进程,实例化一个进程对象p,尚未启动
p.start() # 启动一个子进程,交给操作系统创建,执行新进程的代码
print('*' * 10) # 如果这里还有语句,这一句实际上和上一句同时执行,二者的结果到底谁先执行出来,则是不确定的.
print('父进程{} 的父进程是{}'.format(os.getpid(), os.getppid()))
执行的结果,和在一个文件里不用多线程,直接先调用func()再print('*'10)的结果完全不同,因为是多进程,所以这个时候func()与print(''*10)的操作是异步的.而且可以看到两个进程的PID是完全不同的,子进程的父进程ID就是父进程ID.实际上,可以通过windows资源管理器看到,pycharm的PID就是父进程的父进程,所以pycharm里直接启动的程序的父进程都是pycharm
仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。而且multiprocessing是C语言实现的,模块源码内东西不多,官方文档看此.
进程的生命周期:
正常情况下,进程在其程序执行完毕的时候结束.
开启了子进程的主进程:
如果主进程自己的代码后结束,则需要等待自己的代码结束.
子进程的代码后结束,主进程会自己执行完毕之后等待子进程的结果,子进程结束之后,主进程才结束.一句话,就是主进程会在其内部所有(包括自己)的代码执行完毕之后关闭.
不过子进程未必依赖父进程存在.
multiprocessing.Process模块
刚才使用的是multiprocessing模块内的Process模块,这个模块是用于建立新的进程的.
来看一下Process模块的方法:
方法 | 说明 |
---|---|
实例化进程对象 | class Process(object):def __init__(self, group=None, target=None, name=None, args=(), kwargs={}),实例化进程对象要用关键字传参的方式,group不使用,target为调用对象,表示子进程需要执行的任务,args表示target的位置参数元组,kwargs表示调用对象的关键字参数元组,name为子进程名称 |
p.start() | 启动进程,会自动调用p对象的run()方法 |
p.run() | 启动进程时候自动运行的方法,去调用target指向的对象,如果需要自定义进程类,一定要实现run()方法 |
p.terminate() | 强制终止一个进程,不会做任何清理工作.如果p还有子进程,执行后子进程成为僵尸进程,如果p有锁,则锁也不会清除,易造成死锁. |
p.is_alive() | 判断p是否依然生存,如果是则返回True |
p.join([timeout]) | p的主线程等待p终止,timeout为可选的超时时间.主线程处于等,而p处于执行状态.这个方法只能用于start()开启的进程,不能用于run()开启的进程 |
p.daemon | 默认为False,如果设置为True,代表p是后台守护进程.如果p的父进程终止,p也终止.设置为True的时候p无法创建新进程.这个参数必须在start()之前修改,进程开始运行后无法修改 |
p.name | 获取进程名,默认是Process-n |
p.pid | 获取进程的PID |
p.exitcode | 在运行的时候该值为None,如果进程结束了,表示被某个值的信号结束了进程 |
p.authkey | 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性 |
join方法
from multiprocessing import Process
import time
def func(arg1,arg2):
print('*'*arg1)
time.sleep(1)
print('*'*arg2)
if __name__ == '__main__':
p = Process(target=func,args = (10,20)) # 注册进程
p.start()
p.join()
print('='*10+'主进程运行完了')
join的结果可以看到,子线程运行完了之后,主进程才开始执行.也就是join方法之前的都会先执行掉,主进程才会结束等待继续执行自己的代码.join就像是在当前位置把子进程的结果拼到主进程里,感知一个子进程的结束,将异步的程序改为同步(子进程失败,主进程也失败).
在p.start()和p.join()之间的语句,依然是异步运行的,到了join那一步,才会等待子进程的结束.
开启多个子进程
from multiprocessing import Process
import time
def func(arg1,arg2):
print('*'*arg1,arg1)
time.sleep(3)
print('*'*arg2,arg2)
if __name__ == '__main__':
for i in range(10):
p = Process(target=func,args = (3*i,2*i),name='myprocess') # 注册进程
p.start()
print('主进程运行完了')
可以看到,哪一个进程运行先后是不确定的.
这里修改一下,如果在每个p.start()后边加上p.join(),就变成彻底同步,程序需要约30秒才能执行完毕.
如果想让print('主进程运行完了')代码在所有进程结束之后再打印,如何操作呢?
可以创建一个进程列表.然后确定进程全部结束之后,再运行主进程的代码.(操作系统内部也有类似的管理机制,有进程名单和对应的控制代码)
from multiprocessing import Process
import time
def func(arg1,arg2):
print('*'*arg1,arg1)
time.sleep(0.5)
print('*'*arg2,arg2)
if __name__ == '__main__':
p_list = []
for i in range(10):
p = Process(target=func,args = (3*i,2*i),name='myprocess') # 注册进程
p.start()
p_list.append(p)
[p.join() for p in p_list]
print('主进程运行完了')
这就构成了常用的多进程模型,即主进程可以分发任务给子进程,虽然子进程实际运行顺序不可控,但最后可以统一等待子进程运行结束后组织各个子进程的运行结果,得到最终结果.或者将这种方法用在某一个时点可以让所有的子进程同步.
还可以用继承Process的类来启动进程,自定义类内一定要实现run方法.
from multiprocessing import Process
import os
class MyProcess(Process):
def __init__(self,*args,**kwargs): # 还可以传参数进来
super().__init__()
self.args = args
self.kwargs = kwargs
def run(self): # 必须实现的方法
print(self.name, 'is', self.pid)
print(self.args)
print(self.kwargs)
def start(self):
# 这个时候实例化已经完成,进程已经开启
self.run() # 在进程内调用run方法
if __name__ == '__main__':
print('主进程的PID是:', os.getppid())
p1 = MyProcess(4,6.29,name = 'cony')
p2 = MyProcess(34,2.17,name = 'jenny')
p1.start()
p2.start()
可见继承类的方式要比调用函数更加灵活,可以在一个进程中实例化一个对象.简单代码才考虑直接调用方法.来实际写一个程序:
使用socket和多进程实现并发通信
from socket import *
from multiprocessing import Process
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
def talk(conn, client_addr):
print(conn, client_addr)
while True:
try:
msg = conn.recv(1024)
if not msg: break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__': # windows下start进程一定要写到这下面
while True: # 这里和socketserver其实是一个道理,就是每建立一个连接,就丢到一个新的进程里去.
conn, client_addr = server.accept()
p = Process(target=talk, args=(conn, client_addr))
p.start()
守护进程
守护进程是一种特别的进程,是有普通启动的进程转换而成的.看一个例子:
from multiprocessing import Process
import os, time
def func():
while True:
time.sleep(0.5)
print('PID{} is still runing'.format(os.getpid()))
if __name__ == '__main__':
i = 11
Process(target=func).start()
while i > 0:
print('I am server')
time.sleep(2)
i -= 1
上边这个程序,在主进程运行结束后,子进程依然在运行,因为默认情况下主进程还在等候子进程结束.但是这个时候子进程已经无用.
如果想让主进程结束的时候子进程就结束,就需要将子进程改成一个守护进程.守护进程会随着主进程的代码执行完毕而结束.(而不是随着主进程结束而结束)
from multiprocessing import Process
import os, time
def func():
while True:
time.sleep(0.5)
print('PID{} is still runing'.format(os.getpid()))
if __name__ == '__main__':
i = 11
p = Process(target=func)
p.daemon = True # 设置子进程为守护进程,需要在start方法之前设置
p.start()
while i > 5:
print('I am server',i)
time.sleep(2)
i -= 1
再来看一个例子:
from multiprocessing import Process import os, time def func(): while True: print('PID{} starts'.format(os.getpid())) time.sleep(6) print('PID{} is finished'.format(os.getpid())) if __name__ == '__main__': i = 11 p1 = Process(target=func) p2 = Process(target=func) p1.daemon = True # 设置子进程为守护进程,需要在start方法之前设置 p1.start() p2.start() while i > 5: print('I am server',i) time.sleep(1) i -= 1
主进程执行约5秒,但是两个子进程都睡了6秒,结果发现5秒之后,活着的进程只剩下没有设置成守护进程的p2进程,p1在主进程执行代码完毕的时候就结束了.
守护进程在这里有一篇文章解释.
其实系统的守护进程,就是不受其他进程干扰,一直运行到系统的进程结束为止.设置daemon为True,就是将一个进程改造成守护进程.