落魄谷中寒风吹,春秋蝉鸣少年归。【2024.7.7】

前言

网络编程是一个比较难码的模块,也是相对其他编程进阶的模块,特别是各种回调函数的理解调用以及其中会遇到各式各样难以解决的Bug。

借着学习陈硕大神muduo库的机会阅读了这本经典的网络编程教材《Linux多线程服务端编程》,这本也是这学期一直想学习的一本书,PS:就像研一下一直想学习Mit6.824一样。

image-20240502214205866

开始整体部分!

muduo网络库是Multi-Reactor架构,具体可以分为3个部分:

  • 网络相关模块:如Socket、InetAddress、TcpConnection、Acceptor、TcpServer等
  • 事件循环相关模块:如EventLoop、Channel、Poller、EPollPoller等
  • 线程相关模块:如Thread、EventLoopThread、EventLoopThreadPool等
  • 基础模块:如用户态缓冲区Buffer、时间戳Timestamp、日志类Logger等

本文参考自:

原理篇

Multi-Reactor

Muduo库是基于Reactor模式实现的TCP网络编程库。

Multi-Reactor模型如下所示:

image-20240502220126199

事件循环模块

muduo库基于三个关键组件来实现一个reactor,这个reactor可以持续地监听一组文件描述符(fd),并根据每个fd上发生的事件来调用相应的处理函数。这三个核心组件包括Channel类、Poller/EpollPoller类以及EventLoop类。

可以先看下整体的架构,先有个整体的了解。

image-20240502224744115

Channel类

概述

在TCP网络编程中,要想使用IO多路复用来监听某个文件描述符(fd),需要通过epoll_ctl将这个fd及其关心的事件注册到IO多路复用模块(也可以称之为事件监听器)上。当事件监听器检测到该fd发生了某个事件时,它会返回一个包含发生事件的fd集合,以及每个fd都发生了什么事件。

Channel类就是这样一个封装,它包含了一个文件描述符(fd),这个fd关心的事件,以及事件监听器实际检测到的事件。除此之外,Channel类还提供了一些方法,允许你设置这个fd的关心的事件,将这个fd及其关心的事件注册到事件监听器中或者从事件监听器中移除,以及保存这个fd的每种事件所对应的处理函数。

成员变量

  • int fd_:Channel对象关心的文件描述符。
  • int events_:fd感兴趣的事件类型集合。
  • int revents_:代表事件监听器实际监听到该fd发生的事件类型集合。当事件监听器监听到一个fd发生了什么事件,通过Channel::set_revents()函数来设置revents值。
  • EventLoop* loop:表示当前的Channel是在哪个loop中。
  • read_callback_ 、write_callback_、close_callback_、error_callback_:这些是std::function类型,代表着这个Channel为这个文件描述符保存的各事件类型发生时的处理函数。比如这个fd发生了可读事件,需要执行可读事件处理函数,Channel类都替你保管好了这些可调用函数。

成员函数

成员函数的设计是根据成员变量和这个类的功能设计,在设计之前我们自己也可以先想想作者为什么这么设计。

  • 向Channel对象注册各类事件的处理函数。

    1
    2
    3
    4
    void setReadCallback(ReadEventCallback cb) {read_callback_ = std::move(cb);}
    void setWriteCallback(Eventcallback cb) {write_callback_ = std::move(cb);}
    void setCloseCallback(EventCallback cb) {close_callback_ = std::move(cb);}
    void setErrorCallback(EventCallback cb) {error_callback_ = std::move(cb);}

    简单来说,就是事件监听器监听到描述符发生的事件的时候,相应的处理函数来处理。处理函数保存在Channel类中,这样调用也比较方便。第二点所示。

  • 根据poller通知的channel发生的具体事件, 由channel负责调用具体的回调操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    void Channel::handleEventWithGuard(Timestamp receiveTime)
    {
    LOG_INFO("channel handleEvent revents:%d\n", revents_);
    if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
    {
    if (closeCallback_)
    {
    closeCallback_();
    }
    }
    ...
    }
    /*
    EPOLLHUP(挂起事件):这个事件表示连接被挂起,通常是指对方关闭了连接或者连接中断。这是一个输出事件,表明文件描述符上的挂起状态。
    EPOLLIN(输入事件):这个事件表示文件描述符上有数据可以读取,通常用于表示有数据到达,可以读取。

    上述代码的含义:如果revents_中包含EPOLLHUP事件,并且不包含EPOLLIN事件。那么,可能的场景是对方已经关闭连接(挂起),并且当前没有数据可读。根据这个条件,你可以决定如何处理这种情况,例如关闭文件描述符,清理资源等。
    */

    当调用epoll_wait()后,可以得知事件监听器上哪些Channel(文件描述符)发生了哪些事件,事件发生后自然就要调用这些Channel对应的处理函数。

  • 将这个文件描述符实际发生的事件封装进这个Channel中

    1
    int set_revents(int revt) {revents_ = revt;}

    这个函数是在EPollPoller::fillActiveChannels调用,即获取活跃的连接的时候设置给Channel。

  • 将Channel中的文件描述符及其感兴趣事件注册事件监听器上或从事件监听器上移除

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 设置完成后会跟新epoll中的fd为当前需要监听[设置]的事件
    void enableReading() {events_ |= kReadEvent; upadte();}
    void disableReading() {events_ &= ~kReadEvent; update();}
    void enableWriting() {events_ |= kWriteEvent; update();}
    void disableWriting() {events_ &= ~kWriteEvent; update();}
    void disableAll() {events_ |= kNonEvent; update();}
    /*
    void Channel::update()
    {
    // 通过channel所属的EventLoop,调用poller的相应方法,注册fd的events事件
    loop_->updateChannel(this);
    }
    */

    这个update的调用过程是Channel=>EventLoop=>Poller。update本质上就是调用了epoll_ctl()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Channel::handleEvent(Timestamp receiveTime)
{
if (tied_)// 可用作监视者,在muduo网络库中,channel类中在处理方法之前会先检测TCPConnection是否存在,存在才会调用
{
std::shared_ptr<void> guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else // 专门为Accept类设计的!
{
handleEventWithGuard(receiveTime);
}
}

细节:

1.noncopyable类:如果想要禁止类的拷贝行为只需要把相应的函数设为 = delete即可。具体学习noncopyable类的写法。【我觉得是因为资源管理和线程安全这么设计的】

问个问题:muduo网络在设计的时候为什么使用noncopyable呢

  1. 资源管理
  • 网络编程涉及到许多资源的管理,比如文件描述符(fd)、内存缓冲区、线程等。通常,这些资源需要唯一的所有者来管理其生命周期。
  • 如果允许拷贝对象,可能会导致多个对象同时管理同一个文件描述符或同一个网络连接。这种情况会引发资源的双重释放或未定义行为,导致程序崩溃或资源泄漏。
  1. 防止对象状态的不一致
  • 网络库中的许多对象包含了复杂的状态和指针,比如 TcpConnection 持有一个指向 Channel 的指针,Channel 又关联到具体的文件描述符。
  • 如果对象被复制,新的副本可能与原对象共享某些状态,这会导致对象状态的不一致,进而引发难以调试的错误。例如,两个 TcpConnection 对象操作同一个 Channel,但预期它们应当是独立的。
  1. 保证对象的唯一性
  • 网络连接、事件循环(EventLoop)、Channel 等对象在逻辑上应当是唯一的。例如,一个 TcpConnection 对象对应一个特定的网络连接,一个 EventLoop 对象管理一个特定的线程。
  • 通过禁止拷贝操作,可以确保这些对象在系统中的唯一性,从而避免逻辑上的错误。
  1. 避免潜在的性能问题
  • 拷贝大型对象或深拷贝复杂的对象结构会消耗大量的系统资源和时间,尤其是在高性能网络库中,拷贝操作可能会成为性能瓶颈。
  • 通过禁止拷贝,Muduo 避免了不必要的性能损耗。
  1. 线程安全
  • 许多 Muduo 对象是在线程间共享的,特别是在多线程网络服务器中。如果对象可以被复制,则需要考虑线程安全的问题,这会增加系统的复杂性。
  • 禁止拷贝可以简化线程间的同步逻辑,避免由多个线程操作同一对象副本引发的竞争条件。

2.常见标识位

  • EPOLLIN | EPOLLPRI
    • 含义: 表示文件描述符上有数据可读。当有新的数据可读时,epoll 会通知该事件。
    • 含义: 表示文件描述符上有紧急数据可读(带外数据)。通常用于带外数据(out-of-band data),这在一些协议中可能是紧急数据。
  • EPOLLOUT
    • 含义: 表示文件描述符上可以进行写操作。当文件描述符上的输出缓冲区有空间可以写入数据时,epoll 会通知该事件。

本质就是十六进制数罢了!

EpollPoller类

概述

负责监听文件描述符事件是否触发以及返回发生事件的文件描述符以及具体事件的模块就是Poller。一个Poller对象对应一个事件监听器。其中,1个reactor中有1个Poller,也就是说有多少reactor就有多少Poller。

目前,项目中muduo库只支持epoll。

Poller是个抽象虚类,由EpollPoller和PollPoller继承实现,与监听文件描述符和返回监听结果的具体方法也基本上是在这两个派生类中实现。EpollPoller就是封装了用epoll方法实现的与事件监听有关的各种方法,PollPoller就是封装了poll方法实现的与事件监听有关的各种方法。

成员变量

这里将Poller/EpollPoller成员变量放在一起。

  • epollfd_:用epoll_create方法返回的epoll句柄。
  • channels_:这个变量是std::unordered_map<int, Channel*>类型,负责记录 文件描述符 —> Channel的映射,也帮忙保管所有注册在这个Poller上的Channel。
  • ownerLoop_:表示当前epollfd在哪个loop。

成员函数

  • 获取监听发生事件的描述符的Channel集合

    核心是以下2个函数。

    1
    2
    TimeStamp poll(int timeoutMs, ChannelList *activeChannels)
    void fillActiveChannels(int numEvents, ChannelList *activeChannels)

    具体代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
    {
    // events_每次传入都是初始化过的,也就说里面没有数据,等待epoll_wait获取数据。
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
    fillActiveChannels(numEvents, activeChannels);
    }
    // 填写活跃的连接
    void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const
    {
    for (int i = 0; i < numEvents; ++i)
    {
    Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
    channel->set_revents(events_[i].events);
    activeChannels->push_back(channel); // EventLoop就拿到了它的poller给它返回的所有发生事件的channel列表了
    }
    }

    这里有个问题:fillActiveChannels中遍历numEvents为什么就能得到channel对象?

    这是因为在将EPollPoller::update中(也就是在将文件描述符交给epoll监听的时候,将channel对象也放进去便于传递方便后期使用)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    typedef union epoll_data
    {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
    } epoll_data_t;

    struct epoll_event
    {
    uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    } __EPOLL_PACKED;
  • 更新文件描述符监听的事件

    1
    2
    void updateChannel(Channel *channel) override;
    void removeChannel(Channel *channel) override;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    // 关系:channel update remove => EventLoop updateChannel removeChannel => Poller updateChannel removeChannel
    // update
    void EPollPoller::updateChannel(Channel *channel)
    {
    const int index = channel->index();

    if (index == kNew || index == kDeleted)
    {
    if (index == kNew)
    {
    int fd = channel->fd();
    channels_[fd] = channel;
    }
    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
    }
    else // channel已经在poller上注册过了
    {
    ...
    }
    }
    // 更新channel通道 epoll_ctl add/mod/del
    void EPollPoller::update(int operation, Channel *channel)
    {
    epoll_event event;
    bzero(&event, sizeof event);

    int fd = channel->fd();

    event.events = channel->events();
    event.data.fd = fd;
    event.data.ptr = channel;

    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
    {
    ...
    }
    }

    上述的代码比较清楚了,调用updateChannel其中再调用update。本质还是调用epoll_ctl,只不过层层封装适合框架。

问题:epollfd_(::epoll_create1(EPOLL_CLOEXEC))为什么在前面要加::

在 C++ 中,前面加 :: 的作用是指定使用全局作用域中的符号。对于 epollfd_(::epoll_create1(EPOLL_CLOEXEC)) 这种写法,:: 用于确保调用的是全局命名空间中的 epoll_create1 函数,而不是当前作用域或类作用域中的同名函数或变量。【避免命名冲突,提高代码可读性】

问题:为什么要使用 EPOLL_CLOEXEC

它的含义是指在执行 execve 等执行新程序的系统调用时,自动关闭由 epoll_create1 创建的文件描述符。

1
2
3
4
5
int epollfd = epoll_create1(EPOLL_CLOEXEC);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}

在这个示例中,创建了一个 epoll 实例,并且设置了 EPOLL_CLOEXEC 标志。如果当前进程随后调用 execve 执行了一个新程序,这个 epollfd 文件描述符会自动关闭。

安全性: 防止文件描述符无意间被新程序继承,从而避免泄露敏感资源。

EventLoop类

EventLoop类就是将Channel类和EpollPoller类进行统一管理。

EventLoop就是负责实现**“循环”,负责驱动“循环”**的重要模块。

这里循环也就是while循环不断检测是否存在监听事件的发生,然后使用传入的回调函数处理对应的事件。

可以看下下面这张图

image-20240502224744115

  • 开启事件循环

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    void EventLoop::loop()
    {
    looping_ = true;
    quit_ = false;

    while (!quit_)
    {
    activeChannels_.clear();
    // 监听两类fd 一种是client的fd,一种wakeupfd
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    for (Channel *channel : activeChannels_)
    {
    // Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
    channel->handleEvent(pollReturnTime_);
    }
    // 执行当前EventLoop事件循环需要处理的回调操作
    /**
    * IO线程 mainLoop accept fd《=channel subloop
    * mainLoop 事先注册一个回调cb(需要subloop来执行) wakeup subloop后,执行下面的方法,执行之前mainloop注册的cb操作
    */
    doPendingFunctors();
    }
    looping_ = false;
    }
    //--------------------执行回调----------------------
    void EventLoop::doPendingFunctors() // 执行回调
    {
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
    std::unique_lock<std::mutex> lock(mutex_);
    functors.swap(pendingFunctors_);
    }

    for (const Functor &functor : functors)
    {
    functor(); // 执行当前loop需要执行的回调操作
    }

    callingPendingFunctors_ = false;
    }

    其实,刚开始的时候对应为什么还会执行doPendingFunctors有点迷惑?这里给出我的理解:用于loop之间通信使用,比如:mainloop要将客户端交给一个loop处理请求。

    这个doPendingFunctors设计大有讲究!!!

    EventLoop : : doPendingFunctors()不是简单地在临界区内依次调用Functor,而是把回调列表swap()到局部变量functors中,这样的好处有2点:

    • 一方面减小了临界区的长度(意味着不会阻塞其他线程调用queueInLoop())
    • 一方面也避免了死锁(因为Functor可能再调用queueInLoop() )。
      • 由于doPendingFunctors()调用的 Functor可能再调用queueInLoop(cb),这时queueInLoop()就必须wakeup(),否则这些新加的cb就不能被及时调用了。muduo这里没有反复执行doPendingFunctors()直到pendingFunctors_为空,这是有意的,否则IO线程有可能陷人死循环,无法处理IO事件。
  • 子loop唤醒

    接着上面的问题,当主loop要将连接的客户端交给一个subloop子loop监听处理,但是子loop怎么唤醒呢?

    这里muduo库是先创建一个wakeup描述符,专门用于唤醒描述符,加入到epoll中监听。那么对于唤醒子loop的方法只用往wakeup写入即可,当epoll监听到写事件的时候,就会调用wakeup之前设置的事件发生处理函数进行处理(也就将写事件写入的读出来)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    void EventLoop::handleRead()
    {
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
    LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8", n);
    }
    }

    // 用来唤醒loop所在的线程的 向wakeupfd_写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒
    void EventLoop::wakeup()
    {
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
    LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
    }
    }
  • 当多线程的情况如何正确调用函数呢?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    void EventLoop::quit()
    {
    quit_ = true;
    // 如果是在其它线程中,调用的quit 在一个subloop(woker)中,调用了mainLoop(IO)的quit
    if (!isInLoopThread())
    {
    wakeup();
    }
    }
    // 在当前loop中执行cb
    void EventLoop::runInLoop(Functor cb)
    {
    if (isInLoopThread()) // 在当前的loop线程中,执行cb
    {
    cb();
    }
    else // 在非当前loop线程中执行cb , 就需要唤醒loop所在线程,执行cb
    {
    queueInLoop(cb);
    }
    }

简单来说,1.loop在自己的线程中调用quit 2.在非loop的线程中,调用loop的quit。

  • 小注意点·

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 在当前loop中执行cb
    void EventLoop::runInLoop(Functor cb)
    {
    if (isInLoopThread()) // 在当前的loop线程中,执行cb
    {
    cb();
    }
    else // 在非当前loop线程中执行cb , 就需要唤醒loop所在线程,执行cb
    {
    queueInLoop(cb);
    }
    }
    // 把cb放入队列中,唤醒loop所在的线程,执行cb
    void EventLoop::queueInLoop(Functor cb)
    {
    {
    std::unique_lock<std::mutex> lock(mutex_);
    pendingFunctors_.emplace_back(cb);
    }

    // 唤醒相应的,需要执行上面回调操作的loop的线程了
    // || callingPendingFunctors_的意思是:当前loop正在执行回调,但是loop又有了新的回调
    if (!isInLoopThread() || callingPendingFunctors_)
    {
    wakeup(); // 唤醒loop所在线程
    }
    }

    在当前的线程直接执行回调函数,但是如果不在的就需要唤醒下。

    这里2个注意点:

    1.怎么判断线程是否是当前线程?

    isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

    muduo库设计中使用了这条语句判断。我们只要使用这个函数进行判断。【这个深入的话有点抽象】

    2.(!isInLoopThread() || callingPendingFunctors_)

    如果不在当前线程,唤醒比较好理解。还有一种情况需要唤醒是正在执行回调函数,如果不唤醒,那么这个回调函数就不知道要什么时候才能执行,也可能不执行。因此需要唤醒一下。

  • 其他

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // EventLoop的方法 =》 Poller的方法
    void EventLoop::updateChannel(Channel *channel)
    {
    poller_->updateChannel(channel);
    }

    void EventLoop::removeChannel(Channel *channel)
    {
    poller_->removeChannel(channel);
    }

    bool EventLoop::hasChannel(Channel *channel)
    {
    return poller_->hasChannel(channel);
    }

    这个点在Channel调用过程中已经说明。

线程模块

这部分体现了One loop one thread的思想,也是muduo的核心!

Thread类

Thread类主要是创建线程,然后执行相应的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Thread : noncopyable
{
public:
using ThreadFunc = std::function<void()>;

explicit Thread(ThreadFunc, const std::string &name = std::string());
~Thread();

void start();
void join();

bool started() const { return started_; }
pid_t tid() const { return tid_; }
const std::string &name() const { return name_; }

static int numCreated() { return numCreated_; }

private:
void setDefaultName();

bool started_;
bool joined_;
std::shared_ptr<std::thread> thread_;
pid_t tid_;
ThreadFunc func_;
std::string name_;
static std::atomic_int numCreated_;
};

EventLoopThread类

这个类体现了One loop one thread的思想,就是一个线程负责一个loop循环。

这个类主要的工作是将eventloop和thread封装,正好一个loop一个线程。

成员函数

  • EventLoop *loop_:对应的loop。
  • _ bool exiting_:是否退出。
  • Thread thread_:线程类。
  • std::mutex mutex_:互斥锁。
  • std::condition_variable cond:条件变量。
  • ThreadInitCallback callback_:线程初始化回调函数。

成员函数

  • 开启loop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
EventLoop *EventLoopThread::startLoop(){
thread_.start(); // 启动底层的新线程

EventLoop *loop = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
while (loop_ == nullptr)
{
cond_.wait(lock);
}
loop = loop_;
}
return loop;
}

// 下面这个方法,实在单独的新线程里面运行的
void EventLoopThread::threadFunc()
{ // 创建一个独立的eventloop,和上面的线程是一一对应的,one loop per thread
EventLoop loop;

if (callback_)
{
callback_(&loop);
}
{
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_one();
}

loop.loop(); // EventLoop loop => Poller.poll
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}

这里就是创建loop,然后线程中不断的loop循环。

EventLoopPoolThread类

eventloop池,从最开始的架构图就可以知道,muduo有主Loop和若干个子loop。这些loop就是这个类产生的。

成员变量

  • EventLoop *baseLoop_:这是主EventLoop,通常在主线程中运行。
  • std::string name: EventLoopThreadPool的名称。
  • bool started _ :表示EventLoopThreadPool是否已经启动。
  • int numThreads _ : 线程池中线程的数量。
  • int next _ : 下一个要被处理的线程的索引。
  • std::vector< std::unique_ptr< EventLoopThread> > threads_: 存储线程池中所有线程的容器,使用unique_ptr管理每个线程的生命周期。
  • _std::vector<EventLoop *> loops: 存储所有EventLoop的指针的容器,用于分发任务。

成员函数

  • 启动线程池,并且在每个线程中创建并启动一个EventLoop。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    void EventLoopThreadPool::start(const ThreadInitCallback &cb)
    {
    started_ = true;

    for (int i = 0; i < numThreads_; ++i)
    {
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    EventLoopThread *t = new EventLoopThread(cb, buf);
    threads_.push_back(std::unique_ptr<EventLoopThread>(t));
    loops_.push_back(t->startLoop()); // 底层创建线程,绑定一个新的EventLoop,并返回该loop的地址
    }

    // 整个服务端只有一个线程,运行着baseloop
    if (numThreads_ == 0 && cb)
    {
    cb(baseLoop_);
    }
    }
  • 主Loop_默认以轮询的方式分配channel给子loop,通过轮询挑选出一个子loop

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    EventLoop* EventLoopThreadPool::getNextLoop()
    {
    EventLoop *loop = baseLoop_;

    if (!loops_.empty()) // 通过轮询获取下一个处理事件的loop
    {
    loop = loops_[next_];
    ++next_;
    if (next_ >= loops_.size())
    {
    next_ = 0;
    }
    }

    return loop;
    }

网络模块

Acceptor类

概述

Accetpor封装了服务器专门用于监听是否有客户端连接的套接字fd以及相关处理方法。主要是对其他类的方法调用进行封装。

成员变量

  • acceptSocket_:这个是服务器监听套接字的文件描述符。
  • acceptChannel_:这是个Channel类,把acceptSocket _ 及其感兴趣事件和事件对应的处理函数都封装进去。
  • EventLoop *loop:监听套接字的fd由哪个EventLoop负责循环监听以及处理相应事件,其实这个EventLoop就是main EventLoop。
  • newConnectionCallback_: TcpServer构造函数中将TcpServer::newConnection( )函数注册给了这个成员变量。这个TcpServer::newConnection函数的功能是公平的选择一个subEventLoop,并把已经接受的连接分发给这个subEventLoop。

成员函数

  • 开启链接监听

    listen( ):该函数底层调用了linux的函数listen( ),开启对acceptSocket _ 的监听同时将acceptChannel及其感兴趣事件(可读事件)注册到main EventLoop 的事件监听器上。换言之就是让main EventLoop事件监听器去监听acceptSocket_。

    1
    2
    3
    4
    5
    6
    void Acceptor::listen()
    {
    listenning_ = true;
    acceptSocket_.listen(); // listen
    acceptChannel_.enableReading(); // acceptChannel_ => Poller
    }
  • 处理新用户连接

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop)
    , acceptSocket_(createNonblocking()) // socket
    , acceptChannel_(loop, acceptSocket_.fd())
    , listenning_(false)
    {
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(true);
    acceptSocket_.bindAddress(listenAddr); // bind
    // TcpServer::start() Acceptor.listen 有新用户的连接,要执行一个回调(connfd=》channel=》subloop)
    // baseLoop => acceptChannel_(listenfd) =>
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
    }

    // listenfd有事件发生了,就是有新用户连接了
    void Acceptor::handleRead()
    {
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
    if (newConnectionCallback_)
    {
    newConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop,唤醒,分发当前的新客户端的Channel
    }
    else
    {
    ::close(connfd);
    }
    }
    else
    {
    ...
    }
    }

    handleRead( ):这是一个私有成员方法,这个方法是要注册到acceptChannel_ 上的,在构造accept对象的时候设置给acceptchannel对象, 同时handleRead( )方法内部还调用了成员变量newConnectionCallback_保存的函数。当main EventLoop监听到acceptChannel _ 上发生了可读事件时(新用户连接事件),就是调用这个handleRead( )方法。

Buffer类

为什么要有缓冲区的设计

  1. TcpConnection必须有output buffer:使程序在write()操作上不会产生阻塞,当write()操作后,操作系统一次性没有接受完时,网络库把剩余数据则放入outputBuffer中,然后注册POLLOUT事件,一旦socket变得可写,则立刻调用write()进行写入数据。——应用层buffer到操作系统buffer
  2. TcpConnection必须有input buffer:当发送方send数据后,接收方收到数据不一定是整个的数据,网络库在处理socket可读事件的时候,必须一次性把socket里的数据读完,否则会反复触发POLLIN事件,造成busy-loop。所以网路库为了应对数据不完整的情况,收到的数据先放到inputBuffer里。——操作系统buffer到应用层buffer

概述

Buffer类其实是封装了一个用户缓冲区,以及向这个缓冲区写数据读数据等一系列控制方法。

算法

这张图已经可以很好的说明这个类的具体功能。

可以和CS144 TCP协议中的buffer进行比对下。

image-20240502231632156

从上面的图示我们知道:

如果写入空间不够,Buffer 内部会有两个方案来应付:

  1. 将数据往前移动:因为每次读取数据,readIndex索引都会往后移动,从而导致前面预留的空间逐渐增大。我们需要将后面的元素重新移动到前面。
  2. 如果第一种方案的空间仍然不够,那么我们就直接对 buffer_ 进行扩容(buffer_.resize(len))操作。

成员方法

  • ssize_t Buffer::readFd(int fd, int* saveErrno);:客户端发来数据,readFd从该TCP接收缓冲区中将数据读出来并放到Buffer中。
  • ssize_t Buffer::writeFd(int fd, int* saveErrno);:服务端要向这条TCP连接发送数据,通过该方法将Buffer中的数据拷贝到TCP发送缓冲区中。

巧妙的设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 从fd上读取数据 Poller工作在LT模式
* Buffer缓冲区是有大小的! 但是从fd上读数据的时候,却不知道tcp数据最终的大小
*/
ssize_t Buffer::readFd(int fd, int* saveErrno)
{
char extrabuf[65536] = {0}; // 栈上的内存空间 64K

struct iovec vec[2];

const size_t writable = writableBytes(); // 这是Buffer底层缓冲区剩余的可写空间大小
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;

vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;

const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);
if (n < 0)
{
*saveErrno = errno;
}
else if (n <= writable) // Buffer的可写缓冲区已经够存储读出来的数据了
{
writerIndex_ += n;
}
else // extrabuf里面也写入了数据
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable); // writerIndex_开始写 n - writable大小的数据
}

return n;
}

解释:可以让用户一次性把所有TCP接收缓冲区的所有数据全部都读出来并放到用户自定义的缓冲区Buffer中。用户自定义缓冲区Buffer是有大小限制的,我们一开始不知道TCP接收缓冲区中的数据量有多少,如果一次性读出来会不会导致Buffer装不下而溢出。所以在readFd( )函数中会在栈上创建一个临时空间extrabuf,然后使用readv的分散读特性,将TCP缓冲区中的数据先拷贝到Buffer中,如果Buffer容量不够,就把剩余的数据都拷贝到extrabuf中,然后再调整Buffer的容量(动态扩容),再把extrabuf的数据拷贝到Buffer中。当这个函数结束后,extrabuf也会被释放。另外extrabuf是在栈上开辟的空间,速度比在堆上开辟还要快。

补充:iovec结构体

struct iovec 结构体定义了一个向量元素,通常这个 iovec 结构体用于一个多元素的数组,对于每一个元素,iovec 结构体的字段 iov_base 指向一个缓冲区,这个缓冲区存放的是网络接收的数据(read),或者网络将要发送的数据(write)。iovec 结构体的字段 iov_len 存放的是接收数据的最大长度(read),或者实际写入的数据长度(write)。

为什么这么设计?是因为内存管理,不用一下开辟大量的内存。

结合TCPConnection理解

TcpConnection 拥有 inputBuffer 和 outputBuffer 两个缓冲区成员。

  1. 当服务端接收客户端数据,EventLoop 返回活跃的 Channel,并调用对应的读事件处理函数,即 TcpConnection 调用 handleRead 方法从相应的 fd 中读取数据到 inputBuffer 中。在 Buffer 内部 inputBuffer 中的 writeIndex 向后移动。
  2. 当服务端向客户端发送数据,TcpConnection 调用 handleWrite 方法将 outputBuffer 的数据写入到 TCP 发送缓冲区。outputBuffer 内部调用 retrieve 方法移动 readIndex 索引。

从客户端sock读取数据到inputBuffer

调用inputBuffer_.readFd(channel_->fd(), &savedErrno);将对端fd数据读取到inputBuffer中。

  1. 如果读取成功,调用「可读事件发生回调函数」
  2. 如果读取数据长度为0,说明对端关闭连接。调用handleCose()
  3. 出错,则保存errno,调用handleError()

将ouputBuffer数据输出到socket中

  1. 要在channel_确实关注写事件的前提下正常发送数据:因为一般有一个send函数发送数据,如果TCP接收缓冲区不够接收ouputBuffer的数据,就需要多次写入。需要重新注册写事件,因此是在注册了写事件的情况下调用的handleWrite
  2. channel->fd()发送outputBuffer中的可读取数据。成功发送数据则移动readIndex,并且如果一次性成功写完数据,就不再让此channel关注写事件了,并调用写事件完成回调函数没写完则继续关注!

TcpConnection/TcpServer类

这个类是集大成的类,需要清楚的知道具体的回调函数的调用。

TCP网络编程的本质是处理下面这几个事件:

  • 连接的建立。
  • 连接的断开。(包括主动断开和被动断开)
  • 消息到达,客户端连接文件描述符可读。
  • 消息发送,向客户端连接文件描述符写数据。

先来看看整体的图示:

其他

调用LOG_INFO形式 类似宏调用 每条语句都会产生一个临时对象 也就是Logger 每条语句的结束 也就是临时对象的析构 就会把定长的buffer传进最后的处理函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <sstream>

// 定义一个Logger类
class Logger {
public:
Logger() : buffer_() {}

~Logger() {
flush();
}

template<typename T>
Logger& operator<<(const T& value) {
buffer_ << value;
return *this;
}

private:
std::ostringstream buffer_;

void flush() {
std::cout << "Logging: " << buffer_.str() << std::endl;
// 这里可以添加更多的处理逻辑,比如将日志写入文件或发送到服务器
}
};

// 定义一个宏来简化日志记录
#define LOG_INFO Logger()

int main() {
LOG_INFO << "Hello, " << "World!";
LOG_INFO << "This is a test log.";
return 0;
}
使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <mymuduo/TcpServer.h>
#include <mymuduo/Logger.h>

#include <string>
#include <functional>

class EchoServer
{
public:
EchoServer(EventLoop *loop,
const InetAddress &addr,
const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 注册回调函数
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);

server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);

// 设置合适的loop线程数量 loopthread
server_.setThreadNum(3);
}
void start()
{
server_.start();
}
private:
// 连接建立或者断开的回调
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
}
else
{
LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}
}

// 可读写事件回调
void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp time)
{
std::string msg = buf->retrieveAllAsString();
conn->send(msg);
conn->shutdown(); // 写端 EPOLLHUP =》 closeCallback_
}

EventLoop *loop_;
TcpServer server_;
};

int main()
{
EventLoop loop;
InetAddress addr(8000);
EchoServer server(&loop, addr, "EchoServer-01"); // Acceptor non-blocking listenfd create bind
server.start(); // listen loopthread listenfd => acceptChannel => mainLoop =>
loop.loop(); // 启动mainLoop的底层Poller

return 0;
}

Note:using TcpConnectionPtr = std::shared_ptr< TcpConnection > ;

连接建立

image-20240503105422549

大体的整个流程如上图所示:

  • TcpServer::TcpServer()
    当我们创建一个TcpServer对象,即执行代码TcpServer server(&loop, listenAddr);调用了TcpServer的构造函数,TcpServer构造函数最主要的就是类的内部实例化了一个Acceptor对象,并往这个Acceptor对象注册了一个回调函数TcpServer::newConnection()。

  • Acceptor::Acceptor()
    当我们在TcpServer构造函数实例化Acceptor对象时,Acceptor的构造函数中实例化了一个Channel对象,即acceptChannel _ ,该Channel对象封装了服务器监听套接字文件描述符(尚未注册到main EventLoop的事件监听器上)。接着Acceptor构造函数将Acceptor::handleRead()方法注册进acceptChannel_ 中,这也意味着,日后如果事件监听器监听到acceptChannel_发生可读事件,将会调用Acceptor::handleRead()函数。

    至此,TcpServer对象创建完毕,用户调用TcpServer::start()方法,开启TcpServer。主要就是调用Acceptor::listen()函数(底层是调用了linux的函数listen())监听服务器套接字,以及将acceptChannel_注册到main EventLoop的事件监听器上监听它的可读事件(新用户连接事件)接着用户调用loop.loop(),即调用了EventLoop::loop()函数,该函数就会循环的获取事件监听器的监听结果,并且根据监听结果调用注册在事件监听器上的Channel对象的事件处理函数。

  • Acceptor::handleRead()
    当程序如果执行到了这个函数里面,说明acceptChannel_ 发生可读事件,程序处理新客户连接请求。该函数首先调用了Linux的函数accept()接受新客户连接。接着调用了TcpServer::newConnection()函数,这个函数是在步骤1中注册给Acceptor并由成员变量newConnectionCallback_保存。

  • TcpServer::newConnection()
    该函数的主要功能就是将建立好的连接进行封装(封装成TcpConnection对象),并使用选择算法公平的选择一个sub EventLoop,并调用TcpConnection::connectEstablished()将TcpConnection::channel_注册到刚刚选择的sub EventLoop上。

消息处理

SubEventLoop中的EventLoop::loop()函数内部会循环的执行上图中的步骤1和步骤2。步骤1就是调用Poller::poll()方法获取事件监听结果,这个事件监听结果是一个Channel集合,每一个Channel封装着 [一个fd] 及 [fd感兴趣的事件] 和 [事件监听器监听到该fd实际发生的事件]。步骤2就是调用每一个Channel的Channel::HandlerEvent方法。该方法会根据每一个Channel的感兴趣事件以及实际发生的事件调用提前注册在Channel内的对应的事件处理函数(readCallback_、writeCallback_、closeCallback_、errorCallback _ )。

image-20240503105957775

消息读取

readCallback_保存的函数其实是TcpConnection::handleRead( ),消息读取的处理逻辑也就是由这个函数提供的,我们稍微剖析一下这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if(n > 0) //从fd读到了数据,并且放在了inputBuffer_上
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if(n == 0)
handleClose();
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}

TcpConnection::handleRead( )函数首先调用Buffer_.readFd(channel_->fd(), &saveErrno),该函数底层调用Linux的函数readv( ),将Tcp接收缓冲区数据拷贝到用户定义的缓冲区中(inputBuffer_)。如果在读取拷贝的过程中发生了什么错误,这个错误信息就会保存在savedErrno中。

  • 当readFd( )返回值大于0,说明从接收缓冲区中读取到了数据,那么会接着调用messageCallback_中保存的用户自定义的读取消息后的处理函数。
  • readFd( )返回值等于0,说明客户端连接关闭,这时候应该调用TcpConnection::handleClose( )来处理连接关闭事件
  • readFd( )返回值等于-1,说明发生了错误,调用TcpConnection::handleError( )来处理savedErrno的错误事件。Moduo库只支持LT模式,所以读事件不会出现EAGAIN的错误,所以一旦出现错误,说明肯定是比较不好的非正常错误了。而EAGAIN错误只不过是非阻塞IO调用时的一种常见错误而已。

消息发送

当用户调用了TcpConnetion::send(buf)函数时,相当于要求muduo库把数据buf发送给该Tcp连接的客户端。此时该TcpConnection注册在事件监听器上的感兴趣事件中是没有可写事件的。TcpConnection::send(buf)函数内部其实是调用了Linux的函数write( )

如果TCP发送缓冲区内不能一次性容纳buf:

  • 这时候write( )函数buf数据尽可能地拷贝到TCP发送缓冲区中,并且将errno设置为EWOULDBLOCK。
  • 剩余未拷贝到TCP发送缓冲区中的buf数据会被存放在TcpConnection::outputBuffer_中。并且向事件监听器上注册该TcpConnection::channel_的可写事件。
  • 事件监听器监听到该Tcp连接可写事件,就会调用TcpConnection::handleWrite( )函数把TcpConnection::outputBuffer_中剩余的数据发送出去。_
    • 在TcpConnection::handleWrite( )函数中,通过调用Buffer::writeFd()函数将outputBuffer_的数据写入到Tcp发送缓冲区,如果Tcp发送缓冲区能容纳全部剩余的未发送数据,那最好不过了。如果Tcp发送缓冲区依旧没法容纳剩余的未发送数据,那就尽可能地将数据拷贝到Tcp发送缓冲区中,继续保持可写事件的监听。
  • 当数据全部拷贝到Tcp发送缓冲区之后,就会调用用户自定义的【写完后的事件处理函数】,并且移除该TcpConnection在事件监听器上的可写事件。(移除可写事件是为了提高效率,不会让epoll_wait() 毫无意义的频繁触发可写事件。因为大多数时候是没有数据需要发送的,频繁触发可写事件但又没有数据可写。)

TcpConnection::send和TcpConnection::handleWrite的区别:

  • sendInLoop 的目的是尝试立即将数据写入内核空间。如果一次就能写完,那么就没问题。但如果写入操作被阻塞,或者只写入了部分数据,那么剩下的数据就会被放到 outputBuffer_ 中,然后通过注册写事件 enableWriting(),在有空间可写的时候,再继续写入剩下的数据。这个函数的调用通常是在应用程序想要发送数据的时候。
  • handleWrite 则是在内核通知应用程序可以写入数据的时候调用的,即当套接字的发送缓冲区有足够的空间可以接收更多数据时。这个函数的目的是尽可能地将 outputBuffer_ 中的数据写入到内核空间,直到写入全部数据或者内核空间无法接收更多数据为止。然后,如果 outputBuffer_ 中的数据全部写入完成,那么就取消对写事件的关注 disableWriting(),因为此时不需要再写数据了。
  • 总的来说,sendInLoop 是应用程序主动发送数据,handleWrite 是在收到内核的可写通知后尝试写入数据。这两个函数共同实现了数据的异步写入:sendInLoop 负责将应用程序的数据放入 outputBuffer_ ,并尝试写入内核;handleWrite 负责在内核准备好接收数据时,将 outputBuffer_ 中的数据写入内核。

在阅读源码的时候,有个疑问?也就是之前在学习buffer设计的时候,buffer既支持写也支持读,而且缓冲区的设计就有readerIndex_ 和writerIndex_,那为什么之后TcpConnnection中又放2个缓冲区buffer呢?

GPT回答:

1
2
3
4
5
在muduo网络库中,接收数据的缓冲区inputBuffer_和发送数据的缓冲区outputBuffer_ 都是独立的,这是因为在网络编程中,接收和发送数据的操作通常是异步的,也就是说,它们可以在不同的时间、以不同的速度发生,因此,它们需要各自的缓冲区来维护它们的状态。

具体来说,inputBuffer_被用来存储从网络中接收到的数据,直到应用程序准备好处理这些数据为止。同样,outputBuffer_ 被用来存储应用程序准备发送到网络中的数据,直到这些数据被实际发送为止。

虽然你提到的一个缓冲区可以处理读写操作,但这只适用于单一的读写操作。在网络编程中,接收和发送数据的操作是分离的,并且经常需要在不同的时间进行。因此,使用两个独立的缓冲区可以使得接收和发送数据的操作更加灵活和高效。
连接断开

连接被动断开

服务端TcpConnection::handleRead()中感知到客户端把连接断开了。
TcpConnection::handleRead( )函数内部调用了Linux的函数readv( ),当readv( )返回0的时候,服务端就知道客户端断开连接了。然后就接着调用TcpConnection::handleClose( )。

image-20240503111746345

  1. 在执行TcpConnection::handle_Close()的时候,该函数还是在SubEventLoop线程中运行的,接着调用closeCallback_(connPtr)回调函数,该函数保存的其实是TcpServer::removeConnection( )函数
  2. TcpServer::removeConnection( )函数调用了remvoveConnectionInLoop( )函数,该函数的运行是在MainEventLoop线程中执行的,这里涉及到线程切换技术。
  3. removeConnectionInLoop( )函数:TcpServer对象中有一个connections_ 成员变量,这是一个unordered_map,负责保存【string --> TcpConnection】的映射,其实就是保存着Tcp连接的名字到TcpConnection对象的映射。因为这个Tcp连接要关闭了,所以也要把这个TcpConnection对象从connections_中删掉。然后再调用TcpConnection::connectDestroyed函数。
    另外为什么removeConnectionInLoop()要在MainEventLoop中运行,因为该函数主要是从TcpServer对象中删除某条数据。而TcpServer对象是属于MainEventLoop的。这也是贯彻了One Loop Per Thread的理念。
  4. TcpConnection::connectDestroyed( )函数的执行是又跳回到了subEventLoop线程中。该函数就是将Tcp连接的监听描述符从事件监听器中移除。另外SubEventLoop中的Poller类对象还保存着这条Tcp连接的channel_,所以调用channel_.remove( )将这个Tcp连接的channel对象从Poller内的数据结构中删除。

连接主动断开

服务器主动关闭导致连接断开。当服务器主动关闭时,调用TcpServer::~TcpServer()析构函数。

这里在提示一下EventLoop::runInLoop()函数的意义,假如你有一个EventLoop对象 loop_,当你调用了loop_->runInLoop(function)函数时,这个function函数的执行会在这个loop_绑定的线程上运行!

所以我们画了下面这幅图,在创建TcpConnection对象时,Acceptor都要将这个对象分发给一个SubEventLoop来管理。这个TcpConnection对象的一切函数执行都要在其管理的SubEventLoop线程中运行。再一次贯彻One Loop Per Thread的设计模式。比如要想彻底删除一个TcpConnection对象,就必须要调用这个对象的connecDestroyed()方法,这个方法执行完后才能释放这个对象的堆内存。每个TcpConnection对象的connectDestroyed()方法都必须在这个TcpConnection对象所属的SubEventLoop绑定的线程中执行。

所有上面的TcpServer::~TcpServer()函数就是干这事儿的,不断循环的让这个TcpConnection对象所属的SubEventLoop线程执行TcpConnection::connectDestroyed()函数,同时在MainEventLoop的TcpServer::~TcpServer()函数中调用item.second.reset()释放保管TcpConnection对象的共享智能指针,以达到释放TcpConnection对象的堆内存空间的目的。
但是这里面其实有一个问题需要解决,TcpConnection::connectDestroyed()函数的执行以及这个TcpConnection对象的堆内存释放操作不在同一个线程中运行,所以要考虑怎么保证一个TcpConnectino对象的堆内存释放操作是在TcpConnection::connectDestroyed()调用完后。
这个析构函数巧妙利用了共享智能指针的特点,当没有共享智能指针指向这个TcpConnection对象时(引用计数为0),这个TcpConnection对象就会被析构删除(堆内存释放)。

1
2
3
4
5
6
7
8
9
10
TcpServer::~TcpServer()
{
//connections类型为std::unordered_map<std::string, TcpConnectionPtr>;
for(auto &item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset();
conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
}
}
  • 首先TcpServer::connections_是一个unordered_map<string, TcpConnectionPtr>,其中TcpConnectionPtr的含义是指向TcpConnection的shared_ptr。
  • 在一开始,每一个TcpConnection对象都被一个共享智能指针TcpConnetionPtr持有,当执行了TcpConnectionPtr conn(item.second)时,这个TcpConnetion对象就被conn和这个item.second共同持有,但是这个conn的生存周期很短,只要离开了当前的这一次for循环,conn就会被释放。
  • 紧接着调用item.second.reset()释放掉TcpServer中保存的该TcpConnectino对象的智能指针。此时在当前情况下,只剩下conn还持有这个TcpConnection对象,因此当前TcpConnection对象还不会被析构。
  • 接着调用了conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
    这句话的含义是让SubEventLoop线程去执行TcpConnection::connectDestroyed()函数。当你把这个conn的成员函数传进去的时候,conn所指向的资源的引用计数会加1。因为传给runInLoop的不只有函数,还有这个函数所属的对象conn。
  • SubEventLoop线程开始运行TcpConnection::connectDestroyed()
  • MainEventLoop线程当前这一轮for循环跑完,共享智能指针conn离开代码块,因此被析构,但是TcpConnection对象还不会被释放,因为还有一个共享智能指针指向这个TcpConnection对象,而且这个智能指针在TcpConnection::connectDestroyed()中,只不过这个智能指针你看不到,它在这个函数中是一个隐式的this的存在。当这个函数执行完后,智能指针就真的被释放了。到此,就没有任何智能指针指向这个TcpConnection对象了。TcpConnection对象就彻底被析构删除了。

补充:

__thread 是一个 GCC 和 Clang 扩展,用于声明线程局部存储(TLS)变量。它保证每个线程都有自己独立的变量副本,而不是所有线程共享同一个变量。这在多线程编程中很重要,尤其是当每个线程需要持有自己的数据时,比如线程ID(TID)。

在你的示例中,t_cachedTid 需要用 __thread 修饰的原因如下:

  1. 线程局部存储: __thread 修饰的变量会在每个线程中分配一个独立的存储空间。当线程访问 t_cachedTid 时,它实际上是访问它自己线程的副本,而不是其他线程的副本。这样可以避免多个线程对同一个变量的竞争和冲突。
  2. 线程安全: 由于每个线程拥有自己独立的 t_cachedTid 变量副本,所以在不同线程中修改 t_cachedTid 不会影响其他线程的 t_cachedTid 值。这确保了线程间的安全性和隔离性。
  3. 性能优化: 使用线程局部存储可以减少线程间的同步开销,因为每个线程都处理自己的数据副本,而不需要进行锁操作来保护共享变量。

面试篇

Q:muduo中如何解决跨线程的对象析构问题?

  1. 线程安全的任务派发机制:通过 runInLoop()queueInLoop() 确保操作在正确的线程中执行,避免跨线程访问对象的并发问题。
  2. 延迟删除机制:通过事件循环中的任务队列,将对象的销毁操作推迟到事件处理的安全时机。
  3. 使用智能指针管理对象生命周期:使用 shared_ptrweak_ptr 来管理对象的生存期,避免悬空指针问题。

Q:在Muduo中,如何处理长连接和短连接?对于这两者,你会在设计中做哪些优化?

长连接和短链接:

  1. 长连接:客户端和服务端在建立连接后,会保持连接的状态,进行多次网络通信,直到客户端或服务端显式关闭连接。
  2. 短连接:客户端和服务端在每次通信后都会关闭连接。

处理的话

  • 短连接
    • 连接建立:客户端与服务器建立 TCP 连接,TcpConnection 进入 kConnected 状态。
    • 接收数据:服务器读取到客户端发送的数据,调用用户设置的 messageCallback_ 处理数据。
    • 发送响应并关闭写端:服务器处理完请求后,调用 TcpConnection::shutdown() 关闭写端,向客户端发送 FIN 包,通知对方我不再发送数据了。
    • 等待对端关闭:客户端收到 FIN 包后,会发送 ACK 包确认,之后客户端也会关闭连接,发送 FIN 包给服务器。
    • 关闭连接:服务器收到客户端的 FIN 包后,调用 handleClose() 关闭连接,释放资源。【执行优雅关闭】
  • 长连接:就是连接建立就让其建立着,不人为手动干涉。

长连接优化策略

  • 心跳检测:定期检查连接是否存活,及时关闭异常连接。【TCP传输层和时间轮,定时器的方式】
  • 空闲连接管理:关闭长时间没有活动的连接,避免资源占用。
  • Nagle 算法控制:按需禁用 Nagle 算法,减少延迟。
  • 批量数据传输:利用 Buffer 模块进行高效的数据读写。
  • 负载均衡:合理分发连接到不同的线程,避免单线程过载。

短连接优化策略

  • TCP 快速打开(TFO)
    • TCP 快速打开(TCP Fast Open,简称 TFO)是一种优化 TCP 连接建立过程的技术,旨在减少短连接的三次握手开销,从而减少延迟。TFO 允许在 TCP 握手的同时传递数据,从而避免了传统三次握手过程中的延迟。
1
2
3
4
5
6
7
8
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

int enable_tfo(int sockfd) {
int optval = 1;
return setsockopt(sockfd, IPPROTO_TCP, TCP_FASTOPEN, &optval, sizeof(optval));
}

Q:Muduo中,如何处理大流量下的网络拥塞问题?你会采用哪些策略来防止网络过载?

  • TCP 快速打开(TFO):减少握手开销,提升短连接性能。

  • 应用层流控与限速:通过缓冲区管理和限速机制控制数据的发送与接收,防止缓冲区溢出。

    • void sendWithRateLimit(const string& data, size_t maxRate)
      {
          size_t totalSize = data.size();
          size_t offset = 0;
          while (offset < totalSize)
          {
              size_t chunkSize = std::min(maxRate, totalSize - offset);
              conn_->send(data.substr(offset, chunkSize));
              offset += chunkSize;
              // 使用定时器或睡眠机制来限制发送速率
              usleep(10000);  // 例如每次发送后等待 10 毫秒
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
        
      - **背压机制**

      背压机制是防止发送方过多地向接收方发送数据的有效手段。Muduo 支持通过应用层的背压机制来防止接收方的缓冲区被填满。

      - 当 `TcpConnection` 的发送缓冲区被填满时,Muduo 会自动将写事件从 `epoll` 的监听事件中移除,直到缓冲区有空间时再重新加入监听。这种机制可以防止数据过载导致的网络拥塞。

      > **Q:muduo在哪存在线程竞争**

      虽然大部分操作在 Muduo 中都避免了线程竞争,但在某些情况下还是需要使用锁来保护共享资源。

      `EventLoop::queueInLoop()` 和 `doPendingFunctors()`

      Muduo 支持在外部线程向 `EventLoop` 中提交任务(Functor),这些任务会被放入一个 **任务队列** 中,等待 `EventLoop` 线程执行。由于任务的提交和执行可能发生在不同的线程中,因此需要通过**锁**来保护任务队列,防止多个线程同时访问任务队列时发生竞争。

      `queueInLoop()` 方法将任务添加到 `pendingFunctors_` 队列中,而 `doPendingFunctors()` 在 `EventLoop` 的运行过程中会执行这些任务。为防止多个线程同时修改任务队列,必须使用锁来保护它。

      ```c++
      void EventLoop::queueInLoop(Functor cb)
      {
      {
      std::lock_guard<std::mutex> lock(mutex_);
      pendingFunctors_.push_back(std::move(cb)); // 保护任务队列的访问
      }

      // 如果不是在当前线程中调用,唤醒 EventLoop 线程处理任务
      if (!isInLoopThread() || callingPendingFunctors_)
      {
      wakeup(); // 唤醒 EventLoop 处理任务
      }
      }

在这里,std::mutex 用于保护 pendingFunctors_ 队列,防止多个线程同时访问该队列时发生竞争。

Q:「one loop per thread」线程模型的优点

  1. 简化并发模型

每个线程独立运行一个事件循环,简化了并发模型的复杂性。线程之间不需要共享事件循环,减少了锁和同步机制的使用,从而降低了死锁和竞争条件的风险。

  1. 提高并发性能

每个线程独立处理事件循环,可以充分利用多核处理器的并行处理能力。多个线程可以同时处理不同的事件,提高了系统的并发性能。

  1. 减少锁竞争

在「one loop per thread」模型中,线程之间不需要共享事件循环,因此减少了锁竞争。每个线程只需要管理自己的事件循环和相关资源,避免了多线程环境下常见的锁竞争问题。

  1. 简化任务调度

每个线程独立处理事件循环,简化了任务调度。线程只需要关注自己的事件循环,不需要考虑其他线程的调度情况。这样可以减少任务调度的复杂性,提高系统的可维护性。

  1. 提高响应速度

每个线程独立处理事件循环,可以提高系统的响应速度。事件循环可以快速处理事件,避免了事件堆积和延迟。这对于实时系统和需要快速响应的应用程序非常重要。

  1. 易于扩展

「one loop per thread」模型易于扩展。可以根据系统的负载情况动态调整线程数量,增加或减少事件循环的数量,以适应不同的工作负载。

eventfd的好处

与事件循环集成

eventfd 可以与事件循环(如 epollselectpoll)无缝集成。通过将 eventfd 文件描述符添加到事件循环中,可以方便地监听事件通知,并在事件到达时执行相应的异步任务。

减少锁竞争

使用 eventfd 可以减少锁竞争。传统的线程间通信机制(如条件变量)通常需要使用锁来保护共享数据,而 eventfd 直接使用文件描述符进行事件通知,避免了锁的使用,减少了锁竞争。

eventfd底层怎么保证线程安全的呢?

eventfd 的实现位于 Linux 内核的 fs/eventfd.c 文件中。为了说明 eventfd 是如何实现线程安全的,我们需要重点关注:

  1. 计数器的原子性操作eventfd 的核心是一个 64 位的计数器,所有对该计数器的读写操作都是原子的,避免了竞态条件。
  2. 锁机制eventfd 使用自旋锁来保护数据结构。
  3. 等待队列:当 read()write() 无法立即完成时(例如,read() 时计数器为 0),会将调用线程加入到等待队列中,直到条件满足。

以下是 eventfd 的一些关键代码片段,展示了如何保证线程安全。

1. eventfd 结构体

首先,eventfd 的核心结构体是 eventfd_ctx,它包含了计数器、等待队列和自旋锁等内容。

1
2
3
4
5
6
7
8
struct eventfd_ctx {
atomic64_t count; // 64 位的计数器,使用原子操作
unsigned int flags; // 标志字段,表示是否为 EFD_SEMAPHORE 模式等
wait_queue_head_t wqh; // 内核等待队列,用于阻塞与唤醒
struct kref kref; // 引用计数
struct list_head fasync; // 异步 I/O 支持
spinlock_t lock; // 自旋锁,保护对该结构体的访问
};
  • counteventfd 的计数器,使用 atomic64_t 来确保其操作是原子的。
  • wqh 是等待队列,当 readwrite 操作无法立即完成时,线程会被放入该队列。
  • lock 是自旋锁,用于保护 eventfd_ctx 的其他字段,确保并发访问时的安全性。

2. eventfd_ctx_read 函数

eventfd_ctx_read()eventfd 的读操作,当用户调用 read() 时会调用该函数。它确保线程安全,并使用原子操作与自旋锁来保护计数器和等待队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static __poll_t eventfd_poll(struct file *file, poll_table *wait)
{
struct eventfd_ctx *ctx = file->private_data;
__poll_t events = 0;

poll_wait(file, &ctx->wqh, wait);

spin_lock_irq(&ctx->lock); // 加锁,保护 ctx 结构体
if (READ_ONCE(ctx->count) > 0)
events |= EPOLLIN; // 数据可读
if (READ_ONCE(ctx->count) == ULLONG_MAX)
events |= EPOLLOUT; // 可写
spin_unlock_irq(&ctx->lock); // 解锁

return events;
}

eventfd_poll() 函数中:

  • 使用 spin_lock_irq() 加锁,保护对 ctx->count 和其他字段的并发访问。
  • READ_ONCE() 确保对 ctx->count 的读取是原子的,避免编译器优化带来的数据不一致问题。
  • 释放自旋锁后返回文件描述符的状态。

3. eventfd_ctx_write 函数

eventfd_ctx_write() 负责处理 write() 系统调用,它使用原子操作来确保对计数器的安全更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int eventfd_ctx_write(struct eventfd_ctx *ctx, u64 to_write)
{
unsigned long flags;
int wake_count = 0;

spin_lock_irqsave(&ctx->lock, flags); // 加锁
if (ULLONG_MAX - ctx->count < to_write) {
spin_unlock_irqrestore(&ctx->lock, flags); // 解锁
return -EAGAIN; // 计数器溢出,返回错误
}
ctx->count += to_write; // 计数器增加
if (waitqueue_active(&ctx->wqh))
wake_count = 1;
spin_unlock_irqrestore(&ctx->lock, flags); // 解锁

if (wake_count)
wake_up(&ctx->wqh); // 唤醒等待队列中的线程

return 0;
}

eventfd_ctx_write() 函数中:

  • 使用 spin_lock_irqsave()spin_unlock_irqrestore() 来加锁和解锁。这确保了 ctx->count 的增减操作在多线程环境下不会出现竞态条件。
  • ULLONG_MAX - ctx->count < to_write 检查是否会发生溢出。
  • 如果等待队列中有线程(通过 waitqueue_active() 检查),则调用 wake_up() 唤醒所有等待的线程。

4. eventfd_ctx_read 函数

eventfd_ctx_read() 处理 read() 调用,确保读操作也是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int eventfd_ctx_read(struct eventfd_ctx *ctx, u64 *cnt)
{
unsigned long flags;

spin_lock_irqsave(&ctx->lock, flags); // 加锁
if (ctx->count == 0) {
spin_unlock_irqrestore(&ctx->lock, flags); // 解锁
return -EAGAIN; // 没有数据可读
}
*cnt = ctx->count;
if (ctx->flags & EFD_SEMAPHORE)
ctx->count = 0; // EFD_SEMAPHORE 模式下清空计数器
else
ctx->count = 0; // 否则读取后也清空计数器
spin_unlock_irqrestore(&ctx->lock, flags); // 解锁

return 0;
}

eventfd_ctx_read() 中:

  • spin_lock_irqsave()spin_unlock_irqrestore() 同样用于保护 ctx->count
  • 如果计数器为 0,则返回 -EAGAIN,表示没有数据可读。
  • 读取成功后,计数器根据 EFD_SEMAPHORE 模式进行修改。

5. 等待队列的使用

eventfd 处于阻塞模式下时,如果 read() 时没有数据可读,或者 write() 时计数器已满,调用线程会被加入等待队列,直到条件满足时被唤醒。

例如,wait_event_interruptible()wake_up() 是常用的等待队列函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static int eventfd_signal(struct eventfd_ctx *ctx, u64 n)
{
unsigned long flags;
int rc = 0;

spin_lock_irqsave(&ctx->lock, flags); // 加锁
if (ULLONG_MAX - ctx->count < n) {
rc = -EAGAIN; // 计数器溢出
} else {
ctx->count += n; // 更新计数器
if (waitqueue_active(&ctx->wqh))
wake_up(&ctx->wqh); // 唤醒等待队列中的线程
}
spin_unlock_irqrestore(&ctx->lock, flags); // 解锁

return rc;
}

6. 总结

eventfd 的线程安全性是通过以下机制保证的:

  • 原子操作:使用 atomic64_t 来确保对计数器的读写是原子的。
  • 自旋锁:在并发情况下,使用自旋锁保护关键数据结构(如计数器和等待队列)。
  • 等待队列:当无法立即完成操作时,使用等待队列来阻塞调用线程,并在条件满足时将其唤醒。

其他博客: