一个人可以走的很快,一群人可以走的很远!

Boost.Asio 的核心组件包括:

  • io_context:事件调度器,管理所有的异步操作。
  • I/O 对象(如 tcp::socket、serial_port 等):用于执行异步 I/O 操作。
  • strand:用于保证回调函数的顺序执行。
  • 异步操作函数(如 async_read、async_write):发起异步 I/O 操作。【目前我认为是通过·回调机制以及用户线程池保证的】
  • 定时器(如 steady_timer):用于管理定时事件。

io_context

io_service

[这个部分感兴趣看看就好,也可以跳过io_service这节]

io_service的作用

io_servie 实现了一个任务队列,这里的任务就是void(void)的函数。Io_servie最常用的两个接口是post和run,post向任务队列中投递任务,run是执行队列中的任务,直到全部执行完毕,并且run可以被N个线程调用。Io_service是完全线程安全的队列。

Io_servie的接口

提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop

Io_servie 实现代码的基本类结构:

  • Io_servie是接口类,为实现跨平台,采用了策略模式,所有接口均有impl_type实现。根据平台不同impl_type分为

  • win_iocp_io_service Win版本的实现,这里主要分析Linux版本。

    • task_io_service 非win平台下的实现,其代码结构为:
    • detail/task_io_service_fwd.hpp 简单声明task_io_service名称
    • detail/task_io_service.hpp 声明task_io_service的方法和属性
    • detail/impl/task_io_service.ipp 具体实现文件
    • 队列中的任务类型为opertioan,原型其实是typedef task_io_service_operation operation,其实现文件在detail/task_io_service_operation.hpp中,当队列中的任务被执行时,就是task_io_service_operation:: complete被调用的时候。

Io_servie::Post方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <functional>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>

class operation {
public:
virtual void execute() = 0;
virtual ~operation() {}
};

template <typename Handler>
class completion_handler : public operation {
public:
explicit completion_handler(Handler h) : handler_(std::move(h)) {}
void execute() override { handler_(); }
private:
Handler handler_;
};

class io_service {
public:
io_service() : outstanding_work_(0), first_idle_thread_(nullptr) {}

// 提交一个任务
template <typename Handler>
void post(Handler handler) {
auto op = std::make_shared<completion_handler<Handler>>(std::move(handler));
post_immediate_completion(op);
}

private:
void post_immediate_completion(std::shared_ptr<operation> op) {
// 增加未完成任务计数
++outstanding_work_;
post_deferred_completion(op);
}

void post_deferred_completion(std::shared_ptr<operation> op) {
std::unique_lock<std::mutex> lock(mutex_);
// 将任务加入队列
task_queue_.push(op);
// 尝试唤醒一个空闲线程
wake_one_thread_and_unlock(lock);
}

void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) {
// 如果有空闲线程,将其唤醒
if (first_idle_thread_) {
std::thread* idle_thread = first_idle_thread_;
first_idle_thread_ = nullptr;
lock.unlock();
idle_thread->detach();
} else {
// 处理阻塞在 epoll_wait 上的线程
// 在实际实现中,这将涉及更多复杂的逻辑
lock.unlock();
// 假设 epoll_wait 被中断并从任务队列中取出任务执行
std::cout << "No idle thread, handling epoll_wait interruption" << std::endl;
handle_epoll_wait_interruption();
}
}

void handle_epoll_wait_interruption() {
// 模拟从任务队列中取出任务并执行
std::shared_ptr<operation> op;
{
std::lock_guard<std::mutex> lock(mutex_);
if (!task_queue_.empty()) {
op = task_queue_.front();
task_queue_.pop();
}
}
if (op) {
op->execute();
}
// 继续阻塞在 epoll_wait 上
epoll_wait_simulation();
}

void epoll_wait_simulation() {
// 模拟 epoll_wait 的阻塞和等待
std::cout << "Simulating epoll_wait" << std::endl;
// 在实际实现中,这将是调用 epoll_wait 系统调用
}

private:
std::queue<std::shared_ptr<operation>> task_queue_;
std::mutex mutex_;
std::condition_variable cond_var_;
std::atomic<int> outstanding_work_;
std::thread* first_idle_thread_;
};

解释:

  1. post 方法:
    • post 方法接收一个 handler(实际是一个仿函数),并将其包装为 completion_handler 对象。
    • completion_handler 继承自 operation,并实现了 execute 方法,将 handler 作为任务执行。
  2. post_immediate_completion 方法:
    • post_immediate_completion 增加未完成任务计数 outstanding_work_,然后调用 post_deferred_completion
  3. post_deferred_completion 方法:
    • post_deferred_completion 方法首先加锁,然后将任务入列到 task_queue_ 中。
    • 然后它调用 wake_one_thread_and_unlock 尝试唤醒一个空闲线程。
  4. wake_one_thread_and_unlock 方法:
    • 该方法首先检查是否有空闲线程(通过 first_idle_thread_ 指针维护)。
    • 如果有空闲线程,则将其唤醒(在此示例中,使用 detach 模拟唤醒)。
    • 如果没有空闲线程,但有线程可能正在阻塞于 epoll_wait,则模拟中断 epoll_wait 并在中断后执行任务队列中的任务。
  5. handle_epoll_wait_interruption 方法:
    • 该方法模拟从 epoll_wait 中断后处理任务队列中的任务。
    • 执行任务后,它再次模拟调用 epoll_wait。在实际实现中,这将是一个真正的 epoll_wait 系统调用。

Io_servie::run方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#include <functional>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>

// 定义一个操作基类
class operation {
public:
virtual void execute() = 0;
virtual ~operation() {}
};

// 特定任务的操作类
template <typename Handler>
class completion_handler : public operation {
public:
explicit completion_handler(Handler h) : handler_(std::move(h)) {}
void execute() override { handler_(); }
private:
Handler handler_;
};

// 用于管理线程的结构体
struct idle_thread_info {
idle_thread_info* next;
std::thread::id thread_id;
std::condition_variable cond_var;
};

// 简化版的 io_service 类
class io_service {
public:
io_service() : outstanding_work_(0), first_idle_thread_(nullptr), stopped_(false) {
// 初始化 epoll 文件描述符
epoll_fd_ = epoll_create1(0);
init_interrupt_fd();
}

~io_service() {
close(epoll_fd_);
}

// 提交一个任务
template <typename Handler>
void post(Handler handler) {
auto op = std::make_shared<completion_handler<Handler>>(std::move(handler));
post_immediate_completion(op);
}

// 执行所有任务
void run() {
idle_thread_info this_thread_info;
this_thread_info.thread_id = std::this_thread::get_id();
this_thread_info.next = nullptr;

std::unique_lock<std::mutex> lock(mutex_);

while (do_one(lock, this_thread_info)) {
// Loop until do_one returns false
}
}

private:
bool do_one(std::unique_lock<std::mutex>& lock, idle_thread_info& this_thread_info) {
if (task_queue_.empty()) {
// 将当前线程标记为空闲
this_thread_info.next = first_idle_thread_;
first_idle_thread_ = &this_thread_info;

// 阻塞等待新任务到来或被唤醒
this_thread_info.cond_var.wait(lock, [this] {
return !task_queue_.empty() || stopped_;
});

if (stopped_) {
return false;
}
}

// 从队列中取出一个任务
auto op = task_queue_.front();
task_queue_.pop();

if (task_queue_.empty()) {
lock.unlock();
} else {
wake_one_thread_and_unlock(lock);
}

// 执行任务
op->execute();

return true;
}

void post_immediate_completion(std::shared_ptr<operation> op) {
++outstanding_work_;
post_deferred_completion(op);
}

void post_deferred_completion(std::shared_ptr<operation> op) {
std::unique_lock<std::mutex> lock(mutex_);
// 将任务加入队列
task_queue_.push(op);
// 尝试唤醒一个空闲线程
wake_one_thread_and_unlock(lock);
}

void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) {
if (first_idle_thread_) {
idle_thread_info* idle_thread = first_idle_thread_;
first_idle_thread_ = idle_thread->next;
lock.unlock();
idle_thread->cond_var.notify_one();
} else {
lock.unlock();
interrupt_epoll_wait();
}
}

void interrupt_epoll_wait() {
uint64_t one = 1;
write(interrupt_fd_, &one, sizeof(one));
}

void init_interrupt_fd() {
interrupt_fd_ = eventfd(0, EFD_NONBLOCK);
epoll_event ev = {};
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = interrupt_fd_;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev);
}

void epoll_wait_simulation() {
epoll_event events[10];
int nfds = epoll_wait(epoll_fd_, events, 10, -1);
for (int n = 0; n < nfds; ++n) {
if (events[n].data.fd == interrupt_fd_) {
uint64_t count;
read(interrupt_fd_, &count, sizeof(count));
std::cout << "epoll_wait interrupted!" << std::endl;
}
}
}

private:
std::queue<std::shared_ptr<operation>> task_queue_;
std::mutex mutex_;
std::condition_variable cond_var_;
std::atomic<int> outstanding_work_;
idle_thread_info* first_idle_thread_;
bool stopped_;
int epoll_fd_;
int interrupt_fd_;
};

解释

  1. run 方法:
    • run 方法是 io_service 的核心循环,负责执行所有任务,直到任务执行完毕或服务停止。
    • 它首先创建一个 idle_thread_info 对象,用于管理当前线程的状态。
    • 进入循环,不断调用 do_one 方法来处理任务。
  2. do_one 方法:
    • do_one 方法每次执行一个任务。
    • 如果任务队列为空,当前线程被标记为空闲,并进入等待状态,直到有新任务到来或服务停止。
    • 如果有任务,do_one 从任务队列中取出一个任务并执行。如果队列中还有其他任务,会尝试唤醒其他空闲线程来执行剩余的任务。
  3. post_immediate_completionpost_deferred_completion 方法:
    • 这两个方法负责将任务加入任务队列,并尝试唤醒空闲线程执行任务。
    • post_immediate_completion 增加未完成任务计数,然后调用 post_deferred_completion
    • post_deferred_completion 将任务加入任务队列后,调用 wake_one_thread_and_unlock 尝试唤醒空闲线程。
  4. wake_one_thread_and_unlock 方法:
    • 该方法尝试唤醒一个空闲线程。如果没有空闲线程,那么会调用 interrupt_epoll_wait 来中断阻塞在 epoll_wait 上的线程。
    • interrupt_epoll_wait 方法通过写入 eventfd 文件描述符来触发 epoll_wait 的中断。
  5. epoll_wait_simulationinit_interrupt_fd 方法:
    • epoll_wait_simulation 模拟 epoll_wait 的阻塞和处理中断的过程。
    • init_interrupt_fd 初始化一个 eventfd,并将其添加到 epoll 实例中,用于中断 epoll_wait

Io_servie::stop的实现

  1. 加锁:确保在多线程环境中对共享资源的访问是安全的。
  2. 调用 stop_all_threads:唤醒所有空闲线程,确保它们能够检测到 io_service 已经停止。
  3. 设置 stopped_ 标志:标记 io_service 已经停止,阻止新任务的执行,并通知所有正在等待的线程。
  4. 调用 task_interrupt 方法:中断正在运行的 epoll_wait 或其他阻塞操作,以确保所有线程都能及时退出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include <functional>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>

// 定义一个操作基类
class operation {
public:
virtual void execute() = 0;
virtual ~operation() {}
};

// 特定任务的操作类
template <typename Handler>
class completion_handler : public operation {
public:
explicit completion_handler(Handler h) : handler_(std::move(h)) {}
void execute() override { handler_(); }
private:
Handler handler_;
};

// 用于管理线程的结构体
struct idle_thread_info {
idle_thread_info* next;
std::thread::id thread_id;
std::condition_variable cond_var;
};

// 简化版的 io_service 类
class io_service {
public:
io_service() : outstanding_work_(0), first_idle_thread_(nullptr), stopped_(false), task_interrupted_(false) {
// 初始化 epoll 文件描述符
epoll_fd_ = epoll_create1(0);
init_interrupt_fd();
}

~io_service() {
close(epoll_fd_);
}

// 提交一个任务
template <typename Handler>
void post(Handler handler) {
auto op = std::make_shared<completion_handler<Handler>>(std::move(handler));
post_immediate_completion(op);
}

// 停止 io_service
void stop() {
std::unique_lock<std::mutex> lock(mutex_);
stop_all_threads();
stopped_ = true;

// 唤醒所有空闲线程
idle_thread_info* idle_thread = first_idle_thread_;
while (idle_thread) {
idle_thread->cond_var.notify_one();
idle_thread = idle_thread->next;
}

// 中断 task_ 的运行
task_interrupted_ = true;
interrupt_task();
}

private:
void stop_all_threads() {
// 在实际实现中,这里可能涉及更多的线程管理逻辑
std::cout << "Stopping all threads..." << std::endl;
}

void interrupt_task() {
// 中断 epoll_wait 或其他阻塞操作
interrupt_epoll_wait();
}

void interrupt_epoll_wait() {
uint64_t one = 1;
write(interrupt_fd_, &one, sizeof(one));
}

void init_interrupt_fd() {
interrupt_fd_ = eventfd(0, EFD_NONBLOCK);
epoll_event ev = {};
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = interrupt_fd_;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev);
}

// 任务执行和线程唤醒的相关方法(省略,见前面部分代码)
void post_immediate_completion(std::shared_ptr<operation> op) {
// 增加未完成任务计数
++outstanding_work_;
post_deferred_completion(op);
}

void post_deferred_completion(std::shared_ptr<operation> op) {
std::unique_lock<std::mutex> lock(mutex_);
// 将任务加入队列
task_queue_.push(op);
// 尝试唤醒一个空闲线程
wake_one_thread_and_unlock(lock);
}

void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) {
if (first_idle_thread_) {
idle_thread_info* idle_thread = first_idle_thread_;
first_idle_thread_ = idle_thread->next;
lock.unlock();
idle_thread->cond_var.notify_one();
} else {
lock.unlock();
interrupt_epoll_wait();
}
}

private:
std::queue<std::shared_ptr<operation>> task_queue_;
std::mutex mutex_;
std::condition_variable cond_var_;
std::atomic<int> outstanding_work_;
idle_thread_info* first_idle_thread_;
bool stopped_;
bool task_interrupted_;
int epoll_fd_;
int interrupt_fd_;
};

Boost::asio io_service 实现分析对这篇文章的扩充

易理解

io_context 实际上是一个事件循环,它通过与操作系统的 I/O 多路复用机制(如 epollselect 等)及任务队列配合使用,以实现高效的异步编程。

io_context 使用了以下几个关键的数据结构:

  • 任务队列(Task Queue):存储需要执行的任务。
  • 调度器(Scheduler):负责管理任务队列,并与操作系统的 I/O 多路复用机制(如 epollselect)进行交互。
  • 执行器(Executor):用于调度任务的执行。

简易单线程版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#include <iostream>
#include <functional>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
// 简化版的 io_context 类
class io_context
{
public:
io_context() : stopped_(false), epoll_fd_(::epoll_create1(0))
{
if (epoll_fd_ == -1)
{
throw std::runtime_error("Failed to create epoll file descriptor");
}
}

~io_context()
{
::close(epoll_fd_);
}

// post 方法将任务添加到任务队列中
template <typename CompletionHandler>
void post(CompletionHandler handler)
{
{
std::lock_guard<std::mutex> lock(queue_mutex_);
task_queue_.emplace_back(std::move(handler));
}
wake_up();
}

// 运行事件循环
void run()
{
while (!stopped_)
{
std::function<void()> task;
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if (!task_queue_.empty())
{
task = std::move(task_queue_.front());
task_queue_.pop_front();
}
}
if (task)
{
task();
}
else
{
wait_for_events();
}
}
}

// 停止事件循环
void stop()
{
stopped_ = true;
wake_up();
}

// 重置停止状态
void restart()
{
stopped_ = false;
}

// 注册文件描述符到 epoll
void add_to_epoll(int fd, uint32_t events)
{
epoll_event event;
event.events = events;
event.data.fd = fd;
if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1)
{
throw std::runtime_error("Failed to add file descriptor to epoll");
}
}

private:
// 等待 I/O 事件的发生
void wait_for_events()
{
epoll_event events[MAX_EVENTS];
int num_events = ::epoll_wait(epoll_fd_, events, MAX_EVENTS, -1);
if (num_events == -1)
{
if (errno != EINTR)
{
throw std::runtime_error("epoll_wait failed");
}
return;
}

for (int i = 0; i < num_events; ++i)
{
handle_event(events[i]);
}
}

// 处理 I/O 事件
void handle_event(epoll_event &event)
{
if (event.events & EPOLLIN)
{
// 处理可读事件
std::cout << "Handle read event on fd: " << event.data.fd << std::endl;
char buffer[1024];
ssize_t n = ::read(event.data.fd, buffer, sizeof(buffer));
if (n > 0)
{
std::cout << "Read " << n << " bytes: " << std::string(buffer, n) << std::endl;
}
else if (n == 0)
{
std::cout << "Connection closed on fd: " << event.data.fd << std::endl;
::close(event.data.fd);
}
else
{
std::cerr << "Read error on fd: " << event.data.fd << std::endl;
}
}
}

// 唤醒事件循环
void wake_up()
{
// 在真正的实现中,可能会使用 eventfd 或管道来唤醒 epoll_wait
// 这里为了简化,假设其他线程的 post 操作已唤醒 epoll_wait
}

static const int MAX_EVENTS = 10;

bool stopped_;
int epoll_fd_;
std::deque<std::function<void()>> task_queue_;
std::mutex queue_mutex_;
};

// 示例 I/O 任务
void async_read(io_context &io_ctx, int fd)
{
io_ctx.add_to_epoll(fd, EPOLLIN);
}

int main()
{
io_context io_ctx;

// 模拟异步 I/O 操作
int pipe_fds[2];
::pipe(pipe_fds);
fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK); // 设置为非阻塞模式

io_ctx.post([&io_ctx, pipe_fds]()
{ async_read(io_ctx, pipe_fds[0]); });

// 在另一个线程写入数据,模拟异步事件
std::thread writer([pipe_fds]()
{
::sleep(1);
const char* msg = "Hello from the other side!";
::write(pipe_fds[1], msg, strlen(msg));
::close(pipe_fds[1]); });

// 运行事件循环
io_ctx.run();

writer.join();

return 0;
}

当然,我们也可以用多线程版本的io_context来实现啦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#include <iostream>
#include <functional>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <atomic>
#include <cstring>
// 多线程版的 io_context 类
class io_context
{
public:
io_context() : stopped_(false), work_count_(0), epoll_fd_(::epoll_create1(0))
{
if (epoll_fd_ == -1)
{
throw std::runtime_error("Failed to create epoll file descriptor");
}
}

~io_context()
{
::close(epoll_fd_);
}

// post 方法将任务添加到任务队列中
template <typename CompletionHandler>
void post(CompletionHandler handler)
{
{
std::lock_guard<std::mutex> lock(queue_mutex_);
task_queue_.emplace_back(std::move(handler));
++work_count_;
}
queue_condition_.notify_one(); // 唤醒一个等待的线程
}

// 运行事件循环
void run()
{
while (true)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
queue_condition_.wait(lock, [this]()
{ return stopped_ || !task_queue_.empty(); });

if (stopped_ && task_queue_.empty())
{
return;
}

task = std::move(task_queue_.front());
task_queue_.pop_front();
}

task();
--work_count_;

wait_for_events(); // 处理 I/O 事件
}
}

// 停止事件循环
void stop()
{
{
std::lock_guard<std::mutex> lock(queue_mutex_);
stopped_ = true;
}
queue_condition_.notify_all(); // 唤醒所有等待的线程以终止
}

// 重置停止状态
void restart()
{
stopped_ = false;
}

// 注册文件描述符到 epoll
void add_to_epoll(int fd, uint32_t events)
{
epoll_event event;
event.events = events;
event.data.fd = fd;
if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1)
{
throw std::runtime_error("Failed to add file descriptor to epoll");
}
}

private:
// 等待 I/O 事件的发生
void wait_for_events()
{
epoll_event events[MAX_EVENTS];
int num_events = ::epoll_wait(epoll_fd_, events, MAX_EVENTS, 0); // Non-blocking wait
if (num_events == -1)
{
if (errno != EINTR)
{
throw std::runtime_error("epoll_wait failed");
}
return;
}

for (int i = 0; i < num_events; ++i)
{
handle_event(events[i]);
}
}

// 处理 I/O 事件
void handle_event(epoll_event &event)
{
if (event.events & EPOLLIN)
{
// 处理可读事件
std::cout << "Handle read event on fd: " << event.data.fd << std::endl;
char buffer[1024];
ssize_t n = ::read(event.data.fd, buffer, sizeof(buffer));
if (n > 0)
{
std::cout << "Read " << n << " bytes: " << std::string(buffer, n) << std::endl;
}
else if (n == 0)
{
std::cout << "Connection closed on fd: " << event.data.fd << std::endl;
::close(event.data.fd);
}
else
{
std::cerr << "Read error on fd: " << event.data.fd << std::endl;
}
}
}

static const int MAX_EVENTS = 10;

std::atomic<bool> stopped_;
std::atomic<int> work_count_;
int epoll_fd_;
std::deque<std::function<void()>> task_queue_;
std::mutex queue_mutex_;
std::condition_variable queue_condition_;
};

int main()
{
io_context io_ctx;

// 创建线程池
std::vector<std::thread> thread_pool;
const int num_threads = std::thread::hardware_concurrency(); // 根据硬件并发数设置线程数

for (int i = 0; i < num_threads; ++i)
{
thread_pool.emplace_back([&io_ctx]()
{ io_ctx.run(); });
}

// 模拟异步 I/O 操作
int pipe_fds[2];
::pipe(pipe_fds);
fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK); // 设置为非阻塞模式

io_ctx.post([&io_ctx, pipe_fds]()
{ io_ctx.add_to_epoll(pipe_fds[0], EPOLLIN); });

// 在另一个线程写入数据,模拟异步事件
std::thread writer([pipe_fds]()
{
::sleep(1);
const char* msg = "Hello from the other side!";
::write(pipe_fds[1], msg, strlen(msg));
::close(pipe_fds[1]); });

// 等待所有线程完成
for (auto &thread : thread_pool)
{
thread.join();
}

writer.join();

return 0;
}

I/O 对象

这是一个简化版的 I/O 对象类(basic_socket),它模拟了 boost::asio::basic_socket 的一些基本功能。这个类将展示如何通过 io_context 实现异步 I/O 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <iostream>
#include <functional>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <boost/asio.hpp>

// 简化版的 basic_socket 类
class basic_socket {
public:
// 构造函数
basic_socket(boost::asio::io_context& io_context)
: io_context_(io_context), socket_fd_(-1) {}

// 析构函数,关闭 socket
~basic_socket() {
if (socket_fd_ != -1) {
::close(socket_fd_);
}
}

// 打开 socket
void open(int domain, int type, int protocol) {
socket_fd_ = ::socket(domain, type, protocol);
if (socket_fd_ == -1) {
throw std::runtime_error("Failed to create socket");
}
// 设置非阻塞模式
fcntl(socket_fd_, F_SETFL, O_NONBLOCK);
}

// 异步连接
void async_connect(const sockaddr_in& remote, std::function<void(const boost::system::error_code&)> handler) {
int result = ::connect(socket_fd_, (sockaddr*)&remote, sizeof(remote));
if (result == 0) {
// 立即成功
handler(boost::system::error_code());
} else if (errno == EINPROGRESS) {
// 连接正在进行中,注册到 io_context 的 epoll 中
io_context_.add_to_epoll(socket_fd_, EPOLLOUT);
/*
在异步连接操作完成后,使用 getsockopt 函数检查套接字的错误状态。
如果连接成功(即没有错误),则通过调用 handler 回调函数通知调用者操作成功。
如果连接失败,则通过 handler 将错误码传递给调用者,以便进行错误处理。
*/
// 将回调函数包装为一个 lambda 并 post 到 io_context
io_context_.post([this, handler]() {
int so_error;
socklen_t len = sizeof(so_error);
getsockopt(socket_fd_, SOL_SOCKET, SO_ERROR, &so_error, &len);
if (so_error == 0) {
handler(boost::system::error_code());
} else {
handler(boost::system::error_code(so_error, boost::asio::error::get_system_category()));
}
});
} else {
// 立即失败
handler(boost::system::error_code(errno, boost::asio::error::get_system_category()));
}
}

// 异步读
void async_read(char* buffer, std::size_t length, std::function<void(const boost::system::error_code&, std::size_t)> handler) {
io_context_.add_to_epoll(socket_fd_, EPOLLIN);

io_context_.post([this, buffer, length, handler]() {
ssize_t bytes_read = ::read(socket_fd_, buffer, length);
if (bytes_read >= 0) {
handler(boost::system::error_code(), bytes_read);
} else {
handler(boost::system::error_code(errno, boost::asio::error::get_system_category()), 0);
}
});
}

// 异步写
void async_write(const char* buffer, std::size_t length, std::function<void(const boost::system::error_code&, std::size_t)> handler) {
io_context_.add_to_epoll(socket_fd_, EPOLLOUT);

io_context_.post([this, buffer, length, handler]() {
ssize_t bytes_written = ::write(socket_fd_, buffer, length);
if (bytes_written >= 0) {
handler(boost::system::error_code(), bytes_written);
} else {
handler(boost::system::error_code(errno, boost::asio::error::get_system_category()), 0);
}
});
}

// 关闭 socket
void close() {
if (socket_fd_ != -1) {
::close(socket_fd_);
socket_fd_ = -1;
}
}

private:
boost::asio::io_context& io_context_; // 关联的 io_context
int socket_fd_; // 套接字文件描述符
};

主函数代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
int main() {
boost::asio::io_context io_ctx;

// 创建 I/O 对象
basic_socket socket(io_ctx);

// 打开一个 IPv4 TCP 套接字
socket.open(AF_INET, SOCK_STREAM, 0);

// 远程地址
sockaddr_in remote_addr;
remote_addr.sin_family = AF_INET;
remote_addr.sin_port = htons(8080);
inet_pton(AF_INET, "93.184.216.34", &remote_addr.sin_addr); // example.com

// 异步连接
socket.async_connect(remote_addr, [&socket](const boost::system::error_code& ec) {
if (!ec) {
std::cout << "Connected successfully!\n";

// 异步写入数据
const char* msg = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
socket.async_write(msg, strlen(msg), [&socket](const boost::system::error_code& ec, std::size_t bytes_transferred) {
if (!ec) {
std::cout << "Sent " << bytes_transferred << " bytes\n";

// 异步读取响应
char buffer[1024];
socket.async_read(buffer, sizeof(buffer), [](const boost::system::error_code& ec, std::size_t bytes_read) {
if (!ec) {
std::cout << "Received " << bytes_read << " bytes\n";
std::cout << std::string(buffer, bytes_read) << "\n";
} else {
std::cerr << "Read error: " << ec.message() << "\n";
}
});
} else {
std::cerr << "Write error: " << ec.message() << "\n";
}
});
} else {
std::cerr << "Connect error: " << ec.message() << "\n";
}
});

// 运行 io_context 事件循环
io_ctx.run();

return 0;
}

异步操作函数

https://www.cnblogs.com/my_life/articles/5329955.html

asio在linux下是模拟Proactor模式,在win下是真正的Proactor

这2部分代码结合起来看就更容易理解。

代码1.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void async_read(io_context& io_ctx, int socket_fd, char* buffer, size_t buffer_size, std::function<void(const boost::system::error_code&, size_t)> handler) {
// 将 socket_fd 注册到 epoll 中,监听可读事件
io_ctx.add_to_epoll(socket_fd, EPOLLIN);

// 将读取任务 post 到 io_context 中
io_ctx.post([socket_fd, buffer, buffer_size, handler]() {
ssize_t bytes_read = ::read(socket_fd, buffer, buffer_size);
if (bytes_read >= 0) {
handler(boost::system::error_code(), bytes_read);
}
else {
handler(boost::system::error_code(errno, boost::asio::error::get_system_category()), 0);
}
});
}

代码2.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <boost/asio.hpp>
#include <iostream>

void async_read_some_example(boost::asio::ip::tcp::socket& socket) {
char data[1024];

socket.async_read(boost::asio::buffer(data),
[&socket, &data](const boost::system::error_code& ec, std::size_t bytes_transferred) {
if (!ec) {
std::cout << "Read " << bytes_transferred << " bytes: "
<< std::string(data, bytes_transferred) << std::endl;

// 可以继续进行异步读取
async_read_some_example(socket);
} else {
std::cerr << "Error: " << ec.message() << std::endl;
}
}
);
}

int main() {
boost::asio::io_context io_context;
boost::asio::ip::tcp::socket socket(io_context);

boost::asio::ip::tcp::resolver resolver(io_context);
auto endpoints = resolver.resolve("example.com", "http");
boost::asio::connect(socket, endpoints);

async_read_some_example(socket);

io_context.run(); // 启动事件循环

return 0;
}

在 Boost.Asio 中,异步操作是通过事件驱动机制实现的,操作完成后回调函数被调用处理结果。

问题:linux下start_async_read函数中的data[1024]数据是由谁读取的,是内核还是用户态线程?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <boost/asio.hpp>
#include <iostream>

using boost::asio::ip::tcp;

void async_read_handler(const boost::system::error_code& ec, std::size_t bytes_transferred) {
if (!ec) {
std::cout << "Read " << bytes_transferred << " bytes" << std::endl;
} else {
std::cerr << "Error: " << ec.message() << std::endl;
}
}

void start_async_read(tcp::socket& socket) {
char data[1024];
socket.async_read_some(boost::asio::buffer(data), async_read_handler);
}

int main() {
boost::asio::io_context io_context;
tcp::socket socket(io_context);

// Assume connection is established
start_async_read(socket);

io_context.run(); // Starts the event loop

return 0;
}

start_async_read 函数中的 data[1024] 数据是由谁读取的?是内核还是用户态线程?

  • 内核负责接收数据:最初的数据是由内核从网络接口接收,并存储在内核态的 socket 缓冲区中。
  • 用户态线程负责拷贝数据:当异步读取操作完成时,io_context 线程会将数据从内核缓冲区拷贝到 data[1024] 这个用户态缓冲区中。

因此,读取数据的过程是由用户态线程完成的,但数据最初是由内核接收并存储的。内核完成数据的接收工作,而用户态线程负责将这些数据从内核缓冲区中拷贝到用户态的 data[1024] 缓冲区中。这个负责将数据从内核缓冲区拷贝到用户态 data[1024] 缓冲区中的线程,通常就是 io_context 线程池中的一个线程

真正能实现异步的需要使用win下的api:IOCP。深入理解Windows异步机制:IOCP的工作原理与应用

**这里其实遗留了一个问题:**通常会有多个文件描述符(如套接字)监听不同的事件(如可读、可写等)。当这些事件发生时,如何将事件与相应的任务(即处理这些事件的回调函数)对应起来呢?

感兴趣的可以看看源码部分!

结合协程

Boost.Asio 与协程结合起来可以提高效率,主要原因在于协程使得异步操作的代码看起来更像同步代码,从而更易于编写、理解和维护,同时减少了回调地狱(callback hell)的复杂性。

优势

  • 代码简化:协程使得异步代码更容易理解,因为它消除了大量的回调函数和状态管理代码。
  • 自然的控制流:协程允许你使用 await 或类似的操作来等待异步操作完成,代码结构更直观。
  • 减少上下文切换:协程在同一个线程内执行,不需要像线程那样频繁切换上下文,从而提高了性能。

对比

需求:

我们要实现一个简单的 TCP 服务器,它需要依次执行以下操作:

  • 接受客户端连接。
  • 从客户端读取数据。
  • 处理数据(假设只是简单的回显)。
  • 返回处理后的数据给客户端。

不使用协程的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include <boost/asio.hpp>
#include <iostream>
#include <memory>

using boost::asio::ip::tcp;

class Session : public std::enable_shared_from_this<Session> {
public:
Session(tcp::socket socket) : socket_(std::move(socket)) {}

void start() {
do_read();
}

private:
void do_read() {
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
do_write(length);
}
});
}

void do_write(std::size_t length) {
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec) {
do_read();
}
});
}

tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};

class Server {
public:
Server(boost::asio::io_context& io_context, short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}

private:
void do_accept() {
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec) {
std::make_shared<Session>(std::move(socket))->start();
}

do_accept();
});
}

tcp::acceptor acceptor_;
};

int main() {
try {
boost::asio::io_context io_context;
Server s(io_context, 12345);
io_context.run();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}

return 0;
}

回调函数的数量

在这个例子中,你可以看到:

  • do_read():读取数据时使用了一个回调函数。
  • do_write():写入数据时使用了一个回调函数。
  • do_accept():接受连接时使用了一个回调函数。

使用协程的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <iostream>

using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::ip::tcp;
using boost::asio::use_awaitable;

awaitable<void> session(tcp::socket socket) {
try {
char data[1024];

while (true) {
// 异步读取数据
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);

// 异步写入数据
co_await boost::asio::async_write(socket, boost::asio::buffer(data, n), use_awaitable);
}
} catch (const std::exception& e) {
std::cerr << "Session exception: " << e.what() << std::endl;
}
}

awaitable<void> listener(boost::asio::io_context& io_context, short port) {
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), port));

while (true) {
// 异步接受连接
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);

// 启动处理 session 的协程
co_spawn(io_context, session(std::move(socket)), detached);
}
}

int main() {
try {
boost::asio::io_context io_context;

// 启动监听协程
co_spawn(io_context, listener(io_context, 12345), detached);

io_context.run();
} catch (std::exception& e) {
std::cerr << "Main exception: " << e.what() << std::endl;
}

return 0;
}

回调函数的数量

在这个版本中,你可以看到:

  • session():协程函数内使用 co_await 关键字等待异步操作完成,代码看起来像同步代码,但实际上在每个 co_await 点都让出了执行权。
  • listener():同样通过 co_await 来等待异步接受连接。

总共只有 1 个回调函数,那就是 co_spawn 的回调,但这个回调是为了启动协程并不直接参与异步 I/O 操作。

扩展

coost

https://coostdocs.github.io/cn/about/co/

coost v3.0.0 (微型boost库)

coost-A tiny boost library in C++11github.com/idealvin/coost

coost 是一个兼具性能与易用性的跨平台 C++ 基础库,原名为 co,后改为 cocoyaxi,前者过短,后者过长,取中庸之道,又改为 coost。

为什么叫 coost 呢?以前有朋友称之为小型 boost 库,比 boost 小一点,那就叫 coost 好了。它有多小呢?在 linux 与 mac 上编译出来的静态库仅 1M 左右大小。虽然小,却提供了足够强大的功能:

  • 命令行参数与配置文件解析库(flag)
  • 高性能日志库(log)
  • 单元测试框架(unitest)
  • go-style 协程
  • 基于协程的网络编程框架
  • 高效 JSON 库
  • 基于 JSON 的 RPC 框架
  • 面向玄学编程
  • 原子操作(atomic)
  • 随机数生成器(random)
  • 高效字符流(fastream)
  • 高效字符串(fastring)
  • 字符串操作(str)
  • 时间库(time)
  • 线程库(thread)
  • 定时任务调度器
  • 高性能内存分配器
  • LruMap
  • hash 库
  • path 库
  • 文件系统操作(fs)
  • 系统操作(os)

本次发布的版本,直接从 v2.0.3 跳到了 v3.0.0,跨度非常之大,它在性能易用性稳定性等方面均有全面的提升。

协程

协程

coost 实现了类似 golang 中 goroutine 的协程机制,它有如下特性:

  • 支持多线程调度,默认线程数为系统 CPU 核数。
  • 共享栈,同一线程中的协程共用若干个栈(大小默认为 1MB),内存占用低。
  • 各协程之间为平级关系,可以在任何地方(包括在协程中)创建新的协程。
  • 支持协程同步事件、协程锁、channel、waitgroup 等协程同步机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include "co/co.h"

int main(int argc, char** argv) {
flag::parse(argc, argv);

co::wait_group wg;
wg.add(2);

go([wg](){
LOG << "hello world";
wg.done();
});

go([wg](){
LOG << "hello again";
wg.done();
});

wg.wait();
return 0;
}

上面的代码中,go() 创建的协程会分配到不同的调度线程中。用户也可以自行控制协程的调度:

1
2
3
4
5
6
7
8
9
// run f1 and f2 in the same scheduler
auto s = co::next_sched();
s->go(f1);
s->go(f2);

// run f in all schedulers
for (auto& s : co::scheds()) {
s->go(f);
}

网络编程

coost 提供了一套基于协程的网络编程框架:

  • 协程化的 socket API,形式上与系统 socket API 类似,熟悉 socket 编程的用户,可以轻松的用同步的方式写出高性能的网络程序。
  • TCPHTTPRPC 等高层网络编程组件,兼容 IPv6,同时支持 SSL,用起来比 socket API 更方便。

RPC server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include "co/co.h"
#include "co/rpc.h"
#include "co/time.h"

int main(int argc, char** argv) {
flag::parse(argc, argv);

rpc::Server()
.add_service(new xx::HelloWorldImpl)
.start("127.0.0.1", 7788, "/xx");

for (;;) sleep::sec(80000);
return 0;
}

rpc::Server 同时支持 HTTP 协议,可以用 HTTP 的 POST 方法调用 RPC 服务:

1
curl http://127.0.0.1:7788/xx --request POST --data '{"api":"ping"}'