进程池
进程并不是越多越好,由于进程的启动,销毁和调度都需要消耗资源,所以开过多的进程会消耗大量资源,而并不能够获得更高的性能.
所以提出了进程池的概念,先启动一定个数的进程,然后等待任务,用这些进程去执行这些任务,执行完任务后,这个进程接受新的任务.这就是进程池的概念.
python中的进程池用multiprocessing.Pool模块控制.Pool模块是一个简单的进程池模块,只能指定固定个数的进程,不能像有些语言一样可以设定上限和下限.
# 进程池例子
import time
from multiprocessing import Pool
def test(n):
print('This is fun{} running'.format(n))
time.sleep(1)
print('This is fun{} ending'.format(n))
if __name__ == '__main__':
pool = Pool(9)
pool.map(test, range(10)) # map后边传一个序列对象作为参数,如果想要传多个参数,可以用元组传递,在函数内部拆解
用map方法执行的时候,默认异步调用,然后自带.close()和.join()方法,就和之前用Process的情况一样.除此之外,还可以用apply和apply_async方法可以启动进程池:
import time
from multiprocessing import Pool
import os
def test(n):
print('This is fun{} running'.format(n))
print('My id is', os.getpid())
time.sleep(1)
print('This is fun{} ending'.format(n))
if __name__ == '__main__':
pool = Pool(9)
for i in range(10):
pool.apply(test, args=(i,)) # 同步启动线程
经过运行可知,apply是同步提交任务,任务依次完成.异步的方法是apply_async
import time
from multiprocessing import Pool
import os
def test(n):
print('This is fun{} running'.format(n))
print('My id is', os.getpid())
time.sleep(1)
print('This is fun{} ending'.format(n))
if __name__ == '__main__':
pool = Pool(9)
for i in range(10):
pool.apply_async(test, args=(i,)) # 同步启动线程
结果发现主进程直接执行完毕,没有等待子进程,async是真异步.如果要改变成通常状态,需要在所有的任务提交完毕之后,先用pool.close()让进程池不再接受新的任务.此后再用pool.join()来感知进程池中的任务执行结束(进程池中的进程一直存在,但是任务会结束)
import time
from multiprocessing import Pool
import os
def test(n):
print('This is fun{} running'.format(n))
print('My id is', os.getpid())
time.sleep(1)
print('This is fun{} ending'.format(n))
if __name__ == '__main__':
pool = Pool(9)
for i in range(10):
pool.apply_async(test, args=(i,))
pool.close() # 结束进程池接受任务
pool.join() # 感知进程池中的任务都结束了
这样的结果就和原来的传统方法一样了.
用多进程可以比较方便的实现并发,用进程池写一个自己实现的socketserver:
# Mysocketserver
import socket
from multiprocessing import Pool
def handler(conn, addr):
print('Connection established from:', addr)
while True:
data = conn.recv(1024)
conn.send(data.upper())
if __name__ == '__main__':
pool = Pool()
buffer_size = 1024
ip_port = ('127.0.0.1', 8080,)
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(ip_port)
sk.listen()
while True:
conn, addr = sk.accept()
pool.apply_async(handler, args=(conn, addr,))
每生成一个新的连接,就把这个连接丢到一个新的进程中,用handler去处理它.然后循环继续监听下一个连接.
进程池的返回值
如果进程中的func有返回值,会如何呢?进程池不像Process,可以拿到进程执行函数的返回值.
from multiprocessing import Pool
def func(i):
return i*i
if __name__ == '__main__':
pool = Pool()
for j in range(10):
res = pool.apply(func,args=(j,))
print(res)
可以发现,拿到了各个进程调用函数的返回值.正常情况下,子进程里return一个值无法被父进程直接接收,只能通过IPC来实现.但是在用进程池的同步方法时,可以拿到各个进程调用函数的返回值.
如果改成apply_async,试验下边的代码:
from multiprocessing import Pool
import time
def func(i):
time.sleep(2)
return i*i
if __name__ == '__main__':
pool = Pool()
for j in range(10):
res = pool.apply_async(func,args=(j,))
print(res.get()) # get这里会阻塞,导致立刻计算出上一步的apply_async对象的结果
发现依然是同步结果,这是因为.get会阻塞,所以虽然异步提交,但依然是同步运行.如果想要从异步的程序里获得结果,依据一样的思路,将res放到一个列表里去,当所有的任务都结束以后,用for循环去get一下结果.
from multiprocessing import Pool
import time
def func(i):
time.sleep(2)
return i * i
if __name__ == '__main__':
pool = Pool()
res_list = []
for j in range(10):
res = pool.apply_async(func, args=(j,))
res_list.append(res)
for k in res_list:
print(k.get())
可以看到先得到了第一批(进程池大小个数)的结果,之后是剩下的结果.
如果用map改写,由于map自带close和join,直接接收结果看看:
from multiprocessing import Pool
import time
def func(i):
time.sleep(0.5)
return i * i
if __name__ == '__main__':
pool = Pool()
res = pool.map(func,range(10))
print(res) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
map一次性会把所有的结果做成一个列表返回.在各个进程执行时间差异比较大的时候,一般还是使用async来部分取得返回结果.
回调函数
先来看例子:
from multiprocessing import Pool
def func1(n):
print('in func')
return n * 4
def func2(nn):
print('in func2')
print('in fun2',nn)
if __name__ == '__main__':
pool = Pool(5)
res = pool.apply_async(func1, args=(10,), callback=func2)
print(res.get())
pool.close()
pool.join()
异步启动了一个进程,执行了func1,传参数10,func1执行的结果交给callback回调函数作为参数执行.注意,res这个时候拿到的依然是func1的返回值,不是回调函数的值.
回调函数不能通过进程池再传其他参数,其参数必须通过进程内调用的函数返回.回调函数是在主进程里执行.
回调函数因为是在主进程中执行,经常会在爬虫中用到.一般在爬虫中最耗时的是网络延迟,举一个例子:
import requests
from multiprocessing import Pool
def get_content(url):
a = requests.get(url)
return a.content.decode('utf-8'), url
def count_length(args):
content,url = args
print(url,len(content))
return len(content)
if __name__ == ""__main__"":
url_lst = [
'http://www.cnblogs.com',
'http://www.baidu.com',
'http://www.sogou.com',
'http://www.sohu.com'
]
pool = Pool(5)
for i in url_lst:
pool.apply_async(get_content, args=(i,),callback=count_length)
pool.close()
pool.join()
这样可以显示出从各个网站取得字符串的长度.还有一个爬虫例子,暂且放着,回头来看.
到这里进程相关的内容就学习完了.知道了多进程之后,编程序可以启动同时启动多项工作来进行,如果各个进程直接需要IPC,则要小心的使用进程安全的传递方式.根据任务的分配,可以选择异步,也可以将各个子进程的结果进行同步.
这里练习比较少,因为多进程主要是附加在任务上的一种方式,需要多多练习才能掌握.