当前位置: 首页 > news >正文

RocketMQ 延迟消息

RocketMQ 延迟消息

RocketMQ 消费者启动流程

什么是延迟消息

RocketMQ 延迟消息是指,生产者发送消息给消费者消息,消费者需要等待一段时间后才能消费到。

使用场景

用户下单之后,15分钟未支付,对支付账单进行提醒或者关单处理。

RocketMQ 开源版本的消息不支持任意时间精度,只支持5s 10s 1m等等。

Broker 如何处理延迟消息

消息投递如下:

  1. 生产者发送一个延迟消息到一个 topic
  2. Broker 判断是个延迟消息后,将消息暂存
  3. Broker 通过延迟服务, 先检查消息是否过期,如果到期将消息投递到目标 topic
  4. 消费者消费topic中的投递延迟消息。

开源RocketMQ 的消息不支持任意精度,默认支持 18个 level:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker 在启动的时候,会创建一个内部 topic:“SCHEDULE_TOPIC_XXXX” 根据延迟 level 数量,创建对应数量的 队列。 也就是说 18 level 对应了18 个队列。

具体可以在 代码TopicConfigManager.java 中 看到:

private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;

要注意的是,Broker 一般是集群模式
部署,也就是说,每个Broker 都会有18个队列。

TopicConfigManager#TopicConfigManager(BrokerController brokerController)

生产者消息延迟发送

代码示例如下:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

Broker 存储延迟消息

上一篇文章已经谈到,Broker 收到消费者消息后,会进行消息存储,然后再转发到消费队列(ConsumerQueue),然后再推给消费者。

其实一旦消息转发到

存储延迟消息的流程也类似

  1. 确定延迟消息投递到topic 哪个队列。存储生产者写入的消息时,将消息转发到 ConsumeQueue 中,消费者就能消费到。 延迟消息不能立即消息到,于是将 topic 名称修改为 SCHEDULE_TOPIC_XXX,并根据延迟消息级别,确定投递到哪个队列上。同时还会将原来消息要发送到的目标 topic 和队列记录投递到哪个队列。

代码在CommitLog#asyncPutMessage 中

设置延迟消息的投递队列信息代码如下:

 // Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 如果设置的级别超过了最大级别,重置延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
// 计算延迟消息应该投递到 SCHEDULE_TOPIC_XXXX 到哪个队列。topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 记录原始 topic ,queueid,方便后期投递到目标 topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投递目标为 SCHEDULE_TOPIC_XXX,queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}

消息转发

消息转发过程其实中会对延迟消息做一些特殊处理

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

相关文章:

RocketMQ 延迟消息

RocketMQ 延迟消息 RocketMQ 消费者启动流程 什么是延迟消息 RocketMQ 延迟消息是指,生产者发送消息给消费者消息,消费者需要等待一段时间后才能消费到。 使用场景 用户下单之后,15分钟未支付,对支付账单进行提醒或者关单处理…...

Dex文件混淆(一):BlackObfuscator

Dex文件混淆(一):BlackObfuscator 首发地址:http://zhuoyue360.com/crack/105.html 文章目录 Dex文件混淆(一):BlackObfuscator1. 前言2.小试牛刀3. 参考学习1. dex2jar源码简析2. BlackObfuscator简析1. 控制流平坦化1. 控制流平坦化基本介绍 2. Dex解析…...

Linux下编译arm 32 出错(/bin/bash: arm-none-linux-gnueabi-gcc: command not found )

一、arm-none-linux-gnueabi-gcc不能再64位系统下下编译ARM的32位库的问题解决方法如下: sudo apt-get install lib32stdc6 sudo apt-get install lib32ncurses5 sudo apt-get install lib32z1 二、交叉编译工具没有写入环境变量或写错,重新写入环境变量…...

最近遇到的两个小问题总结:git问题和node问题

这两个问题都是我帮别人看问题的解决的,在windows系统上遇到的: 1、git没有配置全局变量 在使用git的时候,报’git‘不是内部或外部命令,也不是可运行的程序。然后再在其他文件下面试一下(git --version)…...

Java # Spring(1)

一、概念 1、核心技术:依赖注入(DI),AOP,事件(events),资源,i18n,验证,数据绑定,类型转换,SpEL。 2、测试:模…...

SCL更换阿里数据源

问题: zabbix安装前端环境报错 yum install zabbix-web-mysql-scl zabbix-apache-conf-scl -y 报错:Could not retrieve mirrorlist http://mirrorlist.centos.org/ 能上网 但是不能ping通http://mirrorlist.centos.org/ 解决: 修改repo数…...

【web逆向】全报文加密流量的去加密测试方案

aHR0cHM6Ly90ZGx6LmNjYi5jb20vIy9sb2dpbg 国密混合 WEB JS逆向篇 先看报文:请求和响应都是全加密,这种情况就不像参数加密可以方便全文搜索定位加密代码,但因为前端必须解密响应的密文,因此万能的方法就是搜索拦截器&#xff0c…...

Django实现音乐网站 ⑼

使用Python Django框架制作一个音乐网站, 本篇主要是后台对专辑、首页轮播图原有功能的基础上进行部分功能实现和显示优化。 目录 专辑功能优化 新增编辑 专辑语种改为下拉选项 添加单曲优化显示 新增单曲多选 更新歌手专辑数、专辑单曲数 获取歌手专辑数 保…...

【脚踢数据结构】

(꒪ꇴ꒪ ),Hello我是祐言QAQ我的博客主页:C/C语言,Linux基础,ARM开发板,软件配置等领域博主🌍快上🚘,一起学习,让我们成为一个强大的攻城狮!送给自己和读者的一句鸡汤🤔&…...

uni-app使用vue语法进行开发注意事项

目录 uni-app 项目目录结构 生命周期 路由 路由跳转 页面栈 条件编译 文本渲染 样式渲染 条件渲染 遍历渲染 事件处理 事件修饰符 uni-app 项目目录结构 组件/标签 使用(类似)小程序 语法/结构 使用vue 具体项目目录如下: 生命…...

数据结构---B树

目录标题 B-树的由来B-树的规则和原理B-树的插入分析B-树的插入实现准备工作find函数insert中序遍历 B-树的性能测试B-树的删除B树B树的元素插入B*树的介绍 B-树的由来 在前面的学习过程中,我们见过很多搜索结构比比如说顺序查找,二分查找,搜…...

c++11以后c++标准库定义的固定位宽的整数类型(Fixed width integer types)

Fixed width integer types Fixed width integer types (since C11) - cppreference.com 相关定义文件如下: Windows系统MSVC: Microsoft Visual Studio\2022\Community\VC\Tools\MSVC\14.33.31629\include\cstdint Linux系统GCC: gcc\libstdc-v3\include\c_g…...

Object.values()

Object.values() 是ES2017新增的一个对象方法,它可以将一个对象自身的所有可枚举属性值,组成一个数组返回。 基本语法: Object.values(obj)示例: jsCopy codeconst obj {foo: bar,baz: 42 };Object.values(obj); // [bar, 42]Object.values()的特点: 只返回可枚举的属性值…...

Oracle 开发篇+Java调用OJDBC访问Oracle数据库

标签:JAVA语言、Oracle数据库、Java访问Oracle数据库释义:OJDBC是Oracle公司提供的Java数据库连接驱动程序 ★ 实验环境 ※ Oracle 19c ※ OJDBC8 ※ JDK 8 ★ Java代码案例 package PAC_001; import java.sql.Connection; import java.sql.ResultSet…...

linux 查询后台任务及杀掉进程

查看后台任务命令 jobs -l删除后台进程命令 kill -9 28719...

【Vue3 博物馆管理系统】使用Vue3、Element-plus菜单组件构建前台用户菜单

系列文章目录 第一章 定制上中下(顶部菜单、底部区域、中间主区域显示)三层结构首页 第二章 使用Vue3、Element-plus菜单组件构建菜单 [第三章 使用Vue3、Element-plus菜单组件构建轮播图] [第四章 使用Vue3、Element-plus菜单组件构建组图文章] 文章目…...

Windows 11清除无效、回收站、过期、缓存、补丁更新文件

Windows 11与之前的Windows版本类似,也需要定期清理无效、垃圾、过期、缓存文件来保持系统性能和存储空间的优化。以下是在Windows 11中进行这些清理操作的一些建议方法: 磁盘清理工具 Windows 11内置了磁盘清理工具,可以帮助你删除临时文件…...

栈和队列详解(2)

目录 一、什么是队列? 二、创建一个我们自己的队列 1.前置准备 1.1需要的三个文件 1.2结构体的创建和头文件的引用 2.接口的实现 2.1初始化队列 2.2入队 2.3队列元素个数和判空 2.4取队头元素和队尾元素 2.5出队 2.6摧毁队列 2.7测试接口 三、所有代码 1.…...

EMC传导干扰滤波电路设计

1.EMC概念 2.EMC 传导干扰详解 EMC传导滤波电路的设计--传导干扰详解 3.EMC 传导干扰的测量方法 4.EMC 滤波电路设计 5.浪涌抑制电路设计 6.开关电源的安全要求 7.当前开关电源灯的应用...

【win10专业版远程控制】 自带远程桌面公司内网电脑

使用win10专业版自带远程桌面公司内网电脑 文章目录 使用win10专业版自带远程桌面公司内网电脑 在现代社会中,各类电子硬件已经遍布我们身边,除了应用在个人娱乐场景的消费类电子产品外,各项工作也离不开电脑的帮助,特别是涉及到数…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...

边缘计算医疗风险自查APP开发方案

核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...

【AI学习】三、AI算法中的向量

在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...

爬虫基础学习day2

# 爬虫设计领域 工商:企查查、天眼查短视频:抖音、快手、西瓜 ---> 飞瓜电商:京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空:抓取所有航空公司价格 ---> 去哪儿自媒体:采集自媒体数据进…...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...

在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案

这个问题我看其他博主也写了&#xff0c;要么要会员、要么写的乱七八糟。这里我整理一下&#xff0c;把问题说清楚并且给出代码&#xff0c;拿去用就行&#xff0c;照着葫芦画瓢。 问题 在继承QWebEngineView后&#xff0c;重写mousePressEvent或event函数无法捕获鼠标按下事…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...