【SpringBoot】SpringBoot整合RabbitMQ消息中间件,实现延迟队列和死信队列

📝个人主页:哈__
期待您的关注

目录
一、🔥死信队列
RabbitMQ的工作模式
死信队列的工作模式
二、🍉RabbitMQ相关的安装
三、🍎SpringBoot引入RabbitMQ
1.引入依赖
2.创建队列和交换器
2.1 变量声明
2.2 创建延迟交换器
2.3 创建延迟队列
2.4 延迟队列绑定延迟交换器
2.5 死信队列配置
3. 添加application.yml
4. 添加RabbitMQListener (消费者)
5. 创建DelayMessageSender
6. 创建Controller
7.测试
四、🍌死信队列的应用场景
一、🔥死信队列
RabbitMQ的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于接收其他队列中的“死信”消息。所谓“死信”,是指满足一定条件而无法被消费者正确处理的消息,这些条件包括消息被拒绝、消息过期、消息达到最大重试次数等。
当消息成为死信时,RabbitMQ会将其重新发送到指定的死信队列,而不是丢弃它们。这样做的好处是可以对死信进行分析和处理,例如记录日志、重新入队或者进一步处理。
死信队列通常与RabbitMQ的延迟队列(Delayed Message Queue)一起使用,通过延迟队列延迟消息的处理时间,可以更容易地触发消息成为死信的条件,从而进行测试和调试。
死信队列在消息中间件中有许多实际应用场景,主要用于处理无法被正常消费的消息,增强了消息的可靠性和处理能力。以下是一些常见的应用场景:
延迟消息处理:通过将消息发送到延迟队列,在指定的时间后再将消息发送到目标队列,实现延迟处理消息的功能。
消息重试:当消费者无法处理消息时,消息可以被重新发送到队列并设置重试次数,达到最大重试次数后转发到死信队列,以便进行进一步处理。
异常处理:当消息无法被消费者正常处理时(如格式错误、业务异常等),将消息转发到死信队列,用于记录日志、报警或人工处理。
消息超时处理:当消息在队列中等待时间过长时,可以设置消息的过期时间(TTL),超过时间后将消息转发到死信队列。
消息路由失败:当消息无法被正确路由到目标队列时,可以将消息发送到死信队列,避免消息丢失。
消息版本兼容性处理:当消息的格式或内容发生变化时,通过死信队列可以处理老版本消息,确保新版本系统的兼容性。
RabbitMQ的工作模式
死信队列的工作模式
今天我要实现的就是这个延迟队列和死信队列。生产者首先向延迟队列发送消息,待达到TTL后消息会被转送到死信队列当中,消费者会从死信队列中获取消息进行消费。
二、🍉RabbitMQ相关的安装
win10 安装rabbitMQ详细步骤_rabbitmq 安装-CSDN博客
我这里直接引用别人的文章了,下载需要大家去看一看。
RabbitMQ延迟插件的安装。
[超详细]RabbitMQ安装延迟消息插件_rabbitmq安装延迟插件-CSDN博客
三、🍎SpringBoot引入RabbitMQ
1.引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency>
2.创建队列和交换器
这一步是很重要的,如果你配置错误了,消息很可能无法正确的传送。要实现延迟队列和死信队列,我们一共要创建以下几个组件:
- 延迟队列
- 延迟队列的交换器
- 死信队列
- 死信队列的交换器
在我们创建了这几个组件之后,我们还要干一些事情,我们需要把这些组件进行组装,如果你不了解RabbitMQ的基础,你可以先看看基础教学,我这里简单的说一下。RabbitMQ中有一种绑定方式,这种绑定方式会把BindingKey和RoutingKey完全匹配的进行绑定,如下图所示,生产者发送了一个BindingKey为“warning”的消息,那么这个消息就会被发送到Queue1和Queue2,这并不难理解。
我们要做的就是把队列和交换器通过一个RoutingKey绑定在一起。
2.1 变量声明
接下来的代码要好好看了,首先我们把我们后边要用到的名称变量全部定义出来。因为这个名称起的很长,我们不方便直接使用。创建DeadRabbitConfig。在类中定义如下变量,延迟队列交换器名称、延迟队列名称、延迟队列Routing名称。除此之外还有死信队列交换器名称、死信队列名称和死信Routing名称。
// 延迟队列交换器名称public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";// 延迟队列A名称public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a";// 延迟队列B名称public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b";// 延迟队列routingA名称public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key";// 延迟队列routingB名称public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key";// 死信队列public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key";public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key";public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a";public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";
2.2 创建延迟交换器
// 注册延迟交换器delayExchange@Bean("delayExchange")public DirectExchange delayExchange(){return new DirectExchange(DELAY_EXCHANGE_NAME);}
2.3 创建延迟队列
这里的延迟队列需要我们额外的配置一些参数,用于和死信队列进行信息发送。这里我是用了两种不同的方式构建延迟队列A和延迟队列B,在延迟队列A种我没有设置TTL参数,而是通过RabbitMQ的延迟插件实现的,而延迟队列B我设置了TTL为10000ms,也就是十秒,十秒内消息如果没有被消费掉就会发送到死信队列。
// 注册延迟队列A 还要绑定死信交换器和死信routingA@Bean("delayQueueA")public Queue delayQueueA(){Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);//args.put("x-message-ttl",6000);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();}// 注册延迟队列B 还要绑定死信交换器和死信routingB@Bean("delayQueueB")public Queue delayQueueB(){Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY);args.put("x-message-ttl",10000);return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();}
2.4 延迟队列绑定延迟交换器
// 延迟队列A绑定交换器@Beanpublic Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);}// 延迟队列B绑定交换器@Beanpublic Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME);}
2.5 死信队列配置
与延迟队列不同的是,死信队列并没有配置延迟参数。
// 注册死信队列A@Bean("deadLetterQueueA")public Queue deadLetterQueueA(){return new Queue(DEAD_LETTER_QUEUE_A_NAME);}// 注册死信队列B@Bean("deadLetterQueueB")public Queue deadLetterQueueB(){return new Queue(DEAD_LETTER_QUEUE_B_NAME);}// 注册死信交换器@Beanpublic DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 死信队列A绑定死信交换器@Beanpublic Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);}// 死信队列B绑定死信交换器@Beanpublic Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);}到此为止,RabbitMQ的组件配置完成。
3. 添加application.yml
server:port: 8081 spring:application:name: test-rabbitmq-producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
4. 添加RabbitMQListener (消费者)
下方的代码一共有两个消费者,一个消费者获取死信队列A中的消息,另一个消费者获取死信队列B中的消息。
@Component public class DeadLetterQueueConsumer {public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);@RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL")public void receiveA(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());LOGGER.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);System.out.println(message.getMessageProperties().getDeliveryTag());channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL")public void receiveB(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());LOGGER.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }
5. 创建DelayMessageSender
这里采用的就是两种不同的方式,一种方式是使用插件来延迟消息的发送,另一种是通过TTL参数。
@Component public class DelayMessageSender {@ResourceRabbitTemplate rabbitTemplate;public void sendMessage(String msg,Integer delayTimes){switch (delayTimes){case 6:rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(String.valueOf(6000));return message;}});break;case 10:rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg);break;}} }
6. 创建Controller
@RestController @RequestMapping("/student") public class StudentController {@AutowiredDelayMessageSender messageSender;@RequestMapping("/send-message")public String sendMessage(String msg,Integer delayTimes){System.out.println(new Date());messageSender.sendMessage(msg,delayTimes);return "发送成功";} }
7.测试
在浏览器中输入以下地址进入RabbitMQ界面。账号密码都是guest。
http://localhost:15672/先来看看我们的初始队列。这里是什么都没有的。
然后我们启动项目后在看。我们刚才创建出来的四个队列全部都被加载了出来。
使用PostMan发送一次请求。
我们的请求在17s的时候发送到后端,消息打印在23s,说明我们的延迟队列有效果。
接下来我们测试10s的延迟队列。
10s后死信队列B成功的接收到了消息。
四、🍌死信队列的应用场景
延迟队列通常用于需要延迟执行某些任务或触发某些事件的场景。例如,在电子商务中,可以使用延迟队列实现订单超时未支付自动取消功能。
1.订单创建:
- 用户下单后,系统生成订单,并将订单信息发送到一个普通队列,同时设置一个TTL(Time-To-Live)为30分钟。
- 这个队列配置了死信交换机(Dead Letter Exchange, DLX),当消息过期后会被转发到死信队列。
2.等待支付:
- 在30分钟内,用户可以完成支付。如果用户在30分钟内支付完成,系统会从普通队列中移除对应的消息并正常处理订单。
3.订单超时处理:
- 如果用户未在30分钟内完成支付,消息会自动过期并转发到死信交换机,进而转发到死信队列。
4.取消订单:
- 系统有一个专门的消费者监听死信队列。当有消息进入死信队列时,消费者会自动处理这些消息,即取消订单、释放库存,并通知用户订单已取消。
5.定时任务(可选):
- 虽然死信队列已经提供了超时订单的处理,但为了防止消息丢失或处理延迟,可以设置一个定时任务定期检查订单状态,确保所有超时未支付的订单都得到了处理。
相关文章:
【SpringBoot】SpringBoot整合RabbitMQ消息中间件,实现延迟队列和死信队列
📝个人主页:哈__ 期待您的关注 目录 一、🔥死信队列 RabbitMQ的工作模式 死信队列的工作模式 二、🍉RabbitMQ相关的安装 三、🍎SpringBoot引入RabbitMQ 1.引入依赖 2.创建队列和交换器 2.1 变量声明 2.2 创建…...
kafka消息积压处理方案
背景: 某值班的一天,生产出现消息积压问题,对此类的问题做出快速应对方案来避免同类型问题,防止影响范围进一步的扩大。 出现消费积压后如何处理: 首先优先处理消息积压,如果代码逻辑问题,立…...
【vscode-快捷键 一键JSON格式化】
网上有很多JSON格式化工具,也有很多好用的在线json格式化工具。但是其实Vscode里面的可以直接格式化JSON,这里分享一个我常用的小插件 Prettify JSON 未格式化的JSON数据 召唤出命令行,输入prettify JSON 即可! ✿✿ヽ(▽)ノ✿...
什么是 Spring Boot 的起步依赖和自动配置?它们的作用是什么?
Spring Boot 的起步依赖和自动配置是 Spring Boot 框架的两个核心特性,它们的作用主要是简化了 Spring Boot 项目的搭建和配置过程。 起步依赖(Starter Dependencies):起步依赖是一种预先定义好的依赖关系集合,它包含…...
rk3568 norflash+pcei nvme 配置
文章目录 rk3568 norflashpcei nvme 配置1,添加parameter_nor.txt文件2 修改编译规则3 修改uboot4 修改BoardConfig.mk5 修改kernel pcei配置6 编译7 烧录 rk3568 norflashpcei nvme 配置 1,添加parameter_nor.txt文件 device/rockchip/rk356x/rk3568_…...
【Vue】面经基础版-首页请求渲染
步骤分析 1.安装axios 2.看接口文档,确认请求方式,请求地址,请求参数 3.created中发送请求,获取数据,存储到data中 4.页面动态渲染 代码实现 1.安装axios yarn add axios npm i axios 2.接口文档 请求地址: …...
OBS+nginx+nginx-http-flv-module实现阿里云的推流和拉流
背景:需要将球机视频推送到阿里云nginx,使用网页和移动端进行播放,以前视频格式为RTMP,但是在网页上面播放RTMP格式需要安装flash插件,chrome浏览器不给安装,调研后发现可以使用nginx的模块nginx-http-flv-…...
ch1计算机网络和因特网
*1.1 什么是因特网 因特网是一个世界范围的计算机网络,即一个互联了遍及全世界的数十亿计算设备的网络。 具体构成: 主机hosts或端系统end-systems:数以亿计的计算设备互连,例如 主机-PCs(计算机), workstations(工作站), servers(服务器)端系统-PDAs,phones(…...
Web前端安全测试:深入剖析与实战策略
Web前端安全测试:深入剖析与实战策略 在数字化时代,Web前端作为用户与互联网服务交互的直接窗口,其安全性至关重要。然而,随着技术的不断进步,前端安全面临的威胁也日益复杂和多样化。因此,进行Web前端安全…...
Java学习-JDBC(一)
JDBC 概念 JDBC(Java Database Connectivity)Java数据库连接JDBC提供了一组独立于任何数据库管理系统的APIJava提供接口规范,由各个数据库厂商提供接口的实现,厂商提供的实现类封装成jar文件,也就是我们俗称的数据库驱动jar包JDBC充分体现了…...
异步复位和同步释放
文章目录 前言一、为什么需要复位呢?二、同步复位1. 同步复位定义2. 同步复位的实现3. 同步复位的优点和缺点同步复位优点同步复位缺点 三、异步复位1. 异步复位定义2. 异步复位的实现3. 异步复位的优点和缺点异步复位优点异步复位缺点 四、异步复位同步释放1. reco…...
03-3.2.4 双端队列
👋 Hi, I’m Beast Cheng👀 I’m interested in photography, hiking, landscape…🌱 I’m currently learning python, javascript, kotlin…📫 How to reach me --> 458290771qq.com 喜欢《数据结构》部分笔记的小伙伴可以订…...
SpringBoot的Mapper文件什么时候需要使用@Param注解
解决:nested exception is org.apache.ibatis.binding.BindingException: Parameter ‘XXX‘ not found 关于加注解,其他博客说的很清楚!但是有的人会遇见明明使用的springboot2.x以上版本,仍然提示需要加注解!这是为…...
2024.6.8
2024.6.8 **每日一题** 3040.相同分数的最大操作数目 Ⅱ,通多题意可知,该题最多有三种操作分数,分别是前两个,最后两个,以及第一个和最后一个的和。从这里也可以看出一共有三种状态转移方式,所以我们可以利…...
室内外融合定位是如何做到成为定位领域的新宠
在信息化高速发展的今天,定位技术已成为人们生活和工作中不可或缺的一部分。随着物联网、智慧城市等领域的蓬勃发展,传统的单一定位方式已无法满足复杂多变的环境需求。在这样的背景下,室内外融合定位技术应运而生,以其独特的优势…...
【刷题篇】分治-归并排序
文章目录 1、排序数组2、交易逆序对的总数3、计算右侧小于当前元素的个数4、翻转对 1、排序数组 给你一个整数数组 nums,请你将该数组升序排列。 class Solution { public:vector<int> tmp;void mergeSort(vector<int>& nums,int left,int right){…...
【经验】Ubuntu上离线安装VsCode插件浏览Linux kernel源码
1、下载VsCode离线安装包 1.1 下载 下载地址:https://marketplace.visualstudio.com/vscode 本人安装的插件: C/C++ checkpatch Chinese clangd kconfig Makefile Tools Perl Perl Toolbox注意:C/C++插件要安装Linux 64版本 1.2 安装 将离线安装包拷贝到Ubuntu中,执…...
鼠标侧键映射虚拟桌面切换 —— Win11
鼠标侧键映射虚拟桌面切换 —— Win11 基于 AutoHotkey 实现功能 下载软件 AutoHotkey建议安装在默认路径下(C盘) 此软件非常小,几乎不占用资源软件安装在默认路径以外的位置可能导致部分功能不可用 新建一个 .ahk 文件使用记事本打开该 .a…...
2024全国大学生数据统计与分析竞赛B题【电信银行卡诈骗的数据分析】思路详解
电信诈骗是指通过电话、网络和短信方式,编造虚假信息,设置骗局,对受害人实施远程、非接触式诈骗,诱使受害人打款或转账的犯罪行为,通常以冒充他人及仿冒、伪造各种合法外衣和形式的方式达到欺骗的目的,如冒…...
鸿蒙emitter 订阅事件封装 EmitterUtils
适用于api11 和api12 废话不多说,直接上代码 import emitter from ohos.events.emitter; import { StringUtils } from ohos/flutter_ohos;export class EmitterUtils{/*** 发射字符串类型的* param eventId* param data*/public static sendEvent(eventId:stri…...
德国工业4.0工程师指南:从系统融合到职业发展
1. 项目概述:为什么德国是工业工程师的理想目的地?如果你是一名工业、自动化或机器人领域的工程师,正在寻找一个能将你的技术抱负与前沿产业实践深度结合的职业舞台,那么德国很可能就是你一直在寻找的答案。这不仅仅是因为德国拥有…...
AI小白必看:打好基础再冲大模型,收藏这份学习路线图!
本文针对想学习AI的学生,强调掌握基础的重要性,避免直接进入大模型学习。文章提出应先理解AI的核心是让机器从数据中学习规律,并掌握数学、编程和数据思维能力。建议从数据处理开始,熟悉Python及常用库,逐步学习机器学…...
3PEAK思瑞浦 TP2262-SR SOP8 运算放大器
特性 供电电压:3V至36V 低供电电流:每通道700uA 轨到轨输出 带宽:4MHz 斜率:15V/us 优异的EMI抑制性能 偏移电压:最大3毫伏 偏移电压温度漂移:2V/C 低噪声:1kHz时30nV/vHz 工作温度范围:-40C至125C...
MODLR Studio光标操作插件开发:提升数据建模效率的交互优化实践
1. 项目概述与核心价值 最近在数据建模和可视化领域,一个名为 MODLR-Studio/modlr_cursor_ops 的项目引起了我的注意。乍一看这个标题,可能有些朋友会感到困惑:“MODLR”是什么?“Cursor Ops”又是指什么操作?这其实…...
文献阅读 260511-Wildfire damages and the cost-effective role of forest fuel treatments
Wildfire damages and the cost-effective role of forest fuel treatments 来自 <https://www.science.org/doi/10.1126/science.aea6463> ## Abstract: Gave the core question: Wildfires are among the most pressing environmental challenges of the 21st century,…...
瑞芯微刷机工具(RKDevTool)/瑞芯微刷机驱动(DriverAssitant)_多个版本下载及教程分享
瑞芯微刷机工具(RKDevTool)/瑞芯微刷机驱动(DriverAssitant)_多个版本下载及教程分享 适合(处理器是RK字母开头的芯片),比如RK3128、RK3188、RK3229、RK3288、RK3368、RK3328、RK3399、RK3528、RK3568、RK3566、RK3588等等瑞芯微芯…...
科研人狂喜!AI生成的位图可以转矢量图了
今天给大家分享我最近挖到的宝藏科研工具:MedPeer「图片创作」——国内领先的垂直领域AI科研绘图工具,刚好解决我们科研人最头疼的几个痛点。尤其是它的人工绘图转换服务,简直是帮我解决了大麻烦,必须给大家捋捋明白。我们科研人绘…...
ncmdump终极指南:快速解密网易云音乐NCM格式文件
ncmdump终极指南:快速解密网易云音乐NCM格式文件 【免费下载链接】ncmdump 项目地址: https://gitcode.com/gh_mirrors/ncmd/ncmdump 你是否曾经从网易云音乐下载了喜爱的歌曲,却发现它们只能在特定客户端播放?这就是NCM格式加密带来…...
MySQL 如何正确实现“随机采样”
在开发英语学习或社交应用时,随机展示单词或消息是一个高频需求。然而,看似简单的“随机”逻辑,如果实现方式不当,会随着数据量的增长演变为系统瓶颈 。 1. 性能陷阱:order by rand() 最直观的写法是 select word from…...
YOLOv5 COCO数据集 实战训练全流程解析 | 【从零到一】
1. 环境准备:从零搭建YOLOv5训练环境 第一次接触YOLOv5时,我最头疼的就是环境配置。记得当时为了一个CUDA版本问题折腾了整整两天,现在回想起来其实只要按步骤来就能避免90%的坑。下面是我总结的最稳环境搭建方案: 首先确保你的机…...








