当前位置: 首页 > article >正文

rocketmq延迟消息的底层原理浅析

rocketmq延迟消息的底层原理

消息实体

延时消息是指允许消息在指定延迟时间后才被消费者消费

Apache RocketMQ 中,消息的核心实体类是 org.apache.rocketmq.common.message.Message

public class Message implements Serializable {private String topic;                 // 消息主题(必填)private int flag;                     // 消息标志(用户自定义)private Map<String, String> properties; // 消息属性(键值对,可用于过滤、追踪等)private byte[] body;                  // 消息体(消息内容)private String transactionId;         // 事务ID(用于事务消息)...
}

在消息属性中properties,有一些常见属性:

  • TAGS:标签,可用于消息过滤
  • DELAY:延迟级别(延时消息)
  • TIMER_DELAY_MS:延迟投递的毫秒数
  • TIMER_DELAY_SEC:延迟投递的秒数
  • TIMER_DELIVER_MS:精准指定投递时间戳

实现机制

延迟等级

RocketMQ 将延迟消息设计成 “延迟级别(delayLevel)” 的形式,每个级别对应一个固定的延迟时间。在4.x的版本中,RocketMQ不支持任意时间精度的延迟,而是预设了18个延迟等级。使用使用ConcurrentSkipListMap存储延迟级别与时间的映射关系

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hprivate final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */>delayLevelTable = new ConcurrentSkipListMap<>();public void setDelayTimeLevel(int level) {this.putProperty("DELAY", String.valueOf(level));
}

在5.x的版本中支持任意时间延迟

public void setDelayTimeSec(long sec) {this.putProperty("TIMER_DELAY_SEC", String.valueOf(sec));
}public void setDelayTimeMs(long timeMs) {this.putProperty("TIMER_DELAY_MS", String.valueOf(timeMs));
}public void setDeliverTimeMs(long timeMs) {this.putProperty("TIMER_DELIVER_MS", String.valueOf(timeMs));
}

消息的处理流程

消息进入延迟队列

Producer 发送延迟消息时,设置 message.setDelayTimeLevel(x)。延迟消息到达Broker不会立即进入你设置的业务 Topic;会先被投递到名为 SCHEDULE_TOPIC_XXXX 的系统内置 Topic

public class CommitLog {public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// ...if (msg.getDelayTimeLevel() > 0) {// 如果超过了最大延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 获取RMQ_SYS_SCHEDULE_TOPICtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 根据延迟级别选取对应的队列int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// 将消息原本的TOPIC和队列ID设置到消息属性中MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 设置SCHEDULE_TOPICmsg.setTopic(topic);msg.setQueueId(queueId);}}// ...}
}
  1. 判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延时等级

  2. topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPICRMQ_SYS_SCHEDULE_TOPIC是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX

  3. int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()),根据延迟级别选取对应的队列,把相同延迟级别的消息放在同一个队列中

  4. 将消息原本的TOPIC和队列ID设置到消息属性中,MessageAccessor 是 RocketMQ 中的一个工具类,作用是以非公开的方式修改 Message 对象的内部字段或属性

    public static void putProperty(Message msg, String name, String value) {msg.putProperty(name, value);}
    

启动定时任务

Broker启动的时候会调用ScheduleMessageServicestart方法,start方法中为不同的延迟级别创建了对应的定时任务来处理延迟消息

public class ScheduleMessageService extends ConfigManager {// 首次执行延迟的时间private static final long FIRST_DELAY_TIME = 1000L;public void start() {if (started.compareAndSet(false, true)) {super.load();this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));if (this.enableAsyncDeliver) {this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));}// 遍历所有的延迟级别for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) { // 如果获取的消费进度为空offset = 0L; // 默认为0,从第一条消息开始处理}if (timeDelay != null) {if (this.enableAsyncDeliver) {this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}// 为每个延迟级别创建对应的定时任务this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}}// ...}}
}

遍历所有的延迟等级,为每个延迟等级创建对应的定时任务。

每个DeliverDelayedMessageTimerTask负责:

  1. 从对应延迟级别的队列中扫描消息
  2. 检查消息的投递时间是否到达
  3. 将到期消息重新投递到目标主题

相关文章:

rocketmq延迟消息的底层原理浅析

rocketmq延迟消息的底层原理 消息实体 延时消息是指允许消息在指定延迟时间后才被消费者消费 Apache RocketMQ 中&#xff0c;消息的核心实体类是 org.apache.rocketmq.common.message.Message public class Message implements Serializable {private String topic; …...

【openssl】升级为3.3.1,避免安全漏洞

本文档旨在形成 对Linux系统openssl版本进行升级 的搭建标准操作过程&#xff0c;搭建完成后&#xff0c;实现 openssl 达到3.3以上版本&#xff0c;避免安全漏洞 效果。 一、查看当前版本 版本不高于3.1的&#xff0c;均需要升级。 # 服务器上运行以下命令&#xff0c;查看…...

使用 HTML +JavaScript 从零构建视频帧提取器

在视频编辑、内容分析和多媒体处理领域&#xff0c;常常需要从视频中提取关键帧。手动截取不仅效率低下&#xff0c;还容易遗漏重要画面。本文介绍的视频帧提取工具通过 HTML5 技术栈实现了一个完整的浏览器端解决方案&#xff0c;用户可以轻松选择视频文件并进行手动或自动帧捕…...

基于若依前后分离版-用户密码错误锁定

sys_config配置参数 user.password.maxRetryCount&#xff1a;最大错误次数 user.password.lockTime&#xff1a;锁定时长 //SysLoginController//登录 PostMapping("/login") public AjaxResult login(RequestBody LoginBody loginBody) {AjaxResult ajax AjaxR…...

论文速读《DexWild:野外机器人策略的灵巧人机交互》

项目链接&#xff1a;https://dexwild.github.io/ 论文链接&#xff1a;https://arxiv.org/pdf/2505.07813 0. 简介 2025年5月&#xff0c;卡内基梅隆大学&#xff08;CMU&#xff09;发布了一篇突破性论文《DexWild: Dexterous Human Interactions for In-the-Wild Robot Pol…...

Bug问题

一、list 页面 import React, { useEffect, useState } from react; import { shallowEqual, useHistory, useSelector } from dva; import { Button, message } from choerodon-ui/pro; import formatterCollections from hzero-front/lib/utils/intl/formatterCollections; …...

【数据结构】5. 双向链表

文章目录 一、链表的分类1、双向链表的结构 二、双向链表的实现0、准备工作1、初始化2、打印3、尾插4、头插5、尾删6、头删7、查找8、在指定位置之后插入数据9、删除指定位置10、销毁 一、链表的分类 链表总共分为8种&#xff0c;具体的分组方式如图所示&#xff1a; 带头指的…...

【Linux手册】冯诺依曼体系结构

目录 前言 五大组件 数据信号 存储器&#xff08;内存&#xff09;有必要吗 常见面试题 前言 冯诺依曼体系结构是当代计算机基本架构&#xff0c;冯诺依曼体系有五大组件&#xff0c;通过这五大组件直观的描述了计算机的工作原理&#xff1b;学习冯诺依曼体系可以让给我们更…...

Mobile App UI自动化locator

在开展mobile app UI层自动化测试时&#xff0c;编写目标元素的locator是比较耗时的一个环节&#xff0c;弄清楚locator背后的逻辑&#xff0c;可以有效降低UI层测试维护成本。此篇博客以webdriverioappium作为UI自动化工具为例子&#xff0c;看看有哪些selector方法&#xff0…...

PaloAlto-Expedition OS命令注入漏洞复现(CVE-2025-0107)

免责申明: 本文所描述的漏洞及其复现步骤仅供网络安全研究与教育目的使用。任何人不得将本文提供的信息用于非法目的或未经授权的系统测试。作者不对任何由于使用本文信息而导致的直接或间接损害承担责任。如涉及侵权,请及时与我们联系,我们将尽快处理并删除相关内容。 前…...

(LeetCode 每日一题) 1061. 按字典序排列最小的等效字符串 (并查集)

题目&#xff1a;1061. 按字典序排列最小的等效字符串 思路&#xff1a;使用并查集&#xff0c;来将等价的字符连起来&#xff0c;形成一棵树。这棵树最小的字母&#xff0c;就代表整颗树&#xff0c;时间复杂度0(n)&#xff0c;细节看注释。 C版本&#xff1a; class Solutio…...

linux 安装mysql8.0;支持国产麒麟,统信uos系统

一&#xff1a;使用我已经改好的mysql linux mysql8.0解压可用&#xff0c;点我下载 也在国产麒麟系统&#xff0c;统信uos系统也测试过&#xff0c;可用&#xff1b; 下载后&#xff0c;上传mysql.tar.gz 然后使用root角色去执行几个命令即可&#xff1b;数据库密码&#xf…...

C#实现远程锁屏

前言 这是一次提前下班没有锁屏进而引发的一次思考后的产物&#xff0c;思考的主要场景是当人离开电脑后&#xff0c;怎么能控制电脑锁屏&#xff0c;避免屏幕上的聊天记录被曝光。 首先想到通过系统的电源计划设置闲置超时时间熄屏&#xff0c;这可能是最接近场景的解决方案&a…...

历史记录隐藏的安全风险

引言 在数字化生活与工作场景中&#xff0c;历史记录功能广泛存在于浏览器、办公软件、移动应用等各类平台。它通过记录用户的搜索内容、操作痕迹、访问路径等信息&#xff0c;为用户提供便捷的操作体验和个性化服务。然而&#xff0c;这种看似便利的功能背后&#xff0c;却隐藏…...

SpringBoot3整合MySQL8的注意事项

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 注意事项 1、请添加添加如下依赖&#xff1a; <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><…...

网络安全大模型理解

一、网络安全大模型的概述 网络安全大模型是一种用于识别和应对各种网络安全威胁的模型。它通过分析网络数据包、网络行为等信息&#xff0c;识别潜在的网络安全事件&#xff0c;并采取相应的措施进行防御。网络安全大模型主要包括以下几个部分&#xff1a; 1. 数据预处理&am…...

智语心桥:当AI遇上“星星的孩子”,科技如何点亮沟通之路?

目录: 引言:当科技的温度,遇见“星星的孩子”“智语心桥”:一座为孤独症儿童搭建的AI沟通之桥核心技术探秘:AI如何赋能“读心”与“对话”?个性化魔法:AI如何实现“千人千面”的精准干预?应用场景畅想:从家庭到机构,AI的全方位支持为什么是“智语心桥”?——价值、可…...

itop-3568开发板机器视觉opencv开发手册-图像绘制-画线

本小节代码在配套资料“iTOP-3568 开发板\03_【iTOP-RK3568 开发板】指南教程 \04_OpenCV 开发配套资料\11”目录下&#xff0c;如下图所示&#xff1a; cv2.line 函数功能&#xff1a; 绘制一条直线。 函数原型&#xff1a; cv2.line(img,pt1,pt2,color,thicknessNone,lin…...

【高频面试题】快慢指针及相关应用

文章目录 1 简介2 相关应用3 相关题目4 典型例题4.1 判断链表是否有环4.2 寻找链表的入环点4.3 寻找链表的中点4.4 寻找链表的倒数第k个节点4.5 重排链表 &#xff08;反转链表找链表中点合并链表&#xff09;4.6 寻找重复数&#xff08;快慢指针 or 二分&#xff09;4.7 回文链…...

sudo docker exec -it backend bash 以交互方式(interactive)进入正在运行的 Docker 容器的命令行环境

sudo docker exec -it backend bash&#x1f50d; 总体作用 这条命令的作用是&#xff1a; 以交互方式&#xff08;interactive&#xff09;进入名为 backend 的正在运行的 Docker 容器的命令行环境。 你会进入容器的“终端”&#xff0c;就像登录到一个 Linux 系统一样&#…...

[论文阅读] 人工智能 | 当AI遇见绿色软件工程:可持续AI实践的研究新方向

【论文解读】当AI遇见绿色软件工程&#xff1a;可持续AI实践的研究新方向 论文信息 作者&#xff1a;Maja H. Kirkeby, Enrique Barba Roque, Justus Bogner等 标题&#xff1a;Greening AI-enabled Systems with Software Engineering: A Research Agenda for Environment…...

[论文阅读] 人工智能 | 用大语言模型抓虫:如何让网络协议实现与RFC规范对齐

用大语言模型抓虫&#xff1a;如何让网络协议实现与RFC规范对齐&#xff1f; 论文信息 arXiv:2506.01249 SysLLMatic: Large Language Models are Software System Optimizers Huiyun Peng, Arjun Gupte, Ryan Hasler, Nicholas John Eliopoulos, Chien-Chou Ho, Rishi Mantr…...

浅析EXCEL自动连接PowerBI的模板

浅析EXCEL自动连接PowerBI的模板 之前我分享过&#xff1a;PowerBI链接EXCEL实现自动化报表 &#xff0c;其中一个关键工具就是提到的EXCEL链接模板&#xff0c;即宏工作薄。 今天就大概来聊一聊这个宏工作簿的底层原理是啥&#xff0c;怎么实现的。 第一步&#xff1a; 打开…...

DeepSeek 赋能金融反洗钱:AI 驱动的风险监测革新之路

目录 一、引言二、金融反洗钱监测的现状与挑战2.1 现状概述2.2 面临的挑战 三、DeepSeek 技术原理剖析3.1 核心架构3.2 关键技术 四、DeepSeek 在金融反洗钱监测中的应用优势4.1 强大的数据处理与分析能力4.2 精准的风险识别与预警4.3 提升工作效率与降低成本 五、DeepSeek 在金…...

java32

1.反射 获取类&#xff1a; 获取构造方法&#xff1a; 获取权限修饰符&#xff1a; 获取参数信息&#xff1a; 利用反射出来的构造器来创建对象&#xff1a; 获取成员变量&#xff1a; 获取成员方法&#xff1a; 综合练习&#xff1a; 动态代理&#xff1a;...

【Redis】zset 类型

zset 一. zset 类型介绍二. zset 命令zaddzcard、zcountzrange、zrevrange、zrangebyscorezpopmax、zpopminzrank、zrevrank、zscorezrem、zremrangebyrank、zremrangebyscorezincrby阻塞版本命令&#xff1a;bzpopmax、bzpopmin集合间操作&#xff1a;zinterstore、zunionstor…...

从Gartner报告看Atlassian在生成式AI领域的创新路径与实践价值

本文来源atlassian.com&#xff0c;由Atlassian全球白金合作伙伴——龙智翻译整理。 二十余年来&#xff0c;Atlassian始终是创新领域的领军者。凭借对团队协作本质的深刻理解&#xff0c;Atlassian在AI时代仍持续引领协作方式的革新。如今&#xff0c;这一领先地位再次获得权威…...

Kafka 安装教程(支持 Windows / Linux / macOS)

一、下载 1、kafka官网下载地址:https://kafka.apache.org/downloads 根据实际情况下载对应的版本 2、JDK的版本最好是17+ JDK下载地址:https://www.oracle.com/java/technologies/javase/jdk17-0-13-later-archive-downloads.html 二、安装 前置条件 安装 Java(至少 Jav…...

OpenCV种的cv::Mat与Qt种的QImage类型相互转换

一、首先了解cv::Mat结构体 cv::Mat::step与QImage转换有着较大的关系。 step的几个类别区分: step:矩阵第一行元素的字节数step[0]:矩阵第一行元素的字节数step[1]:矩阵中一个元素的字节数step1(0):矩阵中一行有几个通道数step1(1):一个元素有几个通道数(channel()) cv::Ma…...

机器学习——什么时候使用决策树

无论是决策树&#xff0c;包括集成树还是神经网络都是非常强大、有效的学习方法。 下面是各自的优缺点&#xff1a; 决策树和集成树通常在表格数据上表现良好&#xff0c;也称为结构化数据&#xff0c;这意味着如果你的数据集看起来像一个巨大的电子表格&#xff0c;那么决策…...