终于站在CSAPP最后一章的门前了, 一年以前买这本书的时候还看不进去, 现在竟然已经全部看完而且看懂了. 内心还是有点小小的激动, 这种力量正不断涌现的感觉...
- 并发
- 多进程程序
- I/O多路复用 - select 函数
- I/O多路复用 - 事件驱动的并发
- I/O多路复用 - 优缺点
并发
在第八章看信号的时候, 已经会知道各个逻辑控制流在时间上会交错发生, 这就是并发. 并发是一种很普遍的现象, 出现在计算机系统的各个层面上.
操作系统提供了构造并发程序的三种基本方法:
- 进程. 进程是相对独立的程序运行环境, 如果想要和其他进程通信, 需要显式的使用进程间通信(IPC)机制, 比如信号或者wait函数.
- I/O多路复用, 这是应用程序在一个进程中调度逻辑流, 数据到达文件描述符后, 整个进程在不同的状态中切换来切换去. 所有的逻辑流共享同一个空间.
- 线程.线程是运行在一个进程内部的独立的逻辑流, 也由内核进行调度, 既像进程一样相对独立, 又像I/O多路复用一样共享所有的虚拟地址空间.
基于进程的并发服务器
基于进程的并发思想是, 将每一个逻辑流对应一个进程去执行, 由于每个逻辑流完全独立, 所以可以各自做各自的工作.
多进程的一个特点是一定要定时回收僵死的子进程, 需要采取之前的信号策略, 即在处理信号的时候阻塞相同信号, 每次处理信号都尽可能的处理完所有要处理的情况.
这里可以用多进程来修改一下11章中的套接字服务端, 让其可以同时接收多个连接, 除了上述的要求之外, 还必须要注意描述符. 子进程出现瞬间, 父进程和子进程各有两个文件描述符, 一个指向同一个监听套接字描述符, 一个指向同一个已连接描述符.
在子进程结束的时候, 子进程的监听套接字描述符和已连接描述符都被系统关闭. 但是父进程依然持有已连接描述符, 如果不加以关闭, 很快就会占满资源, 而且父进程不关闭已连接描述符, 系统中的连接依然存在, 因此这需要在分支出子进程的时候做一些处理, 即父进程关闭已连接描述符, 子进程关闭监听套接字描述符.
来改造一下这个服务端, 红色的部分是改造的部分:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "csapp.h"
#define MAXLINE 8192
void sigchld_handler(int sig) {
//不断循环直到所有的僵死子进程都被回收
while ((waitpid(-1, 0, WNOHANG)) > 0){}
}
void echo(int connfd);
int main(int argc, char **argv) {
//声明监听套接字和已连接套接字
int listenfd, connfd;
//声明客户端长度
socklen_t clientlen;
//这个是特殊的结构, 用于存放客户端的套接字地址结构
struct sockaddr_storage clientaddr;
//这两个结构用于存放getnameinfo的结果
char client_hostname[MAXLINE], client_port[MAXLINE];
//判断命令行是否错误
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
//添加信号处理函数
Signal(SIGCHLD, sigchld_handler);
//使用编写的函数打开套接字
listenfd = Open_listenfd(argv[1]);
//开始无限循环, 每一次进来连接就开启新进程, 让子进程操作连接, 主进程关闭已连接描述符
while (1) {
//计算保存客户端套接字地址的长度
clientlen = sizeof(struct sockaddr_storage);
//调用accept函数, 将客户端套接字地址放入 clientaddr
connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
if(Fork()==0) {
//子进程关闭监听套接字描述符
Close(listenfd);
//调用 getnameinfo 将获取的客户端套接字地址转换成域名和端口
Getnameinfo((SA *) &clientaddr, clientlen, client_hostname, MAXLINE, client_port, MAXLINE, 0);
//打印客户端的连接信息
printf("Connected to (%s, %s)\n", client_hostname, client_port);
//调用函数处理客户端发来的信息
echo(connfd);
//关闭已连接套接字描述符并退出
Close(connfd);
exit(0);
}
//父进程关闭已连接描述符
Close(connfd);
}
exit(0);
}
注意其中的红色部分, 子进程先关闭监听套接字, 父进程关闭已连接套接字. 这样子进程正常退出的时候, 才能关闭整个客户端的连接, 而监听套接字一直被主进程使用.
多进程的好处是:
- 文件表共享, 可以方便共享第三方信息
- 独立内存空间和写时复制的变量, 各个进程之间状态互相不影响
多进程的缺点是:
- 进程的代价高昂, 运行速度慢
- 进程通信比较困难, 必须使用显式的IPC比如waitpid, 管道等
练习 12.1
解答, 因为父子进程共享同一个文件表, 在fork()瞬间, 父子进程的connfd指向同一个文件表项, 这个表项的引用是2, 在父进程关闭了已连接描述符后, 表项的引用是1, 因此依然是一个有效的描述符, 子进程可以继续使用这个描述符进行通信.
练习 12.2
因为子进程结束的时候, 会释放所有的文件描述符. 而此时父进程已经关闭了已连接描述符, 所以子进程无论是否关闭, 它自己的这个已连接描述符不会被其他进程使用. 所以子进程结束的时候文件表项会被删除, 不会出现内存泄露.
I/O多路复用 - select 函数
依稀记得刚开始学Python的时候, 到最后看I/O多路复用还以为自己稍微理解了内容. 现在看来当时的自己还差得远, 现在可以从系统上来看看I/O多路复用了.
当一个程序同时需要多个I/O, 并且针对不同的I/O进行处理的时候, 程序如果在等待其中一个I/O的时候阻塞, 就无法去做其他的事情. 针对这种困境的解决方案就是不让程序自己去阻塞, 而是使用select函数, 让内核挂起这个进程, 在某个或者多个I/O事件发生的时候, 将控制返回给应用程序.
通过I/O复用, 一个程序就可以像下边这样工作:
- 在{0, 4}两个描述符中的任一个准备好读的时候干活
- 在{1,2,7}三个描述符中的任一个准备好写的时候干活
- 在等待I/O的过程中超过了152.13秒, 就超时错误, 终止程序
select 函数很复杂, CSAPP 讲了一种场景, 就是等待一组描述符准备好被读. 这是一种让一个程序可以接受多种不同输入并且有针对性处理的场景.
select 函数操作的是一组描述符集合, 有点像之前信号集的操作, 用一组宏来操作, 然后将设置好的描述符集合交给select函数用来监听, select函数会一直阻塞, 直到有描述符可读而且至少能读一个字节. select函数会更新传入的第二个参数指向的fd_set, 其中放着所有准备好的集合, select 返回的int就是这个准备好的集合中描述符的数量.
#include <sys/select.h>
//返回已准备好的描述符的个数.
int select(int n, fd_set *fdset, NULL, NULL, NULL);
//操作描述符集的宏
FD_ZERO(fd_set, *fd_set); //清空描述符集
FD_CLR(int fd, fd_set *fdset); //从描述符集中清理fd描述符
FD_SET(int fd, fd_set *fdset); //向描述符集中设置fd描述符
FD_ISSET(int fd, fd_set *fdset); //描述符集中的fd描述符是否被设置?
看一个简单的例子来学习使用:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/select.h>
#include "csapp.h"
#define MAXLINE 8192
void command(void);
void echo(int connfd);
int main(int argc, char **argv) {
int listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
char client_hostname[MAXLINE], client_port[MAXLINE];
//声明描述符集
fd_set read_set, ready_set;
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listenfd = Open_listenfd(argv[1]);
//清空描述符集
FD_ZERO(&read_set);
//打开标准输入
FD_SET(STDIN_FILENO, &read_set);
//打开listenfd
FD_SET(listenfd, &read_set);
//开始无限循环, 每一次进来连接就将地址写入 clientaddr 和 clientlen 然后打印出来
while (1){
//将准备完毕集合设置成read_set一致
ready_set = read_set;
//第一个参数是要监听的描述符集合的最大基数, 这里一共要监听 0, 1 ,2 ,3 四个描述符, 3号就是listenfd
Select(listenfd + 1, &ready_set, NULL, NULL, NULL);
//如果标准输入的描述符就绪, 就执行command函数
if (FD_ISSET(STDIN_FILENO, &ready_set)) {
command();
}
if(FD_ISSET(listenfd, &ready_set)){
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
echo(connfd);
Close(connfd);
}
}
exit(0);
}
void echo(int connfd){
size_t n;
char buf[MAXLINE];
rio_t rio;
Rio_readinitb(&rio, connfd);
while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
printf("server received %d bytes \n", (int) n);
Rio_writen(connfd, buf, n);
}
}
void command(void){
char buf[MAXLINE];
if (!Fgets(buf, MAXLINE, stdin)) {
exit(0);
}
printf("From stdin: %s\n", buf);
}
这个程序使用了两个描述符集合, 这样可以区分开原始的和当成参数被 select 函数修改之后的准备集.
这里我感觉首先要理解的是, listenfd描述符有连接进来的时候, 就会激活select函数, 这时候就可以判断然后去调用 connfd 这是第一个点.
第二个点就是每次都需要判断ready_set, 因为每次select函数执行完不阻塞的时候, ready_set都会变动.
由于这个服务器在标准输入不按下回车的时候, select是阻塞的. 而连接进来的时候, select会被激活而打开阻塞, 由于echo是循环的, 所以这个服务器的表现行为是:外部连接随时可以打断本地标准输入.
外部连接结束的时候, 所有已经按过回车的本地连接的内容又会激活select然后再打印出来.
12.3 在上边这个程序里如果本地输入按下了Ctrl+D, 会发生什么?
按下Ctrl+D是发送一个EOF到操作系统, EOF的本质是读到了0, 所以也是一个字节. 会触发select让其不阻塞. 不过我在实际试验的时候, 都会直接退出.
I/O多路复用 - 事件驱动的并发
I/O多路复用有点像操作系统给应用程序发送了一个信号, 因为 select 函数会阻塞, 当不阻塞的时候, 应用程序的代码得以执行, 应用程序的代码如果之后是根据可用描述符进行操作的话, 相当于每次有可用的描述符, 操作系统都"通知"了程序.
直观的来说, 我们的程序不等操作系统通知是不干活的, 这就是事件驱动, 有事干活, 没事挂起.
事件驱动的并发一般思路是将逻辑流模型变化为状态机. 所谓状态机就是一组(状态,输入事件)对应到一个输出. 只要改变输入, 就会得到一个输出. 在并发的时候, 实际上是针对每个输入, 创建一个逻辑流对应上, 只要这个输入进来, 就执行对应的逻辑流, 得到输出.
所以事件驱动程序的编写原则是:
- 监听事件
- 事件发生的时候, 使用一个容器放入要处理的内容
- 按一定顺序处理容器中的事件
- 将处理完的事件扔出容器,释放资源
- 继续监听事件
现在要写一个事件驱动的服务器示例, 其核心就是要使用一个自建的数据结构来存放事件. 当描述符可读的时候, 就更新这个事件数据结构, 然后服务器会按照顺序处理事件结构中的内容.
执行完毕的时候又回到等待 select 函数解除阻塞的过程. 这其中对于每次处理要小心的处理和释放资源. 如果用一个队列来装事件处理结构, 这个就是经常听到的事件队列.
先来看主程序的逻辑:
#include "csapp.h"
//针对每一次select返回, 创建一个事件数据结构池
typedef struct {
int maxfd; //read_set中最大的描述符
fd_set read_set; //可读描述符集合
fd_set ready_set; //送给select当参数的描述符集合
int nready; //select的返回值
int maxi; //clientfd的最大索引, 也就是已经使用的位置
int clientfd[FD_SETSIZE]; //存放可用的已连接描述符数组
rio_t clientrio[FD_SETSIZE]; //这个事件里的描述符使用的缓冲区数组
} pool;
void init_pool(int listenfd, pool *p);
void add_client(int connfd, pool *p);
void check_clients(pool *p);
int byte_cnt = 0; //接受到的总的字节数量
int main(int argc, char **argv) {
int listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
//静态变量声明, 初始化一次, 之后不会被回收
static pool pool;
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listenfd = Open_listenfd(argv[1]);
//关键函数之一, 初始化池子
init_pool(listenfd, &pool);
while (1) {
//这里完成了池子的初始化, 像原来一样调用select
pool.ready_set = pool.read_set;
//接收返回的可用描述符的数量放入pool.nready中
pool.nready = Select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
//此时select函数阻塞完毕, 需要处理事件了
// 如果监听描述符可用, 即有连接进来, 创建连接, 然后将连接描述符加入到池子数据结构中
// 纯粹的连接会将新的连接加入到池子, 同时减掉nready, 此时nready就等于0, 所以不会导致check_clients进行任何处理
// 如果没有创建新的连接, 则不会调用add_client, 此时nready就是大于等于1
if (FD_ISSET(listenfd, &pool.ready_set)) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
//关键函数之二, 将已连接描述符放入池子里
add_client(connfd, &pool);
}
//关键函数之三, 检查池子并处理已连接描述符
check_clients(&pool);
}
}
主程序的逻辑就是按照上边说的基础逻辑, 这里要注意使用了一个pool.nready变量. 每次有新的连接进来的时候, 会将select返回的可读写的描述符数量减去1, 因为同时只会有一个新的listenfd发生, 剩下的数量就是所有已经连接的描述符中可读的数量.
在进入到check_clients中, 程序只会在nready大于0 的情况下工作, 就说明仅仅是一个新连接的请求, 不会触发 check_clients 的工作, 也是要达到的目标.
void init_pool(int listenfd, pool *p) {
//将maxi和已连接描述符的数组全部设置为-1. 数组中-1表示一个空闲的位置.
int i;
p->maxi = -1;
for (i = 0; i < FD_SETSIZE; i++) {
p->clientfd[i] = -1;
}
//初始化read_set为仅包含listenfd描述符
p->maxfd = listenfd;
FD_ZERO(&p->read_set);
FD_SET(listenfd, &p->read_set);
}
初始化池子的函数主要是将数组初始化为-1, 每个-1表示可用的槽位, 将来放入已连接描述符. 而read_set始终保持为listenfd 加上所有已连接未关闭的描述符集合, 在每次循环中交给select去监听.
void add_client(int connfd, pool *p) {
int i;
printf("执行到这里add_client之前的 nready 等于 %d\n", p->nready);
//在执行add_client之前, select 已经解除阻塞, 所以此时的事件池中 nready 大于等于1, 由于有了一个新的连接, 所以要先将其减去1
p->nready--;
//遍历事件池中的已连接描述符数组
for (i = 0; i < FD_SETSIZE; i++) {
//如果有空槽位
if (p->clientfd[i] < 0) {
//空槽位中放入描述符
p->clientfd[i] = connfd;
//之后关联起来描述符和相同索引的缓冲区
Rio_readinitb(&p->clientrio[i], connfd);
//将描述符放入可读描述符集合
FD_SET(connfd, &p->read_set);
//更新最大描述符和数组最大索引
if (i > p->maxi) {
p->maxi = i;
}
if (connfd > p->maxfd) {
p->maxfd = connfd;
}
//成功之后跳出循环
break;
}
}
//循环执行完毕之后判断是不是数组已满, 即 i 已经等于FD_SETSIZE
if (i == FD_SETSIZE) {
unix_error("add_client error: Too many clients");
}
//函数执行完之后, 放入了一个 connfd, 然后将这个 connfd 加入了read_set.
}
add_client 当监听的listenfd可读的时候, 即有新连接的时候才工作, 会将nready减去1, 以让后边的check_clients正常工作. add_client的关键工作之一是将获取的已连接描述符放入read_set中, 以便让 check_clients 进行操作, 并再根据客户端关闭的情况操作一次read_set.
void check_clients(pool *p){
int i, connfd, n;
char buf[MAXLINE];
rio_t rio;
//遍历数组, 处理所有连接, 只搜索最大索引的部分, 这样效率比较高
for (i = 0; (i <= p->maxi) && (p->nready > 0); i++) {
connfd = p->clientfd[i];
rio = p->clientrio[i];
if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))) {
p->nready--;
if ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
byte_cnt += n;
printf("Server received %d (%d total) bytes on fd %d\n", n, byte_cnt, connfd);
Rio_writen(connfd, buf, n);
} else {
Close(connfd);
FD_CLR(connfd, &p->read_set);
p->clientfd[i] = -1;
}
}
}
}
最后是实际进行和客户端交流的 check_clients, 这个函数先根据nready判断要不要干活, 需要干活了, 就遍历已连接描述符数组, 对每个已连接描述符进行处理. 如果EOF了, 就将描述符从read_set中删除掉.
练习 12.4 为何每次调用 Select 之前, 需要重新初始化pool.ready_set变量
因为每次循环如果有新连接进来, read_set 中会保存 listenfd 外加所有的已连接描述符, 因此必须让Select去监听这个新的合集, 然而由于Select函数的特性, 会直接修改第二个变量指向的集合, 所以只好把 read_set 的值赋给 ready_set, 以分离参数值和结果值, 因为在后边的程序中, 会分别操作read_set 和 ready_set.
I/O多路复用 - 优缺点
I/O多路复用的优点有:
- 共享代码和数据, 对于需要共享数据的程序来说非常方便
- 无需切换进程, 上下文开销小, 代码是顺序执行, 调试方便
- 事件驱动设计给予程序员更大的灵活性
I/O多路复用的缺点有:
- 代码和控制流复杂, 如果并发粒度高, 而且业务复杂, 代码复杂度更高
- 单进程(线程)运作, 无法有效使用多核硬件
个人感觉对于牛逼程序员来说, 代码复杂度不算事. 不能充分利用多核才是真正的缺点. 但是超级快速的玩意比如 nginx 竟然也是使用I/O多路复用技术, 看来这玩意还真的快的不是一点点啊.
CSAPP这个原型还是不错的, 至少讲清楚了Socket通信方面使用事件数组存放数据, 然后不断进行事件处理循环的过程.