RocketMQ 延迟队列
什么是延迟队列
指消息发送到某个队列后,在指定多长时间之后才能被消费。
应用场景
RocketMQ 延迟队列
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m
8m 9m 10m 20m 30m 1h 2h”,18个level。
可以配置自定义messageDelayLevel。需要注意的是 messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:
msg.setDelayLevel(level)level有以下三种情况:
level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level1,延迟1s
level > maxLevel,则level maxLevel,例如level==20,延迟2h
在 RocketMQ中定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
RocketMQ 延迟队列和RabbitMQ延迟队列相比
RocketMQ直接一步到位,功能类似于RabbitMQ的延迟交换器插件、而RabbitMQ提供了延迟队列,但如果是基于消息设置每个消息设置不同的延迟时间,会产生前面的消息早已过期,但后面的消息还存在消息队列中,故此RabbitMQ提供了延迟交换器插件,而RocketMQ 延迟队列设计就比较好了。
具体示例
生产者
// 实例化生产者,并指定生产组名称DefaultMQProducer producer =newDefaultMQProducer("myproducer_group_topic_name_delay_01");//设置实例名称,一个jvm中有多个生产者可以根据实例名区分//默认defaultproducer.setInstanceName("topic_delay");// 指定nameserver的地址producer.setNamesrvAddr("localhost:9876");//设置同步重试次数producer.setRetryTimesWhenSendFailed(2);//设置异步发送次数//producer.setRetryTimesWhenSendAsyncFailed(2);// 初始化生产者producer.start();for(int i =0; i <20; i++){Message message =newMessage("topic_name_delay",("key="+ i).getBytes("utf-8"));//设置延迟消费时间 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2hmessage.setDelayTimeLevel(i);// 1 同步发送 如果发送失败会根据重试次数重试SendResult send = producer.send(message);SendStatus sendStatus = send.getSendStatus();System.out.println(sendStatus.toString());}消费者
/*** 推消息消费*/DefaultMQPushConsumer defaultMQPushConsumer =newDefaultMQPushConsumer("consumer_group_delay_01");// 指定nameserver的地址defaultMQPushConsumer.setNamesrvAddr("localhost:9876");defaultMQPushConsumer.subscribe("topic_name_delay","*");// 1 提高消费并行度defaultMQPushConsumer.setConsumeThreadMax(10);defaultMQPushConsumer.setConsumeThreadMin(1);// 2 以批量方式进行 消费// 设置消息批处理的一个批次中消息的最大个数defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//设置重试次数 默认16次defaultMQPushConsumer.setMaxReconsumeTimes(1);// 添加消息监听器,一旦有消息推送过来,就进行消费defaultMQPushConsumer.setMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context){//final MessageQueue messageQueue = context.getMessageQueue();for(MessageExt msg : msgs){System.out.println(msg);try{System.out.println(newString(msg.getBody(),"utf-8"));}catch(UnsupportedEncodingException e){e.printStackTrace();}}// 消息消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 消息消费失败// return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});消费消息,按照0到18级别来,0 表示不延迟,1表示延迟1s,大于等于18表示延迟2h
按照级别一次类推
默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”,18个level。
这里只拷贝0-8 的打印日志,可以自己等待确认。
MessageExt [queueId=0, storeSize=195, queueOffset=19, sysFlag=0, bornTimestamp=1628949643548, bornHost=/192.168.0.103:55518, storeTimestamp=1628949643554, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F0000000000029A8A, commitLogOffset=170634, bodyCRC=858365373, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=20, CONSUME_START_TIME=1628949650433, UNIQ_KEY=C0A8006748D07C53A9EB47ABD51C0000, CLUSTER=DefaultCluster, WAIT=true, DELAY=0}, body=[107, 101, 121, 61, 48], transactionId=‘null’}]
key=0
MessageExt [queueId=1, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643558, bornHost=/192.168.0.103:55518, storeTimestamp=1628949644566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ACF8, commitLogOffset=175352, bodyCRC=1143909675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949650435, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5260001, CLUSTER=DefaultCluster, WAIT=true, DELAY=1, REAL_QID=1}, body=[107, 101, 121, 61, 49], transactionId=‘null’}]
key=1
MessageExt [queueId=2, storeSize=234, queueOffset=18, sysFlag=0, bornTimestamp=1628949643561, bornHost=/192.168.0.103:55518, storeTimestamp=1628949648566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ADE2, commitLogOffset=175586, bodyCRC=1562901649, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=19, CONSUME_START_TIME=1628949650436, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5290002, CLUSTER=DefaultCluster, WAIT=true, DELAY=2, REAL_QID=2}, body=[107, 101, 121, 61, 50], transactionId=‘null’}]
key=2
MessageExt [queueId=3, storeSize=234, queueOffset=17, sysFlag=0, bornTimestamp=1628949643566, bornHost=/192.168.0.103:55518, storeTimestamp=1628949653569, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002AECC, commitLogOffset=175820, bodyCRC=706792455, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=18, CONSUME_START_TIME=1628949653572, UNIQ_KEY=C0A8006748D07C53A9EB47ABD52E0003, CLUSTER=DefaultCluster, WAIT=true, DELAY=3, REAL_QID=3}, body=[107, 101, 121, 61, 51], transactionId=‘null’}]
key=3
MessageExt [queueId=0, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643568, bornHost=/192.168.0.103:55518, storeTimestamp=1628949673574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B0A0, commitLogOffset=176288, bodyCRC=876894628, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949673577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5300004, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, REAL_QID=0}, body=[107, 101, 121, 61, 52], transactionId=‘null’}]
key=4
MessageExt [queueId=1, storeSize=234, queueOffset=23, sysFlag=0, bornTimestamp=1628949643570, bornHost=/192.168.0.103:55518, storeTimestamp=1628949703574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B18A, commitLogOffset=176522, bodyCRC=1128491314, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=24, CONSUME_START_TIME=1628949703577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5320005, CLUSTER=DefaultCluster, WAIT=true, DELAY=5, REAL_QID=1}, body=[107, 101, 121, 61, 53], transactionId=‘null’}]
key=5
MessageExt [queueId=2, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643572, bornHost=/192.168.0.103:55518, storeTimestamp=1628949763575, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B35E, commitLogOffset=176990, bodyCRC=1514813576, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949763580, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5340006, CLUSTER=DefaultCluster, WAIT=true, DELAY=6, REAL_QID=2}, body=[107, 101, 121, 61, 54], transactionId=‘null’}]
key=6
MessageExt [queueId=3, storeSize=234, queueOffset=19, sysFlag=0, bornTimestamp=1628949643574, bornHost=/192.168.0.103:55518, storeTimestamp=1628949823580, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B532, commitLogOffset=177458, bodyCRC=760023070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=20, CONSUME_START_TIME=1628949823583, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5360007, CLUSTER=DefaultCluster, WAIT=true, DELAY=7, REAL_QID=3}, body=[107, 101, 121, 61, 55], transactionId=‘null’}]
key=7
MessageExt [queueId=0, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643576, bornHost=/192.168.0.103:55518, storeTimestamp=1628949883582, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B706, commitLogOffset=177926, bodyCRC=1039275407, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949883586, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5380008, CLUSTER=DefaultCluster, WAIT=true, DELAY=8, REAL_QID=0}, body=[107, 101, 121, 61, 56], transactionId=‘null’}]
key=8
同样我们在控制台可以看到,存放的消息

相关文章:
RocketMQ 延迟队列
什么是延迟队列指消息发送到某个队列后,在指定多长时间之后才能被消费。应用场景RocketMQ 延迟队列定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置项messageD…...
【精准计时】北斗GPS卫星时钟同步改变精准计时年代
【精准计时】北斗GPS卫星时钟同步改变精准计时年代 【精准计时】北斗GPS卫星时钟同步改变精准计时年代 北斗GPS成精确计时先锋 北斗GPS精确时间自动校准技术,是一种简便的获取北斗GPS精确时间信息的专利技术,具有灵敏度高、不受时间及地域限制等特点…...
【C#基础】C# 面向对象编程
序号系列文章5【C#基础】C# 运算符总结6【C#基础】C# 常用语句讲解7【C#基础】C# 常用数据结构文章目录前言面向对象的 C#1,类的概念2,类的定义3,类成员4,对象5,继承6,多态性结语前言 😊大家好&…...
数据结构与算法入门
目录数据结构概述逻辑结构存储结构算法概述如何理解“大O记法”时间复杂度空间复杂度数据结构概述 数据结构可以简单的理解为数据与数据之间所存在的一些关系,数据的结构分为数据的存储结构和数据的逻辑结构。 逻辑结构 集合结构:数据元素同属于一个集…...
【OpenAI】基于 Gym-CarRacing 的自动驾驶练习项目 | 路径训练功能的实现 | GYM-Box2D CarRacing
限时开放,猛戳订阅! 👉 《一起玩蛇》🐍 💭 写在前面: 本篇是关于多伦多大学自动驾驶专业项目的博客。GYM-Box2D CarRacing 是一种在 OpenAI Gym 平台上开发和比较强化学习算法的模拟环境。它是流行的 Box2…...
亚马逊、沃尔玛测评自养号测评、退款、撸卡撸货怎么做?
大家好,有很多的测评工作室做亚马逊测评、沃尔玛测评自养号大额退款,撸卡撸货的找到我,问我有什么方式可以解决成本,效率,纯净度,便捷性等问题,测评养号系统从最早的模拟器,虚拟机到…...
Apollo 2.1.0最新版docker 部署多环境 与java spring boot 接入demo (附带一键部署脚本)
最新Apollo 版本发布2.1.0 https://www.apolloconfig.com/#/zh/design/apollo-design 环境说明 ecs 主机一台数据库mysql 8.0docker 环境 apollo 是内网可信应用,最好是部署在内网里面,外网不可使用,避免配置信息泄漏,这里为了方…...
分布式算法 - 一致性Hash算法
一致性Hash算法是个经典算法,Hash环的引入是为解决单调性(Monotonicity) 的问题;虚拟节点的引入是为了解决 平衡性(Balance) 问题。一致性Hash算法引入在分布式集群中,对机器的添加删除,或者机器故障后自动脱离集群这些操作是分布…...
OAuth2.0入门
什么是OAuth2.0 OAuth(Open Authorization)是一个关于授权(authorization)的开放网络标准,允许用户授权第三方应用访问他们存储在另外的服务提供者上的信息,而不需要将用户名和密码提供给第三方移动应用或…...
【HTTP——了解HTTP协议及状态码】
一, 什么是通信通信,就是信息的传递和交换。通信三要素:通信的主体,通信的内容,通信的方式现实生活中的通信:我打电话叫小明来我家吃饭【其中通信的主体是,我,小明。通信内容是&…...
骨传导耳机靠谱吗,骨传导耳机的原理是什么
很多人刚开始接触骨传导耳机时都会具有一个疑问,骨传导耳机是不是真的靠谱,是不是真的不伤害听力?骨传导耳机传输声音的原理是什么? 下面就给大家讲解一下骨传导耳机传输声音的原理以及骨传导耳机对听力到底有没有伤害。 骨传导…...
对个人博客系统进行web自动化测试(包含测试代码和测试的详细过程)
目录 一、总述 二、登录页面测试 一些准备工作 验证页面显示是否正确 验证正常登录的情况 该过程中出现的问题 验证登录失败的情况 关于登录界面的总代码 测试视频 三、注册界面的自动化测试 测试代码 过程中出现的bug 测试视频 四、博客列表页测试(…...
[ 2204听力 ] 五
[ 第五次课 对话1 ] Narrator Listen to a conversation between a student and her Ecology professor (woman) Hi, professor, did you want to talk about my paper? I didn’t get a grade. (man) Ah, yes, I think you might have done the wrong assignment. assign…...
嵌入式常问问题和知识
12、并发和并行的区别? 最本质的区别就是:并发是轮流处理多个任务,并行是同时处理多个任务。 你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。 你吃饭吃到一半&…...
【数据结构】空间复杂度
🚀write in front🚀 📜所属专栏:初阶数据结构 🛰️博客主页:睿睿的博客主页 🛰️代码仓库:🎉VS2022_C语言仓库 🎡您的点赞、关注、收藏、评论,是对…...
湖南中创教育提醒校外培训留意这几点,避免维权
校外教育培训机构是市场经济发展的必然产物,有需求就有市场,这个无可厚非。而校外教育培训机构的火热,正是反映出人民群众对教育发展的需求在不断增强。 培训机构分类中有面对大学生参加公务员招考、教师考编等考证考试的培训机构࿱…...
docker 配置私有/本地镜像仓库
docker 配置私有/本地镜像仓库docker pull registry mkdir -p /usr/local/docker/registry-data docker tag registry 192.168.28.132:5000/registry docker run -di -p 5000:5000 --namelocal_registry --restartalways --privilegedtrue --log-drivernone -v /usr/local/d…...
每日学术速递2.23
Subjects: Robotics 1.On discrete symmetries of robotics systems: A group-theoretic and data-driven analysis 标题:关于机器人系统的离散对称性:群论和数据驱动分析 作者:Daniel Ordonez-Apraez, Mario Martin, Antonio Agudo, F…...
LeetCode 232. 用栈实现队列
LeetCode 232. 用栈实现队列 难度:easy\color{Green}{easy}easy 题目描述 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(pushpushpush、poppoppop、peekpeekpeek、emptyemptyempty): 实现 MyQueueM…...
AI算法创新赛-人车目标检测竞赛总结04
队伍:AI000038 小组成员:杨志强,林松 1. 算法介绍 1.1 相关工作 当前流行的目标检测算法主要分为三种,一阶段算法:SSD,FCOS,Scaled,YOLO系列等;二阶段算法:…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...
听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...
html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用
文章目录 一、背景知识:什么是 B-Tree 和 BTree? B-Tree(平衡多路查找树) BTree(B-Tree 的变种) 二、结构对比:一张图看懂 三、为什么 MySQL InnoDB 选择 BTree? 1. 范围查询更快 2…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...
