RocketMQ 延迟队列
什么是延迟队列
指消息发送到某个队列后,在指定多长时间之后才能被消费。
应用场景
RocketMQ 延迟队列
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m
8m 9m 10m 20m 30m 1h 2h”,18个level。
可以配置自定义messageDelayLevel。需要注意的是 messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:
msg.setDelayLevel(level)level有以下三种情况:
level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level1,延迟1s
level > maxLevel,则level maxLevel,例如level==20,延迟2h
在 RocketMQ中定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
RocketMQ 延迟队列和RabbitMQ延迟队列相比
RocketMQ直接一步到位,功能类似于RabbitMQ的延迟交换器插件、而RabbitMQ提供了延迟队列,但如果是基于消息设置每个消息设置不同的延迟时间,会产生前面的消息早已过期,但后面的消息还存在消息队列中,故此RabbitMQ提供了延迟交换器插件,而RocketMQ 延迟队列设计就比较好了。
具体示例
生产者
// 实例化生产者,并指定生产组名称DefaultMQProducer producer =newDefaultMQProducer("myproducer_group_topic_name_delay_01");//设置实例名称,一个jvm中有多个生产者可以根据实例名区分//默认defaultproducer.setInstanceName("topic_delay");// 指定nameserver的地址producer.setNamesrvAddr("localhost:9876");//设置同步重试次数producer.setRetryTimesWhenSendFailed(2);//设置异步发送次数//producer.setRetryTimesWhenSendAsyncFailed(2);// 初始化生产者producer.start();for(int i =0; i <20; i++){Message message =newMessage("topic_name_delay",("key="+ i).getBytes("utf-8"));//设置延迟消费时间 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2hmessage.setDelayTimeLevel(i);// 1 同步发送 如果发送失败会根据重试次数重试SendResult send = producer.send(message);SendStatus sendStatus = send.getSendStatus();System.out.println(sendStatus.toString());}消费者
/*** 推消息消费*/DefaultMQPushConsumer defaultMQPushConsumer =newDefaultMQPushConsumer("consumer_group_delay_01");// 指定nameserver的地址defaultMQPushConsumer.setNamesrvAddr("localhost:9876");defaultMQPushConsumer.subscribe("topic_name_delay","*");// 1 提高消费并行度defaultMQPushConsumer.setConsumeThreadMax(10);defaultMQPushConsumer.setConsumeThreadMin(1);// 2 以批量方式进行 消费// 设置消息批处理的一个批次中消息的最大个数defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//设置重试次数 默认16次defaultMQPushConsumer.setMaxReconsumeTimes(1);// 添加消息监听器,一旦有消息推送过来,就进行消费defaultMQPushConsumer.setMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context){//final MessageQueue messageQueue = context.getMessageQueue();for(MessageExt msg : msgs){System.out.println(msg);try{System.out.println(newString(msg.getBody(),"utf-8"));}catch(UnsupportedEncodingException e){e.printStackTrace();}}// 消息消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 消息消费失败// return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});消费消息,按照0到18级别来,0 表示不延迟,1表示延迟1s,大于等于18表示延迟2h
按照级别一次类推
默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”,18个level。
这里只拷贝0-8 的打印日志,可以自己等待确认。
MessageExt [queueId=0, storeSize=195, queueOffset=19, sysFlag=0, bornTimestamp=1628949643548, bornHost=/192.168.0.103:55518, storeTimestamp=1628949643554, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F0000000000029A8A, commitLogOffset=170634, bodyCRC=858365373, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=20, CONSUME_START_TIME=1628949650433, UNIQ_KEY=C0A8006748D07C53A9EB47ABD51C0000, CLUSTER=DefaultCluster, WAIT=true, DELAY=0}, body=[107, 101, 121, 61, 48], transactionId=‘null’}]
key=0
MessageExt [queueId=1, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643558, bornHost=/192.168.0.103:55518, storeTimestamp=1628949644566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ACF8, commitLogOffset=175352, bodyCRC=1143909675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949650435, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5260001, CLUSTER=DefaultCluster, WAIT=true, DELAY=1, REAL_QID=1}, body=[107, 101, 121, 61, 49], transactionId=‘null’}]
key=1
MessageExt [queueId=2, storeSize=234, queueOffset=18, sysFlag=0, bornTimestamp=1628949643561, bornHost=/192.168.0.103:55518, storeTimestamp=1628949648566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ADE2, commitLogOffset=175586, bodyCRC=1562901649, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=19, CONSUME_START_TIME=1628949650436, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5290002, CLUSTER=DefaultCluster, WAIT=true, DELAY=2, REAL_QID=2}, body=[107, 101, 121, 61, 50], transactionId=‘null’}]
key=2
MessageExt [queueId=3, storeSize=234, queueOffset=17, sysFlag=0, bornTimestamp=1628949643566, bornHost=/192.168.0.103:55518, storeTimestamp=1628949653569, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002AECC, commitLogOffset=175820, bodyCRC=706792455, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=18, CONSUME_START_TIME=1628949653572, UNIQ_KEY=C0A8006748D07C53A9EB47ABD52E0003, CLUSTER=DefaultCluster, WAIT=true, DELAY=3, REAL_QID=3}, body=[107, 101, 121, 61, 51], transactionId=‘null’}]
key=3
MessageExt [queueId=0, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643568, bornHost=/192.168.0.103:55518, storeTimestamp=1628949673574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B0A0, commitLogOffset=176288, bodyCRC=876894628, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949673577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5300004, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, REAL_QID=0}, body=[107, 101, 121, 61, 52], transactionId=‘null’}]
key=4
MessageExt [queueId=1, storeSize=234, queueOffset=23, sysFlag=0, bornTimestamp=1628949643570, bornHost=/192.168.0.103:55518, storeTimestamp=1628949703574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B18A, commitLogOffset=176522, bodyCRC=1128491314, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=24, CONSUME_START_TIME=1628949703577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5320005, CLUSTER=DefaultCluster, WAIT=true, DELAY=5, REAL_QID=1}, body=[107, 101, 121, 61, 53], transactionId=‘null’}]
key=5
MessageExt [queueId=2, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643572, bornHost=/192.168.0.103:55518, storeTimestamp=1628949763575, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B35E, commitLogOffset=176990, bodyCRC=1514813576, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949763580, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5340006, CLUSTER=DefaultCluster, WAIT=true, DELAY=6, REAL_QID=2}, body=[107, 101, 121, 61, 54], transactionId=‘null’}]
key=6
MessageExt [queueId=3, storeSize=234, queueOffset=19, sysFlag=0, bornTimestamp=1628949643574, bornHost=/192.168.0.103:55518, storeTimestamp=1628949823580, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B532, commitLogOffset=177458, bodyCRC=760023070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=20, CONSUME_START_TIME=1628949823583, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5360007, CLUSTER=DefaultCluster, WAIT=true, DELAY=7, REAL_QID=3}, body=[107, 101, 121, 61, 55], transactionId=‘null’}]
key=7
MessageExt [queueId=0, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643576, bornHost=/192.168.0.103:55518, storeTimestamp=1628949883582, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B706, commitLogOffset=177926, bodyCRC=1039275407, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949883586, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5380008, CLUSTER=DefaultCluster, WAIT=true, DELAY=8, REAL_QID=0}, body=[107, 101, 121, 61, 56], transactionId=‘null’}]
key=8
同样我们在控制台可以看到,存放的消息

相关文章:
RocketMQ 延迟队列
什么是延迟队列指消息发送到某个队列后,在指定多长时间之后才能被消费。应用场景RocketMQ 延迟队列定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置项messageD…...
【精准计时】北斗GPS卫星时钟同步改变精准计时年代
【精准计时】北斗GPS卫星时钟同步改变精准计时年代 【精准计时】北斗GPS卫星时钟同步改变精准计时年代 北斗GPS成精确计时先锋 北斗GPS精确时间自动校准技术,是一种简便的获取北斗GPS精确时间信息的专利技术,具有灵敏度高、不受时间及地域限制等特点…...
【C#基础】C# 面向对象编程
序号系列文章5【C#基础】C# 运算符总结6【C#基础】C# 常用语句讲解7【C#基础】C# 常用数据结构文章目录前言面向对象的 C#1,类的概念2,类的定义3,类成员4,对象5,继承6,多态性结语前言 😊大家好&…...
数据结构与算法入门
目录数据结构概述逻辑结构存储结构算法概述如何理解“大O记法”时间复杂度空间复杂度数据结构概述 数据结构可以简单的理解为数据与数据之间所存在的一些关系,数据的结构分为数据的存储结构和数据的逻辑结构。 逻辑结构 集合结构:数据元素同属于一个集…...
【OpenAI】基于 Gym-CarRacing 的自动驾驶练习项目 | 路径训练功能的实现 | GYM-Box2D CarRacing
限时开放,猛戳订阅! 👉 《一起玩蛇》🐍 💭 写在前面: 本篇是关于多伦多大学自动驾驶专业项目的博客。GYM-Box2D CarRacing 是一种在 OpenAI Gym 平台上开发和比较强化学习算法的模拟环境。它是流行的 Box2…...
亚马逊、沃尔玛测评自养号测评、退款、撸卡撸货怎么做?
大家好,有很多的测评工作室做亚马逊测评、沃尔玛测评自养号大额退款,撸卡撸货的找到我,问我有什么方式可以解决成本,效率,纯净度,便捷性等问题,测评养号系统从最早的模拟器,虚拟机到…...
Apollo 2.1.0最新版docker 部署多环境 与java spring boot 接入demo (附带一键部署脚本)
最新Apollo 版本发布2.1.0 https://www.apolloconfig.com/#/zh/design/apollo-design 环境说明 ecs 主机一台数据库mysql 8.0docker 环境 apollo 是内网可信应用,最好是部署在内网里面,外网不可使用,避免配置信息泄漏,这里为了方…...
分布式算法 - 一致性Hash算法
一致性Hash算法是个经典算法,Hash环的引入是为解决单调性(Monotonicity) 的问题;虚拟节点的引入是为了解决 平衡性(Balance) 问题。一致性Hash算法引入在分布式集群中,对机器的添加删除,或者机器故障后自动脱离集群这些操作是分布…...
OAuth2.0入门
什么是OAuth2.0 OAuth(Open Authorization)是一个关于授权(authorization)的开放网络标准,允许用户授权第三方应用访问他们存储在另外的服务提供者上的信息,而不需要将用户名和密码提供给第三方移动应用或…...
【HTTP——了解HTTP协议及状态码】
一, 什么是通信通信,就是信息的传递和交换。通信三要素:通信的主体,通信的内容,通信的方式现实生活中的通信:我打电话叫小明来我家吃饭【其中通信的主体是,我,小明。通信内容是&…...
骨传导耳机靠谱吗,骨传导耳机的原理是什么
很多人刚开始接触骨传导耳机时都会具有一个疑问,骨传导耳机是不是真的靠谱,是不是真的不伤害听力?骨传导耳机传输声音的原理是什么? 下面就给大家讲解一下骨传导耳机传输声音的原理以及骨传导耳机对听力到底有没有伤害。 骨传导…...
对个人博客系统进行web自动化测试(包含测试代码和测试的详细过程)
目录 一、总述 二、登录页面测试 一些准备工作 验证页面显示是否正确 验证正常登录的情况 该过程中出现的问题 验证登录失败的情况 关于登录界面的总代码 测试视频 三、注册界面的自动化测试 测试代码 过程中出现的bug 测试视频 四、博客列表页测试(…...
[ 2204听力 ] 五
[ 第五次课 对话1 ] Narrator Listen to a conversation between a student and her Ecology professor (woman) Hi, professor, did you want to talk about my paper? I didn’t get a grade. (man) Ah, yes, I think you might have done the wrong assignment. assign…...
嵌入式常问问题和知识
12、并发和并行的区别? 最本质的区别就是:并发是轮流处理多个任务,并行是同时处理多个任务。 你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。 你吃饭吃到一半&…...
【数据结构】空间复杂度
🚀write in front🚀 📜所属专栏:初阶数据结构 🛰️博客主页:睿睿的博客主页 🛰️代码仓库:🎉VS2022_C语言仓库 🎡您的点赞、关注、收藏、评论,是对…...
湖南中创教育提醒校外培训留意这几点,避免维权
校外教育培训机构是市场经济发展的必然产物,有需求就有市场,这个无可厚非。而校外教育培训机构的火热,正是反映出人民群众对教育发展的需求在不断增强。 培训机构分类中有面对大学生参加公务员招考、教师考编等考证考试的培训机构࿱…...
docker 配置私有/本地镜像仓库
docker 配置私有/本地镜像仓库docker pull registry mkdir -p /usr/local/docker/registry-data docker tag registry 192.168.28.132:5000/registry docker run -di -p 5000:5000 --namelocal_registry --restartalways --privilegedtrue --log-drivernone -v /usr/local/d…...
每日学术速递2.23
Subjects: Robotics 1.On discrete symmetries of robotics systems: A group-theoretic and data-driven analysis 标题:关于机器人系统的离散对称性:群论和数据驱动分析 作者:Daniel Ordonez-Apraez, Mario Martin, Antonio Agudo, F…...
LeetCode 232. 用栈实现队列
LeetCode 232. 用栈实现队列 难度:easy\color{Green}{easy}easy 题目描述 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(pushpushpush、poppoppop、peekpeekpeek、emptyemptyempty): 实现 MyQueueM…...
AI算法创新赛-人车目标检测竞赛总结04
队伍:AI000038 小组成员:杨志强,林松 1. 算法介绍 1.1 相关工作 当前流行的目标检测算法主要分为三种,一阶段算法:SSD,FCOS,Scaled,YOLO系列等;二阶段算法:…...
能源研究院转让选哪家
能源研究院转让选择建议选择适合的能源研究院转让机构需综合考虑资质、经验、服务范围及行业口碑。以下为关键筛选方向:专业资质与行业经验 优先选择具备国家级资质认证(如科技转移服务机构备案)的机构,尤其在新能源、储能技术等领…...
OpenClaw极简安装:Qwen3.5-9B云端体验与快速验证方案
OpenClaw极简安装:Qwen3.5-9B云端体验与快速验证方案 1. 为什么选择云端体验OpenClaw? 上周我在本地尝试部署OpenClaw时,被各种环境依赖折腾得够呛——Node版本冲突、Python包缺失、端口占用问题接踵而至。正当准备放弃时,偶然发…...
Python 闭包与装饰器
在 Python 学习中,闭包和装饰器是两个既关联又容易混淆的知识点,尤其是结合嵌套函数使用时,常常分不清执行逻辑。但其实只要抓住核心原理,再结合简单案例拆解,就能轻松掌握。 一、前置回顾:函数与局部变量的…...
DTM智慧监控:构建企业级分布式事务一致性保障的终极指南
DTM智慧监控:构建企业级分布式事务一致性保障的终极指南 【免费下载链接】dtm A distributed transaction framework, supports workflow, saga, tcc, xa, 2-phase message, outbox patterns, supports many languages. 项目地址: https://gitcode.com/gh_mirrors…...
新疆某工程围岩等级,包含以下7列,均为数值型数据
一、文件结构 文件包含1个工作表:Sheet1仅 Sheet1 包含数据,其余为空表。二、数据列说明 Sheet1 包含以下7列,均为数值型数据:列名(英文)列名(中文推测)数据类型说明VPR未知参数1数值…...
OpenClaw截图分析进阶:千问3.5-9B识别UI元素与操作建议
OpenClaw截图分析进阶:千问3.5-9B识别UI元素与操作建议 1. 为什么需要截图分析能力? 上周我在测试一个内部工具时遇到了一个典型问题——某个按钮在特定分辨率下会消失不见。手动排查需要反复调整窗口尺寸并肉眼检查,效率极低。这时我想到了…...
PyTorch 2.8镜像惊艳案例:碳排放数据→双碳目标达成路径视频推演
PyTorch 2.8镜像惊艳案例:碳排放数据→双碳目标达成路径视频推演 1. 效果惊艳开场 想象一下,只需输入简单的碳排放数据,就能自动生成一段专业级的双碳目标达成路径推演视频。这不是科幻场景,而是我们基于PyTorch 2.8镜像实现的真…...
Python无锁并发避坑手册(20年C Python核心贡献者亲授:从字节码级锁定到原子内存序的17个致命盲区)
第一章:Python无锁并发的本质与GIL真相Python常被误认为“天生支持多线程并发”,但其核心限制源于全局解释器锁(Global Interpreter Lock, GIL)。GIL并非语言规范,而是CPython解释器为内存管理安全而引入的互斥机制——…...
论文写作“智多星”:书匠策AI,开启期刊论文新纪元
在学术的广袤天地里,论文写作宛如一场充满挑战的冒险之旅。尤其是期刊论文,它不仅是学者研究成果的集中展现,更是学术交流与进步的重要桥梁。但面对选题迷茫、资料繁杂、结构搭建困难等诸多难题,许多学者常常感到力不从心。别担心…...
MentorBit红外驱动库:裸机与RTOS下的精准时序控制
1. MentorBit-DetectorIR 库概述MentorBit-DetectorIR 是一款专为 MentorBit 红外发射/接收模块设计的嵌入式底层驱动库,其核心定位并非通用红外协议栈(如 NEC、RC5 解码),而是面向硬件验证、模块级功能测试与快速原型开发的轻量级…...
