RabbitMQ延迟队列
目录
一、概念
二、使用场景
三、RabbitMQ 中的 TTL
(一)队列设置 TTL
(二)消息设置 TTL
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目
(二)添加依赖
(三)修改配置文件
(四)添加Swagger配置类
五、队列TTL
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
六、延迟队列优化
(一)代码架构图
(二)配置文件类
(三)消息生产者
七、Rabbitmq 插件实现延迟队列
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
八、总结
一、概念
二、使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

三、RabbitMQ 中的 TTL
(一)队列设置 TTL
Map<String, Object> arguments = new HashMap<>();
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
(二)消息设置 TTL
另一种方式便是针对每条消息设置 TTL
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttl);return msg;
});
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目


(二)添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
(三)修改配置文件
spring.rabbitmq.host=192.168.23.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
(四)添加Swagger配置类
@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig() {return new Docket(DocumentationType.SWAGGER_2).groupName("webapi").apiInfo(webApiInfo()).select().build();}public ApiInfo webApiInfo() {return new ApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(new Contact("enjoy6288", "http://atguigu.com","1551388580@qq.com")).build();}
}
五、队列TTL
(一)代码架构图
(二)配置文件类
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";// 声明xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列A@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>();// 当前队列的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列B@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明死信队列@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}@Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with("YD");}}
(三)消息生产者
@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间是{},发送一条信息给两个 TTL 队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);}
(四)消息消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
六、延迟队列优化
(一)代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,而是由生产者设置过期时间

(二)配置文件类
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";public static final String QUEUE_C = "QC";// 声明xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列A@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>();// 当前队列的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列B@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明队列C@Bean("queueC")public Queue queueC() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}// 声明队列C绑定交换机X@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}// 声明死信队列@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}@Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with("YD");}}
(三)消息生产者
@GetMapping("/sendExpirationMsg/{message}/{ttl}")public void sendMsg(@PathVariable String message, @PathVariable String ttl) {log.info("当前时间是{},发送一条过期信息给两个 TTL 队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttl);return msg;});}
七、Rabbitmq 插件实现延迟队列
关于插件的安装可以查看这篇文章Docker安装RabbitMq延迟队列插件

(一)代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

(二)配置文件类
(三)消息生产者
/** 基于插件的延迟队列和延迟交换机*/
@Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";// 声明队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}// 声明自定义交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 声明队列和延迟交换机的绑定@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange exchange) {return BindingBuilder.bind(delayedQueue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();}
}
(四)消息消费者
@Component
@Slf4j
public class DelayedQueueConsumer {@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(String message) {log.info("当前时间:{}, 接收到消息: {}", new Date().toString(), message);}
}
第二个消息被先消费掉了,符合预期
八、总结
相关文章:
RabbitMQ延迟队列
目录 一、概念 二、使用场景 三、RabbitMQ 中的 TTL (一)队列设置 TTL (二)消息设置 TTL (三)两者的区别 四、整合SpringBoot实现延迟队列 (一)创建项目 (二&am…...
Java中常用的七种队列你了解多少?
文章目录Java中常用的七种队列你了解多少?ArrayBlockingQueue队列如何使用?添加元素到队列获取队列中的元素遍历队列LinkedBlockingQueue队列如何使用?1. 创建SynchronousQueue对象2. 添加元素到队列3. 获取队列中的元素4. 遍历队列SynchronousQueue队列…...
<Java获取时间日期工具类>常见八种场景(一)
一:自定义时间日期工具类常用的八种方式(整理): 0,getTimeSecondNum:时间日期转成秒数,常用于大小比较 1,getLastYearMonthLastDay:获取去年当月最后一天的时间日期 2,getLastYearM…...
接上一篇 对多个模型环形旋转进行优化 指定旋转位置
using System.Collections; using System.Collections.Generic; using UnityEngine; using DG.Tweening; public class ModelAnimal : MonoBehaviour { //记录鼠标滑动 public Vector2 lastPos;//鼠标上次位置 Vector2 currPos;//鼠标当前位置 Vector2 offset;//两次位置的偏移…...
Unity中获取地形的法线
序之前,生成了地形图:(42条消息) 从灰度图到地形图_averagePerson的博客-CSDN博客那末,地形的法线贴图怎么获取?大概分为两个部分吧,先拿到法线数据,再画到纹理中去。关于法线计算Unity - Scripting API: M…...
模型解释性:PFI、PDP、ICE等包的用法
本篇主要介绍几种其他较常用的模型解释性方法。 1. Permutation Feature Importance(PFI) 1.1 算法原理 置换特征重要性(Permutation Feature Importance)的概念很简单,其衡量特征重要性的方法如下:计算特征改变后模型预测误差的增加。如果打乱该特征的…...
spring常见面试题(2023最新)
目录前言1.spring是什么2.spring的设计核心是什么3.IOC和AOP面试题4.spring的优点和缺点5.spring中bean的作用域6.spring中bean的注入方式7.BeanFactory 和 ApplicationContext有什么区别?8.循环依赖的情况,怎么解决?9.spring中单例Bean是线程…...
华为OD机试题,用 Java 解【压缩报文还原】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...
机器学习-BM-FKNCN、BM-FKNN等分类器对比实验
目录 一、简介和环境准备 二、算法简介 2.1四种方法类: 2.1.1FKNN 2.1.2FKNCN 2.1.3BM-FKNN 2.1.3BM-FKNCN 2.2数据预处理 2.3输出视图 2.4调用各种方法看准确率 2.4.1BM-FKNCN 2.4.2BM-FKNN 2.4.3FKNCN 2.4.4FKNN 2.4.5KNN 一、简介和环境准备 k…...
ChatGPT火了,对话式人工智能还能干嘛?
身兼数职的ChatGPT 从2022火到了2023 连日来一直是各大平台的热议对象 其实除了写诗、敲代码、处理文档 以ChatGPT为代表的 对话式人工智能 还有更重要的工作要做 对话式AI与聊天机器人 相信大多数人…...
十一、操作数栈的特点(Operand Sstack)
1.每一个独立的栈帧中除了包含局部变量表以外,还包含一个后进先出的操作数栈,也可以称之为表达式栈。 2.操作数栈,在方法执行过程中,根据字节码指令,往栈中写入数据,或提取数据,即入栈ÿ…...
拆解瑞幸新用户激活流程,如何让用户“动”起来?
Aha时刻 一个产品的拉新环节,是多种方式并存的;新用户可能来自于商务搭建了新的渠道,运营策划了新的活动,企划发布了新的广告,销售谈下了新的客户,市场推广了新的群体,以及产品本身的口碑传播,功能更新带来的自然流量。 这是一个群策群力的环节,不同的团队背负不同的K…...
tkinter界面的TCP通信/开启线程等待接收数据
前言 用简洁的语言写一个可以与TCP客户端实时通信的界面。之前做了一个项目是要与PLC进行信息交互的界面,在测试的时候就利用TCP客户端来实验,文末会附上TCP客户端。本文分为三部分,第一部分是在界面向TCP发送数据,第二部分是接收…...
华为OD机试题,用 Java 解【任务混部】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...
看linux内核启动流程需要的汇编指令解释
一、指令 0.MRS 和MSR MRS 指令: 对状态寄存器CPSR和SPSR进行读操作。 MSR指令: 对状态寄存器CPSR和SPSR进行写操作。 1.adrp adrp x0, boot_args把boot_args的页基地址提取出来,放到x0中。 2.stp stp x21, x1, [x0]将 x21, x1 的值存入 x0寄存器记录的地址中…...
【巨人的肩膀】JAVA面试总结(二)
1、💪 目录1、💪1.0、什么是面向对象1.1、JDK、JRE、JVM之间的区别1.2、什么是字节码1.3、hashCode()与equals()之间的联系1.4、String、StringBuffer、StringBuilder的区别1.5、和equals方法的区别1.6、重载和重写的区别1.7、List和Set的区别1.8、Array…...
【网络安全入门】零基础小白必看!!!
看到很多小伙伴都想学习 网络安全 ,让自己掌握更多的 技能,但是学习兴趣有了,却发现自己不知道哪里有 学习资源◇瞬间兴致全无!◇ 😄在线找人要资料太卑微,自己上网下载又发现要收费0 🙃差点当…...
字节前端经典面试题(附答案)
有哪些可能引起前端安全的问题? 跨站脚本 (Cross-Site Scripting, XSS): ⼀种代码注⼊⽅式, 为了与 CSS 区分所以被称作 XSS。早期常⻅于⽹络论坛, 起因是⽹站没有对⽤户的输⼊进⾏严格的限制, 使得攻击者可以将脚本上传到帖⼦让其他⼈浏览到有恶意脚本的⻚⾯, 其注⼊⽅式很简…...
数据库管理工具的使用
目录 摘要 一、Navicat是什么? 二、使用步骤 1.如何下载与安装 2.如何连接远程数据库 总结 摘要 本文主要介绍数据库管理工具的使用 一、Navicat是什么? 它是一款数据库管理工具,将此工具连接数据库,你可以从中看到各种数据库的详细…...
让马斯克反悔的毫米波雷达,被国产雷达头部厂商木牛科技迭代到了5D时代
近日,特斯拉或将在其HW4.0硬件系统配置一枚高精度4D毫米波雷达的消息在外网刷屏。据分析,“纯视觉”信仰者马斯克之所以做出这样的决定,一方面是减配了雷达的特斯拉自动驾驶,表现不尽如人意;另一方面也跟毫米波雷达的技…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...
Python Ovito统计金刚石结构数量
大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...

