先看下之前的大文件传输,也就是游双书上的代码,发送数据只调用了writev函数,并对其返回值是否异常做了处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
bool http_conn::write()
{
int temp = 0; // 用于存储 writev 返回的字节数
int bytes_have_send = 0; // 已发送的字节数
int bytes_to_send = m_write_idx; // 需要发送的字节数

// 如果没有数据需要发送,则将套接字重新设置为可读状态,并初始化连接
if (bytes_to_send == 0)
{
modfd(m_epollfd, m_sockfd, EPOLLIN);
init();
return true;
}

// 进入循环,直到所有数据发送完毕
while (1)
{
// 使用 writev 系统调用发送数据
temp = writev(m_sockfd, m_iv, m_iv_count);

// 如果 writev 返回值小于等于 -1,表示发送失败
if (temp <= -1)
{
// 如果错误码是 EAGAIN,表示当前套接字不可写,需要重新注册 EPOLLOUT 事件
if (errno == EAGAIN)
{
modfd(m_epollfd, m_sockfd, EPOLLOUT);
return true;
}
// 其他错误,释放内存映射,返回 false
unmap();
return false;
}

// 更新需要发送的字节数和已发送的字节数
bytes_to_send -= temp;
bytes_have_send += temp;

// 如果需要发送的字节数小于等于已发送的字节数,表示所有数据已发送完毕
if (bytes_to_send <= bytes_have_send)
{
// 释放内存映射
unmap();

// 如果设置了长连接,则初始化连接并重新设置套接字为可读状态
if (m_linger)
{
init();
modfd(m_epollfd, m_sockfd, EPOLLIN);
return true;
}
else
{
// 否则,重新设置套接字为可读状态,并返回 false,表示连接关闭
modfd(m_epollfd, m_sockfd, EPOLLIN);
return false;
}
}
}
}

在实际测试中发现,当请求小文件时,一次调用 writev 函数就能将数据全部发送出去,这种情况下不会出现问题,并且也不需要再次进入 while 循环。然而,当请求较大的文件时,需要多次调用 writev 函数来分批传输数据,这时就会出现问题,比如文件无法完整显示或完全无法显示。

经过对数据传输过程的分析,问题出在 writevm_iv 结构体成员上。每次传输后,m_iv 并不会自动更新文件指针和传输的长度,因此,它仍会使用原来的指针和长度来发送数据。结果就是,发送的数据没有按预期更新,导致发送重复的内容或者漏掉部分数据。

根据基础 API 的分析,我们知道 writev 函数会按照顺序(iov[0], iov[1], …, iov[iovcnt-1])从缓冲区中聚集并输出数据。在项目中,iov[0] 用来存储 HTTP 响应的报文头,而 iov[1] 指向文件的起始位置。

为了解决这个问题,修改如下:

  • 由于报文头的大小较小,通常在第一次传输后就能发送完毕。
  • 因此,在第一次传输后,需要更新 m_iv[1].iov_basem_iv[1].iov_len,将它们设置为文件未发送部分的起始位置和剩余长度。
  • 同时,将 m_iv[0].iov_len 设置为 0,表示之后的传输只会发送文件内容,不再传输响应报文头。
  • 每次调用 writev 传输数据后,都要更新文件的起始位置和剩余长度。

通过这些调整,文件指针和长度能够正确更新,大文件的传输问题也得到了有效解决。

改进代码

服务器端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
#include <cstdlib>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <cerrno>

#define PORT 8899
#define FILENAME "file_to_send.txt"
#define MAX_EVENTS 10
#define BUF_SIZE 1024

// 设置文件描述符为非阻塞模式
int set_non_blocking(int sockfd) {
int opts = fcntl(sockfd, F_GETFL);
if (opts < 0) {
perror("fcntl(F_GETFL)");
return -1;
}
opts = (opts | O_NONBLOCK);
if (fcntl(sockfd, F_SETFL, opts) < 0) {
perror("fcntl(F_SETFL)");
return -1;
}
return 0;
}

// 客户端处理类
class TcpServer {
public:
TcpServer(int listen_port);
~TcpServer();
void run();

private:
void handle_connection(int client_sock);
bool send_file(int client_sock);

int m_listen_sock;
int m_epollfd;
struct epoll_event ev, events[MAX_EVENTS];
};

TcpServer::TcpServer(int listen_port) {
struct sockaddr_in server_addr;

// 创建监听 socket
m_listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (m_listen_sock < 0) {
perror("socket");
exit(EXIT_FAILURE);
}

// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(listen_port);

// 绑定地址
if (bind(m_listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("bind");
exit(EXIT_FAILURE);
}

// 监听
if (listen(m_listen_sock, 5) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}

// 创建 epoll 实例
m_epollfd = epoll_create1(0);
if (m_epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}

// 设置监听 socket 为非阻塞
set_non_blocking(m_listen_sock);

// 将监听 socket 加入 epoll
ev.events = EPOLLIN;
ev.data.fd = m_listen_sock;
if (epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listen_sock, &ev) == -1) {
perror("epoll_ctl: m_listen_sock");
exit(EXIT_FAILURE);
}
}

TcpServer::~TcpServer() {
close(m_listen_sock);
close(m_epollfd);
}

void TcpServer::run() {
while (true) {
int nfds = epoll_wait(m_epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}

for (int i = 0; i < nfds; i++) {
if (events[i].data.fd == m_listen_sock) {
// 接受新的连接
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_sock = accept(m_listen_sock, (struct sockaddr*)&client_addr, &client_len);
if (client_sock == -1) {
perror("accept");
continue;
}
set_non_blocking(client_sock);

// 将客户端 socket 加入 epoll
ev.events = EPOLLOUT; // 我们只关心写事件
ev.data.fd = client_sock;
if (epoll_ctl(m_epollfd, EPOLL_CTL_ADD, client_sock, &ev) == -1) {
perror("epoll_ctl: client_sock");
close(client_sock);
continue;
}
} else if (events[i].events & EPOLLOUT) {
// 处理客户端的文件发送
int client_sock = events[i].data.fd;
if (!send_file(client_sock)) {
close(client_sock);
epoll_ctl(m_epollfd, EPOLL_CTL_DEL, client_sock, NULL);
}
}
}
}
}

bool TcpServer::send_file(int client_sock) {
// 打开文件
int file_fd = open(FILENAME, O_RDONLY);
if (file_fd == -1) {
perror("open file");
return false;
}

// 获取文件大小
struct stat file_stat;
if (fstat(file_fd, &file_stat) < 0) {
perror("fstat");
close(file_fd);
return false;
}

// 将文件映射到内存
char* file_address = (char*)mmap(0, file_stat.st_size, PROT_READ, MAP_PRIVATE, file_fd, 0);
if (file_address == MAP_FAILED) {
perror("mmap");
close(file_fd);
return false;
}

// 创建用于 writev 的 iovec 结构
struct iovec iv[2];
char header[BUF_SIZE];
snprintf(header, BUF_SIZE, "HTTP/1.1 200 OK\r\nContent-Length: %ld\r\n\r\n", file_stat.st_size);
iv[0].iov_base = header;
iv[0].iov_len = strlen(header);
iv[1].iov_base = file_address;
iv[1].iov_len = file_stat.st_size;

// 用于跟踪已发送的字节数
int bytes_to_send = iv[0].iov_len + iv[1].iov_len;
int bytes_have_send = 0;

while (bytes_to_send > 0) {
int temp = writev(client_sock, iv, 2);
if (temp <= 0) {
if (errno == EAGAIN) {
// 如果不可写,等待下一次可写事件
ev.events = EPOLLOUT;
ev.data.fd = client_sock;
epoll_ctl(m_epollfd, EPOLL_CTL_MOD, client_sock, &ev);
munmap(file_address, file_stat.st_size);
close(file_fd);
return true; // 等待下一次发送
}
munmap(file_address, file_stat.st_size);
close(file_fd);
return false; // 发送失败
}

bytes_have_send += temp;
bytes_to_send -= temp;

// 更新 iovec 的发送状态
if (bytes_have_send >= iv[0].iov_len) {
iv[1].iov_base = file_address + (bytes_have_send - iv[0].iov_len);
iv[1].iov_len = bytes_to_send;
iv[0].iov_len = 0;
} else {
iv[0].iov_base = (char*)iv[0].iov_base + bytes_have_send;
iv[0].iov_len -= bytes_have_send;
}
}

// 发送完成
munmap(file_address, file_stat.st_size);
close(file_fd);
return false; // 传输完毕,关闭连接
}

int main() {
TcpServer server(PORT);
server.run();
return 0;
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include <iostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <cstdlib>
#include <fstream>

#define SERVER_IP "127.0.0.1"
#define SERVER_PORT 8899
#define BUF_SIZE 1024

int main()
{
int sockfd;
struct sockaddr_in server_addr;
char buffer[BUF_SIZE];

// 创建 socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
perror("socket");
exit(EXIT_FAILURE);
}

// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);

if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) <= 0)
{
perror("inet_pton");
close(sockfd);
exit(EXIT_FAILURE);
}

// 连接服务器
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
{
perror("connect");
close(sockfd);
exit(EXIT_FAILURE);
}

// 打开文件用于保存接收到的数据
std::ofstream outfile("gdal-2.4.11.tar.gz", std::ios::binary);
if (!outfile.is_open())
{
perror("ofstream");
close(sockfd);
exit(EXIT_FAILURE);
}

// 接收数据
int bytes_received = 0;
while ((bytes_received = recv(sockfd, buffer, BUF_SIZE, 0)) > 0)
{
outfile.write(buffer, bytes_received);
}

if (bytes_received < 0)
{
perror("recv");
}

std::cout << "File received successfully!" << std::endl;

// 关闭文件和连接
outfile.close();
close(sockfd);
return 0;
}

上述的流程可以总结为:

TcpServer::send_file 函数负责将文件内容通过 TCP 连接发送给客户端。它首先打开文件并获取文件大小,然后将文件内容映射到内存。接着,它创建一个 iovec 数组,包含 HTTP 响应头和文件内容,并使用 writev 系统调用发送数据。在发送过程中,它会处理各种情况,如套接字不可写、数据发送完毕等。通过这种方式,可以高效地将文件内容发送给客户端,并保持代码的结构清晰和易于维护。

TcpServer::send_file 流程

  1. 打开文件
    • 使用 open 系统调用打开指定文件,获取文件描述符 file_fd
    • 如果文件打开失败,输出错误信息并返回 false
  2. 获取文件大小
    • 使用 fstat 系统调用获取文件的统计信息,包括文件大小 file_stat.st_size
    • 如果获取文件信息失败,输出错误信息并关闭文件描述符,返回 false
  3. 内存映射文件
    • 使用 mmap 系统调用将文件映射到内存,获取文件内容的内存地址 file_address
    • 如果内存映射失败,输出错误信息并关闭文件描述符,返回 false
  4. 准备发送数据
    • 创建一个 iovec 数组,包含两个元素:
      • 第一个元素是 HTTP 响应头,包含状态行和内容长度。
      • 第二个元素是文件内容的内存地址。
    • 计算总共需要发送的字节数 bytes_to_send
  5. 发送数据
    • 进入循环,直到所有数据发送完毕。
    • 使用 writev 系统调用发送数据。
    • 如果 writev 返回值小于等于 0,表示发送失败:
      • 如果错误码是 EAGAIN,表示当前套接字不可写,需要重新注册 EPOLLOUT 事件,并等待下一次可写事件。
      • 其他错误则释放内存映射,关闭文件描述符,返回 false
    • 更新已发送字节数 bytes_have_send 和剩余字节数 bytes_to_send
    • 根据已发送的字节数更新 iovec 数组的状态。
  6. 发送完成
    • 如果所有数据发送完毕,释放内存映射,关闭文件描述符,返回 false,表示传输完毕,关闭连接。