并发 - 并发问题的解决与事件驱动

并发 - 并发问题的解决与事件驱动

并发的原语看完, 套路总结一下, 基本有如下三种基本类型: 各个线程读写同一个共享变量, 此时需要使用锁. 各个线程就像是汇聚到一个点的各种线一样. 线程需要互相协作, 在某些点上完成之后再进入到下一个阶段. 有点像本来各个线程是平行线, 然后在会在一个固定的地方交汇, 然后再到下一阶段, 再在固定

并发的原语看完, 套路总结一下, 基本有如下三种基本类型:
  1. 各个线程读写同一个共享变量, 此时需要使用锁. 各个线程就像是汇聚到一个点的各种线一样.
  2. 线程需要互相协作, 在某些点上完成之后再进入到下一个阶段. 有点像本来各个线程是平行线, 然后在会在一个固定的地方交汇, 然后再到下一阶段, 再在固定的地方交汇
  3. 线程之间不交汇, 而是通过共享缓冲区操作, 就像两个车间之间通过一条传送带传送数据, 两个车间中各种一批线程在运行
这些并发的内容, 都使用到了阻塞这个词语, 阻塞实际上是操作系统选择不调度这个线程让其运行, 带给我们的假象. 下边就来看一下, 常见的并发问题, 毕竟有了编写并发的工具, 但还要知道避免常见的问题才可以.
  1. 违反原子性缺陷
  2. 违反顺序缺陷
  3. 死锁
  4. 操作系统提供的事件API
  5. select与异步IO合用

违反原子性缺陷

这种缺陷往往是预期要原子性执行一个代码, 但是实际代码却没有考虑到原子性问题. 这个问题常见于多个线程对同一个变量进行操作, 想要避免这个问题, 很显然需要加锁. 这种代码的例子就不举了, 因为很多直接将单线程程序改成多线程, 就会遇到这种问题

违反顺序缺陷

这种常见于各个线程的操作实际上是有顺序的, 但是忽视了这种顺序, 我自己编写了一段程序:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>

sem_t sem;
sem_t sem1;

typedef struct my {
    int a;
    int *b;
} my_struct;

void *init(void *arg) {
    printf("This is child \n");

    my_struct *new_arg = (my_struct *) arg;

    new_arg->a = 0;
    new_arg->b = &new_arg->a;

    printf("init completed\n");

    sem_post(&sem);

    return NULL;
}

void *child2(void *arg) {
    printf("This is child2 \n");

    my_struct *new_arg = (my_struct *) arg;

    if (new_arg->b == NULL) {
        printf("pointer is null , forbidden to read");
    } else {
        printf("b is %d\n", *(new_arg->b));

    }

    printf("child2 ends\n");

    sem_post(&sem1);

    return NULL;
}

int main(void) {

    sem_init(&sem, 0, 0);
    sem_init(&sem1, 0, 0);
    my_struct * cc = malloc(sizeof(struct my));
    printf("child thread begins\n");
    pthread_t c;
    pthread_t c2;


    pthread_create(&c, NULL, child2, cc);
    pthread_create(&c2, NULL, init, cc);

    sem_wait(&sem);
    sem_wait(&sem1);

    printf("Child threads ends\n");

    return 0;
}
这段程序中, 定义了两个线程, 一个线程init用来初始化struct中的a 和 b两个变量, 另外一个B程序去读结构体中b指针指向的值. 主线程等待这两个线程工作都结束之后再向下执行. 从语义上看, 本来的目的是要先初始化结构体, 然后再用另外一个线程去读写, 但是这里仅仅创建了两个线程, 却没有规定好顺序. 这就可能导致在尚未初始化之前, 线程就去读b指针的值, 但由于b指针的值不可知, 所以可能会产生segment fault. 对于需要顺序的工作, 就可以使用信号量来让线程互相等待. 在之前可以知道, 每一个设置成0的信号量, 就可以让一个线程等待另一个线程的结束. 于是在这里, 我们应该让主线程等待child2线程, 而child2线程去等待init线程, 这样就可以保证正确的顺序, 代码修改如下:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>

sem_t sem;
sem_t sem1;

typedef struct my {
    int a;
    int *b;
} my_struct;


void *init(void *arg) {
    printf("This is init \n");

    my_struct *new_arg = (my_struct *) arg;

    new_arg->a = 0;
    new_arg->b = &new_arg->a;

    printf("init completed\n");

    //init线程执行结束之后, 对sem信号量执行V操作
    sem_post(&sem);

    return NULL;
}


void *child2(void *arg) {

    //child2等待init1结束再开始进行动作
    sem_wait(&sem);

    printf("This is child2 \n");

    my_struct *new_arg = (my_struct *) arg;

    printf("b is %d\n", *(new_arg->b));

    printf("child2 ends\n");

    //init线程执行结束之后, 对sem1信号量执行V操作
    sem_post(sem1)
    
    return NULL;
}

int main(void) {

    sem_init(&sem, 0, 0);
    sem_init(&sem1, 0, 0);

    my_struct * cc = malloc(sizeof(struct my));


    printf("child thread begins\n");

    pthread_t c;
    pthread_t c2;


    pthread_create(&c, NULL, child2, cc);
    pthread_create(&c2, NULL, init, cc);

    //主线程等待sem1信号量
    sem_wait(&sem1);

    printf("Child threads ends\n");

    return 0;
}
如此修改之后, child2线程会等待init线程完成工作后才开始工作, 主线程则会等到child2线程完成工作后才会工作. 这样就完成了想要的顺序.

死锁

死锁发生在复数互斥资源被竞争的时候, 如果一个线程需要多个互斥资源才能够运作的话, 那么多个线程就可能会产生死锁, 死锁有四个必要条件:
  1. 互斥: 线程对于需要的资源进行互斥访问
  2. 持有并等待: 线程持有一个互斥资源, 同时等待获取其他互斥资源
  3. 资源被抢占后除非主动放弃, 否则无法再被抢占
  4. 循环等待: 线程之间存在一个环路, 环路上的每个线程都持有其他线程等待的资源.
这四个条件必须同时具备才会产生死锁, 只要消除任意一个条件, 就可以解除死锁. 解除死锁的方法主要有:

解除死锁 - 循环等待

循环等待的问题主要是获取资源的顺序问题. 最简单的方法, 就是指定获取锁的顺序. 比如我们设计了两个锁L1和L2, 在每次的调用的时候, 都按照先获取L1才能获取L2的顺序调用, 就不会出现死锁. 有一些函数, 根据传入锁的地址的大小来进行获取锁的操作, 这是一种比较有趣的防止死锁的办法. 当然, 在锁和其他东西混用的时候, 操作就要当心一些了, 尤其是信号量背后是条件变量或者锁, 与普通锁使用的时候, 要注意其中的逻辑

解除死锁 - 持有并等待

避免持有并等待的方法, 就是要么抢到全部的锁, 要么就抢不到任何锁. 这样就不会让任何线程处于持有并等待的过程中. 因为锁本身就是控制互斥, 所以可以用一把大锁把其他锁保护起来, 这样同时只能有一个线程去抢锁, 就避免了持有并等待, 不过这样会降低并发效率, 因为实际上强制所有线程进行同步.

解除死锁 - 非抢占

非抢占的话, 就要改变目前抢占锁的策略, 在之前提到过操作系统提供的try_lock()函数, 当抢锁失败的时候, 会返回-1. 这样就可以采用一个策略, 即如果抢锁失败, 就释放自己的锁, 让其他线程去工作, 比如在一个两个锁的情况下:
while true:
    lock(&L1);
    if (trylock(L2) == -1:
        unlock(L1);
    else:
        do_something()
        unlock(L2)
        unlock(L1)

线程首先会去尝试获取L1, 在拿到L1之后, 会尝试获取L2, 如果L2已经被别人拿走了, 则会返回-1, 此时线程就把L1锁也释放掉. 这样这个线程就会立刻返回到没有持有任意锁的状态, 这实际上是把L2变成了允许别人抢占的东西, 所以不会产生死锁, 即两个线程互相阻塞. 然后这会产生活锁, 如果A线程抢到L1, B线程抢到L2, 然后A一看抢不到L2, 就彻底放飞自我, B抢不到L1, 也放飞自我, 结果在下一轮, 两个线程又各自抢到一个锁. 于是不会阻塞, 但是两个线程会反复运行. 这种情况叫做活锁, 避免活锁的方式是延迟一个线程一段时间, 让其他线程有机会连续拿到锁. 活锁的真正问题还不在于此, 而是在抢第二个锁失败的时候, 如何返回到初始状态, 在一些包装的比较深的函数中, 就比较难以做到这一点.

解除死锁 - 互斥

这里其实没有太多的例子, 只给出了一些导向性的方法, 比如使用硬件的支持来进行操作. 比如比较并交换, 假如如下函数是原子的:
int compare_and_swap(int *address, int expected, int new){
    if (*address == expected){
        *address = new;
        return 1;
    }
    return 0;
}
这个函数比较address位置的值与expected位置的值, 如果相等, 就把new放到address位置中. 如果一个线程去循环执行这个方法的话, 只要其他的线程修改了address位置的值, 交换就不会进行. 比如下列:
void AtomicIncrement(int *value, int amount){
    do {
        int old = *value
    } while(compare_and_swap(value, old, old+amount) == 0)
}
这个线程会先读出原来的值, 然后不断尝试去替换, 如果读出到替换之间被修改, 就继续会尝试下一次读出值然后比较并替换. 这样即使不用锁, 迟早(也未必, 可能会成为活锁)能够更新成功.

解除死锁 - 调度

这个东西我理解了一下, 由于现代处理器可以同时运行多个线程, 实际上操作系统只要做到让会产生冲突的线程不在同一个时间并发运行就可以了.

解除死锁 - 检查和恢复

这里突然变成了心理学教材, 即不要追求完美. 不是所有值得做的事情都值的做好. 当然这不是说对于自己需要做好的事情就可以随便对待, 而是总体情况可以接受的情况下寻找适当的策略. 比如操作系统偶尔也会崩溃, 那么就愉快的重启就可以了. 这里很多理论听上去还是比较容易的, 不过稍微一想就会发现写起来并不容易. 此外这些原语在Java中如何体现, 还是要好好的看一下Java并发编程. 下边是事件驱动, 纯粹的事件驱动就不是用多线程并发来解决问题, 而是使用单线程来解决问题, 这次对事件驱动的理解会更深一些. 事件驱动也叫基于事件的并发, 这其实是依赖了操作系统提供的功能, 处理事件的服务器并不一定需要使用多线程. 所以基于事件驱动的程序, 就不像多线程程序, 其中必须要有一个线程时刻运行, 整个进程不能被长时间挂起, 而基于事件的服务器最大的区别就是整个进程可以被挂起, 直到某些事件发生.

操作系统提供的事件API

基于事件驱动的理论, 与现实世界的联系更加紧密, 比如人常常就是事件驱动的, 睡觉休息, 需要做事情的时候喊一声, 做完了之后继续睡觉. 就相当于一个休眠的程序, 被外界通知了, 就开始做事情. 一个典型的基于事件的服务器的伪代码如下, 注意这个是单线程的, 不会强制使用多线程:
while(1){
    // 获取所有的事件
    events = getEvents();

    for event in events:
        processEvent(event)

}
重点就是每次事件由谁来提供, 一个事件服务器中的事件可以由外界来提供, 也可以像有界缓冲区那样, 一个线程作为事件服务器,而另外一些线程用于提供事件. 服务器经常针对的是I/O操作, 这里的向事件服务器提供事件的函数, 是由操作系统实现的. 类UNIX系统提供了select()函数, 应用程序调用这个系统调用的时候, 一旦调用, 就会被挂起, 如果有任何的IO返回, 操作系统就会将描述符准备好, 然后唤醒休眠的进程, 这样就好像在这个期间内应用程序睡眠了, 一有事件, 就醒了. 详细的select函数如下:
int select(
    int nfds, 一个数量, 检查每个集合中的描述符的数量
    fd_set *readfds  准备好读取的描述符集合
    fd_set *writefds 准备好写入的描述符集合
    fd_set *errorfds 有异常待处理的描述符集合
    struct timeval *timeout
    )
每次执行select函数的时候, 需要传入nfds, select会检查每个描述符集合中不超过这个nfds数量的描述符. 三个描述符集合被传入的时候, 应该是所有要监听的描述符集合, 在执行了select之后, 这三个描述符集合会被替换成可用的描述符集合. 如果没有任何IO操作, 应用程序执行select就会进入休眠. 如果休眠时间超过timeout, 就会报错. 如果将timeout设置成NULL, 会导致只要没有描述符可用, 操作系统就会一直让应用程序休眠在select调用上. 如果将timeout设置为0, 则方法会立刻返回, 不管有没有描述符可用, 没有可用的描述符就返回0, 有可用的描述符则select不会返回0. 这就提供了一个基础的非阻塞事件循环的方法. 即先准备好要监听哪些描述符, 然后需要更新描述符到一个新的结构中, 之后针对描述符来调用select函数. 可以选择让select阻塞还是直接返回. 看CSAPP当时记录的代码在这里, 只要注意select函数每次会用一个集合代替传入的集合, 所以要准备好两个集合, 一个集合放着所有需要监听的描述符. 一个集合每次复制所有需要监听的描述符, 然后将这个集合传给select函数当做参数, 然后去检查并处理.

select与异步IO合用

可以看到, 如果想要不阻塞的操作, 可以将select的超时设置为0, 这样在等待IO可用的时候不会阻塞. 但是还有一个问题, 在处理事件的时候要操作I/O, 如果I/O阻塞了会如何. 由于事件服务器是单线程, 任意阻塞都会造成整个服务器阻塞, 则之后的事件请求会一直堆积得不到处理, 直到解决当前事件为止. 很显然, 由于事件处理这一模型的出现, 就要去I/O也不能堵塞, 这就出现了异步I/O, 即 asynchronous I/O 也被称作AIO. 传统的IO进行一次系统调用, 然后阻塞, 当返回的时候, IO已经完成. 为了达成异步IO, 操作系统现在提供了新的接口, 即调用一个接口启动IO, 再调用另外一个接口检查刚才IO的结果. 即为了实现AIO, 需要用至少两个接口来代替原来的单个IO接口. 看一个例子:
  1. 发起IO的接口: int aio_read(struct aiocb *target), 这个方法接收一个特殊的结构作为参数, 通过这个结构指定要读取的内容, 大小和存放的位置.
  2. 检查IO是否完成的接口: int aio_error(const struct aiocb *target), 这个方法的名称有点奇怪, 用于检查这个特殊的结构是否读取完成, 如果完成了,就返回0, 如果还没有, 就返回非零.
  3. 这个结构如下:
        struct aiocb {
            int aio_fildes;  //这个是文件描述符
            off_t aio_offeset; //这个是文件的偏移量, 也就是起始点
            volatile void *aio_buf; //要读取到的缓冲区位置
            size_t aio_nbytes; //要读取的长度
        }
    
这样应用程序可以周期性的轮询, 来检测指定的IO操作是否完成. 不过如果事件很多, 轮询的效率不高. 所以很多时候可以采用信号的方法, 在异步IO完成的时候通知应用程序. 信号属于操作系统的一个基础设施, 因此可以放心的使用. 所以将检查IO的操作放在信号处理程序中的话, 就不用轮询, 而是可以等待操作系统通知了. 异步IO是需要操作系统的支持的, 所以在不支持异步IO的操作系统中, 无法实现纯粹的基于事件的处理器, 只能采取一系列模拟的方式. 比如用一个线程充当事件服务器, 然后用其他线程去处理所有需要处理的文件描述符, 而不是真正等待操作系统通知. 由于有了多个线程, 因此阻塞IO是不影响那个事件处理线程的响应的. 基于事件驱动的服务器显然效率更高而且不吃多线程, 但是带来的问题就是复杂的状态管理, 也即编写代码的复杂程度比较高, 最直观的就是要采取额外的数据结构来保存上边提到的struct aiocb.
LICENSED UNDER CC BY-NC-SA 4.0
Comment