【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
痛点背景
业务场景
假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做?
之前方案
最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)>30分钟,则关闭订单。
方案评估
- 优点:是实现简单,缺点呢?
- *缺点:定时扫描意味着隔个几秒就得查一次数据库,频率高的情况下,如果数据库中订单总量特别大,这种高频扫描会对数据库带来一定压力,待付款订单特别多时(做个爆品秒杀活动,或者啥促销活动),若一次性查到内存中,容易引起宕机,需要分页查询,多少也会有一定数据库层面压力。
延时队列出现
- 能够在指定时间间隔后触发某个业务操作
- 能够应对业务数据量特别大的特殊场景
RocketMQ延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。
功能特点
- RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
- 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
- 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
- *broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。
Broker处理延迟消息
延时队列生产者端:
延时消息的关键点在于Producer生产者需要给消息设置特定延时级别,消费端代码与正常消费者没有差别。
public class Producer {private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("111.231.110.149:9876");producer.start();for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTest" ,"TagA" ,("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setDelayTimeLevel(3);SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
复制代码
初始化
DefaultMessageStore在启动时,会调用ScheduleMessageService#load()方法来加载消息消费进度和初始化延迟级别对应map,然后调用ScheduleMessageService#start()方法来启动类
load方法
public boolean load() {boolean result = super.load();result = result && this.parseDelayLevel();return result;
}
复制代码
ScheduleMessageService继承自ConfigManager类,super.load()方法对应
public boolean load() {String fileName = null;try {fileName = this.configFilePath();String jsonString = MixAll.file2String(fileName);if (null == jsonString || jsonString.length() == 0) {return this.loadBak();} else {this.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}
复制代码
延时队列源码分析:
先从延时消息延迟级别设置与broker端消息持久化入手。
具体实现
RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。
启动延迟消息定时任务
如果想要深入了解的可以看一下ScheduleMessageService这个类
内部变量含义
延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义
- delayLevelTable定义了延迟级别和延迟时间的对应关系
- *offsetTable存放延延迟级别对应的队列消费的offset
ScheduleMessageService.start()
复制代码
延迟消息投递
其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1
public static int delayLevel2QueueId(final int delayLevel) {return delayLevel - 1;
}
复制代码
核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown
分享资源
获取以上资源请访问开源项目 点击跳转
相关文章:

【分布式技术专题】RocketMQ延迟消息实现原理和源码分析
痛点背景 业务场景 假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做? 之前方案 最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订…...

Qt中ffmpeg API存储和显示摄像头视频
Qt中ffmpeg API存储和显示摄像头视频的功能需要之前写的视频ffmpegAPI的视频播放的流程。 代码源码位置:https://download.csdn.net/download/qq_43812868/88157743?spm1001.2014.3001.5503 一、存储和显示摄像头的视频的流程 这是读取打开视频文件的流程&#x…...
jfinal tomcat部署
首先明确一下 JFinal 项目是标准的 java web 项目,其部署方式与普通 java web 项目没有任何差别。Java Web 项目在 Tomcat 下部署有一些不必要的坑需要避免,所以撰写此文方便大家绕过一些坑,以下部署以 linux 为例,windows 与此类…...
Linux - MongoDB 数据库自动退出服务问题/闪退
问题:MongoDB 自动退出服务问题 原因: 由于 Mongodb 服务,使用过多系统内存,导致系统强制停止 Mongodb 服务。 解决方法: 在 mongodb.conf 配置文件内,添加新配置内容: wiredTigerCacheSi…...
B2B2C多语言电商系统平台搭建,多用户商城系统搭建(app、小程序、微商城)
搭建B2B2C多语言电商系统平台以及多用户商城系统(包括app、小程序、微商城)的步骤如下: 步骤一:需求分析和规划 1. 确定项目的目标和范围。明确平台所需的功能、支持的语言、用户权限等要求。 2. 分析竞争对手,并确定…...

【MySQL】创建高级联结
目录 一、使用表别名 二、使用不同类型的联结 1.自联结 2.自然联结 3.外部联结 3.使用带聚集函数的联结 4.使用联结和联结条件 一、使用表别名 别名除了用于列名和计算字段外,SQL还允许给表名起别名。 起别名有两个好处: 一个是缩短SQL语句&am…...

chatGPT应用于房地产行业
作为 2023 年的房地产专业人士,您无疑认识到技术对行业的重大影响。近年来,一项技术进步席卷了世界——人工智能。人工智能彻底改变了房地产业务的各个方面,从简化管理任务到增强客户互动。 在本文中,我们将探讨几种巧妙的人工智…...
java之jmh初识及使用
最近有场景需要数据支撑json的toJsonString方法和java原生的toString方法的运行速度,因此选用了JMH测试工具。 以下代码大致意思是:初始化一个list集合,放入100个对象,然后遍历这个集合,调用fastjson的toJsonString方…...

利用状态监测和机器学习提高冷却塔性能的具体方法
在现代工业生产中,冷却塔扮演着至关重要的角色,它们的性能直接影响着工艺流程的稳定性和效率。为了确保冷却塔的正常运行和减少系统故障,状态监测和机器学习成为了关键技术。 图.冷却塔(PreMaint) 在前文《基于人工智…...
LeetCode_02_1289. 下降路径最小和 II
1289. 下降路径最小和 II 给你一个 n x n 整数矩阵 grid ,请你返回 非零偏移下降路径 数字和的最小值。 非零偏移下降路径 定义为:从 grid 数组中的每一行选择一个数字,且按顺序选出来的数字中,相邻数字不在原数组的同一列。 示…...
consul servicecheck 查看健康信息
在浏览器中地址栏输入如下信息:http://localhost:8500/v1/agent/checks 返回信息如下: { "service:springboot-security-oauth2-zuul-sso-server-1881": { "Node": "8DBQ2F05HUXZ2QO", "Check…...

什么是信息孤岛?如何打破信息孤岛?
一文让你看懂:什么是信息孤岛?信息孤岛形成的原因?以及如何打破信息孤岛? 本文重点结合了企业信息系统的需求,给出了整合企业现有信息系统的方法,能有效解决企业信息孤岛的问题,并帮助企业快速…...

Android开源 Skeleton 骨架屏
目录 一、简介 二、效果图 三、引用 Skeleton 添加jitpack 仓库 添加依赖: 四、使用 Skeleton 1、VIew 骨架屏使用 ViewSkeletonScreen 2、列表类View 骨架屏 RecyclerViewSkeletonScreen、GridViewSkeletonScreen、 ListViewSkeletonScreen 一、简介 骨架屏的作用是…...

都说IT就业难?到底难在哪?
现在网上关于IT行业,劝退的帖子真的很多,很多人看了后无比焦虑,没入行的,还没开始学,就担心找不到工作了;在行业内的,想跳槽的也纷纷开始摆烂,觉得市场根本没啥机会,简历…...

STM32芯片的内部架构介绍
STM32芯片由内核和片上外设两部分组成。STM32F103采用Cortex-M3内核,该内核由ARM公司设计。芯片生产厂商ST则负责在内核之外设计部件并生产整个芯片。这些内核之外的部件被称为核外外设或片上外设,如GPIO、USART(串口)、I2C、SPI等…...
Angular FormControl value属性的一些事
背景:一个输入校验,允许输入多行,每一行是ip或网段。写了个校验,将其按行拆分后单独校验。 1. FormControl无法深复制 使用JSON.parse(JSON.stringify(control))进行简单深复制报错,因为不是json类型;使用d…...
Nim游戏:取石头
(一)一堆取石头 背景: 在博弈论中,有一种称为Nim游戏的经典问题,它涉及到取石子的问题,其中有许多变种。Nim游戏是一种零和博弈,即两名玩家交替行动,每次只能从一堆物品中取走一定数…...

springboot国际化
springboot国际化 不需要引入额外的jar包 参考:https://zhuanlan.zhihu.com/p/551605839 1.rources要创建Resource Bundle 2.yml配置中引入Resource Bundle 引入Resource Bundle spring:messages:encoding: UTF-8basename: i18n/messages_common3.创建国际化工具…...
12种不宜使用的Javascript语法
1. Javascript有两组相等运算符,一组是和!,另一组是和!。前者只比较值的相等,后者除了值以外,还比较类型是否相同。 请尽量不要使用前一组,永远只使用和!。因为默认会进行类型转换,规则十分难记。如果你…...

vue3+element-plus点击列表中的图片预览时,图片被表格覆盖
文章目录 问题解决 问题 视觉 点击图片进行预览,但还能继续选中其他的图片进行预览,鼠标放在表格上,那一行表格也会选中,如图所示第一行的效果。 代码 <el-table-column prop"id" label"ID" width"…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...

Unity VR/MR开发-VR开发与传统3D开发的差异
视频讲解链接:【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...
LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》
🧠 LangChain 中 TextSplitter 的使用详解:从基础到进阶(附代码) 一、前言 在处理大规模文本数据时,特别是在构建知识库或进行大模型训练与推理时,文本切分(Text Splitting) 是一个…...
用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法
用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法 大家好,我是Echo_Wish。最近刷短视频、看直播,有没有发现,越来越多的应用都开始“懂你”了——它们能感知你的情绪,推荐更合适的内容,甚至帮客服识别用户情绪,提升服务体验。这背后,神经网络在悄悄发力,撑起…...
FOPLP vs CoWoS
以下是 FOPLP(Fan-out panel-level packaging 扇出型面板级封装)与 CoWoS(Chip on Wafer on Substrate)两种先进封装技术的详细对比分析,涵盖技术原理、性能、成本、应用场景及市场趋势等维度: 一、技术原…...

生信服务器 | 做生信为什么推荐使用Linux服务器?
原文链接:生信服务器 | 做生信为什么推荐使用Linux服务器? 一、 做生信为什么推荐使用服务器? 大家好,我是小杜。在做生信分析的同学,或是将接触学习生信分析的同学,<font style"color:rgb(53, 1…...