谈谈 RocketMQ 的事务消息?
一则或许对你有用的小广告
欢迎 加入小哈的星球 ,你将获得: 专属的项目实战(已更新的所有项目都能学习) / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论
- 新开坑项目: 《Spring AI 项目实战(问答机器人、RAG 增强检索、联网搜索)》 正在持续爆肝中,基于
Spring AI + Spring Boot3.x + JDK 21..., 点击查看; - 《从零手撸:仿小红书(微服务架构)》 已完结,基于
Spring Cloud Alibaba + Spring Boot3.x + JDK 17..., 点击查看项目介绍; 演示链接: http://116.62.199.48:7070/; - 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/
面试考察点
-
分布式事务理解:面试官不仅仅是想知道 RocketMQ 事务消息怎么用,更是想考察你是否理解分布式事务的痛点——本地事务和消息发送的原子性问题,以及为什么需要 "事务消息" 这个机制。
-
二阶段提交思想:考察你是否能将事务消息与 "二阶段提交" 的思想联系起来,理解 "半消息" 的概念和回查机制的设计原理。
-
生产实践能力:面试官想看你是否在项目中真正用过事务消息,能否说出使用场景、注意事项以及和本地消息表等方案的对比。
核心答案
RocketMQ 事务消息是一种 "二阶段提交 + 回查机制" 的方案,用来解决 本地事务执行与消息发送的一致性问题。
核心思想:先发 "半消息"(对消费者不可见),再根据本地事务结果决定提交或回滚。如果 Broker 长时间没收到确认,就主动回查本地事务状态。
| 阶段 | 动作 | 说明 |
|---|---|---|
| 第一阶段 | 发送半消息(Half Message) | 消息发送到 Broker,但消费者不可见 |
| 第二阶段 | 执行本地事务 | 根据本地事务结果返回 Commit/Rollback |
| 补偿阶段 | 事务回查 | Broker 未收到确认时,主动回查事务状态 |
深度解析
一、为什么需要事务消息?
先看一个典型的分布式场景:用户下单成功后,需要发一条消息通知积分系统给用户加积分。
上图展示了两种常见方案的问题:
- 方案 A(先发消息后执行事务):消息发送成功但本地事务失败,下游系统已经处理了消息,导致数据不一致。
- 方案 B(先执行事务后发消息):本地事务成功但消息发送失败,下游系统没收到消息,同样数据不一致。
本质问题就是:本地事务和消息发送不在同一个事务中,无法保证原子性。RocketMQ 的事务消息就是为解决这个问题而生的。
二、事务消息的完整流程
上图展示了事务消息从发送到消费的完整流程,整体分为 5 个核心步骤:
-
步骤 ① 发送半消息:Producer 向 Broker 发送一条 "半消息"(Half Message)。这条消息虽然到了 Broker,但会被存入特殊的内部 Topic(
RMQ_SYS_TRANS_HALF_TOPIC),对消费者完全不可见。 -
步骤 ② 返回发送成功:Broker 确认半消息写入成功后,返回确认给 Producer。注意,此时消息只是 "暂存",还没有真正投递。
-
步骤 ③ 执行本地事务:Producer 收到确认后,开始执行本地事务(比如创建订单)。根据本地事务的执行结果,返回三种状态之一:
COMMIT:本地事务成功,消息应该投递ROLLBACK:本地事务失败,消息应该丢弃UNKNOWN:本地事务状态不确定,等待回查
-
步骤 ④ 提交或回滚:Producer 将本地事务结果发送给 Broker。如果是
COMMIT,Broker 将消息从半消息 Topic 移到目标 Topic,消费者可以消费;如果是ROLLBACK,Broker 直接删除该半消息。 -
步骤 ⑤ 事务回查:如果 Broker 长时间(默认 15 秒)没有收到步骤 ④ 的确认(比如 Producer 宕机、网络异常),Broker 会主动回查 Producer 的本地事务状态,根据返回结果决定提交或回滚。回查最多执行 15 次,超过则回滚。
三、代码示例
// 1. 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
producer.setNamesrvAddr("localhost:9876");
// 2. 设置事务监听器(核心)
producer.setTransactionListener(new TransactionListener() {
// ③ 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地业务逻辑,比如:创建订单、扣减库存等
Order order = JSON.parseObject(msg.getBody(), Order.class);
orderService.createOrder(order);
// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 如果不确定,返回 UNKNOW,等待 Broker 回查
// return LocalTransactionState.UNKNOW;
}
// ⑤ 事务回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 查询本地事务状态(比如查数据库看订单是否创建成功)
String orderId = msg.getProperty("orderId");
Order order = orderService.getOrderById(orderId);
if (order != null && order.getStatus() == OrderStatus.CREATED) {
// 订单存在且状态正常,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} else if (order != null && order.getStatus() == OrderStatus.FAILED) {
// 订单已标记失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 还是查不到,返回 UNKNOW,继续等待下次回查
return LocalTransactionState.UNKNOW;
}
});
producer.start();
// 3. 发送事务消息(① + 自动触发 ③)
Message msg = new Message("ORDER_TOPIC", "TagA", orderJson.getBytes());
msg.setProperty("orderId", "ORD_123456");
// sendMessageInTransaction 会:
// 先发送半消息 → Broker 返回确认 → 执行 executeLocalTransaction
producer.sendMessageInTransaction(msg, null);
四、半消息的存储原理
上图展示了半消息在 Broker 端的存储和转换过程:
- 半消息暂存:半消息写入特殊的内部 Topic(
RMQ_SYS_TRANS_HALF_TOPIC),这个 Topic 对普通消费者不可见。消息的原始 Topic 信息保存在消息的属性中。 - 提交时转换:当 Producer 返回
COMMIT后,Broker 将消息从半消息 Topic 中取出,还原为原始 Topic 投递到正常的ConsumeQueue中,此时消费者才能消费。 - 回滚时清理:当 Producer 返回
ROLLBACK后,Broker 不会移动消息,而是在RMQ_SYS_TRANS_OP_HALF_TOPIC中记录一条操作日志,标记该半消息已处理。 - 回查依据:Broker 通过对比
HALF_TOPIC和OP_HALF_TOPIC,找出那些没有操作记录的半消息,对这些消息发起事务回查。
五、事务消息 vs 本地消息表
| 对比维度 | RocketMQ 事务消息 | 本地消息表 |
|---|---|---|
| 实现复杂度 | 中(依赖 MQ 支持) | 高(需要自己建表 + 定时任务) |
| 额外存储 | 不需要 | 需要建本地消息表 |
| 一致性保证 | MQ 层面保证 | 业务层面保证 |
| 适用场景 | 使用 RocketMQ 的项目 | 任何 MQ 都可用 |
| 回查机制 | Broker 自动回查 | 定时任务扫描重发 |
| 耦合度 | 低(MQ 内建能力) | 中(业务侵入性强) |
面试高频追问
-
追问一:事务消息的回查最多几次?间隔多久?
最多回查 15 次,第一次回查间隔为 15 秒,后续每次间隔递增。超过 15 次仍未确认,默认回滚消息。
-
追问二:事务消息支持延迟消息和顺序消息吗?
事务消息不支持延迟消息。事务消息支持顺序消息,但需要保证本地事务执行和消息确认的顺序性。
-
追问三:如果 Producer 一直不返回确认会怎样?
Broker 会持续回查,直到超过最大回查次数(15 次),最终回滚该消息。所以
checkLocalTransaction方法一定要实现,且逻辑要尽量轻量快速。 -
追问四:事务消息和普通消息的性能差距大吗?
事务消息多了半消息写入 + 二次确认 + 可能的回查,性能比普通消息低约 20%-30%。但对大多数业务场景来说完全可接受。
常见面试变体
- 变体一:RocketMQ 怎么保证分布式事务的最终一致性?
- 变体二:RocketMQ 事务消息的半消息是怎么实现的?
- 变体三:RocketMQ 事务消息和本地消息表方案怎么选?
- 变体四:RocketMQ 的事务回查机制是怎样的?
记忆口诀
"半消息、本地事务、二阶段确认、回查兜底"——先发半消息(消费者看不见),再执行本地事务,成功了就提交让消息可见,失败了就回滚删消息。如果 Producer "失联" 了,Broker 就主动回查来兜底。
总结
RocketMQ 事务消息通过 "半消息 + 二阶段提交 + 事务回查" 三板斧,优雅地解决了本地事务与消息发送的原子性问题。核心流程是:先发半消息(对消费者不可见)→ 执行本地事务 → 根据结果提交或回滚 → 超时未确认则 Broker 主动回查。这是分布式事务中实现 最终一致性 的经典方案。