夫夷以近,则游者众;险以远,则至者少。 而世之奇伟、瑰怪,非常之观,常在于险远,而人之所罕至焉,故非有志者不能至也。 有志矣,不随以止,又不随以怠,至于幽暗昏惑而有物以相之,或可至也。 然力足以至焉,于人为可讥,而在己为有悔; 尽吾志也而不能至者,可以无悔矣,其孰能讥之乎? - 《游褒禅山记》

在线教育学习平台-快乐学习的项目中,实现过媒体资源管理模块,即将视频、图片等媒体资源上传到MinIO分布式文件系统,实现大文件断点续传。这里将之前的实现进行回顾,并进一步考量可以优化的点和代码实现。

需求分析

当前短视频行业的快速发展,加上用户对高清、流畅观看体验的需求不断提升,对系统的并发处理能力、视频处理速度、存储效率等多方面都提出了极高的要求。那么,我们首先需要了解一个完整的短视频系统应该具备哪些核心模块。

功能需求:用户上传视频、搜索视频、观看视频。

性能需求:

预计用户总量为20亿,日活用户约10亿,每个用户平均每天浏览10个短视频,由此可以预估,短视频日播放量为100亿:

image-20240928114357537

补充:我门对QPS也可以有个进一步的预估。

比如:

  1. 通常情况下,用户的访问并不是均匀分布的,而是集中在某些时段。假设:

    • 20% 的时间内集中发生了 80% 的请求量(这是典型的二八法则),即 80% 的请求集中在一天的 20% 的时间里发生。
  2. 计算峰值时间段的 QPS

    • 峰值时间段的时间占比 = 24 小时的 20% = 24 × 0.2 = 4.8 小时
    • 峰值时间段的总秒数 = 4.8 × 60 × 60 = 17280 秒
    • 峰值时间段的请求量 = 总请求量的 80% = 1000 万 × 0.8 = 800 万

    现在可以计算峰值时间段的 QPS:

    • 峰值 QPS = 峰值时间段的请求量 / 峰值时间段的总秒数
    • 峰值 QPS = 800 万 / 17280 ≈ 463

综上考虑点:QPS,带宽,内存。

根据考量点:

提高QPS的常用方法

  • 负载均衡:负载均衡通过将请求分发到多台服务器上,避免单台服务器过载,从而提升系统的整体吞吐量
  • 缓存:通过缓存热点数据,减少数据库或后端服务的压力,从而提高系统的响应速度和 QPS
  • 数据库优化:读写分离,分库分表,索引优化
  • 异步处理:消息队列,定时器任务执行
  • CDN(内容分发网络)

提高带宽的常用方法

使用 CDN、压缩传输数据、优化传输协议、通过负载均衡

内存

分布式文件系统

系统设计

image-20240928115024436

用户上传模块

用户上传视频时,上传请求会通过负载均衡服务器和网关服务器,到达视频上传微服务。视频上传微服务需要做两件事:一是把上传文件数据流写入视频文件暂存服务器;二是把用户名、上传时间、视频时长、视频标题等视频元数据写入分布式MySQL数据库。

视频文件上传完成后,视频上传微服务会生成一个视频上传完成消息,并将其写入到消息队列服务器。视频内容处理器将消费这个上传完成消息,并根据消息内容,从视频文件暂存服务器获取视频文件数据,进行处理。

视频内容处理器是一个由责任链模式构建起来的管道。在这个管道中,视频将会被顺序进行内容合规性审查、内容重复性及质量审查、内容标签生成、视频缩略图生成、统一视频转码处理等操作,如下图。

image-20240928115119399

合规且非重复的视频会经过统一转码,最终被写入分布式文件存储和CDN。这样视频上传处理就完成了,具体时序图如下。

image-20240928115136943

视频搜索模块

视频搜索引擎会根据用户提交的视频标题、上传用户等元数据,以及视频内容处理器生成的内容标签构建倒排索引。当用户搜索视频时,系统会根据倒排索引来检索符合条件的视频,并返回结果列表。结果列表在App端向用户呈现时,会将此前视频内容处理器生成的缩略图展现给用户,使用户对视频内容有个初步而直观的感受。

视频播放模块

当用户点击缩略图时,App开始播放视频。App并不需要下载完整个视频文件才开始播放,而是以流的方式一边下载视频数据,一边播放,使用户尽量减少等待,获得良好的观看体验。

详细设计

上传资源模块

技术选型:MinIO

理由:它的一大特点就是轻量,使用简单、功能强大,支持各种平台,单个文件最大5TB,兼容提供了Java、Python、GO等多版本SDK支持。

  • 本项目采用MinIO构建分布式文件系统,MinIO是一个非常轻量的服务,可以很简单的和其他应用结合使用。它兼容亚马逊S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等

  • MinIO采用去中心化共享架构,每个节点是对等关系,通过Nginx可对MinIO进行负载均衡访问

  • 去中心化有什么好处?

    • 在大数据领域,通常的设计理念都是无中心和分布式。MinIO分布式模式可以帮助你搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置
  • MinIO使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块,冗余地分散存储在各个节点的磁盘上,所有可用的磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时,会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验开会分散的存储在这8块硬盘上

  • 使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍可以恢复数据。例如上面集合中有4个以内的硬盘损害,仍可保证数据恢复,不影响上传和下载;但如果多余一半的硬盘损坏,则无法恢复

上传图片

基本需求:

  1. 前端进入上传图片界面
  2. 上传图片,请求媒资管理服务
  3. 媒资管理服务将图片文件存储在MinIO
  4. 媒资管理记录文件信息到数据库
  5. 保存课程信息,在内容管理数据库保存图片地址

媒资管理记录文件表

image-20240928160029871

上传功能逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
@Autowired
MediaFilesMapper mediaFilesMapper; // 自动注入MediaFilesMapper,用于数据库操作

@Autowired
MinioClient minioClient; // 自动注入MinioClient,用于与MinIO存储服务交互

@Value("${minio.bucket.files}")
private String bucket_files; // 从配置文件中读取MinIO的bucket名称

/**
* 上传文件的方法
* @param companyId 机构id
* @param uploadFileParamsDto 文件信息
* @param bytes 文件字节数组
* @param folder 桶下边的子目录
* @param objectName 对象名称
* @return 上传文件结果DTO
*/
@Override
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName) {
// 计算文件的MD5值
String fileMD5 = DigestUtils.md5DigestAsHex(bytes);

// 如果目录为空,则自动生成一个目录
if (StringUtils.isEmpty(folder)) {
folder = getFileFolder(true, true, true);
} else if (!folder.endsWith("/")) {
// 如果目录末尾没有 / ,替他加一个
folder = folder + "/";
}

// 如果文件名为空,则设置其默认文件名为文件的md5码 + 文件后缀名
if (StringUtils.isEmpty(objectName)) {
String filename = uploadFileParamsDto.getFilename();
objectName = fileMD5 + filename.substring(filename.lastIndexOf("."));
}

// 拼接完整的对象名称
objectName = folder + objectName;

try {
// 将字节数组转换为输入流
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);

// 上传文件到MinIO
minioClient.putObject(PutObjectArgs.builder()
.bucket(bucket_files) // 指定bucket
.object(objectName) // 指定对象名称
.stream(byteArrayInputStream, byteArrayInputStream.available(), -1) // 文件流
.contentType(uploadFileParamsDto.getContentType()) // 文件内容类型
.build());

// 从数据库中查询文件信息
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5);

// 如果文件信息不存在,则创建新的文件信息对象
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); // 复制文件信息
mediaFiles.setId(fileMD5); // 设置文件ID为MD5值
mediaFiles.setFileId(fileMD5); // 设置文件ID为MD5值
mediaFiles.setCompanyId(companyId); // 设置机构ID
mediaFiles.setBucket(bucket_files); // 设置bucket名称
mediaFiles.setCreateDate(LocalDateTime.now()); // 设置创建时间
mediaFiles.setStatus("1"); // 设置文件状态
mediaFiles.setFilePath(objectName); // 设置文件路径
mediaFiles.setUrl("/" + bucket_files + "/" + objectName); // 设置文件URL
mediaFiles.setAuditStatus("002003"); // 设置审核状态,002003表示审核通过
}

// 将文件信息保存到数据库
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert <= 0) {
XueChengPlusException.cast("保存文件信息失败"); // 抛出异常
}

// 创建上传文件结果DTO
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
BeanUtils.copyProperties(mediaFiles, uploadFileResultDto); // 复制文件信息
return uploadFileResultDto; // 返回上传文件结果DTO
} catch (Exception e) {
XueChengPlusException.cast("上传过程中出错"); // 抛出异常
}
return null; // 返回null
}

/**
* 自动生成目录
* @param year 是否包含年
* @param month 是否包含月
* @param day 是否包含日
* @return 生成的目录字符串
*/
private String getFileFolder(boolean year, boolean month, boolean day) {
StringBuffer stringBuffer = new StringBuffer();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); // 日期格式化
String dateString = dateFormat.format(new Date()); // 获取当前日期字符串
String[] split = dateString.split("-"); // 分割日期字符串
if (year) {
stringBuffer.append(split[0]).append("/"); // 添加年份
}
if (month) {
stringBuffer.append(split[1]).append("/"); // 添加月份
}
if (day) {
stringBuffer.append(split[2]).append("/"); // 添加日期
}
return stringBuffer.toString(); // 返回生成的目录字符串
}

该代码实现了一个文件上传的功能,包括以下步骤:

  1. 计算文件的MD5值。
  2. 处理目录和对象名称。
  3. 将文件上传到MinIO存储服务。
  4. 查询数据库中的文件信息。
  5. 创建或更新文件信息并保存到数据库。
  6. 返回上传文件结果DTO。
  7. 提供自动生成目录的方法。

问题1:

目前如果在updateFile方法上添加@Transactional,当调用updateFile方法前会开启数据库事务,如果上传文件过程时间较长(例如用户在上传超大视频文件),那么数据库的持续时间也会变长(因为在updateFile方法中,我们即要将文件上传到minio,又要将文件信息写入数据库),这样数据库连接释放就慢,最终导致数据库链接不够用。

方法:

由于文件上传可能是一个耗时较长的操作,而将其与数据库事务绑定在一起会导致事务持续时间过长,从而占用数据库连接资源,可能最终导致连接池耗尽。因此,解决方案是将耗时的文件上传操作与数据库操作分离,确保事务仅围绕数据库操作执行,而文件上传不受事务约束。

  1. uploadFile 方法:负责文件上传到 MinIO 或其他存储,不涉及事务管理。
  2. addMediaFilesToDB 方法:只负责将文件的元数据保存到数据库,且使用 @Transactional 进行事务管理。
  3. 整个流程:先上传文件,上传完成后再保存元数据,确保数据库事务的时间尽可能短。

示例如下:

  1. 文件上传与数据库操作分离
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Service
public class MediaService {

@Autowired
private MinioClient minioClient; // MinIO 客户端

@Autowired
private MediaRepository mediaRepository; // 数据库操作的Repository

/**
* 上传文件至 MinIO,不使用事务管理。
* @param file 上传的文件
* @param bucketName MinIO中的存储桶名称
* @return 返回上传文件的路径
* @throws Exception 上传失败时抛出异常
*/
public String uploadFile(MultipartFile file, String bucketName) throws Exception {
String objectName = UUID.randomUUID().toString() + "_" + file.getOriginalFilename();

// 上传文件到 MinIO
minioClient.putObject(
PutObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.stream(file.getInputStream(), file.getSize(), -1)
.contentType(file.getContentType())
.build()
);

// 返回文件在 MinIO 中的路径
return bucketName + "/" + objectName;
}

/**
* 将文件元数据写入数据库,使用事务管理确保数据一致性。
* @param mediaFile 文件的元数据
*/
@Transactional
public void addMediaFilesToDB(MediaFile mediaFile) {
mediaRepository.save(mediaFile);
}

/**
* 上传文件并保存相关元数据信息到数据库
* @param file 上传的文件
* @param bucketName MinIO中的存储桶名称
* @param mediaFile 文件的元数据
* @throws Exception 上传失败时抛出异常
*/
public void processFileUpload(MultipartFile file, String bucketName, MediaFile mediaFile) throws Exception {
// 1. 上传文件到 MinIO,文件上传不受事务控制,避免长时间占用数据库连接
String filePath = uploadFile(file, bucketName);

// 2. 文件上传成功后,保存元数据到数据库,使用事务控制
mediaFile.setFilePath(filePath);
addMediaFilesToDB(mediaFile);
}
}

上传视频功能

需求分析

  1. 教学机构人员进入媒资管理列表查询自己上传的媒资文件
  2. 教育机构人员在媒资管理页面中点击上传视频按钮,打开上传界面
  3. 选择要上传的文件,自动执行文件上传
  4. 视频上传成功会自动处理,处理完成后可以预览视频

考量点:

断点续传

断点续传在视频上传中通过节省带宽、提升用户体验、减轻服务器负担、适应复杂网络环境,并减少用户上传失败的风险,极大地优化了上传流程。

流程如下

  1. 前端上传前先把文件分成块
  2. 一块一块的上传,上传中断后重新上传。已上传的分块则不用再上传
  3. 各分块上传完成后,在服务端合并文件

测试代码:

  • 文件分块的流程如下
    1. 获取源文件长度
    2. 根据设定的分块文件大小,计算出块数(向上取整,例如33.4M的文件,块大小为1M,则需要34块)
    3. 从源文件读取数据,并依次向每一个块文件写数据

文件分块测试代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Test
public void testChunk() throws IOException {
// 源文件
File sourceFile = new File("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1.mp4");
// 块文件路径
String chunkPath = "D:\\BaiduNetdiskDownload\\星际牛仔1998\\chunk\\";
File chunkFolder = new File(chunkPath);
if (!chunkFolder.exists()) {
chunkFolder.mkdirs();
}
// 分块大小 1M
long chunkSize = 1024 * 1024 * 1;
// 计算块数,向上取整
long chunkNum = (long) Math.ceil(sourceFile.length() * 1.0 / chunkSize);
// 缓冲区大小
byte[] buffer = new byte[1024];
// 使用RandomAccessFile访问文件
RandomAccessFile raf_read = new RandomAccessFile(sourceFile, "r");
// 遍历分块,依次向每一个分块写入数据
for (int i = 0; i < chunkNum; i++) {
// 创建分块文件,默认文件名 path + i,例如chunk\1 chunk\2
File file = new File(chunkPath + i);
if (file.exists()){
file.delete();
}
boolean newFile = file.createNewFile();
if (newFile) {
int len;
RandomAccessFile raf_write = new RandomAccessFile(file, "rw");
// 向分块文件写入数据
while ((len = raf_read.read(buffer)) != -1) {
raf_write.write(buffer, 0, len);
// 写满就停
if (file.length() >= chunkSize)
break;
}
raf_write.close();
}
}
raf_read.close();
System.out.println("写入分块完毕");
}
  • 文件合并流程
    1. 找到要合并的文件并按文件分块的先后顺序排序
    2. 创建合并文件
    3. 依次从合并的文件中读取数据冰箱合并文件写入数据

文件合并的测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Test
public void testMerge() throws IOException {
// 块文件目录
File chunkFolder = new File("D:\\BaiduNetdiskDownload\\星际牛仔1998\\chunk\\");
// 源文件
File sourceFile = new File("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1.mp4");
// 合并文件
File mergeFile = new File("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1-1.mp4");
mergeFile.createNewFile();
// 用于写文件
RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");
// 缓冲区
byte[] buffer = new byte[1024];
// 文件名升序排序
File[] files = chunkFolder.listFiles();
List<File> fileList = Arrays.asList(files);
Collections.sort(fileList, Comparator.comparingInt(o -> Integer.parseInt(o.getName())));
// 合并文件
for (File chunkFile : fileList) {
RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "r");
int len;
while ((len = raf_read.read(buffer)) != -1) {
raf_write.write(buffer, 0, len);
}
raf_read.close();
}
raf_write.close();
// 判断合并后的文件是否与源文件相同
FileInputStream fileInputStream = new FileInputStream(sourceFile);
FileInputStream mergeFileStream = new FileInputStream(mergeFile);
//取出原始文件的md5
String originalMd5 = DigestUtils.md5Hex(fileInputStream);
//取出合并文件的md5进行比较
String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream);
if (originalMd5.equals(mergeFileMd5)) {
System.out.println("合并文件成功");
} else {
System.out.println("合并文件失败");
}
}

梳理下:上传视频流程

image-20240928162234505

  1. 前端上传文件前,请求媒资接口层检查文件是否存在
    • 若存在,则不再上传
    • 若不存在,则开始上传,首先对视频文件进行分块
  2. 前端分块进行上传,上传前首先检查分块是否已经存在
    • 若分块已存在,则不再上传
    • 若分块不存在,则开始上传分块
  3. 前端请求媒资管理接口层,请求上传分块
  4. 接口层请求服务层上传分块
  5. 服务端将分块信息上传到MinIO
  6. 前端将分块上传完毕,请求接口层合并分块
  7. 接口层请求服务层合并分块
  8. 服务层根据文件信息找到MinIO中的分块文件,下载到本地临时目录,将所有分块下载完毕后开始合并
  9. 合并完成后,将合并后的文件上传至MinIO

参考:上传视频

这时候,有一个需求,就是视频内容处理完成之后,着3个操作是需要同时完成的,这就涉及到分布式事务的管理。

image-20240928171939979

有一种方案:定时器的方案

image-20240928172231926

  1. 在内容管理服务的数据库添加一个消息表(mq_message),消息表和课程发布表在同一个数据库
  2. 点击课程发布,通过本地事务向课程发布表写入课程发布信息,同时向消息表写入课程发布的信息,这两条记录需保证同时存在或者同时不存在
  3. 启动任务调度系统的定时调度,内容管理服务去定时扫描消息表的记录
  4. 当扫描到课程发布的消息时,即开始向Redis、ElasticSearch、MinIO完成同步数据的操作
  5. 同步数据的任务完成后删除消息表记录,并插入历史消息表

可能存在问题:

1、数据量多扫表慢

2、集中式扫表会影响正常业务

3、定时扫表存在延迟问题

image-20240928172258198

  1. 执行发布操作,内容管理服务存储课程发布表的同时,向消息表插入一条课程发布任务,这里使用本地事务保证课程发布信息保存成功,同时消息表也保存成功
  2. 任务调度服务定时调度内容管理服务扫描消息表,由于课程发布操作后向消息表插入了一条课程发布任务,所以此时会扫描到一条任务
  3. 拿到任务开始执行任务,分别向Redis、ElasticSearch、MinIO文件系统存储数据
  4. 任务完成后删除消息表记录

补充:xxl-job如何保证一任务只会触发一次?

https://github.com/xuxueli/xxl-job/blob/master/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java

image-20240928210737419

示例:

1. 使用 Saga 模式

Saga 模式是一种常见的分布式事务解决方案,它将全局事务拆分为多个小的局部事务,每个局部事务都有其补偿操作。如果某个步骤失败,系统可以通过执行补偿操作撤销之前的步骤,从而实现事务的回滚。

实现步骤

  • 阶段1:生成并上传课程静态页面到 MinIO。
  • 阶段2:将课程信息存储到 Redis。
  • 阶段3:将课程信息索引到 Elasticsearch。

关键点

  • 每一阶段都需要幂等性处理,确保重复执行不会导致不一致。
  • 每一阶段都有对应的补偿操作(即回滚操作),如果某个阶段失败,之前成功的阶段需要执行补偿操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Override
public boolean execute(MqMessage mqMessage) {
String courseId = mqMessage.getBusinessKey1();

// 1. 执行第一阶段:生成并上传课程静态页面到 MinIO
if (!generateAndUploadToMinIO(mqMessage, Long.valueOf(courseId))) {
return false;
}

// 2. 执行第二阶段:存储到 Redis
if (!storeToRedis(mqMessage, Long.valueOf(courseId))) {
// 如果发生失败,执行补偿操作:删除 MinIO 中的静态页面
rollbackMinIO(mqMessage, Long.valueOf(courseId));
return false;
}

// 3. 执行第三阶段:存储到 Elasticsearch
if (!saveCourseIndex(mqMessage, Long.valueOf(courseId))) {
// 如果Elasticsearch存储失败,执行补偿操作:删除 Redis 和 MinIO 中的数据
rollbackRedis(mqMessage, Long.valueOf(courseId)); // 回滚 Redis
rollbackMinIO(mqMessage, Long.valueOf(courseId)); // 回滚 MinIO
return false;
}

// 如果三个阶段都成功则返回 true
return true;
}

// 第一阶段:生成并上传到 MinIO
private boolean generateAndUploadToMinIO(MqMessage mqMessage, Long courseId) {
try {
// TODO: 生成课程静态页面并上传到 MinIO
log.debug("课程静态页面上传到 MinIO 成功,课程ID: {}", courseId);
return true;
} catch (Exception e) {
log.error("上传到 MinIO 失败: {}", e.getMessage());
return false;
}
}

// 第一阶段的补偿操作:删除 MinIO 中的数据
private void rollbackMinIO(MqMessage mqMessage, Long courseId) {
try {
// TODO: 删除 MinIO 中的静态页面
log.debug("MinIO 回滚成功,课程ID: {}", courseId);
} catch (Exception e) {
log.error("MinIO 回滚失败: {}", e.getMessage());
}
}

// 第二阶段:存储到 Redis
private boolean storeToRedis(MqMessage mqMessage, Long courseId) {
try {
// TODO: 将课程信息存入 Redis
log.debug("课程存入 Redis 成功,课程ID: {}", courseId);
return true;
} catch (Exception e) {
log.error("存入 Redis 失败: {}", e.getMessage());
return false;
}
}

// 第二阶段的补偿操作:删除 Redis 中的数据
private void rollbackRedis(MqMessage mqMessage, Long courseId) {
try {
// TODO: 删除 Redis 中的课程信息
log.debug("Redis 回滚成功,课程ID: {}", courseId);
} catch (Exception e) {
log.error("Redis 回滚失败: {}", e.getMessage());
}
}

// 第三阶段:存储到 Elasticsearch
private boolean saveCourseIndex(MqMessage mqMessage, Long courseId) {
try {
// TODO: 将课程信息存入 Elasticsearch
log.debug("课程索引到 Elasticsearch 成功,课程ID: {}", courseId);
return true;
} catch (Exception e) {
log.error("Elasticsearch 索引失败: {}", e.getMessage());
return false;
}
}

2. 使用基于消息队列的最终一致性

基于消息队列的最终一致性是一种松散耦合的分布式事务解决方案。基本思想是:将每个阶段的操作拆分为独立的子任务,并通过异步消息队列(如 RabbitMQ)来通知下一个阶段执行。通过消息重试和幂等操作来保证每个阶段最终执行成功,确保最终一致性。

搜索资源模块

视频搜索引擎会根据用户提交的视频标题、上传用户等元数据,以及视频内容处理器生成的内容标签构建倒排索引。当用户搜索视频时,系统会根据倒排索引来检索符合条件的视频,并返回结果列表。结果列表在App端向用户呈现时,会将此前视频内容处理器生成的缩略图展现给用户,使用户对视频内容有个初步而直观的感受。

优化设计

我们优化的角度是三高:高并发,高可用,高性能。

高可用

冷备、热备,暖备

冷备 是指备份系统在平时处于关闭状态,只有在主系统发生故障时才会启动。由于系统在故障发生后才开始恢复,因此恢复时间较长,通常需要手动干预和配置。

举例:

假设公司有一个文件服务器,冷备份的方案是每天晚上将文件系统拷贝到外部硬盘或云存储中。如果文件服务器宕机,管理员需要手动将备份文件恢复到新的服务器上,整个过程可能需要几个小时甚至更长时间。

热备 是指备份系统与主系统始终保持同步,随时可以接管主系统的工作。如果主系统发生故障,备份系统可以立即接管,几乎不会有中断。

举例:

某企业有一个电商网站,备份服务器每隔几个小时同步一次数据库。当主服务器宕机时,备份服务器可以在几分钟内接管业务,但可能会丢失最近几小时的数据。

暖备 是介于冷备和热备之间的一种备份方式。备份系统处于启动状态,但不处理业务数据。备份系统与主系统之间定期同步数据,或者以较长的延迟进行同步。一旦主系统发生故障,备份系统需要短时间的配置和调整后才能接管工作。

举例:

某企业有一个电商网站,备份服务器每隔几个小时同步一次数据库。当主服务器宕机时,备份服务器可以在几分钟内接管业务,但可能会丢失最近几小时的数据。

异地多活

异地多活 是一种高级的分布式系统架构设计,指的是在多个物理地理位置(通常是不同的城市或国家)部署多个数据中心,这些数据中心同时对外提供服务,并且这些服务处于活跃状态,能够同时处理用户请求。

举例:

假设某个全球运营的电子商务平台采用了异地多活架构。它在美国、欧洲和亚洲分别部署了数据中心,并且所有数据中心都可以同时处理用户请求。用户在美国下单时,可能会优先请求美国的数据中心,而在欧洲的用户请求则会路由到欧洲的数据中心。同时,所有订单数据会在后台进行跨数据中心同步。

如果美国的数据中心因自然灾害宕机,欧洲和亚洲的数据中心会继续无缝接受全球用户的请求,确保服务不间断运行。这种架构极大提高了系统的可用性容灾能力

同城容灾

压力测试

压测如何避免影响线上用户?

1.单独环境 2.控制压测范围 3.业务低峰压测 4.控制压测流量

武器:【解耦,隔离(单体-》微服务),异步,备份,重试,熔断,补偿,降级,多活】

网络攻击

  • SQL: SQL注入攻击
  • XSS: 跨站点脚本攻击
  • CSC: 注释与异常信息泄露
  • CSRF: 跨站点请求伪造
  • FB: 路径遍历与强制浏览- path表示要处理的请求路径,可以为空,表示处理所有请求路径。

高性能

1.接口性能优化的 15 个技巧

本地·缓存,分布式缓存,分库分表

2.读写分离

补充:主从延迟怎么办

法1:强制读主库

法2:二次读取

法3:

方案 描述 优缺点
Sleep 方案 主库更新后,先让从库 sleep(等待) 1 秒,再从从库读取数据。 简单易实现,但无法精确控制延迟时间,可能导致读取依然是旧数据。
判断主备无延迟方案 每次从库执行查询请求前,先判断 seconds_behind_master 是否为 0,如果不为 0,则等待同步完成。 可以确保从库数据是最新的,但每次查询前都要判断,可能增加查询延迟。
等待库位点方案 在从库执行查询前,确保从库已经同步到主库的特定位置,确保从库数据最新。 可确保数据一致性,但实现稍复杂,适用于对一致性要求较高的场景。
等 GTID 方案 基于 MySQL 5.7 的 GTID,允许在执行更新类事务前确认 GTID 已同步。 可以精准控制数据同步状态,确保一致性,但涉及 GTID 的管理和实现较复杂,适合需要严格数据同步的场景。

等 GTID 方案 的核心流程确实是先从 主库 取出 GTID,然后再在 从库 上判断该 GTID 是否已经同步并应用完成。在这个方案中,GTID(Global Transaction Identifier) 用于标识事务,确保从库的复制过程与主库的状态保持一致。

流程:

  1. 主库执行写操作
    • 用户提交订单,主库执行 INSERT INTO orders (id, user_id, total) VALUES (1, 123, 100.0);
    • 该事务生成了一个 GTID:GTID-1,并记录到主库的 binlog 中。
  2. 从库复制 GTID 事务
    • 从库订阅主库的 binlog,接收到 GTID-1 对应的事务,并在从库上执行该 INSERT 操作,确保订单信息同步到从库。
  3. 应用从库读取操作
    • 用户提交订单后,应用程序准备展示订单详情。应用首先从主库获取该事务的 GTID(即 GTID-1)。
    • 在从库执行读操作前,应用程序会检查 GTID-1 是否已经在从库同步并应用。如果还未同步,则等待从库完成同步;如果同步完成,则执行读操作。
  4. 确保数据一致性
    • 从库确认 GTID-1 已经应用后,应用程序从从库读取订单详情,并返回给用户。这确保了用户看到的订单信息是最新的,避免了主从延迟导致的脏读问题。

高并发

什么是服务降级?什么是服务熔断?

服务降级

服务降级 是指在某些特殊情况下(如服务响应超时、服务不可用等),为保证核心功能的可用性,主动降低某些非核心服务的质量或功能,甚至直接返回预设的默认值或降级的响应。

  • 主要目的是保证系统的核心功能可用。
  • 当某个服务不可用时,改为提供简化的功能,或者返回默认的降级响应。
  • 通常是在被调用方不可用时,调用方做出降级处理。

服务熔断

服务熔断 类似于电路中的熔断机制。当某个服务出现大量故障或超时时,为了防止该服务的持续调用导致系统整体性能下降或崩溃,熔断机制会主动切断对该服务的请求,直接返回错误,避免雪崩效应。

  • 主要目的是防止故障蔓延
  • 熔断机制会对服务的健康状况进行监控,如果失败率超过某个阈值,直接停止调用该服务。
  • 服务熔断后,经过一段时间会尝试恢复调用(即“半开状态”),如果服务恢复正常,则恢复调用。
特性 服务降级 服务熔断
触发条件 被调用方不可用或超时 调用方监控到某服务的失败率达到一定阈值
处理方式 调用方主动提供简化的功能或默认响应 熔断器主动切断对该服务的请求
目标 保证核心业务可用,降低非核心功能的质量 防止故障蔓延,保护系统整体稳定性
恢复机制 降级后可以一直使用降级方案 熔断后在一段时间内会尝试恢复服务调用

Java中的服务降级与熔断机制

常用组件

  1. Resilience4j

Resilience4j 是一个轻量级的容错库,专为Java 8和函数式编程设计。它提供了服务降级、服务熔断、限流、重试等功能。

主要功能:

  • CircuitBreaker:实现服务熔断。
  • RateLimiter:实现限流。
  • Retry:实现重试机制。
  • Bulkhead:实现隔离机制。
  • TimeLimiter:实现超时控制。
  1. Hystrix

Hystrix 是由Netflix开源的一个容错库,提供了服务降级、服务熔断、限流、重试等功能。Hystrix已经进入维护模式,不再积极开发,但仍然被广泛使用。

主要功能:

  • Circuit Breaker:实现服务熔断。
  • Fallback:实现服务降级。
  • Thread Pool Isolation:实现线程池隔离。
  • Request Collapsing:实现请求合并。
  1. Sentinel

Sentinel 是阿里巴巴开源的一个面向分布式服务架构的轻量级高可用流量控制组件。它提供了服务降级、服务熔断、限流、系统自适应保护等功能。

主要功能:

  • Flow Control:实现流量控制。
  • Circuit Breaker:实现服务熔断。
  • Degrade:实现服务降级。
  • System Adaptive Protection:实现系统自适应保护。
Sentinel Hystrix
隔离策略 信号量隔离(并发线程数限流) 线程池隔离/信号量隔离
熔断降级策略 基于响应时间、异常比率、异常数 基于异常比率
实时指标实现 滑动窗口(LeapArray) 滑动窗口(基于 RxJava)
规则配置 支持多种数据源 支持多种数据源
扩展性 多个扩展点 插件的形式
基于注解的支持 支持 支持
调用链路信息 支持同步调用 不支持
限流 基于 QPS / 并发数,支持基于调用关系的限流 有限支持
流量整形 支持慢启动、匀速器模式 不支持
系统负载保护 支持 不支持
控制台 开箱即用,可配置规则、查看秒级监控、机器发现等 较为简单
常见框架的适配 Servlet、Spring Cloud、Dubbo、gRPC 等 Servlet、Spring Cloud Netflix

第十四章 Sentinel实现熔断与限流

限流方法

看我之前博客总结内容

  • 单机:Guava 的 RateLimiter 来实现限流。

  • 集群:如果需要在集群环境中控制流量,可以使用 Redis(例如 Redis + Lua 脚本)或分布式限流框架(如 Sentinel、Hystrix 等)来实现全局限流。

缓存预热

1.启动时候加载

2.定时加载

3.用时加载

带宽处理

前面的分析得知需要的总带宽是88Tb,这是一个非常巨大的数字。如果单纯靠QuickTok自己的数据中心来承担这个带宽压力,技术挑战和成本都非常巨大。只有通过CDN将用户的网络通信请求就近返回,才能缓解数据中心的带宽压力。

App请求获取视频数据流的时候,会优先检查离自己比较近的CDN中是否有视频数据。如果有,直接从CDN加载数据,如果没有,才会从QuickTok数据中心获取视频数据流。

如果用户的大部分请求都可以通过CDN返回,那么一方面可以极大加快用户请求的响应速度,另一方面又可以较大缓解数据中心的网络和硬盘负载压力,进一步提升应用整体的性能。

通常的CDN设计,是在CDN中没有用户请求的数据时,进行回源,即由CDN请求数据中心返回需要的数据,然后缓存在CDN本地。

但QuickTok考虑到了短视频的特点:大V、网红们发布的短视频会被更快速、更广泛地播放。因此针对粉丝量超过10万的用户,系统将采用主动推送CDN的方法,以提高CDN的命中率,优化用户体验,如图:

image-20240928154229932

从图中可以看出,视频内容处理器进行完视频处理后,一方面会将视频存储到前面说过的视频存储系统中,另一方面又会调用CDN推送服务。然后,CDN推送服务将调用大数据平台,获取视频上传者的活跃粉丝数、粉丝分布区域等数据。如果是10万粉丝以上的用户发布了短视频,CDN推送服务会根据其粉丝活跃的区域,将视频推送到对应区域的CDN服务器上。

短视频的完播率通常不足30%,所以QuickTok也不需要将完整视频推送到CDN,只需要根据视频发布者的历史播放记录,计算其完播率和播放期望进度,然后将短视频切分成若干chunk,将部分chunk推送到CDN即可。

业界一般共识,视频应用CDN处理的带宽大约占总带宽的95%以上,也就是说,通过合理使用CDN,QuickTok数据中心需要处理的带宽压力不到4Tb。

推荐:

短视频系统设计:支持三千万用户同时在线看视频

三高系统的架构设计方案:高并发、高可用、高性能

学成在线笔记+踩坑(5)——【媒资模块】上传视频,断点续传

如何实现一个通知系统?

指标监控

PrometheusGrafana 是现代监控和可视化系统中常见的搭档。

Prometheus+Grafana+NodeExporter:构建出色的Linux监控解决方案,让你的运维更轻松

Prometheus

Prometheus 是一个开源的监控系统,主要用于收集、存储、查询和告警基于时间序列的数据。它的设计目标是高效地监控分布式系统,尤其适合微服务架构和容器化环境。

Prometheus 的主要功能和用途

  1. 数据采集
    • Prometheus 定期从被监控的目标(如服务器、应用、容器等)中抓取监控数据
    • 这些数据通常是由被监控对象通过 HTTP 提供的(例如,暴露一个 /metrics 端点)。Prometheus 抓取这些数据,并将其存储为时间序列格式。
    • Prometheus Exporters 是 Prometheus 的扩展工具,能够帮助从各种系统(如 MySQL、Redis、Kubernetes 等)中收集监控数据。
  2. 时间序列数据存储
    • Prometheus 将抓取到的监控数据存储为时间序列数据。时间序列数据由一组指标(metrics)随时间变化的值组成。
    • 例如,某个应用的 CPU 使用率、内存使用量、HTTP 请求响应时间等,都可以作为 Prometheus 中的时间序列数据。
  3. 查询和分析
    • Prometheus 提供了强大的查询语言——PromQL(Prometheus Query Language),可以用于对存储的数据进行复杂的查询和聚合操作。
    • 开发者和运维人员可以使用 PromQL 获取监控指标的详细信息,例如某个时间段内的服务器资源使用情况、平均响应时间等。
  4. 告警
    • Prometheus 提供了告警规则功能,可以根据监控指标的值触发告警。
    • 当某个条件满足(例如 CPU 使用率超过 90%),Prometheus 会生成告警,并通过Alertmanager 发送告警通知(如邮件、Slack、PagerDuty 等)。

适用场景

  • 分布式系统监控:Prometheus 适用于监控分布式系统中的多个服务和节点,是微服务架构和容器化环境的理想选择。
  • 高效的时间序列数据库:Prometheus 适合收集和存储高频率的时间序列数据,例如每秒钟的系统资源使用情况。
  • 告警和自动化运维:通过告警规则和 Alertmanager,Prometheus 可以在问题出现时自动通知相关人员,减少人为干预。

Grafana:数据可视化和仪表盘工具

Grafana 是一个开源的数据可视化工具,用于将监控数据通过图形化的仪表盘展示出来。它可以与多种数据源集成(包括 Prometheus),提供灵活的图表和面板,帮助用户直观地查看系统和应用的运行状态。

工作原理是这样的:

mysql-Exporter在每个节点服务器都会启动一个进程监听改节点mysql的指标,然后专门监控的Prometheus就会定时发送http请求收集指标