并发 - 多线程API

并发 - 多线程API

在之前的算法中写出了Dijkstra算法, 后来发现这个人是并发领域的先驱, 很多专有名词都是这个人搞出来的. 又把数据结构过了一遍的今天, 我leetcode也刷了30多题了, 感觉后边一个要好好的看看并发, 一个要好好的看一下网络编程. 之前试着看了一下经典的 Java并发编程实战, 发现还是有

在之前的算法中写出了Dijkstra算法, 后来发现这个人是并发领域的先驱, 很多专有名词都是这个人搞出来的. 又把数据结构过了一遍的今天, 我leetcode也刷了30多题了, 感觉后边一个要好好的看看并发, 一个要好好的看一下网络编程. 之前试着看了一下经典的 Java并发编程实战, 发现还是有点难度的, 正好按照自学计算机的进度, 也需要看一下操作系统了. 这时候正好发现Operating Systems Three Easy Pieces这本书有了中文版, 赶快买回来看看, 发现讲解确实比现代操作系统好多了. 于是现在就准备跟着操作系统先看一遍并发基础理论, 然后通过On Java 8 以及Core Java看一下Java的并发基础部分, 然后逐步再看Java并发实战了.
  1. 老生常谈, 并发问题
  2. POSIX下的线程API
  3. 条件变量

老生常谈, 并发问题

这一段程序:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <pthread.h>

static volatile int counter = 0;

void *mythread(void *arg) {
    printf("%s\n", (char *) arg);

    int i = 0;
    for (i = 0; i < 100000000; i++) {
        if (i & 0x1) {
            counter = counter + 1;
        }
    }

    return NULL;
}


int main(int argc, char *argv[]) {

    pthread_t p1, p2;
    int rc;

    printf("main:begin\n");

    rc = pthread_create(&p1, NULL, mythread, "A");
    rc = pthread_create(&p2, NULL, mythread, "B");

    rc = pthread_join(p1, NULL);
    rc = pthread_join(p2, NULL);

    printf("main:end and counter = %d\n", counter);

    return 0;

};
在Linux下必须使用gcc main.c -lpthread来使用POSIX标准的线程库来进行编译. 可以发现, 每次运行结果都未必相同, 原因其实都知道了, 就是counter = counter + 1这个语句并非是原子操作.

POSIX下的线程API

首先来看创建线程的函数:
pthread_create(pthread_t *thread, const pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
第一个参数是一个pthread_t类型的指针, 创建线程的函数会往这个结构中放入数据, 以便后续跟踪该线程. 第二个参数是属性, 用于设置该线程的一些属性, 一般可以设置为NULL, 详细的情况就需要看手册. 第三个参数是线程开始运行的函数, 是一个函数指针, 默认就是一个接受参数是void *, 返回 void* 的函数指针, 这也意味这个函数接受任意类型的指针, 返回任意类型的指针, 因此非常灵活. 第四个参数就是实际要传递给函数的参数. 来启动一个线程随便做点事情:
#include <stdio.h>
#include <stdlib.h>

//定义一个结构用于传参数
typedef struct myarg {
    int a;
    int b;
} myarg_t;

//线程函数
void *mythread(void *arg) {

    //把arg强制转换, 然后从其中取出参数
    myarg_t *i = (myarg_t *) arg;
    printf("%c|%c\n", i->a, i->b);

    return (void *) 6666;
}


int main(int argc, char *argv[]) {

    //准备线程标记
    pthread_t new_thread;

    int rc;

    //准备参数
    myarg_t args;
    args.a = 99;
    args.b = 101;

    //创建线程, 其中第四个参数使用参数结构的指针
    rc = pthread_create(&new_thread, NULL, mythread, &args);

    printf("RC is %d\n", rc);

    //等待线程结束
    pthread_join(new_thread, NULL);

    printf("end");
};

上边这个例子里, 线程函数想返回一个6666的int值, 然而是不能够直接返回值的, 更不能够返回指向线程栈中的变量, 否则不知道会是什么样子, 一般是要将结果放入一个已经声明好的结构或者值中. 注意, 我们将 int 转换成了 void *, 即一个指针, 要从其中取出6666, 绝对不能使用*参数, 而是要使用指向指针的指针参数才行. 来看一个上边返回整数的改动:
#include <stdio.h>
#include <stdlib.h>

typedef struct myarg {
    int a;
    int b;
} myarg_t;




void *mythread(void *arg) {

    myarg_t *i = (myarg_t *) arg;
    printf("%c|%c\n", i->a, i->b);

    return (void *) 6666;
}


int main(int argc, char *argv[]) {

    pthread_t new_thread;

    int rc;

    myarg_t args;
    args.a = 99;
    args.b = 101;

    rc = pthread_create(&new_thread, NULL, mythread, &args);

    printf("RC is %d\n", rc);

    int result;

    pthread_join(new_thread, (void **) &result);

    printf("end with result = %d", result);

};
这个如何理解, pthread_join的第二个参数是一个指向指针的指针, 由于线程返回的是一个指针, 因此这里接受一个指向指针的指针, 供函数将返回值(指针)写入到指针的地址中. 这里返回的是一个指向6666地址的指针, 将这个指针的值, 写入到 &result中, 实际上就是把6666写入到rc中. 实在记不住, 就记住将获取结果的指针强制转型成 void** 即可.

使用锁需要先声明一个锁变量, 这个锁变量的类型是pthread_mutex_t, 从名字可以看出来这是一个互斥锁. 然后有两个函数用来获取和释放锁, 很显然, 需要在进入临界区的时候用锁保护:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <pthread.h>


pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

static volatile int counter = 0;

void *mythread(void *arg) {
    printf("%s\n", (char *) arg);

    int i = 0;
    for (i = 0; i < 100000000; i++) {
        if (i & 0x1) {
            pthread_mutex_lock(&lock);
            counter = counter + 1;
            pthread_mutex_unlock(&lock);
        }
    }

    return NULL;
}


int main(int argc, char *argv[]) {

    pthread_t p1, p2;
    int rc;

    printf("main:begin\n");

    rc = pthread_create(&p1, NULL, mythread, "A");
    rc = pthread_create(&p2, NULL, mythread, "B");

    rc = pthread_join(p1, NULL);
    rc = pthread_join(p2, NULL);

    printf("main:end and counter = %d\n", counter);

    return 0;

};
锁在使用之前必须先初始化, 不能仅仅声明. 这里的锁初始化直接用了一个宏. 如果是用代码初始化, 则是调用pthread_mutex_init(&lock, NULL). 这里的程序的问题是, 获取锁的时候可能出现失败, 导致多个线程进入临界区, 所以一般应该来检查结果, 获取锁正常返回的结果是0, 失败的结果是非零. 在实际程序中, 如果获取锁失败, 则必须要进行一些处理, 想办法继续让不同的线程隔离是好事情.

条件变量

锁是用于只能让一个线程处于临界区内, 而线程之间没有什么交互, 就是去争抢那个锁, 抢到就执行. 在很多情况下, 如果线程之间需要发生某种信号, 如果一个线程等待另外一个线程之类, 条件变量就很有用了. 使用条件变量主要是两个函数:
  1. int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
  2. int pthread_cond_signal(pthread_cond_t *cond)
很显然, 已经知道pthread_mutex_t类型是一个互斥锁, 则可以理解pthread_cond_t是一个条件锁. 很显然, 这两个函数表明了需要使用条件锁的时候, 必须持有另外一个与条件锁相关的互斥锁, 以免在更新条件锁的时候出现问题. 这两个函数都需要在持有锁的情况下进行调用. 第一个函数的具体作用是让当前线程休眠, 并且去等待这个条件. 第二个函数的具体作用是向所有休眠的线程中发送一个信号, 以期望能够唤醒其他线程进行工作. 典型的用法如下:
  1. 线程需要休眠的时候:
        //初始化两个锁, 一个互斥锁, 一个条件锁对象(在全局变量中)
        pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER
        pthread_cond_t cond = PTHREAD_COND_INITIALIZER
    
        //等待判断条件的时候, 必须先持有锁
        pthread_mutex_lock(&lock);
        //判断某个标志
        while(signal == 0)
            //线程进入等待状态(这里为什么传入&lock), 是因为要去释放互斥锁. 线程到这里就被挂起了, 如果不释放, 就要完蛋.
            //在被唤醒之后, 这个函数就返回, 此时会重新先去获得互斥锁, 所以之后的代码依然是在互斥锁中的
            pthread_cond_wait(&cond, &lock);
    
        //在互斥锁的保护下执行与共享变量有关的业务代码
        do something
    
        //业务执行完毕释放锁
        pthread_mutext_unlock(&lock);
    
注意, 一定需要用条件变量, 切不可让一个线程反复自旋(即用while)判断来判断一个条件然后进行工作, 因为普通的自旋也可能会同时读取共享变量导致错误. 虽然知道了这些原理, 但离编写出真正可用的多线程程序还有很长的路要走. 虽然之前看CSAPP的时候, 使用多进程多线程写了一些例子, 其中用到的是信号量的概念, 比锁要稍微低级一点, 但是CSAPP是站在应用程序员的角度, 即比操作系统要高一个层次. 但操作系统导论就比CSAPP的层次要低了, 虽然一开始使用了POSIX的API, 不过讨论了锁的实现. 就继续一路看下去吧, 虽然很难. 下一步就是锁的实现. 看了一会, 发现锁其实也是软硬结合的产物, 还真的有原子性的硬件支持, 怪不得自己总是想不明白.
LICENSED UNDER CC BY-NC-SA 4.0
Comment