RocketMQ 4.7.1 DLedger 模式下延时消息重复投递问题深度排查与源码解析

RocketMQ 4.7.1 DLedger 模式下延时消息重复投递问题深度排查与源码解析

前言

在分布式高并发的场景下,“消息重复消费”几乎是每个工程师都会遇到的老朋友。通常情况下,我们习惯性地把它归咎于“网络抖动导致客户端重试”或者“消费者未及时提交 Offset”。然而,最近在一套基于 RocketMQ 4.7.1 + DLedger(一主两从强一致性部署) 的测试环境中,遇到了一个堪称“幽灵”的延时消息重复消费问题:特定延时级别(5秒)的消息,在到期后几乎 100% 必现并发双发投递。

在经历了常规排查、日志解密、十六进制物理偏移量推算,最终通过 MAT(Memory Analyzer Tool)对 Broker 进程执行“开颅手术”级堆栈解剖后,我们彻底剥离了这一深藏在 RocketMQ 早期高可用状态机中的高危深水区 Bug。

本文将还原整场硬核排查的完整闭环,希望能为遇到同类诡异重复问题的技术同行提供一套无懈可击的触底排查方法论。


一、 诡异的案发现场:完美的“1毫秒”与“532字节”

1. 业务现象

业务系统发送一条延时 5 秒(DelayLevel=2)的订单超时检查消息。5 秒后,消费者几乎在同一瞬间收到了两条一模一样的消息,导致业务幂等层承受了不必要的并发压力。

2. 消费端日志抓拍

Talk is cheap, show me the log. 我们直接把两条重复消息的 MessageExt 属性调出来对比:

Plaintext

// 第一条消息消费日志 (14:50:22.629)
received msg: MessageExt [brokerName=RaftNode03, queueId=9, storeSize=484, queueOffset=11, 
bornTimestamp=1775803817623, storeTimestamp=1775803822896, 
msgId=0A640E59000078BF00000003CEDCDF11, commitLogOffset=16355483409, 
UNIQ_KEY=0A140454544C18B4AAC2318852960015, ...]

// 第二条消息消费日志 (14:50:22.630)
received msg: MessageExt [brokerName=RaftNode03, queueId=9, storeSize=484, queueOffset=12, 
bornTimestamp=1775803817623, storeTimestamp=1775803822897, 
msgId=0A640E59000078BF00000003CEDCE125, commitLogOffset=16355483941, 
UNIQ_KEY=0A140454544C18B4AAC2318852960015, ...]

3. 提取核心密码

将上述日志中的底层存储属性提炼出来,可以得出以下惊人的物理规律:

  • 业务唯一键完全相同:两者的 UNIQ_KEY(客户端生成的 MessageId)均为 0A14045454...0015

  • 逻辑队列连续queueOffset 分别是 11 和 12,说明它们在 ConsumeQueue 索引里是紧挨着的兄弟。

  • 物理落盘紧挨:第一条消息的 commitLogOffset16355483409,第二条是 16355483941

    做一道减法:$16355483941 – 16355483409 = 532 \text{ 字节}$。

    而消息本体的 storeSize 是 484 字节。剩下的 $532 – 484 = 48 \text{ 字节}$,恰好是 DLedger 模式下特有的 Raft 日志条目头(DLedgerEntry Header)的固定大小

  • 落盘时间差 1 毫秒storeTimestamp 分别是 ...896...897

初探结论:这绝对不是消费者重复拉取,也不是索引多建。这是 Broker 的物理磁盘 CommitLog 里,确确实实、背靠背、严丝合缝地连续刻下了两段一模一样的业务消息数据!


二、 艰难的排查之路:排除一切不可能

为了找出这多出来的物理消息从何而来,我们开始顺着链路层层证伪:

猜想一:客户端网络超时导致的 Producer 自动重试?

  • 证据收集:翻遍发送端客户端日志,没有任何 send message timeout 异常,发送一切平滑。
  • 铁证反驳:源头系统延时队列 SCHEDULE_TOPIC_XXXX 精确追踪显示:生产者只发了一次。如果是客户端重试,由于在短时间内连续发起了两次写请求,在源头延时 Topic 中必然能揪出两条相邻的源消息。但通过分析底层 ConsumeQueue 二进制块,发现源头延时队列只有孤零零的 1 条物理记录。

猜想二:主从切换引起的“幽灵多线程”并发读取?

  • 证据收集:有人怀疑在 DLedger 的 Raft 频繁选举中,原本代班的 Slave 节点退化回从节点时,没有干净地关闭内部的延时调度服务,导致主从节点同时读取了同一条延时消息转存。
  • 铁证反驳:通过对全集群所有节点执行 jstack 线程栈普查,发现除了当前的 Master 节点外,其余两台 Slave 机器内部的 ScheduleMessageTimerThread 线程均处于干净的未启动状态。Master 节点上也仅有唯一的一个延时线程在孤军奋战。

既然发送端只发了一次,集群里只有一个单线程,从源头也只读了一条消息,没有任何报错重试,为什么物理磁盘上会被这个单线程连拍两次?


三、 终极破案:MAT 内存“开颅手术”

我们将 Master 节点的 JVM 堆内存整体 Dump 出来,导入 Eclipse MAT 中。顺着执行延时消息调度的核心线程 ScheduleMessageTimerThread 的方法栈和局部变量一路下挖:

终于,在 java.util.TimerThread 的核心调度推车—— java.util.TaskQueue 的局部变量数组 queue 里,我们抓到了无法被辩驳的物理铁证:

在等待执行的任务队列数组里,竟然同时挂载了两个不同的 DeliverDelayedMessageTimerTask 实例,而它们的内部属性 delayLevel 全部等于 2(5秒级别)!

真相大白:一个调度线程的肚子里,竟然离奇地塞进了两个都在负责监听“5秒级别延时队列”的幽灵双胞胎任务。当 5 秒时间一到,这两个任务在同一个线程里被先后唤醒,顺理成章地对同一个源消息连续调用了两次 putMessage() 写入了业务 Topic。因为是单线程紧挨着执行,所以两者的落盘时间恰好只差了微小的 1 毫秒!


四、 源码深挖:幽灵双胞胎任务是如何诞生的?

为什么内存中会多注册一个一模一样的 Task?翻阅 RocketMQ 4.7.1 版本的 ScheduleMessageService 源码,其背后的高危生命周期控制漏洞终于浮出水面。

当 DLedger 集群因为网络微抖或 I/O 阻塞发生瞬时的主从角色切换时(Master -> Follower -> Master),Broker 会响应角色变更事件并对延时服务进行重启。

Java

// org.apache.rocketmq.store.schedule.ScheduleMessageService.java 底层缺陷逻辑还原

public void stop() {
    this.started = false; // 1. 将状态标志位置为 false
    if (null != this.timer) {
        this.timer.cancel(); // 2. 取消旧计时器
    }
}

public void start() {
    this.started = true; // 3. 重新拉起时,置为 true
    this.timer = new Timer(); // 4. 创建全新的 Timer 实例
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        // 5. 重新为每个级别 schedule 全新的任务 (Task B)
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), 0);
    }
}

乍一看逻辑非常闭环,然而致命的漏洞隐藏在 DeliverDelayedMessageTimerTaskrun() 方法尾部的 finally 块中:

Java

class DeliverDelayedMessageTimerTask extends TimerTask {
    public void run() {
        try {
            // 执行具体的转存逻辑,调用 writeMessageStore.putMessage()
            executeOnTimeup(); 
        } catch (Exception e) {
            log.error("...", e);
        } finally {
            // 【万恶之源就在这里】
            // 如果旧任务 Task A 正在写盘时发生了主从闪断,stop() 无法强行中止正在运行的 Task A。
            // 紧接着系统瞬间恢复,start() 被调用,started 被设回 true,并注册了全新的 Task B。
            // 此时卡顿的 Task A 终于写完盘走到了 finally 块:
            if (!ScheduleMessageService.this.isStarted()) {
                return; // 因为 start() 把它重新改为了 true,此处的校验被完美绿灯绕过!
            }
            
            // Task A 误以为服务一切正常,顺手将自己克隆出的新任务,重新塞进了最新的 timer 容器中!
            ScheduleMessageService.this.timer.schedule(
                new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
        }
    }
}

这就是著名的定时任务生命周期并发重叠缺陷:旧的 Task A 因为底层写盘卡顿错过了销毁期,在服务重新启动后,它通过 finally 块的“僵尸复活”逻辑,成功将自己强行塞进了新启用的 Timer 容器里,与正常初始化的 Task B 胜利会师。这就是为什么该 Bug 只严格发生在发生过卡顿或频繁切换的特定延时级别上


五、 混沌工程:如何在实验室 100% 还原现场

为了验证这一推论,我们在本地基于 Win11 的虚拟环回网卡(Microsoft KM-TEST Loopback Adapter)搭建了一套 4.7.1 的高仿 DLedger 集群,并通过 Linux 混沌指令(或对特定 Broker 进程实施信号冻结)成功将其复现:

  1. 高频诱饵打入:编写测试客户端,以每 10 毫秒一条的高频率不间断向集群灌入 DelayLevel=2 的延时消息,确保 Broker 内部的延时任务线程始终处于密集的写盘忙碌状态。
  2. 闪断状态机:利用 Linux 的 kill -STOP <PID> 瞬间冻结当前 Leader 节点的 JVM,迫使从节点因心跳超时触发选举并成功篡位。
  3. 闪电夺权:几秒后,立刻执行 kill -CONT <PID> 唤醒旧 Leader,由于集群拓扑的动态变化,人为制造极其密集的 LEADER -> FOLLOWER -> LEADER 瞬间切换。
  4. 见证 Bug:集群稳定后,消费端立刻开始爆发连续相差 1 毫秒、物理间距 532 字节的并发双发消费日志。此时使用 OQL 检索内存,双胞胎 Task 完美现身。

六、 总结与避坑指南

通过这次长达数天的极限排查,我们从表象的消费逻辑一路下潜到 JVM 内存最底层的运行期状态,彻底终结了这起分布式疑案。针对此类由于开源组件底层架构演进带来的特有风险,我们提炼出以下几点共勉:

  1. 分布式 MQ 的铁律——必须死守消费端幂等

    无论分布式消息中间件底盘宣传得多么高可用、一致性多么完美,在遇到极端高并发抖动、底层状态机错位或内存乱序时,MQ 的物理层永远只能保证 “At Least Once(至少投递一次)”。在 MessageListener 的入口处,必须坚决套死基于业务唯一单号(如 orderNo)的 Redis 分布式锁、或者数据库强一致唯一索引,作为拦截重复数据的底线防御。

  2. 架构优化:跨越 4.7.x 的早期高可用深水区

    RocketMQ 4.7.1 属于早期拥抱分布式 Raft 一致性(DLedger)的尝鲜版本,其角色切换与各上层后台 Service 的生命周期管理存在诸多元气未复的并发漏洞。社区在后续的 4.8.x 以及更为稳定的 4.9.x 版本中,不仅引入了 Asynchronous Pipeline(异步流式处理模型)大幅减轻了 DLedger 带来的 I/O 卡顿,更对 ScheduleMessageService 的启停状态机进行了严密重构。如果你的生产环境正在深度依赖延时消息且使用了 DLedger,强烈建议安排平滑升级至 4.9.x 稳定版,从底层根治此类隐患。

文章摘自:https://www.cnblogs.com/imadc/p/20067651