为了解决客户端(这里是浏览器)与服务器端建立连接后,长时间不交换数据,一直占用服务器端的文件描述符,导致连接资源的浪费。因此定期剔除不活跃事件。

SIGALRM

最为朴素的方式实现~

整体思路:

利用alarm函数周期性地触发SIGALRM信号,信号处理函数利用管道通知主循环,主循环接收到该信号后对升序链表上所有定时器进行处理,若该段时间内没有交换数据,则将该连接关闭,释放所占用的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#include <iostream>
#include <unistd.h>
#include <signal.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/timerfd.h>
#include <time.h>
#include <vector>
#include <algorithm>
#include <string>
#include <cassert>
#include <cstring>
#include <cerrno>

// 定义定时器类
class util_timer
{
public:
util_timer(int fd, time_t expire) : sockfd(fd), expire_time(expire), prev(nullptr), next(nullptr) {}

int sockfd; // 连接的文件描述符
time_t expire_time; // 过期时间
util_timer *prev; // 前一个定时器
util_timer *next; // 后一个定时器
};

// 定义定时器链表类
class sort_timer_lst
{
public:
sort_timer_lst() : head(nullptr), tail(nullptr) {}
~sort_timer_lst()
{
util_timer *tmp = head;
while (tmp)
{
head = tmp->next;
delete tmp;
tmp = head;
}
}

void add_timer(util_timer *timer)
{
if (!timer)
return;
if (!head)
{
head = tail = timer;
return;
}
if (timer->expire_time < head->expire_time)
{
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head);
}

void adjust_timer(util_timer *timer)
{
if (!timer)
return;
util_timer *tmp = timer->next;
if (!tmp || (timer->expire_time < tmp->expire_time))
return;
if (timer == head)
{
head = head->next;
head->prev = nullptr;
timer->next = nullptr;
add_timer(timer, head);
}
else
{
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}

void del_timer(util_timer *timer)
{
if (!timer)
return;
if (timer == head && timer == tail)
{
delete timer;
head = tail = nullptr;
return;
}
if (timer == head)
{
head = head->next;
head->prev = nullptr;
delete timer;
return;
}
if (timer == tail)
{
tail = tail->prev;
tail->next = nullptr;
delete timer;
return;
}
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}

void tick()
{
if (!head)
return;
time_t cur = time(nullptr);
util_timer *tmp = head;
while (tmp)
{
if (cur < tmp->expire_time)
break;
close(tmp->sockfd);
util_timer *next = tmp->next;
del_timer(tmp);
tmp = next;
}
}

private:
void add_timer(util_timer *timer, util_timer *lst_head)
{
util_timer *prev = lst_head;
util_timer *tmp = prev->next;
while (tmp)
{
if (timer->expire_time < tmp->expire_time)
{
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}
if (!tmp)
{
prev->next = timer;
timer->prev = prev;
timer->next = nullptr;
tail = timer;
}
}

util_timer *head;
util_timer *tail;
};

// 全局变量
int pipefd[2];
sort_timer_lst timer_lst;

// 信号处理函数
void sig_handler(int sig)
{
int save_errno = errno;
int msg = sig;
send(pipefd[1], (char *)&msg, 1, 0);
errno = save_errno;
}

// 设置信号处理函数
void addsig(int sig, void(handler)(int), bool restart = true)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart)
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, nullptr) != -1);
}

// 主循环
void main_loop()
{
int epollfd = epoll_create(5);
assert(epollfd != -1);

addfd(epollfd, pipefd[0], false, 0);

epoll_event events[1024];
while (true)
{
int number = epoll_wait(epollfd, events, 1024, -1);
if (number < 0 && errno != EINTR)
{
std::cerr << "epoll failure" << std::endl;
break;
}

for (int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
if (sockfd == pipefd[0] && (events[i].events & EPOLLIN))
{
int sig;
int ret = recv(pipefd[0], &sig, 1, 0);
if (ret <= 0)
continue;
if (sig == SIGALRM)
{
timer_lst.tick();
alarm(5);
}
}
}
}

close(epollfd);
}

int main()
{
// 创建管道
int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
assert(ret != -1);

// 设置管道读端为非阻塞
int flags = fcntl(pipefd[0], F_GETFL);
fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK);

// 设置信号处理函数
addsig(SIGALRM, sig_handler);


// 启动定时器
alarm(5);

// 启动主循环
main_loop();

return 0;
}

定时器超时时间 = 浏览器和服务器连接时刻 + 固定时间(TIMESLOT)

也可以用优先队列维护!

timefd

timerfd 是 Linux 内核提供的一种文件描述符,它允许程序员创建一个定时器,并通过文件描述符的方式来管理和监视定时器的状态变化。它主要用于两个目的:

  • 时间跟踪和管理:timerfd 允许程序员创建单次或周期性的定时器,并在特定时间点或一段时间之后触发事件。
  • I/O 多路复用:将 timerfd 与 select、poll、epoll 等 I/O 多路复用机制结合使用,以便在定时器到期时进行通知和处理。

使用 timerfd 的基本步骤

  • 创建 timerfd:使用 timerfd_create 系统调用创建一个新的 timerfd 文件描述符。
  • 配置定时器参数:使用 timerfd_settime 来设置定时器的初始值和到期时间。
  • 处理定时器事件:通过 read 等文件 I/O 操作来读取 timerfd,从而获知定时器是否到期。
  • 可选的定时器操作:使用 timerfd_gettime 来获取当前定时器的状态,或使用 timerfd_settime 更新定时器的到期时间或参数。

使用场景

  • 网络编程:在服务器端,timerfd 可以用于处理超时事件,比如实现超时断开空闲连接。

  • 性能监控:在性能监控工具中,timerfd 可以用来采集系统各个操作的执行时间。

  • 任务调度:在多线程或多进程环境下,timerfd 可以帮助调度任务的执行顺序或进行周期性任务的触发。

  • 实时系统:在实时系统中,timerfd 可以用于实现精确的时间控制和任务调度。

优势

  • 精度更高timerfd 基于内核的定时器机制,能够提供纳秒级的定时精度。而信号机制的精度通常受限于系统的时间片(通常为几毫秒到几十毫秒)。
  • 统一事件处理timerfd 生成的文件描述符可以与其他文件描述符(如套接字、管道等)一起被 epoll 监听,统一处理 IO 和定时事件。这种事件驱动模型更易于管理和扩展。
  • 信号干扰系统调用:信号可能会导致系统调用中断(如 read, write, accept 等),需要使用 SA_RESTART 标志或手动处理 EINTR 错误。timerfd 不会中断系统调用,提供了更稳定的行为。
  • 线程安全性:在多线程环境下,信号的处理往往比较复杂,因为信号是进程范围内的,而不是线程范围内的。timerfd 是一个文件描述符,可以在特定线程中处理,避免了信号跨线程传递带来的麻烦。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#include <iostream>
#include <unistd.h>
#include <signal.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/timerfd.h>
#include <time.h>
#include <vector>
#include <algorithm>
#include <string>
#include <cassert>
#include <cstring>
#include <cerrno>

// 定义定时器类
class util_timer
{
public:
util_timer(int fd, time_t expire) : sockfd(fd), expire_time(expire), prev(nullptr), next(nullptr) {}

int sockfd; // 连接的文件描述符
time_t expire_time; // 过期时间
util_timer *prev; // 前一个定时器
util_timer *next; // 后一个定时器
};

// 定义定时器链表类
class sort_timer_lst
{
public:
sort_timer_lst() : head(nullptr), tail(nullptr) {}
~sort_timer_lst()
{
util_timer *tmp = head;
while (tmp)
{
head = tmp->next;
delete tmp;
tmp = head;
}
}

void add_timer(util_timer *timer)
{
if (!timer)
return;
if (!head)
{
head = tail = timer;
return;
}
if (timer->expire_time < head->expire_time)
{
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head);
}

void adjust_timer(util_timer *timer)
{
if (!timer)
return;
util_timer *tmp = timer->next;
if (!tmp || (timer->expire_time < tmp->expire_time))
return;
if (timer == head)
{
head = head->next;
head->prev = nullptr;
timer->next = nullptr;
add_timer(timer, head);
}
else
{
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}

void del_timer(util_timer *timer)
{
if (!timer)
return;
if (timer == head && timer == tail)
{
delete timer;
head = tail = nullptr;
return;
}
if (timer == head)
{
head = head->next;
head->prev = nullptr;
delete timer;
return;
}
if (timer == tail)
{
tail = tail->prev;
tail->next = nullptr;
delete timer;
return;
}
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}

void tick()
{
if (!head)
return;
time_t cur = time(nullptr);
util_timer *tmp = head;
while (tmp)
{
if (cur < tmp->expire_time)
break;
close(tmp->sockfd);
util_timer *next = tmp->next;
del_timer(tmp);
tmp = next;
}
}

private:
void add_timer(util_timer *timer, util_timer *lst_head)
{
util_timer *prev = lst_head;
util_timer *tmp = prev->next;
while (tmp)
{
if (timer->expire_time < tmp->expire_time)
{
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}
if (!tmp)
{
prev->next = timer;
timer->prev = prev;
timer->next = nullptr;
tail = timer;
}
}

util_timer *head;
util_timer *tail;
};

// 全局变量
int pipefd[2];
sort_timer_lst timer_lst;

// 设置信号处理函数
void addsig(int sig, void(handler)(int), bool restart = true)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart)
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, nullptr) != -1);
}

int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

void addfd(int epollfd, int fd, bool one_shot, int TRIGMode)
{
epoll_event event;
event.data.fd = fd;

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;

if (one_shot)
event.events |= EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

// 主循环
void main_loop(int timerfd)
{
int epollfd = epoll_create(5);
assert(epollfd != -1);

addfd(epollfd, timerfd, false, 0);

epoll_event events[1024];
while (true)
{
int number = epoll_wait(epollfd, events, 1024, -1);
if (number < 0 && errno != EINTR)
{
std::cerr << "epoll failure" << std::endl;
break;
}

for (int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
if (sockfd == timerfd && (events[i].events & EPOLLIN))
{
uint64_t exp;
ssize_t s = read(timerfd, &exp, sizeof(uint64_t));
if (s != sizeof(uint64_t))
continue;

timer_lst.tick();
}
}
}

close(epollfd);
}

int main()
{
// 创建 timerfd
int timerfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
assert(timerfd != -1);

// 设置定时器时间
struct itimerspec new_value;
memset(&new_value, 0, sizeof(new_value));
new_value.it_value.tv_sec = 5; // 第一次超时时间
new_value.it_interval.tv_sec = 5; // 之后每次的超时时间
timerfd_settime(timerfd, 0, &new_value, nullptr);

// 启动主循环
main_loop(timerfd);

close(timerfd);
return 0;
}

Timer wheel

image-20240906230639097

在时间轮中:

  • 实线指针指向轮子上的一个槽(slot)。它以恒定的速度顺时针转动,每转动一步就指向下一个槽(slot)。每次转动称为一个滴答(tick),一个tick时间间隔为时间轮的si(slot interval)。该时间轮共有N个槽,因此它转动一周的时间是N*si
  • 每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差N*si的整数倍。时间轮正是利用这个关系将定时器散列到不同的链表中
  • 假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插入槽ts(timer slot)对应的链表中:ts=(cs + (ti / si)) % N

时间轮使用了哈希表处理冲突的思想,将定时器散列到不同的链表上。这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。对于时间轮而言,要提高精度,就要使si的值足够小; 要提高执行效率,则要求N值足够大,使定时器尽可能的分布在不同的槽

对于时间轮,添加、删除和执行一个定时器的时间复杂度都是O(1)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
#ifndef TIME_WHEEL_TIMER
#define TIME_WHEEL_TIMER

#include <time.h>
#include <netinet/in.h>
#include <stdio.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <cassert>
#include <sys/epoll.h>

// 定义缓冲区大小
#define BUFFER_SIZE 64

class tw_timer;

// 绑定 socket 和定时器的数据结构
struct client_data
{
sockaddr_in address; // 客户端地址
int sockfd; // 连接的文件描述符
char buf[BUFFER_SIZE]; // 数据缓冲区
tw_timer *timer; // 定时器
};

// 定时器类
class tw_timer
{
public:
// 构造函数
tw_timer(int rot, int ts)
: next(nullptr), prev(nullptr), rotation(rot), time_slot(ts) {}

public:
int rotation; // 记录定时器在时间轮转多少圈后生效
int time_slot; // 记录定时器属于哪个槽
void (*cb_func)(client_data *); // 定时器回调函数
client_data *user_data; // 用户数据
tw_timer *next; // 指向下一个定时器
tw_timer *prev; // 指向上一个定时器
};

// 时间轮类
class time_wheel
{
public:
// 构造函数,初始化时间轮的槽和当前槽位置
time_wheel()
: cur_slot(0), timer_fd(-1)
{
for (int i = 0; i < N; ++i)
{
slots[i] = nullptr;
}
setup_timerfd();
}

// 析构函数,释放所有定时器
~time_wheel()
{
for (int i = 0; i < N; ++i)
{
tw_timer *tmp = slots[i];
while (tmp)
{
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
close(timer_fd);
}

// 添加定时器
tw_timer *add_timer(int timeout)
{
if (timeout < 0)
{
return nullptr;
}
int ticks = 0;
if (timeout < TI)
{
ticks = 1;
}
else
{
ticks = timeout / TI;
}
int rotation = ticks / N;
int ts = (cur_slot + (ticks % N)) % N;
tw_timer *timer = new tw_timer(rotation, ts);
if (!slots[ts])
{
printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot);
slots[ts] = timer;
}
else
{
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}

// 删除定时器
void del_timer(tw_timer *timer)
{
if (!timer)
{
return;
}
int ts = timer->time_slot;
if (timer == slots[ts])
{
slots[ts] = slots[ts]->next;
if (slots[ts])
{
slots[ts]->prev = nullptr;
}
delete timer;
}
else
{
timer->prev->next = timer->next;
if (timer->next)
{
timer->next->prev = timer->prev;
}
delete timer;
}
}

// 时间轮轮转一次
void tick()
{
tw_timer *tmp = slots[cur_slot];
printf("current slot is %d\n", cur_slot);
while (tmp)
{
printf("tick the timer once\n");
if (tmp->rotation > 0)
{
tmp->rotation--;
tmp = tmp->next;
}
else
{
tmp->cb_func(tmp->user_data);
if (tmp == slots[cur_slot])
{
printf("delete header in cur_slot\n");
slots[cur_slot] = tmp->next;
delete tmp;
if (slots[cur_slot])
{
slots[cur_slot]->prev = nullptr;
}
tmp = slots[cur_slot];
}
else
{
tmp->prev->next = tmp->next;
if (tmp->next)
{
tmp->next->prev = tmp->prev;
}
tw_timer *tmp2 = tmp->next;
delete tmp;
tmp = tmp2;
}
}
}
cur_slot = ++cur_slot % N;
}

// 获取 timerfd 文件描述符
int get_timer_fd() const
{
return timer_fd;
}

private:
// 初始化 timerfd
void setup_timerfd()
{
timer_fd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
assert(timer_fd != -1);

struct itimerspec new_value;
memset(&new_value, 0, sizeof(new_value));
new_value.it_value.tv_sec = TI; // 第一次超时时间
new_value.it_interval.tv_sec = TI; // 之后每次的超时时间
timerfd_settime(timer_fd, 0, &new_value, nullptr);
}

private:
static const int N = 60; // 槽的数量
static const int TI = 1; // 槽的时间间隔
tw_timer *slots[N]; // 时间轮的槽
int cur_slot; // 当前槽的位置
int timer_fd; // timerfd 文件描述符
};

// 工具函数:设置文件描述符为非阻塞
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

// 工具函数:将文件描述符添加到 epoll 中
void addfd(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET; // 监听可读事件和边缘触发
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

// 主循环,使用 epoll 监听事件
void epoll_main_loop(int listenfd, time_wheel &tw)
{
int epollfd = epoll_create(5);
assert(epollfd != -1);
epoll_event events[1024];

// 将监听描述符和 timerfd 加入到 epoll 中
addfd(epollfd, listenfd);
addfd(epollfd, tw.get_timer_fd());

while (true)
{
int num_events = epoll_wait(epollfd, events, 1024, -1);
if (num_events < 0 && errno != EINTR)
{
printf("epoll failure\n");
break;
}

for (int i = 0; i < num_events; ++i)
{
int sockfd = events[i].data.fd;
if (sockfd == listenfd)
{
// 处理新连接
sockaddr_in client_address;
socklen_t client_addrlen = sizeof(client_address);
int connfd = accept(listenfd, (sockaddr *)&client_address, &client_addrlen);
addfd(epollfd, connfd);

// 创建客户端数据并初始化
client_data *data = new client_data;
data->address = client_address;
data->sockfd = connfd;

// 创建定时器并绑定到客户端数据
tw_timer *timer = tw.add_timer(20); // 设置定时器时间为20秒
timer->user_data = data;
timer->cb_func = [](client_data *data)
{
close(data->sockfd);
printf("close fd %d\n", data->sockfd);
delete data;
};
data->timer = timer;
}
else if (sockfd == tw.get_timer_fd())
{
// 处理定时器事件
uint64_t exp;
read(sockfd, &exp, sizeof(uint64_t));
tw.tick();
}
else if (events[i].events & EPOLLIN)
{
// 处理客户端数据
client_data *data = nullptr;
char buf[BUFFER_SIZE];
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret <= 0)
{
// 客户端关闭连接或出错,删除定时器并关闭 socket
tw.del_timer(data->timer);
close(sockfd);
delete data;
}
else
{
// 延长定时器的超时时间
tw.del_timer(data->timer);
tw_timer *timer = tw.add_timer(20);
timer->user_data = data;
timer->cb_func = [](client_data *data)
{
close(data->sockfd);
printf("close fd %d\n", data->sockfd);
delete data;
};
data->timer = timer;
}
}
}
}

close(epollfd);
}

int main()
{
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd != -1);

// 初始化监听 socket
sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(8888);
int ret = bind(listenfd, (sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);

// 创建时间轮
time_wheel tw;

// 进入 epoll 主循环
epoll_main_loop(listenfd, tw);

close(listenfd);
return 0;
}

业务逻辑:

  1. 时间轮类 (time_wheel):
    • 时间轮的实现没有太大变化,依然负责管理定时器的添加、删除和轮转操作。
  2. epoll 集成:
    • addfd() 函数:将文件描述符设置为非阻塞模式,并将其添加到 epoll 实例中。
    • epoll_main_loop() 函数:在主循环中使用 epoll_wait 监听事件,包括新连接、定时器超时以及客户端数据的到达。
  3. 定时器与客户端的关联:
    • 每当有新的客户端连接时,创建一个 client_data 结构并初始化,将其与一个新创建的定时器绑定。
    • 定时器到期时,关闭客户端连接并删除客户端数据。
  4. 处理定时器事件:
    • timerfd 超时时,tick() 函数会被调用,这会检查当前槽中的所有定时器并执行对应的回调函数。
  5. 处理客户端数据:
    • 当客户端数据到达时,读取数据并根据需要处理。如果数据读取成功,则延长客户端定时器的超时时间;如果读取失败(如客户端断开连接),则删除定时器并关闭连接。