RabbitMQ延迟队列
目录
一、概念
二、使用场景
三、RabbitMQ 中的 TTL
(一)队列设置 TTL
(二)消息设置 TTL
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目
(二)添加依赖
(三)修改配置文件
(四)添加Swagger配置类
五、队列TTL
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
六、延迟队列优化
(一)代码架构图
(二)配置文件类
(三)消息生产者
七、Rabbitmq 插件实现延迟队列
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
八、总结
一、概念
二、使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

三、RabbitMQ 中的 TTL
(一)队列设置 TTL
Map<String, Object> arguments = new HashMap<>();
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
(二)消息设置 TTL
另一种方式便是针对每条消息设置 TTL
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttl);return msg;
});
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目


(二)添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
(三)修改配置文件
spring.rabbitmq.host=192.168.23.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
(四)添加Swagger配置类
@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig() {return new Docket(DocumentationType.SWAGGER_2).groupName("webapi").apiInfo(webApiInfo()).select().build();}public ApiInfo webApiInfo() {return new ApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(new Contact("enjoy6288", "http://atguigu.com","1551388580@qq.com")).build();}
}
五、队列TTL
(一)代码架构图
(二)配置文件类
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";// 声明xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列A@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>();// 当前队列的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列B@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明死信队列@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}@Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with("YD");}}
(三)消息生产者
@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间是{},发送一条信息给两个 TTL 队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);}
(四)消息消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
六、延迟队列优化
(一)代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,而是由生产者设置过期时间

(二)配置文件类
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";public static final String QUEUE_C = "QC";// 声明xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列A@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>();// 当前队列的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列B@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明队列C@Bean("queueC")public Queue queueC() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}// 声明队列C绑定交换机X@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}// 声明死信队列@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}@Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with("YD");}}
(三)消息生产者
@GetMapping("/sendExpirationMsg/{message}/{ttl}")public void sendMsg(@PathVariable String message, @PathVariable String ttl) {log.info("当前时间是{},发送一条过期信息给两个 TTL 队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttl);return msg;});}
七、Rabbitmq 插件实现延迟队列
关于插件的安装可以查看这篇文章Docker安装RabbitMq延迟队列插件

(一)代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

(二)配置文件类
(三)消息生产者
/** 基于插件的延迟队列和延迟交换机*/
@Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";// 声明队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}// 声明自定义交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 声明队列和延迟交换机的绑定@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange exchange) {return BindingBuilder.bind(delayedQueue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();}
}
(四)消息消费者
@Component
@Slf4j
public class DelayedQueueConsumer {@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(String message) {log.info("当前时间:{}, 接收到消息: {}", new Date().toString(), message);}
}
第二个消息被先消费掉了,符合预期
八、总结
相关文章:
RabbitMQ延迟队列
目录 一、概念 二、使用场景 三、RabbitMQ 中的 TTL (一)队列设置 TTL (二)消息设置 TTL (三)两者的区别 四、整合SpringBoot实现延迟队列 (一)创建项目 (二&am…...
Java中常用的七种队列你了解多少?
文章目录Java中常用的七种队列你了解多少?ArrayBlockingQueue队列如何使用?添加元素到队列获取队列中的元素遍历队列LinkedBlockingQueue队列如何使用?1. 创建SynchronousQueue对象2. 添加元素到队列3. 获取队列中的元素4. 遍历队列SynchronousQueue队列…...
<Java获取时间日期工具类>常见八种场景(一)
一:自定义时间日期工具类常用的八种方式(整理): 0,getTimeSecondNum:时间日期转成秒数,常用于大小比较 1,getLastYearMonthLastDay:获取去年当月最后一天的时间日期 2,getLastYearM…...
接上一篇 对多个模型环形旋转进行优化 指定旋转位置
using System.Collections; using System.Collections.Generic; using UnityEngine; using DG.Tweening; public class ModelAnimal : MonoBehaviour { //记录鼠标滑动 public Vector2 lastPos;//鼠标上次位置 Vector2 currPos;//鼠标当前位置 Vector2 offset;//两次位置的偏移…...
Unity中获取地形的法线
序之前,生成了地形图:(42条消息) 从灰度图到地形图_averagePerson的博客-CSDN博客那末,地形的法线贴图怎么获取?大概分为两个部分吧,先拿到法线数据,再画到纹理中去。关于法线计算Unity - Scripting API: M…...
模型解释性:PFI、PDP、ICE等包的用法
本篇主要介绍几种其他较常用的模型解释性方法。 1. Permutation Feature Importance(PFI) 1.1 算法原理 置换特征重要性(Permutation Feature Importance)的概念很简单,其衡量特征重要性的方法如下:计算特征改变后模型预测误差的增加。如果打乱该特征的…...
spring常见面试题(2023最新)
目录前言1.spring是什么2.spring的设计核心是什么3.IOC和AOP面试题4.spring的优点和缺点5.spring中bean的作用域6.spring中bean的注入方式7.BeanFactory 和 ApplicationContext有什么区别?8.循环依赖的情况,怎么解决?9.spring中单例Bean是线程…...
华为OD机试题,用 Java 解【压缩报文还原】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...
机器学习-BM-FKNCN、BM-FKNN等分类器对比实验
目录 一、简介和环境准备 二、算法简介 2.1四种方法类: 2.1.1FKNN 2.1.2FKNCN 2.1.3BM-FKNN 2.1.3BM-FKNCN 2.2数据预处理 2.3输出视图 2.4调用各种方法看准确率 2.4.1BM-FKNCN 2.4.2BM-FKNN 2.4.3FKNCN 2.4.4FKNN 2.4.5KNN 一、简介和环境准备 k…...
ChatGPT火了,对话式人工智能还能干嘛?
身兼数职的ChatGPT 从2022火到了2023 连日来一直是各大平台的热议对象 其实除了写诗、敲代码、处理文档 以ChatGPT为代表的 对话式人工智能 还有更重要的工作要做 对话式AI与聊天机器人 相信大多数人…...
十一、操作数栈的特点(Operand Sstack)
1.每一个独立的栈帧中除了包含局部变量表以外,还包含一个后进先出的操作数栈,也可以称之为表达式栈。 2.操作数栈,在方法执行过程中,根据字节码指令,往栈中写入数据,或提取数据,即入栈ÿ…...
拆解瑞幸新用户激活流程,如何让用户“动”起来?
Aha时刻 一个产品的拉新环节,是多种方式并存的;新用户可能来自于商务搭建了新的渠道,运营策划了新的活动,企划发布了新的广告,销售谈下了新的客户,市场推广了新的群体,以及产品本身的口碑传播,功能更新带来的自然流量。 这是一个群策群力的环节,不同的团队背负不同的K…...
tkinter界面的TCP通信/开启线程等待接收数据
前言 用简洁的语言写一个可以与TCP客户端实时通信的界面。之前做了一个项目是要与PLC进行信息交互的界面,在测试的时候就利用TCP客户端来实验,文末会附上TCP客户端。本文分为三部分,第一部分是在界面向TCP发送数据,第二部分是接收…...
华为OD机试题,用 Java 解【任务混部】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...
看linux内核启动流程需要的汇编指令解释
一、指令 0.MRS 和MSR MRS 指令: 对状态寄存器CPSR和SPSR进行读操作。 MSR指令: 对状态寄存器CPSR和SPSR进行写操作。 1.adrp adrp x0, boot_args把boot_args的页基地址提取出来,放到x0中。 2.stp stp x21, x1, [x0]将 x21, x1 的值存入 x0寄存器记录的地址中…...
【巨人的肩膀】JAVA面试总结(二)
1、💪 目录1、💪1.0、什么是面向对象1.1、JDK、JRE、JVM之间的区别1.2、什么是字节码1.3、hashCode()与equals()之间的联系1.4、String、StringBuffer、StringBuilder的区别1.5、和equals方法的区别1.6、重载和重写的区别1.7、List和Set的区别1.8、Array…...
【网络安全入门】零基础小白必看!!!
看到很多小伙伴都想学习 网络安全 ,让自己掌握更多的 技能,但是学习兴趣有了,却发现自己不知道哪里有 学习资源◇瞬间兴致全无!◇ 😄在线找人要资料太卑微,自己上网下载又发现要收费0 🙃差点当…...
字节前端经典面试题(附答案)
有哪些可能引起前端安全的问题? 跨站脚本 (Cross-Site Scripting, XSS): ⼀种代码注⼊⽅式, 为了与 CSS 区分所以被称作 XSS。早期常⻅于⽹络论坛, 起因是⽹站没有对⽤户的输⼊进⾏严格的限制, 使得攻击者可以将脚本上传到帖⼦让其他⼈浏览到有恶意脚本的⻚⾯, 其注⼊⽅式很简…...
数据库管理工具的使用
目录 摘要 一、Navicat是什么? 二、使用步骤 1.如何下载与安装 2.如何连接远程数据库 总结 摘要 本文主要介绍数据库管理工具的使用 一、Navicat是什么? 它是一款数据库管理工具,将此工具连接数据库,你可以从中看到各种数据库的详细…...
让马斯克反悔的毫米波雷达,被国产雷达头部厂商木牛科技迭代到了5D时代
近日,特斯拉或将在其HW4.0硬件系统配置一枚高精度4D毫米波雷达的消息在外网刷屏。据分析,“纯视觉”信仰者马斯克之所以做出这样的决定,一方面是减配了雷达的特斯拉自动驾驶,表现不尽如人意;另一方面也跟毫米波雷达的技…...
HP-Socket版本发布后用户反馈分析:情感、主题与趋势
HP-Socket版本发布后用户反馈分析:情感、主题与趋势 【免费下载链接】HP-Socket High Performance TCP/UDP/HTTP Communication Component 项目地址: https://gitcode.com/gh_mirrors/hp/HP-Socket HP-Socket作为一款高性能TCP/UDP/HTTP通信组件,…...
如何突破Cursor试用限制?3种创新方案全解析
如何突破Cursor试用限制?3种创新方案全解析 【免费下载链接】go-cursor-help 解决Cursor在免费订阅期间出现以下提示的问题: Youve reached your trial request limit. / Too many free trial accounts used on this machine. Please upgrade to pro. We have this …...
微软服软!被骂5年的Win11将被“整改”:告别强制更新、减少Copilot、任务栏摆放自由
整理 | 屠敏出品 | CSDN(ID:CSDNnews)Windows 11 自 2021 年发布以来,因任务栏功能缩水、UI 不统一、强制网络登录以及更高的硬件门槛,成为用户集中吐槽的焦点。再加上近来微软猛推 AI 功能,Copilot 的入口…...
ArcGIS Desktop许可证被占满?别慌,这3个方法帮你快速释放Advanced许可(附详细步骤)
ArcGIS Desktop高级许可被占用?3种高效解决方案与实战技巧 当你正在赶制项目报告或处理关键地理数据时,突然弹出的"All ArcGIS for Desktop Advanced licenses are in use"错误提示足以让任何GIS专业人士心跳加速。这种情况往往发生在团队共享…...
WPF图片处理避坑指南:Image控件Stretch属性的4种模式详解(含效果对比图)
WPF图片处理避坑指南:Image控件Stretch属性的4种模式详解 刚接触WPF开发的工程师们,是否经常遇到图片显示变形、比例失调的困扰?Image控件的Stretch属性看似简单,却藏着不少设计哲学。今天我们就来彻底拆解这个影响图片显示效果的…...
物联网水产养殖监控系统:智能联动,实现养殖设备自动调控
一、应用背景 水产养殖是我国农业经济的重要组成部分,传统养殖模式长期依赖人工巡检、经验判断,存在诸多难以破解的行业痛点,严重制约养殖效益与产业可持续发展。随着物联网、大数据、边缘计算、无线通信技术的成熟,搭建智能化、数…...
Comsol热流耦合拓扑优化:最大化放热量与功率耗散的探索
Comsol热流耦合拓扑优化。 目标函数采用最大化放热量和功率耗散。在工程领域,热流耦合问题一直是研究的重点,尤其是如何通过拓扑优化来实现特定目标,比如最大化放热量和功率耗散,这对于提高系统性能至关重要。而Comsol作为一款强大…...
游戏原画效率提升50%:Pixel Fashion Atelier在角色装备概念图批量生成中的应用
游戏原画效率提升50%:Pixel Fashion Atelier在角色装备概念图批量生成中的应用 1. 传统游戏原画设计的痛点 游戏开发过程中,角色装备设计往往是最耗时的环节之一。传统工作流程中,美术团队需要: 手工绘制数十种装备变体反复修改…...
C++ 无原生 JSON 支持?一文实现通用序列化与反序列化封装方案
前言 在现代软件开发中,JSON(JavaScript Object Notation)因其轻量级和易读性成为数据交换的主流格式。C虽无原生JSON支持,但通过封装第三方库(如nlohmann/json),可高效实现序列化(…...
自动驾驶之心实习生招募|上海线下,一起做点真东西
点击下方卡片,关注“自动驾驶之心”公众号 戳我-> 领取自动驾驶近30个方向学习路线 自动驾驶之心是业内头部的垂类自媒体平台,过去一年,我们梳理了端到端、VLA、世界模型、强化学习等前沿方向的最新进展,也分享了行业概况、融资…...

