Python 34 并发编程-进程 multiprocessing.Process模块

Python 34 并发编程-进程 multiprocessing.Process模块

旧博客Python系列

multiprocess模块

教学博客地址

先看一个简单的启动新的进程的程序

import os
from multiprocessing import Process
import time


def func():
    print(12345)
    time.sleep(1)
    print(54321)
    print('子进程{} 的父进程是{}'.format(os.getpid(),os.getppid()))


if __name__ == '__main__':
    p = Process(target=func)  # 注册进程,实例化一个进程对象p,尚未启动
    p.start()  # 启动一个子进程,交给操作系统创建,执行新进程的代码
    print('*' * 10)  # 如果这里还有语句,这一句实际上和上一句同时执行,二者的结果到底谁先执行出来,则是不确定的.
    print('父进程{} 的父进程是{}'.format(os.getpid(), os.getppid()))

执行的结果,和在一个文件里不用多线程,直接先调用func()再print('*'10)的结果完全不同,因为是多进程,所以这个时候func()与print(''*10)的操作是异步的.而且可以看到两个进程的PID是完全不同的,子进程的父进程ID就是父进程ID.实际上,可以通过windows资源管理器看到,pycharm的PID就是父进程的父进程,所以pycharm里直接启动的程序的父进程都是pycharm

仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。而且multiprocessing是C语言实现的,模块源码内东西不多,官方文档看此.

进程的生命周期:
正常情况下,进程在其程序执行完毕的时候结束.
开启了子进程的主进程:
如果主进程自己的代码后结束,则需要等待自己的代码结束.
子进程的代码后结束,主进程会自己执行完毕之后等待子进程的结果,子进程结束之后,主进程才结束.一句话,就是主进程会在其内部所有(包括自己)的代码执行完毕之后关闭.
不过子进程未必依赖父进程存在.

multiprocessing.Process模块

刚才使用的是multiprocessing模块内的Process模块,这个模块是用于建立新的进程的.

来看一下Process模块的方法:

方法 说明
实例化进程对象 class Process(object):def __init__(self, group=None, target=None, name=None, args=(), kwargs={}),实例化进程对象要用关键字传参的方式,group不使用,target为调用对象,表示子进程需要执行的任务,args表示target的位置参数元组,kwargs表示调用对象的关键字参数元组,name为子进程名称
p.start() 启动进程,会自动调用p对象的run()方法
p.run() 启动进程时候自动运行的方法,去调用target指向的对象,如果需要自定义进程类,一定要实现run()方法
p.terminate() 强制终止一个进程,不会做任何清理工作.如果p还有子进程,执行后子进程成为僵尸进程,如果p有锁,则锁也不会清除,易造成死锁.
p.is_alive() 判断p是否依然生存,如果是则返回True
p.join([timeout]) p的主线程等待p终止,timeout为可选的超时时间.主线程处于等,而p处于执行状态.这个方法只能用于start()开启的进程,不能用于run()开启的进程
p.daemon 默认为False,如果设置为True,代表p是后台守护进程.如果p的父进程终止,p也终止.设置为True的时候p无法创建新进程.这个参数必须在start()之前修改,进程开始运行后无法修改
p.name 获取进程名,默认是Process-n
p.pid 获取进程的PID
p.exitcode 在运行的时候该值为None,如果进程结束了,表示被某个值的信号结束了进程
p.authkey 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性

join方法

from multiprocessing import Process
import time


def func(arg1,arg2):
    print('*'*arg1)
    time.sleep(1)
    print('*'*arg2)


if __name__ == '__main__':
    p = Process(target=func,args = (10,20))  # 注册进程
    p.start()
    p.join()
    print('='*10+'主进程运行完了')

join的结果可以看到,子线程运行完了之后,主进程才开始执行.也就是join方法之前的都会先执行掉,主进程才会结束等待继续执行自己的代码.join就像是在当前位置把子进程的结果拼到主进程里,感知一个子进程的结束,将异步的程序改为同步(子进程失败,主进程也失败).
在p.start()和p.join()之间的语句,依然是异步运行的,到了join那一步,才会等待子进程的结束.

开启多个子进程

from multiprocessing import Process
import time


def func(arg1,arg2):
    print('*'*arg1,arg1)
    time.sleep(3)
    print('*'*arg2,arg2)


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=func,args = (3*i,2*i),name='myprocess')  # 注册进程
        p.start()
    print('主进程运行完了')

可以看到,哪一个进程运行先后是不确定的.
这里修改一下,如果在每个p.start()后边加上p.join(),就变成彻底同步,程序需要约30秒才能执行完毕.
如果想让print('主进程运行完了')代码在所有进程结束之后再打印,如何操作呢?
可以创建一个进程列表.然后确定进程全部结束之后,再运行主进程的代码.(操作系统内部也有类似的管理机制,有进程名单和对应的控制代码)

from multiprocessing import Process
import time


def func(arg1,arg2):
    print('*'*arg1,arg1)
    time.sleep(0.5)
    print('*'*arg2,arg2)


if __name__ == '__main__':
    p_list = []
    for i in range(10):
        p = Process(target=func,args = (3*i,2*i),name='myprocess')  # 注册进程
        p.start()
        p_list.append(p)
    [p.join() for p in p_list]
    print('主进程运行完了')

这就构成了常用的多进程模型,即主进程可以分发任务给子进程,虽然子进程实际运行顺序不可控,但最后可以统一等待子进程运行结束后组织各个子进程的运行结果,得到最终结果.或者将这种方法用在某一个时点可以让所有的子进程同步.

还可以用继承Process的类来启动进程,自定义类内一定要实现run方法.

from multiprocessing import Process
import os


class MyProcess(Process):
    def __init__(self,*args,**kwargs):  # 还可以传参数进来
        super().__init__()
        self.args = args
        self.kwargs = kwargs

    def run(self):  # 必须实现的方法
        print(self.name, 'is', self.pid)
        print(self.args)
        print(self.kwargs)

    def start(self):
        # 这个时候实例化已经完成,进程已经开启
        self.run()  # 在进程内调用run方法


if __name__ == '__main__':
    print('主进程的PID是:', os.getppid())
    p1 = MyProcess(4,6.29,name = 'cony')
    p2 = MyProcess(34,2.17,name = 'jenny')
    p1.start()
    p2.start()

可见继承类的方式要比调用函数更加灵活,可以在一个进程中实例化一个对象.简单代码才考虑直接调用方法.来实际写一个程序:
使用socket和多进程实现并发通信

from socket import *
from multiprocessing import Process

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)


def talk(conn, client_addr):
    print(conn, client_addr)
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':  # windows下start进程一定要写到这下面
    while True:  # 这里和socketserver其实是一个道理,就是每建立一个连接,就丢到一个新的进程里去.
        conn, client_addr = server.accept()
        p = Process(target=talk, args=(conn, client_addr))
        p.start()

守护进程

守护进程是一种特别的进程,是有普通启动的进程转换而成的.看一个例子:

from multiprocessing import Process
import os, time


def func():
    while True:
        time.sleep(0.5)
        print('PID{} is still runing'.format(os.getpid()))


if __name__ == '__main__':
    i = 11
    Process(target=func).start()
    while i > 0:
        print('I am server')
        time.sleep(2)
        i -= 1

上边这个程序,在主进程运行结束后,子进程依然在运行,因为默认情况下主进程还在等候子进程结束.但是这个时候子进程已经无用.
如果想让主进程结束的时候子进程就结束,就需要将子进程改成一个守护进程.守护进程会随着主进程的代码执行完毕而结束.(而不是随着主进程结束而结束)

from multiprocessing import Process
import os, time


def func():
    while True:
        time.sleep(0.5)
        print('PID{} is still runing'.format(os.getpid()))


if __name__ == '__main__':
    i = 11

    p = Process(target=func)
    p.daemon = True  # 设置子进程为守护进程,需要在start方法之前设置
    p.start()
    while i > 5:
        print('I am server',i)
        time.sleep(2)
        i -= 1

再来看一个例子:

from multiprocessing import Process
import os, time


def func():
    while True:
        print('PID{} starts'.format(os.getpid()))
        time.sleep(6)
        print('PID{} is finished'.format(os.getpid()))


if __name__ == '__main__':
    i = 11

    p1 = Process(target=func)
    p2 = Process(target=func)
    p1.daemon = True  # 设置子进程为守护进程,需要在start方法之前设置
    p1.start()
    p2.start()
    while i > 5:
        print('I am server',i)
        time.sleep(1)
        i -= 1

主进程执行约5秒,但是两个子进程都睡了6秒,结果发现5秒之后,活着的进程只剩下没有设置成守护进程的p2进程,p1在主进程执行代码完毕的时候就结束了.
守护进程在这里有一篇文章解释.
其实系统的守护进程,就是不受其他进程干扰,一直运行到系统的进程结束为止.设置daemon为True,就是将一个进程改造成守护进程.

LICENSED UNDER CC BY-NC-SA 4.0
Comment