一个人可以走的很快,一群人可以走的很远!
Boost.Asio 的核心组件包括:
- io_context:事件调度器,管理所有的异步操作。
- I/O 对象(如 tcp::socket、serial_port 等):用于执行异步 I/O 操作。
- strand:用于保证回调函数的顺序执行。
- 异步操作函数(如 async_read、async_write):发起异步 I/O 操作。【目前我认为是通过·回调机制以及用户线程池保证的】
- 定时器(如 steady_timer):用于管理定时事件。
io_context
io_service
[这个部分感兴趣看看就好,也可以跳过io_service这节]
io_service的作用
io_servie 实现了一个任务队列,这里的任务就是void(void)的函数。Io_servie最常用的两个接口是post和run,post向任务队列中投递任务,run是执行队列中的任务,直到全部执行完毕,并且run可以被N个线程调用。Io_service是完全线程安全的队列。
Io_servie的接口
提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop
Io_servie 实现代码的基本类结构:
Io_servie::Post方法的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| #include <functional> #include <queue> #include <mutex> #include <condition_variable> #include <vector> #include <thread> #include <iostream> #include <sys/epoll.h> #include <unistd.h>
class operation { public: virtual void execute() = 0; virtual ~operation() {} };
template <typename Handler> class completion_handler : public operation { public: explicit completion_handler(Handler h) : handler_(std::move(h)) {} void execute() override { handler_(); } private: Handler handler_; };
class io_service { public: io_service() : outstanding_work_(0), first_idle_thread_(nullptr) {}
template <typename Handler> void post(Handler handler) { auto op = std::make_shared<completion_handler<Handler>>(std::move(handler)); post_immediate_completion(op); }
private: void post_immediate_completion(std::shared_ptr<operation> op) { ++outstanding_work_; post_deferred_completion(op); }
void post_deferred_completion(std::shared_ptr<operation> op) { std::unique_lock<std::mutex> lock(mutex_); task_queue_.push(op); wake_one_thread_and_unlock(lock); }
void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) { if (first_idle_thread_) { std::thread* idle_thread = first_idle_thread_; first_idle_thread_ = nullptr; lock.unlock(); idle_thread->detach(); } else { lock.unlock(); std::cout << "No idle thread, handling epoll_wait interruption" << std::endl; handle_epoll_wait_interruption(); } }
void handle_epoll_wait_interruption() { std::shared_ptr<operation> op; { std::lock_guard<std::mutex> lock(mutex_); if (!task_queue_.empty()) { op = task_queue_.front(); task_queue_.pop(); } } if (op) { op->execute(); } epoll_wait_simulation(); }
void epoll_wait_simulation() { std::cout << "Simulating epoll_wait" << std::endl; }
private: std::queue<std::shared_ptr<operation>> task_queue_; std::mutex mutex_; std::condition_variable cond_var_; std::atomic<int> outstanding_work_; std::thread* first_idle_thread_; };
|
解释:
post
方法:
post
方法接收一个 handler
(实际是一个仿函数),并将其包装为 completion_handler
对象。
completion_handler
继承自 operation
,并实现了 execute
方法,将 handler
作为任务执行。
post_immediate_completion
方法:
post_immediate_completion
增加未完成任务计数 outstanding_work_
,然后调用 post_deferred_completion
。
post_deferred_completion
方法:
post_deferred_completion
方法首先加锁,然后将任务入列到 task_queue_
中。
- 然后它调用
wake_one_thread_and_unlock
尝试唤醒一个空闲线程。
wake_one_thread_and_unlock
方法:
- 该方法首先检查是否有空闲线程(通过
first_idle_thread_
指针维护)。
- 如果有空闲线程,则将其唤醒(在此示例中,使用
detach
模拟唤醒)。
- 如果没有空闲线程,但有线程可能正在阻塞于
epoll_wait
,则模拟中断 epoll_wait
并在中断后执行任务队列中的任务。
handle_epoll_wait_interruption
方法:
- 该方法模拟从
epoll_wait
中断后处理任务队列中的任务。
- 执行任务后,它再次模拟调用
epoll_wait
。在实际实现中,这将是一个真正的 epoll_wait
系统调用。
Io_servie::run方法的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| #include <functional> #include <queue> #include <mutex> #include <condition_variable> #include <vector> #include <thread> #include <iostream> #include <sys/epoll.h> #include <unistd.h>
class operation { public: virtual void execute() = 0; virtual ~operation() {} };
template <typename Handler> class completion_handler : public operation { public: explicit completion_handler(Handler h) : handler_(std::move(h)) {} void execute() override { handler_(); } private: Handler handler_; };
struct idle_thread_info { idle_thread_info* next; std::thread::id thread_id; std::condition_variable cond_var; };
class io_service { public: io_service() : outstanding_work_(0), first_idle_thread_(nullptr), stopped_(false) { epoll_fd_ = epoll_create1(0); init_interrupt_fd(); }
~io_service() { close(epoll_fd_); }
template <typename Handler> void post(Handler handler) { auto op = std::make_shared<completion_handler<Handler>>(std::move(handler)); post_immediate_completion(op); }
void run() { idle_thread_info this_thread_info; this_thread_info.thread_id = std::this_thread::get_id(); this_thread_info.next = nullptr;
std::unique_lock<std::mutex> lock(mutex_);
while (do_one(lock, this_thread_info)) { } }
private: bool do_one(std::unique_lock<std::mutex>& lock, idle_thread_info& this_thread_info) { if (task_queue_.empty()) { this_thread_info.next = first_idle_thread_; first_idle_thread_ = &this_thread_info;
this_thread_info.cond_var.wait(lock, [this] { return !task_queue_.empty() || stopped_; });
if (stopped_) { return false; } }
auto op = task_queue_.front(); task_queue_.pop();
if (task_queue_.empty()) { lock.unlock(); } else { wake_one_thread_and_unlock(lock); }
op->execute();
return true; }
void post_immediate_completion(std::shared_ptr<operation> op) { ++outstanding_work_; post_deferred_completion(op); }
void post_deferred_completion(std::shared_ptr<operation> op) { std::unique_lock<std::mutex> lock(mutex_); task_queue_.push(op); wake_one_thread_and_unlock(lock); }
void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) { if (first_idle_thread_) { idle_thread_info* idle_thread = first_idle_thread_; first_idle_thread_ = idle_thread->next; lock.unlock(); idle_thread->cond_var.notify_one(); } else { lock.unlock(); interrupt_epoll_wait(); } }
void interrupt_epoll_wait() { uint64_t one = 1; write(interrupt_fd_, &one, sizeof(one)); }
void init_interrupt_fd() { interrupt_fd_ = eventfd(0, EFD_NONBLOCK); epoll_event ev = {}; ev.events = EPOLLIN | EPOLLET; ev.data.fd = interrupt_fd_; epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev); }
void epoll_wait_simulation() { epoll_event events[10]; int nfds = epoll_wait(epoll_fd_, events, 10, -1); for (int n = 0; n < nfds; ++n) { if (events[n].data.fd == interrupt_fd_) { uint64_t count; read(interrupt_fd_, &count, sizeof(count)); std::cout << "epoll_wait interrupted!" << std::endl; } } }
private: std::queue<std::shared_ptr<operation>> task_queue_; std::mutex mutex_; std::condition_variable cond_var_; std::atomic<int> outstanding_work_; idle_thread_info* first_idle_thread_; bool stopped_; int epoll_fd_; int interrupt_fd_; };
|
解释
run
方法:
run
方法是 io_service
的核心循环,负责执行所有任务,直到任务执行完毕或服务停止。
- 它首先创建一个
idle_thread_info
对象,用于管理当前线程的状态。
- 进入循环,不断调用
do_one
方法来处理任务。
do_one
方法:
do_one
方法每次执行一个任务。
- 如果任务队列为空,当前线程被标记为空闲,并进入等待状态,直到有新任务到来或服务停止。
- 如果有任务,
do_one
从任务队列中取出一个任务并执行。如果队列中还有其他任务,会尝试唤醒其他空闲线程来执行剩余的任务。
post_immediate_completion
和 post_deferred_completion
方法:
- 这两个方法负责将任务加入任务队列,并尝试唤醒空闲线程执行任务。
post_immediate_completion
增加未完成任务计数,然后调用 post_deferred_completion
。
post_deferred_completion
将任务加入任务队列后,调用 wake_one_thread_and_unlock
尝试唤醒空闲线程。
wake_one_thread_and_unlock
方法:
- 该方法尝试唤醒一个空闲线程。如果没有空闲线程,那么会调用
interrupt_epoll_wait
来中断阻塞在 epoll_wait
上的线程。
interrupt_epoll_wait
方法通过写入 eventfd
文件描述符来触发 epoll_wait
的中断。
epoll_wait_simulation
和 init_interrupt_fd
方法:
epoll_wait_simulation
模拟 epoll_wait
的阻塞和处理中断的过程。
init_interrupt_fd
初始化一个 eventfd
,并将其添加到 epoll
实例中,用于中断 epoll_wait
。
Io_servie::stop的实现
- 加锁:确保在多线程环境中对共享资源的访问是安全的。
- 调用
stop_all_threads
:唤醒所有空闲线程,确保它们能够检测到 io_service
已经停止。
- 设置
stopped_
标志:标记 io_service
已经停止,阻止新任务的执行,并通知所有正在等待的线程。
- 调用
task_
的 interrupt
方法:中断正在运行的 epoll_wait
或其他阻塞操作,以确保所有线程都能及时退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
| #include <functional> #include <queue> #include <mutex> #include <condition_variable> #include <vector> #include <thread> #include <iostream> #include <sys/epoll.h> #include <unistd.h>
class operation { public: virtual void execute() = 0; virtual ~operation() {} };
template <typename Handler> class completion_handler : public operation { public: explicit completion_handler(Handler h) : handler_(std::move(h)) {} void execute() override { handler_(); } private: Handler handler_; };
struct idle_thread_info { idle_thread_info* next; std::thread::id thread_id; std::condition_variable cond_var; };
class io_service { public: io_service() : outstanding_work_(0), first_idle_thread_(nullptr), stopped_(false), task_interrupted_(false) { epoll_fd_ = epoll_create1(0); init_interrupt_fd(); }
~io_service() { close(epoll_fd_); }
template <typename Handler> void post(Handler handler) { auto op = std::make_shared<completion_handler<Handler>>(std::move(handler)); post_immediate_completion(op); }
void stop() { std::unique_lock<std::mutex> lock(mutex_); stop_all_threads(); stopped_ = true;
idle_thread_info* idle_thread = first_idle_thread_; while (idle_thread) { idle_thread->cond_var.notify_one(); idle_thread = idle_thread->next; }
task_interrupted_ = true; interrupt_task(); }
private: void stop_all_threads() { std::cout << "Stopping all threads..." << std::endl; }
void interrupt_task() { interrupt_epoll_wait(); }
void interrupt_epoll_wait() { uint64_t one = 1; write(interrupt_fd_, &one, sizeof(one)); }
void init_interrupt_fd() { interrupt_fd_ = eventfd(0, EFD_NONBLOCK); epoll_event ev = {}; ev.events = EPOLLIN | EPOLLET; ev.data.fd = interrupt_fd_; epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev); }
void post_immediate_completion(std::shared_ptr<operation> op) { ++outstanding_work_; post_deferred_completion(op); }
void post_deferred_completion(std::shared_ptr<operation> op) { std::unique_lock<std::mutex> lock(mutex_); task_queue_.push(op); wake_one_thread_and_unlock(lock); }
void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) { if (first_idle_thread_) { idle_thread_info* idle_thread = first_idle_thread_; first_idle_thread_ = idle_thread->next; lock.unlock(); idle_thread->cond_var.notify_one(); } else { lock.unlock(); interrupt_epoll_wait(); } }
private: std::queue<std::shared_ptr<operation>> task_queue_; std::mutex mutex_; std::condition_variable cond_var_; std::atomic<int> outstanding_work_; idle_thread_info* first_idle_thread_; bool stopped_; bool task_interrupted_; int epoll_fd_; int interrupt_fd_; };
|
Boost::asio io_service 实现分析对这篇文章的扩充
易理解
io_context
实际上是一个事件循环,它通过与操作系统的 I/O 多路复用机制(如 epoll
、select
等)及任务队列配合使用,以实现高效的异步编程。
io_context
使用了以下几个关键的数据结构:
- 任务队列(Task Queue):存储需要执行的任务。
- 调度器(Scheduler):负责管理任务队列,并与操作系统的 I/O 多路复用机制(如
epoll
、select
)进行交互。
- 执行器(Executor):用于调度任务的执行。
简易单线程版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
| #include <iostream> #include <functional> #include <deque> #include <mutex> #include <condition_variable> #include <vector> #include <thread> #include <sys/epoll.h> #include <unistd.h> #include <fcntl.h> #include <cstring>
class io_context { public: io_context() : stopped_(false), epoll_fd_(::epoll_create1(0)) { if (epoll_fd_ == -1) { throw std::runtime_error("Failed to create epoll file descriptor"); } }
~io_context() { ::close(epoll_fd_); }
template <typename CompletionHandler> void post(CompletionHandler handler) { { std::lock_guard<std::mutex> lock(queue_mutex_); task_queue_.emplace_back(std::move(handler)); } wake_up(); }
void run() { while (!stopped_) { std::function<void()> task; { std::lock_guard<std::mutex> lock(queue_mutex_); if (!task_queue_.empty()) { task = std::move(task_queue_.front()); task_queue_.pop_front(); } } if (task) { task(); } else { wait_for_events(); } } }
void stop() { stopped_ = true; wake_up(); }
void restart() { stopped_ = false; }
void add_to_epoll(int fd, uint32_t events) { epoll_event event; event.events = events; event.data.fd = fd; if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1) { throw std::runtime_error("Failed to add file descriptor to epoll"); } }
private: void wait_for_events() { epoll_event events[MAX_EVENTS]; int num_events = ::epoll_wait(epoll_fd_, events, MAX_EVENTS, -1); if (num_events == -1) { if (errno != EINTR) { throw std::runtime_error("epoll_wait failed"); } return; }
for (int i = 0; i < num_events; ++i) { handle_event(events[i]); } }
void handle_event(epoll_event &event) { if (event.events & EPOLLIN) { std::cout << "Handle read event on fd: " << event.data.fd << std::endl; char buffer[1024]; ssize_t n = ::read(event.data.fd, buffer, sizeof(buffer)); if (n > 0) { std::cout << "Read " << n << " bytes: " << std::string(buffer, n) << std::endl; } else if (n == 0) { std::cout << "Connection closed on fd: " << event.data.fd << std::endl; ::close(event.data.fd); } else { std::cerr << "Read error on fd: " << event.data.fd << std::endl; } } }
void wake_up() { }
static const int MAX_EVENTS = 10;
bool stopped_; int epoll_fd_; std::deque<std::function<void()>> task_queue_; std::mutex queue_mutex_; };
void async_read(io_context &io_ctx, int fd) { io_ctx.add_to_epoll(fd, EPOLLIN); }
int main() { io_context io_ctx;
int pipe_fds[2]; ::pipe(pipe_fds); fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK);
io_ctx.post([&io_ctx, pipe_fds]() { async_read(io_ctx, pipe_fds[0]); });
std::thread writer([pipe_fds]() { ::sleep(1); const char* msg = "Hello from the other side!"; ::write(pipe_fds[1], msg, strlen(msg)); ::close(pipe_fds[1]); });
io_ctx.run();
writer.join();
return 0; }
|
当然,我们也可以用多线程版本的io_context来实现啦
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
| #include <iostream> #include <functional> #include <deque> #include <mutex> #include <condition_variable> #include <vector> #include <thread> #include <sys/epoll.h> #include <unistd.h> #include <fcntl.h> #include <atomic> #include <cstring>
class io_context { public: io_context() : stopped_(false), work_count_(0), epoll_fd_(::epoll_create1(0)) { if (epoll_fd_ == -1) { throw std::runtime_error("Failed to create epoll file descriptor"); } }
~io_context() { ::close(epoll_fd_); }
template <typename CompletionHandler> void post(CompletionHandler handler) { { std::lock_guard<std::mutex> lock(queue_mutex_); task_queue_.emplace_back(std::move(handler)); ++work_count_; } queue_condition_.notify_one(); }
void run() { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queue_mutex_); queue_condition_.wait(lock, [this]() { return stopped_ || !task_queue_.empty(); });
if (stopped_ && task_queue_.empty()) { return; }
task = std::move(task_queue_.front()); task_queue_.pop_front(); }
task(); --work_count_;
wait_for_events(); } }
void stop() { { std::lock_guard<std::mutex> lock(queue_mutex_); stopped_ = true; } queue_condition_.notify_all(); }
void restart() { stopped_ = false; }
void add_to_epoll(int fd, uint32_t events) { epoll_event event; event.events = events; event.data.fd = fd; if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1) { throw std::runtime_error("Failed to add file descriptor to epoll"); } }
private: void wait_for_events() { epoll_event events[MAX_EVENTS]; int num_events = ::epoll_wait(epoll_fd_, events, MAX_EVENTS, 0); if (num_events == -1) { if (errno != EINTR) { throw std::runtime_error("epoll_wait failed"); } return; }
for (int i = 0; i < num_events; ++i) { handle_event(events[i]); } }
void handle_event(epoll_event &event) { if (event.events & EPOLLIN) { std::cout << "Handle read event on fd: " << event.data.fd << std::endl; char buffer[1024]; ssize_t n = ::read(event.data.fd, buffer, sizeof(buffer)); if (n > 0) { std::cout << "Read " << n << " bytes: " << std::string(buffer, n) << std::endl; } else if (n == 0) { std::cout << "Connection closed on fd: " << event.data.fd << std::endl; ::close(event.data.fd); } else { std::cerr << "Read error on fd: " << event.data.fd << std::endl; } } }
static const int MAX_EVENTS = 10;
std::atomic<bool> stopped_; std::atomic<int> work_count_; int epoll_fd_; std::deque<std::function<void()>> task_queue_; std::mutex queue_mutex_; std::condition_variable queue_condition_; };
int main() { io_context io_ctx;
std::vector<std::thread> thread_pool; const int num_threads = std::thread::hardware_concurrency();
for (int i = 0; i < num_threads; ++i) { thread_pool.emplace_back([&io_ctx]() { io_ctx.run(); }); }
int pipe_fds[2]; ::pipe(pipe_fds); fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK);
io_ctx.post([&io_ctx, pipe_fds]() { io_ctx.add_to_epoll(pipe_fds[0], EPOLLIN); });
std::thread writer([pipe_fds]() { ::sleep(1); const char* msg = "Hello from the other side!"; ::write(pipe_fds[1], msg, strlen(msg)); ::close(pipe_fds[1]); });
for (auto &thread : thread_pool) { thread.join(); }
writer.join();
return 0; }
|
I/O 对象
这是一个简化版的 I/O 对象类(basic_socket
),它模拟了 boost::asio::basic_socket
的一些基本功能。这个类将展示如何通过 io_context
实现异步 I/O 操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| #include <iostream> #include <functional> #include <sys/socket.h> #include <arpa/inet.h> #include <unistd.h> #include <fcntl.h> #include <boost/asio.hpp>
class basic_socket { public: basic_socket(boost::asio::io_context& io_context) : io_context_(io_context), socket_fd_(-1) {}
~basic_socket() { if (socket_fd_ != -1) { ::close(socket_fd_); } }
void open(int domain, int type, int protocol) { socket_fd_ = ::socket(domain, type, protocol); if (socket_fd_ == -1) { throw std::runtime_error("Failed to create socket"); } fcntl(socket_fd_, F_SETFL, O_NONBLOCK); }
void async_connect(const sockaddr_in& remote, std::function<void(const boost::system::error_code&)> handler) { int result = ::connect(socket_fd_, (sockaddr*)&remote, sizeof(remote)); if (result == 0) { handler(boost::system::error_code()); } else if (errno == EINPROGRESS) { io_context_.add_to_epoll(socket_fd_, EPOLLOUT);
io_context_.post([this, handler]() { int so_error; socklen_t len = sizeof(so_error); getsockopt(socket_fd_, SOL_SOCKET, SO_ERROR, &so_error, &len); if (so_error == 0) { handler(boost::system::error_code()); } else { handler(boost::system::error_code(so_error, boost::asio::error::get_system_category())); } }); } else { handler(boost::system::error_code(errno, boost::asio::error::get_system_category())); } }
void async_read(char* buffer, std::size_t length, std::function<void(const boost::system::error_code&, std::size_t)> handler) { io_context_.add_to_epoll(socket_fd_, EPOLLIN);
io_context_.post([this, buffer, length, handler]() { ssize_t bytes_read = ::read(socket_fd_, buffer, length); if (bytes_read >= 0) { handler(boost::system::error_code(), bytes_read); } else { handler(boost::system::error_code(errno, boost::asio::error::get_system_category()), 0); } }); }
void async_write(const char* buffer, std::size_t length, std::function<void(const boost::system::error_code&, std::size_t)> handler) { io_context_.add_to_epoll(socket_fd_, EPOLLOUT);
io_context_.post([this, buffer, length, handler]() { ssize_t bytes_written = ::write(socket_fd_, buffer, length); if (bytes_written >= 0) { handler(boost::system::error_code(), bytes_written); } else { handler(boost::system::error_code(errno, boost::asio::error::get_system_category()), 0); } }); }
void close() { if (socket_fd_ != -1) { ::close(socket_fd_); socket_fd_ = -1; } }
private: boost::asio::io_context& io_context_; int socket_fd_; };
|
主函数代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| int main() { boost::asio::io_context io_ctx;
basic_socket socket(io_ctx);
socket.open(AF_INET, SOCK_STREAM, 0);
sockaddr_in remote_addr; remote_addr.sin_family = AF_INET; remote_addr.sin_port = htons(8080); inet_pton(AF_INET, "93.184.216.34", &remote_addr.sin_addr);
socket.async_connect(remote_addr, [&socket](const boost::system::error_code& ec) { if (!ec) { std::cout << "Connected successfully!\n";
const char* msg = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"; socket.async_write(msg, strlen(msg), [&socket](const boost::system::error_code& ec, std::size_t bytes_transferred) { if (!ec) { std::cout << "Sent " << bytes_transferred << " bytes\n";
char buffer[1024]; socket.async_read(buffer, sizeof(buffer), [](const boost::system::error_code& ec, std::size_t bytes_read) { if (!ec) { std::cout << "Received " << bytes_read << " bytes\n"; std::cout << std::string(buffer, bytes_read) << "\n"; } else { std::cerr << "Read error: " << ec.message() << "\n"; } }); } else { std::cerr << "Write error: " << ec.message() << "\n"; } }); } else { std::cerr << "Connect error: " << ec.message() << "\n"; } });
io_ctx.run();
return 0; }
|
异步操作函数
https://www.cnblogs.com/my_life/articles/5329955.html
asio在linux下是模拟Proactor模式,在win下是真正的Proactor
这2部分代码结合起来看就更容易理解。
代码1.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void async_read(io_context& io_ctx, int socket_fd, char* buffer, size_t buffer_size, std::function<void(const boost::system::error_code&, size_t)> handler) { io_ctx.add_to_epoll(socket_fd, EPOLLIN);
io_ctx.post([socket_fd, buffer, buffer_size, handler]() { ssize_t bytes_read = ::read(socket_fd, buffer, buffer_size); if (bytes_read >= 0) { handler(boost::system::error_code(), bytes_read); } else { handler(boost::system::error_code(errno, boost::asio::error::get_system_category()), 0); } }); }
|
代码2.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| #include <boost/asio.hpp> #include <iostream>
void async_read_some_example(boost::asio::ip::tcp::socket& socket) { char data[1024];
socket.async_read(boost::asio::buffer(data), [&socket, &data](const boost::system::error_code& ec, std::size_t bytes_transferred) { if (!ec) { std::cout << "Read " << bytes_transferred << " bytes: " << std::string(data, bytes_transferred) << std::endl;
async_read_some_example(socket); } else { std::cerr << "Error: " << ec.message() << std::endl; } } ); }
int main() { boost::asio::io_context io_context; boost::asio::ip::tcp::socket socket(io_context);
boost::asio::ip::tcp::resolver resolver(io_context); auto endpoints = resolver.resolve("example.com", "http"); boost::asio::connect(socket, endpoints);
async_read_some_example(socket);
io_context.run();
return 0; }
|
在 Boost.Asio 中,异步操作是通过事件驱动机制实现的,操作完成后回调函数被调用处理结果。
问题:linux下start_async_read函数中的data[1024]数据是由谁读取的,是内核还是用户态线程?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| #include <boost/asio.hpp> #include <iostream>
using boost::asio::ip::tcp;
void async_read_handler(const boost::system::error_code& ec, std::size_t bytes_transferred) { if (!ec) { std::cout << "Read " << bytes_transferred << " bytes" << std::endl; } else { std::cerr << "Error: " << ec.message() << std::endl; } }
void start_async_read(tcp::socket& socket) { char data[1024]; socket.async_read_some(boost::asio::buffer(data), async_read_handler); }
int main() { boost::asio::io_context io_context; tcp::socket socket(io_context);
start_async_read(socket);
io_context.run();
return 0; }
|
start_async_read
函数中的 data[1024]
数据是由谁读取的?是内核还是用户态线程?
- 内核负责接收数据:最初的数据是由内核从网络接口接收,并存储在内核态的 socket 缓冲区中。
- 用户态线程负责拷贝数据:当异步读取操作完成时,
io_context
线程会将数据从内核缓冲区拷贝到 data[1024]
这个用户态缓冲区中。
因此,读取数据的过程是由用户态线程完成的,但数据最初是由内核接收并存储的。内核完成数据的接收工作,而用户态线程负责将这些数据从内核缓冲区中拷贝到用户态的 data[1024]
缓冲区中。这个负责将数据从内核缓冲区拷贝到用户态 data[1024]
缓冲区中的线程,通常就是 io_context
线程池中的一个线程。
真正能实现异步的需要使用win下的api:IOCP。深入理解Windows异步机制:IOCP的工作原理与应用
**这里其实遗留了一个问题:**通常会有多个文件描述符(如套接字)监听不同的事件(如可读、可写等)。当这些事件发生时,如何将事件与相应的任务(即处理这些事件的回调函数)对应起来呢?
感兴趣的可以看看源码部分!
结合协程
Boost.Asio
与协程结合起来可以提高效率,主要原因在于协程使得异步操作的代码看起来更像同步代码,从而更易于编写、理解和维护,同时减少了回调地狱(callback hell)的复杂性。
优势
- 代码简化:协程使得异步代码更容易理解,因为它消除了大量的回调函数和状态管理代码。
- 自然的控制流:协程允许你使用
await
或类似的操作来等待异步操作完成,代码结构更直观。
- 减少上下文切换:协程在同一个线程内执行,不需要像线程那样频繁切换上下文,从而提高了性能。
对比
需求:
我们要实现一个简单的 TCP 服务器,它需要依次执行以下操作:
- 接受客户端连接。
- 从客户端读取数据。
- 处理数据(假设只是简单的回显)。
- 返回处理后的数据给客户端。
不使用协程的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| #include <boost/asio.hpp> #include <iostream> #include <memory>
using boost::asio::ip::tcp;
class Session : public std::enable_shared_from_this<Session> { public: Session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() { do_read(); }
private: void do_read() { auto self(shared_from_this()); socket_.async_read_some(boost::asio::buffer(data_, max_length), [this, self](boost::system::error_code ec, std::size_t length) { if (!ec) { do_write(length); } }); }
void do_write(std::size_t length) { auto self(shared_from_this()); boost::asio::async_write(socket_, boost::asio::buffer(data_, length), [this, self](boost::system::error_code ec, std::size_t ) { if (!ec) { do_read(); } }); }
tcp::socket socket_; enum { max_length = 1024 }; char data_[max_length]; };
class Server { public: Server(boost::asio::io_context& io_context, short port) : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) { do_accept(); }
private: void do_accept() { acceptor_.async_accept( [this](boost::system::error_code ec, tcp::socket socket) { if (!ec) { std::make_shared<Session>(std::move(socket))->start(); }
do_accept(); }); }
tcp::acceptor acceptor_; };
int main() { try { boost::asio::io_context io_context; Server s(io_context, 12345); io_context.run(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << std::endl; }
return 0; }
|
回调函数的数量
在这个例子中,你可以看到:
do_read()
:读取数据时使用了一个回调函数。
do_write()
:写入数据时使用了一个回调函数。
do_accept()
:接受连接时使用了一个回调函数。
使用协程的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| #include <boost/asio.hpp> #include <boost/asio/awaitable.hpp> #include <boost/asio/co_spawn.hpp> #include <boost/asio/detached.hpp> #include <iostream>
using boost::asio::awaitable; using boost::asio::co_spawn; using boost::asio::detached; using boost::asio::ip::tcp; using boost::asio::use_awaitable;
awaitable<void> session(tcp::socket socket) { try { char data[1024];
while (true) { std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);
co_await boost::asio::async_write(socket, boost::asio::buffer(data, n), use_awaitable); } } catch (const std::exception& e) { std::cerr << "Session exception: " << e.what() << std::endl; } }
awaitable<void> listener(boost::asio::io_context& io_context, short port) { tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), port));
while (true) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(io_context, session(std::move(socket)), detached); } }
int main() { try { boost::asio::io_context io_context;
co_spawn(io_context, listener(io_context, 12345), detached);
io_context.run(); } catch (std::exception& e) { std::cerr << "Main exception: " << e.what() << std::endl; }
return 0; }
|
回调函数的数量
在这个版本中,你可以看到:
session()
:协程函数内使用 co_await
关键字等待异步操作完成,代码看起来像同步代码,但实际上在每个 co_await
点都让出了执行权。
listener()
:同样通过 co_await
来等待异步接受连接。
总共只有 1 个回调函数,那就是 co_spawn
的回调,但这个回调是为了启动协程并不直接参与异步 I/O 操作。
扩展
coost
https://coostdocs.github.io/cn/about/co/
coost v3.0.0 (微型boost库)
coost-A tiny boost library in C++11github.com/idealvin/coost
coost 是一个兼具性能与易用性的跨平台 C++ 基础库,原名为 co,后改为 cocoyaxi,前者过短,后者过长,取中庸之道,又改为 coost。
为什么叫 coost 呢?以前有朋友称之为小型 boost 库,比 boost 小一点,那就叫 coost 好了。它有多小呢?在 linux 与 mac 上编译出来的静态库仅 1M 左右大小。虽然小,却提供了足够强大的功能:
- 命令行参数与配置文件解析库(flag)
- 高性能日志库(log)
- 单元测试框架(unitest)
- go-style 协程
- 基于协程的网络编程框架
- 高效 JSON 库
- 基于 JSON 的 RPC 框架
- 面向玄学编程
- 原子操作(atomic)
- 随机数生成器(random)
- 高效字符流(fastream)
- 高效字符串(fastring)
- 字符串操作(str)
- 时间库(time)
- 线程库(thread)
- 定时任务调度器
- 高性能内存分配器
- LruMap
- hash 库
- path 库
- 文件系统操作(fs)
- 系统操作(os)
本次发布的版本,直接从 v2.0.3 跳到了 v3.0.0,跨度非常之大,它在性能、易用性、稳定性等方面均有全面的提升。
协程
协程
coost 实现了类似 golang 中 goroutine 的协程机制,它有如下特性:
- 支持多线程调度,默认线程数为系统 CPU 核数。
- 共享栈,同一线程中的协程共用若干个栈(大小默认为 1MB),内存占用低。
- 各协程之间为平级关系,可以在任何地方(包括在协程中)创建新的协程。
- 支持协程同步事件、协程锁、channel、waitgroup 等协程同步机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| #include "co/co.h"
int main(int argc, char** argv) { flag::parse(argc, argv);
co::wait_group wg; wg.add(2);
go([wg](){ LOG << "hello world"; wg.done(); });
go([wg](){ LOG << "hello again"; wg.done(); });
wg.wait(); return 0; }
|
上面的代码中,go()
创建的协程会分配到不同的调度线程中。用户也可以自行控制协程的调度:
1 2 3 4 5 6 7 8 9
| auto s = co::next_sched(); s->go(f1); s->go(f2);
for (auto& s : co::scheds()) { s->go(f); }
|
网络编程
coost 提供了一套基于协程的网络编程框架:
- 协程化的 socket API,形式上与系统 socket API 类似,熟悉 socket 编程的用户,可以轻松的用同步的方式写出高性能的网络程序。
- TCP、HTTP、RPC 等高层网络编程组件,兼容 IPv6,同时支持 SSL,用起来比 socket API 更方便。
RPC server
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| #include "co/co.h" #include "co/rpc.h" #include "co/time.h"
int main(int argc, char** argv) { flag::parse(argc, argv);
rpc::Server() .add_service(new xx::HelloWorldImpl) .start("127.0.0.1", 7788, "/xx");
for (;;) sleep::sec(80000); return 0; }
|
rpc::Server 同时支持 HTTP 协议,可以用 HTTP 的 POST 方法调用 RPC 服务:
1
| curl http://127.0.0.1:7788/xx --request POST --data '{"api":"ping"}'
|
[来自剪藏] coost v3.0.0 (微型boost库)发布 - 知乎
默认知识库