当前位置: 首页 > news >正文

RocketMQ 延迟消息

RocketMQ 延迟消息

RocketMQ 消费者启动流程

什么是延迟消息

RocketMQ 延迟消息是指,生产者发送消息给消费者消息,消费者需要等待一段时间后才能消费到。

使用场景

用户下单之后,15分钟未支付,对支付账单进行提醒或者关单处理。

RocketMQ 开源版本的消息不支持任意时间精度,只支持5s 10s 1m等等。

Broker 如何处理延迟消息

消息投递如下:

  1. 生产者发送一个延迟消息到一个 topic
  2. Broker 判断是个延迟消息后,将消息暂存
  3. Broker 通过延迟服务, 先检查消息是否过期,如果到期将消息投递到目标 topic
  4. 消费者消费topic中的投递延迟消息。

开源RocketMQ 的消息不支持任意精度,默认支持 18个 level:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker 在启动的时候,会创建一个内部 topic:“SCHEDULE_TOPIC_XXXX” 根据延迟 level 数量,创建对应数量的 队列。 也就是说 18 level 对应了18 个队列。

具体可以在 代码TopicConfigManager.java 中 看到:

private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;

要注意的是,Broker 一般是集群模式
部署,也就是说,每个Broker 都会有18个队列。

TopicConfigManager#TopicConfigManager(BrokerController brokerController)

生产者消息延迟发送

代码示例如下:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

Broker 存储延迟消息

上一篇文章已经谈到,Broker 收到消费者消息后,会进行消息存储,然后再转发到消费队列(ConsumerQueue),然后再推给消费者。

其实一旦消息转发到

存储延迟消息的流程也类似

  1. 确定延迟消息投递到topic 哪个队列。存储生产者写入的消息时,将消息转发到 ConsumeQueue 中,消费者就能消费到。 延迟消息不能立即消息到,于是将 topic 名称修改为 SCHEDULE_TOPIC_XXX,并根据延迟消息级别,确定投递到哪个队列上。同时还会将原来消息要发送到的目标 topic 和队列记录投递到哪个队列。

代码在CommitLog#asyncPutMessage 中

设置延迟消息的投递队列信息代码如下:

 // Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 如果设置的级别超过了最大级别,重置延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
// 计算延迟消息应该投递到 SCHEDULE_TOPIC_XXXX 到哪个队列。topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 记录原始 topic ,queueid,方便后期投递到目标 topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投递目标为 SCHEDULE_TOPIC_XXX,queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}

消息转发

消息转发过程其实中会对延迟消息做一些特殊处理

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

相关文章:

RocketMQ 延迟消息

RocketMQ 延迟消息 RocketMQ 消费者启动流程 什么是延迟消息 RocketMQ 延迟消息是指,生产者发送消息给消费者消息,消费者需要等待一段时间后才能消费到。 使用场景 用户下单之后,15分钟未支付,对支付账单进行提醒或者关单处理…...

Dex文件混淆(一):BlackObfuscator

Dex文件混淆(一):BlackObfuscator 首发地址:http://zhuoyue360.com/crack/105.html 文章目录 Dex文件混淆(一):BlackObfuscator1. 前言2.小试牛刀3. 参考学习1. dex2jar源码简析2. BlackObfuscator简析1. 控制流平坦化1. 控制流平坦化基本介绍 2. Dex解析…...

Linux下编译arm 32 出错(/bin/bash: arm-none-linux-gnueabi-gcc: command not found )

一、arm-none-linux-gnueabi-gcc不能再64位系统下下编译ARM的32位库的问题解决方法如下: sudo apt-get install lib32stdc6 sudo apt-get install lib32ncurses5 sudo apt-get install lib32z1 二、交叉编译工具没有写入环境变量或写错,重新写入环境变量…...

最近遇到的两个小问题总结:git问题和node问题

这两个问题都是我帮别人看问题的解决的,在windows系统上遇到的: 1、git没有配置全局变量 在使用git的时候,报’git‘不是内部或外部命令,也不是可运行的程序。然后再在其他文件下面试一下(git --version)…...

Java # Spring(1)

一、概念 1、核心技术:依赖注入(DI),AOP,事件(events),资源,i18n,验证,数据绑定,类型转换,SpEL。 2、测试:模…...

SCL更换阿里数据源

问题: zabbix安装前端环境报错 yum install zabbix-web-mysql-scl zabbix-apache-conf-scl -y 报错:Could not retrieve mirrorlist http://mirrorlist.centos.org/ 能上网 但是不能ping通http://mirrorlist.centos.org/ 解决: 修改repo数…...

【web逆向】全报文加密流量的去加密测试方案

aHR0cHM6Ly90ZGx6LmNjYi5jb20vIy9sb2dpbg 国密混合 WEB JS逆向篇 先看报文:请求和响应都是全加密,这种情况就不像参数加密可以方便全文搜索定位加密代码,但因为前端必须解密响应的密文,因此万能的方法就是搜索拦截器&#xff0c…...

Django实现音乐网站 ⑼

使用Python Django框架制作一个音乐网站, 本篇主要是后台对专辑、首页轮播图原有功能的基础上进行部分功能实现和显示优化。 目录 专辑功能优化 新增编辑 专辑语种改为下拉选项 添加单曲优化显示 新增单曲多选 更新歌手专辑数、专辑单曲数 获取歌手专辑数 保…...

【脚踢数据结构】

(꒪ꇴ꒪ ),Hello我是祐言QAQ我的博客主页:C/C语言,Linux基础,ARM开发板,软件配置等领域博主🌍快上🚘,一起学习,让我们成为一个强大的攻城狮!送给自己和读者的一句鸡汤🤔&…...

uni-app使用vue语法进行开发注意事项

目录 uni-app 项目目录结构 生命周期 路由 路由跳转 页面栈 条件编译 文本渲染 样式渲染 条件渲染 遍历渲染 事件处理 事件修饰符 uni-app 项目目录结构 组件/标签 使用(类似)小程序 语法/结构 使用vue 具体项目目录如下: 生命…...

数据结构---B树

目录标题 B-树的由来B-树的规则和原理B-树的插入分析B-树的插入实现准备工作find函数insert中序遍历 B-树的性能测试B-树的删除B树B树的元素插入B*树的介绍 B-树的由来 在前面的学习过程中,我们见过很多搜索结构比比如说顺序查找,二分查找,搜…...

c++11以后c++标准库定义的固定位宽的整数类型(Fixed width integer types)

Fixed width integer types Fixed width integer types (since C11) - cppreference.com 相关定义文件如下: Windows系统MSVC: Microsoft Visual Studio\2022\Community\VC\Tools\MSVC\14.33.31629\include\cstdint Linux系统GCC: gcc\libstdc-v3\include\c_g…...

Object.values()

Object.values() 是ES2017新增的一个对象方法,它可以将一个对象自身的所有可枚举属性值,组成一个数组返回。 基本语法: Object.values(obj)示例: jsCopy codeconst obj {foo: bar,baz: 42 };Object.values(obj); // [bar, 42]Object.values()的特点: 只返回可枚举的属性值…...

Oracle 开发篇+Java调用OJDBC访问Oracle数据库

标签:JAVA语言、Oracle数据库、Java访问Oracle数据库释义:OJDBC是Oracle公司提供的Java数据库连接驱动程序 ★ 实验环境 ※ Oracle 19c ※ OJDBC8 ※ JDK 8 ★ Java代码案例 package PAC_001; import java.sql.Connection; import java.sql.ResultSet…...

linux 查询后台任务及杀掉进程

查看后台任务命令 jobs -l删除后台进程命令 kill -9 28719...

【Vue3 博物馆管理系统】使用Vue3、Element-plus菜单组件构建前台用户菜单

系列文章目录 第一章 定制上中下(顶部菜单、底部区域、中间主区域显示)三层结构首页 第二章 使用Vue3、Element-plus菜单组件构建菜单 [第三章 使用Vue3、Element-plus菜单组件构建轮播图] [第四章 使用Vue3、Element-plus菜单组件构建组图文章] 文章目…...

Windows 11清除无效、回收站、过期、缓存、补丁更新文件

Windows 11与之前的Windows版本类似,也需要定期清理无效、垃圾、过期、缓存文件来保持系统性能和存储空间的优化。以下是在Windows 11中进行这些清理操作的一些建议方法: 磁盘清理工具 Windows 11内置了磁盘清理工具,可以帮助你删除临时文件…...

栈和队列详解(2)

目录 一、什么是队列? 二、创建一个我们自己的队列 1.前置准备 1.1需要的三个文件 1.2结构体的创建和头文件的引用 2.接口的实现 2.1初始化队列 2.2入队 2.3队列元素个数和判空 2.4取队头元素和队尾元素 2.5出队 2.6摧毁队列 2.7测试接口 三、所有代码 1.…...

EMC传导干扰滤波电路设计

1.EMC概念 2.EMC 传导干扰详解 EMC传导滤波电路的设计--传导干扰详解 3.EMC 传导干扰的测量方法 4.EMC 滤波电路设计 5.浪涌抑制电路设计 6.开关电源的安全要求 7.当前开关电源灯的应用...

【win10专业版远程控制】 自带远程桌面公司内网电脑

使用win10专业版自带远程桌面公司内网电脑 文章目录 使用win10专业版自带远程桌面公司内网电脑 在现代社会中,各类电子硬件已经遍布我们身边,除了应用在个人娱乐场景的消费类电子产品外,各项工作也离不开电脑的帮助,特别是涉及到数…...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)&#xff0…...

C# SqlSugar:依赖注入与仓储模式实践

C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...

3-11单元格区域边界定位(End属性)学习笔记

返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...

Rapidio门铃消息FIFO溢出机制

关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...

九天毕昇深度学习平台 | 如何安装库?

pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子: 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

服务器--宝塔命令

一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行! sudo su - 1. CentOS 系统: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天,大语言模型(Large Language Models, LLMs)已成为技术领域的焦点。从智能写作到代码生成,LLM 的应用场景不断扩展,深刻改变了我们的工作和生活方式。然而,理解这些模型的内部…...