Python 35 并发编程-进程 进程同步控制组件: 锁 信号量 事件

Python 35 并发编程-进程 进程同步控制组件: 锁 信号量 事件

旧博客Python系列

锁,信号量和事件,都是用来同步异步程序的组件.

之前多线程编程的时候,都是操作逻辑,还没有实际操作需要共享的数据.但是已经知道,创建进程并且运行,是操作系统去完成的,实际的执行顺序未必可知.如果对多个进程都需要的文件进行操作,必须有一种机制来保证不会错乱,就引入了锁的抽象概念,即一个进程在操作某个I/O对象的时候,会给这个对象上一把锁,如果其他进程来访问,看到锁之后就知道了该对象现在不可访问.待锁去掉之后,再进行访问.
from multiprocessing import Process
import os, time, json


def show(i):
    with open('ticket') as f:
        dic = json.load(f)
        print('编号{}查看了余票: %s'.format(i,dic['ticket']))

def buy(i):
    with open('ticket',) as f:
        dic = json.load(f)
        if dic['ticket'] >0:
            dic['ticket'] -= 1
            print('{}买到票了'.format(i))
        else:
            print('{}没买到票'.format(i))
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=buy,args=(i,))
        p.start()

这个程序想要实现查看文件里的剩余票数,如果有票,就将票数减少1,如果为0,就返回买不到票.
但是实际运行起来,发现10个人都买到了票,这显然有问题,是因为进程运行都很快,在第一个访问文件的进程将0写入文件之前,其他进程也读取了1,结果都拿到一样的结果.
如果要改变这种情况,就在文件的门口放一把锁和挂上唯一的一把钥匙.先拿到钥匙的人把门锁上,完成工作之后,打开门然后把钥匙挂在门上,下一个人来的时候,看到门锁着,则无法访问文件,看到有钥匙,则开门进房间.
这个过程抽象成一个锁的概念.锁的使用方法:在需要使用锁的程序代码前后,用Lock()实例化的对象的acquire方法来获得锁,release方法来释放锁.

import json
import time
from multiprocessing import Process
from multiprocessing import Lock


def buy_ticket(i, lock):
    lock.acquire()  # 拿钥匙进门
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket'] > 0:
        dic['ticket'] -= 1
        print('\033[32m%s买到票了\033[0m' % i)
    else:
        print('\033[31m%s没买到票\033[0m' % i)
    time.sleep(0.1)
    with open('ticket', 'w') as f:
        json.dump(dic, f)
    lock.release()  # 还钥匙


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=buy_ticket, args=(i, lock))
        p.start()

    # 以下是恢复票数
    time.sleep(3)
    with open('ticket') as f:
        dic = json.load(f)
        dic['ticket'] = 3
    with open('ticket','w') as f:
        json.dump(dic, f)

这样每次可以看到,10个人里确实有3个人抢到了票.
注意,启动的10个进程,传入的参数是同一个对象.

信号量

在锁的基础上,如果同时有n个资源可以供超过这些资源的数目进程访问,那么同时只能让n个进程分别访问这个资源,就相当于指定数量的锁,这个时候就用信号量类,来实例化一个信号量对象,用于控制总数.

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore


def ktv(i, sem):
    sem.acquire()   # 取一个钥匙然后进门
    print('{}进入了KTV'.format(i))
    time.sleep(random.randint(2, 5))
    print('{}离开了KTV'.format(i))
    sem.release()   # 归还钥匙


if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(10):
        p = Process(target=ktv, args=(i, sem))
        p.start()

这个程序模拟了一个只能容纳4个人的KTV房间,有10个人想唱歌,则必须等进去的人唱好出来之后,才能再进去.
实际上把sem.acquire()直接写在主进程里边,可以看到,反复调用sem.acquire(),只有前四次,之后的代码会响应,之后就会阻塞住.其实就相当于一个加了计数器的锁.

事件

对于锁和信号量可以使所有的进程都进入阻塞状态,也可以控制所有的进程解除阻塞.
一个事件被创建之后,默认是阻塞状态.

from multiprocessing import Event
if __name__ == '__main__':
    e = Event()  # 实例化事件
    print(e.is_set())  # 查看一个事件的状态,默认被设置成阻塞
    print(123456)

结果是False,并且打印了123456,似乎没有阻塞,实际上,事件e是根据e.wait()的值来决定是否阻塞.

if __name__ == '__main__':
    e = Event()
    print(e.is_set())
    e.set()  # 设置事件的状态为True
    print(e.is_set())  # 用来查看一个事件的状态
    e.wait() # 是否阻塞取决于set是否为True
    print(123456)
    e.clear()  # 将事件的状态重新设置为False
    e.wait()
    print(123456)

第一个123456正常打印,第二个123456之前事件e发生阻塞.事件被创建的时候默认为阻塞,并且会在wait()方法的地方阻塞.设置为True的时候,不阻塞.

事件的作用,就是等待某一件事中的一个信号变化,然后做一些动作,再将状态改回来,继续再进入等待状态.这几个进程间同步控制,内部其实写了基于文件的socket,所以如果用错了,可能会出现socket错误.
用一个经典例子解释事件:

import time
from multiprocessing import Process
from multiprocessing import Event


def light(e):
    while True:
        if e.is_set():
            time.sleep(2)
            e.clear()
            print('\033[31m红灯亮了\033[0m ')

        else:
            time.sleep(2)
            e.set()
            print('\033[32m绿灯亮了\033[0m ')


def car(i, e):
    while True:
        if not e.is_set():
            print('{}车在等待通行')
            e.wait()
            print('{}车通过绿灯')


if __name__ == '__main__':
    e = Event()
    traffic = Process(target=light, args=(e,))
    traffic.start()

    for i in range(5):
        cc = Process(target=car, args=(i, e,))
        cc.start()

用一个事件传递到所有的进程中,这个事件的状态一旦改变,各个进程便可以通过这个事件的状态改变作出相应的逻辑.

LICENSED UNDER CC BY-NC-SA 4.0
Comment