RocketMQ 怎么保证消息不丢失?
一则或许对你有用的小广告
欢迎 加入小哈的星球 ,你将获得: 专属的项目实战(已更新的所有项目都能学习) / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论
- 新开坑项目: 《Spring AI 项目实战(问答机器人、RAG 增强检索、联网搜索)》 正在持续爆肝中,基于
Spring AI + Spring Boot3.x + JDK 21..., 点击查看; - 《从零手撸:仿小红书(微服务架构)》 已完结,基于
Spring Cloud Alibaba + Spring Boot3.x + JDK 17..., 点击查看项目介绍; 演示链接: http://116.62.199.48:7070/; - 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/
面试考察点
-
全链路思维:面试官不仅仅是想知道某一个点,而是想考察你是否具备 "Producer → Broker → Consumer" 全链路的消息可靠性意识,能否识别消息在哪个环节可能丢。
-
机制理解深度:考察你是否了解同步刷盘、同步复制、消息确认等底层机制的原理和区别,以及它们各自对性能的影响。
-
生产实践能力:面试官想看你是否能在 "可靠性 vs 性能" 之间做出合理权衡,而不是一味追求最高可靠性。
核心答案
消息可能在 三个环节 丢失,需要逐个击破:
| 可能丢失的环节 | 原因 | 解决方案 |
|---|---|---|
| Producer 发送时 | 网络异常、发送超时 | 同步发送 + 重试机制 |
| Broker 存储时 | Broker 宕机、磁盘故障 | 同步刷盘 + 同步复制 |
| Consumer 消费时 | 消费失败但误确认 | 手动确认 + 消费重试机制 |
一句话总结:Producer 用同步发送确保消息到达 Broker,Broker 用同步刷盘和主从复制确保消息持久化,Consumer 用手动确认确保消息处理成功。
深度解析
一、消息丢失的三个环节全景图
上图展示了消息从生产到消费全链路中可能丢失的三个关键风险点:
- 风险点 ①(Producer → Broker):消息发送阶段。如果 Producer 使用异步发送(
Oneway),消息发出去就不管了,网络抖动或 Broker 不可用时消息直接丢失。 - 风险点 ②(Broker 内部):消息存储阶段。消息到达 Broker 后先写入内存(PageCache),如果此时 Broker 宕机、磁盘故障,内存中的数据来不及落盘就丢了。
- 风险点 ③(Broker → Consumer):消息消费阶段。Consumer 拉取到消息后,如果业务处理失败但已经向 Broker 返回了
CONSUME_SUCCESS,Broker 就会更新消费进度,这条消息就再也拿不回来了。
二、Producer 端:确保消息发送成功
1. 使用同步发送
// ❌ 危险:Oneway 发送,不等结果,消息可能丢失
producer.sendOneway(msg);
// ❌ 有风险:异步发送,回调可能来不及处理
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) { }
@Override
public void onException(Throwable e) {
// 发送失败了,可能没来得及处理
}
});
// ✅ 推荐:同步发送,等待 Broker 确认
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 发送失败,记录日志或走降级逻辑
log.error("消息发送失败:{}", result);
}
2. 发送失败自动重试
// 设置同步发送失败重试次数,默认 2 次(共发送 3 次)
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setRetryTimesWhenSendFailed(3); // 同步发送重试次数
producer.setRetryTimesWhenSendAsyncFailed(3); // 异步发送重试次数
3. 三种发送方式对比
| 发送方式 | 方法 | 是否等结果 | 可靠性 | 性能 | 适用场景 |
|---|---|---|---|---|---|
| 同步发送 | send(msg) | 等 | 高 | 中 | 核心业务(推荐) |
| 异步发送 | send(msg, callback) | 不等,回调通知 | 中 | 较高 | 对实时性要求高的场景 |
| 单向发送 | sendOneway(msg) | 不等,不回调 | 低 | 最高 | 日志采集等可丢场景 |
三、Broker 端:确保消息持久化
1. 同步刷盘 vs 异步刷盘
上图对比了两种刷盘策略的工作方式:
- 异步刷盘(
ASYNC_FLUSH):消息写入内存(PageCache)后就立即返回成功给 Producer,由后台线程异步将数据刷到磁盘。性能高,但 Broker 宕机时 PageCache 中还没来得及落盘的数据会丢失。 - 同步刷盘(
SYNC_FLUSH):消息写入内存后,必须等待数据真正落盘到磁盘后才返回成功给 Producer。性能稍低,但即使 Broker 宕机,数据也不会丢。
# Broker 配置文件 broker.conf 中设置刷盘策略
flushDiskType=SYNC_FLUSH # 同步刷盘(推荐核心业务使用)
# flushDiskType=ASYNC_FLUSH # 异步刷盘(默认,性能更好)
2. 同步复制 vs 异步复制
上图对比了主从复制的两种模式:
- 异步复制(
ASYNC_MASTER):Master 写入成功后立即返回给 Producer,异步将数据复制到 Slave。如果 Master 宕机时数据还没复制到 Slave,这部分数据就丢了。 - 同步复制(
SYNC_MASTER):Master 写入后,必须等待 Slave 也写入成功,才返回成功给 Producer。双写保障,任何一台宕机数据都不丢。
# Broker 配置文件中设置主从复制策略
brokerRole=SYNC_MASTER # 同步复制(推荐核心业务使用)
# brokerRole=ASYNC_MASTER # 异步复制(默认)
# brokerRole=SLAVE # 从节点
3. Broker 端可靠性配置组合
| 配置组合 | 可靠性 | 性能 | 适用场景 |
|---|---|---|---|
| 异步刷盘 + 异步复制 | 低 | 最高 | 日志、监控数据 |
| 同步刷盘 + 异步复制 | 中 | 较高 | 一般业务 |
| 异步刷盘 + 同步复制 | 较高 | 中 | 一般核心业务 |
| 同步刷盘 + 同步复制 | 最高 | 较低 | 金融、交易等核心业务 |
四、Consumer 端:确保消费成功
1. 消费成功后再确认
// ✅ 正确做法:业务处理成功后再返回 CONSUME_SUCCESS
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
// 先执行业务逻辑
orderService.processOrder(msg);
}
// 业务处理成功后,才确认消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 业务处理失败,返回 RECONSUME_LATER,消息会进入重试队列
log.error("消费失败,稍后重试", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
// ❌ 错误做法:先确认再处理
// 这样如果业务处理失败,消息已经确认了,就再也拿不回来了
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
orderService.processOrder(msg); // 这行可能根本不会执行!
2. 消费重试机制
上图展示了 RocketMQ 的消费重试机制,整体分为三个阶段:
- 消费失败 → 重试 Topic:Consumer 返回
RECONSUME_LATER后,消息不会丢失,而是被转移到重试 Topic(%RETRY% + ConsumerGroup),等待下一次重新投递。 - 延迟等级递增重试:RocketMQ 采用递增的延迟策略进行重试,从 10 秒开始,逐步增加到 2 小时。共 16 个延迟等级,避免频繁重试打爆系统。
- 最终兜底 → 死信队列:如果重试 16 次仍然失败,消息会被移入死信队列(
%DLQ% + ConsumerGroup)。死信队列中的消息不会再自动投递,需要人工介入排查和处理。
// 设置最大重试次数
consumer.setMaxReconsumeTimes(16); // 默认 16 次
// 消费时可以获取重试次数
MessageExt msg = msgs.get(0);
int reconsumeTimes = msg.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 重试次数过多,记录日志,走降级逻辑
log.warn("消息重试 {} 次仍失败,msgId={}", reconsumeTimes, msg.getMsgId());
}
五、生产环境的最佳配置组合
面试高频追问
-
追问一:同步刷盘 + 同步复制会不会太慢?
确实有性能损耗,同步刷盘的吞吐量大约是异步刷盘的 60%-70%。但金融、交易等核心业务不能丢数据,这点性能损耗是值得的。可以通过 SSD 磁盘来缓解。
-
追问二:消息确认(ACK)是在 Consumer 端还是 Broker 端?
Consumer 端返回
CONSUME_SUCCESS后,Broker 更新消费进度(Offset)。Broker 负责管理 Offset,Consumer 负责告知消费结果。 -
追问三:如果 Consumer 消费一直失败怎么办?
重试 16 次后进入死信队列(DLQ)。生产环境一定要监控死信队列,及时告警和人工处理。可以在代码中判断
reconsumeTimes,超过阈值就走降级逻辑。 -
追问四:RocketMQ 的消息一定是完全不丢吗?
即使做了所有配置,极端情况下(如磁盘损坏、机房断电)仍可能丢。对于绝对不能丢的场景,需要业务层面做对账补偿,不能完全依赖 MQ。
常见面试变体
- 变体一:RocketMQ 的消息可靠性怎么保证?
- 变体二:RocketMQ 消息丢了怎么排查?
- 变体三:RocketMQ 同步刷盘和异步刷盘有什么区别?
- 变体四:RocketMQ 的死信队列是什么?
记忆口诀
"三端三保"——Producer 同步发送加重试,Broker 同步刷盘加同步复制,Consumer 业务成功再确认。记住三句话:发出去要确认、存下来要落盘、消费完再告诉 Broker。
总结
RocketMQ 保证消息不丢失需要 三端配合:Producer 端使用同步发送 + 重试机制确保消息到达 Broker;Broker 端使用同步刷盘 + 同步复制确保消息持久化到磁盘和从节点;Consumer 端业务处理成功后再确认消费,失败则走重试 + 死信队列兜底。核心思想就是 "每个环节都不丢,丢了有兜底"。