RocketMQ 消息堆积了怎么处理?

一则或许对你有用的小广告

欢迎 加入小哈的星球 ,你将获得: 专属的项目实战(已更新的所有项目都能学习) / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新开坑项目: 《Spring AI 项目实战(问答机器人、RAG 增强检索、联网搜索)》 正在持续爆肝中,基于 Spring AI + Spring Boot3.x + JDK 21...点击查看;
  • 《从零手撸:仿小红书(微服务架构)》 已完结,基于 Spring Cloud Alibaba + Spring Boot3.x + JDK 17...点击查看项目介绍; 演示链接: http://116.62.199.48:7070/;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/

面试考察点

  1. 问题诊断能力:面试官不仅仅是想知道你怎么 "救火",更是想知道你是否能快速定位消息堆积的根因——是消费端太慢、还是生产端太猛、还是 Broker 出了问题。

  2. 应急处理经验:考察你是否具备生产环境实战经验,能否在消息堆积时采取正确的应急策略,而不是手忙脚乱、越搞越乱。

  3. 架构优化思维:面试官想看你是否具备从源头预防堆积的能力,比如合理设计消费者并发度、选择合适的消息模型、做好监控告警等。

核心答案

消息堆积的处理思路可以用一句话概括:先止血、再恢复、最后根治

具体来说,分为 应急处理根本治理 两个层面:

层面策略核心动作
应急处理扩容消费者增加消费者实例数量或线程数
应急处理消息转发将堆积消息转发到新 Topic,用独立消费者消费
应急处理临时丢弃非核心业务可丢弃部分历史消息
根本治理优化消费逻辑减少单条消息处理耗时
根本治理合理并发设计调整 consumeThreadMin/consumeThreadMax
根本治理监控告警建立堆积量监控,提前预警

深度解析

一、消息堆积的常见原因

先搞清楚 "为什么堆",才能对症下药:

上图展示了消息堆积的三大来源。根据生产经验,Consumer 端消费能力不足是导致堆积最常见的原因,占比超过 80%。具体包括:

  • 消费逻辑慢:比如消费消息时同步调用数据库、RPC 调用耗时过长、业务逻辑复杂等
  • 消费端异常:代码 Bug 导致反复重试、消费者频繁重启、Full GC 停顿等
  • 并发度不够:消费者实例数少、消费线程数配置过低
  • 顺序消息瓶颈:使用顺序消息时,同一 MessageQueue 只能单线程消费,吞吐量受限

二、应急处理方案

方案 1:扩容消费者(最快见效)

// 方式一:增加消费者实例(部署更多节点)
// 注意:消费者实例数不能超过 Topic 的 MessageQueue 数量
// 例如 Topic 有 16 个 Queue,最多只能部署 16 个消费者实例

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setConsumeThreadMin(20);   // 最小消费线程数,默认 20
consumer.setConsumeThreadMax(100);  // 最大消费线程数,默认 20
  • 原理:增加消费端的并行处理能力
  • 注意:消费者实例数受限于 MessageQueue 数量,超过后新实例会 "空转",不会分配到 Queue
  • 适用场景:消费逻辑本身不太慢,只是并发度不够

方案 2:消息转发到新 Topic(堆积量巨大时)

// Step 1:写一个临时消费者,快速消费堆积消息并转发到新 Topic
DefaultMQPushConsumer tempConsumer = new DefaultMQPushConsumer("temp_group");
tempConsumer.subscribe("original_topic", "*");
tempConsumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 快速转发到新 Topic,不做业务处理
            Message newMsg = new Message("new_topic", msg.getBody());
            producer.send(newMsg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

上图展示了消息转发方案的核心思路:用一个轻量级的临时消费者快速将堆积消息从原 Topic 转发到新 Topic,然后由新的消费者集群按照自己的节奏慢慢消费。

  • 为什么有效:原 Topic 的堆积被快速清理,恢复了正常的生产消费链路;新 Topic 的消费可以按业务能力慢慢来
  • 关键点:转发逻辑要尽量轻量,不做任何业务处理,纯粹 "搬运" 消息
  • 适用场景:堆积量特别大(百万级以上),且消费逻辑确实很慢

方案 3:临时丢弃非核心消息

// 对于非核心业务,可以跳过历史消息,从最新消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// 或者设置消息过期时间,过期自动丢弃
// 在 Broker 端配置:fileReservedTime = 48(小时)
  • 适用场景:日志类、监控类等对历史数据不敏感的业务
  • 注意:核心业务链路绝对不能用这种方式,会丢数据

三、根本治理方案

1. 优化消费逻辑

// 反面教材:同步调用多个外部服务
MessageListenerConcurrently listener = (msgs, context) -> {
    for (MessageExt msg : msgs) {
        userService.getUser(msg.getUserId());      // RPC 调用 1:200ms
        orderService.getOrder(msg.getOrderId());    // RPC 调用 2:300ms
        inventoryService.deduct(msg.getSkuId());    // RPC 调用 3:150ms
        // 单条消息处理耗时 650ms,极容易堆积!
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
};

// 正面教材:批量操作 + 异步化
MessageListenerConcurrently listener = (msgs, context) -> {
    // 批量查询,减少 RPC 次数
    List<Long> userIds = msgs.stream()
        .map(m -> parseUserId(m)).collect(Collectors.toList());
    Map<Long, User> userMap = userService.batchGetUsers(userIds);  // 一次批量查

    // 本地缓存热点数据
    // 异步处理非核心逻辑
    CompletableFuture.runAsync(() -> {
        notifyService.sendNotification(msgs);  // 非核心逻辑异步处理
    }, asyncExecutor);

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
};

2. 合理设置消费并发度

参数默认值建议值说明
consumeThreadMin20根据业务调整最小消费线程数
consumeThreadMax2050-200最大消费线程数
pullBatchSize3232-100每次拉取消息批量大小
consumeMessageBatchMaxSize110-50每次消费消息批量大小

3. 避免顺序消息导致的堆积

上图对比了并发消息和顺序消息的吞吐量差异。对于顺序消息,由于同一个 MessageQueue 只能被一个消费者线程顺序消费,吞吐量直接受限于 Queue 的数量。

  • 如果业务确实需要顺序消费,可以通过 增加 Topic 的 Queue 数量 来提升并行度
  • 例如:原来 4 个 Queue 只能 4 线程并发,扩到 16 个 Queue 就能 16 线程并发

4. 建立监控告警机制

// 通过 MQAdmin 工具监控堆积量
// 命令:mqadmin consumerProgress -g consumer_group

// 核心监控指标:
// 1. Diff(堆积量):Consumer Offset 与 Broker Offset 的差值
// 2. 消费 TPS:每秒消费消息数
// 3. 消费延迟:消息从产生到被消费的时间差

// 告警阈值建议(根据业务 SLA 调整):
// - Diff > 10000 条:P2 告警
// - Diff > 100000 条:P1 告警
// - 消费延迟 > 5 分钟:P0 告警

四、处理流程总结

上图展示了从发现堆积到最终根治的完整处理流程。核心原则是:先快速止血恢复服务,再排查根因彻底治理

面试高频追问

  1. 追问一:消费者实例数和 MessageQueue 数量是什么关系?

    消费者实例数 ≤ MessageQueue 数量时,每个实例至少分配一个 Queue;超过 Queue 数量的实例会空闲。所以扩容消费者前先确认 Queue 数量。

  2. 追问二:消息堆积会导致什么严重后果?

    磁盘打满导致 Broker 拒绝写入、消息过期被清理导致数据丢失、消费延迟导致业务逻辑时效性丧失。

  3. 追问三:RocketMQ 怎么查看消息堆积情况?

    使用 mqadmin consumerProgress 命令查看每个消费者组的 Diff 值,或在 Dashboard 控制台监控 Consumer Lag。

  4. 追问四:广播消费模式会有堆积问题吗?

    广播模式下每个消费者都会消费所有消息,堆积只和单个消费者自身消费能力有关,扩容消费者并不能分担压力,需要通过优化消费逻辑来解决。

常见面试变体

  • 变体一:线上 RocketMQ 消费延迟很高怎么排查?
  • 变体二:如何避免 RocketMQ 消息堆积?
  • 变体三:RocketMQ 消费者怎么提高消费速度?
  • 变体四:百万级消息堆积,如何快速恢复?

记忆口诀

"先止血、再治病"——应急三板斧:扩容(加实例/线程)、转发(新 Topic 缓冲)、丢弃(非核心业务);根治四件套:优化逻辑、调并发、加 Queue、建监控。

总结

消息堆积的处理核心是 "先止血再根治":应急阶段通过扩容消费者、消息转发、临时丢弃等手段快速恢复;根治阶段从优化消费逻辑、合理配置并发度、增加 Queue 数量、建立监控告警等方面入手。其中 80% 以上的堆积根因在消费端,所以日常开发中要格外关注消费逻辑的性能。