《希罗多德历史》- 不管在什么事情上面,我们都必须好好地注意一下它的结尾,因为神往往不过是叫许多人看到幸福的一个影子,随后便把他们推上了毁灭的道路。

前言

很早之前写webserver项目学习到的知识点,在这里把核心知识点记载下 ~

预备知识

典型的一次I/O分为两个阶段数据准备 和 数据读写

解释:作为服务器,接收客户端的请求,得先监听客户端有没有数据过来,这是一个状态,还有就是数据过来了该怎么去读写,这又是一个状态。

实际上,阻塞,非阻塞,同步,异步,分别是这两种状态下的体系。

网络I/O阶段1:数据准备

数据准备:根据系统IO操作的就绪状态,分为

  1. 阻塞 : 让调用I/O的线程进入阻塞状态 ,数据准备好了就唤醒
  2. 非阻塞: 不会改变线程的状态,通过返回值判断

1- 阻塞

sockfd相当于就是系统的文件描述符,代表1个I/O,创建的时候默认是阻塞,当我调用1个阻塞I/O的话,如果sockfd上没有数据可读,这个recv不会返回,造成当前线程阻塞,等待sockfd上有数据到来。如果返回了,就是有数据可读了,接下去就是数据读写了。返回的是最终读的数据的大小。一直等着。

2- 非阻塞

如果我们在创建sockfd的时候设置是非阻塞,recv的体现是:如果sockfd上没有数据到来的话,recv直接返回回来,不会造成当前线程阻塞。sockfd没有数据准备好的话,不断的空转CPU。

3 - 阻塞和非阻塞的返回值(数据准备的返回值)

如果size==-1的话,表示错误:

  • size==-1真的错误,是系统的内部错误,可能要close(sockfd)

  • 如果size==-1&&errno==EAGAIN,表示正常的非阻塞返回,sockfd上没有网络事件发生

如果size!= -1,有两种情况

  • 如果size= =0,表示网络对端关闭了连接,对端直接close(sockfd)
  • 如果size>0,就是表示有数据过来了。

网络I/O阶段2:数据读写

数据读写:根据应用程序和内核的交互方式,分为两种

  1. 同步
  2. 异步

2种同步和异步的区别:【I/O的同步和异步】和 【应用层并发的同步和异步】

1- IO同步

在应用程序上调用recv函数,这个sockfd我不管它工作在阻塞模式还是非阻塞模式,真的有数据准备好了之后(TCP的接收缓冲区有数据了,就是数据可读了),我们要读这个数据,这个buf是用户层自己定义的,recv就可以开始接收了,是应用程序卡在这里recv(),从内核的TCP接收缓冲区搬数据到应用层上的buf,在搬的过程中,因为size>0,这就表示从内核搬了多少字节的数据,我们就要访问buf了,没搬完之前,不会进入到下面的if语句。搬完了,recv才返回过来,看看size是多少,就是搬了多少数据,因此I/O同步是应用程序搬的数据。

I/O同步的意思就是:当我调用网络I/O的接口,当I/O阶段1数据准备好之后,在数据读写的时候,应用层自己调用网络I/O接口自己去读写,都花在应用层上

Note:recv和send是同步的I/O接口

2 - I/O异步

当我请求内核的时候,关心sockfd上的数据,远端如果发过来数据,我需要读sockfd上的数据,我有一个buf,到时候如果有数据来了,内核能不能帮忙把数据放到buf里面,我再给内核注册一个sigio信号,也就是说,对一个操作系统级别的异步的I/O接口来说,我先塞给内核一个sockfd,表示对这个sockfd上的事件感兴趣,如果sockfd上有数据可读的话,麻烦操作系统内核把数据搬到buf里面。

内核把内核缓冲区-sockfd对应的TCP接收缓冲区的数据搬到buf里面,搬完以后,通过信号sigio给应用程序通知一下。应用程序在这期间可以玩自己的了,做任何事清都可以。

通知是异步最大的标识,是异步就有通知

linux的aio_read,aio_write就是典型的linux给我们提供的异步I/O接口。

陈硕大佬的原话:在处理 IO 的时候,阻塞和非阻塞都是同步 IO。只有使用了特殊的 API 才是异步IO

Linux五种I/O模型

  • 阻塞IO

    • 调用者调用了某个函数,等待这个函数返回,期间什么都不做,不停检查这个函数有没有返回,必须等该函数返回才能进行下一步动作。

  • 非阻塞IO

    • 每隔一段时间去检测IO时间是否就绪,没有就绪就可以进行其他操作。
    • 非阻塞IO执行系统调用总是立即返回,不管事件是!·1否已经发生,若没发生则返回-1,此时根据errno区分两种情况,对于accept、recv和send,事件未发生时,errno被设置为EAGAIN。

  • IO复用

  • select、poll、epoll函数实现,这些函数也会使进程阻塞,但是和阻塞IO不同的是,这些函数可以同时阻塞多个IO操作。

    • 可以同时对多个读操作、写操作的IO函数进行检测,直到有数据可读或可写时,才真正调用IO操作函数.

    • IO多路复用的含义就是一个进程能处理多个socket

  • 信号驱动IO

    • 注册新号处理函数,进程继续运行并不阻塞,当IO事件就绪时,进程受到SIGIO信号,然后处理IO事件。
    • 内核在第一个阶段是异步,在第二个阶段是同步。
    • 与非阻塞IO的区别在于它提供了消息通知机制,不需要用户进程不断的轮询检查,减少了系统API的调用次数,提高了效率。

  • 异步IO

    • Linux中,调用aio_read函数告诉内核描述字缓冲区指针和缓冲区大小 、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。

事件处理模式

Reactor模式

使用同步I/O模型(以epoll_wait为例)实现的Reactor模式的工作流程是:

1)主线程往epoll内核事件表中注册socket上的读就绪事件。

2)主线程调用epoll_wait等待socket上有数据可读。

3)当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列。

4)睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就

绪事件。

5)主线程调用epoll_wait等待socket可写。

6)当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列。

7)睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果。

image-20240824220205429

Proactor模式

1)主线程调用aio_read函数向内核注册socket上的读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序

(这里以信号为例,详情请参考sigevent的man手册)。

2)主线程继续处理其他逻辑。

3)当socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用。

4)应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求之后,调用aio_write函数向内核注册socket上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序(仍然以信号为例)。

5)主线程继续处理其他逻辑。

6)当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕。

7)应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭socket。

image-20240824220247733

模拟Proactor模式

1)主线程往epoll内核事件表中注册socket上的读就绪事件。

2)主线程调用epoll_wait等待socket上有数据可读。

3)当socket上有数据可读时,epoll_wait通知主线程。主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封

装成一个请求对象并插入请求队列。

4)睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册socket上的写就绪事件。

5)主线程调用epoll_wait等待socket可写。6)当socket可写时,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果。

image-20240824220326832

IO多路复用

select

简单理解:

  • 委托内核进行操作
  • 只会通知有几个任务可用,但不知道具体哪几个任务,还需遍历(与NIO模型略有不同)

主旨思想

  1. 首先要构造一个关于文件描述符的列表,将要监听的文件描述符添加到该列表中
  2. 调用一个系统函数(select),监听该列表中的文件描述符,直到这些描述符中的一个或者多个进行I/O操作时,该函数才返回
    • 这个函数是阻塞
    • 函数对文件描述符的检测的操作是由内核完成的
  3. 在返回时,它会告诉进程有多少(哪些)描述符要进行I/O操作

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <sys/time.h> 
#include <sys/types.h>
#include <unistd.h>
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 将参数文件描述符fd对应的标志位设置为0
void FD_CLR(int fd, fd_set *set);
// 判断fd对应的标志位是0还是1, 返回值 : fd对应的标志位的值,0,返回0, 1,返回1
int FD_ISSET(int fd, fd_set *set);
// 将参数文件描述符fd 对应的标志位,设置为1
void FD_SET(int fd, fd_set *set);
// fd_set一共有1024 bit, 全部初始化为0
void FD_ZERO(fd_set *set);
  • int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

    • 通过man select查看帮助

    • 参数

      • nfds:委托内核检测的最大文件描述符的值 + 1(+1是因为遍历是下标从0开始,for循环<设定)
      • readfds:要检测的文件描述符的读的集合,委托内核检测哪些文件描述符的读的属性
        • 一般检测读操作
        • 对应的是对方发送过来的数据,因为读是被动的接收数据,检测的就是读缓冲区
        • 是一个传入传出参数
      • writefds:要检测的文件描述符的写的集合,委托内核检测哪些文件描述符的写的属性
        • 委托内核检测写缓冲区是不是还可以写数据(不满的就可以写)
      • exceptfds:检测发生异常的文件描述符的集合,一般不用
      • timeout:设置的超时时间,含义见**select参数列表说明**
        • NULL:永久阻塞,直到检测到了文件描述符有变化
        • tv_sec = tv_usec = 0, 不阻塞
        • tv_sec > 0,tv_usec > 0:阻塞对应的时间
    • 返回值

      • -1:失败
      • >0(n):检测的集合中有n个文件描述符发生了变化
  • select参数列表说明

    • fd_set:是一块固定大小的缓冲区(结构体),sizeof(fd_set)=128,即对应1024个比特位

    • timeval :结构体类型

      1
      2
      3
      4
      struct timeval { 
      long tv_sec; /* seconds */
      long tv_usec; /* microseconds */
      };

工作过程分析

1.初始设定

2.设置监听文件描述符,将fd_set集合相应位置为1

3.调用select委托内核检测

4.内核检测完毕后,返回给用户态结果

代码实现

注意事项

  • select中需要的监听集合需要两个
    • 一个是用户态真正需要监听的集合rSet
    • 一个是内核态返回给用户态的修改集合tmpSet
  • 需要先判断监听文件描述符是否发生改变
    • 如果改变了,说明有客户端连接,此时需要将新的连接文件描述符加入到rSet,并更新最大文件描述符
    • 如果没有改变,说明没有客户端连接
  • 由于select无法确切知道哪些文件描述符发生了改变,所以需要执行遍历操作,使用FD_ISSET判断是否发生了改变
  • 如果客户端断开了连接,需要从rSet中清除需要监听的文件描述符
  • 程序存在的问题:中间的一些断开连接后,最大文件描述符怎么更新?=>估计不更新,每次都会遍历到之前的最大值处,解决方案见高并发优化思考

服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/select.h>

#define SERVERIP "127.0.0.1"

#define PORT 6789

int main(){
// 1. 创建socket(用于监听的套接字)
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) {
perror("socket");
exit(-1);
}
// 2. 绑定
struct sockaddr_in server_addr;
server_addr.sin_family = PF_INET;
// 点分十进制转换为网络字节序
inet_pton(AF_INET, SERVERIP, &server_addr.sin_addr.s_addr);
// 服务端也可以绑定0.0.0.0即任意地址
// server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
int ret = bind(listenfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
perror("bind");
exit(-1);
}
// 3. 监听
ret = listen(listenfd, 8); // 同时最多能处理8个客户端
if (ret == -1) {
perror("listen");
exit(-1);
}
// 创建读检测集合
// rSet用于记录正在的监听集合,tmpSet用于记录在轮训过程中由内核态返回到用户态的集合
fd_set rSet, tmpSet;
// 清空
FD_ZERO(&rSet);
// 将监听文件描述符加入
FD_SET(listenfd, &rSet);
// 此时最大的文件描述符为监听描述符
int maxfd = listenfd;
// 不断循环等待客户端连接
while (1) {
tmpSet = rSet;
// 使用select,设置为永久阻塞,有文件描述符变化才返回
int num = select(maxfd + 1, &tmpSet, NULL, NULL, NULL);
if (num == -1) {
perror("select");
exit(-1);
} else if (num == 0) {
// 当前无文件描述符有变化,执行下一次遍历
// 在本次设置中无效(因为select被设置为永久阻塞)
continue;
} else {
// 首先判断监听文件描述符是否发生改变(即是否有客户端连接)
if (FD_ISSET(listenfd, &tmpSet)) {
// 4. 接收客户端连接
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
// 这行代码的作用是接受客户端的连接请求,并返回一个新的 socket 文件描述符 connfd,通过这个 connfd 可以进行和客户端的通信。
int connfd = accept(listenfd, (struct sockaddr*)&client_addr, &client_addr_len);
if (connfd == -1) {
perror("accept");
exit(-1);
}
// 输出客户端信息,IP组成至少16个字符(包含结束符)
char client_ip[16] = {0};
inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, client_ip, sizeof(client_ip));
unsigned short client_port = ntohs(client_addr.sin_port);
printf("ip:%s, port:%d\n", client_ip, client_port);

FD_SET(connfd, &rSet);
// 更新最大文件符
maxfd = maxfd > connfd ? maxfd : connfd;
}

// 遍历集合判断是否有变动,如果有变动,那么通信
char recv_buf[1024] = {0};
for (int i = listenfd + 1; i <= maxfd; i++) {
if (FD_ISSET(i, &tmpSet)) {
ret = read(i, recv_buf, sizeof(recv_buf));
if (ret == -1) {
perror("read");
exit(-1);
} else if (ret > 0) {
printf("recv server data : %s\n", recv_buf);
write(i, recv_buf, strlen(recv_buf));
} else {
// 表示客户端断开连接
printf("client closed...\n");
close(i);
FD_CLR(i, &rSet);
break;
}
}
}
}
}

close(listenfd);
return 0;
}

通俗来说,就是服务器先设置个文件描述符,专门用于监听是否有客户端进来,在其上绑定IP和端口号,交给给内核进行监听,当有客户端进来则这个专门用于监听的文件描述符就会发生变化。之后,当有客户端进来的时候,我们需要创建一个描述符用于与这个新进来的客户端进行通信。

【这里举个不是很恰当的例子,listened描述符就像酒店门口迎宾人员,当有客人来的时候,带进酒店的包间,之后由其他的服务人员来接管。之后这位迎宾人员又回到原来的工作岗位上】

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#define SERVERIP "127.0.0.1"
#define PORT 6789

int main(){
// 1. 创建socket(用于通信的套接字)
int connfd = socket(AF_INET, SOCK_STREAM, 0);
if (connfd == -1) {
perror("socket");
exit(-1);
}
// 2. 连接服务器端
struct sockaddr_in server_addr;
server_addr.sin_family = PF_INET;
inet_pton(AF_INET, SERVERIP, &server_addr.sin_addr.s_addr);
server_addr.sin_port = htons(PORT);
int ret = connect(connfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
perror("connect");
exit(-1);
}
// 3. 通信
char recv_buf[1024] = {0};
while (1) {
// 发送数据
char *send_buf = "client message";
write(connfd, send_buf, strlen(send_buf));
// 接收数据
ret = read(connfd, recv_buf, sizeof(recv_buf));
if (ret == -1) {
perror("read");
exit(-1);
} else if (ret > 0) {
printf("recv server data : %s\n", recv_buf);
} else {
// 表示客户端断开连接
printf("client closed...\n");
}
// 休眠的目的是为了更好的观察,放在此处可以解决read: Connection reset by peer问题
sleep(1);
}
// 关闭连接
close(connfd);
return 0;
}

存在问题

  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  • 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
  • select支持的文件描述符数量太小了,默认是1024
  • fds集合不能重用,每次都需要重置

poll

主旨思想

  • 用一个结构体记录文件描述符集合,并记录用户态状态和内核态状态

函数概述

  • 概览

    1
    2
    3
    4
    5
    6
    7
    8
    #include <poll.h> 
    struct pollfd {
    int fd; /* 委托内核检测的文件描述符 */
    short events; /* 委托内核检测文件描述符的什么事件 */
    short revents; /* 文件描述符实际发生的事件 */
    };

    int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • int poll(struct pollfd *fds, nfds_t nfds, int timeout);

    • 通过man poll查看帮助

    • 参数

      • fds:是一个struct pollfd 结构体数组,这是一个需要检测的文件描述符的集合
      • nfds:这个是第一个参数数组中最后一个有效元素的下标 + 1
      • timeout:阻塞时长
        • 0:不阻塞
        • -1:阻塞,当检测到需要检测的文件描述符有变化,解除阻塞
        • >0:具体的阻塞时长(ms)
    • 返回值

      • -1:失败
      • >0(n):检测的集合中有n个文件描述符发生了变化
  • eventsrevents取值,如果有多个事件需要检测,用|即可,如同时检测读和写:POLLIN | POLLOUT

代码实现

注意事项

  • nfds表示的监听文件描述符的下标,所以在遍历时,需要使用fds[i].fd取得相应的文件描述符
  • 如何优雅的更新nfds代码中使用连接的文件描述符作为替代更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <poll.h>

#define SERVERIP "127.0.0.1"
#define PORT 6789
int main(){
// 1. 创建socket(用于监听的套接字)
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) {
perror("socket");
exit(-1);
}
// 2. 绑定
struct sockaddr_in server_addr;
server_addr.sin_family = PF_INET;
// 点分十进制转换为网络字节序
inet_pton(AF_INET, SERVERIP, &server_addr.sin_addr.s_addr);
// 服务端也可以绑定0.0.0.0即任意地址
// server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
int ret = bind(listenfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
perror("bind");
exit(-1);
}
// 3. 监听
ret = listen(listenfd, 8);
if (ret == -1) {
perror("listen");
exit(-1);
}

struct pollfd fds[1024];
// 初始化
for (int i = 0; i < 1024; i++) {
fds[i].fd = -1;
fds[i].events = POLLIN;
}
// 将监听文件描述符加入
fds[0].fd = listenfd;
int nfds = 0;
// 不断循环等待客户端连接
while (1) {
// 使用poll,设置为永久阻塞,有文件描述符变化才返回
int num = poll(fds, nfds + 1, -1);
if (num == -1) {
perror("poll");
exit(-1);
} else if (num == 0) {
// 当前无文件描述符有变化,执行下一次遍历
// 在本次设置中无效(因为select被设置为永久阻塞)
continue;
} else {
// 首先判断监听文件描述符是否发生改变(即是否有客户端连接)
if (fds[0].revents & POLLIN) {
// 4. 接收客户端连接
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int connfd = accept(listenfd, (struct sockaddr*)&client_addr, &client_addr_len);
if (connfd == -1) {
perror("accept");
exit(-1);
}
// 输出客户端信息,IP组成至少16个字符(包含结束符)
char client_ip[16] = {0};
inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, client_ip, sizeof(client_ip));
unsigned short client_port = ntohs(client_addr.sin_port);
printf("ip:%s, port:%d\n", client_ip, client_port);
// 遍历集合, 将新的需要监听的文件描述符加入集合
for (int i = 1; i < 1024; i++) {
if (fds[i].fd == -1) {
fds[i].fd = connfd;
fds[i].events = POLLIN;
break;
}
}
// 更新最大的监听文件描述符集合下标
// 存在问题:使用文件描述符替代最大对应下标
nfds = nfds > connfd ? nfds : connfd;
}

// 遍历集合判断是否有变动,如果有变动,那么通信
char recv_buf[1024] = {0};
for (int i = 1; i <= nfds; i++) {
if (fds[i].fd != -1 && fds[i].revents & POLLIN) {
ret = read(fds[i].fd, recv_buf, sizeof(recv_buf));
if (ret == -1) {
perror("read");
exit(-1);
} else if (ret > 0) {
printf("recv server data : %s\n", recv_buf);
write(fds[i].fd, recv_buf, strlen(recv_buf));
} else {
// 表示客户端断开连接
printf("client closed...\n");
close(fds[i].fd);
fds[i].fd = -1;
break;
}
}
}
}
}

close(listenfd);
return 0;
}

大致的思想和select一致,只不过是使用数组来存储标识。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#define SERVERIP "127.0.0.1"
#define PORT 6789

int main(){
// 1. 创建socket(用于通信的套接字)
int connfd = socket(AF_INET, SOCK_STREAM, 0);
if (connfd == -1) {
perror("socket");
exit(-1);
}
// 2. 连接服务器端
struct sockaddr_in server_addr;
server_addr.sin_family = PF_INET;
inet_pton(AF_INET, SERVERIP, &server_addr.sin_addr.s_addr);
server_addr.sin_port = htons(PORT);
int ret = connect(connfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
perror("connect");
exit(-1);
}
// 3. 通信
char recv_buf[1024] = {0};
while (1) {
// 发送数据
char *send_buf = "client message";
write(connfd, send_buf, strlen(send_buf));
// 接收数据
ret = read(connfd, recv_buf, sizeof(recv_buf));
if (ret == -1) {
perror("read");
exit(-1);
} else if (ret > 0) {
printf("recv server data : %s\n", recv_buf);
} else {
// 表示客户端断开连接
printf("client closed...\n");
}
// 休眠的目的是为了更好的观察,放在此处可以解决read: Connection reset by peer问题
sleep(1);
}
// 关闭连接
close(connfd);
return 0;
}

存在问题

  • 缺点同select第一点和第二点(如下),即解决了第三点和第四点
  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  • 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

epoll

简单理解

  • 委托内核进行操作
  • 会通知具体有哪几个任务可用

主旨思想

  • 直接在内核态创建 eventpoll实例(结构体),通过epoll提供的API操作该实例
  • 结构体中有红黑树双链表,分别用来存储需要检测的文件描述符存储已经发生改变的文件描述符

函数说明

  • 概览

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    #include <sys/epoll.h>

    // 创建一个新的epoll实例
    // 在内核中创建了一个数据,这个数据中有两个比较重要的数据,一个是需要检测的文件描述符的信息(红黑树),还有一个是就绪列表,存放检测到数据发送改变的文件描述符信息(双向链表)
    int epoll_create(int size);

    // 对epoll实例进行管理:添加文件描述符信息,删除信息,修改信息
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    struct epoll_event {
    uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    };
    typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
    } epoll_data_t;

    // 检测函数
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • int epoll_create(int size);

    • 功能:创建一个新的epoll实例
    • 参数:size,目前没有意义了(之前底层实现是哈希表,现在是红黑树),随便写一个数,必须大于0
    • 返回值
      • -1:失败
      • >0:操作epoll实例的文件描述符
  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    • 功能:对epoll实例进行管理:添加文件描述符信息,删除信息,修改信息

    • 参数:

      • epfd:epoll实例对应的文件描述符
      • op:要进行什么操作
        • 添加:EPOLL_CTL_ADD
        • 删除:EPOLL_CTL_DEL
        • 修改:EPOLL_CTL_MOD
      • fd:要检测的文件描述符
      • event:检测文件描述符什么事情,通过设置epoll_event.events,常见操作
        • 读事件:EPOLLIN
        • 写事件:EPOLLOUT
        • 错误事件:EPOLLERR
        • 设置边沿触发:EPOLLET(默认水平触发)
    • 返回值:成功0,失败-1

  • int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

    • 功能:检测哪些文件描述符发生了改变

    • 参数:

      • epfd:epoll实例对应的文件描述符
    • events:传出参数,保存了发生了变化的文件描述符的信息

      • maxevents:第二个参数结构体数组的大小
    • timeout:阻塞时长

      • 0:不阻塞
      • -1:阻塞,当检测到需要检测的文件描述符有变化,解除阻塞
        • >0:具体的阻塞时长(ms)
    • 返回值:

      • > 0:成功,返回发送变化的文件描述符的个数
      • -1:失败

注意事项

  • events是封装了监听描述符信息的结构体,每一个新增文件都需要这个(可重用)

  • 需要注意可能同时发生了多个监听(如监听读事件和写事件),那么代码逻辑需要做相应判断

    如本例中只检测读事件,排除了写事件

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/epoll.h>

#define SERVERIP "127.0.0.1"
#define PORT 6789

int main(){
// 1. 创建socket(用于监听的套接字)
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) {
perror("socket");
exit(-1);
}
// 2. 绑定
struct sockaddr_in server_addr;
server_addr.sin_family = PF_INET;
// 点分十进制转换为网络字节序
inet_pton(AF_INET, SERVERIP, &server_addr.sin_addr.s_addr);
// 服务端也可以绑定0.0.0.0即任意地址
// server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
int ret = bind(listenfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
perror("bind");
exit(-1);
}
// 3. 监听
ret = listen(listenfd, 8);
if (ret == -1) {
perror("listen");
exit(-1);
}

// 创建epoll实例
int epfd = epoll_create(100);
// 将监听文件描述符加入实例
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = listenfd;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event);
if (ret == -1) {
perror("epoll_ctl");
exit(-1);
}
// 此结构体用来保存内核态返回给用户态发生改变的文件描述符信息
struct epoll_event events[1024];
// 不断循环等待客户端连接
while (1) {
// 使用epoll,设置为永久阻塞,有文件描述符变化才返回
int num = epoll_wait(epfd, events, 1024, -1);
if (num == -1) {
perror("poll");
exit(-1);
} else if (num == 0) {
// 当前无文件描述符有变化,执行下一次遍历
// 在本次设置中无效(因为select被设置为永久阻塞)
continue;
} else {
// 遍历发生改变的文件描述符集合
for (int i = 0; i < num; i++) {
// 判断监听文件描述符是否发生改变(即是否有客户端连接)
int curfd = events[i].data.fd;
if (curfd == listenfd) {
// 4. 接收客户端连接
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int connfd = accept(listenfd, (struct sockaddr*)&client_addr, &client_addr_len);
if (connfd == -1) {
perror("accept");
exit(-1);
}
// 输出客户端信息,IP组成至少16个字符(包含结束符)
char client_ip[16] = {0};
inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, client_ip, sizeof(client_ip));
unsigned short client_port = ntohs(client_addr.sin_port);
printf("ip:%s, port:%d\n", client_ip, client_port);
// 将信息加入监听集合
event.events = EPOLLIN;
event.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &event);
} else {
// 只检测读事件
if (events[i].events & EPOLLOUT) {
continue;
}
// 接收消息
char recv_buf[1024] = {0};
ret = read(curfd, recv_buf, sizeof(recv_buf));
if (ret == -1) {
perror("read");
exit(-1);
} else if (ret > 0) {
printf("recv server data : %s\n", recv_buf);
write(curfd, recv_buf, strlen(recv_buf));
} else {
// 表示客户端断开连接
printf("client closed...\n");
close(curfd);
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
break;
}
}
}
}
}

close(listenfd);
close(epfd);
return 0;
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#define SERVERIP "127.0.0.1"
#define PORT 6789

int main(){
// 1. 创建socket(用于通信的套接字)
int connfd = socket(AF_INET, SOCK_STREAM, 0);
if (connfd == -1) {
perror("socket");
exit(-1);
}
// 2. 连接服务器端
struct sockaddr_in server_addr;
server_addr.sin_family = PF_INET;
inet_pton(AF_INET, SERVERIP, &server_addr.sin_addr.s_addr);
server_addr.sin_port = htons(PORT);
int ret = connect(connfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
perror("connect");
exit(-1);
}
// 3. 通信
char recv_buf[1024] = {0};
while (1) {
// 发送数据
char *send_buf = "client message";
write(connfd, send_buf, strlen(send_buf));
// 接收数据
ret = read(connfd, recv_buf, sizeof(recv_buf));
if (ret == -1) {
perror("read");
exit(-1);
} else if (ret > 0) {
printf("recv server data : %s\n", recv_buf);
} else {
// 表示客户端断开连接
printf("client closed...\n");
}
// 休眠的目的是为了更好的观察,放在此处可以解决read: Connection reset by peer问题
sleep(1);
}
// 关闭连接
close(connfd);
return 0;
}

工作模式(LT与ET)

水平触发(level triggered, LT)

  • epoll的缺省的工作方式,并且同时支持 block 和 non-block socket
  • 在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的 fd 进行 IO 操作。如果你不作任何操作,内核还是会继续通知你的。

边沿触发(edge triggered, ET)

  • 是高速工作方式,只支持 non-block socket,需要对监听文件描述符设置才能实现
  • 在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了。但是请注意,如果一直不对这个 fd 作 IO 操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

区别与说明

  • ET 模式在很大程度上减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高
  • epoll工作在 ET 模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死
  • 所以如果使用ET且缓冲区内容不能一次性读完,需要写一个循环将内容全部读取,且需要将套接字设置为非阻塞
  • 说明:假设委托内核检测读事件,即检测fd的读缓冲区,那么如果读缓冲区有数据 ,epoll检测到了会给用户通知
    • LT【只要缓冲区有数据,就会一直通知】
      • 用户不读数据,数据一直在缓冲区,epoll 会一直通知
      • 用户只读了一部分数据,epoll会通知
      • 缓冲区的数据读完了,不通知
    • ET【缓冲区有数据,只会通知一次】
      • 用户不读数据,数据一致在缓冲区中,epoll下次检测的时候就不通知了
      • 用户只读了一部分数据,epoll不通知
      • 缓冲区的数据读完了,不通知

api

  • int socket(int domain, int type, int protocol);

    • 功能:创建一个套接字
  • 参数:

    • domain:协议族(常用如下)

    • AF_INETipv4

      • AF_INET6 :ipv6
      • AF_UNIX, AF_LOCAL:本地套接字通信(进程间通信)
    • type:通信过程中使用的协议类型

      • SOCK_STREAM : 流式协议
        • SOCK_DGRAM : 报式协议
      • protocol:具体的一个协议,一般写0,用于指定type参数的默认协议类型
      • SOCK_STREAM : 流式协议默认使用 TCP
      • SOCK_DGRAM : 报式协议默认使用 UDP
    • 返回值

      • 成功:返回文件描述符,操作的就是内核缓冲区
    • 失败:-1

  • int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

    • 功能:绑定,将fd 和本地的IP和端口进行绑定
    • 参数:
      • sockfd:通过socket函数得到的文件描述符
      • addr:需要绑定的socket地址,这个地址封装了本地的ip和端口号的信息
      • addrlen:第二个参数结构体占的内存大小
    • 返回值:成功:0,失败:-1
  • int listen(int sockfd, int backlog);

    • 功能:监听这个socket上的连接
    • 参数:
      • sockfd:通过socket()函数得到的文件描述符
      • backlog:未连接的和已经连接的和的最大值,可用cat /proc/sys/net/core/somaxconn查看Linux设置值
    • 返回值:
      • 成功:0
      • 失败:-1
  • int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

    • 功能:接收客户端连接,默认是一个阻塞的函数,阻塞等待客户端连接
    • 参数:
      • sockfd : 用于监听的文件描述符
      • addr : 传出参数,记录了连接成功后客户端的地址信息(ip,port)
      • addrlen : 指定第二个参数的对应的内存大小
    • 返回值:
      • 成功:用于通信的文件描述符
      • 失败:-1
  • int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

    • 功能: 客户端连接服务器
    • 参数:
      • sockfd : 用于**通信的文件描述符 **
      • addr : 客户端要连接的服务器的地址信息
      • addrlen : 指定第二个参数的对应的内存大小
    • 返回值:成功 0, 失败 -1
  • 其他读写函数:

    1
    2
    ssize_t write(int fd, const void *buf, size_t count); // 写数据 
    ssize_t read(int fd, void *buf, size_t count); // 读数据

IOCP

IOCP(Input/Output Completion Port)

IOCP 是 Windows 操作系统提供的一种高性能 I/O 模型,主要用于处理大量并发 I/O 操作。IOCP 通过完成端口(Completion Port)来管理 I/O 请求,可以有效地将 I/O 操作分发到多个线程,从而提高并发处理能力。

特点

  • 线程池管理IOCP 使用线程池来处理 I/O 请求,可以自动调整线程数量,避免线程过多导致的上下文切换开销。
  • 异步 I/OIOCP 支持异步 I/O 操作,可以在 I/O 操作完成时通知应用程序,从而避免阻塞线程。
  • 高效的事件通知IOCP 通过完成端口来通知应用程序 I/O 操作的完成,可以高效地处理大量并发 I/O 请求。

IOCP VS EPoll区别

  1. 操作系统支持
    • IOCP 是 Windows 操作系统特有的 I/O 模型。
    • epoll 是 Linux 操作系统特有的 I/O 事件通知机制。
  2. 线程模型
    • IOCP 使用线程池来管理 I/O 请求,可以自动调整线程数量。
    • epoll 通常与自定义的线程池结合使用,需要应用程序自己管理线程。
  3. 事件通知方式
    • IOCP 通过完成端口来通知应用程序 I/O 操作的完成。
    • epoll 通过 epoll_wait 函数来等待事件,并返回准备好的文件描述符。
  4. 触发模式
    • IOCP 没有明确的触发模式概念。
    • epoll 支持边缘触发和水平触发两种模式。

深入思考

其实呢,文章学到这,网络编程还不是真正的了解,因为没有真正的写过并且很多模糊的概念会影响你对服务器设计选型的考虑欠缺!这里,笔者将进一步补充缺失的很精华的部分!

这部分目前很乱,都是笔者看到一点搜集一点的,之后会再重新系统归纳!

epoll的优点

1.没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口);

2.效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数; 即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

3.文件映射内存 (mmap): 当你调用 epoll_wait 函数时,epoll 实际上是使用了内核和用户空间之间的一段共享内存来传递事件。这是通过 mmap 系统调用实现的。epoll_wait 将结果事件写入这段共享内存,然后用户空间程序可以直接读取这些事件,而不需要在用户空间和内核空间之间进行多次数据复制。

LT 模式不能会避免类惊群问题,ET 模式能避免类惊群问题。

【总结笔记】深度理解 Web Server 技术 —— 项目整体

考虑这样一种情况:多个进程/多个线程共享同一个 epollfd。当某个事件就绪时,多个进程/线程会同时被唤醒。假设只有 A 获取了 CPU 控制权,但没立刻处理该事件,在 LT 模式下,该事件又会放回就绪队列;等下次轮到通知该事件时,所有进程/线程又被唤醒。在 ET 模式下,无论该事件与没有被处理,该事件不会再被通知,即该事件节点被从就绪队列中删除,直到下个事件到来。

select/epoll/poll选择

  • 当监测的fd数目较小,且全部fd都比较活跃,建议使用select或者poll

  • 当监测的fd数目非常大,且单位时间只有其中的一部分fd处于就绪状态,这个时候使用epoll能够明显提升性能了。【简单来说,就是放回给用户态的时候不放回全部,只放回变化!】减少遍历的时间!

image-20240719144207819

image-20240917142911860

**LT和ET模式使用场景 **

  • LT适用于并发量小的情况,ET适用于并发量大的情况。

  • ET在通知用户之后,就会将fd从就绪链表中删除,而LT不会,它会一直保留,这就会导致随着fd增多,就绪链表越大,每次都要从头开始遍历找到对应的fd,所以并发量越大效率越低。ET因为会删除所以效率比较高。

    【和上面的考量点是一样的】

监听socket你的选择?

优先选择非阻塞,同时最好使用水平触发模式

  • 非阻塞的原因:

    • 存在特例:设置阻塞的监听sock,当客户端发起连接请求时服务端繁忙没顾上accept,等回头accept时客户端自己已经断开了,此时当服务器到达accept时会发生阻塞。很好理解,客户端发来了消息然后自己毙了,但是服务端并不知道客户端毙了,所以会卡在accept,因此,使用的是非阻塞描述符
  • 水平触发模式的原因:

    • 最好使用水平触发模式,边缘触发模式会导致高并发情况下,有的客户端会连接不上。如果非要使用边缘触发,网上有的方案是用while来循环accept()

    • 漏处理连接请求:假设有 1000 个客户端同时发起连接,epoll_wait 只通知一次,程序调用 accept() 处理一个连接,然后继续处理其他事情(如其他文件描述符上的事件),此时还有 999 个连接请求未处理,但因为 epoll 不会再次通知,因此这些连接请求被漏掉,导致客户端连接不上。
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94

      - ```c++
      #include <stdio.h>
      #include <stdlib.h>
      #include <unistd.h>
      #include <errno.h>
      #include <sys/socket.h>
      #include <netinet/in.h>
      #include <arpa/inet.h>
      #include <sys/epoll.h>

      #define MAX_EVENTS 10
      #define PORT 8888

      void handle_error(const char *msg) {
      perror(msg);
      exit(EXIT_FAILURE);
      }

      int main() {
      // 创建监听套接字
      int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
      if (listen_fd == -1) {
      handle_error("socket");
      }

      // 绑定地址和端口
      struct sockaddr_in addr;
      addr.sin_family = AF_INET;
      addr.sin_addr.s_addr = INADDR_ANY;
      addr.sin_port = htons(PORT);
      if (bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
      handle_error("bind");
      }
      // 开始监听
      if (listen(listen_fd, SOMAXCONN) == -1) {
      handle_error("listen");
      }
      // 创建 epoll 实例
      int epoll_fd = epoll_create1(0);
      if (epoll_fd == -1) {
      handle_error("epoll_create1");
      }
      // 将监听套接字加入 epoll 实例中
      struct epoll_event event;
      event.events = EPOLLIN | EPOLLET; // 边缘触发模式 ET
      event.data.fd = listen_fd;
      if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {
      handle_error("epoll_ctl EPOLL_CTL_ADD");
      }
      // 主循环,处理 epoll 事件
      while (1) {
      struct epoll_event events[MAX_EVENTS];
      int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
      if (n == -1) {
      handle_error("epoll_wait");
      }

      for (int i = 0; i < n; i++) {
      if (events[i].data.fd == listen_fd) {
      while (1) {
      // 循环接受连接请求
      struct sockaddr_in client_addr;
      socklen_t client_len = sizeof(client_addr);
      int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_len);
      if (client_fd == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // 没有更多连接请求需要接受了
      break;
      } else {
      handle_error("accept");
      }
      }

      // 处理新连接 client_fd
      printf("Accepted connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

      // 将新连接 client_fd 加入 epoll 实例中
      event.events = EPOLLIN | EPOLLET;
      event.data.fd = client_fd;
      if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event) == -1) {
      handle_error("epoll_ctl EPOLL_CTL_ADD");
      }
      }
      } else {
      // 处理其他事件,如 EPOLLIN、EPOLLOUT 等
      }
      }
      }
      close(epoll_fd);
      close(listen_fd);
      return 0;
      }

epoll事件常见标识位

其中epoll事件类型有以下几种:

  • EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)

  • EPOLLOUT:表示对应的文件描述符可以写

  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)

  • EPOLLERR:表示对应的文件描述符发生错误

  • EPOLLHUP:表示对应的文件描述符被挂断【EPOLLRDHUP 表示读关闭】

    • 【EPOLLHUP 通常表示文件描述符的挂断或关闭,可能是由于远程关闭连接、写端关闭、设备断开等情况引起的。】

    • Note:当对端关闭的时候,并且没有数据可读了,当前才能关闭!

    • if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)){
          if (closeCallback_){
              closeCallback_();
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
        
      - EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的

      - EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。【一个socket连接在任一时刻都只被一个线程处理】

      在监听事件的时候用异或操作标识!

      ### **EPOLLRDHUP vs EPOLLHUP**

      > **Q:客户端断开连接,服务端epoll监听到的事件是什么**

      原文链接:https://blog.csdn.net/weixin_44484715/article/details/120825122

      A:在使用 epoll 时,客户端正常断开连接(调用 close()),在服务器端会触发一个 epoll 事件。在早期的内核中,这个 epoll 事件一般是 EPOLLIN,即 0x1,代表连接可读。

      连接池检测到某个连接发生 EPOLLIN 事件且没有错误后,会认为有请求到来,将连接交给上层进行处理。这样一来,上层尝试在对端已经 close() 的连接上读取请求,只能读到 EOF(文件末尾),会认为发生异常,报告一个错误。

      **后期的内核中增加了 EPOLLRDHUP 事件,代表对端断开连接。对端连接断开触发的 epoll 事件会包含 EPOLLIN | EPOLLRDHUP,即 0x2001。有了这个事件,对端断开连接的异常就可以在底层进行处理了,不用再移交到上层**

      区别:

      https://blog.csdn.net/zhouguoqionghai/article/details/94591475

      ### **epoll的连接限制**

      **系统级**:当前系统可打开的最大数量,通过 cat /proc/sys/fs/file-max 查看

      **用户级**:指定用户可打开的最大数量,通过 cat /etc/security/limits.conf 查看

      **进程级**:单个进程可打开的最大数量,通过 cat /proc/sys/fs/nr_open 查看

      链接:https://www.nowcoder.com/discuss/353159645424459776

      ### **epoll是否是线程安全的**

      简单来说 epoll 是通过锁来保证线程安全,epoll 中粒度最小的自旋锁 ep->lock(spinlock) 用来保护就绪的队列,互斥锁 ep->mtx 用来保护 epoll 的重要数据结构红黑树。

      **主要的两个函数:**

      - epoll_ctl():当需要根据不同的 operation 通过 ep_insert()或者 ep_remove()等接口对 epoll 自身的数据结构进行操作时都提前获得了 ep->mtx 锁
      - epll_wait():获得自旋锁 ep->lock 来保护就绪队列

      ### **阻塞描述符和非阻塞描述符的区别**

      **阻塞描述符**

      等待操作完成:

      - 当对阻塞描述符进行I/O操作(如读、写)时,调用方会被阻塞,直到操作完成为止。例如,读取操作会一直等待,直到有数据可读为止;写入操作会一直等待,直到数据被成功写入。

      **非阻塞描述符**

      立即返回:

      - 非阻塞描述符在执行I/O操作时,不会等待操作完成。如果操作不能立即完成(如没有数据可读),调用会立即返回一个错误(如 `EWOULDBLOCK``EAGAIN`)。

      ### **ET需要非阻塞socket**

      LT阻塞和非阻塞socket都可以,ET只能用非阻塞!

      **原因**

      因为ET模式,当有数据时,只会被触发一次,所以每次读取数据时,一定要一次性把数据读取完(必须等到它们返回**`EWOULDBLOCK`(确保所有数据都已读完或写完),所以我们需要设置一个whlie循环read数据,**==但如果read是阻塞模式,那么如果没有数据时,将会阻塞,导致程序卡死。所以这里read只允许非阻塞模式,如果没有数据,read将会跳出循环,继续执行其他程序。

      > **LE和ET模式下,读的区别**

      - **水平触发:**只要缓冲区有数据,epoll_wait 就会一直被触发,直到缓冲区为空;**就是当epoll_wait 检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。**下次调用 epoll_wait 时,会再次响应应用程序并通知此事件。
      - **边沿触发:**只有所监听的事件状态改变或者有事件发生时,epoll_wait 才会被触发;当 epoll_wait 检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。 如果不处理,下次调用 epoll_wait 时,不会再次响应应用程序并通知此事件。

      编程上面的区别,read函数举例!

      LT模式下,分阻塞和非阻塞情况:

      - 阻塞socket处理

      ```C++
      while (1) {
      int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
      for (int i = 0; i < n; i++) {
      if (events[i].events & EPOLLIN) {
      char buffer[BUFFER_SIZE];
      ssize_t bytes_read = read(events[i].data.fd, buffer, sizeof(buffer));
      if (bytes_read > 0) {
      // 处理读取到的数据
      printf("Read %zd bytes\n", bytes_read);
      } else if (bytes_read == 0) {
      // 对端关闭连接
      close(events[i].data.fd);
      } else {
      // 处理错误
      perror("read");
      close(events[i].data.fd);
      }
      }
      }
      }
  • 非阻塞socket处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
while (1) {
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].events & EPOLLIN) {
while (1) {
char buffer[BUFFER_SIZE];
ssize_t bytes_read = read(events[i].data.fd, buffer, sizeof(buffer));
if (bytes_read > 0) {
// 处理读取到的数据
printf("Read %zd bytes\n", bytes_read);
} else if (bytes_read == 0) {
// 对端关闭连接
close(events[i].data.fd);
break;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 缓冲区没有数据,非阻塞模式下读操作会返回此错误
break;
} else {
// 处理其他错误
perror("read");
close(events[i].data.fd);
break;
}
}
}
}
}
}

ET模式下,只有非阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
while (1) {
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].events & EPOLLIN) {
while (1) {
char buffer[BUFFER_SIZE];
ssize_t bytes_read = read(events[i].data.fd, buffer, sizeof(buffer));
if (bytes_read > 0) {
// 处理读取到的数据
printf("Read %zd bytes\n", bytes_read);
} else if (bytes_read == 0) {
// 对端关闭连接
close(events[i].data.fd);
break;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 缓冲区没有数据,非阻塞模式下读操作会返回此错误
break;
} else {
// 处理其他错误
perror("read");
close(events[i].data.fd);
break;
}
}
}
}
}
}

总结:也就是说,阻塞socket通知我了,那么里面一定有数据,如果没有那就是错误。那么非阻塞的情况呢,我可能读到就是没有数据,我根据error去区分到底是错误还是没有数据!

记忆的话,就记住(bytes_read == 0)读到的字节为0,就是对端关闭。

而且,只有读才知道对方关闭,写是不知道的,只知道当前缓冲区可写!

LE和ET模式下,写的区别

image-20240917142506038

LT模式下

  1. LT模式下,可写状态的 fd 【不管阻塞还是非阻塞】会一直触发事件【因为缓冲区不为满,有空位】。【就是像个报警器一样,打开了需要手动关闭】
  2. 为什么会出现这种情况?
    1. 状态保持不变:只要文件描述符的状态保持可写,epoll_wait 就会每次都返回该事件,因为 LT 模式持续报告当前状态。
  3. 有什么缺点
    1. 无效循环:如果应用程序每次都不做实际的写操作,仅仅处理返回的事件而没有真正写入数据,那么 epoll_wait 会每次都返回这个可写事件,导致无效循环。
    2. CPU 消耗:不断处理同一事件会浪费 CPU 资源,因为应用程序可能会反复处理同一个文件描述符的可写事件而没有实际写入数据。
  4. 解决办法
    1. 数据量少时:如果数据量少,直接调用 sendwrite 函数将数据发送出去。如果发送成功,不需要绑定 EPOLLOUT 事件。【EPOLL_CTL_MOD】
    2. 数据量多时:如果一次性无法发送完所有数据,则需要绑定 EPOLLOUT 事件,以便在文件描述符再次可写时,epoll 能够通知应用程序继续发送剩余的数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define MAX_EVENTS 10
void setnonblocking(int fd) {
int opts = fcntl(fd, F_GETFL);
if (opts < 0) {
perror("fcntl(F_GETFL)");
exit(EXIT_FAILURE);
}
opts = (opts | O_NONBLOCK);
if (fcntl(fd, F_SETFL, opts) < 0) {
perror("fcntl(F_SETFL)");
exit(EXIT_FAILURE);
}
}
void add_epollout(int epoll_fd, int fd) {
struct epoll_event ev;
ev.events = EPOLLOUT;
ev.data.fd = fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev) == -1) {
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
}
void remove_epollout(int epoll_fd, int fd) {
struct epoll_event ev;
ev.events = EPOLLIN; // 只监听读事件
ev.data.fd = fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev) == -1) {
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
}
void handle_write(int epoll_fd, int fd, const char *data, size_t length) {
ssize_t written = send(fd, data, length, 0);
if (written == -1) {
perror("send");
close(fd);
return;
}
if (written < length) {
// 数据没有一次性发送完,绑定 EPOLLOUT 事件
add_epollout(epoll_fd, fd);
} else {
// 数据发送完毕,移除 EPOLLOUT 事件
remove_epollout(epoll_fd, fd);
}
}
int main() {
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
int listen_fd = ...; // 创建并绑定监听套接字
setnonblocking(listen_fd);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
struct epoll_event events[MAX_EVENTS];
while (1) {
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].events & EPOLLIN) {
// 处理读事件,例如接受新连接或读取数据
} else if (events[i].events & EPOLLOUT) {
// 处理写事件,假设要发送的数据为 data
const char *data = "Hello, world!";
handle_write(epoll_fd, events[i].data.fd, data, strlen(data));
}
}
}
close(epoll_fd);
return 0;
}

ET模式下:

在 ET 模式下,当文件描述符变为可写状态时,epoll 只会在状态变化时通知应用程序一次。如果应用程序没有在第一次通知时将所有数据写入,那么后续的数据写入不会触发新的通知,直到有新的状态变化(例如,缓冲区从满变为部分空)。【我理解的状态改变是只有在【满到不满的时候通知,假如为空的话不会通知】】

补充:ET模式下,假如第一次就可以完全往tcpsocket描述符中全部写入数据,写完后我写事件没有关闭,那么可能在第二次在出现吗?【不会,tcpsocket描述符就你一个人写】

具体从编程角度实现的话!

LT模式下的两种情况

  • 阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
while (1) {
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].events & EPOLLOUT) {
char buffer[BUFFER_SIZE];
// 假设 buffer 中有要发送的数据
ssize_t bytes_written = write(events[i].data.fd, buffer, sizeof(buffer));
if (bytes_written > 0) {
// 处理已经发送的数据
printf("Written %zd bytes\n", bytes_written);
} else {
// 处理错误
perror("write");
close(events[i].data.fd);
}
}
}
}

Note:假如我的buffer数据就是0,那么程序就走到处理错误这块了?因此需要进行特殊判断一下!

  • 非阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
while (1) {
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].events & EPOLLOUT) {
while (1) {
char buffer[BUFFER_SIZE];
// 假设 buffer 中有要发送的数据
ssize_t bytes_written = write(events[i].data.fd, buffer, sizeof(buffer));
if (bytes_written > 0) {
// 处理已经发送的数据
printf("Written %zd bytes\n", bytes_written);
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 发送缓冲区没有足够空间,非阻塞模式下写操作会返回此错误
break;
} else {
// 处理其他错误
perror("write");
close(events[i].data.fd);
break;
}
}
}
}
}
}

ET模式下的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
while (1) {
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].events & EPOLLOUT) {
while (1) {
char buffer[BUFFER_SIZE];
// 假设 buffer 中有要发送的数据
ssize_t bytes_written = write(events[i].data.fd, buffer, sizeof(buffer));
if (bytes_written > 0) {
// 处理已经发送的数据
printf("Written %zd bytes\n", bytes_written);
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 发送缓冲区没有足够空间,非阻塞模式下写操作会返回此错误
break;
} else {
// 处理其他错误
perror("write");
close(events[i].data.fd);
break;
}
}
}
}
}
}

在一个请求的事件加回到 epoll 后,nginx 转而去处理其他的请求。我认为 nginx 的异步体现在这里。

我认为这个和muduo的写事件很像!这样就不会阻塞在那块了!

accept的阻塞与非阻塞问题讨论

这个问题是和监听描述符是否为阻塞有关的!

  • 监听描述符为阻塞
    • 在阻塞模式下,当调用 accept 函数时,如果没有连接请求,函数会阻塞(暂停执行),直到有一个新的连接到来。
  • 监听描述符为非阻塞
    • 在非阻塞模式下,如果没有连接请求,accept 函数会立即返回 -1,并将 errno 设置为 EAGAINEWOULDBLOCK

connect的阻塞与非阻塞问题讨论

这个问题是和监听描述符是否为阻塞有关的!

  • 监听描述符为阻塞
    • 在阻塞模式下,connect 函数会阻塞(暂停执行),直到连接成功建立或发生错误。
    • 如果连接过程较长,例如目标服务器响应缓慢或网络状况不佳,调用 connect 的线程会被挂起,不能执行其他任务,直到连接完成。
  • 监听描述符为非阻塞
    • 在非阻塞模式下,connect 函数会立即返回,并且 errno 设置为 EINPROGRESS,表示连接正在进行中。
    • #include <stdio.h>
      #include <stdlib.h>
      #include <string.h>
      #include <unistd.h>
      #include <arpa/inet.h>
      #include <fcntl.h>
      #include <errno.h>
      #include <sys/epoll.h>
      
      #define MAX_EVENTS 10
      
      int main() {
          int sockfd = socket(AF_INET, SOCK_STREAM, 0);
          if (sockfd < 0) {
              perror("socket");
              exit(EXIT_FAILURE);
          }
      
          // 将套接字设置为非阻塞模式
          int flags = fcntl(sockfd, F_GETFL, 0);
          fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
      
          struct sockaddr_in server_addr;
          server_addr.sin_family = AF_INET;
          server_addr.sin_port = htons(8080);
          inet_pton(AF_INET, "127.0.0.1", &server_addr.sin_addr);
      
          int ret = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
          if (ret < 0) {
              if (errno != EINPROGRESS) {
                  perror("connect");
                  close(sockfd);
                  exit(EXIT_FAILURE);
              }
          }
      
          // 创建 epoll 实例
          int epoll_fd = epoll_create1(0);
          if (epoll_fd < 0) {
              perror("epoll_create");
              close(sockfd);
              exit(EXIT_FAILURE);
          }
      
          // 将套接字添加到 epoll 中
          struct epoll_event event;
          event.events = EPOLLOUT;
          event.data.fd = sockfd;
          if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event) < 0) {
              perror("epoll_ctl");
              close(sockfd);
              close(epoll_fd);
              exit(EXIT_FAILURE);
          }
      
          // 等待事件
          struct epoll_event events[MAX_EVENTS];
          int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
          if (n < 0) {
              perror("epoll_wait");
              close(sockfd);
              close(epoll_fd);
              exit(EXIT_FAILURE);
          }
      
          for (int i = 0; i < n; i++) {
              if (events[i].events & EPOLLOUT) {
                  int err;
                  socklen_t len = sizeof(err);
                  if (getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) {
                      perror("getsockopt");
                      close(events[i].data.fd);
                      close(epoll_fd);
                      exit(EXIT_FAILURE);
                  }
      
                  if (err == 0) {
                      // 连接成功
                      printf("Connected to the server\n");
                  } else {
                      // 连接失败
                      fprintf(stderr, "connect error: %s\n", strerror(err));
                      close(events[i].data.fd);
                      close(epoll_fd);
                      exit(EXIT_FAILURE);
                  }
              }
          }
      
          close(epoll_fd);
          close(sockfd);
          return 0;
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18



      ### **SO_REUSEADDR和SO_REUSEPORT 的区别**

      **bug:address** **already in use!**

      - SO_REUSEADDR:解决先断开一方 TIME-WAIT的 2ML问题!
      - SO_REUSEPORT:解决2个服务器监听同一个端口复用的问题!【内核会做负载均衡】

      不存在交叉使用的情况

      [SO_REUSEADDR 和 SO_REUSEPORT 的区别和用法实例详细探究讲解](https://blog.csdn.net/Wanglei110311089/article/details/129612988)

      使用下面命令查看端口号信息

      ```bash
      ss -antp | grep 80

SO_LINGER选项

SO_LINGER 选项用于控制当套接字关闭时,数据是否需要立即被发送,或是否需要等待数据被传输完毕。它是一种用于管理 TCP 套接字关闭行为的机制。

SO_LINGER 选项的作用

SO_LINGER 选项定义了在调用 close() 函数关闭套接字时,TCP 连接应该如何处理未发送的数据。它的主要作用是确定是否应该等待所有数据被发送完毕后再关闭连接,或者在关闭时立即放弃未发送的数据。

SO_LINGER 选项的设置

通过 setsockopt() 函数设置 SO_LINGER 选项,其原型如下:

1
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
  • sockfd: 套接字描述符。
  • level: 选项所在的协议层,通常为 SOL_SOCKET
  • optname: 选项名称,这里为 SO_LINGER
  • optval: 指向 linger 结构的指针,定义了具体的选项值。
  • optlen: 选项值的长度,通常为 sizeof(struct linger)

linger 结构定义如下:

1
2
3
4
struct linger {
int l_onoff; // 是否启用 linger 选项
int l_linger; // 等待时间(秒)
};

使用场景和效果

  1. 启用 SO_LINGER,并设置非零的等待时间

    l_onoff 设置为非零值时,l_linger 表示等待的时间(秒)。在关闭套接字时,系统会等待直到所有的数据都被传输完毕,或者等待时间结束。

    1
    2
    3
    4
    struct linger linger_opt;
    linger_opt.l_onoff = 1; // 启用 linger 选项
    linger_opt.l_linger = 10; // 等待 10 秒
    setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt));
    • 优点:确保数据被完全发送到对方,避免数据丢失。
    • 缺点:可能导致套接字关闭操作阻塞,尤其是在网络拥塞或对端不可达的情况下。
  2. 启用 SO_LINGER,并设置 l_linger 为 0

    l_onoff 设置为非零值,并且 l_linger 设置为 0 时,TCP 连接会立即关闭,不等待未发送的数据。

    1
    2
    3
    4
    struct linger linger_opt;
    linger_opt.l_onoff = 1; // 启用 linger 选项
    linger_opt.l_linger = 0; // 不等待
    setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt));
    • 优点:关闭操作不会被阻塞。
    • 缺点:未发送的数据可能会丢失,因为连接在数据完全发送之前就关闭了。
  3. 禁用 SO_LINGER

    l_onoff 设置为 0 时,SO_LINGER 选项被禁用,套接字关闭时不进行特殊处理。

    1
    2
    3
    struct linger linger_opt;
    linger_opt.l_onoff = 0; // 禁用 linger 选项
    setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt));
    • 优点:关闭操作不会被阻塞。
    • 缺点:如果有未发送的数据,可能会丢失。

示例代码

以下是一个示例代码,演示如何设置 SO_LINGER 选项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

int main() {
int sockfd;
struct linger linger_opt;

// 创建套接字
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("socket");
exit(EXIT_FAILURE);
}

// 配置 SO_LINGER 选项
linger_opt.l_onoff = 1; // 启用 linger 选项
linger_opt.l_linger = 5; // 等待 5 秒
if (setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt)) < 0) {
perror("setsockopt");
close(sockfd);
exit(EXIT_FAILURE);
}

// 执行其他操作,例如连接、发送数据等

// 关闭套接字
close(sockfd);
return 0;
}

总结

SO_LINGER 选项用于控制 TCP 套接字在关闭时的行为,具体取决于是否需要等待未发送的数据被传输完毕。它可以用于确保数据的可靠传输,但也可能导致套接字关闭操作阻塞。根据应用程序的需求,可以选择适当的 SO_LINGER 设置来平衡数据传输的完整性和关闭操作的效率。

SO_RCVLOWAT 和 SO_SNDLOWAT

SO_RCVLOWATSO_SNDLOWAT 是用于控制 TCP 接收缓冲区和发送缓冲区的低水位标记的套接字选项。它们在网络编程中与 I/O 复用系统调用(如 select()poll()epoll)密切相关,用于判断何时可以读取或写入数据。

SO_RCVLOWAT(接收低水位标记)

  • 作用:设置接收缓冲区的低水位标记,指定套接字在 I/O 复用调用中被标记为可读前,接收缓冲区中必须有多少数据。
  • 默认值:通常为 1,表示接收缓冲区中只要有 1 字节数据,I/O 复用调用就会通知该套接字可读。
  • 使用场景:适用于需要等待一定量的数据到达后再处理的场景。例如,如果应用程序想在读取数据之前等待至少 100 字节数据到达,它可以将 SO_RCVLOWAT 设置为 100。

SO_SNDLOWAT(发送低水位标记)

  • 作用:设置发送缓冲区的低水位标记,指定套接字在 I/O 复用调用中被标记为可写前,发送缓冲区中必须有多少空闲空间。
  • 默认值:通常为 1,表示只要发送缓冲区有 1 字节的可写空间,I/O 复用调用就会通知该套接字可写。
  • 使用场景:适用于需要等待缓冲区中有一定量的可写空间后再进行写操作的场景。例如,如果应用程序想在写入数据之前等待至少 100 字节的空闲空间,它可以将 SO_SNDLOWAT 设置为 100。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>

int main() {
int sockfd;
int rcvlowat = 100; // 接收低水位标记
int sndlowat = 100; // 发送低水位标记

// 创建套接字
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("socket");
exit(EXIT_FAILURE);
}

// 设置接收低水位标记
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVLOWAT, &rcvlowat, sizeof(rcvlowat)) < 0) {
perror("setsockopt SO_RCVLOWAT");
close(sockfd);
exit(EXIT_FAILURE);
}

// 设置发送低水位标记
if (setsockopt(sockfd, SOL_SOCKET, SO_SNDLOWAT, &sndlowat, sizeof(sndlowat)) < 0) {
perror("setsockopt SO_SNDLOWAT");
close(sockfd);
exit(EXIT_FAILURE);
}

// 继续其他操作,例如连接、发送和接收数据

close(sockfd);
return 0;
}

SOCK_CLOEXEC

SOCK_CLOEXEC 是一个用于创建套接字时的标志,目的是控制文件描述符的行为,特别是在执行 exec 系列系统调用(如 execve)时的行为。它在网络编程中非常实用,可以帮助避免某些类型的文件描述符泄漏问题。

SOCK_CLOEXEC 的作用

SOCK_CLOEXEC 是一个标志,用于在调用 socket()socketpair() 创建套接字时自动设置文件描述符的 close-on-exec 属性。

  • close-on-exec 属性:这是文件描述符的一个特性,表示当进程调用 exec 系列函数(如 execve)来运行另一个程序时,这个文件描述符会自动关闭。
  • SOCK_CLOEXEC:直接在创建套接字时设置 close-on-exec 属性,而不需要在创建套接字后再调用 fcntl() 来设置。这种方式更加高效,并且避免了在多线程程序中可能出现的竞态条件。

文件描述符泄漏问题

在多进程编程中,特别是在使用 fork()exec() 的场景中,文件描述符泄漏是一个常见问题。如果父进程在 fork() 之后调用 exec(),而没有显式地关闭不必要的文件描述符,这些文件描述符将被子进程继承。

  • 问题:如果子进程继承了父进程的文件描述符(如套接字、管道等),但子进程并不需要这些文件描述符,这可能导致资源泄漏或不预期的行为。
  • 解决方案:通过设置 close-on-exec 属性,确保在子进程执行新程序时自动关闭这些文件描述符,避免泄漏。

使用 SOCK_CLOEXEC 的好处

  1. 简化代码:你不再需要在创建套接字后手动调用 fcntl() 来设置 close-on-exec,这使得代码更加简洁和易读。

    1
    2
    3
    4
    5
    6
    // 传统方式:创建套接字后手动设置 close-on-exec
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    fcntl(sockfd, F_SETFD, FD_CLOEXEC);

    // 使用 SOCK_CLOEXEC 方式
    int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
  2. 避免竞态条件:在多线程程序中,如果一个线程在套接字创建后但在 fcntl() 设置 close-on-exec 之前调用 fork(),可能导致文件描述符在不应该被继承的情况下被子进程继承。SOCK_CLOEXEC 可以避免这种竞态条件,因为它在套接字创建时就设置了 close-on-exec 属性。

  3. 提高安全性:通过自动关闭不必要的文件描述符,可以减少意外的文件描述符泄漏,从而提高程序的安全性,特别是在编写网络服务程序时。

主机上最多能保持多少个连接

1.端口号:理论端口号是16位,范围1~65535

2.描述符

3.线程并发问题

4.内存

5.CPU

https://www.nowcoder.com/discuss/353159645424459776

Connection reset by peer

“Connection reset by peer”表示当前服务器接受到了通信对端发送的TCP RST信号,即通信对端已经关闭了连接,通过RST信号希望接收方关闭连接。

  • 当尝试和未开放的服务器端口建立tcp连接时,服务器tcp将会直接向客户端发送reset报文;
  • 双方之前已经正常建立了通信通道,也可能进行过了交互,当某一方在交互的过程中发生了异常,如崩溃等,异常的一方会向对端发送reset报文,通知对方将连接关闭;
  • 当收到TCP报文,但是发现该报文不是已建立的TCP连接列表可处理的,则其直接向对端发送reset报文;
  • ack报文丢失,并且超出一定的重传次数或时间后,会主动向对端发送reset报文释放该TCP连接;

https://www.cnblogs.com/toSeeMyDream/p/9890024.html

将SIGPIPE信号设置为SIG_IGN

image-20240719165520782

当客户端发起close()命令后会向服务端发送FIN包,不再接受和发送数据,服务端接收到信息后还可以向客户端发送数据,此时发送数据到客户端就会产生SIGPIPE信号,默认产生该信号会关闭进程,所以我们要将该信号的处理方式设置为忽略.

1
2
3
4
5
6
7
8
9
10
// 忽略 SIGPIPE 信号的处理函数
void handle_sigpipe(int signum) {
printf("SIGPIPE caught and ignored\n");
}

int main() {
...
signal(SIGPIPE, handle_sigpipe);
...
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

// 忽略 SIGPIPE 信号的处理函数
void handle_sigpipe(int signum) {
printf("SIGPIPE caught and ignored\n");
}

int main() {
int sockfd, newsockfd;
struct sockaddr_in serv_addr, cli_addr;
socklen_t clilen;
char buffer[256];

// 创建套接字
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("ERROR opening socket");
exit(1);
}

// 配置服务器地址结构
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(12345);

// 绑定套接字到地址
if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
perror("ERROR on binding");
close(sockfd);
exit(1);
}

// 监听连接
listen(sockfd, 5);
clilen = sizeof(cli_addr);

// 接受连接
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if (newsockfd < 0) {
perror("ERROR on accept");
close(sockfd);
exit(1);
}

// 设置忽略 SIGPIPE 信号
signal(SIGPIPE, handle_sigpipe);

// 读取客户端消息
bzero(buffer, 256);
int n = read(newsockfd, buffer, 255);
if (n < 0) {
perror("ERROR reading from socket");
close(newsockfd);
close(sockfd);
exit(1);
}
printf("Here is the message: %s\n", buffer);

// 模拟客户端关闭连接后发送数据
sleep(5); // 等待客户端关闭连接
n = write(newsockfd, "I got your message", 18);
if (n < 0) {
perror("ERROR writing to socket");
}

close(newsockfd);
close(sockfd);
return 0;
}

EINTR 信号

即系统在处理过程中由于收到某个信号从而中断当前操作去做信号处理后返回时的错误

image-20240719161859209

如果程序在执行处于阻塞状态的系统调用时接收到信号,并且我们为该信号设置了信号处理函数,则默认情况下系统调用将被中断,并且errno被设置为EINTR。

定时器信号

另一种常见的情况是使用 alarm() 或 POSIX 定时器(如 timer_create())来设置一个定时器信号。当定时器到期时,它可能会中断正在进行的系统调用。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>

// Signal handler for SIGALRM
void handle_alarm(int sig) {
printf("Caught SIGALRM\n");
}

int main() {
char buffer[128];
ssize_t bytes_read;

// Register signal handler for SIGALRM
signal(SIGALRM, handle_alarm);

// Set an alarm to go off in 5 seconds
alarm(5);

printf("Reading from stdin (you have 5 seconds to type something)...\n");

// Blocking read from stdin
bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer));

if (bytes_read < 0) {
if (errno == EINTR) {
// read() was interrupted by SIGALRM
printf("read() was interrupted by SIGALRM\n");
} else {
perror("read");
}
} else {
buffer[bytes_read] = '\0';
printf("You typed: %s\n", buffer);
}

return 0;
}

解释:

  • 在这个例子中,read() 调用可能会被 SIGALRM 信号中断。
  • alarm(5) 设置的定时器到期时,SIGALRM 信号会被发送到进程,导致正在阻塞的 read() 调用返回 -1 并设置 errnoEINTR
  • 程序接收到 SIGALRM 后,在信号处理函数中打印消息,并在 read() 返回后检查是否发生了 EINTR,从而决定如何处理。

timefd是什么,有什么用?举出和epoll使用的例子

timerfd 是 Linux 内核提供的一个接口,用于创建一个定时器,定时器事件通过文件描述符的形式通知用户。它可以与 epoll 等 I/O 多路复用机制结合使用,实现高效的定时器管理。

主要功能

  • 定时器:创建一次性或周期性的定时器。
  • 文件描述符:定时器通过文件描述符表示,可以像其他文件描述符一样使用,例如与 selectpollepoll 等机制结合使用。
  • 高效:适用于需要高效处理定时事件的应用场景。

使用方法

  1. 创建定时器文件描述符:timerfd_create
  2. 设置定时器:timerfd_settime
  3. 读取定时器事件:通过读取文件描述符来获取定时器的触发次数

示例代码

以下示例展示了如何使用 timerfdepoll 结合来处理定时事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include <iostream>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <cstring>

int main() {
// 创建epoll实例
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
return 1;
}

// 创建timerfd实例
int timer_fd = timerfd_create(CLOCK_REALTIME, 0);
if (timer_fd == -1) {
perror("timerfd_create");
close(epoll_fd);
return 1;
}

// 设置定时器
struct itimerspec new_value;
memset(&new_value, 0, sizeof(new_value));
new_value.it_value.tv_sec = 2; // 2秒后第一次触发
new_value.it_interval.tv_sec = 2; // 之后每2秒触发一次

if (timerfd_settime(timer_fd, 0, &new_value, NULL) == -1) {
perror("timerfd_settime");
close(timer_fd);
close(epoll_fd);
return 1;
}

// 将timerfd添加到epoll实例中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = timer_fd;

if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev) == -1) {
perror("epoll_ctl");
close(timer_fd);
close(epoll_fd);
return 1;
}

// 事件循环
while (true) {
struct epoll_event events[1];
int nfds = epoll_wait(epoll_fd, events, 1, -1);
if (nfds == -1) {
perror("epoll_wait");
close(timer_fd);
close(epoll_fd);
return 1;
}

if (events[0].data.fd == timer_fd) {
uint64_t expirations;
if (read(timer_fd, &expirations, sizeof(expirations)) != sizeof(expirations)) {
perror("read");
close(timer_fd);
close(epoll_fd);
return 1;
}
std::cout << "Timer expired " << expirations << " times" << std::endl;
}
}

close(timer_fd);
close(epoll_fd);
return 0;
}

目前我所接触到的有:sockfd、eventfdtimerfd、pipefd

注意:普通文件不可以监听

Reactor和Proactor区别

区别:从上面可以看出,Reactor中需要应用程序自己读取或者写入数据,而Proactor模式中,应用程序不需要用户再自己接收数据,直接使用就可以了,操作系统会将数据从内核拷贝到用户区

5种IO模型

(1)阻塞IO:调用者调用了某个函数,等待这个函数返回,期间什么也不做,不停的检查这个函数有没有返回,必须等这个函数返回后才能进行下一步动作。

(2)非阻塞IO:非阻塞等待,每隔一段时间就去检查IO事件是否就绪。没有就绪就可以做其他事情。

(3)信号驱动IO:Linux用套接口进行信号驱动IO,安装一个信号处理函数,进程继续运行并不阻塞,当IO事件就绪,进程收到SIGIO信号,然后处理IO事件。

(4)IO多路复用:Linux用select/poll函数实现IO复用模型,这两个函数也会使进程阻塞,但是和阻塞IO所不同的是这两个函数可以同时阻塞多个IO操作。而且可以同时对多个读操作、写操作的IO函数进行检查。知道有数据可读或可写时,才真正调用IO操作函数。

(5)异步IO:Linux中,可以调用aio_read函数告诉内核描述字缓冲区指针和缓冲区的大小、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。用户可以直接去使用数据。

前四种模型–阻塞IO、非阻塞IO、多路复用IO和信号驱动IO都属于同步模式,因为其中真正的IO操作(函数)都将会阻塞进程,只有异步IO模型真正实现了IO操作的异步性

【version two】

  • Reactor是被动的,先将文件描述符注册到事件处理上,被动等待select返回就绪的文件描述符以后再处理后续的读写IO操作;Proactor是主动的,调用异步后立刻返回,由内核负责读写IO操作,处理完以后调用相应的完成事件回调函数处理后续逻辑

  • Reactor是同步IO,Proactor是异步IO

  • Reactor实现相对简单;Proactor实现复杂

  • Reactor处理耗时长的操作会造成事件分发的阻塞,影响到后续事件的处理;Proactor异步接收能够处理多个耗时长的并发场景

  • Reactor模型中用户定义的操作是在实际操作之前调用的(比如:定义了操作要向某个socket写数据,那么当该socket可以接收数据的时候,你的操作就会被调用);Proactor模型中用户定义的操作是在实际操作之后调用的(比如:定义了一个操作要向某个socket写数据,那么当操作系统把数据写入socket完成以后,你的操作才会被调用)

errno == EAGAIN 和errno == EWOULDBLOCK分别在什么时候会设置这个,为什么不统一一个呢=

EAGAINEWOULDBLOCK 原本是独立定义的错误码,用于不同的场景。随着时间的推移和 POSIX 标准的演化,这两个错误码被定义为具有相同的值。在大多数现代操作系统中,它们是同义的。这是为了向后兼容旧的代码和系统,而不改变现有程序的行为

网络编程中read使用错误引起的崩溃

学习自:https://blog.csdn.net/u012906122/article/details/108037828

网络编程中,基础接口read的使用要格外注意,先上有问题的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int socketTCP::readn(char *buf, size_t len){
int ret = 0;
size_t left = len;

while(left > 0){
ret = ::read(sock_, buf, len);
if (ret < 0) {
return -1;
}
if (ret == 0) {
break;
}
buf += ret;
left -= ret;
}

return len-left;
}
崩溃现象:

崩溃的地址每次不一样,崩溃地址随机。

复现方法:

客户端每次给服务端发送的包字节大小是32B:

32B,32B,32B,

正常情况下,read接收的包字节大小是:

32B,32B,32B,…

异常情况下,read接收的包字节大小是:

32B,16B,32B,…

分析:

一旦read返回的ret值为16,那么上述代码即陷入“死循环”中,直到buf越界,出现Segmentation Fault.

之所以每次崩溃地址随机,是因为buf每次越界的地址随机。

为什么会"死循环呢"?举例分析下:

(1) readn->32B left=32,ret=32,left=32-32=0 退出循环

(2) readn->32B left=32,ret=16,left=32-16=16

​ ret=32,left=16-32=-16 //read还是会读len个字节,ret=32

因为left是size_t类型,而size_t是unsigned类型,left=-16时left>0为true!!!所以之后会发生死循环.

ret=32,left=-16-32=-48

1
2
3
4
5
6
7
8
size_t的真实类型与操作系统有关。
在32位架构中被普遍定义为:

typedef unsigned int size_t;

而在64位架构中被普遍定义为:

typedef unsigned long size_t;

解决:

修改的话有两种方式:

(1)size_t left=len;改为int left=len;

目的是要当left<0时退出循环。

(2)read(sock_ ,buf,len );改为 read(sock_,buf,left);

目的是保证left会==0,退出循环。

双方同时send导致死锁问题

https://blog.csdn.net/CNHK1225/article/details/64131277

TCP通信中的死锁问题

死锁例子:

数据流向

  • 客户端读取文件数据,调用write()发送到服务端。
  • 服务端接收数据,压缩后调用write()发送回客户端。
  • 客户端再通过read()接收服务端的数据。

缓冲区

  • 每个TCP连接都有两个缓冲区:发送队列(SendQ)和接收队列(RecvQ)。
  • 发送队列存储即将发送的数据,接收队列存储已经接收但未处理的数据。

死锁发生条件

  • 假设客户端和服务端的SendQ和RecvQ队列都有500字节的容量。
  • 客户端发送一个10000字节的文件,服务端读取并压缩后返回。

详细过程

  • 客户端发送2000字节到服务端,服务端将这些数据全部读取并压缩,返回1000字节(压缩比2:1)。
  • 由于客户端还没有调用read()方法,RecvQ队列中存满了1000字节。
  • 继续发送数据,客户端发送了3000字节,服务端接收并尝试返回1500字节,但由于客户端的RecvQ已满,服务端的发送被阻塞。
  • 客户端继续发送,SendQ也很快填满,而此时没有地方再存放数据,客户端的write()操作被阻塞。

解决方案:

1.多线程或异步I/O

  • 通过多线程或异步I/O技术,确保客户端在发送数据的同时,也能接收数据。

2.定期读取数据

  • 客户端应该在发送数据的同时,定期调用read()方法从接收缓冲区中取出数据,防止RecvQ满。

3.发送和接收并行处理

  • 将发送和接收操作并行处理,避免发送数据时接收缓冲区满的问题。

为什么这么多CLOSE_WAIT

情况1:close_wait 状态是在 TCP 四次挥手的时候收到 FIN 但是没有发送自己的 FIN 时出现的。

情况2:服务器的父进程派生出子进程,子进程继承了 socket收到 FIN 的时候子进程处理但父进程没有处理该信号,这时候就会导致 socket 的引用不为 0 无法回收。

本质:CLOSE_WAIT就是连接被动关闭端的应用没调socket.close【CLOSE_WAIT是被动关闭端在等待应用进程的关闭

套接字函数 描述 对应的 TCP 状态
socket() 创建一个套接字 未涉及 TCP 状态
bind() 绑定套接字到一个本地地址和端口 未涉及 TCP 状态
listen() 将套接字设置为被动模式,准备接受连接 LISTEN
accept() 接受一个连接请求 LISTEN -> SYN-RECEIVED -> ESTABLISHED
connect() 客户端连接到远程服务器 SYN-SENT -> ESTABLISHED
send() / sendto() / sendmsg() 发送数据到已连接的套接字或指定的地址 ESTABLISHED
recv() / recvfrom() / recvmsg() 从已连接的套接字或指定的地址接收数据 ESTABLISHED
close() 关闭套接字 FIN-WAIT-1 -> FIN-WAIT-2 -> TIME-WAIT -> CLOSED (主动关闭)或 CLOSE-WAIT -> LAST-ACK -> CLOSED(被动关闭)
shutdown() 部分或全部关闭套接字的连接 根据方向影响状态:SHUT_WR -> FIN-WAIT-1 等

close()

  • 描述:关闭套接字。

  • TCP 状态:调用 close() 后,发送一个 FIN 报文:

    • 如果是主动关闭方,套接字进入 FIN-WAIT-1 状态。

    • 接收到对方的 ACK 后,进入 FIN-WAIT-2 状态。

    • 接收到对方的 FIN 后,发送 ACK 并进入 TIME-WAIT 状态,之后一段时间后进入 CLOSED 状态。

    • 被动关闭方接收到 FIN 报文后,进入 CLOSE-WAIT 状态,调用 close() 后,发送 ACK 并进入 LAST-ACK 状态,接收到对方的 ACK 后进入 CLOSED 状态。

shutdown()

  • 描述:部分或全部关闭套接字的连接。
  • TCP 状态:取决于关闭的方向:
    • 关闭发送方向(SHUT_WR):发送 FIN 并进入 FIN-WAIT-1 状态。
    • 关闭接收方向(SHUT_RD):不影响 TCP 状态。
    • 同时关闭(SHUT_RDWR):同 close() 操作。

概念补充:

  • 半连接队列:当服务器收到来自客户端的 SYN包时,服务器会把这个连接放入“半连接队列”(也被称为SYN队列)。这个时候,服务器已经收到客户端的连接请求(SYN包),并且服务器也回复了SYN-ACK包,但还没有收到客户端的ACK包,所以这个连接还不是一个完全建立的连接,被称为“半连接”。

  • 全连接队列:当服务器收到客户端的 ACK包时,这个连接就被转移到“全连接队列”(也被称为accept 队列)。这个时候,TCP三次握手已经完成,连接已经完全建立。

image-20240726170803863

因此这个问题可能有这样几种情况:

主要问题:CLOSE_WAIT只跟应用不调 close() 有关系。

  • 程序问题:如果代码层面忘记了 close 相应的 socket 连接,那么自然不会发出 FIN 包,从而导致 CLOSE_WAIT 累积;或者代码不严谨,出现死循环之类的问题,导致即便后面写了 close 也永远执行不到。【可能就是CPU/IO有问题,然后没及时调用这个close函数
  • 响应太慢或者超时设置过小:如果连接双方不和谐,一方不耐烦直接 timeout,另一方却还在忙于耗时逻辑,就会导致 close 被延后。响应太慢是首要问题,不过换个角度看,也可能是 timeout 设置过小。
  • BACKLOG 太大:此处的 backlog 不是 syn backlog,而是 accept 的 backlog,如果 backlog 太大的话,设想突然遭遇大访问量的话,即便响应速度不慢,也可能出现来不及消费的情况,导致多余的请求还在队列里就被对方关闭了。【这种情况就是服务器的文件描述符不够用的情况导致的!】

如何快速关闭 close_wait 状态的连接?

使用 gdb 关闭,首先使用 gdb -p 连接到对应进程中,如何使用 call close(27u),27u 就表示 close_wait 状态连接的文件描述符。

Time_wait状态太多怎么回事

出现的主要原因是短连接太多导致的,每次断开连接都需要等待 2 倍的 MSL,因此会存在大量的 Time_wait 状态。

服务器可以设置 SO_REUSEADDR 套接字选项来通知内核,如果端口被占用,但 TCP 连接位于 TIME_WAIT 状态时可以重用端口。如果你的服务器程序停止后想立即重启,而新的套接字依旧希望使用同一端口,此时 SO_REUSEADDR 选项就可以避免 TIME-WAIT 状态。

进一步深入epoll

从底层代码的角度来看,涉及到 epoll 和设备驱动程序之间的交互,尤其是 ep_poll_callbackrdlist 双向链表的实现,主要发生在 Linux 内核中。为了说明这个过程,可以参考 Linux 内核源代码中的相关部分。

1.创建 epoll 对象

当用户进程调用 epoll_create 时,内核会创建一个 eventpoll 结构体,这个结构体包含了一个红黑树(rbtree)用于存储事件,以及一个双向链表(rdlist)用于存储已触发的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cCopy codeint epoll_create(int size) {
int error;
struct eventpoll *ep;

// 分配并初始化 eventpoll 结构体
ep = kmalloc(sizeof(*ep), GFP_KERNEL);
if (!ep)
return -ENOMEM;

INIT_LIST_HEAD(&ep->rdlist); // 初始化 rdlist 双向链表
init_waitqueue_head(&ep->wq); // 初始化等待队列
// 其他初始化工作...

// 创建并返回 epoll 文件描述符
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
if (error < 0)
kfree(ep);

return error;
}

2.添加事件到 epoll

当用户进程调用 epoll_ctl 添加一个新的事件时,内核会将该事件插入到 eventpoll 结构体的红黑树中,并与设备驱动程序建立回调关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
struct eventpoll *ep = file_inode(file)->i_private;
struct epitem *epi;

// 分配并初始化 epitem 结构体
epi = kmalloc(sizeof(*epi), GFP_KERNEL);
if (!epi)
return -ENOMEM;

epi->event = *event; // 复制事件信息
epi->fd = fd;

// 将事件添加到红黑树中
rb_link_node(&epi->rbn, parent, rb_link);
rb_insert_color(&epi->rbn, &ep->rbr);

// 注册设备驱动程序的回调函数
error = vfs_poll(fd, &epi->pt);
if (error)
kfree(epi);

return error;
}

3.设备驱动程序回调

当设备(例如网卡)上的事件发生时,设备驱动程序会调用 ep_poll_callback,将事件放入 eventpoll 结构体的 rdlist 双向链表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
static void ep_poll_callback(struct file *file, wait_queue_head_t *whead, poll_table *pt) {
struct eventpoll *ep = pt->private;
struct epitem *epi = container_of(pt, struct epitem, pt);

spin_lock(&ep->lock);

// 将事件放入 rdlist 双向链表中
list_add_tail(&epi->rdllink, &ep->rdlist);

wake_up_interruptible(&ep->wq); // 唤醒等待队列上的进程

spin_unlock(&ep->lock);
}

4.处理就绪事件

当用户进程调用 epoll_wait 时,内核会检查 rdlist 双向链表,将已触发的事件从内核空间复制到用户空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
struct eventpoll *ep = file_inode(file)->i_private;
struct epitem *epi;
int event_count = 0;

spin_lock(&ep->lock);

// 遍历 rdlist 双向链表,复制事件到用户空间
list_for_each_entry(epi, &ep->rdlist, rdllink) {
if (event_count >= maxevents)
break;
events[event_count++] = epi->event;
list_del(&epi->rdllink); // 从 rdlist 中移除已处理的事件
}

spin_unlock(&ep->lock);

return event_count;
}

总结

以上代码展示了 epoll 在 Linux 内核中的基本工作流程,包括创建 epoll 对象、添加事件、处理回调以及从内核空间复制事件到用户空间的过程。通过这些步骤,epoll 实现了高效的事件通知机制,适用于处理高并发 I/O 请求的场景。

水平触发(Level Triggered,LT)和边缘触发(Edge Triggered,ET)。让我们逐步解读代码,并用通俗的方式来说明这两种模式的区别及其实现。

代码角度看LT和ET模式

水平触发(LT)模式

1
2
3
4
5
else if (!(epi->event.events & EPOLLET)) {
// LT模式:如果不是ET模式,就会进入这个分支
list_add_tail(&epi->rdllink, &ep->rdllist); // 将事件重新插入ready-list
ep_pm_stay_awake(epi);
}

在 LT 模式下,如果有事件发生(revents),且该事件不是 ET 模式(即 LT 模式),代码会将这个事件重新插入 ready-list(准备好再次被处理的事件列表)。这意味着在下一次调用 epoll_wait 时,如果该事件仍然就绪(例如,仍然有数据可读),它会再次通知应用程序。

解释

  • LT 模式:只要有事件(例如,有数据可读),epoll_wait 每次都会通知应用程序。即使应用程序没有处理完该事件(例如,数据还没有完全读取完),在下一次调用 epoll_wait 时,它仍然会被通知。
  • 例子:想象有一个信箱(事件),每次信箱里有信(数据),邮递员(epoll_wait)就会通知你(应用程序)。如果你没有处理完所有的信,下一次邮递员来时还会再次通知你。

边缘触发(ET)模式

1
2
3
else if (epi->event.events & EPOLLET) {
// ET模式:如果是ET模式,什么都不做(不会将事件重新插入ready-list)
}

在 ET 模式下,如果有事件发生(revents),且该事件是 ET 模式,代码不会将事件重新插入 ready-list。这意味着在下一次调用 epoll_wait 时,如果该事件仍然就绪,它不会再次通知应用程序。只有在事件状态发生变化时(例如,从无数据到有数据),才会通知一次。

水平触发(Level Triggered,LT)和边缘触发(Edge Triggered,ET)。让我们逐步解读代码,并用通俗的方式来说明这两种模式的区别及其实现。

水平触发(LT)模式

1
2
3
4
5
else if (!(epi->event.events & EPOLLET)) {
// LT模式:如果不是ET模式,就会进入这个分支
list_add_tail(&epi->rdllink, &ep->rdllist); // 将事件重新插入ready-list
ep_pm_stay_awake(epi);
}

在 LT 模式下,如果有事件发生(revents),且该事件不是 ET 模式(即 LT 模式),代码会将这个事件重新插入 ready-list(准备好再次被处理的事件列表)。这意味着在下一次调用 epoll_wait 时,如果该事件仍然就绪(例如,仍然有数据可读),它会再次通知应用程序。

解释

  • LT 模式:只要有事件(例如,有数据可读),epoll_wait 每次都会通知应用程序。即使应用程序没有处理完该事件(例如,数据还没有完全读取完),在下一次调用 epoll_wait 时,它仍然会被通知。
  • 例子:想象有一个信箱(事件),每次信箱里有信(数据),邮递员(epoll_wait)就会通知你(应用程序)。如果你没有处理完所有的信,下一次邮递员来时还会再次通知你。

边缘触发(ET)模式

解析

1
2
3
else if (epi->event.events & EPOLLET) {
// ET模式:如果是ET模式,什么都不做(不会将事件重新插入ready-list)
}

在 ET 模式下,如果有事件发生(revents),且该事件是 ET 模式,代码不会将事件重新插入 ready-list。这意味着在下一次调用 epoll_wait 时,如果该事件仍然就绪,它不会再次通知应用程序。只有在事件状态发生变化时(例如,从无数据到有数据),才会通知一次。

解释

  • ET 模式:当有事件发生时,epoll_wait 只通知应用程序一次。应用程序必须在这次通知时处理完所有事件,否则后续的相同事件不会再次通知。
  • 例子:想象有一个信箱,每次信箱里有信,邮递员只会通知你一次。如果你没处理完所有的信,下一次邮递员来时不会再通知你,除非信箱里有新的信件(事件状态发生变化)。

代码总结

ep_send_events_proc 函数中,revents 表示有事件发生。对于 LT 模式,如果有事件发生,会将事件重新插入 ready-list,这样在下一次调用 epoll_wait 时,这个事件还会被通知。而对于 ET 模式,有事件发生时只通知一次,不会重新插入 ready-list

再次总结

  • LT 模式:事件未处理完会不断通知你(反复提醒)。
  • ET 模式:事件只通知一次,处理不完不会再通知你(提醒一次就不管了,除非有新情况)。

这种设计确保了 LT 模式下,应用程序总是可以知道有未处理完的事件,而 ET 模式下,应用程序需要一次性处理完所有事件以避免遗漏通知。

  • ET 模式:当有事件发生时,epoll_wait 只通知应用程序一次。应用程序必须在这次通知时处理完所有事件,否则后续的相同事件不会再次通知。
  • 例子:想象有一个信箱,每次信箱里有信,邮递员只会通知你一次。如果你没处理完所有的信,下一次邮递员来时不会再通知你,除非信箱里有新的信件(事件状态发生变化)。

ep_send_events_proc 函数中,revents 表示有事件发生。对于 LT 模式,如果有事件发生,会将事件重新插入 ready-list,这样在下一次调用 epoll_wait 时,这个事件还会被通知。而对于 ET 模式,有事件发生时只通知一次,不会重新插入 ready-list

边缘触发模式高并发情况下编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

#define MAX_EVENTS 10
#define PORT 8080

int main() {
int listen_fd, conn_fd, epoll_fd, nfds, n;
struct epoll_event ev, events[MAX_EVENTS];
struct sockaddr_in server_addr;

listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (listen_fd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}

memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);

if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("bind");
close(listen_fd);
exit(EXIT_FAILURE);
}

if (listen(listen_fd, SOMAXCONN) == -1) {
perror("listen");
close(listen_fd);
exit(EXIT_FAILURE);
}

epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
close(listen_fd);
exit(EXIT_FAILURE);
}

ev.events = EPOLLIN | EPOLLET;
ev.data.fd = listen_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl: listen_fd");
close(listen_fd);
close(epoll_fd);
exit(EXIT_FAILURE);
}

while (1) {
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
break;
}

for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_fd) {
while (1) {
conn_fd = accept(listen_fd, NULL, NULL);
if (conn_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
perror("accept");
break;
}

ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
perror("epoll_ctl: conn_fd");
close(conn_fd);
}
}
} else {
// Handle client data
}
}
}

close(listen_fd);
close(epoll_fd);
return 0;
}

惊群效应剖析

学习自:

目前有两种常见的基于epoll处理listen_fd的方式:

  1. 多进程共用一个epfd来监听同一listen_fd。
    • LT模式下:可能会出现伪惊群。
      • 原因:epoll_wait刚刚取到事件的时候的时候,不可能马上就调用accept去处理,事实上,逻辑在epoll_wait函数调用的ep_poll中还没返回的,这个时候,显然符合仍然有未处理的事件这个条件。这时候就会去继续唤醒其他的进程。【epoll_wait和accept之间有空挡】
    • ET模式下:一个“就绪队列”上的epi上的事件被上报了,它就会被删除出“就绪队列”。

本质就是源码这边加了个特判。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ep_scan_ready_list()
{
// 遍历“就绪链表”,处理每个已准备好的文件描述符
ready_list_for_each() {
// 从“就绪链表”中删除当前的 epi 项,以便后续可以决定是否重新加入
list_del_init(&epi->rdllink);
// 检查该文件描述符上是否有事件需要处理,并获取事件类型
revents = ep_item_poll(epi, &pt);

// 如果有事件需要处理
if (revents) {
// 将事件类型写入用户空间,通知应用程序有事件发生
__put_user(revents, &uevent->events);

// 如果当前是 LT 模式(即没有设置 EPOLLET 标志)
if (!(epi->event.events & EPOLLET)) {
// 将 epi 重新加入“就绪链表”,确保未处理的事件可以再次被通知
list_add_tail(&epi->rdllink, &ep->rdllist);
}
}
}

// 如果“就绪链表”不为空(仍有未处理的事件)
if (!list_empty(&ep->rdllist)) {
// 如果有进程在 epoll 句柄的睡眠队列上阻塞,则唤醒这些进程
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
}
}
  1. 多进程拥有各自的epfd来监听listen_fd。

UDP没有粘包半包的风险

img

网络编程那些事儿

参考文章