RabbitMQ延迟队列
目录
一、概念
二、使用场景
三、RabbitMQ 中的 TTL
(一)队列设置 TTL
(二)消息设置 TTL
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目
(二)添加依赖
(三)修改配置文件
(四)添加Swagger配置类
五、队列TTL
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
六、延迟队列优化
(一)代码架构图
(二)配置文件类
(三)消息生产者
七、Rabbitmq 插件实现延迟队列
(一)代码架构图
(二)配置文件类
(三)消息生产者
(四)消息消费者
八、总结
一、概念
二、使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
三、RabbitMQ 中的 TTL
(一)队列设置 TTL
Map<String, Object> arguments = new HashMap<>();
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
(二)消息设置 TTL
另一种方式便是针对每条消息设置 TTL
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttl);return msg;
});
(三)两者的区别
四、整合SpringBoot实现延迟队列
(一)创建项目
(二)添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
(三)修改配置文件
spring.rabbitmq.host=192.168.23.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
(四)添加Swagger配置类
@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig() {return new Docket(DocumentationType.SWAGGER_2).groupName("webapi").apiInfo(webApiInfo()).select().build();}public ApiInfo webApiInfo() {return new ApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(new Contact("enjoy6288", "http://atguigu.com","1551388580@qq.com")).build();}
}
五、队列TTL
(一)代码架构图

(二)配置文件类
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";// 声明xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列A@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>();// 当前队列的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列B@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明死信队列@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}@Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with("YD");}}
(三)消息生产者
@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间是{},发送一条信息给两个 TTL 队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);}
(四)消息消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
六、延迟队列优化
(一)代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,而是由生产者设置过期时间
(二)配置文件类
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";public static final String QUEUE_C = "QC";// 声明xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列A@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>();// 当前队列的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 声明队列B@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");// 声明队列的TTLarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 声明队列C@Bean("queueC")public Queue queueC() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}// 声明队列C绑定交换机X@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}// 声明死信队列@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}@Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with("YD");}}
(三)消息生产者
@GetMapping("/sendExpirationMsg/{message}/{ttl}")public void sendMsg(@PathVariable String message, @PathVariable String ttl) {log.info("当前时间是{},发送一条过期信息给两个 TTL 队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttl);return msg;});}
七、Rabbitmq 插件实现延迟队列
关于插件的安装可以查看这篇文章Docker安装RabbitMq延迟队列插件
(一)代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
(二)配置文件类
(三)消息生产者
/** 基于插件的延迟队列和延迟交换机*/
@Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";// 声明队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}// 声明自定义交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 声明队列和延迟交换机的绑定@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange exchange) {return BindingBuilder.bind(delayedQueue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();}
}
(四)消息消费者
@Component
@Slf4j
public class DelayedQueueConsumer {@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(String message) {log.info("当前时间:{}, 接收到消息: {}", new Date().toString(), message);}
}

第二个消息被先消费掉了,符合预期
八、总结
相关文章:

RabbitMQ延迟队列
目录 一、概念 二、使用场景 三、RabbitMQ 中的 TTL (一)队列设置 TTL (二)消息设置 TTL (三)两者的区别 四、整合SpringBoot实现延迟队列 (一)创建项目 (二&am…...

Java中常用的七种队列你了解多少?
文章目录Java中常用的七种队列你了解多少?ArrayBlockingQueue队列如何使用?添加元素到队列获取队列中的元素遍历队列LinkedBlockingQueue队列如何使用?1. 创建SynchronousQueue对象2. 添加元素到队列3. 获取队列中的元素4. 遍历队列SynchronousQueue队列…...
<Java获取时间日期工具类>常见八种场景(一)
一:自定义时间日期工具类常用的八种方式(整理): 0,getTimeSecondNum:时间日期转成秒数,常用于大小比较 1,getLastYearMonthLastDay:获取去年当月最后一天的时间日期 2,getLastYearM…...
接上一篇 对多个模型环形旋转进行优化 指定旋转位置
using System.Collections; using System.Collections.Generic; using UnityEngine; using DG.Tweening; public class ModelAnimal : MonoBehaviour { //记录鼠标滑动 public Vector2 lastPos;//鼠标上次位置 Vector2 currPos;//鼠标当前位置 Vector2 offset;//两次位置的偏移…...

Unity中获取地形的法线
序之前,生成了地形图:(42条消息) 从灰度图到地形图_averagePerson的博客-CSDN博客那末,地形的法线贴图怎么获取?大概分为两个部分吧,先拿到法线数据,再画到纹理中去。关于法线计算Unity - Scripting API: M…...

模型解释性:PFI、PDP、ICE等包的用法
本篇主要介绍几种其他较常用的模型解释性方法。 1. Permutation Feature Importance(PFI) 1.1 算法原理 置换特征重要性(Permutation Feature Importance)的概念很简单,其衡量特征重要性的方法如下:计算特征改变后模型预测误差的增加。如果打乱该特征的…...
spring常见面试题(2023最新)
目录前言1.spring是什么2.spring的设计核心是什么3.IOC和AOP面试题4.spring的优点和缺点5.spring中bean的作用域6.spring中bean的注入方式7.BeanFactory 和 ApplicationContext有什么区别?8.循环依赖的情况,怎么解决?9.spring中单例Bean是线程…...

华为OD机试题,用 Java 解【压缩报文还原】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...

机器学习-BM-FKNCN、BM-FKNN等分类器对比实验
目录 一、简介和环境准备 二、算法简介 2.1四种方法类: 2.1.1FKNN 2.1.2FKNCN 2.1.3BM-FKNN 2.1.3BM-FKNCN 2.2数据预处理 2.3输出视图 2.4调用各种方法看准确率 2.4.1BM-FKNCN 2.4.2BM-FKNN 2.4.3FKNCN 2.4.4FKNN 2.4.5KNN 一、简介和环境准备 k…...

ChatGPT火了,对话式人工智能还能干嘛?
身兼数职的ChatGPT 从2022火到了2023 连日来一直是各大平台的热议对象 其实除了写诗、敲代码、处理文档 以ChatGPT为代表的 对话式人工智能 还有更重要的工作要做 对话式AI与聊天机器人 相信大多数人…...
十一、操作数栈的特点(Operand Sstack)
1.每一个独立的栈帧中除了包含局部变量表以外,还包含一个后进先出的操作数栈,也可以称之为表达式栈。 2.操作数栈,在方法执行过程中,根据字节码指令,往栈中写入数据,或提取数据,即入栈ÿ…...
拆解瑞幸新用户激活流程,如何让用户“动”起来?
Aha时刻 一个产品的拉新环节,是多种方式并存的;新用户可能来自于商务搭建了新的渠道,运营策划了新的活动,企划发布了新的广告,销售谈下了新的客户,市场推广了新的群体,以及产品本身的口碑传播,功能更新带来的自然流量。 这是一个群策群力的环节,不同的团队背负不同的K…...

tkinter界面的TCP通信/开启线程等待接收数据
前言 用简洁的语言写一个可以与TCP客户端实时通信的界面。之前做了一个项目是要与PLC进行信息交互的界面,在测试的时候就利用TCP客户端来实验,文末会附上TCP客户端。本文分为三部分,第一部分是在界面向TCP发送数据,第二部分是接收…...

华为OD机试题,用 Java 解【任务混部】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...
看linux内核启动流程需要的汇编指令解释
一、指令 0.MRS 和MSR MRS 指令: 对状态寄存器CPSR和SPSR进行读操作。 MSR指令: 对状态寄存器CPSR和SPSR进行写操作。 1.adrp adrp x0, boot_args把boot_args的页基地址提取出来,放到x0中。 2.stp stp x21, x1, [x0]将 x21, x1 的值存入 x0寄存器记录的地址中…...

【巨人的肩膀】JAVA面试总结(二)
1、💪 目录1、💪1.0、什么是面向对象1.1、JDK、JRE、JVM之间的区别1.2、什么是字节码1.3、hashCode()与equals()之间的联系1.4、String、StringBuffer、StringBuilder的区别1.5、和equals方法的区别1.6、重载和重写的区别1.7、List和Set的区别1.8、Array…...

【网络安全入门】零基础小白必看!!!
看到很多小伙伴都想学习 网络安全 ,让自己掌握更多的 技能,但是学习兴趣有了,却发现自己不知道哪里有 学习资源◇瞬间兴致全无!◇ 😄在线找人要资料太卑微,自己上网下载又发现要收费0 🙃差点当…...
字节前端经典面试题(附答案)
有哪些可能引起前端安全的问题? 跨站脚本 (Cross-Site Scripting, XSS): ⼀种代码注⼊⽅式, 为了与 CSS 区分所以被称作 XSS。早期常⻅于⽹络论坛, 起因是⽹站没有对⽤户的输⼊进⾏严格的限制, 使得攻击者可以将脚本上传到帖⼦让其他⼈浏览到有恶意脚本的⻚⾯, 其注⼊⽅式很简…...

数据库管理工具的使用
目录 摘要 一、Navicat是什么? 二、使用步骤 1.如何下载与安装 2.如何连接远程数据库 总结 摘要 本文主要介绍数据库管理工具的使用 一、Navicat是什么? 它是一款数据库管理工具,将此工具连接数据库,你可以从中看到各种数据库的详细…...

让马斯克反悔的毫米波雷达,被国产雷达头部厂商木牛科技迭代到了5D时代
近日,特斯拉或将在其HW4.0硬件系统配置一枚高精度4D毫米波雷达的消息在外网刷屏。据分析,“纯视觉”信仰者马斯克之所以做出这样的决定,一方面是减配了雷达的特斯拉自动驾驶,表现不尽如人意;另一方面也跟毫米波雷达的技…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...

376. Wiggle Subsequence
376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
Spring是如何解决Bean的循环依赖:三级缓存机制
1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间互相持有对方引用,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

uniapp 开发ios, xcode 提交app store connect 和 testflight内测
uniapp 中配置 配置manifest 文档:manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号:4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...

【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅
目录 前言 操作系统与驱动程序 是什么,为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中,我们在使用电子设备时,我们所输入执行的每一条指令最终大多都会作用到硬件上,比如下载一款软件最终会下载到硬盘上&am…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践
在 Kubernetes 集群中,如何在保障应用高可用的同时有效地管理资源,一直是运维人员和开发者关注的重点。随着微服务架构的普及,集群内各个服务的负载波动日趋明显,传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...
深入浅出WebGL:在浏览器中解锁3D世界的魔法钥匙
WebGL:在浏览器中解锁3D世界的魔法钥匙 引言:网页的边界正在消失 在数字化浪潮的推动下,网页早已不再是静态信息的展示窗口。如今,我们可以在浏览器中体验逼真的3D游戏、交互式数据可视化、虚拟实验室,甚至沉浸式的V…...