RocketMQ 延迟队列
什么是延迟队列
指消息发送到某个队列后,在指定多长时间之后才能被消费。
应用场景
RocketMQ 延迟队列
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m
8m 9m 10m 20m 30m 1h 2h”,18个level。
可以配置自定义messageDelayLevel。需要注意的是 messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:
msg.setDelayLevel(level)level有以下三种情况:
level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level1,延迟1s
level > maxLevel,则level maxLevel,例如level==20,延迟2h
在 RocketMQ中定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
RocketMQ 延迟队列和RabbitMQ延迟队列相比
RocketMQ直接一步到位,功能类似于RabbitMQ的延迟交换器插件、而RabbitMQ提供了延迟队列,但如果是基于消息设置每个消息设置不同的延迟时间,会产生前面的消息早已过期,但后面的消息还存在消息队列中,故此RabbitMQ提供了延迟交换器插件,而RocketMQ 延迟队列设计就比较好了。
具体示例
生产者
// 实例化生产者,并指定生产组名称DefaultMQProducer producer =newDefaultMQProducer("myproducer_group_topic_name_delay_01");//设置实例名称,一个jvm中有多个生产者可以根据实例名区分//默认defaultproducer.setInstanceName("topic_delay");// 指定nameserver的地址producer.setNamesrvAddr("localhost:9876");//设置同步重试次数producer.setRetryTimesWhenSendFailed(2);//设置异步发送次数//producer.setRetryTimesWhenSendAsyncFailed(2);// 初始化生产者producer.start();for(int i =0; i <20; i++){Message message =newMessage("topic_name_delay",("key="+ i).getBytes("utf-8"));//设置延迟消费时间 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2hmessage.setDelayTimeLevel(i);// 1 同步发送 如果发送失败会根据重试次数重试SendResult send = producer.send(message);SendStatus sendStatus = send.getSendStatus();System.out.println(sendStatus.toString());}消费者
/*** 推消息消费*/DefaultMQPushConsumer defaultMQPushConsumer =newDefaultMQPushConsumer("consumer_group_delay_01");// 指定nameserver的地址defaultMQPushConsumer.setNamesrvAddr("localhost:9876");defaultMQPushConsumer.subscribe("topic_name_delay","*");// 1 提高消费并行度defaultMQPushConsumer.setConsumeThreadMax(10);defaultMQPushConsumer.setConsumeThreadMin(1);// 2 以批量方式进行 消费// 设置消息批处理的一个批次中消息的最大个数defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//设置重试次数 默认16次defaultMQPushConsumer.setMaxReconsumeTimes(1);// 添加消息监听器,一旦有消息推送过来,就进行消费defaultMQPushConsumer.setMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context){//final MessageQueue messageQueue = context.getMessageQueue();for(MessageExt msg : msgs){System.out.println(msg);try{System.out.println(newString(msg.getBody(),"utf-8"));}catch(UnsupportedEncodingException e){e.printStackTrace();}}// 消息消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 消息消费失败// return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});消费消息,按照0到18级别来,0 表示不延迟,1表示延迟1s,大于等于18表示延迟2h
按照级别一次类推
默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”,18个level。
这里只拷贝0-8 的打印日志,可以自己等待确认。
MessageExt [queueId=0, storeSize=195, queueOffset=19, sysFlag=0, bornTimestamp=1628949643548, bornHost=/192.168.0.103:55518, storeTimestamp=1628949643554, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F0000000000029A8A, commitLogOffset=170634, bodyCRC=858365373, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=20, CONSUME_START_TIME=1628949650433, UNIQ_KEY=C0A8006748D07C53A9EB47ABD51C0000, CLUSTER=DefaultCluster, WAIT=true, DELAY=0}, body=[107, 101, 121, 61, 48], transactionId=‘null’}]
key=0
MessageExt [queueId=1, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643558, bornHost=/192.168.0.103:55518, storeTimestamp=1628949644566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ACF8, commitLogOffset=175352, bodyCRC=1143909675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949650435, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5260001, CLUSTER=DefaultCluster, WAIT=true, DELAY=1, REAL_QID=1}, body=[107, 101, 121, 61, 49], transactionId=‘null’}]
key=1
MessageExt [queueId=2, storeSize=234, queueOffset=18, sysFlag=0, bornTimestamp=1628949643561, bornHost=/192.168.0.103:55518, storeTimestamp=1628949648566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ADE2, commitLogOffset=175586, bodyCRC=1562901649, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=19, CONSUME_START_TIME=1628949650436, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5290002, CLUSTER=DefaultCluster, WAIT=true, DELAY=2, REAL_QID=2}, body=[107, 101, 121, 61, 50], transactionId=‘null’}]
key=2
MessageExt [queueId=3, storeSize=234, queueOffset=17, sysFlag=0, bornTimestamp=1628949643566, bornHost=/192.168.0.103:55518, storeTimestamp=1628949653569, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002AECC, commitLogOffset=175820, bodyCRC=706792455, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=18, CONSUME_START_TIME=1628949653572, UNIQ_KEY=C0A8006748D07C53A9EB47ABD52E0003, CLUSTER=DefaultCluster, WAIT=true, DELAY=3, REAL_QID=3}, body=[107, 101, 121, 61, 51], transactionId=‘null’}]
key=3
MessageExt [queueId=0, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643568, bornHost=/192.168.0.103:55518, storeTimestamp=1628949673574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B0A0, commitLogOffset=176288, bodyCRC=876894628, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949673577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5300004, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, REAL_QID=0}, body=[107, 101, 121, 61, 52], transactionId=‘null’}]
key=4
MessageExt [queueId=1, storeSize=234, queueOffset=23, sysFlag=0, bornTimestamp=1628949643570, bornHost=/192.168.0.103:55518, storeTimestamp=1628949703574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B18A, commitLogOffset=176522, bodyCRC=1128491314, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=24, CONSUME_START_TIME=1628949703577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5320005, CLUSTER=DefaultCluster, WAIT=true, DELAY=5, REAL_QID=1}, body=[107, 101, 121, 61, 53], transactionId=‘null’}]
key=5
MessageExt [queueId=2, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643572, bornHost=/192.168.0.103:55518, storeTimestamp=1628949763575, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B35E, commitLogOffset=176990, bodyCRC=1514813576, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949763580, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5340006, CLUSTER=DefaultCluster, WAIT=true, DELAY=6, REAL_QID=2}, body=[107, 101, 121, 61, 54], transactionId=‘null’}]
key=6
MessageExt [queueId=3, storeSize=234, queueOffset=19, sysFlag=0, bornTimestamp=1628949643574, bornHost=/192.168.0.103:55518, storeTimestamp=1628949823580, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B532, commitLogOffset=177458, bodyCRC=760023070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=20, CONSUME_START_TIME=1628949823583, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5360007, CLUSTER=DefaultCluster, WAIT=true, DELAY=7, REAL_QID=3}, body=[107, 101, 121, 61, 55], transactionId=‘null’}]
key=7
MessageExt [queueId=0, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643576, bornHost=/192.168.0.103:55518, storeTimestamp=1628949883582, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B706, commitLogOffset=177926, bodyCRC=1039275407, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949883586, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5380008, CLUSTER=DefaultCluster, WAIT=true, DELAY=8, REAL_QID=0}, body=[107, 101, 121, 61, 56], transactionId=‘null’}]
key=8
同样我们在控制台可以看到,存放的消息

相关文章:
RocketMQ 延迟队列
什么是延迟队列指消息发送到某个队列后,在指定多长时间之后才能被消费。应用场景RocketMQ 延迟队列定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置项messageD…...
【精准计时】北斗GPS卫星时钟同步改变精准计时年代
【精准计时】北斗GPS卫星时钟同步改变精准计时年代 【精准计时】北斗GPS卫星时钟同步改变精准计时年代 北斗GPS成精确计时先锋 北斗GPS精确时间自动校准技术,是一种简便的获取北斗GPS精确时间信息的专利技术,具有灵敏度高、不受时间及地域限制等特点…...
【C#基础】C# 面向对象编程
序号系列文章5【C#基础】C# 运算符总结6【C#基础】C# 常用语句讲解7【C#基础】C# 常用数据结构文章目录前言面向对象的 C#1,类的概念2,类的定义3,类成员4,对象5,继承6,多态性结语前言 😊大家好&…...
数据结构与算法入门
目录数据结构概述逻辑结构存储结构算法概述如何理解“大O记法”时间复杂度空间复杂度数据结构概述 数据结构可以简单的理解为数据与数据之间所存在的一些关系,数据的结构分为数据的存储结构和数据的逻辑结构。 逻辑结构 集合结构:数据元素同属于一个集…...
【OpenAI】基于 Gym-CarRacing 的自动驾驶练习项目 | 路径训练功能的实现 | GYM-Box2D CarRacing
限时开放,猛戳订阅! 👉 《一起玩蛇》🐍 💭 写在前面: 本篇是关于多伦多大学自动驾驶专业项目的博客。GYM-Box2D CarRacing 是一种在 OpenAI Gym 平台上开发和比较强化学习算法的模拟环境。它是流行的 Box2…...
亚马逊、沃尔玛测评自养号测评、退款、撸卡撸货怎么做?
大家好,有很多的测评工作室做亚马逊测评、沃尔玛测评自养号大额退款,撸卡撸货的找到我,问我有什么方式可以解决成本,效率,纯净度,便捷性等问题,测评养号系统从最早的模拟器,虚拟机到…...
Apollo 2.1.0最新版docker 部署多环境 与java spring boot 接入demo (附带一键部署脚本)
最新Apollo 版本发布2.1.0 https://www.apolloconfig.com/#/zh/design/apollo-design 环境说明 ecs 主机一台数据库mysql 8.0docker 环境 apollo 是内网可信应用,最好是部署在内网里面,外网不可使用,避免配置信息泄漏,这里为了方…...
分布式算法 - 一致性Hash算法
一致性Hash算法是个经典算法,Hash环的引入是为解决单调性(Monotonicity) 的问题;虚拟节点的引入是为了解决 平衡性(Balance) 问题。一致性Hash算法引入在分布式集群中,对机器的添加删除,或者机器故障后自动脱离集群这些操作是分布…...
OAuth2.0入门
什么是OAuth2.0 OAuth(Open Authorization)是一个关于授权(authorization)的开放网络标准,允许用户授权第三方应用访问他们存储在另外的服务提供者上的信息,而不需要将用户名和密码提供给第三方移动应用或…...
【HTTP——了解HTTP协议及状态码】
一, 什么是通信通信,就是信息的传递和交换。通信三要素:通信的主体,通信的内容,通信的方式现实生活中的通信:我打电话叫小明来我家吃饭【其中通信的主体是,我,小明。通信内容是&…...
骨传导耳机靠谱吗,骨传导耳机的原理是什么
很多人刚开始接触骨传导耳机时都会具有一个疑问,骨传导耳机是不是真的靠谱,是不是真的不伤害听力?骨传导耳机传输声音的原理是什么? 下面就给大家讲解一下骨传导耳机传输声音的原理以及骨传导耳机对听力到底有没有伤害。 骨传导…...
对个人博客系统进行web自动化测试(包含测试代码和测试的详细过程)
目录 一、总述 二、登录页面测试 一些准备工作 验证页面显示是否正确 验证正常登录的情况 该过程中出现的问题 验证登录失败的情况 关于登录界面的总代码 测试视频 三、注册界面的自动化测试 测试代码 过程中出现的bug 测试视频 四、博客列表页测试(…...
[ 2204听力 ] 五
[ 第五次课 对话1 ] Narrator Listen to a conversation between a student and her Ecology professor (woman) Hi, professor, did you want to talk about my paper? I didn’t get a grade. (man) Ah, yes, I think you might have done the wrong assignment. assign…...
嵌入式常问问题和知识
12、并发和并行的区别? 最本质的区别就是:并发是轮流处理多个任务,并行是同时处理多个任务。 你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。 你吃饭吃到一半&…...
【数据结构】空间复杂度
🚀write in front🚀 📜所属专栏:初阶数据结构 🛰️博客主页:睿睿的博客主页 🛰️代码仓库:🎉VS2022_C语言仓库 🎡您的点赞、关注、收藏、评论,是对…...
湖南中创教育提醒校外培训留意这几点,避免维权
校外教育培训机构是市场经济发展的必然产物,有需求就有市场,这个无可厚非。而校外教育培训机构的火热,正是反映出人民群众对教育发展的需求在不断增强。 培训机构分类中有面对大学生参加公务员招考、教师考编等考证考试的培训机构࿱…...
docker 配置私有/本地镜像仓库
docker 配置私有/本地镜像仓库docker pull registry mkdir -p /usr/local/docker/registry-data docker tag registry 192.168.28.132:5000/registry docker run -di -p 5000:5000 --namelocal_registry --restartalways --privilegedtrue --log-drivernone -v /usr/local/d…...
每日学术速递2.23
Subjects: Robotics 1.On discrete symmetries of robotics systems: A group-theoretic and data-driven analysis 标题:关于机器人系统的离散对称性:群论和数据驱动分析 作者:Daniel Ordonez-Apraez, Mario Martin, Antonio Agudo, F…...
LeetCode 232. 用栈实现队列
LeetCode 232. 用栈实现队列 难度:easy\color{Green}{easy}easy 题目描述 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(pushpushpush、poppoppop、peekpeekpeek、emptyemptyempty): 实现 MyQueueM…...
AI算法创新赛-人车目标检测竞赛总结04
队伍:AI000038 小组成员:杨志强,林松 1. 算法介绍 1.1 相关工作 当前流行的目标检测算法主要分为三种,一阶段算法:SSD,FCOS,Scaled,YOLO系列等;二阶段算法:…...
接口测试中缓存处理策略
在接口测试中,缓存处理策略是一个关键环节,直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性,避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明: 一、缓存处理的核…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
IP如何挑?2025年海外专线IP如何购买?
你花了时间和预算买了IP,结果IP质量不佳,项目效率低下不说,还可能带来莫名的网络问题,是不是太闹心了?尤其是在面对海外专线IP时,到底怎么才能买到适合自己的呢?所以,挑IP绝对是个技…...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...
