锁,信号量和事件,都是用来同步异步程序的组件.
锁
之前多线程编程的时候,都是操作逻辑,还没有实际操作需要共享的数据.但是已经知道,创建进程并且运行,是操作系统去完成的,实际的执行顺序未必可知.如果对多个进程都需要的文件进行操作,必须有一种机制来保证不会错乱,就引入了锁的抽象概念,即一个进程在操作某个I/O对象的时候,会给这个对象上一把锁,如果其他进程来访问,看到锁之后就知道了该对象现在不可访问.待锁去掉之后,再进行访问.from multiprocessing import Process
import os, time, json
def show(i):
with open('ticket') as f:
dic = json.load(f)
print('编号{}查看了余票: %s'.format(i,dic['ticket']))
def buy(i):
with open('ticket',) as f:
dic = json.load(f)
if dic['ticket'] >0:
dic['ticket'] -= 1
print('{}买到票了'.format(i))
else:
print('{}没买到票'.format(i))
time.sleep(0.1)
with open('ticket','w') as f:
json.dump(dic,f)
if __name__ == '__main__':
for i in range(10):
p = Process(target=buy,args=(i,))
p.start()
这个程序想要实现查看文件里的剩余票数,如果有票,就将票数减少1,如果为0,就返回买不到票.
但是实际运行起来,发现10个人都买到了票,这显然有问题,是因为进程运行都很快,在第一个访问文件的进程将0写入文件之前,其他进程也读取了1,结果都拿到一样的结果.
如果要改变这种情况,就在文件的门口放一把锁和挂上唯一的一把钥匙.先拿到钥匙的人把门锁上,完成工作之后,打开门然后把钥匙挂在门上,下一个人来的时候,看到门锁着,则无法访问文件,看到有钥匙,则开门进房间.
这个过程抽象成一个锁的概念.锁的使用方法:在需要使用锁的程序代码前后,用Lock()实例化的对象的acquire方法来获得锁,release方法来释放锁.
import json
import time
from multiprocessing import Process
from multiprocessing import Lock
def buy_ticket(i, lock):
lock.acquire() # 拿钥匙进门
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['ticket'] > 0:
dic['ticket'] -= 1
print('\033[32m%s买到票了\033[0m' % i)
else:
print('\033[31m%s没买到票\033[0m' % i)
time.sleep(0.1)
with open('ticket', 'w') as f:
json.dump(dic, f)
lock.release() # 还钥匙
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=buy_ticket, args=(i, lock))
p.start()
# 以下是恢复票数
time.sleep(3)
with open('ticket') as f:
dic = json.load(f)
dic['ticket'] = 3
with open('ticket','w') as f:
json.dump(dic, f)
这样每次可以看到,10个人里确实有3个人抢到了票.
注意,启动的10个进程,传入的参数是同一个对象.
信号量
在锁的基础上,如果同时有n个资源可以供超过这些资源的数目进程访问,那么同时只能让n个进程分别访问这个资源,就相当于指定数量的锁,这个时候就用信号量类,来实例化一个信号量对象,用于控制总数.
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore
def ktv(i, sem):
sem.acquire() # 取一个钥匙然后进门
print('{}进入了KTV'.format(i))
time.sleep(random.randint(2, 5))
print('{}离开了KTV'.format(i))
sem.release() # 归还钥匙
if __name__ == '__main__':
sem = Semaphore(4)
for i in range(10):
p = Process(target=ktv, args=(i, sem))
p.start()
这个程序模拟了一个只能容纳4个人的KTV房间,有10个人想唱歌,则必须等进去的人唱好出来之后,才能再进去.
实际上把sem.acquire()直接写在主进程里边,可以看到,反复调用sem.acquire(),只有前四次,之后的代码会响应,之后就会阻塞住.其实就相当于一个加了计数器的锁.
事件
对于锁和信号量可以使所有的进程都进入阻塞状态,也可以控制所有的进程解除阻塞.
一个事件被创建之后,默认是阻塞状态.
from multiprocessing import Event
if __name__ == '__main__':
e = Event() # 实例化事件
print(e.is_set()) # 查看一个事件的状态,默认被设置成阻塞
print(123456)
结果是False,并且打印了123456,似乎没有阻塞,实际上,事件e是根据e.wait()的值来决定是否阻塞.
if __name__ == '__main__':
e = Event()
print(e.is_set())
e.set() # 设置事件的状态为True
print(e.is_set()) # 用来查看一个事件的状态
e.wait() # 是否阻塞取决于set是否为True
print(123456)
e.clear() # 将事件的状态重新设置为False
e.wait()
print(123456)
第一个123456正常打印,第二个123456之前事件e发生阻塞.事件被创建的时候默认为阻塞,并且会在wait()方法的地方阻塞.设置为True的时候,不阻塞.
事件的作用,就是等待某一件事中的一个信号变化,然后做一些动作,再将状态改回来,继续再进入等待状态.这几个进程间同步控制,内部其实写了基于文件的socket,所以如果用错了,可能会出现socket错误.
用一个经典例子解释事件:
import time
from multiprocessing import Process
from multiprocessing import Event
def light(e):
while True:
if e.is_set():
time.sleep(2)
e.clear()
print('\033[31m红灯亮了\033[0m ')
else:
time.sleep(2)
e.set()
print('\033[32m绿灯亮了\033[0m ')
def car(i, e):
while True:
if not e.is_set():
print('{}车在等待通行')
e.wait()
print('{}车通过绿灯')
if __name__ == '__main__':
e = Event()
traffic = Process(target=light, args=(e,))
traffic.start()
for i in range(5):
cc = Process(target=car, args=(i, e,))
cc.start()
用一个事件传递到所有的进程中,这个事件的状态一旦改变,各个进程便可以通过这个事件的状态改变作出相应的逻辑.