当前位置: 首页 > news >正文

【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

痛点背景

业务场景

假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做?

之前方案

最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)>30分钟,则关闭订单。

方案评估
  • 优点:是实现简单,缺点呢?
  • *缺点:定时扫描意味着隔个几秒就得查一次数据库,频率高的情况下,如果数据库中订单总量特别大,这种高频扫描会对数据库带来一定压力,待付款订单特别多时(做个爆品秒杀活动,或者啥促销活动),若一次性查到内存中,容易引起宕机,需要分页查询,多少也会有一定数据库层面压力。
延时队列出现
  • 能够在指定时间间隔后触发某个业务操作
  • 能够应对业务数据量特别大的特殊场景

RocketMQ延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。

功能特点

  • RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
  • 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
  • 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
  • *broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

延时队列生产者端:

延时消息的关键点在于Producer生产者需要给消息设置特定延时级别,消费端代码与正常消费者没有差别。

public class Producer {private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("111.231.110.149:9876");producer.start();for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTest" ,"TagA" ,("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setDelayTimeLevel(3);SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
复制代码
初始化

DefaultMessageStore在启动时,会调用ScheduleMessageService#load()方法来加载消息消费进度和初始化延迟级别对应map,然后调用ScheduleMessageService#start()方法来启动类

load方法

public boolean load() {boolean result = super.load();result = result && this.parseDelayLevel();return result;
}
复制代码

ScheduleMessageService继承自ConfigManager类,super.load()方法对应

public boolean load() {String fileName = null;try {fileName = this.configFilePath();String jsonString = MixAll.file2String(fileName);if (null == jsonString || jsonString.length() == 0) {return this.loadBak();} else {this.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}
复制代码

延时队列源码分析:

先从延时消息延迟级别设置与broker端消息持久化入手。

具体实现

RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。

启动延迟消息定时任务

如果想要深入了解的可以看一下ScheduleMessageService这个类

内部变量含义

延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义

  • delayLevelTable定义了延迟级别和延迟时间的对应关系
  • *offsetTable存放延延迟级别对应的队列消费的offset
ScheduleMessageService.start()
复制代码

延迟消息投递

其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1

public static int delayLevel2QueueId(final int delayLevel) {return delayLevel - 1;
}
复制代码

核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转

相关文章:

【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

痛点背景 业务场景 假设有这么一个需求&#xff0c;用户下单后如果30分钟未支付&#xff0c;则该订单需要被关闭。你会怎么做&#xff1f; 之前方案 最简单的做法&#xff0c;可以服务端启动个定时器&#xff0c;隔个几秒扫描数据库中待支付的订单&#xff0c;如果(当前时间-订…...

Qt中ffmpeg API存储和显示摄像头视频

Qt中ffmpeg API存储和显示摄像头视频的功能需要之前写的视频ffmpegAPI的视频播放的流程。 代码源码位置&#xff1a;https://download.csdn.net/download/qq_43812868/88157743?spm1001.2014.3001.5503 一、存储和显示摄像头的视频的流程 这是读取打开视频文件的流程&#x…...

jfinal tomcat部署

首先明确一下 JFinal 项目是标准的 java web 项目&#xff0c;其部署方式与普通 java web 项目没有任何差别。Java Web 项目在 Tomcat 下部署有一些不必要的坑需要避免&#xff0c;所以撰写此文方便大家绕过一些坑&#xff0c;以下部署以 linux 为例&#xff0c;windows 与此类…...

Linux - MongoDB 数据库自动退出服务问题/闪退

问题&#xff1a;MongoDB 自动退出服务问题 原因&#xff1a; 由于 Mongodb 服务&#xff0c;使用过多系统内存&#xff0c;导致系统强制停止 Mongodb 服务。 解决方法&#xff1a; 在 mongodb.conf 配置文件内&#xff0c;添加新配置内容&#xff1a; wiredTigerCacheSi…...

B2B2C多语言电商系统平台搭建,多用户商城系统搭建(app、小程序、微商城)

搭建B2B2C多语言电商系统平台以及多用户商城系统&#xff08;包括app、小程序、微商城&#xff09;的步骤如下&#xff1a; 步骤一&#xff1a;需求分析和规划 1. 确定项目的目标和范围。明确平台所需的功能、支持的语言、用户权限等要求。 2. 分析竞争对手&#xff0c;并确定…...

【MySQL】创建高级联结

目录 一、使用表别名 二、使用不同类型的联结 1.自联结 2.自然联结 3.外部联结 3.使用带聚集函数的联结 4.使用联结和联结条件 一、使用表别名 别名除了用于列名和计算字段外&#xff0c;SQL还允许给表名起别名。 起别名有两个好处&#xff1a; 一个是缩短SQL语句&am…...

chatGPT应用于房地产行业

作为 2023 年的房地产专业人士&#xff0c;您无疑认识到技术对行业的重大影响。近年来&#xff0c;一项技术进步席卷了世界——人工智能。人工智能彻底改变了房地产业务的各个方面&#xff0c;从简化管理任务到增强客户互动。 在本文中&#xff0c;我们将探讨几种巧妙的人工智…...

java之jmh初识及使用

最近有场景需要数据支撑json的toJsonString方法和java原生的toString方法的运行速度&#xff0c;因此选用了JMH测试工具。 以下代码大致意思是&#xff1a;初始化一个list集合&#xff0c;放入100个对象&#xff0c;然后遍历这个集合&#xff0c;调用fastjson的toJsonString方…...

利用状态监测和机器学习提高冷却塔性能的具体方法

在现代工业生产中&#xff0c;冷却塔扮演着至关重要的角色&#xff0c;它们的性能直接影响着工艺流程的稳定性和效率。为了确保冷却塔的正常运行和减少系统故障&#xff0c;状态监测和机器学习成为了关键技术。 图.冷却塔&#xff08;PreMaint&#xff09; 在前文《基于人工智…...

LeetCode_02_1289. 下降路径最小和 II

1289. 下降路径最小和 II 给你一个 n x n 整数矩阵 grid &#xff0c;请你返回 非零偏移下降路径 数字和的最小值。 非零偏移下降路径 定义为&#xff1a;从 grid 数组中的每一行选择一个数字&#xff0c;且按顺序选出来的数字中&#xff0c;相邻数字不在原数组的同一列。 示…...

consul servicecheck 查看健康信息

在浏览器中地址栏输入如下信息&#xff1a;http://localhost:8500/v1/agent/checks 返回信息如下&#xff1a; { "service:springboot-security-oauth2-zuul-sso-server-1881": { "Node": "8DBQ2F05HUXZ2QO", "Check…...

什么是信息孤岛?如何打破信息孤岛?

一文让你看懂&#xff1a;什么是信息孤岛&#xff1f;信息孤岛形成的原因&#xff1f;以及如何打破信息孤岛&#xff1f; 本文重点结合了企业信息系统的需求&#xff0c;给出了整合企业现有信息系统的方法&#xff0c;能有效解决企业信息孤岛的问题&#xff0c;并帮助企业快速…...

Android开源 Skeleton 骨架屏

目录 一、简介 二、效果图 三、引用 Skeleton 添加jitpack 仓库 添加依赖: 四、使用 Skeleton 1、VIew 骨架屏使用 ViewSkeletonScreen 2、列表类View 骨架屏 RecyclerViewSkeletonScreen、GridViewSkeletonScreen、 ListViewSkeletonScreen 一、简介 骨架屏的作用是…...

都说IT就业难?到底难在哪?

现在网上关于IT行业&#xff0c;劝退的帖子真的很多&#xff0c;很多人看了后无比焦虑&#xff0c;没入行的&#xff0c;还没开始学&#xff0c;就担心找不到工作了&#xff1b;在行业内的&#xff0c;想跳槽的也纷纷开始摆烂&#xff0c;觉得市场根本没啥机会&#xff0c;简历…...

STM32芯片的内部架构介绍

STM32芯片由内核和片上外设两部分组成。STM32F103采用Cortex-M3内核&#xff0c;该内核由ARM公司设计。芯片生产厂商ST则负责在内核之外设计部件并生产整个芯片。这些内核之外的部件被称为核外外设或片上外设&#xff0c;如GPIO、USART&#xff08;串口&#xff09;、I2C、SPI等…...

Angular FormControl value属性的一些事

背景&#xff1a;一个输入校验&#xff0c;允许输入多行&#xff0c;每一行是ip或网段。写了个校验&#xff0c;将其按行拆分后单独校验。 1. FormControl无法深复制 使用JSON.parse(JSON.stringify(control))进行简单深复制报错&#xff0c;因为不是json类型&#xff1b;使用d…...

Nim游戏:取石头

&#xff08;一&#xff09;一堆取石头 背景&#xff1a; 在博弈论中&#xff0c;有一种称为Nim游戏的经典问题&#xff0c;它涉及到取石子的问题&#xff0c;其中有许多变种。Nim游戏是一种零和博弈&#xff0c;即两名玩家交替行动&#xff0c;每次只能从一堆物品中取走一定数…...

springboot国际化

springboot国际化 不需要引入额外的jar包 参考&#xff1a;https://zhuanlan.zhihu.com/p/551605839 1.rources要创建Resource Bundle 2.yml配置中引入Resource Bundle 引入Resource Bundle spring:messages:encoding: UTF-8basename: i18n/messages_common3.创建国际化工具…...

12种不宜使用的Javascript语法

1. Javascript有两组相等运算符&#xff0c;一组是和!&#xff0c;另一组是和!。前者只比较值的相等&#xff0c;后者除了值以外&#xff0c;还比较类型是否相同。 请尽量不要使用前一组&#xff0c;永远只使用和!。因为默认会进行类型转换&#xff0c;规则十分难记。如果你…...

vue3+element-plus点击列表中的图片预览时,图片被表格覆盖

文章目录 问题解决 问题 视觉 点击图片进行预览&#xff0c;但还能继续选中其他的图片进行预览&#xff0c;鼠标放在表格上&#xff0c;那一行表格也会选中&#xff0c;如图所示第一行的效果。 代码 <el-table-column prop"id" label"ID" width"…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

select、poll、epoll 与 Reactor 模式

在高并发网络编程领域&#xff0c;高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表&#xff0c;以及基于它们实现的 Reactor 模式&#xff0c;为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。​ 一、I…...

均衡后的SNRSINR

本文主要摘自参考文献中的前两篇&#xff0c;相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程&#xff0c;其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt​ 根发送天线&#xff0c; n r n_r nr​ 根接收天线的 MIMO 系…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

深入解析光敏传感技术:嵌入式仿真平台如何重塑电子工程教学

一、光敏传感技术的物理本质与系统级实现挑战 光敏电阻作为经典的光电传感器件&#xff0c;其工作原理根植于半导体材料的光电导效应。当入射光子能量超过材料带隙宽度时&#xff0c;价带电子受激发跃迁至导带&#xff0c;形成电子-空穴对&#xff0c;导致材料电导率显著提升。…...

Docker环境下安装 Elasticsearch + IK 分词器 + Pinyin插件 + Kibana(适配7.10.1)

做RAG自己打算使用esmilvus自己开发一个&#xff0c;安装时好像网上没有比较新的安装方法&#xff0c;然后找了个旧的方法对应试试&#xff1a; &#x1f680; 本文将手把手教你在 Docker 环境中部署 Elasticsearch 7.10.1 IK分词器 拼音插件 Kibana&#xff0c;适配中文搜索…...

C++ Saucer 编写Windows桌面应用

文章目录 一、背景二、Saucer 简介核心特性典型应用场景 三、生成自己的项目四、以Win32项目方式构建Win32项目禁用最大化按钮 五、总结 一、背景 使用Saucer框架&#xff0c;开发Windows桌面应用&#xff0c;把一个html页面作为GUI设计放到Saucer里&#xff0c;隐藏掉运行时弹…...

[10-1]I2C通信协议 江协科技学习笔记(17个知识点)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17...

qt 双缓冲案例对比

双缓冲 1.双缓冲原理 单缓冲&#xff1a;在paintEvent中直接绘制到屏幕&#xff0c;绘制过程被用户看到 双缓冲&#xff1a;先在redrawBuffer绘制到缓冲区&#xff0c;然后一次性显示完整结果 代码结构 单缓冲&#xff1a;所有绘制逻辑在paintEvent中 双缓冲&#xff1a;绘制…...