在快乐学习项目中有个需求,课程发布时利用 RabbitMQ 来同步数据,确保 Redis、Elasticsearch 和 MinIO 之间的数据一致性。
需求分析
在现代的在线教育平台中,数据的一致性和快速检索是确保平台运行流畅的重要基础。为了提高用户体验,通常会将频繁访问的数据缓存到 Redis 中,从而提高读写性能。同时,为了支持复杂的搜索需求,数据会同步到 Elasticsearch(ES)中,提供快速的全文检索功能。虽然 Redis 和 ES 各自具备优势,但它们并非直接存储数据库,因此需要确保它们与数据库(例如 MySQL)之间的数据一致性,防止用户看到过时或不完整的信息。
1.业务场景
当管理员或系统执行课程发布操作时,新的课程信息需要:
- 更新Redis 中的缓存,确保即时访问性能。
- 同步到Elasticsearch,以便支持新课程的搜索功能。
- 上传到MinIO,以存储课程的多媒体资源文件。
2. 面临的挑战
- 数据一致性:确保Redis、Elasticsearch、MinIO之间的数据同步不会出现偏差(如部分同步成功,部分失败)。
- 性能瓶颈:每个目标系统(Redis、Elasticsearch、MinIO)可能具有不同的处理速度,无法同步完成时可能会造成延迟。
- 容错和失败恢复:如果其中某个系统出现故障(如Elasticsearch不可用),需要保证任务不会丢失,能够在系统恢复后重新执行。
Q:为什么要保证一致性,不一致会出现什么场景
场景 1: 用户更新了课程信息,但搜索结果没有更新
过程:
- 用户更新课程信息:用户或管理员在系统中更新了某门课程的名称、描述或其他内容。
- Redis 缓存更新成功:由于 Redis 是缓存系统,系统会优先更新 Redis 中的课程数据,并确保用户从 Redis 获取最新的课程信息。
- Elasticsearch 更新延迟或失败:由于某些原因(可能是网络问题、消息丢失等),Elasticsearch 中的索引没有及时同步更新,仍然保存着旧的数据。
用户体验:
- 当用户访问该课程详情页面时,系统从 Redis 中读取数据,因此用户看到的是最新更新的课程信息。
- 当用户通过 搜索功能 查找这门课程时,由于 Elasticsearch 的数据没有更新,用户看到的仍然是旧的课程名称或描述。
方案调研
1.同步双写
优点:实现简单
缺点:
- 业务耦合,商品的管理中耦合大量数据同步代码
- 影响性能,写入两个存储,响应时间变长
- 不便扩展:搜索可能有一些个性化需求,需要对数据进行聚合,这种方式不便实现
2.异步双写
优点:
- 解耦合,商品服务无需关注数据同步
- 实时性较好,使用MQ,正常情况下,同步完成在秒级
缺点:
3.定时任务
摘自:三分恶公众号
这种方式:
优点:实现比较简单
缺点:
4.数据订阅
在开始数据订阅之前,先回顾下MySQL 主从复制原理。
MySQL 主从复制原理
MySQL 主从复制的核心机制是通过主库 (Master
) 和从库 (Slave
) 之间的数据同步,以确保多个数据库实例间的数据一致性。其工作过程可以划分为三个主要步骤:
1. 主库 (Master) 写入二进制日志 (Binary Log)
- 当 Master 上的数据发生修改(如
INSERT
、UPDATE
、DELETE
等操作)时,MySQL 会将这些数据变更以二进制日志事件 (Binary Log Events) 的形式写入到 Binary Log 中。
- Binary Log 是 Master 记录所有数据变更的日志文件,包含了每一次事务的详细操作。这个日志是 MySQL 复制的核心机制,Slave 将依靠它来同步数据。
- 可以通过命令
SHOW BINLOG EVENTS
查看日志中的具体事件。
2.从库 (Slave) 读取并保存中继日志 (Relay Log)
- Slave 将通过专门的 IO 线程 与 Master 建立连接,并从 Master 的 Binary Log 中异步地读取二进制日志事件。
- 读取到的二进制日志事件会被复制并写入到从库的中继日志 (Relay Log) 中。中继日志是从库本地的一个临时日志,保存了从 Master 获取的所有二进制日志事件。
- Relay Log 的作用是供从库的 SQL 线程读取,并将这些事件应用到从库的数据中。
3. 从库 (Slave) 重放中继日志 (Relay Log)
- Slave 的 SQL 线程 负责从中继日志中读取二进制日志事件,并将这些事件按照原始顺序重放,即执行与 Master 上相同的 SQL 或事务操作。
- 通过重放这些事件,Slave 将自己的数据更新为与 Master 一致的状态,完成数据同步。
- 这个过程是顺序执行的,因此 Slave 的数据状态会逐步追赶 Master,达到一致性。
Canal 工作原理
Canal 作为一个 MySQL 增量数据订阅和消费组件,核心是基于 MySQL 主从复制的原理,通过监听 MySQL 的 binary log (binlog) 来实现增量数据的捕获。以下是 Canal 的工作原理的详细步骤:
1. Canal 模拟 MySQL Slave,发送 dump 协议
- Canal 通过模拟 MySQL 从库(Slave)的交互协议,伪装自己为一个 MySQL 从库。它与 MySQL 主库(Master)建立连接,发送
dump
协议,表示请求获取主库的 binary log。
- 在 MySQL 主从复制的机制中,从库会通过
dump
协议请求主库发送二进制日志,Canal 通过这种方式订阅 MySQL 的 binlog。
2. MySQL Master 推送 Binary Log 给 Canal
- 当 MySQL 主库收到 Canal 的
dump
请求时,会开始向 Canal 推送主库的 binary log。
- Binary log 是 MySQL 数据库记录的所有数据变更的日志,包括
INSERT
、UPDATE
、DELETE
、DDL(如 CREATE TABLE
)等操作。
- 这些推送的 binlog 数据是以 byte 流 的形式进行传输的。
3. Canal 解析 Binary Log
- Canal 接收到来自 MySQL 主库的 binary log(原始的
byte
流数据)后,会对其进行解析,转化为可读的 binary log 事件(Binlog Event
)。
- Canal 通过解析每个 binlog event,提取出相应的表名、操作类型(如
INSERT
、UPDATE
、DELETE
)、字段值等信息。
- 最终,Canal 将这些解析后的事件对象转换为结构化的数据,通常以 JSON 的形式输出供下游消费者(如 Kafka、Redis、Elasticsearch 等)使用。
总结
- 模拟 MySQL Slave:Canal 通过模拟 MySQL 从库的行为,与主库建立连接并发送
dump
协议,订阅 binary log。
- 获取 Binary Log:MySQL 主库根据 Canal 的请求,实时推送 binary log,这些日志记录了主库上发生的所有数据变更。
- 解析 Binary Log:Canal 接收到主库推送的二进制日志后,对其进行解析,将日志中的数据变更事件(如
INSERT
、UPDATE
、DELETE
)转化为结构化的对象或 JSON 格式。下游系统可以通过这些解析后的数据进行业务处理、同步或实时分析。
数据流示意:
- MySQL 主库 → Canal Client:通过 binlog 订阅,Canal Client 伪装成从库,订阅主库的 binlog 日志。
- Canal Client → Canal Adapter:Canal Client 将 binlog 数据发送给 Canal Adapter 进行转换。
- Canal Adapter → Elasticsearch:Canal Adapter 将数据转换为 Elasticsearch 的格式并同步到 ES。
大致实现
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <dependencies> <dependency> <groupId>com.xpand</groupId> <artifactId>canal-client</artifactId> <version>1.0.4</version> </dependency>
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.15.1</version> </dependency>
<dependency> <groupId>io.minio</groupId> <artifactId>minio</artifactId> <version>8.3.2</version> </dependency>
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.17.0</version> </dependency> </dependencies>
|
创建一个 CourseHandler 类,监听 MySQL 中与课程相关的表(假设表名为 tb_course),并将变更的数据同步到 Redis、Elasticsearch 和 MinIO。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
| import com.alibaba.fastjson.JSON; import com.xpand.starter.canal.annotation.CanalTable; import com.xpand.starter.canal.client.core.EntryHandler; import io.minio.MinioClient; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.redisson.api.RBucket; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component @CanalTable(value = "tb_course") public class CourseHandler implements EntryHandler<Course> {
@Autowired private RedissonClient redissonClient;
@Autowired private RestHighLevelClient esClient;
@Autowired private MinioClient minioClient;
@Override public void insert(Course course) { syncToRedis(course);
syncToElasticsearch(course);
uploadToMinIO(course); }
@Override public void update(Course before, Course after) { syncToRedis(after);
syncToElasticsearch(after);
uploadToMinIO(after); }
@Override public void delete(Course course) { deleteFromRedis(course.getId());
deleteFromElasticsearch(course.getId());
deleteFromMinIO(course); }
private void syncToRedis(Course course) { String redisKey = "course:" + course.getId(); RBucket<String> bucket = redissonClient.getBucket(redisKey); bucket.set(JSON.toJSONString(course)); }
private void syncToElasticsearch(Course course) { try { IndexRequest request = new IndexRequest("courses") .id(String.valueOf(course.getId())) .source("title", course.getTitle(), "description", course.getDescription(), "price", course.getPrice(), "status", course.getStatus()); esClient.index(request, RequestOptions.DEFAULT); } catch (Exception e) { e.printStackTrace(); } }
private void uploadToMinIO(Course course) { try { String bucketName = "course-files"; String objectName = "course-cover-" + course.getId(); byte[] courseCoverData = course.getCoverImage(); minioClient.putObject(bucketName, objectName, courseCoverData, courseCoverData.length, null, null, null); } catch (Exception e) { e.printStackTrace(); } }
private void deleteFromRedis(Long courseId) { String redisKey = "course:" + courseId; redissonClient.getBucket(redisKey).delete(); }
private void deleteFromElasticsearch(Long courseId) { try { esClient.delete(new DeleteRequest("courses", String.valueOf(courseId)), RequestOptions.DEFAULT); } catch (Exception e) { e.printStackTrace(); } }
private void deleteFromMinIO(Course course) { try { String bucketName = "course-files"; String objectName = "course-cover-" + course.getId(); minioClient.removeObject(bucketName, objectName); } catch (Exception e) { e.printStackTrace(); } } }
|
解析代码
insert(Course course)
: 当 MySQL 中插入一条课程数据后,Canal 会捕获到该事件,并调用 insert
方法。此方法会将课程信息同步到 Redis、Elasticsearch,并将相关文件上传到 MinIO。
update(Course before, Course after)
: 当 MySQL 中某条课程数据发生更新时,Canal 会调用 update
方法,更新 Redis 中的缓存、Elasticsearch 中的索引,并同步更新 MinIO 中的文件。
delete(Course course)
: 当 MySQL 中某条课程数据被删除时,Canal 会调用 delete
方法,分别从 Redis、Elasticsearch 和 MinIO 中删除相应的数据和文件。
继续思考,数据同步通常分为 增量同步 和 全量同步
- 增量同步 是指只同步自上次同步后发生改变的数据
- 全量同步 是指将数据库的全部数据进行同步
在实际java项目中,我们通常会使用全量同步和增量同步结合的方式来保证数据的一致性和实时性。
接下来给出大致实现
1.在系统启动时,调用全量同步服务来同步 MySQL 中的所有课程数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;
@Component public class StartupFullSyncRunner implements ApplicationRunner {
private final CourseFullSyncService courseFullSyncService;
public StartupFullSyncRunner(CourseFullSyncService courseFullSyncService) { this.courseFullSyncService = courseFullSyncService; }
@Override public void run(ApplicationArguments args) { courseFullSyncService.fullSync(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| import io.minio.MinioClient; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.redisson.api.RBucket; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import java.util.List;
@Service public class CourseFullSyncService {
@Autowired private JdbcTemplate jdbcTemplate;
@Autowired private RedissonClient redissonClient;
@Autowired private RestHighLevelClient esClient;
@Autowired private MinioClient minioClient;
public void fullSync() { String sql = "SELECT * FROM tb_course"; List<Course> courseList = jdbcTemplate.query(sql, (rs, rowNum) -> { Course course = new Course(); course.setId(rs.getLong("id")); course.setTitle(rs.getString("title")); course.setDescription(rs.getString("description")); course.setPrice(rs.getBigDecimal("price")); course.setCoverImage(rs.getBytes("cover_image")); return course; });
for (Course course : courseList) { syncToRedis(course); syncToElasticsearch(course); uploadToMinIO(course); } }
private void syncToRedis(Course course) { String redisKey = "course:" + course.getId(); RBucket<String> bucket = redissonClient.getBucket(redisKey); bucket.set(course.toString()); }
private void syncToElasticsearch(Course course) { try { IndexRequest request = new IndexRequest("courses") .id(String.valueOf(course.getId())) .source("title", course.getTitle(), "description", course.getDescription(), "price", course.getPrice()); esClient.index(request, RequestOptions.DEFAULT); } catch (Exception e) { e.printStackTrace(); } }
private void uploadToMinIO(Course course) { try { String bucketName = "course-files"; String objectName = "course-cover-" + course.getId(); byte[] coverImage = course.getCoverImage(); minioClient.putObject(bucketName, objectName, coverImage, coverImage.length, null, null, null); } catch (Exception e) { e.printStackTrace(); } } }
|
2.通过 Canal 实现增量捕获并同步到 Redis、Elasticsearch 和 MinIO。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
| import com.alibaba.fastjson.JSON; import com.xpand.starter.canal.annotation.CanalTable; import com.xpand.starter.canal.client.core.EntryHandler; import io.minio.MinioClient; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.redisson.api.RBucket; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component @CanalTable(value = "tb_course") public class CourseHandler implements EntryHandler<Course> {
@Autowired private RedissonClient redissonClient;
@Autowired private RestHighLevelClient esClient;
@Autowired private MinioClient minioClient;
@Override public void insert(Course course) { syncToRedis(course); syncToElasticsearch(course); uploadToMinIO(course); }
@Override public void update(Course before, Course after) { syncToRedis(after); syncToElasticsearch(after); uploadToMinIO(after); }
@Override public void delete(Course course) { deleteFromRedis(course.getId()); deleteFromElasticsearch(course.getId()); deleteFromMinIO(course); }
private void syncToRedis(Course course) { String redisKey = "course:" + course.getId(); RBucket<String> bucket = redissonClient.getBucket(redisKey); bucket.set(JSON.toJSONString(course)); }
private void syncToElasticsearch(Course course) { try { IndexRequest request = new IndexRequest("courses") .id(String.valueOf(course.getId())) .source("title", course.getTitle(), "description", course.getDescription(), "price", course.getPrice()); esClient.index(request, RequestOptions.DEFAULT); } catch (Exception e) { e.printStackTrace(); } }
private void uploadToMinIO(Course course) { try { String bucketName = "course-files"; String objectName = "course-cover-" + course.getId(); byte[] coverImage = course.getCoverImage(); minioClient.putObject(bucketName, objectName, coverImage, coverImage.length, null, null, null); } catch (Exception e) { e.printStackTrace(); } }
private void deleteFromRedis(Long courseId) { String redisKey = "course:" + courseId; redissonClient.getBucket(redisKey).delete(); }
private void deleteFromElasticsearch(Long courseId) { try { esClient.delete(new DeleteRequest("courses", String.valueOf(courseId)), RequestOptions.DEFAULT); } catch (Exception e) { e.printStackTrace(); } }
private void deleteFromMinIO(Course course) { try { String bucketName = "course-files"; String objectName = "course-cover-" + course.getId(); minioClient.removeObject(bucketName, objectName); } catch (Exception e) { e.printStackTrace(); } } }
|
技术选型
先了解下这个RabbitMQ的常用方法。
回顾MQ
架构图
- Producer(生产者):指负责发送消息的一方,生产者将消息发送到 RabbitMQ 的交换机(Exchange)。
- VHost(虚拟主机):类似于系统中的命名空间,允许将 RabbitMQ 的资源(如交换机、队列、绑定等)进行隔离。每个 VHost 拥有独立的资源和访问权限,VHost 之间互不干扰,常用于多租户系统中。
- Exchange(交换机):交换机是消息路由的核心,它接收来自生产者的消息,并根据路由规则将消息转发到相应的队列中。
- Queue(队列):队列是消息的存储位置,消息会保存在队列中,等待消费者(Consumer)来取出并处理。
- Binding(绑定):绑定是交换机与队列之间的连接,定义了交换机如何将消息路由到特定的队列。
- Consumer(消费者):消费者负责从队列中提取消息,对消息进行处理和消费。
工作模式
rabbitMQ一共有6种工作模式(消息分发方式)分别是简单模式、工作队列模式、发布订阅模式、路由模式、主题模式以及RPC模式。
RabbitMQ的发布-订阅模式通常使用推模式(push model)。在这种模式下,消息从交换机推送到绑定的队列,然后由消费者从队列中接收消息。消费者不需要主动请求消息,而是由RabbitMQ自动将消息分发给已连接的消费者。
可能问题
为什么选用RabbitMQ,有考虑过其他的MQ吗
近些年ActiveMQ更新较少,而且没有大规模吞吐量场景的验证,就不考虑了。
【ActiveMQ 是 Apache 基金会开发的一款开源消息中间件,支持多种消息传输协议、消息队列模式和发布/订阅模式。它】
kafka的话,它对数据丢失不敏感,不能保证消息的完全可靠,且kafka适合较高数据吞吐量。
有考虑过RocketMQ,无论是从可用性、吞吐量、成功案例方面来说,它都不错。
但公司的运维团队对RabbitMQ更熟悉,而且更看重消息的可靠性,最后选择了RabbitMQ。
为什么选用RabbitMQ,有考虑过其他的MQ吗
RabbitMQ消费端限流
问题描述:由于消费者处理速度较慢,可能导致消息积压,进而影响 Redis、Elasticsearch、MinIO 数据的实时性,导致延迟较高。
在RabbitMQ中,为了实现消费端限流,可以使用**消息预取(prefetch count)**机制。这个机制允许消费者在处理完一定数量的消息后再请求更多的消息,从而实现限流。
实现步骤
-
设置预取值:消费者可以通过设置prefetch count
来限制未确认消息的数量。这样,RabbitMQ只会发送指定数量的消息给消费者,直到这些消息被确认。
-
使用basic.qos
方法:在消费者代码中配置预取值。可以通过basic.qos
方法设置,例如:
1
| channel.basic_qos(prefetch_count=10)
|
这表示消费者在处理完10条消息之前,不会再接收新的消息。
好处
- 控制负载:防止消费者因为处理过多消息而超载。
- 优化资源:消费者可以根据自身能力处理消息,提高系统的稳定性和效率。
RabbitMQ重复消费问题
问题描述:可能会因为网络抖动或消费者崩溃等原因,导致 RabbitMQ 的消息被重复消费,进而导致 Redis、Elasticsearch 或 MinIO 中重复的数据写入。
在 RabbitMQ 中,消息的消费是有确认机制的。正常情况下,当消费者成功处理消息后,会发送一个 确认消息(ACK)给 RabbitMQ,随后 RabbitMQ 会将该消息从队列中删除,确保下次不会再投递相同的消息。
问题:
如果由于 网络延迟 或 其他问题,确认消息没有及时到达 RabbitMQ,RabbitMQ 会认为该消息尚未被消费,从而导致 消息被重复投递。在这种情况下,消费者可能会多次收到相同的消息,造成 消息重复消费 问题。
解决方案:
为了防止这种情况,消费者需要进行去重处理。通常,我们可以使用 幂等性 来确保即使消息被多次投递,处理结果依然是正确的。
幂等性处理:
当我们设计系统时,幂等性可以通过以下方式来实现常见的 “一键、二判、三更新” 的策略:
- 一键:为每条消息生成一个唯一的标识符(如消息 ID),并将此标识符与消息体一同存储。
- 二判:每次处理消息时,先判断该标识符是否已经存在。如果存在,说明该消息已经被消费过,直接跳过处理;如果不存在,则继续进行处理。
- 三更新:在消息处理成功后,将标识符和处理结果存储到数据库,确保下次同样的消息不会再次被处理。
RabbitMQ 保证消息不丢失
问题描述:在消息从 RabbitMQ 发送到消费者的过程中,可能会出现消息丢失的情况。导致 Redis、Elasticsearch、MinIO 数据不一致。
- 队列和交换机的持久化:
- 持久化交换机:在 RabbitMQ 重启后,交换机依然存在。
- 持久化队列:确保 RabbitMQ 重启后队列依然存在。
- 消息持久化:将消息标记为持久化,使其在系统崩溃或重启后仍然存在。
- 消费者确认机制:
- 通过手动确认机制(
basicAck
),确保消息被消费者成功处理后才从队列中移除。
- 如果消费者未确认消息,RabbitMQ 会将消息重新投递给其他消费者。
- 发布确认机制(Publisher Confirms):
- 生产者开启发布确认模式,确保消息成功发送到 RabbitMQ。
- 支持同步或异步确认,确保生产者知道消息是否成功到达队列。
- 镜像队列(Mirrored Queue):
- 使用镜像队列将消息同步到多个节点,避免单点故障导致消息丢失。
- 即使节点故障,消息仍然可以从其他节点读取。
- 消息重试机制:
- 通过死信队列(DLX)或延迟队列实现消息重试,确保处理失败的消息不会丢失。
- 可以设计幂等性逻辑应对消息的重复投递。
- 无法做到 100% 不丢失:
- 即使使用上述方案,由于系统崩溃、磁盘故障、网络中断等不可控因素,仍然无法达到 100% 消息不丢失的保证。
- 结合 监控和报警、容灾备份、幂等性 设计,可以进一步提高系统的可靠性。
【version2】
- 解决方案
- 启用发布确认机制(Publisher Confirms):确保生产者发布的每条消息都被 RabbitMQ 成功接收。
- 持久化消息:将消息队列设置为持久化,使消息存储在磁盘上,即使 RabbitMQ 服务重启也不会丢失消息。
- 消费确认机制(Consumer Acknowledgements):消费者在成功处理消息后,向 RabbitMQ 发送确认,RabbitMQ 在收到确认后才将消息从队列中删除。若消费者处理失败,消息会重新入队。
- 死信队列(Dead Letter Queue, DLQ):当消息处理失败,或者达到最大重试次数后,将消息发送到死信队列,以便后续分析和处理。
RabbitMQ 消费者处理失败
问题描述:消费者在处理消息时可能发生异常(如 Redis、Elasticsearch、MinIO 服务不可用),导致部分数据没有同步成功。
对应方案:
- 在消息中添加重试次数:当消息进入死信队列时,我们可以为消息设置一个重试次数。
- 处理死信队列中的消息:从死信队列中消费消息时,检查当前的重试次数,决定是否再次重试。
- 重试失败后的处理:如果消息重试次数超过限制,记录日志或将其存储到数据库,供后续人工处理。
补充:
死信队列
死信队列(Dead Letter Queue, DLQ)是消息队列系统中的一种机制,用于处理无法被消费的消息。RabbitMQ中,当消息在队列中无法被正常处理时,会被转发到一个预定义的死信队列。以下是一些可能导致消息进入死信队列的原因:
- 消息被拒绝(rejected)并且不重新入队。
- 消息在队列中存活时间超过了TTL(Time-To-Live)。
- 队列达到其最大长度。
- 消息处理失败。
大致实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig {
public static final String COURSE_EXCHANGE = "course.exchange"; public static final String COURSE_QUEUE = "course.queue"; public static final String COURSE_ROUTING_KEY = "course.routing.key";
public static final String DLX_EXCHANGE = "dlx.exchange"; public static final String DLX_QUEUE = "dlx.queue"; public static final String DLX_ROUTING_KEY = "dlx.routing.key";
@Bean public Queue courseQueue() { return QueueBuilder.durable(COURSE_QUEUE) .withArgument("x-dead-letter-exchange", DLX_EXCHANGE) .withArgument("x-dead-letter-routing-key", DLX_ROUTING_KEY) .build(); }
@Bean public Queue dlxQueue() { return QueueBuilder.durable(DLX_QUEUE).build(); }
@Bean public TopicExchange courseExchange() { return new TopicExchange(COURSE_EXCHANGE); }
@Bean public TopicExchange dlxExchange() { return new TopicExchange(DLX_EXCHANGE); }
@Bean public Binding courseBinding(Queue courseQueue, TopicExchange courseExchange) { return BindingBuilder.bind(courseQueue).to(courseExchange).with(COURSE_ROUTING_KEY); }
@Bean public Binding dlxBinding(Queue dlxQueue, TopicExchange dlxExchange) { return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_ROUTING_KEY); } }
|
当课程信息发生变化时,发送消息到 RabbitMQ。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service public class CourseService {
@Resource private RabbitTemplate rabbitTemplate;
public void updateCourseInfo(String courseId, String courseData) { rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE, RabbitMQConfig.COURSE_ROUTING_KEY, courseData); } }
|
消费者代码
每个消费者负责将课程信息同步到 Redis、Elasticsearch 和 MinIO。如果消费者处理失败,RabbitMQ 会将消息发送到死信队列。
Redis 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service public class RedisCourseConsumer {
@Resource private StringRedisTemplate redisTemplate;
@RabbitListener(queues = RabbitMQConfig.COURSE_QUEUE) public void handleCourseMessage(String message) { try { String courseId = extractCourseId(message); redisTemplate.opsForValue().set("course:" + courseId, message); } catch (Exception e) { throw new RuntimeException("Redis 消费失败", e); } }
private String extractCourseId(String message) { return "courseId"; } }
|
RabbitMQ 消息顺序问题
- 问题描述:RabbitMQ 默认不保证消息的全局顺序,在某些情况下,如果消息顺序错乱,可能导致 Redis、Elasticsearch、MinIO 中的数据不一致。
- 可能原因:
- RabbitMQ 的多个消费者并行处理同一个队列中的消息,导致消息顺序混乱。
- 消息在队列中被重新投递,造成乱序。
- 解决方案:
- 使用单个消费者:确保消息按顺序被消费。虽然降低了吞吐量,但能够确保顺序性。
- 使用消息分组机制:可以通过 RabbitMQ 的 消息分区 或 routing key 机制,将相关的消息发送到同一个消费者,确保消息的顺序。
- 在消费者端实现消息顺序校验:在消费端通过消息中的时间戳或序列号校验消息顺序,确保处理时的顺序正确。
RabbitMQ 数据一致性问题
- 问题描述:因为 Redis、Elasticsearch、MinIO 处理时间差异,或由于网络、服务不可用等原因,可能导致这三个数据存储系统之间的数据不一致。
- 可能原因:
- 消费者处理消息时部分操作失败,但其他操作成功,导致数据不一致。
- 消息被重复消费,导致 Redis、Elasticsearch、MinIO 中部分数据重复或冲突。
- 解决方案:
- 分布式事务:如果对数据一致性要求非常高,可以考虑使用分布式事务机制(例如通过 二阶段提交 或 TCC(Try-Confirm-Cancel) 模式)来确保所有操作要么全部成功要么全部失败。
- 最终一致性:在一些场景下,如果对实时一致性要求不高,可以采用 最终一致性 的策略,通过定时任务或补偿机制,确保在一定时间窗口内数据达到一致。
- 操作日志和补偿机制:记录每次操作的日志,在出现数据不一致的情况下,可以通过补偿机制(如重试、回滚)来修复数据。
常见方案实现:
TCC(Try-Confirm-Cancel)模式
TCC 模式是一种常用的分布式事务解决方案,它将事务分为三个步骤:
- Try:预留资源或锁定资源,准备执行事务。
- Confirm:确认执行事务(在 Try 成功的情况下),并提交所有操作。
- Cancel:如果 Try 失败,或者在 Confirm 过程中出现问题,则回滚预留的资源。
我们以 RabbitMQ 消息驱动的课程信息同步为例,分别在 Redis、Elasticsearch 和 MinIO 中使用 TCC 模式来保证数据一致性。
假设我们有一个 CourseInfo
对象要同步到三个系统中。
- 定义 TCC 接口
1 2 3 4 5
| public interface TccTransactionalService { boolean tryOperation(CourseInfo courseInfo); boolean confirmOperation(CourseInfo courseInfo); boolean cancelOperation(CourseInfo courseInfo); }
|
- Redis TCC 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service public class RedisTccServiceImpl implements TccTransactionalService {
@Resource private StringRedisTemplate redisTemplate;
@Override public boolean tryOperation(CourseInfo courseInfo) { try { redisTemplate.opsForValue().set("course:pending:" + courseInfo.getId(), courseInfo.toString()); return true; } catch (Exception e) { return false; } }
@Override public boolean confirmOperation(CourseInfo courseInfo) { try { redisTemplate.opsForValue().set("course:" + courseInfo.getId(), courseInfo.toString()); redisTemplate.delete("course:pending:" + courseInfo.getId()); return true; } catch (Exception e) { return false; } }
@Override public boolean cancelOperation(CourseInfo courseInfo) { try { redisTemplate.delete("course:pending:" + courseInfo.getId()); return true; } catch (Exception e) { return false; } } }
|
- Elasticsearch TCC 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service public class ElasticsearchTccServiceImpl implements TccTransactionalService {
@Resource private ElasticsearchRestTemplate elasticsearchTemplate;
@Override public boolean tryOperation(CourseInfo courseInfo) { try { elasticsearchTemplate.save(new PendingCourseInfo(courseInfo)); return true; } catch (Exception e) { return false; } }
@Override public boolean confirmOperation(CourseInfo courseInfo) { try { elasticsearchTemplate.save(courseInfo); elasticsearchTemplate.delete(PendingCourseInfo.class, courseInfo.getId()); return true; } catch (Exception e) { return false; } }
@Override public boolean cancelOperation(CourseInfo courseInfo) { try { elasticsearchTemplate.delete(PendingCourseInfo.class, courseInfo.getId()); return true; } catch (Exception e) { return false; } } }
|
- MinIO TCC 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| import io.minio.MinioClient; import io.minio.PutObjectArgs; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import java.io.ByteArrayInputStream;
@Service public class MinioTccServiceImpl implements TccTransactionalService {
@Resource private MinioClient minioClient;
@Override public boolean tryOperation(CourseInfo courseInfo) { try { ByteArrayInputStream inputStream = new ByteArrayInputStream(courseInfo.toString().getBytes()); minioClient.putObject( PutObjectArgs.builder().bucket("courses").object("pending/course-" + courseInfo.getId() + ".json") .stream(inputStream, courseInfo.toString().getBytes().length, -1) .contentType("application/json") .build() ); return true; } catch (Exception e) { return false; } }
@Override public boolean confirmOperation(CourseInfo courseInfo) { try { ByteArrayInputStream inputStream = new ByteArrayInputStream(courseInfo.toString().getBytes()); minioClient.putObject( PutObjectArgs.builder().bucket("courses").object("course-" + courseInfo.getId() + ".json") .stream(inputStream, courseInfo.toString().getBytes().length, -1) .contentType("application/json") .build() ); minioClient.removeObject("courses", "pending/course-" + courseInfo.getId() + ".json"); return true; } catch (Exception e) { return false; } }
@Override public boolean cancelOperation(CourseInfo courseInfo) { try { minioClient.removeObject("courses", "pending/course-" + courseInfo.getId() + ".json"); return true; } catch (Exception e) { return false; } } }
|
- TCC 事务协调器
我们需要一个事务协调器,在所有服务都执行 Try
成功后,再执行 Confirm
;如果任何 Try
失败,则执行 Cancel
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import org.springframework.stereotype.Service;
import javax.annotation.Resource; import java.util.Arrays; import java.util.List;
@Service public class TccTransactionCoordinator {
@Resource private RedisTccServiceImpl redisTccService;
@Resource private ElasticsearchTccServiceImpl elasticsearchTccService;
@Resource private MinioTccServiceImpl minioTccService;
public boolean processCourseInfo(CourseInfo courseInfo) { List<TccTransactionalService> services = Arrays.asList(redisTccService, elasticsearchTccService, minioTccService);
for (TccTransactionalService service : services) { if (!service.tryOperation(courseInfo)) { for (TccTransactionalService cancelService : services) { cancelService.cancelOperation(courseInfo); } return false; } }
for (TccTransactionalService service : services) { if (!service.confirmOperation(courseInfo)) { for (TccTransactionalService cancelService : services) { cancelService.cancelOperation(courseInfo); } return false; } }
return true; } }
|
- RabbitMQ 消费者调用 TCC 事务协调器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service public class CourseInfoConsumer {
@Resource private TccTransactionCoordinator tccTransactionCoordinator;
@RabbitListener(queues = RabbitMQConfig.COURSE_QUEUE) public void handleCourseInfoMessage(String message) { CourseInfo courseInfo = parseMessage(message);
boolean success = tccTransactionCoordinator.processCourseInfo(courseInfo); if (!success) { System.err.println("课程信息处理失败: " + courseInfo); } }
private CourseInfo parseMessage(String message) { return new CourseInfo(); } }
|
总结
- Try 阶段:首先在 Redis、Elasticsearch 和 MinIO 中预留资源,确保能够执行后续操作。
- Confirm 阶段:当所有预留操作成功后,提交所有操作并删除预留的资源。
- Cancel 阶段:如果 Try 阶段中的任何操作失败,所有系统的预留操作都将被回滚,以保证一致性。
补偿机制
分布式数据一致性:
-
同步操作失败记录:当同步操作失败时,将失败的操作记录到数据库中。
-
补偿机制:通过定时任务自动重试失败的操作,直到所有操作成功为止。
-
幂等性:确保每次同步操作和补偿操作都是幂等的,重复执行不会产生副作用。
-
RabbitMQ 消费者:从消息队列中消费数据,进行同步,同时支持失败的补偿机制。
-
定义课程信息同步服务接口
1 2 3 4 5 6
| public interface CourseSyncService { boolean syncToRedis(CourseInfo courseInfo); boolean syncToElasticsearch(CourseInfo courseInfo); boolean syncToMinio(CourseInfo courseInfo); boolean compensateSync(CourseInfo courseInfo); }
|
- 实现课程信息同步服务
在 CourseSyncService
中实现同步到 Redis、Elasticsearch 和 MinIO 的逻辑,并在操作失败时记录失败信息,供后续补偿使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; import io.minio.MinioClient; import org.springframework.stereotype.Service; import org.springframework.beans.factory.annotation.Autowired;
@Service public class CourseSyncServiceImpl implements CourseSyncService {
@Autowired private StringRedisTemplate redisTemplate;
@Autowired private ElasticsearchRestTemplate elasticsearchTemplate;
@Autowired private MinioClient minioClient;
@Autowired private FailedOperationRepository failedOperationRepository;
@Override public boolean syncToRedis(CourseInfo courseInfo) { try { redisTemplate.opsForValue().set("course:" + courseInfo.getId(), courseInfo.toString()); return true; } catch (Exception e) { failedOperationRepository.save(new FailedOperation("REDIS", courseInfo.getId(), "Sync to Redis failed")); return false; } }
@Override public boolean syncToElasticsearch(CourseInfo courseInfo) { try { elasticsearchTemplate.save(courseInfo); return true; } catch (Exception e) { failedOperationRepository.save(new FailedOperation("ES", courseInfo.getId(), "Sync to Elasticsearch failed")); return false; } }
@Override public boolean syncToMinio(CourseInfo courseInfo) { try { ByteArrayInputStream inputStream = new ByteArrayInputStream(courseInfo.toString().getBytes()); minioClient.putObject( PutObjectArgs.builder() .bucket("courses") .object("course-" + courseInfo.getId() + ".json") .stream(inputStream, courseInfo.toString().getBytes().length, -1) .contentType("application/json") .build() ); return true; } catch (Exception e) { failedOperationRepository.save(new FailedOperation("MINIO", courseInfo.getId(), "Sync to MinIO failed")); return false; } }
@Override public boolean compensateSync(CourseInfo courseInfo) { List<FailedOperation> failedOperations = failedOperationRepository.findByCourseId(courseInfo.getId()); for (FailedOperation failedOperation : failedOperations) { switch (failedOperation.getSystem()) { case "REDIS": if (syncToRedis(courseInfo)) { failedOperationRepository.delete(failedOperation); } break; case "ES": if (syncToElasticsearch(courseInfo)) { failedOperationRepository.delete(failedOperation); } break; case "MINIO": if (syncToMinio(courseInfo)) { failedOperationRepository.delete(failedOperation); } break; default: throw new IllegalArgumentException("Unknown system: " + failedOperation.getSystem()); } } return true; } }
|
- 定义失败操作记录实体
当某个系统的操作失败时,我们将其记录到数据库中,供后续补偿机制使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import javax.persistence.Entity; import javax.persistence.Id;
@Entity public class FailedOperation { @Id private String id;
private String system; private String courseId; private String reason;
public FailedOperation(String system, String courseId, String reason) { this.id = system + "_" + courseId; this.system = system; this.courseId = courseId; this.reason = reason; }
}
|
- 定义 FailedOperationRepository
通过数据库存储和查询失败的操作记录。
1 2 3 4 5 6
| import org.springframework.data.jpa.repository.JpaRepository; import java.util.List;
public interface FailedOperationRepository extends JpaRepository<FailedOperation, String> { List<FailedOperation> findByCourseId(String courseId); }
|
- 消费 RabbitMQ 消息并触发同步操作
接下来,我们需要在 RabbitMQ 消费者中调用同步服务,并在操作失败时记录失败信息,供后续补偿使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service public class CourseInfoConsumer {
@Resource private CourseSyncService courseSyncService;
@RabbitListener(queues = RabbitMQConfig.COURSE_QUEUE) public void handleCourseInfoMessage(String message) { CourseInfo courseInfo = parseMessage(message);
boolean redisSuccess = courseSyncService.syncToRedis(courseInfo); boolean esSuccess = courseSyncService.syncToElasticsearch(courseInfo); boolean minioSuccess = courseSyncService.syncToMinio(courseInfo);
if (!redisSuccess || !esSuccess || !minioSuccess) { System.err.println("课程信息同步部分失败: " + courseInfo); } }
private CourseInfo parseMessage(String message) { return new CourseInfo(); } }
|
- 使用定时任务进行补偿重试
通过定时任务定期检查失败的操作记录,并尝试重新同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import java.util.List;
@Service public class CompensationService {
@Resource private FailedOperationRepository failedOperationRepository;
@Resource private CourseSyncService courseSyncService;
@Scheduled(fixedRate = 60000) public void runCompensation() { List<FailedOperation> failedOperations = failedOperationRepository.findAll();
for (FailedOperation failedOperation : failedOperations) { CourseInfo courseInfo = getCourseInfo(failedOperation.getCourseId()); courseSyncService.compensateSync(courseInfo); } }
private CourseInfo getCourseInfo(String courseId) { return new CourseInfo(); } }
|
RabbitMQ 出现消息积压
- 扩展消费者:增加更多消费者实例,提升处理能力。
- 限流机制:可以对生产者进行限流,避免生产者发送过多消息超过系统处理能力。
- 消息处理优先级:可以使用 RabbitMQ 的 优先级队列,将重要的消息设置为高优先级,确保高优先级消息优先处理,避免消息积压。
补偿机制+定时任务
当 RabbitMQ 或 Redis、Elasticsearch 等组件出现故障,系统如何应对
- RabbitMQ 容错机制
1.1 消息持久化
确保消息不丢失的关键是消息持久化。在 RabbitMQ 中,可以将消息和队列标记为持久化,即使 RabbitMQ 服务重启,这些持久化内容也能够被恢复。
- 持久化队列:确保队列在服务器崩溃或重启后仍然存在。
- 持久化消息:将消息写入硬盘,避免消息丢失。
1 2
| channel.queueDeclare("queue_name", true, false, false, null); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
|
1.2 消息确认机制
RabbitMQ 提供了**消息确认(ACK)**机制。消费者在成功处理消息后发送 ACK,如果 RabbitMQ 没有收到 ACK,则会重新将消息放回队列。这种机制确保消息不会因消费者故障而丢失。
1
| channel.basicAck(deliveryTag, false);
|
- 自动重试:在消息处理失败时,可以设计自动重试机制,允许消费者在一定次数尝试后将失败的消息放入死信队列(DLQ),以供后续分析或手动处理。
1.3 集群与高可用性
RabbitMQ 支持集群模式和镜像队列(Mirrored Queues),来确保高可用性。通过集群模式,队列和节点可以跨多个服务器进行复制,防止单点故障。
- 镜像队列:在集群环境中,可以设置镜像队列,将队列的消息同步到其他节点,即使一个节点崩溃,消息仍能被消费。
1 2
| # 镜像队列策略 rabbitmqctl set_policy ha-all "^queue\." '{"ha-mode":"all"}'
|
1.4 降级处理
如果 RabbitMQ 故障,可以采取降级策略:
- 本地缓存:在 RabbitMQ 不可用时,消费者可以将消息暂时存储在本地缓存(如内存或文件系统)中,等 RabbitMQ 恢复后重新发送。
- 重试机制:在生产者发送消息失败时,采用指数退避策略进行重试,避免 RabbitMQ 短暂不可用时丢失消息。
rabbitMQ实现延迟消息
在 RabbitMQ 中,延迟消息的实现可以通过以下两种方式:
- 使用 TTL(Time-To-Live)和死信队列(Dead Letter Exchange, DLX)组合实现延迟消息。
- 使用 RabbitMQ 的延迟队列插件(
rabbitmq-delayed-message-exchange
)。
方式 1:使用 TTL 和死信队列
实现原理:
- TTL(Time-To-Live):为消息或队列设置生存时间,消息在队列中达到设定的 TTL 时间后,未被消费的消息会被转发到死信队列。
- 死信队列(DLX):消息在原队列中超时后,会被转发到一个死信交换机(DLX),该交换机会将消息路由到指定的死信队列,消费者可以从这个队列中接收延迟的消息。
实现步骤:
- 创建两个队列,一个是用于存放带有 TTL 的消息队列,另一个是用于接收延迟消息的死信队列。
- 为带有 TTL 的队列设置 TTL 时间,并绑定到死信交换机。
- 消息在 TTL 时间内未被消费,会自动转发到死信队列,消费者从死信队列中消费延迟消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.Map;
@Component public class RabbitMQDelayedMessageExample {
private static final String DELAY_EXCHANGE = "delay_exchange"; private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
private static final String DELAY_QUEUE = "delay_queue"; private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
private static final String DELAY_ROUTING_KEY = "delay_routing_key"; private static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";
@Autowired private AmqpAdmin amqpAdmin;
public void setupQueues() { Queue deadLetterQueue = new Queue(DEAD_LETTER_QUEUE);
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); Queue delayQueue = new Queue(DELAY_QUEUE, true, false, false, args);
DirectExchange delayExchange = new DirectExchange(DELAY_EXCHANGE); DirectExchange deadLetterExchange = new DirectExchange(DEAD_LETTER_EXCHANGE);
amqpAdmin.declareQueue(delayQueue); amqpAdmin.declareQueue(deadLetterQueue); amqpAdmin.declareExchange(delayExchange); amqpAdmin.declareExchange(deadLetterExchange);
amqpAdmin.declareBinding(BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY)); amqpAdmin.declareBinding(BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY)); }
public void sendDelayedMessage(String message, long delayMillis) { MessagePostProcessor messagePostProcessor = msg -> { msg.getMessageProperties().setExpiration(String.valueOf(delayMillis)); return msg; }; amqpAdmin.getRabbitTemplate().convertAndSend(DELAY_EXCHANGE, DELAY_ROUTING_KEY, message, messagePostProcessor); System.out.println("发送延迟消息: " + message); }
@RabbitListener(queues = DEAD_LETTER_QUEUE) public void receiveDelayedMessage(String message) { System.out.println("接收到延迟消息: " + message); } }
|
关键点:
- 延迟队列(
delay_queue
):消息会被发送到此队列,并设置消息的 TTL。
- 死信交换机(
dead_letter_exchange
):TTL 超时后,消息会自动转发到此交换机。
- 死信队列(
dead_letter_queue
):消费者从此队列获取延迟消息。
方式 2:使用 RabbitMQ 延迟队列插件
实现原理:
RabbitMQ 提供了一个官方插件 rabbitmq-delayed-message-exchange
,支持直接在 RabbitMQ 中创建延迟队列。该插件可以让你在发送消息时通过设置 x-delay
参数来控制消息的延迟投递时间,无需配置死信队列和 TTL,使用更为简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.Map;
@Component public class RabbitMQDelayedPluginExample {
private static final String DELAYED_EXCHANGE = "delayed_exchange"; private static final String DELAYED_QUEUE = "delayed_queue"; private static final String ROUTING_KEY = "delayed_routing_key";
@Autowired private AmqpAdmin amqpAdmin;
public void setupDelayedQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); CustomExchange delayedExchange = new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
Queue delayedQueue = new Queue(DELAYED_QUEUE, true);
amqpAdmin.declareExchange(delayedExchange); amqpAdmin.declareQueue(delayedQueue);
amqpAdmin.declareBinding(BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs()); }
public void sendDelayedMessage(String message, int delayMillis) { MessagePostProcessor messagePostProcessor = msg -> { msg.getMessageProperties().setHeader("x-delay", delayMillis); return msg; }; amqpAdmin.getRabbitTemplate().convertAndSend(DELAYED_EXCHANGE, ROUTING_KEY, message, messagePostProcessor); System.out.println("发送延迟消息: " + message); }
@RabbitListener(queues = DELAYED_QUEUE) public void receiveDelayedMessage(String message) { System.out.println("接收到延迟消息: " + message); } }
|
优势
优点 |
说明 |
成熟稳定 |
RabbitMQ 基于 Erlang 开发,具有高并发、稳定性强的特点,广泛应用于工业级别的生产环境,经过多年发展和广泛使用。 |
强大的路由机制 |
支持多种交换机类型(Direct、Topic、Fanout、Headers),可以根据不同的路由规则灵活地将消息分发到不同的队列中,适用于复杂的消息路由场景。 |
可靠的消息确认机制 |
提供生产者发布确认(Publisher Confirms)和消费者消费确认(ACK),确保消息不丢失、不重复,保证数据一致性。 |
消息持久化 |
RabbitMQ 支持消息、队列和交换机的持久化,确保即使服务重启或发生故障,消息也不会丢失,保证消息传递的可靠性。 |
多协议支持 |
支持 AMQP、MQTT、STOMP、HTTP 等多种协议,具有强大的兼容性,适用于不同的系统和场景。 |
高可扩展性 |
RabbitMQ 支持集群模式和镜像队列,能够通过水平扩展提高系统吞吐量和容错能力,保证服务的高可用性。 |
插件机制 |
提供丰富的插件,如 Shovel、Federation、Management Plugin 等,能扩展 RabbitMQ 的功能,适应不同的业务需求。 |
支持消息顺序性 |
虽然 RabbitMQ 默认不保证消息顺序,但通过合理的队列设计和配置,可以实现消息的顺序消费,适用于需要严格顺序的场景。 |
支持延迟队列 |
通过插件支持延迟队列功能,控制消息延迟处理,对于需要延时处理的场景非常有用。 |
轻量级 |
RabbitMQ 更加轻量和灵活,适合处理相对较小的消息量和复杂的消息路由场景,不需要像 Kafka 那样设计复杂的分区和日志管理。 |
广泛的语言和框架支持 |
支持多种语言和框架(Java、Python、C#、Node.js、Go 等),具有良好的跨平台和跨语言兼容性,易于与现有系统集成。 |
公平调度与消息优先级 |
RabbitMQ 支持公平调度和消息优先级设置,确保消息能够合理分发,优化系统的消息处理性能。 |
RabbitMQ什么情况下会出现 blackholed 问题?
RabbitMQ 中出现 “blackholed” 问题 指的是消息被发送到 exchange 后,由于各种原因消息没有成功路由到任何队列,导致消息丢失,但消息发送者并不知情。这种情况可能会让消息“消失”,但发送者没有任何反馈。
- 向未绑定队列的 exchange 发送消息
- 原因:消息被发送到一个 exchange,但该 exchange 没有绑定任何队列,因此消息找不到目标队列去存储,最终消息丢失。
- 解释:exchange 是消息路由的中间层,如果它没有绑定任何队列,即使消息被成功投递到 exchange,也无法路由到任何地方,导致消息“黑洞化”。
- 路由键不匹配
- 原因:exchange 与队列的绑定键(binding key)和消息使用的路由键(routing key)不匹配。例如,exchange 通过
binding_key
key_A 绑定了队列 queue_A
,但发送消息时使用了不同的 routing_key
key_B。
- 解释:如果消息使用的
routing_key
和 binding_key
不匹配,exchange 无法将消息正确路由到相应的队列,导致消息丢失。
- 未指定
mandatory
标志
- 原因:发送消息时,未设置
mandatory
标志。如果消息不能被路由到任何队列时,RabbitMQ 不会告知发送者消息丢失,消息会被直接丢弃。
- 解释:设置
mandatory
标志为 true
后,如果消息没有成功路由到任何队列,RabbitMQ 会将消息返回给发送者,并触发 Basic.Return
事件。如果没有设置 mandatory
,消息将被悄悄丢弃。
- 未设置
alternate-exchange
(备用交换机)
- 原因:当消息无法路由时,RabbitMQ 没有设置备用的
alternate-exchange
来处理无法路由的消息。
- 解释:
alternate-exchange
是一种机制,可以将无法路由的消息发送到另一个 exchange 进行处理。如果没有设置 alternate-exchange
,无法路由的消息将直接丢失。
- 消息 TTL(Time-To-Live)过期
- 原因:消息被发送到队列,但队列有 TTL 限制,消息过期后直接被丢弃,未被消费者消费。
- 解释:队列或消息设置了 TTL,消息在队列中超过了存活时间后被丢弃,且如果没有使用
dead-letter exchange
,消息将无法进行进一步处理,直接丢失。
- 队列满了(Queue Overflow)
- 原因:队列已达到最大容量,无法再接收新消息。消息发送到 exchange 后,exchange 能够路由到队列,但队列已满,无法再接收新消息,导致消息丢失。
- 解释:队列的长度超出限制时,消息无法存入队列而丢失,且不会告知发送者。
- 持久化消息在服务器宕机后丢失
- 原因:消息未标记为持久化(persistent),或者队列未标记为持久化,在 RabbitMQ 服务宕机或重启后,非持久化的消息将丢失。
- 解释:非持久化的消息不会在服务器重启后保留,宕机后消息丢失。
通过启用 mandatory
标志、设置 alternate-exchange
及 dead-letter exchange
等手段,可以有效避免消息丢失。
kafka为啥没有死信队列
Kafka 没有原生的死信队列(DLQ),这是因为其设计理念和使用模式与传统消息队列不同。精辟地讲:
- 消费者控制重试与失败处理:
- Kafka 假设消费者可以完全控制消息的处理。消费者可以手动管理消息的偏移量,决定是否重试或跳过某条消息。相比于自动化的死信队列,Kafka 更强调消费者的灵活性。
- 消息持久化与重复消费:
- Kafka 的消息是持久化的,消费者可以从任意偏移量重新消费消息。因此,不需要死信队列来存储“失败消息”,而是通过应用层逻辑来决定何时重试或放弃处理。
- Kafka 更关注分布式日志的存储与传输:
- Kafka 作为分布式日志系统,它的核心职责是高效传输和存储消息,而不是直接管理消息处理失败的逻辑。消息的处理失败与否,更多由应用程序来决定,Kafka 本身并不干涉。
总结:
Kafka 没有死信队列,因为它将失败管理交给消费者,同时提供了灵活、持久的消息存储机制,使得消费者可以自行构建重试和失败处理逻辑。
代码实现
由于项目的规模,最后采用异步双写,利用RabbitMQ实现。
RabbitMQ 交换机、队列和路由键
1. 交换机(Exchange)
交换机是RabbitMQ中负责接收生产者发送的消息,并根据一定的规则将消息路由到一个或多个队列。简单来说,交换机是消息分发的中心,它不存储消息,而是根据类型和规则将消息发送到正确的队列。
- 类型:交换机有不同的类型来控制消息的路由方式:
- Direct(直连交换机):根据完全匹配的路由键将消息发送到队列。
- Topic(主题交换机):根据模式匹配的路由键将消息发送到队列。
- Fanout(扇形交换机):将消息广播到所有绑定的队列,不考虑路由键。
- Headers(头交换机):根据消息的头信息来匹配队列。
2. 队列(Queue)
队列是RabbitMQ中存储消息的容器,消费者从队列中读取消息并进行处理。队列的主要职责是临时保存消息,直到有消费者来获取它。每个队列都有一个唯一的名称,并且可以绑定到一个或多个交换机。
- 特性:
- 队列是有顺序的,先进先出(FIFO)。
- 队列可以是持久的(消息会存储在磁盘上)或非持久的(消息只存在于内存中,RabbitMQ重启后丢失)。
3. 路由键(Routing Key)
路由键是生产者在发送消息时附带的一个字符串,用于告诉交换机如何路由消息。路由键在不同类型的交换机中起到不同的作用。
- Direct交换机:路由键必须与队列绑定时指定的路由键精确匹配,消息才能被路由到该队列。
- Topic交换机:路由键可以是一个模式,允许使用通配符(如
*
表示匹配一个单词,#
表示匹配多个单词),根据模式匹配将消息路由到队列。
- Fanout交换机:不使用路由键,消息会广播到所有绑定的队列。
总结:
- 交换机负责接收并分发消息,根据类型和规则决定将消息发送到哪些队列。
- 队列是消息的存储容器,消费者从队列中消费消息。
- 路由键是生产者发送消息时的指示,决定消息如何在交换机和队列之间路由。【交换机根据路由键来决定如何路由消息】
这里,我们使用发布-订阅模式
思路
- 课程发布服务:当课程信息发布后,发送一个消息到 RabbitMQ。
- Redis 同步服务:监听 RabbitMQ 队列,接收消息并更新 Redis 缓存。
- Elasticsearch 同步服务:监听 RabbitMQ 队列,接收消息并更新 Elasticsearch 索引。
- MinIO 文件同步服务:监听 RabbitMQ 队列,接收消息并上传课程相关文件到 MinIO 存储。
1.首先,配置 RabbitMQ 交换机、队列和路由键。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig {
public static final String COURSE_EXCHANGE = "course.direct.exchange"; public static final String REDIS_QUEUE = "redis.sync.queue"; public static final String ES_QUEUE = "es.sync.queue"; public static final String MINIO_QUEUE = "minio.sync.queue"; public static final String REDIS_ROUTING_KEY = "sync.redis"; public static final String ES_ROUTING_KEY = "sync.es"; public static final String MINIO_ROUTING_KEY = "sync.minio"; @Bean public DirectExchange courseExchange() { return new DirectExchange(COURSE_EXCHANGE); } @Bean public Queue redisQueue() { return new Queue(REDIS_QUEUE); } @Bean public Queue esQueue() { return new Queue(ES_QUEUE); } @Bean public Queue minioQueue() { return new Queue(MINIO_QUEUE); } @Bean public Binding redisBinding() { return BindingBuilder.bind(redisQueue()).to(courseExchange()).with(REDIS_ROUTING_KEY); } @Bean public Binding esBinding() { return BindingBuilder.bind(esQueue()).to(courseExchange()).with(ES_ROUTING_KEY); } @Bean public Binding minioBinding() { return BindingBuilder.bind(minioQueue()).to(courseExchange()).with(MINIO_ROUTING_KEY); } }
|
2. 课程发布服务发送消息
当课程发布时,将课程信息发送到 RabbitMQ 交换机,然后由 Redis、Elasticsearch、MinIO 等服务来消费这个消息并执行数据同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
@Service public class CoursePublishService {
@Autowired private RabbitTemplate rabbitTemplate; public void publishCourse(Long courseId) { Course course = getCourseById(courseId); rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE, RabbitMQConfig.REDIS_ROUTING_KEY, course); rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE, RabbitMQConfig.ES_ROUTING_KEY, course); rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE, RabbitMQConfig.MINIO_ROUTING_KEY, course); System.out.println("课程发布成功,课程ID:" + courseId); }
private Course getCourseById(Long courseId) { Course course = new Course(); course.setId(courseId); course.setTitle("Java编程入门"); course.setDescription("这是一个Java编程的入门课程"); return course; } }
|
RabbitTemplate
是 Spring AMQP 提供的一个模板类,用于简化与 RabbitMQ 的交互。
convertAndSend
:主要用于将消息转换并发送到指定的交换机和队列。
这个方法会:
- 将对象(如 Java 对象)转换为消息格式(通常是 JSON 或其他可序列化的格式)。
- 将消息发送到指定的交换机(Exchange),并根据指定的路由键(Routing Key)将消息路由到相应的队列。
3. Redis 同步服务
Redis 同步服务监听 redis.sync.queue
队列,接收到消息后将课程信息同步到 Redis 缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service;
@Service public class RedisSyncService {
@Autowired private RedisTemplate<String, Object> redisTemplate;
@RabbitListener(queues = RabbitMQConfig.REDIS_QUEUE) public void syncToRedis(Course course) { redisTemplate.opsForHash().put("course:" + course.getId(), "course", course); System.out.println("课程信息已同步到 Redis,课程ID:" + course.getId()); } }
|
@RabbitListener(queues = RabbitMQConfig.REDIS_QUEUE)
:
- 这里的
queues
参数指定了要监听的 RabbitMQ 队列,即 RabbitMQConfig.REDIS_QUEUE
。当这个队列中有新消息时,Spring 会调用 syncToRedis
方法。
RabbitMQConfig.REDIS_QUEUE
是队列的名称,通常会在配置类中定义。
一个有意思的问题:这个监听机制是怎么做的?
总结
- 监听机制:
@RabbitListener
注解通过 Spring AMQP 框架来监听 RabbitMQ 队列。当队列中有新消息时,RabbitMQ 服务器会主动推送消息给监听器。
- 队列有任务时的通知:RabbitMQ 使用长连接机制,监听器会保持与 RabbitMQ 服务器的连接。当队列中有新消息时,RabbitMQ 服务器会将消息推送给监听器,而不是监听器主动去获取消息或者轮询队列。
- Spring 自动配置:Spring Boot 和 Spring AMQP 自动配置了 RabbitMQ 连接工厂、消息监听容器等,使得开发者可以专注于业务逻辑,而无需手动管理这些基础设施。
Spring 是如何扫描并注册监听器的?
Spring AMQP 的工作原理是基于 Spring 的 Bean 后处理器(BeanPostProcessor) 机制:
- 扫描 Bean:Spring 容器启动时,会扫描所有被
@RabbitListener
注解标记的方法。
- 注册监听器:Spring AMQP 提供了一个
RabbitListenerAnnotationBeanPostProcessor
,它会拦截所有带有 @RabbitListener
的 Bean,并将这些方法注册为 RabbitMQ 消费者。
- 创建监听容器:Spring AMQP 为每个
@RabbitListener
方法创建一个 消息监听容器(SimpleMessageListenerContainer
)。这个容器负责管理 RabbitMQ 连接、监听队列、接收消息,并将消息传递给相应的监听方法。
- 启动监听:监听容器启动后,会保持与 RabbitMQ 服务器的连接,并等待消息的到来。当队列中有新消息时,RabbitMQ 会将消息推送给容器,容器再调用
@RabbitListener
注解的方法来处理消息。
4. Elasticsearch 同步服务
Elasticsearch 同步服务监听 es.sync.queue
队列,接收到消息后将课程信息同步到 Elasticsearch 索引。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType;
@Service public class ElasticsearchSyncService {
@Autowired private RestHighLevelClient esClient;
@RabbitListener(queues = RabbitMQConfig.ES_QUEUE) public void syncToElasticsearch(Course course) throws Exception { IndexRequest request = new IndexRequest("courses").id(course.getId().toString()) .source("{ \"title\": \"" + course.getTitle() + "\", \"description\": \"" + course.getDescription() + "\" }", XContentType.JSON); esClient.index(request, RequestOptions.DEFAULT); System.out.println("课程信息已同步到 Elasticsearch,课程ID:" + course.getId()); } }
|
流程
- 创建
IndexRequest
:
- 首先,创建一个
IndexRequest
,指定要操作的索引(courses
)及文档的 ID(课程 ID)。
- 文档内容:
- 使用
.source()
方法将 Course
对象的属性(如 title
和 description
)转换为 JSON 格式的文档内容。
- 发送请求:
- 使用
esClient.index()
方法将文档插入到 Elasticsearch 中。如果该文档的 ID 已存在,则会更新现有的文档;如果不存在,则会插入新文档。
- 同步到 Elasticsearch:
- 最终,课程信息会以 JSON 格式存储到
courses
索引中,供后续搜索和查询使用。
5. MinIO 文件同步服务
MinIO 同步服务监听 minio.sync.queue
队列,接收到消息后将课程相关的文件上传到 MinIO 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import io.minio.MinioClient; import io.minio.PutObjectArgs; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import java.io.ByteArrayInputStream;
@Service public class MinioSyncService {
@Autowired private MinioClient minioClient;
@RabbitListener(queues = RabbitMQConfig.MINIO_QUEUE) public void syncToMinio(Course course) throws Exception { String bucketName = "course-files"; String objectName = "course-" + course.getId() + ".txt"; String fileContent = "课程标题: " + course.getTitle() + "\n课程描述: " + course.getDescription(); ByteArrayInputStream inputStream = new ByteArrayInputStream(fileContent.getBytes()); minioClient.putObject( PutObjectArgs.builder().bucket(bucketName).object(objectName) .stream(inputStream, inputStream.available(), -1) .build() ); System.out.println("课程文件已上传到 MinIO,课程ID:" + course.getId()); } }
|
流程
- Step 1:
ByteArrayInputStream
将字符串 fileContent
转换为字节流,准备上传。
- Step 2:通过
PutObjectArgs.builder()
,构建上传对象的参数,包括存储桶名 (bucketName
)、对象名 (objectName
)、流 (inputStream
)、流的大小(通过 available()
计算)等。
- Step 3:调用
minioClient.putObject(...)
方法,使用构建好的参数,将对象上传到 MinIO 的指定存储桶中。
测试方案
测试方案总结表:
测试场景 |
测试目标 |
测试步骤 |
预期结果 |
功能性测试 |
验证消息从 RabbitMQ 到 Redis 和 Elasticsearch 的同步是否正确 |
1. 发送课程更新消息到 RabbitMQ 2. 检查 Redis 和 Elasticsearch 中的课程数据 |
- Redis 和 Elasticsearch 中的数据与 RabbitMQ 消息完全一致 |
并发测试 |
验证高并发下数据同步是否保持一致 |
1. 模拟 1000 个并发请求向 RabbitMQ 发送课程更新消息 2. 检查 Redis 和 Elasticsearch 中的数据 |
- 所有并发更新均正确同步 - 无丢失消息或不一致数据 |
故障恢复测试 |
验证 RabbitMQ、Redis 或 Elasticsearch 故障时数据是否一致性 |
1. 模拟 RabbitMQ 断开连接,发送消息后恢复 2. 模拟 Redis 或 Elasticsearch 宕机并恢复,检查是否重新同步 |
- 消息发送或消费故障后重试,恢复后消息继续同步 - Redis 和 Elasticsearch 保持一致 |
边界情况测试 |
验证在极端情况或特殊输入下的同步效果 |
1. 发送空消息 2. 发送重复消息 3. 发送大量课程信息 |
- 空消息被正确处理,不影响系统运行 - 重复消息不会导致重复写入 - 大数据量同步正常,Redis 和 Elasticsearch 一致性保持 |
延迟和性能测试 |
验证同步延迟及性能表现 |
1. 记录每条消息的时间戳,测量从 RabbitMQ 到 Redis 和 Elasticsearch 同步的延迟 2. 执行压力测试 |
- 消息处理延迟在可接受范围内(如低于 100ms) - 大量消息情况下,系统保持较好性能和低延迟 |
数据一致性测试 |
确保 Redis 和 Elasticsearch 的数据一致性 |
1. 发送课程的创建、更新和删除消息 2. 检查 Redis 和 Elasticsearch 数据的一致性 |
- 创建、更新和删除操作在 Redis 和 Elasticsearch 中同步 - 数据保持一致 |