前言

网络编程是一个比较难码的模块,也是相对其他编程进阶的模块,特别是各种回调函数的理解调用以及其中会遇到各式各样难以解决的Bug。

借着学习陈硕大神muduo库的机会阅读了这本经典的网络编程教材《Linux多线程服务端编程》,这本也是这学期一直想学习的一本书,PS:就像研一下一直想学习Mit6.824一样。

image-20240502214205866

开始整体部分!

muduo网络库是Multi-Reactor架构,具体可以分为3个部分:

  • 网络相关模块:如Socket、InetAddress、TcpConnection、Acceptor、TcpServer等
  • 事件循环相关模块:如EventLoop、Channel、Poller、EPollPoller等
  • 线程相关模块:如Thread、EventLoopThread、EventLoopThreadPool等
  • 基础模块:如用户态缓冲区Buffer、时间戳Timestamp、日志类Logger等

本文参考自:

原理篇

Multi-Reactor

Muduo库是基于Reactor模式实现的TCP网络编程库。

Multi-Reactor模型如下所示:

image-20240502220126199

事件循环模块

muduo库基于三个关键组件来实现一个reactor,这个reactor可以持续地监听一组文件描述符(fd),并根据每个fd上发生的事件来调用相应的处理函数。这三个核心组件包括Channel类、Poller/EpollPoller类以及EventLoop类。

可以先看下整体的架构,先有个整体的了解。

image-20240502224744115

Channel类

概述

在TCP网络编程中,要想使用IO多路复用来监听某个文件描述符(fd),需要通过epoll_ctl将这个fd及其关心的事件注册到IO多路复用模块(也可以称之为事件监听器)上。当事件监听器检测到该fd发生了某个事件时,它会返回一个包含发生事件的fd集合,以及每个fd都发生了什么事件。

Channel类就是这样一个封装,它包含了一个文件描述符(fd),这个fd关心的事件,以及事件监听器实际检测到的事件。除此之外,Channel类还提供了一些方法,允许你设置这个fd的关心的事件,将这个fd及其关心的事件注册到事件监听器中或者从事件监听器中移除,以及保存这个fd的每种事件所对应的处理函数。

成员变量

  • int fd_:Channel对象关心的文件描述符。
  • int events_:fd感兴趣的事件类型集合。
  • int revents_:代表事件监听器实际监听到该fd发生的事件类型集合。当事件监听器监听到一个fd发生了什么事件,通过Channel::set_revents()函数来设置revents值。
  • EventLoop* loop:表示当前的Channel是在哪个loop中。
  • read_callback_ 、write_callback_、close_callback_、error_callback_:这些是std::function类型,代表着这个Channel为这个文件描述符保存的各事件类型发生时的处理函数。比如这个fd发生了可读事件,需要执行可读事件处理函数,Channel类都替你保管好了这些可调用函数。

成员函数

成员函数的设计是根据成员变量和这个类的功能设计,在设计之前我们自己也可以先想想作者为什么这么设计。

  • 向Channel对象注册各类事件的处理函数。

    1
    2
    3
    4
    void setReadCallback(ReadEventCallback cb) {read_callback_ = std::move(cb);}
    void setWriteCallback(Eventcallback cb) {write_callback_ = std::move(cb);}
    void setCloseCallback(EventCallback cb) {close_callback_ = std::move(cb);}
    void setErrorCallback(EventCallback cb) {error_callback_ = std::move(cb);}

    简单来说,就是事件监听器监听到描述符发生的事件的时候,相应的处理函数来处理。处理函数保存在Channel类中,这样调用也比较方便。第二点所示。

  • 根据poller通知的channel发生的具体事件, 由channel负责调用具体的回调操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    void Channel::handleEventWithGuard(Timestamp receiveTime)
    {
    LOG_INFO("channel handleEvent revents:%d\n", revents_);
    if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
    {
    if (closeCallback_)
    {
    closeCallback_();
    }
    }
    ...
    }

    当调用epoll_wait()后,可以得知事件监听器上哪些Channel(文件描述符)发生了哪些事件,事件发生后自然就要调用这些Channel对应的处理函数。

  • 将这个文件描述符实际发生的事件封装进这个Channel中

    1
    int set_revents(int revt) {revents_ = revt;}

    这个函数是在EPollPoller::fillActiveChannels调用,即获取活跃的连接的时候设置给Channel。

  • 将Channel中的文件描述符及其感兴趣事件注册事件监听器上或从事件监听器上移除

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 设置完成后会跟新epoll中的fd为当前需要监听[设置]的事件
    void enableReading() {events_ |= kReadEvent; upadte();}
    void disableReading() {events_ &= ~kReadEvent; update();}
    void enableWriting() {events_ |= kWriteEvent; update();}
    void disableWriting() {events_ &= ~kWriteEvent; update();}
    void disableAll() {events_ |= kNonEvent; update();}
    /*
    void Channel::update()
    {
    // 通过channel所属的EventLoop,调用poller的相应方法,注册fd的events事件
    loop_->updateChannel(this);
    }
    */

    这个update的调用过程是Channel=>EventLoop=>Poller。update本质上就是调用了epoll_ctl()。

EpollPoller类

概述

负责监听文件描述符事件是否触发以及返回发生事件的文件描述符以及具体事件的模块就是Poller。一个Poller对象对应一个事件监听器。其中,1个reactor中有1个Poller,也就是说有多少reactor就有多少Poller。

目前,项目中muduo库只支持epoll。

Poller是个抽象虚类,由EpollPoller和PollPoller继承实现,与监听文件描述符和返回监听结果的具体方法也基本上是在这两个派生类中实现。EpollPoller就是封装了用epoll方法实现的与事件监听有关的各种方法,PollPoller就是封装了poll方法实现的与事件监听有关的各种方法。

成员变量

这里将Poller/EpollPoller成员变量放在一起。

  • epollfd_:用epoll_create方法返回的epoll句柄。
  • channels_:这个变量是std::unordered_map<int, Channel*>类型,负责记录 文件描述符 —> Channel的映射,也帮忙保管所有注册在这个Poller上的Channel。
  • ownerLoop_:表示当前epollfd在哪个loop。

成员函数

  • 获取监听发生事件的描述符的Channel集合

    核心是以下2个函数。

    1
    2
    TimeStamp poll(int timeoutMs, ChannelList *activeChannels)
    void fillActiveChannels(int numEvents, ChannelList *activeChannels)

    具体代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
    {
    // events_每次传入都是初始化过的,也就说里面没有数据,等待epoll_wait获取数据。
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
    fillActiveChannels(numEvents, activeChannels);
    }
    // 填写活跃的连接
    void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const
    {
    for (int i = 0; i < numEvents; ++i)
    {
    Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
    channel->set_revents(events_[i].events);
    activeChannels->push_back(channel); // EventLoop就拿到了它的poller给它返回的所有发生事件的channel列表了
    }
    }

    这里有个问题:fillActiveChannels中遍历numEvents为什么就能得到channel对象?

    这是因为在将EPollPoller::update中(也就是在将文件描述符交给epoll监听的时候,将channel对象也放进去便于传递方便后期使用)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    typedef union epoll_data
    {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
    } epoll_data_t;

    struct epoll_event
    {
    uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    } __EPOLL_PACKED;
  • 更新文件描述符监听的事件

    1
    2
    void updateChannel(Channel *channel) override;
    void removeChannel(Channel *channel) override;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    // 关系:channel update remove => EventLoop updateChannel removeChannel => Poller updateChannel removeChannel
    // update
    void EPollPoller::updateChannel(Channel *channel)
    {
    const int index = channel->index();

    if (index == kNew || index == kDeleted)
    {
    if (index == kNew)
    {
    int fd = channel->fd();
    channels_[fd] = channel;
    }
    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
    }
    else // channel已经在poller上注册过了
    {
    ...
    }
    }
    // 更新channel通道 epoll_ctl add/mod/del
    void EPollPoller::update(int operation, Channel *channel)
    {
    epoll_event event;
    bzero(&event, sizeof event);

    int fd = channel->fd();

    event.events = channel->events();
    event.data.fd = fd;
    event.data.ptr = channel;

    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
    {
    ...
    }
    }

    上述的代码比较清楚了,调用updateChannel其中再调用update。本质还是调用epoll_ctl,只不过层层封装适合框架。

EventLoop类

EventLoop类就是将Channel类和EpollPoller类进行统一管理。

EventLoop就是负责实现**“循环”,负责驱动“循环”**的重要模块。

这里循环也就是while循环不断检测是否存在监听事件的发生,然后使用传入的回调函数处理对应的事件。

可以看下下面这张图

image-20240502224744115

  • 开启事件循环

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    void EventLoop::loop()
    {
    looping_ = true;
    quit_ = false;

    while (!quit_)
    {
    activeChannels_.clear();
    // 监听两类fd 一种是client的fd,一种wakeupfd
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    for (Channel *channel : activeChannels_)
    {
    // Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
    channel->handleEvent(pollReturnTime_);
    }
    // 执行当前EventLoop事件循环需要处理的回调操作
    /**
    * IO线程 mainLoop accept fd《=channel subloop
    * mainLoop 事先注册一个回调cb(需要subloop来执行) wakeup subloop后,执行下面的方法,执行之前mainloop注册的cb操作
    */
    doPendingFunctors();
    }
    looping_ = false;
    }
    //--------------------执行回调----------------------
    void EventLoop::doPendingFunctors() // 执行回调
    {
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
    std::unique_lock<std::mutex> lock(mutex_);
    functors.swap(pendingFunctors_);
    }

    for (const Functor &functor : functors)
    {
    functor(); // 执行当前loop需要执行的回调操作
    }

    callingPendingFunctors_ = false;
    }

    其实,刚开始的时候对应为什么还会执行doPendingFunctors有点迷惑?这里给出我的理解:用于loop之间通信使用,比如:mainloop要将客户端交给一个loop处理请求。

  • 子loop唤醒

    接着上面的问题,当主loop要将连接的客户端交给一个subloop子loop监听处理,但是子loop怎么唤醒呢?

    这里muduo库是先创建一个wakeup描述符,专门用于唤醒描述符,加入到epoll中监听。那么对于唤醒子loop的方法只用往wakeup写入即可,当epoll监听到写事件的时候,就会调用wakeup之前设置的事件发生处理函数进行处理(也就将写事件写入的读出来)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    void EventLoop::handleRead()
    {
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
    LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8", n);
    }
    }

    // 用来唤醒loop所在的线程的 向wakeupfd_写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒
    void EventLoop::wakeup()
    {
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
    LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
    }
    }
  • 当多线程的情况如何正确调用函数呢?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    void EventLoop::quit()
    {
    quit_ = true;
    // 如果是在其它线程中,调用的quit 在一个subloop(woker)中,调用了mainLoop(IO)的quit
    if (!isInLoopThread())
    {
    wakeup();
    }
    }
    // 在当前loop中执行cb
    void EventLoop::runInLoop(Functor cb)
    {
    if (isInLoopThread()) // 在当前的loop线程中,执行cb
    {
    cb();
    }
    else // 在非当前loop线程中执行cb , 就需要唤醒loop所在线程,执行cb
    {
    queueInLoop(cb);
    }
    }

简单来说,1.loop在自己的线程中调用quit 2.在非loop的线程中,调用loop的quit。

  • 小注意点·

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 在当前loop中执行cb
    void EventLoop::runInLoop(Functor cb)
    {
    if (isInLoopThread()) // 在当前的loop线程中,执行cb
    {
    cb();
    }
    else // 在非当前loop线程中执行cb , 就需要唤醒loop所在线程,执行cb
    {
    queueInLoop(cb);
    }
    }
    // 把cb放入队列中,唤醒loop所在的线程,执行cb
    void EventLoop::queueInLoop(Functor cb)
    {
    {
    std::unique_lock<std::mutex> lock(mutex_);
    pendingFunctors_.emplace_back(cb);
    }

    // 唤醒相应的,需要执行上面回调操作的loop的线程了
    // || callingPendingFunctors_的意思是:当前loop正在执行回调,但是loop又有了新的回调
    if (!isInLoopThread() || callingPendingFunctors_)
    {
    wakeup(); // 唤醒loop所在线程
    }
    }

    在当前的线程直接执行回调函数,但是如果不在的就需要唤醒下。

    这里2个注意点:

    1.怎么判断线程是否是当前线程?

    isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

    muduo库设计中使用了这条语句判断。我们只要使用这个函数进行判断。【这个深入的话有点抽象】

    2.(!isInLoopThread() || callingPendingFunctors_)

    如果不在当前线程,唤醒比较好理解。还有一种情况需要唤醒是正在执行回调函数,如果不唤醒,那么这个回调函数就不知道要什么时候才能执行,也可能不执行。因此需要唤醒一下。

  • 其他

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // EventLoop的方法 =》 Poller的方法
    void EventLoop::updateChannel(Channel *channel)
    {
    poller_->updateChannel(channel);
    }

    void EventLoop::removeChannel(Channel *channel)
    {
    poller_->removeChannel(channel);
    }

    bool EventLoop::hasChannel(Channel *channel)
    {
    return poller_->hasChannel(channel);
    }

    这个点在Channel调用过程中已经说明。

线程模块

这部分体现了One loop one thread的思想,也是muduo的核心!

Thread类

Thread类主要是创建线程,然后执行相应的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Thread : noncopyable
{
public:
using ThreadFunc = std::function<void()>;

explicit Thread(ThreadFunc, const std::string &name = std::string());
~Thread();

void start();
void join();

bool started() const { return started_; }
pid_t tid() const { return tid_; }
const std::string &name() const { return name_; }

static int numCreated() { return numCreated_; }

private:
void setDefaultName();

bool started_;
bool joined_;
std::shared_ptr<std::thread> thread_;
pid_t tid_;
ThreadFunc func_;
std::string name_;
static std::atomic_int numCreated_;
};

EventLoopThread类

这个类体现了One loop one thread的思想,就是一个线程负责一个loop循环。

这个类主要的工作是将eventloop和thread封装,正好一个loop一个线程。

成员函数

  • EventLoop *loop_:对应的loop。
  • _ bool exiting_:是否退出。
  • Thread thread_:线程类。
  • std::mutex mutex_:互斥锁。
  • std::condition_variable cond:条件变量。
  • ThreadInitCallback callback_:线程初始化回调函数。

成员函数

  • 开启loop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
EventLoop *EventLoopThread::startLoop()
{
thread_.start(); // 启动底层的新线程

EventLoop *loop = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
while (loop_ == nullptr)
{
cond_.wait(lock);
}
loop = loop_;
}
return loop;
}

// 下面这个方法,实在单独的新线程里面运行的
void EventLoopThread::threadFunc()
{ // 创建一个独立的eventloop,和上面的线程是一一对应的,one loop per thread
EventLoop loop;

if (callback_)
{
callback_(&loop);
}

{
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_one();
}

loop.loop(); // EventLoop loop => Poller.poll
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}

这里就是创建loop,然后线程中不断的loop循环。

EventLoopPoolThread类

eventloop池,从最开始的架构图就可以知道,muduo有主Loop和若干个子loop。这些loop就是这个类产生的。

成员变量

  • EventLoop *baseLoop_:这是主EventLoop,通常在主线程中运行。
  • std::string name: EventLoopThreadPool的名称。
  • bool started _ :表示EventLoopThreadPool是否已经启动。
  • int numThreads _ : 线程池中线程的数量。
  • int next _ : 下一个要被处理的线程的索引。
  • std::vector< std::unique_ptr< EventLoopThread> > threads_: 存储线程池中所有线程的容器,使用unique_ptr管理每个线程的生命周期。
  • _std::vector<EventLoop *> loops: 存储所有EventLoop的指针的容器,用于分发任务。

成员函数

  • 启动线程池,并且在每个线程中创建并启动一个EventLoop。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    void EventLoopThreadPool::start(const ThreadInitCallback &cb)
    {
    started_ = true;

    for (int i = 0; i < numThreads_; ++i)
    {
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    EventLoopThread *t = new EventLoopThread(cb, buf);
    threads_.push_back(std::unique_ptr<EventLoopThread>(t));
    loops_.push_back(t->startLoop()); // 底层创建线程,绑定一个新的EventLoop,并返回该loop的地址
    }

    // 整个服务端只有一个线程,运行着baseloop
    if (numThreads_ == 0 && cb)
    {
    cb(baseLoop_);
    }
    }
  • 主Loop_默认以轮询的方式分配channel给子loop,通过轮询挑选出一个子loop

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    EventLoop* EventLoopThreadPool::getNextLoop()
    {
    EventLoop *loop = baseLoop_;

    if (!loops_.empty()) // 通过轮询获取下一个处理事件的loop
    {
    loop = loops_[next_];
    ++next_;
    if (next_ >= loops_.size())
    {
    next_ = 0;
    }
    }

    return loop;
    }

网络模块

Acceptor类

概述

Accetpor封装了服务器专门用于监听是否有客户端连接的套接字fd以及相关处理方法。主要是对其他类的方法调用进行封装。

成员变量

  • acceptSocket_:这个是服务器监听套接字的文件描述符。
  • acceptChannel_:这是个Channel类,把acceptSocket _ 及其感兴趣事件和事件对应的处理函数都封装进去。
  • EventLoop *loop:监听套接字的fd由哪个EventLoop负责循环监听以及处理相应事件,其实这个EventLoop就是main EventLoop。
  • newConnectionCallback_: TcpServer构造函数中将TcpServer::newConnection( )函数注册给了这个成员变量。这个TcpServer::newConnection函数的功能是公平的选择一个subEventLoop,并把已经接受的连接分发给这个subEventLoop。

成员函数

  • 开启链接监听

    listen( ):该函数底层调用了linux的函数listen( ),开启对acceptSocket _ 的监听同时将acceptChannel及其感兴趣事件(可读事件)注册到main EventLoop 的事件监听器上。换言之就是让main EventLoop事件监听器去监听acceptSocket_。

    1
    2
    3
    4
    5
    6
    void Acceptor::listen()
    {
    listenning_ = true;
    acceptSocket_.listen(); // listen
    acceptChannel_.enableReading(); // acceptChannel_ => Poller
    }
  • 处理新用户连接

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop)
    , acceptSocket_(createNonblocking()) // socket
    , acceptChannel_(loop, acceptSocket_.fd())
    , listenning_(false)
    {
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(true);
    acceptSocket_.bindAddress(listenAddr); // bind
    // TcpServer::start() Acceptor.listen 有新用户的连接,要执行一个回调(connfd=》channel=》subloop)
    // baseLoop => acceptChannel_(listenfd) =>
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
    }

    // listenfd有事件发生了,就是有新用户连接了
    void Acceptor::handleRead()
    {
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
    if (newConnectionCallback_)
    {
    newConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop,唤醒,分发当前的新客户端的Channel
    }
    else
    {
    ::close(connfd);
    }
    }
    else
    {
    ...
    }
    }

    handleRead( ):这是一个私有成员方法,这个方法是要注册到acceptChannel_ 上的,在构造accept对象的时候设置给acceptchannel对象, 同时handleRead( )方法内部还调用了成员变量newConnectionCallback_保存的函数。当main EventLoop监听到acceptChannel _ 上发生了可读事件时(新用户连接事件),就是调用这个handleRead( )方法。

Buffer类

概述

Buffer类其实是封装了一个用户缓冲区,以及向这个缓冲区写数据读数据等一系列控制方法。

算法

这张图已经可以很好的说明这个类的具体功能。

可以和CS144 TCP协议中的buffer进行比对下。

image-20240502231632156

成员方法

  • ssize_t Buffer::readFd(int fd, int* saveErrno);:客户端发来数据,readFd从该TCP接收缓冲区中将数据读出来并放到Buffer中。
  • ssize_t Buffer::writeFd(int fd, int* saveErrno);:服务端要向这条TCP连接发送数据,通过该方法将Buffer中的数据拷贝到TCP发送缓冲区中。

巧妙的设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 从fd上读取数据 Poller工作在LT模式
* Buffer缓冲区是有大小的! 但是从fd上读数据的时候,却不知道tcp数据最终的大小
*/
ssize_t Buffer::readFd(int fd, int* saveErrno)
{
char extrabuf[65536] = {0}; // 栈上的内存空间 64K

struct iovec vec[2];

const size_t writable = writableBytes(); // 这是Buffer底层缓冲区剩余的可写空间大小
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;

vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;

const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);
if (n < 0)
{
*saveErrno = errno;
}
else if (n <= writable) // Buffer的可写缓冲区已经够存储读出来的数据了
{
writerIndex_ += n;
}
else // extrabuf里面也写入了数据
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable); // writerIndex_开始写 n - writable大小的数据
}

return n;
}

解释:可以让用户一次性把所有TCP接收缓冲区的所有数据全部都读出来并放到用户自定义的缓冲区Buffer中。用户自定义缓冲区Buffer是有大小限制的,我们一开始不知道TCP接收缓冲区中的数据量有多少,如果一次性读出来会不会导致Buffer装不下而溢出。所以在readFd( )函数中会在栈上创建一个临时空间extrabuf,然后使用readv的分散读特性,将TCP缓冲区中的数据先拷贝到Buffer中,如果Buffer容量不够,就把剩余的数据都拷贝到extrabuf中,然后再调整Buffer的容量(动态扩容),再把extrabuf的数据拷贝到Buffer中。当这个函数结束后,extrabuf也会被释放。另外extrabuf是在栈上开辟的空间,速度比在堆上开辟还要快。

TcpConnection/TcpServer类

这个类是集大成的类,需要清楚的知道具体的回调函数的调用。

TCP网络编程的本质是处理下面这几个事件:

  • 连接的建立。
  • 连接的断开。(包括主动断开和被动断开)
  • 消息到达,客户端连接文件描述符可读。
  • 消息发送,向客户端连接文件描述符写数据。

先来看看整体的图示:

使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <mymuduo/TcpServer.h>
#include <mymuduo/Logger.h>

#include <string>
#include <functional>

class EchoServer
{
public:
EchoServer(EventLoop *loop,
const InetAddress &addr,
const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 注册回调函数
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);

server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);

// 设置合适的loop线程数量 loopthread
server_.setThreadNum(3);
}
void start()
{
server_.start();
}
private:
// 连接建立或者断开的回调
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
}
else
{
LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}
}

// 可读写事件回调
void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp time)
{
std::string msg = buf->retrieveAllAsString();
conn->send(msg);
conn->shutdown(); // 写端 EPOLLHUP =》 closeCallback_
}

EventLoop *loop_;
TcpServer server_;
};

int main()
{
EventLoop loop;
InetAddress addr(8000);
EchoServer server(&loop, addr, "EchoServer-01"); // Acceptor non-blocking listenfd create bind
server.start(); // listen loopthread listenfd => acceptChannel => mainLoop =>
loop.loop(); // 启动mainLoop的底层Poller

return 0;
}

Note:using TcpConnectionPtr = std::shared_ptr< TcpConnection > ;

连接建立

image-20240503105422549

大体的整个流程如上图所示:

  • TcpServer::TcpServer()
    当我们创建一个TcpServer对象,即执行代码TcpServer server(&loop, listenAddr);调用了TcpServer的构造函数,TcpServer构造函数最主要的就是类的内部实例化了一个Acceptor对象,并往这个Acceptor对象注册了一个回调函数TcpServer::newConnection()。

  • Acceptor::Acceptor()
    当我们在TcpServer构造函数实例化Acceptor对象时,Acceptor的构造函数中实例化了一个Channel对象,即acceptChannel _ ,该Channel对象封装了服务器监听套接字文件描述符(尚未注册到main EventLoop的事件监听器上)。接着Acceptor构造函数将Acceptor::handleRead()方法注册进acceptChannel_ 中,这也意味着,日后如果事件监听器监听到acceptChannel_发生可读事件,将会调用Acceptor::handleRead()函数。

    至此,TcpServer对象创建完毕,用户调用TcpServer::start()方法,开启TcpServer。主要就是调用Acceptor::listen()函数(底层是调用了linux的函数listen())监听服务器套接字,以及将acceptChannel_注册到main EventLoop的事件监听器上监听它的可读事件(新用户连接事件)接着用户调用loop.loop(),即调用了EventLoop::loop()函数,该函数就会循环的获取事件监听器的监听结果,并且根据监听结果调用注册在事件监听器上的Channel对象的事件处理函数。

  • Acceptor::handleRead()
    当程序如果执行到了这个函数里面,说明acceptChannel_ 发生可读事件,程序处理新客户连接请求。该函数首先调用了Linux的函数accept()接受新客户连接。接着调用了TcpServer::newConnection()函数,这个函数是在步骤1中注册给Acceptor并由成员变量newConnectionCallback_保存。

  • TcpServer::newConnection()
    该函数的主要功能就是将建立好的连接进行封装(封装成TcpConnection对象),并使用选择算法公平的选择一个sub EventLoop,并调用TcpConnection::connectEstablished()将TcpConnection::channel_注册到刚刚选择的sub EventLoop上。

消息处理

SubEventLoop中的EventLoop::loop()函数内部会循环的执行上图中的步骤1和步骤2。步骤1就是调用Poller::poll()方法获取事件监听结果,这个事件监听结果是一个Channel集合,每一个Channel封装着 [一个fd] 及 [fd感兴趣的事件] 和 [事件监听器监听到该fd实际发生的事件]。步骤2就是调用每一个Channel的Channel::HandlerEvent方法。该方法会根据每一个Channel的感兴趣事件以及实际发生的事件调用提前注册在Channel内的对应的事件处理函数(readCallback_、writeCallback_、closeCallback_、errorCallback _ )。

image-20240503105957775

消息读取

readCallback_保存的函数其实是TcpConnection::handleRead( ),消息读取的处理逻辑也就是由这个函数提供的,我们稍微剖析一下这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if(n > 0) //从fd读到了数据,并且放在了inputBuffer_上
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if(n == 0)
handleClose();
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}

TcpConnection::handleRead( )函数首先调用Buffer_.readFd(channel_->fd(), &saveErrno),该函数底层调用Linux的函数readv( ),将Tcp接收缓冲区数据拷贝到用户定义的缓冲区中(inputBuffer_)。如果在读取拷贝的过程中发生了什么错误,这个错误信息就会保存在savedErrno中。

  • 当readFd( )返回值大于0,说明从接收缓冲区中读取到了数据,那么会接着调用messageCallback_中保存的用户自定义的读取消息后的处理函数。
  • readFd( )返回值等于0,说明客户端连接关闭,这时候应该调用TcpConnection::handleClose( )来处理连接关闭事件
  • readFd( )返回值等于-1,说明发生了错误,调用TcpConnection::handleError( )来处理savedErrno的错误事件。Moduo库只支持LT模式,所以读事件不会出现EAGAIN的错误,所以一旦出现错误,说明肯定是比较不好的非正常错误了。而EAGAIN错误只不过是非阻塞IO调用时的一种常见错误而已。

消息发送

当用户调用了TcpConnetion::send(buf)函数时,相当于要求muduo库把数据buf发送给该Tcp连接的客户端。此时该TcpConnection注册在事件监听器上的感兴趣事件中是没有可写事件的。TcpConnection::send(buf)函数内部其实是调用了Linux的函数write( )

如果TCP发送缓冲区内不能一次性容纳buf:

  • 这时候write( )函数buf数据尽可能地拷贝到TCP发送缓冲区中,并且将errno设置为EWOULDBLOCK。
  • 剩余未拷贝到TCP发送缓冲区中的buf数据会被存放在TcpConnection::outputBuffer_中。并且向事件监听器上注册该TcpConnection::channel_的可写事件。
  • 事件监听器监听到该Tcp连接可写事件,就会调用TcpConnection::handleWrite( )函数把TcpConnection::outputBuffer_中剩余的数据发送出去。_
    • 在TcpConnection::handleWrite( )函数中,通过调用Buffer::writeFd()函数将outputBuffer_的数据写入到Tcp发送缓冲区,如果Tcp发送缓冲区能容纳全部剩余的未发送数据,那最好不过了。如果Tcp发送缓冲区依旧没法容纳剩余的未发送数据,那就尽可能地将数据拷贝到Tcp发送缓冲区中,继续保持可写事件的监听。
  • 当数据全部拷贝到Tcp发送缓冲区之后,就会调用用户自定义的【写完后的事件处理函数】,并且移除该TcpConnection在事件监听器上的可写事件。(移除可写事件是为了提高效率,不会让epoll_wait() 毫无意义的频繁触发可写事件。因为大多数时候是没有数据需要发送的,频繁触发可写事件但又没有数据可写。)

TcpConnection::send和TcpConnection::handleWrite的区别:

  • sendInLoop 的目的是尝试立即将数据写入内核空间。如果一次就能写完,那么就没问题。但如果写入操作被阻塞,或者只写入了部分数据,那么剩下的数据就会被放到 outputBuffer_ 中,然后通过注册写事件 enableWriting(),在有空间可写的时候,再继续写入剩下的数据。这个函数的调用通常是在应用程序想要发送数据的时候。
  • handleWrite 则是在内核通知应用程序可以写入数据的时候调用的,即当套接字的发送缓冲区有足够的空间可以接收更多数据时。这个函数的目的是尽可能地将 outputBuffer_ 中的数据写入到内核空间,直到写入全部数据或者内核空间无法接收更多数据为止。然后,如果 outputBuffer_ 中的数据全部写入完成,那么就取消对写事件的关注 disableWriting(),因为此时不需要再写数据了。
  • 总的来说,sendInLoop 是应用程序主动发送数据,handleWrite 是在收到内核的可写通知后尝试写入数据。这两个函数共同实现了数据的异步写入:sendInLoop 负责将应用程序的数据放入 outputBuffer_ ,并尝试写入内核;handleWrite 负责在内核准备好接收数据时,将 outputBuffer_ 中的数据写入内核。

在阅读源码的时候,有个疑问?也就是之前在学习buffer设计的时候,buffer既支持写也支持读,而且缓冲区的设计就有readerIndex_ 和writerIndex_,那为什么之后TcpConnnection中又放2个缓冲区buffer呢?

GPT回答:

1
2
3
4
5
在muduo网络库中,接收数据的缓冲区inputBuffer_和发送数据的缓冲区outputBuffer_ 都是独立的,这是因为在网络编程中,接收和发送数据的操作通常是异步的,也就是说,它们可以在不同的时间、以不同的速度发生,因此,它们需要各自的缓冲区来维护它们的状态。

具体来说,inputBuffer_被用来存储从网络中接收到的数据,直到应用程序准备好处理这些数据为止。同样,outputBuffer_ 被用来存储应用程序准备发送到网络中的数据,直到这些数据被实际发送为止。

虽然你提到的一个缓冲区可以处理读写操作,但这只适用于单一的读写操作。在网络编程中,接收和发送数据的操作是分离的,并且经常需要在不同的时间进行。因此,使用两个独立的缓冲区可以使得接收和发送数据的操作更加灵活和高效。
连接断开

连接被动断开

服务端TcpConnection::handleRead()中感知到客户端把连接断开了。
TcpConnection::handleRead( )函数内部调用了Linux的函数readv( ),当readv( )返回0的时候,服务端就知道客户端断开连接了。然后就接着调用TcpConnection::handleClose( )。

image-20240503111746345

  1. 在执行TcpConnection::handle_Close()的时候,该函数还是在SubEventLoop线程中运行的,接着调用closeCallback_(connPtr)回调函数,该函数保存的其实是TcpServer::removeConnection( )函数
  2. TcpServer::removeConnection( )函数调用了remvoveConnectionInLoop( )函数,该函数的运行是在MainEventLoop线程中执行的,这里涉及到线程切换技术。
  3. removeConnectionInLoop( )函数:TcpServer对象中有一个connections_ 成员变量,这是一个unordered_map,负责保存【string --> TcpConnection】的映射,其实就是保存着Tcp连接的名字到TcpConnection对象的映射。因为这个Tcp连接要关闭了,所以也要把这个TcpConnection对象从connections_中删掉。然后再调用TcpConnection::connectDestroyed函数。
    另外为什么removeConnectionInLoop()要在MainEventLoop中运行,因为该函数主要是从TcpServer对象中删除某条数据。而TcpServer对象是属于MainEventLoop的。这也是贯彻了One Loop Per Thread的理念。
  4. TcpConnection::connectDestroyed( )函数的执行是又跳回到了subEventLoop线程中。该函数就是将Tcp连接的监听描述符从事件监听器中移除。另外SubEventLoop中的Poller类对象还保存着这条Tcp连接的channel_,所以调用channel_.remove( )将这个Tcp连接的channel对象从Poller内的数据结构中删除。

连接主动断开

服务器主动关闭导致连接断开。当服务器主动关闭时,调用TcpServer::~TcpServer()析构函数。

这里在提示一下EventLoop::runInLoop()函数的意义,假如你有一个EventLoop对象 loop_,当你调用了loop_->runInLoop(function)函数时,这个function函数的执行会在这个loop_绑定的线程上运行!
所以我们画了下面这幅图,在创建TcpConnection对象时,Acceptor都要将这个对象分发给一个SubEventLoop来管理。这个TcpConnection对象的一切函数执行都要在其管理的SubEventLoop线程中运行。再一次贯彻One Loop Per Thread的设计模式。比如要想彻底删除一个TcpConnection对象,就必须要调用这个对象的connecDestroyed()方法,这个方法执行完后才能释放这个对象的堆内存。每个TcpConnection对象的connectDestroyed()方法都必须在这个TcpConnection对象所属的SubEventLoop绑定的线程中执行。

所有上面的TcpServer::~TcpServer()函数就是干这事儿的,不断循环的让这个TcpConnection对象所属的SubEventLoop线程执行TcpConnection::connectDestroyed()函数,同时在MainEventLoop的TcpServer::~TcpServer()函数中调用item.second.reset()释放保管TcpConnection对象的共享智能指针,以达到释放TcpConnection对象的堆内存空间的目的。
但是这里面其实有一个问题需要解决,TcpConnection::connectDestroyed()函数的执行以及这个TcpConnection对象的堆内存释放操作不在同一个线程中运行,所以要考虑怎么保证一个TcpConnectino对象的堆内存释放操作是在TcpConnection::connectDestroyed()调用完后。
这个析构函数巧妙利用了共享智能指针的特点,当没有共享智能指针指向这个TcpConnection对象时(引用计数为0),这个TcpConnection对象就会被析构删除(堆内存释放)。

1
2
3
4
5
6
7
8
9
10
TcpServer::~TcpServer()
{
//connections类型为std::unordered_map<std::string, TcpConnectionPtr>;
for(auto &item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset();
conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
}
}
  • 首先TcpServer::connections_是一个unordered_map<string, TcpConnectionPtr>,其中TcpConnectionPtr的含义是指向TcpConnection的shared_ptr。
  • 在一开始,每一个TcpConnection对象都被一个共享智能指针TcpConnetionPtr持有,当执行了TcpConnectionPtr conn(item.second)时,这个TcpConnetion对象就被conn和这个item.second共同持有,但是这个conn的生存周期很短,只要离开了当前的这一次for循环,conn就会被释放。
  • 紧接着调用item.second.reset()释放掉TcpServer中保存的该TcpConnectino对象的智能指针。此时在当前情况下,只剩下conn还持有这个TcpConnection对象,因此当前TcpConnection对象还不会被析构。
  • 接着调用了conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
    这句话的含义是让SubEventLoop线程去执行TcpConnection::connectDestroyed()函数。当你把这个conn的成员函数传进去的时候,conn所指向的资源的引用计数会加1。因为传给runInLoop的不只有函数,还有这个函数所属的对象conn。
  • SubEventLoop线程开始运行TcpConnection::connectDestroyed()
  • MainEventLoop线程当前这一轮for循环跑完,共享智能指针conn离开代码块,因此被析构,但是TcpConnection对象还不会被释放,因为还有一个共享智能指针指向这个TcpConnection对象,而且这个智能指针在TcpConnection::connectDestroyed()中,只不过这个智能指针你看不到,它在这个函数中是一个隐式的this的存在。当这个函数执行完后,智能指针就真的被释放了。到此,就没有任何智能指针指向这个TcpConnection对象了。TcpConnection对象就彻底被析构删除了。