RocketMQ 延迟队列
什么是延迟队列
指消息发送到某个队列后,在指定多长时间之后才能被消费。
应用场景
RocketMQ 延迟队列
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m
8m 9m 10m 20m 30m 1h 2h”,18个level。
可以配置自定义messageDelayLevel。需要注意的是 messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:
msg.setDelayLevel(level)level有以下三种情况:
level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level1,延迟1s
level > maxLevel,则level maxLevel,例如level==20,延迟2h
在 RocketMQ中定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
RocketMQ 延迟队列和RabbitMQ延迟队列相比
RocketMQ直接一步到位,功能类似于RabbitMQ的延迟交换器插件、而RabbitMQ提供了延迟队列,但如果是基于消息设置每个消息设置不同的延迟时间,会产生前面的消息早已过期,但后面的消息还存在消息队列中,故此RabbitMQ提供了延迟交换器插件,而RocketMQ 延迟队列设计就比较好了。
具体示例
生产者
// 实例化生产者,并指定生产组名称DefaultMQProducer producer =newDefaultMQProducer("myproducer_group_topic_name_delay_01");//设置实例名称,一个jvm中有多个生产者可以根据实例名区分//默认defaultproducer.setInstanceName("topic_delay");// 指定nameserver的地址producer.setNamesrvAddr("localhost:9876");//设置同步重试次数producer.setRetryTimesWhenSendFailed(2);//设置异步发送次数//producer.setRetryTimesWhenSendAsyncFailed(2);// 初始化生产者producer.start();for(int i =0; i <20; i++){Message message =newMessage("topic_name_delay",("key="+ i).getBytes("utf-8"));//设置延迟消费时间 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2hmessage.setDelayTimeLevel(i);// 1 同步发送 如果发送失败会根据重试次数重试SendResult send = producer.send(message);SendStatus sendStatus = send.getSendStatus();System.out.println(sendStatus.toString());}消费者
/*** 推消息消费*/DefaultMQPushConsumer defaultMQPushConsumer =newDefaultMQPushConsumer("consumer_group_delay_01");// 指定nameserver的地址defaultMQPushConsumer.setNamesrvAddr("localhost:9876");defaultMQPushConsumer.subscribe("topic_name_delay","*");// 1 提高消费并行度defaultMQPushConsumer.setConsumeThreadMax(10);defaultMQPushConsumer.setConsumeThreadMin(1);// 2 以批量方式进行 消费// 设置消息批处理的一个批次中消息的最大个数defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//设置重试次数 默认16次defaultMQPushConsumer.setMaxReconsumeTimes(1);// 添加消息监听器,一旦有消息推送过来,就进行消费defaultMQPushConsumer.setMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context){//final MessageQueue messageQueue = context.getMessageQueue();for(MessageExt msg : msgs){System.out.println(msg);try{System.out.println(newString(msg.getBody(),"utf-8"));}catch(UnsupportedEncodingException e){e.printStackTrace();}}// 消息消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 消息消费失败// return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});消费消息,按照0到18级别来,0 表示不延迟,1表示延迟1s,大于等于18表示延迟2h
按照级别一次类推
默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”,18个level。
这里只拷贝0-8 的打印日志,可以自己等待确认。
MessageExt [queueId=0, storeSize=195, queueOffset=19, sysFlag=0, bornTimestamp=1628949643548, bornHost=/192.168.0.103:55518, storeTimestamp=1628949643554, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F0000000000029A8A, commitLogOffset=170634, bodyCRC=858365373, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=20, CONSUME_START_TIME=1628949650433, UNIQ_KEY=C0A8006748D07C53A9EB47ABD51C0000, CLUSTER=DefaultCluster, WAIT=true, DELAY=0}, body=[107, 101, 121, 61, 48], transactionId=‘null’}]
key=0
MessageExt [queueId=1, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643558, bornHost=/192.168.0.103:55518, storeTimestamp=1628949644566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ACF8, commitLogOffset=175352, bodyCRC=1143909675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949650435, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5260001, CLUSTER=DefaultCluster, WAIT=true, DELAY=1, REAL_QID=1}, body=[107, 101, 121, 61, 49], transactionId=‘null’}]
key=1
MessageExt [queueId=2, storeSize=234, queueOffset=18, sysFlag=0, bornTimestamp=1628949643561, bornHost=/192.168.0.103:55518, storeTimestamp=1628949648566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ADE2, commitLogOffset=175586, bodyCRC=1562901649, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=19, CONSUME_START_TIME=1628949650436, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5290002, CLUSTER=DefaultCluster, WAIT=true, DELAY=2, REAL_QID=2}, body=[107, 101, 121, 61, 50], transactionId=‘null’}]
key=2
MessageExt [queueId=3, storeSize=234, queueOffset=17, sysFlag=0, bornTimestamp=1628949643566, bornHost=/192.168.0.103:55518, storeTimestamp=1628949653569, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002AECC, commitLogOffset=175820, bodyCRC=706792455, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=18, CONSUME_START_TIME=1628949653572, UNIQ_KEY=C0A8006748D07C53A9EB47ABD52E0003, CLUSTER=DefaultCluster, WAIT=true, DELAY=3, REAL_QID=3}, body=[107, 101, 121, 61, 51], transactionId=‘null’}]
key=3
MessageExt [queueId=0, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643568, bornHost=/192.168.0.103:55518, storeTimestamp=1628949673574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B0A0, commitLogOffset=176288, bodyCRC=876894628, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949673577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5300004, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, REAL_QID=0}, body=[107, 101, 121, 61, 52], transactionId=‘null’}]
key=4
MessageExt [queueId=1, storeSize=234, queueOffset=23, sysFlag=0, bornTimestamp=1628949643570, bornHost=/192.168.0.103:55518, storeTimestamp=1628949703574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B18A, commitLogOffset=176522, bodyCRC=1128491314, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=24, CONSUME_START_TIME=1628949703577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5320005, CLUSTER=DefaultCluster, WAIT=true, DELAY=5, REAL_QID=1}, body=[107, 101, 121, 61, 53], transactionId=‘null’}]
key=5
MessageExt [queueId=2, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643572, bornHost=/192.168.0.103:55518, storeTimestamp=1628949763575, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B35E, commitLogOffset=176990, bodyCRC=1514813576, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949763580, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5340006, CLUSTER=DefaultCluster, WAIT=true, DELAY=6, REAL_QID=2}, body=[107, 101, 121, 61, 54], transactionId=‘null’}]
key=6
MessageExt [queueId=3, storeSize=234, queueOffset=19, sysFlag=0, bornTimestamp=1628949643574, bornHost=/192.168.0.103:55518, storeTimestamp=1628949823580, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B532, commitLogOffset=177458, bodyCRC=760023070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=20, CONSUME_START_TIME=1628949823583, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5360007, CLUSTER=DefaultCluster, WAIT=true, DELAY=7, REAL_QID=3}, body=[107, 101, 121, 61, 55], transactionId=‘null’}]
key=7
MessageExt [queueId=0, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643576, bornHost=/192.168.0.103:55518, storeTimestamp=1628949883582, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B706, commitLogOffset=177926, bodyCRC=1039275407, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949883586, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5380008, CLUSTER=DefaultCluster, WAIT=true, DELAY=8, REAL_QID=0}, body=[107, 101, 121, 61, 56], transactionId=‘null’}]
key=8
同样我们在控制台可以看到,存放的消息

相关文章:
RocketMQ 延迟队列
什么是延迟队列指消息发送到某个队列后,在指定多长时间之后才能被消费。应用场景RocketMQ 延迟队列定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置项messageD…...
【精准计时】北斗GPS卫星时钟同步改变精准计时年代
【精准计时】北斗GPS卫星时钟同步改变精准计时年代 【精准计时】北斗GPS卫星时钟同步改变精准计时年代 北斗GPS成精确计时先锋 北斗GPS精确时间自动校准技术,是一种简便的获取北斗GPS精确时间信息的专利技术,具有灵敏度高、不受时间及地域限制等特点…...
【C#基础】C# 面向对象编程
序号系列文章5【C#基础】C# 运算符总结6【C#基础】C# 常用语句讲解7【C#基础】C# 常用数据结构文章目录前言面向对象的 C#1,类的概念2,类的定义3,类成员4,对象5,继承6,多态性结语前言 😊大家好&…...
数据结构与算法入门
目录数据结构概述逻辑结构存储结构算法概述如何理解“大O记法”时间复杂度空间复杂度数据结构概述 数据结构可以简单的理解为数据与数据之间所存在的一些关系,数据的结构分为数据的存储结构和数据的逻辑结构。 逻辑结构 集合结构:数据元素同属于一个集…...
【OpenAI】基于 Gym-CarRacing 的自动驾驶练习项目 | 路径训练功能的实现 | GYM-Box2D CarRacing
限时开放,猛戳订阅! 👉 《一起玩蛇》🐍 💭 写在前面: 本篇是关于多伦多大学自动驾驶专业项目的博客。GYM-Box2D CarRacing 是一种在 OpenAI Gym 平台上开发和比较强化学习算法的模拟环境。它是流行的 Box2…...
亚马逊、沃尔玛测评自养号测评、退款、撸卡撸货怎么做?
大家好,有很多的测评工作室做亚马逊测评、沃尔玛测评自养号大额退款,撸卡撸货的找到我,问我有什么方式可以解决成本,效率,纯净度,便捷性等问题,测评养号系统从最早的模拟器,虚拟机到…...
Apollo 2.1.0最新版docker 部署多环境 与java spring boot 接入demo (附带一键部署脚本)
最新Apollo 版本发布2.1.0 https://www.apolloconfig.com/#/zh/design/apollo-design 环境说明 ecs 主机一台数据库mysql 8.0docker 环境 apollo 是内网可信应用,最好是部署在内网里面,外网不可使用,避免配置信息泄漏,这里为了方…...
分布式算法 - 一致性Hash算法
一致性Hash算法是个经典算法,Hash环的引入是为解决单调性(Monotonicity) 的问题;虚拟节点的引入是为了解决 平衡性(Balance) 问题。一致性Hash算法引入在分布式集群中,对机器的添加删除,或者机器故障后自动脱离集群这些操作是分布…...
OAuth2.0入门
什么是OAuth2.0 OAuth(Open Authorization)是一个关于授权(authorization)的开放网络标准,允许用户授权第三方应用访问他们存储在另外的服务提供者上的信息,而不需要将用户名和密码提供给第三方移动应用或…...
【HTTP——了解HTTP协议及状态码】
一, 什么是通信通信,就是信息的传递和交换。通信三要素:通信的主体,通信的内容,通信的方式现实生活中的通信:我打电话叫小明来我家吃饭【其中通信的主体是,我,小明。通信内容是&…...
骨传导耳机靠谱吗,骨传导耳机的原理是什么
很多人刚开始接触骨传导耳机时都会具有一个疑问,骨传导耳机是不是真的靠谱,是不是真的不伤害听力?骨传导耳机传输声音的原理是什么? 下面就给大家讲解一下骨传导耳机传输声音的原理以及骨传导耳机对听力到底有没有伤害。 骨传导…...
对个人博客系统进行web自动化测试(包含测试代码和测试的详细过程)
目录 一、总述 二、登录页面测试 一些准备工作 验证页面显示是否正确 验证正常登录的情况 该过程中出现的问题 验证登录失败的情况 关于登录界面的总代码 测试视频 三、注册界面的自动化测试 测试代码 过程中出现的bug 测试视频 四、博客列表页测试(…...
[ 2204听力 ] 五
[ 第五次课 对话1 ] Narrator Listen to a conversation between a student and her Ecology professor (woman) Hi, professor, did you want to talk about my paper? I didn’t get a grade. (man) Ah, yes, I think you might have done the wrong assignment. assign…...
嵌入式常问问题和知识
12、并发和并行的区别? 最本质的区别就是:并发是轮流处理多个任务,并行是同时处理多个任务。 你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。 你吃饭吃到一半&…...
【数据结构】空间复杂度
🚀write in front🚀 📜所属专栏:初阶数据结构 🛰️博客主页:睿睿的博客主页 🛰️代码仓库:🎉VS2022_C语言仓库 🎡您的点赞、关注、收藏、评论,是对…...
湖南中创教育提醒校外培训留意这几点,避免维权
校外教育培训机构是市场经济发展的必然产物,有需求就有市场,这个无可厚非。而校外教育培训机构的火热,正是反映出人民群众对教育发展的需求在不断增强。 培训机构分类中有面对大学生参加公务员招考、教师考编等考证考试的培训机构࿱…...
docker 配置私有/本地镜像仓库
docker 配置私有/本地镜像仓库docker pull registry mkdir -p /usr/local/docker/registry-data docker tag registry 192.168.28.132:5000/registry docker run -di -p 5000:5000 --namelocal_registry --restartalways --privilegedtrue --log-drivernone -v /usr/local/d…...
每日学术速递2.23
Subjects: Robotics 1.On discrete symmetries of robotics systems: A group-theoretic and data-driven analysis 标题:关于机器人系统的离散对称性:群论和数据驱动分析 作者:Daniel Ordonez-Apraez, Mario Martin, Antonio Agudo, F…...
LeetCode 232. 用栈实现队列
LeetCode 232. 用栈实现队列 难度:easy\color{Green}{easy}easy 题目描述 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(pushpushpush、poppoppop、peekpeekpeek、emptyemptyempty): 实现 MyQueueM…...
AI算法创新赛-人车目标检测竞赛总结04
队伍:AI000038 小组成员:杨志强,林松 1. 算法介绍 1.1 相关工作 当前流行的目标检测算法主要分为三种,一阶段算法:SSD,FCOS,Scaled,YOLO系列等;二阶段算法:…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
华硕a豆14 Air香氛版,美学与科技的馨香融合
在快节奏的现代生活中,我们渴望一个能激发创想、愉悦感官的工作与生活伙伴,它不仅是冰冷的科技工具,更能触动我们内心深处的细腻情感。正是在这样的期许下,华硕a豆14 Air香氛版翩然而至,它以一种前所未有的方式&#x…...
C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
【FTP】ftp文件传输会丢包吗?批量几百个文件传输,有一些文件没有传输完整,如何解决?
FTP(File Transfer Protocol)本身是一个基于 TCP 的协议,理论上不会丢包。但 FTP 文件传输过程中仍可能出现文件不完整、丢失或损坏的情况,主要原因包括: ✅ 一、FTP传输可能“丢包”或文件不完整的原因 原因描述网络…...
