前言

本博客主要介绍什么是惊群,惊群在线程和进程中的具体表现,惊群的系统消耗和惊群的处理方法。

介绍

惊群效应也有人叫做雷鸣群体效应,不过叫什么,简言之,惊群现象就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。

这里打个有趣的比方,就像在学校的湖里面用面包🍞喂天鹅,当你往一群天鹅中间扔一撮面包🍞,所有的天鹅各自都被惊动前来抢夺这可口的食物,但是最终注定只有一个天鹅抢到食物,没有抢到的天鹅只好回去继续等待。

惊群效应存在的问题

(1)系统对用户进程/线程频繁地做无效的调度,上下文切换系统性能大打折扣。

(2)为了确保只有一个线程得到资源,用户必须对资源操作进行加锁保护,进一步加大了系统开销。

惊群效应示例

accept

场景:主进程创建了socket、bind、listen之后,fork()出来多个进程,每个子进程都开始循环处理(accept)这个listen_fd。每个进程都阻塞在accept上,当一个新的连接到来时候,所有的进程都会被唤醒,但是其中只有一个进程会接受成功,其余皆失败,重新休眠。

main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <string.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cerrno>
#define PROCESS_NUM 10
int main()
{
int fd = socket(PF_INET, SOCK_STREAM, 0);
int connfd;
int pid;

char sendbuff[1024];
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(1234);
bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
listen(fd, 1024);
int i;
for (i = 0; i < PROCESS_NUM; ++i)
{
pid = fork();
if (pid == 0)
{
while (1)
{
connfd = accept(fd, (struct sockaddr *)NULL, NULL);
if (connfd == 0)
{

snprintf(sendbuff, sizeof(sendbuff), "接收到accept事件的进程PID = %d\n", getpid());

send(connfd, sendbuff, strlen(sendbuff) + 1, 0);
printf("process %d accept success\n", getpid());
close(connfd);
}
else
{
printf("process %d accept a connection failed: %s\n", getpid(), strerror(errno));
close(connfd);
}
}
}
}
// int status;
wait(0);
return 0;
}
1
2
penge@penge-virtual-machine  ~/Desktop/MordenCpp/test  g++ main.cpp -o main -pthread
penge@penge-virtual-machine  ~/Desktop/MordenCpp/test  strace -f ./main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
[pid 171338] set_robust_list(0x7fc56358fa20, 24 <unfinished ...>
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171338] <... set_robust_list resumed>) = 0
[pid 171338] accept(3, NULL, NULLstrace: Process 171339 attached
<unfinished ...>
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171339
[pid 171339] set_robust_list(0x7fc56358fa20, 24 <unfinished ...>
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171339] <... set_robust_list resumed>) = 0
[pid 171339] accept(3, NULL, NULLstrace: Process 171340 attached
<unfinished ...>
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171340
[pid 171340] set_robust_list(0x7fc56358fa20, 24 <unfinished ...>
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171340] <... set_robust_list resumed>) = 0
[pid 171340] accept(3, NULL, NULLstrace: Process 171341 attached
<unfinished ...>
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171341
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171341] set_robust_list(0x7fc56358fa20, 24) = 0
strace: Process 171342 attached
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171342
[pid 171341] accept(3, NULL, NULL <unfinished ...>
[pid 171342] set_robust_list(0x7fc56358fa20, 24 <unfinished ...>
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171342] <... set_robust_list resumed>) = 0
[pid 171342] accept(3, NULL, NULLstrace: Process 171343 attached
<unfinished ...>
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171343
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171343] set_robust_list(0x7fc56358fa20, 24) = 0
[pid 171343] accept(3, NULL, NULLstrace: Process 171344 attached
<unfinished ...>
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171344
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171344] set_robust_list(0x7fc56358fa20, 24) = 0
[pid 171344] accept(3, NULL, NULLstrace: Process 171345 attached
<unfinished ...>
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171345
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171345] set_robust_list(0x7fc56358fa20, 24) = 0
strace: Process 171346 attached
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171346
[pid 171345] accept(3, NULL, NULL <unfinished ...>
[pid 171337] clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD <unfinished ...>
[pid 171346] set_robust_list(0x7fc56358fa20, 24) = 0
[pid 171346] accept(3, NULL, NULLstrace: Process 171347 attached
<unfinished ...>
[pid 171337] <... clone resumed>, child_tidptr=0x7fc56358fa10) = 171347
[pid 171347] set_robust_list(0x7fc56358fa20, 24 <unfinished ...>
[pid 171337] wait4(-1, <unfinished ...>
[pid 171347] <... set_robust_list resumed>) = 0
[pid 171347] accept(3, NULL, NULL <unfinished ...>

之后,在启用一个终端

1
telnet 127.0.0.1 1234

看看发生了什么变化

1
2
3
4
5
6
7
8
[pid 172089] fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(0x88, 0x9), ...}) = 0
[pid 172089] brk(NULL) = 0x55f621f04000
[pid 172089] brk(0x55f621f25000) = 0x55f621f25000
[pid 172089] write(1, "process 172089 accept a connecti"..., 51process 172089 accept a connection failed: Success
) = 51
[pid 172089] close(4) = 0
[pid 172089] accept(3, NULL, NULL^C <unfinished ...>
[pid 172098] <... accept resumed>) = ? ERESTARTSYS (To be restarted if SA_RESTART is set)

很明显当telnet连接的时候只有一个进程accept成功,也就说明了这里并没有发生惊群。

其实在linux2.6版本以后,linux内核已经解决了accept()函数的“惊群”现象,大概的处理方式就是,当内核接收到一个客户连接后,只会唤醒等待队列上的第一个进程(线程),所以如果服务器采用accept阻塞调用方式,在最新的linux系统中已经没有“惊群效应”了

epoll

场景:如果多个进程/线程阻塞在监听同一个监听socket fd的epoll_wait上,当有一个新的连接到来时,所有的进程都会被唤醒。

主进程创建socket,bind,listen后,将该socket加入到epoll中,然后fork出多个子进程,每个进程都阻塞在epoll_wait上,如果有事件到来,则判断该事件是否是该socket上的事件如果是,说明有新的连接到来了,则进行接受操作。为了简化处理,忽略后续的读写以及对接受返回的新的套接字的处理,直接断开连接。
main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <errno.h>
#define PROCESS_NUM 10
#define MAXEVENTS 64
// socket创建和绑定
int sock_creat_bind(char *port)
{
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(atoi(port));
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

bind(sock_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
return sock_fd;
}
// 利用fcntl设置文件或者函数调用的状态标志
int make_nonblocking(int fd)
{
int val = fcntl(fd, F_GETFL);
val |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, val) < 0)
{
perror("fcntl set");
return -1;
}
return 0;
}

int main(int argc, char *argv[])
{
int sock_fd, epoll_fd;
struct epoll_event event;
struct epoll_event *events;

if (argc < 2)
{
printf("usage: [port] %s", argv[1]);
exit(1);
}
if ((sock_fd = sock_creat_bind(argv[1])) < 0)
{
perror("socket and bind");
exit(1);
}
if (make_nonblocking(sock_fd) < 0)
{
perror("make non blocking");
exit(1);
}
if (listen(sock_fd, SOMAXCONN) < 0)
{
perror("listen");
exit(1);
}
if ((epoll_fd = epoll_create(MAXEVENTS)) < 0)
{
perror("epoll_create");
exit(1);
}
event.data.fd = sock_fd;
event.events = EPOLLIN;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0)
{
perror("epoll_ctl");
exit(1);
}
/*buffer where events are returned*/
events = static_cast<epoll_event *>(calloc(MAXEVENTS, sizeof(event)));
int i;
for (i = 0; i < PROCESS_NUM; ++i)
{
int pid = fork();
if (pid == 0)
{
while (1)
{
int num, j;
num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
printf("process %d returnt from epoll_wait\n", getpid());
sleep(2);
for (i = 0; i < num; ++i)
{
if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN)))
{
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
}
else if (sock_fd == events[i].data.fd)
{
// 收到关于监听套接字的通知,意味着一盒或者多个传入连接
struct sockaddr in_addr;
socklen_t in_len = sizeof(in_addr);
if (accept(sock_fd, &in_addr, &in_len) < 0)
{
printf("process %d accept failed!\n", getpid());
}
else
{
printf("process %d accept successful!\n", getpid());
}
}
}
}
}
}
wait(0);
free(events);
close(sock_fd);
return 0;
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 ✘ penge@penge-virtual-machine  ~/Desktop/MordenCpp/test  ./main 1234  
process 173413 returnt from epoll_wait
process 173412 returnt from epoll_wait
process 173411 returnt from epoll_wait
process 173410 returnt from epoll_wait
process 173409 returnt from epoll_wait
process 173408 returnt from epoll_wait
process 173407 returnt from epoll_wait
process 173406 returnt from epoll_wait
process 173405 returnt from epoll_wait
process 173404 returnt from epoll_wait
process 173407 accept successful!
process 173409 accept failed!
process 173410 accept failed!
process 173411 accept failed!
process 173412 accept failed!
process 173406 accept failed!
process 173404 accept failed!
process 173413 accept failed!
process 173405 accept failed!
process 173408 accept failed!

显然,发生了惊群效应。

怎么判断发生了惊群呢?

我们根据strace的返回信息可以确定:

(1) 系统只会让一个进程真正的接受这个连接,而剩余的进程会获得一个EAGAIN信号。

(2)通过返回结果和进程执行的系统调用判断。

为什么内核处理了accept的惊群,却不处理epoll_wait的惊群呢?

accept确实应该只能被一个进程调用成功,内核很清楚这一点。但epoll不一样,他监听的文件描述符,除了可能后续被accept调用外,还有可能是其他网络IO事件的,而其他IO事件是否只能由一个进程处理,是不一定的,内核不能保证这一点,这是一个由用户决定的事情,例如可能一个文件会由多个进程来读写。所以,对epoll的惊群,内核则不予处理。

解决方法

解决方式一共有三种

  1. accept_mutex(应用层的解决方案)
  2. EPOLLEXCLUSIVE(内核层的解决方案)
  3. SO_REUSEPORT(内核层的解决方案)

accept_mutex

看到 mutex 可能你就知道了,锁嘛!这也是对于高并发处理的 ”基操“ 遇事不决加锁,没错,加锁肯定能解决问题。

感兴趣的可以看看这部分的代码实现

EPOLLEXCLUSIVE

EPOLLEXCLUSIVE 是 2016 年 4.5+ 内核新添加的一个 epoll 的标识。它降低了多个进程/线程通过 epoll_ctl 添加共享 fd 引发的惊群概率,使得一个事件发生时,只唤醒一个正在 epoll_wait 阻塞等待唤醒的进程(而不是全部唤醒)。

关键是:每次内核只唤醒一个睡眠的进程处理资源。

SO_REUSEPORT

Linux内核的3.9版本带来了SO_REUSEPORT特性,该特性支持多个进程或者线程绑定到同一端口,提高服务器程序的性能,允许多个套接字bind()以及listen()同一个TCP或UDP端口,并且在内核层面实现负载均衡。

  • 在未开启SO_REUSEPORT的时候,由一个监听socket将新接收的连接请求交给各个工作者处理,看图示:
    • image-20240430110045200
  • 在使用SO_REUSEPORT后,多个进程可以同时监听同一个IP:端口,然后由内核决定将新链接发送给哪个进程,显然会降低每个工人接收新链接时锁竞争。
    • image-20240430110103466

SO_REUSEPORT解决了什么问题

(1)允许多个套接字bind()/listen()同一个tcp/udp端口。每一个线程拥有自己的服务器套接字,在服务器套接字上没有锁的竞争。

(2)内核层面实现负载均衡

(3)安全层面,监听同一个端口的套接字只能位于同一个用户下面。

(4)处理新建连接时,查找listener的时候,能够支持在监听相同IP和端口的多个sock之间均衡选择。

当一个连接到来的时候,系统到底是怎么决定那个套接字来处理它?

对于不同内核,存在两种模式,这两种模式并不共存,一种叫做热备份模式,另一种叫做负载均衡模式,3.9内核以后,全部改为负载均衡模式。

  • 热备份模式:一般而言,会将所有的reuseport同一个IP地址/端口的套接字挂在一个链表上,取第一个即可,工作的只有一个,其他的作为备份存在,如果该套接字挂了,它会被从链表删除,然后第二个便会成为第一个。
  • 负载均衡模式:和热备份模式一样,所有reuseport同一个IP地址/端口的套接字会挂在一个链表上,你也可以认为是一个数组,这样会更加方便,当有连接到来时,用数据包的源IP/源端口作为一个HASH函数的输入,将结果对reuseport套接字数量取模,得到一个索引,该索引指示的数组位置对应的套接字便是工作套接字。这样就可以达到负载均衡的目的,从而降低某个服务的压力。

参考资料