谈谈 RocketMQ 的事务消息?

一则或许对你有用的小广告

欢迎 加入小哈的星球 ,你将获得: 专属的项目实战(已更新的所有项目都能学习) / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新开坑项目: 《Spring AI 项目实战(问答机器人、RAG 增强检索、联网搜索)》 正在持续爆肝中,基于 Spring AI + Spring Boot3.x + JDK 21...点击查看;
  • 《从零手撸:仿小红书(微服务架构)》 已完结,基于 Spring Cloud Alibaba + Spring Boot3.x + JDK 17...点击查看项目介绍; 演示链接: http://116.62.199.48:7070/;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/

面试考察点

  1. 分布式事务理解:面试官不仅仅是想知道 RocketMQ 事务消息怎么用,更是想考察你是否理解分布式事务的痛点——本地事务和消息发送的原子性问题,以及为什么需要 "事务消息" 这个机制。

  2. 二阶段提交思想:考察你是否能将事务消息与 "二阶段提交" 的思想联系起来,理解 "半消息" 的概念和回查机制的设计原理。

  3. 生产实践能力:面试官想看你是否在项目中真正用过事务消息,能否说出使用场景、注意事项以及和本地消息表等方案的对比。

核心答案

RocketMQ 事务消息是一种 "二阶段提交 + 回查机制" 的方案,用来解决 本地事务执行与消息发送的一致性问题

核心思想:先发 "半消息"(对消费者不可见),再根据本地事务结果决定提交或回滚。如果 Broker 长时间没收到确认,就主动回查本地事务状态。

阶段动作说明
第一阶段发送半消息(Half Message)消息发送到 Broker,但消费者不可见
第二阶段执行本地事务根据本地事务结果返回 Commit/Rollback
补偿阶段事务回查Broker 未收到确认时,主动回查事务状态

深度解析

一、为什么需要事务消息?

先看一个典型的分布式场景:用户下单成功后,需要发一条消息通知积分系统给用户加积分

上图展示了两种常见方案的问题:

  • 方案 A(先发消息后执行事务):消息发送成功但本地事务失败,下游系统已经处理了消息,导致数据不一致。
  • 方案 B(先执行事务后发消息):本地事务成功但消息发送失败,下游系统没收到消息,同样数据不一致。

本质问题就是:本地事务和消息发送不在同一个事务中,无法保证原子性。RocketMQ 的事务消息就是为解决这个问题而生的。

二、事务消息的完整流程

上图展示了事务消息从发送到消费的完整流程,整体分为 5 个核心步骤

  • 步骤 ① 发送半消息:Producer 向 Broker 发送一条 "半消息"(Half Message)。这条消息虽然到了 Broker,但会被存入特殊的内部 Topic(RMQ_SYS_TRANS_HALF_TOPIC),对消费者完全不可见

  • 步骤 ② 返回发送成功:Broker 确认半消息写入成功后,返回确认给 Producer。注意,此时消息只是 "暂存",还没有真正投递。

  • 步骤 ③ 执行本地事务:Producer 收到确认后,开始执行本地事务(比如创建订单)。根据本地事务的执行结果,返回三种状态之一:

    • COMMIT:本地事务成功,消息应该投递
    • ROLLBACK:本地事务失败,消息应该丢弃
    • UNKNOWN:本地事务状态不确定,等待回查
  • 步骤 ④ 提交或回滚:Producer 将本地事务结果发送给 Broker。如果是 COMMIT,Broker 将消息从半消息 Topic 移到目标 Topic,消费者可以消费;如果是 ROLLBACK,Broker 直接删除该半消息。

  • 步骤 ⑤ 事务回查:如果 Broker 长时间(默认 15 秒)没有收到步骤 ④ 的确认(比如 Producer 宕机、网络异常),Broker 会主动回查 Producer 的本地事务状态,根据返回结果决定提交或回滚。回查最多执行 15 次,超过则回滚。

三、代码示例

// 1. 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
producer.setNamesrvAddr("localhost:9876");

// 2. 设置事务监听器(核心)
producer.setTransactionListener(new TransactionListener() {

    // ③ 执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地业务逻辑,比如:创建订单、扣减库存等
            Order order = JSON.parseObject(msg.getBody(), Order.class);
            orderService.createOrder(order);

            // 本地事务成功,提交消息
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事务失败,回滚消息
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 如果不确定,返回 UNKNOW,等待 Broker 回查
        // return LocalTransactionState.UNKNOW;
    }

    // ⑤ 事务回查
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 查询本地事务状态(比如查数据库看订单是否创建成功)
        String orderId = msg.getProperty("orderId");
        Order order = orderService.getOrderById(orderId);

        if (order != null && order.getStatus() == OrderStatus.CREATED) {
            // 订单存在且状态正常,提交消息
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (order != null && order.getStatus() == OrderStatus.FAILED) {
            // 订单已标记失败,回滚消息
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 还是查不到,返回 UNKNOW,继续等待下次回查
        return LocalTransactionState.UNKNOW;
    }
});

producer.start();

// 3. 发送事务消息(① + 自动触发 ③)
Message msg = new Message("ORDER_TOPIC", "TagA", orderJson.getBytes());
msg.setProperty("orderId", "ORD_123456");

// sendMessageInTransaction 会:
// 先发送半消息 → Broker 返回确认 → 执行 executeLocalTransaction
producer.sendMessageInTransaction(msg, null);

四、半消息的存储原理

上图展示了半消息在 Broker 端的存储和转换过程:

  • 半消息暂存:半消息写入特殊的内部 Topic(RMQ_SYS_TRANS_HALF_TOPIC),这个 Topic 对普通消费者不可见。消息的原始 Topic 信息保存在消息的属性中。
  • 提交时转换:当 Producer 返回 COMMIT 后,Broker 将消息从半消息 Topic 中取出,还原为原始 Topic 投递到正常的 ConsumeQueue 中,此时消费者才能消费。
  • 回滚时清理:当 Producer 返回 ROLLBACK 后,Broker 不会移动消息,而是在 RMQ_SYS_TRANS_OP_HALF_TOPIC 中记录一条操作日志,标记该半消息已处理。
  • 回查依据:Broker 通过对比 HALF_TOPICOP_HALF_TOPIC,找出那些没有操作记录的半消息,对这些消息发起事务回查。

五、事务消息 vs 本地消息表

对比维度RocketMQ 事务消息本地消息表
实现复杂度中(依赖 MQ 支持)高(需要自己建表 + 定时任务)
额外存储不需要需要建本地消息表
一致性保证MQ 层面保证业务层面保证
适用场景使用 RocketMQ 的项目任何 MQ 都可用
回查机制Broker 自动回查定时任务扫描重发
耦合度低(MQ 内建能力)中(业务侵入性强)

面试高频追问

  1. 追问一:事务消息的回查最多几次?间隔多久?

    最多回查 15 次,第一次回查间隔为 15 秒,后续每次间隔递增。超过 15 次仍未确认,默认回滚消息。

  2. 追问二:事务消息支持延迟消息和顺序消息吗?

    事务消息不支持延迟消息。事务消息支持顺序消息,但需要保证本地事务执行和消息确认的顺序性。

  3. 追问三:如果 Producer 一直不返回确认会怎样?

    Broker 会持续回查,直到超过最大回查次数(15 次),最终回滚该消息。所以 checkLocalTransaction 方法一定要实现,且逻辑要尽量轻量快速。

  4. 追问四:事务消息和普通消息的性能差距大吗?

    事务消息多了半消息写入 + 二次确认 + 可能的回查,性能比普通消息低约 20%-30%。但对大多数业务场景来说完全可接受。

常见面试变体

  • 变体一:RocketMQ 怎么保证分布式事务的最终一致性?
  • 变体二:RocketMQ 事务消息的半消息是怎么实现的?
  • 变体三:RocketMQ 事务消息和本地消息表方案怎么选?
  • 变体四:RocketMQ 的事务回查机制是怎样的?

记忆口诀

"半消息、本地事务、二阶段确认、回查兜底"——先发半消息(消费者看不见),再执行本地事务,成功了就提交让消息可见,失败了就回滚删消息。如果 Producer "失联" 了,Broker 就主动回查来兜底。

总结

RocketMQ 事务消息通过 "半消息 + 二阶段提交 + 事务回查" 三板斧,优雅地解决了本地事务与消息发送的原子性问题。核心流程是:先发半消息(对消费者不可见)→ 执行本地事务 → 根据结果提交或回滚 → 超时未确认则 Broker 主动回查。这是分布式事务中实现 最终一致性 的经典方案。