SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)
SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)
- (一)死信队列
- 使用场景
- 具体用法
- 前提
- 示例:
- (二)延迟队列
- 使用场景
- 方法一:通过死亡队列实现
- 方法二:通过延迟消息插件(rabbitmq_delayed_message_exchange)实现
(一)死信队列
死信队列是一个重要的概念,用于处理那些因各种原因无法被正常消费的消息。
它不是RabbitMQ直接提供的一个现成的方法或工具,而是通过特定的配置和机制来实现的。
使用场景
死信队列在多种场景下都非常有用,包括但不限于:
- 消息重试机制:当消息处理失败时,可以将其发送到死信队列进行重试。
- 异常消息处理:对于无法被正常处理的异常消息,可以将其存储在死信队列中,以便后续分析处理。
- 延迟消息处理:通过结合消息的TTL(Time-To-Live,生存时间)和死信队列,可以实现消息的延迟处理。
- 确保消息不丢失:在消息处理过程中,如果发生消费者崩溃或网络故障等情况,消息可能会丢失。通过死信队列,可以确保这些消息得到保留,并在系统恢复后重新处理。
具体用法
要在RabbitMQ中设置和使用死信队列,通常需要按照以下步骤进行:
- 定义死信交换机(DLX):首先,需要定义一个交换机作为死信交换机,它可以是任何类型的交换机(如direct、fanout、topic等)。
- 配置原队列:在声明原队列时,需要指定两个参数:x-dead-letter-exchange和x-dead-letter-routing-key。前者指定了当消息变成死信时应该发送到的交换机(即死信交换机),后者指定了发送到该交换机的路由键。
- 声明死信队列:接着,需要声明一个或多个死信队列,并将它们绑定到死信交换机上。这样,当死信消息被发送到死信交换机时,就可以根据路由键将其路由到相应的死信队列中。
- 处理死信消息:最后,需要编写消费者代码来监听死信队列中的消息,并对这些消息进行相应的处理。
前提
要想进入死信队列,得出现异常,出现异常后,会根据你的配置帮你放到死信队列中 所以异常不要被捕获。
如果实在要捕获的话,就得你在消费者这边去做“发送消息的”操作,自己把发送过来消息塞到死信队列中
示例:
消费者 mq的yml配置(重试机制)
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:# 重试机制retry:enabled: true #是否开启消费者重试
配置类:
package com.example.reactboot.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DirectExchangeConfig {//===========================普通===========================//定义队列的名称常量public static final String DIRECT_QUEUE = "directQueue";public static final String DIRECT_QUEUE2 = "directQueue2";//定义直接交换机的名称常量public static final String DIRECT_EXCHANGE = "directExchange";//定义路由键常量,用于交换机和队列之间的绑定public static final String DIRECT_ROUTING_KEY = "direct";//定义路由键常量,用于交换机和队列之间的绑定public static final String DIRECT_ROUTING_KEY_2= "direct2";//定义队列,名称为DIRECT_QUEUE//为普通队列 绑设置 死信参数@Beanpublic Queue directQueue() {//return new Queue(DIRECT_QUEUE, true);Map<String, Object> args = new HashMap<>();// 设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE);// 设置发送到死信交换机的路由键args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);// 创建队列,设置为持久化、非排他、非自动删除,并附带死信参数return new Queue(DIRECT_QUEUE, true, false, false, args);}//定义直接交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}//定义队列,名称为DIRECT_QUEUE2@Beanpublic Queue directQueue2() {return new Queue(DIRECT_QUEUE2, true);}//定义一个绑定,将directQueue队列绑定到directExchange交换机上,//使用direct作为路由键@Beanpublic Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);}// 定义一个绑定Bean,将directQueue2队列也绑定到directExchange交换机上,@Beanpublic Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY_2);}//===========================死信===========================// 定义死信交换机的名称public static final String DLX_EXCHANGE = "dlx_exchange";// 定义发送到死信交换机的路由键public static final String DLX_ROUTING_KEY = "dlx.routing.key";// 定义死信队列的名称public static final String DLX_QUEUE = "dlx_queue";/*** 声明死信交换机,这里使用Direct类型。* @return 返回一个配置好的DirectExchange对象。*/@BeanDirectExchange dlxExchange() {// 创建并返回Direct类型的交换机return new DirectExchange(DLX_EXCHANGE,true, false);}/*** 声明死信队列。* @return 返回一个配置好的Queue对象,用作死信队列。*/@BeanQueue dlxQueue() {// 创建并返回死信队列,设置为持久化return new Queue(DLX_QUEUE, true);}/*** 绑定死信队列到死信交换机,使用指定的路由键。*/@BeanBinding binding(Queue dlxQueue,DirectExchange dlxExchange) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_ROUTING_KEY);}}
生产者发送消息:
package com.example.reactboot.controller;import com.example.reactboot.config.DirectExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RestController
public class RabbitMqTest {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/sendMQ")public String sendMessage() {rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE,DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct");return "direct消息发送成功!!";}}
消费者消费消息:
package com.example.reactboot.queueListener;import com.example.reactboot.config.DirectExchangeConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;/*** @className: DirectQueueListener* @description: 直连交换机的监听器* @author: sh.Liu* @date: 2021-08-23 16:03*/
@Slf4j
@Component
public class DirectQueueListener {//监听普通队列@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)public void process(String xx){SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("DirectReceiver消费者收到消息1 : " + xx + " 接收时间:" + sdf.format(new Date()) + "\n");//先执行业务代码int i = 1 / 0;}//监听死信队列@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DLX_QUEUE)public void process3(String testMessage) {System.out.println("死信得列里面的 : " + testMessage + "\n");}}
(二)延迟队列
延迟队列是一种特殊的消息队列,其内部消息是有序的,并且具有延时属性。
在RabbitMQ中,虽然AMQP协议本身没有直接支持延迟队列,但可以通过一些变通的方法(如使用死信队列配合消息的TTL属性,或者使用RabbitMQ的延迟消息插件)来实现延迟队列的功能。
使用场景
延迟队列在多种业务场景中都有广泛的应用,包括但不限于:
- 订单超时未支付自动取消:用户下单后,如果在规定时间内未完成支付,系统可以自动取消订单。
- 退款超时通知:用户申请退款后,如果长时间未得到处理,系统可以自动通知相关运营人员介入。
- 新用户注册后的引导邮件:用户注册账号后,系统可以在一段时间后发送欢迎邮件或引导邮件。
- 会议提醒:在预定的会议开始前一段时间,系统自动发送提醒给参会人员。
- 任务调度:在指定时间后执行某项任务,如定时清理日志、执行批处理任务等。
方法一:通过死亡队列实现
以下是使用死信队列配合TTL属性实现延迟队列的基本步骤:
- 定义死信交换机(DLX, Dead-Letter Exchange)和死信队列(DLQ, Dead-Letter Queue)
- 设置普通队列的TTL和死信交换机:在创建普通队列时,可以为其设置TTL属性,指定消息在该队列中的最大存活时间。同时,需要将该队列的死信交换机设置为前面定义的DLX,以便消息在过期后能够被发送到DLQ。
- 生产者发送消息:生产者将消息发送到普通队列,并指定消息的TTL。消息在队列中等待,直到TTL过期。
- 消息过期并发送到死信队列:当消息的TTL过期后,RabbitMQ会自动将该消息发送到其配置的死信交换机,再由死信交换机根据路由键将其发送到DLQ。
- 消费者从死信队列消费消息:消费者监听DLQ,当有新消息到达时,进行消费处理。
就是:把普通队列的消息设置存活时间,目前有两者方式:
1.在队列上面设置消息的过期时间
2.直接在消息上面设置过期时间。
方式一(队列上面设置消息过期时间):
上面的关于 死信示例 完全可以复用进行测试
在以下的方法里面多加一行 args.put(“x-message-ttl”, 10000);
//定义队列,名称为DIRECT_QUEUE//为普通队列 绑设置 死信参数@Beanpublic Queue directQueue() {//return new Queue(DIRECT_QUEUE, true);Map<String, Object> args = new HashMap<>();// 设置消息TTL为10秒args.put("x-message-ttl", 10000);// 设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE);// 设置发送到死信交换机的路由键args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);// 创建队列,设置为持久化、非排他、非自动删除,并附带死信参数return new Queue(DIRECT_QUEUE, true, false, false, args);}
你可以把 DirectQueueListener 里面的 process 方法注释掉(以免被消费掉)。
再执行生产者的 sendMessage 方法。
这个时候你就可以看到下面关于 监听死信队列 的方法 ,等10秒后就会打印你发的消息了
方式二(消息上面设置过期时间):
上面的关于 死信示例 完全可以复用进行测试
改一下 这个 生产者发送消息:
package com.example.reactboot.controller;import com.example.reactboot.config.DirectExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RestController
public class RabbitMqTest {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/sendMQ")public String sendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct! "+sdf.format(new Date()), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间,超过5秒消息就会消失message.getMessageProperties().setExpiration("5000");//设置编码格式message.getMessageProperties().setContentEncoding("UTF-8");return message;}}); return "direct消息发送成功!!";}}
你可以把 DirectQueueListener 里面的 process 方法注释掉(以免被消费掉)。
再执行生产者的 sendMessage 方法。
这个时候你就可以看到下面关于 监听死信队列 的方法 ,等5秒后就会打印你发的消息了
到这里其实就结束了,剩下的就是监听到死信队列里面的消息后的业务操作了
方法二:通过延迟消息插件(rabbitmq_delayed_message_exchange)实现
后续在说
相关文章:
SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)
SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列) (一)死信队列使用场景具体用法前提示例: (二)延迟队列使用场景方法一:通过死亡队列实现方法二&…...
Dubbo依赖包
Dubbo 是一个高性能的 RPC 框架,用于构建分布式服务治理系统。要使用 Dubbo,项目中需要引入一些关键的依赖包。这些依赖包提供了 Dubbo 的核心功能、服务注册与发现、网络通信、序列化等能力。 一、Dubbo 核心依赖包 Dubbo 的核心依赖包包含了实现 RPC…...
webGIS后端程序员学习路线
webGIS后端程序员学习路线 1. GIS 基础知识 学习要点: 学习资源: 2. 后端编程基础 学习要点: 学习资源: 3. 地理数据库(Spatial Database) 学习要点: 学习资源: 4. 空间数…...

OpenCV绘图函数(15)图像上绘制矩形函数 rectangle()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 绘制一个简单的、粗的或填充的直立矩形。 这个函数 cv::rectangle 绘制一个矩形轮廓或一个填充的矩形,其两个相对的顶点分别是 pt1 和…...
从零开始,认识游戏设计师(4)体验源于设计师②
认真并仔细地揣摩你的想法 了解自己的感受并不是一件简单的事情,作为设计师,我觉得比了解玩家总体感觉的技能更重要的是你能清楚知道描述自己感受。 试想一下,你是否能准确描述你喜欢什么,你讨厌什么,以及为什么这样…...
周末总结(2024/09/07)
工作 人际关系核心实践: 要学会随时回应别人的善意,执行时间控制在5分钟以内 坚持每天早会打招呼 遇到接不住的话题时拉低自己,抬高别人(无阴阳气息) 朋友圈点赞控制在5min以内,职场社交不要放在5min以外 职场的人际关系在面对利…...

MySQL数据库的SQL注入漏洞解析
说明:本文仅是用于学习分析自己搭建的SQL漏洞内容和原理,请勿用在非法途径上,违者后果自负,与笔者无关;本文开始前请认真详细学习《中华人民共和国网络安全法》及其相关法规内容【学法时习之丨网络安全在身边一图了解网络安全法_中央网络安全和信息化委员会办公室】 …...

Redis进阶(七):分布式锁
在分布式系统下,涉及到多个节点访问同一个公共资源的情况,此时需要通过 锁 进行互斥控制:避免出现 线程安全问题。 1.分布式锁的基本实现 超卖问题: 解决: 采用redis实现分布式锁 可用采取:在购票的时候࿰…...
Python 中考虑 concurrent.futures 实现真正的并行计算
Python 中考虑 concurrent.futures 实现真正的并行计算 思考,如何将代码所要执行的计算任务划分成多个独立的部分并在各自的核心上面平行地运行。 Python 的全局解释器锁(global interpreter lock,GIL)导致没办法用线程来实现真…...

【C++多线程编程】 线程安全与对象生命周期管理
目录 类的线程安全 实现线程安全 构造函数在多线程中的安全性 析构函数多线程环境的安全 智能指针实现多线程安全 shared_ptr 非完全线程安全 shared_ptr可能导致对象生命周期延长 const引用可以减少传递shared_ptr开销 shared_ptr 智能指针块模块的优点 析构所在线程…...
【系统架构设计师-2024年-上半年】综合知识-答案及详解
更多内容请见: 备考系统架构设计师-核心总结索引 文章目录 【第1题】【第2题】【第3题】【第4题】【第5题】【第6题】【第7题】【第8题】【第9题】【第10题】【第11题】【第12题】【第13题】【第14题】【第15题】【第16~17题】【第18~19题】【第20~21题】【第22题】【第23题】…...
MATLAB 中的对数计算
在 MATLAB 中,计算对数是进行数学分析和科学计算的常见需求。对数运算在数据分析、信号处理和控制系统中都有广泛应用。本篇博客将详细介绍如何在 MATLAB 中进行对数计算,包括自然对数、常用对数以及任意底数的对数。 1. 自然对数(以 e 为底…...
详解 HTTPS 与 TLS证书链校验
一文详解 HTTPS 与 TLS证书链校验_证书链怎么验证-CSDN博客 深入浅出 SSL/CA 证书及其相关证书文件(pem、crt、cer、key、csr) https://zhuanlan.zhihu.com/p/702745054...

新手做短视频素材在哪里找?做短视频素材工具教程网站有哪些?
本文将为你提供一系列新手友好的视频制作资源,包括素材网站和编辑工具,帮助你快速成为短视频领域的新星。让我们从国内知名的蛙学网开始介绍。 蛙学网:新手的视频素材天堂 对于短视频新手而言,蛙学网绝对是一个宝库。该网站提供了…...
【html】编辑器、基础、属性、标题、段落、格式化、 连接、头部、CSS、图像
目录 2.HTML编辑器 3.HTML基础 3.1 HTML标题 3.2 段落 4.HTML元素 4.1 元素语法 4.2 嵌套元素 4.3 HTML空元素 4.4 HTML提示,使用小写标签 5.HTML属性 5.1 属性实例 5.2 HTML 属性常用引用属性值 5.3 使用小写属性 5.4 HTML属性参考手册 6.HTML标题 6.1 HTML水…...
算法【洪水填充】
洪水填充是一种很简单的技巧,设置路径信息进行剪枝和统计,类似感染的过程。路径信息不撤销,来保证每一片的感染过程可以得到区分。看似是暴力递归过程,其实时间复杂度非常好,遍历次数和样本数量的规模一致。 下面通过…...

PostgreSQL的repmgr工具介绍
PostgreSQL的repmgr工具介绍 repmgr(Replication Manager)是一个专为 PostgreSQL 设计的开源工具,用于管理和监控 PostgreSQL 的流复制及实现高可用性。它提供了一组工具和实用程序,简化了 PostgreSQL 复制集群的配置、维护和故障…...

面试官:synchronized的锁升级过程是怎样的?
大家好,我是大明哥,一个专注「死磕 Java」系列创作的硬核程序员。 回答 在 JDK 1.6之前,synchronized 是一个重量级、效率比较低下的锁,但是在JDK 1.6后,JVM 为了提高锁的获取与释放效,,对 synchronized 进…...
Linux中的时间
1、date命令 参数作用参数作用参数作用%Y年xxxx%m月xx%d日xx%H小时(00~23)%M分钟(00~59)%S秒(00~59)%I小时(00~12)%t跳格[Tab键]%j今…...

用Boot写mybatis的增删改查
一、总览 项目结构: 图一 1、JavaBean文件 2、数据库操作 3、Java测试 4、SpringBoot启动类 5、SpringBoot数据库配置 二、配置数据库 在项目资源包中新建名为application.yml的文件,如图一。 建好文件我们就要开始写…...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...

VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...

2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

如何应对敏捷转型中的团队阻力
应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中,明确沟通敏捷转型目的尤为关键,团队成员只有清晰理解转型背后的原因和利益,才能降低对变化的…...
离线语音识别方案分析
随着人工智能技术的不断发展,语音识别技术也得到了广泛的应用,从智能家居到车载系统,语音识别正在改变我们与设备的交互方式。尤其是离线语音识别,由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力,广…...