0%

IO多路复用

应用通常需要在多个文件描述符上阻塞:在键盘输入(stdin)、进程间通信以及很多文件之间协调I/O 。

在传统的IO模型中,每个IO操作都需要创建一个新的线程或进程来处理。因为单个进程无法同时在多个文件描述符上阻塞。

I/O多路复用支持一个线程同时监测多个文件描述符并且这个过程是阻塞的,当有IO操作可以进行时才唤醒线程进行处理,避免了频繁的线程创建和切换,提高了系统的效率。

多线程/多进程并发和IO多路复用的并发处理流程进行对比(服务器端):

  • 多线程/多进程并发

    • 主线程/父进程:调用 accept()监测客户端连接请求,如果没有新的客户端的连接请求,当前线程/进程会阻塞;如果有新的客户端连接请求解除阻塞,建立连接
    • 子线程/子进程:和建立连接的客户端通信。
      • 调用 read() / recv() 接收客户端发送的通信数据,如果没有通信数据,当前线程/进程会阻塞,数据到达之后阻塞自动解除
      • 调用 write() / send() 给客户端发送数据,如果写缓冲区已满,当前线程/进程会阻塞,否则将待发送数据写入写缓冲区中
  • IO多路复用并发

    • 使用IO多路复用函数委托内核检测服务器端所有的文件描述符(通信和监听两类),检测过程会导致进程/线程的阻塞,如果检测到已就绪的文件描述符阻塞解除,并将这些已就绪的文件描述符传出

    • 根据类型对传出的所有已就绪文件描述符进行判断,并做出不同的处理

      • 监听的文件描述符:和客户端建立连接,此时调用accept()不会导致程序阻塞,因为监听的文件描述符是已就绪的(有新请求)

      • 通信的文件描述符:调用通信函数和已建立连接的客户端通信,调用 read() / recv() 不会阻塞程序,因为通信的文件描述符是就绪的,读缓冲区内已有数据;调用 write() / send() 不会阻塞程序,因为通信的文件描述符是就绪的,写缓冲区不满,可以往里面写数据

    • 对这些文件描述符继续进行下一轮的检测

Linux提供了三种I/O多路复用方案:select、poll和epoll

select

1
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • nfds:待检测的文件描述符的数量,即监视的文件描述符集合中最大的文件描述符加1
  • readfds:指向一个集合,包含了希望监视其读操作的文件描述符。
  • writefds:指向一个集合,包含了希望监视其写操作的文件描述符。
  • exceptfds:指向一个集合,包含了希望监视错误异常的文件描述符。
  • timeout:指定超时时间,即 select() 函数最多等待的时间。如果设置为 NULL,则表示一直等待,直到有事件发生;如果设置为 0,则表示立即返回。

返回值:

  • 大于0:成功,返回集合中已就绪的IO总个数
  • 等于-1:调用失败
  • 等于0:没有就绪的IO

fd_set本质是一个数组,为了方便我们操作该数组,操作系统提供了以下函数:

1
2
3
4
5
6
7
8
9
10
11
/ 将文件描述符fd从set集合中删除 
void FD_CLR(int fd, fd_set *set);

// 判断文件描述符fd是否在set集合中
int FD_ISSET(int fd, fd_set *set);

// 将文件描述符fd添加到set集合中
void FD_SET(int fd, fd_set *set);

// 将set集合中, 所有文件描述符对应的标志位设置为0
void FD_ZERO(fd_set *set);

select() 函数的基本工作原理

  1. 当用户process调用select的时候,select会将需要监控的readfds集合拷贝到内核空间(假设监控的仅仅是socket可读),
  2. 然后遍历自己监控的skb(SocketBuffer),挨个调用skb的poll逻辑以便检查该socket是否有可读事件,
  3. 遍历完所有的skb后,如果没有任何一个socket可读,那么select会调用schedule_timeout进入schedule循环,使得process进入睡眠。
  4. 如果在timeout时间内某个socket上有数据可读了,或者等待timeout了,则调用select的process会被唤醒,接下来select就是遍历监控的集合,挨个收集可读事件并返回给用户了

需要注意的是,select() 函数有一些局限性,其中最主要的是:

  • nfds 参数需要指定为待监视文件描述符的最大值加1,这意味着它的性能可能受到文件描述符数量的限制。
  • select() 的效率可能会随着待监视的文件描述符数量的增加而下降。
  • select() 函数的时间复杂度为 O(n),因此在大规模文件描述符的情况下可能效率较低。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int main() {
fd_set readfds;
struct timeval timeout;
int fds[NUM_FDS] = {0}; // 存储文件描述符
int maxfd = 0; // 最大文件描述符
int retval, i;

// 创建5个文件描述符(示例中使用stdin和stdout)
for (i = 0; i < 5; ++i) {
fds[i] = i < 3 ? STDIN_FILENO : STDOUT_FILENO; // 前三个是 stdin,后两个是 stdout
if (fds[i] > maxfd) {
maxfd = fds[i];
}
}

// 清空文件描述符集合
FD_ZERO(&readfds);

// 将所有文件描述符加入读集合
for (i = 0; i < 5; ++i) {
FD_SET(fds[i], &readfds);
}

// 设置超时时间为5秒
timeout.tv_sec = 5;
timeout.tv_usec = 0;

// 使用select()函数进行监听
retval = select(maxfd + 1, &readfds, NULL, NULL, &timeout);

if (retval == -1) {
perror("select()");
exit(EXIT_FAILURE);
} else if (retval == 0) {
printf("No data within 5 seconds.\n");
} else {
// 检查就绪的文件描述符
for (i = 0; i < 5; ++i) {
if (FD_ISSET(fds[i], &readfds)) {
printf("Data is available from fd %d.\n", fds[i]);
}
}
}

return 0;
}

poll

poll的实现和select非常相似,只是描述fd集合的方式不同。poll使用pollfd结构而不是select的fd_set结构,这就解决了select中fds集合大小1024限制问题。但poll和select同样存在一个性能缺点就是包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

1
2
3
4
5
6
7
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
int fd; // 文件描述符
short events; // 要监视的事件(可通过常量定义)
short revents; // 监视的事件中满足条件返回的事件(由内核填充)
};
  • fds:一个指向 struct pollfd 结构体数组的指针,其中每个元素描述了一个要监视的文件描述符及其所关注的事件。struct pollfd有三个成员:
    • fd:委托内核检测的文件描述符
    • events:委托内核检测的fd事件(输入、输出、错误),它可以是以下几种事件的组合:
      • POLLIN:文件描述符可读。
      • POLLOUT:文件描述符可写。
      • POLLERR:发生错误。
      • POLLHUP:对端挂起。
    • revents:这是一个传出参数,数据由内核写入,存储内核检测之后的结果
  • nfds:要监视的文件描述符的数量,类型实际是unsigned long
  • timeout:指定超时时间,单位是毫秒。如果设置为负值,poll() 将会无限期等待,直到有事件发生;如果设置为 0,则 poll() 将立即返回,不管是否有事件发生。

返回值:

  • -1:失败
  • 大于0:表示检测的集合中已就绪的文件描述符的总个数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <stdlib.h>
#include <poll.h>
#include <unistd.h>

#define NUM_FDS 5

int main() {
struct pollfd fds[NUM_FDS];
int timeout = 5000; // 5秒超时

// 创建5个文件描述符(示例中使用 stdin 和 stdout)
for (i = 0; i < NUM_FDS; ++i) {
fds[i].fd = i < 3 ? STDIN_FILENO : STDOUT_FILENO; // 前三个是 stdin,后两个是 stdout
fds[i].events = i < 3 ? POLLIN : POLLOUT; // 前三个监视读事件,后两个监视写事件
}

// 使用 poll() 函数进行监听
int retval = poll(fds, NUM_FDS, timeout);

if (retval == -1) {
perror("poll()");
exit(EXIT_FAILURE);
} else if (retval == 0) {
printf("No data within 5 seconds.\n");
} else {
// 检查就绪的文件描述符
for (i = 0; i < NUM_FDS; ++i) {
if (fds[i].revents & POLLIN) {
printf("Data is available from fd %d.\n", fds[i].fd);
}
if (fds[i].revents & POLLOUT) {
printf("Ready to write data to fd %d.\n", fds[i].fd);
}
}
}

return 0;
}

epoll

epoll 是 Linux 中高性能的事件通知机制,它可以监视多个文件描述符的 I/O 事件,并在这些文件描述符上发生事件时通知应用程序。

epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。但epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表 。这个文件描述符通过epoll_create或epoll_create1创建

创建epoll实例

1
2
3
#include <sys/epoll.h>
int epfd = epoll_create(int size);
int epfd = epoll_create1(int flags) ;

size 参数指定了 epoll 实例的大小,通常设置为大于零的值即可。在 Linux 2.6.8 之前的版本中,该参数没有意义,但是现在已经被忽略。

flags 参数是一个位掩码,用于指定一些选项。目前支持的选项只有一个:EPOLL_CLOEXEC,用于在创建 epoll 实例时设置 close-on-exec 标志。如果设置了这个选项,当调用 exec 函数执行其他程序时,内核会自动关闭该 epoll 实例。简单地创建一个 epoll 实例,可以将 flags 参数设置为 0

当我们调用 epoll_create 函数创建一个 epoll 实例时,实际上是在内核中创建了一个数据结构来表示这个 epoll 实例。epoll 实例用于存储被监听的文件描述符及其相关信息,例如需要监听的事件类型、文件描述符的状态等。每个 epoll 实例都有一个唯一的标识符,通过这个标识符可以在用户空间中操作这个 epoll 实例。

操作内核事件表

  • 事件表是 epoll 实例内部的数据结构,用于存储被监听的文件描述符的事件和状态。在 epoll 中,一个文件描述符可以关注多种类型的事件,主要包括可读事件(EPOLLIN)、可写事件(EPOLLOUT)、对端关闭事件(EPOLLRDHUP)等。当文件描述符上发生了关注的事件时,内核会通知用户程序进行相应的处理。 文件描述符的状态通常指的是其当前的就绪状态,即是否有事件发生或准备好进行IO操作。当文件描述符上有事件发生时,它的状态会被标记为就绪,用户程序可以通过 epoll_wait 函数等待并获取就绪的文件描述符列表,进行相应的操作。
  • 在注册(EPOLL_CTL_ADD)文件描述符和事件时,这些信息会被存储到事件表中。事件表通常采用高效的数据结构(如红黑树)来组织,以便快速地查询、添加和删除事件。
  • 当文件描述符上有事件发生时,内核会在事件表中查找对应的文件描述符,并更新其状态。
1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数:

  • epfdepoll 实例的文件描述符。
  • op 是操作类型,用三个宏来表示:
    • EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
    • EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
    • EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
  • fd 是需要向内核事件表中添加/修改/删除的文件描述符。
  • event 是指向 epoll_event 结构的指针,告诉内核要监听什么事件。定义如下:
1
2
3
4
struct epoll_event {
uint32_t events; // 表示事件的类型
epoll_data_t data; // 与事件相关的数据
};

epoll_event 结构包含两个成员:

  1. events:是一个 32 位无符号整数,表示事件的类型。可以是下列宏的组合:
    • EPOLLIN:表示文件描述符上有数据可读。
    • EPOLLOUT:表示文件描述符可以进行写操作。
    • EPOLLRDHUP:表示对端断开连接。
    • EPOLLERR:表示发生错误。
    • EPOLLHUP:表示发生挂起事件。
    • EPOLLET:设置为边缘触发模式。
    • EPOLLONESHOT:设置为单次触发模式。
  2. data:表示事件的数据,它是一个联合体 epoll_data_t,可以是一个文件描述符(fd)或者一个指针(ptr)。具体定义如下:
1
2
3
4
5
6
typedef union epoll_data {
void *ptr; // 指针类型
int fd; // 文件描述符类型
uint32_t u32; // 32 位无符号整数类型
uint64_t u64; // 64 位无符号整数类型
} epoll_data_t;

epoll_data 联合体允许用户在事件中传递一些附加的数据。

  • 如果事件类型是 EPOLLINEPOLLOUT,则通常使用 fd 成员来表示文件描述符;

  • 如果需要传递其他类型的数据,可以使用 ptr 成员来存储一个指针,或者使用 u32u64 来存储一个整数值。

返回值:

  • 成功时,返回 0。
  • 失败时,返回 -1,并设置相应的errno。

示例:添加一个文件描述符到内核事件表中

1
2
3
4
5
6
7
8
struct epoll_event event;
event.events = EPOLLIN; // 指定事件为可读事件
event.data.fd = fd; // 文件描述符

if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
perror("epoll_ctl: add");
exit(EXIT_FAILURE);
}

等待事件发生

  • epoll_wait 调用会阻塞程序直到事件发生或者超时。

  • 如果有事件发生,**epoll_wait 将把这些事件复制到 events 数组中**,并返回发生事件的数量。

  • events 数组中的每个元素都是一个 epoll_event 结构,其中包含了发生事件的文件描述符及其事件类型。

1
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数:

  • maxeventsevents 数组的大小,表示最多可以存储多少个事件。
  • timeout 是超时时间,以毫秒为单位。传递 -1 将一直阻塞等待事件,传递 0 将立即返回。

返回值:

  • 如果没有事件发生,且超时时间到达,则返回 0。
  • 如果有事件发生,返回发生事件的数量。

示例:

1
2
3
4
5
6
7
8
9
10
11
struct epoll_event events[MAX_EVENTS];
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}

// 处理发生的事件
for (int i = 0; i < nfds; ++i) {
// 处理 events[i].data.fd 对应的文件描述符的事件
}

示例代码

server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define PORT 8080
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

int main() {
int server_fd, new_socket, epoll_fd, nfds;
struct sockaddr_in address;
struct epoll_event event, events[MAX_EVENTS];
char buffer[BUFFER_SIZE];

// Creating socket file descriptor
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}

address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);

// Forcefully attaching socket to the port 8080
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
if (listen(server_fd, 5) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}

epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}

event.events = EPOLLIN;
event.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
perror("epoll_ctl: server_fd");
exit(EXIT_FAILURE);
}

while (1) {
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}

for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == server_fd) {
new_socket = accept(server_fd, NULL, NULL);
if (new_socket == -1) {
perror("accept");
exit(EXIT_FAILURE);
}

event.events = EPOLLIN | EPOLLET;
event.data.fd = new_socket;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1) {
perror("epoll_ctl: new_socket");
exit(EXIT_FAILURE);
}
} else {
int client_socket = events[i].data.fd;
int valread = read(client_socket, buffer, BUFFER_SIZE);
if (valread == 0) {
// Client disconnected
close(client_socket);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_socket, NULL);
} else {
printf("Client: %s\n", buffer);
send(client_socket, buffer, valread, 0);
}
}
}
}

close(server_fd);
return 0;
}
  1. 首先,服务器创建一个 TCP 套接字 server_fd,并将其绑定到本地的 8080 端口上。然后开始监听连接请求。
  2. 创建了一个 epoll 实例 epoll_fd,并将服务器的监听套接字 server_fd 添加到 epoll 实例中,以便监视它上面的事件(主要是可读事件)。
  3. 进入一个无限循环,用于等待并处理发生在 epoll 实例中的事件。每次循环开始时,调用 epoll_wait 函数等待事件发生。
  4. 当有事件发生时,epoll_wait 返回并填充一个 events 数组,其中包含了发生的事件。
  5. 遍历 events 数组,对每一个事件进行处理。如果事件是发生在服务器监听套接字 server_fd 上的可读事件,说明有新的客户端连接请求到达了。
  6. 对新的客户端连接请求,服务器调用 accept 函数接受连接,创建一个新的套接字 new_socket 用于与客户端通信,并将新套接字添加到 epoll 实例中。
  7. 如果事件不是发生在监听套接字上的,则说明是已连接的客户端套接字上发生了可读事件,这表示客户端发送了数据。
  8. 服务器尝试从客户端套接字中读取数据,并进行处理。如果读取到的数据长度为 0,说明客户端已经关闭了连接,服务器关闭客户端套接字,并从 epoll 实例中删除该套接字。
  9. 循环继续,等待下一轮事件发生。