SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)
SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)
- (一)死信队列
- 使用场景
- 具体用法
- 前提
- 示例:
- (二)延迟队列
- 使用场景
- 方法一:通过死亡队列实现
- 方法二:通过延迟消息插件(rabbitmq_delayed_message_exchange)实现
(一)死信队列
死信队列是一个重要的概念,用于处理那些因各种原因无法被正常消费的消息。
它不是RabbitMQ直接提供的一个现成的方法或工具,而是通过特定的配置和机制来实现的。
使用场景
死信队列在多种场景下都非常有用,包括但不限于:
- 消息重试机制:当消息处理失败时,可以将其发送到死信队列进行重试。
- 异常消息处理:对于无法被正常处理的异常消息,可以将其存储在死信队列中,以便后续分析处理。
- 延迟消息处理:通过结合消息的TTL(Time-To-Live,生存时间)和死信队列,可以实现消息的延迟处理。
- 确保消息不丢失:在消息处理过程中,如果发生消费者崩溃或网络故障等情况,消息可能会丢失。通过死信队列,可以确保这些消息得到保留,并在系统恢复后重新处理。
具体用法
要在RabbitMQ中设置和使用死信队列,通常需要按照以下步骤进行:
- 定义死信交换机(DLX):首先,需要定义一个交换机作为死信交换机,它可以是任何类型的交换机(如direct、fanout、topic等)。
- 配置原队列:在声明原队列时,需要指定两个参数:x-dead-letter-exchange和x-dead-letter-routing-key。前者指定了当消息变成死信时应该发送到的交换机(即死信交换机),后者指定了发送到该交换机的路由键。
- 声明死信队列:接着,需要声明一个或多个死信队列,并将它们绑定到死信交换机上。这样,当死信消息被发送到死信交换机时,就可以根据路由键将其路由到相应的死信队列中。
- 处理死信消息:最后,需要编写消费者代码来监听死信队列中的消息,并对这些消息进行相应的处理。
前提
要想进入死信队列,得出现异常,出现异常后,会根据你的配置帮你放到死信队列中 所以异常不要被捕获。
如果实在要捕获的话,就得你在消费者这边去做“发送消息的”操作,自己把发送过来消息塞到死信队列中
示例:
消费者 mq的yml配置(重试机制)
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:# 重试机制retry:enabled: true #是否开启消费者重试
配置类:
package com.example.reactboot.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DirectExchangeConfig {//===========================普通===========================//定义队列的名称常量public static final String DIRECT_QUEUE = "directQueue";public static final String DIRECT_QUEUE2 = "directQueue2";//定义直接交换机的名称常量public static final String DIRECT_EXCHANGE = "directExchange";//定义路由键常量,用于交换机和队列之间的绑定public static final String DIRECT_ROUTING_KEY = "direct";//定义路由键常量,用于交换机和队列之间的绑定public static final String DIRECT_ROUTING_KEY_2= "direct2";//定义队列,名称为DIRECT_QUEUE//为普通队列 绑设置 死信参数@Beanpublic Queue directQueue() {//return new Queue(DIRECT_QUEUE, true);Map<String, Object> args = new HashMap<>();// 设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE);// 设置发送到死信交换机的路由键args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);// 创建队列,设置为持久化、非排他、非自动删除,并附带死信参数return new Queue(DIRECT_QUEUE, true, false, false, args);}//定义直接交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}//定义队列,名称为DIRECT_QUEUE2@Beanpublic Queue directQueue2() {return new Queue(DIRECT_QUEUE2, true);}//定义一个绑定,将directQueue队列绑定到directExchange交换机上,//使用direct作为路由键@Beanpublic Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);}// 定义一个绑定Bean,将directQueue2队列也绑定到directExchange交换机上,@Beanpublic Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY_2);}//===========================死信===========================// 定义死信交换机的名称public static final String DLX_EXCHANGE = "dlx_exchange";// 定义发送到死信交换机的路由键public static final String DLX_ROUTING_KEY = "dlx.routing.key";// 定义死信队列的名称public static final String DLX_QUEUE = "dlx_queue";/*** 声明死信交换机,这里使用Direct类型。* @return 返回一个配置好的DirectExchange对象。*/@BeanDirectExchange dlxExchange() {// 创建并返回Direct类型的交换机return new DirectExchange(DLX_EXCHANGE,true, false);}/*** 声明死信队列。* @return 返回一个配置好的Queue对象,用作死信队列。*/@BeanQueue dlxQueue() {// 创建并返回死信队列,设置为持久化return new Queue(DLX_QUEUE, true);}/*** 绑定死信队列到死信交换机,使用指定的路由键。*/@BeanBinding binding(Queue dlxQueue,DirectExchange dlxExchange) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_ROUTING_KEY);}}
生产者发送消息:
package com.example.reactboot.controller;import com.example.reactboot.config.DirectExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RestController
public class RabbitMqTest {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/sendMQ")public String sendMessage() {rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE,DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct");return "direct消息发送成功!!";}}
消费者消费消息:
package com.example.reactboot.queueListener;import com.example.reactboot.config.DirectExchangeConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;/*** @className: DirectQueueListener* @description: 直连交换机的监听器* @author: sh.Liu* @date: 2021-08-23 16:03*/
@Slf4j
@Component
public class DirectQueueListener {//监听普通队列@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)public void process(String xx){SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("DirectReceiver消费者收到消息1 : " + xx + " 接收时间:" + sdf.format(new Date()) + "\n");//先执行业务代码int i = 1 / 0;}//监听死信队列@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DLX_QUEUE)public void process3(String testMessage) {System.out.println("死信得列里面的 : " + testMessage + "\n");}}
(二)延迟队列
延迟队列是一种特殊的消息队列,其内部消息是有序的,并且具有延时属性。
在RabbitMQ中,虽然AMQP协议本身没有直接支持延迟队列,但可以通过一些变通的方法(如使用死信队列配合消息的TTL属性,或者使用RabbitMQ的延迟消息插件)来实现延迟队列的功能。
使用场景
延迟队列在多种业务场景中都有广泛的应用,包括但不限于:
- 订单超时未支付自动取消:用户下单后,如果在规定时间内未完成支付,系统可以自动取消订单。
- 退款超时通知:用户申请退款后,如果长时间未得到处理,系统可以自动通知相关运营人员介入。
- 新用户注册后的引导邮件:用户注册账号后,系统可以在一段时间后发送欢迎邮件或引导邮件。
- 会议提醒:在预定的会议开始前一段时间,系统自动发送提醒给参会人员。
- 任务调度:在指定时间后执行某项任务,如定时清理日志、执行批处理任务等。
方法一:通过死亡队列实现
以下是使用死信队列配合TTL属性实现延迟队列的基本步骤:
- 定义死信交换机(DLX, Dead-Letter Exchange)和死信队列(DLQ, Dead-Letter Queue)
- 设置普通队列的TTL和死信交换机:在创建普通队列时,可以为其设置TTL属性,指定消息在该队列中的最大存活时间。同时,需要将该队列的死信交换机设置为前面定义的DLX,以便消息在过期后能够被发送到DLQ。
- 生产者发送消息:生产者将消息发送到普通队列,并指定消息的TTL。消息在队列中等待,直到TTL过期。
- 消息过期并发送到死信队列:当消息的TTL过期后,RabbitMQ会自动将该消息发送到其配置的死信交换机,再由死信交换机根据路由键将其发送到DLQ。
- 消费者从死信队列消费消息:消费者监听DLQ,当有新消息到达时,进行消费处理。
就是:把普通队列的消息设置存活时间,目前有两者方式:
1.在队列上面设置消息的过期时间
2.直接在消息上面设置过期时间。
方式一(队列上面设置消息过期时间):
上面的关于 死信示例 完全可以复用进行测试
在以下的方法里面多加一行 args.put(“x-message-ttl”, 10000);
//定义队列,名称为DIRECT_QUEUE//为普通队列 绑设置 死信参数@Beanpublic Queue directQueue() {//return new Queue(DIRECT_QUEUE, true);Map<String, Object> args = new HashMap<>();// 设置消息TTL为10秒args.put("x-message-ttl", 10000);// 设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE);// 设置发送到死信交换机的路由键args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);// 创建队列,设置为持久化、非排他、非自动删除,并附带死信参数return new Queue(DIRECT_QUEUE, true, false, false, args);}
你可以把 DirectQueueListener 里面的 process 方法注释掉(以免被消费掉)。
再执行生产者的 sendMessage 方法。
这个时候你就可以看到下面关于 监听死信队列 的方法 ,等10秒后就会打印你发的消息了
方式二(消息上面设置过期时间):
上面的关于 死信示例 完全可以复用进行测试
改一下 这个 生产者发送消息:
package com.example.reactboot.controller;import com.example.reactboot.config.DirectExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RestController
public class RabbitMqTest {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/sendMQ")public String sendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct! "+sdf.format(new Date()), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间,超过5秒消息就会消失message.getMessageProperties().setExpiration("5000");//设置编码格式message.getMessageProperties().setContentEncoding("UTF-8");return message;}}); return "direct消息发送成功!!";}}
你可以把 DirectQueueListener 里面的 process 方法注释掉(以免被消费掉)。
再执行生产者的 sendMessage 方法。
这个时候你就可以看到下面关于 监听死信队列 的方法 ,等5秒后就会打印你发的消息了
到这里其实就结束了,剩下的就是监听到死信队列里面的消息后的业务操作了
方法二:通过延迟消息插件(rabbitmq_delayed_message_exchange)实现
后续在说
相关文章:

SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)
SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列) (一)死信队列使用场景具体用法前提示例: (二)延迟队列使用场景方法一:通过死亡队列实现方法二&…...

Dubbo依赖包
Dubbo 是一个高性能的 RPC 框架,用于构建分布式服务治理系统。要使用 Dubbo,项目中需要引入一些关键的依赖包。这些依赖包提供了 Dubbo 的核心功能、服务注册与发现、网络通信、序列化等能力。 一、Dubbo 核心依赖包 Dubbo 的核心依赖包包含了实现 RPC…...

webGIS后端程序员学习路线
webGIS后端程序员学习路线 1. GIS 基础知识 学习要点: 学习资源: 2. 后端编程基础 学习要点: 学习资源: 3. 地理数据库(Spatial Database) 学习要点: 学习资源: 4. 空间数…...

OpenCV绘图函数(15)图像上绘制矩形函数 rectangle()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 绘制一个简单的、粗的或填充的直立矩形。 这个函数 cv::rectangle 绘制一个矩形轮廓或一个填充的矩形,其两个相对的顶点分别是 pt1 和…...

从零开始,认识游戏设计师(4)体验源于设计师②
认真并仔细地揣摩你的想法 了解自己的感受并不是一件简单的事情,作为设计师,我觉得比了解玩家总体感觉的技能更重要的是你能清楚知道描述自己感受。 试想一下,你是否能准确描述你喜欢什么,你讨厌什么,以及为什么这样…...

周末总结(2024/09/07)
工作 人际关系核心实践: 要学会随时回应别人的善意,执行时间控制在5分钟以内 坚持每天早会打招呼 遇到接不住的话题时拉低自己,抬高别人(无阴阳气息) 朋友圈点赞控制在5min以内,职场社交不要放在5min以外 职场的人际关系在面对利…...

MySQL数据库的SQL注入漏洞解析
说明:本文仅是用于学习分析自己搭建的SQL漏洞内容和原理,请勿用在非法途径上,违者后果自负,与笔者无关;本文开始前请认真详细学习《中华人民共和国网络安全法》及其相关法规内容【学法时习之丨网络安全在身边一图了解网络安全法_中央网络安全和信息化委员会办公室】 …...

Redis进阶(七):分布式锁
在分布式系统下,涉及到多个节点访问同一个公共资源的情况,此时需要通过 锁 进行互斥控制:避免出现 线程安全问题。 1.分布式锁的基本实现 超卖问题: 解决: 采用redis实现分布式锁 可用采取:在购票的时候࿰…...

Python 中考虑 concurrent.futures 实现真正的并行计算
Python 中考虑 concurrent.futures 实现真正的并行计算 思考,如何将代码所要执行的计算任务划分成多个独立的部分并在各自的核心上面平行地运行。 Python 的全局解释器锁(global interpreter lock,GIL)导致没办法用线程来实现真…...

【C++多线程编程】 线程安全与对象生命周期管理
目录 类的线程安全 实现线程安全 构造函数在多线程中的安全性 析构函数多线程环境的安全 智能指针实现多线程安全 shared_ptr 非完全线程安全 shared_ptr可能导致对象生命周期延长 const引用可以减少传递shared_ptr开销 shared_ptr 智能指针块模块的优点 析构所在线程…...

【系统架构设计师-2024年-上半年】综合知识-答案及详解
更多内容请见: 备考系统架构设计师-核心总结索引 文章目录 【第1题】【第2题】【第3题】【第4题】【第5题】【第6题】【第7题】【第8题】【第9题】【第10题】【第11题】【第12题】【第13题】【第14题】【第15题】【第16~17题】【第18~19题】【第20~21题】【第22题】【第23题】…...

MATLAB 中的对数计算
在 MATLAB 中,计算对数是进行数学分析和科学计算的常见需求。对数运算在数据分析、信号处理和控制系统中都有广泛应用。本篇博客将详细介绍如何在 MATLAB 中进行对数计算,包括自然对数、常用对数以及任意底数的对数。 1. 自然对数(以 e 为底…...

详解 HTTPS 与 TLS证书链校验
一文详解 HTTPS 与 TLS证书链校验_证书链怎么验证-CSDN博客 深入浅出 SSL/CA 证书及其相关证书文件(pem、crt、cer、key、csr) https://zhuanlan.zhihu.com/p/702745054...

新手做短视频素材在哪里找?做短视频素材工具教程网站有哪些?
本文将为你提供一系列新手友好的视频制作资源,包括素材网站和编辑工具,帮助你快速成为短视频领域的新星。让我们从国内知名的蛙学网开始介绍。 蛙学网:新手的视频素材天堂 对于短视频新手而言,蛙学网绝对是一个宝库。该网站提供了…...

【html】编辑器、基础、属性、标题、段落、格式化、 连接、头部、CSS、图像
目录 2.HTML编辑器 3.HTML基础 3.1 HTML标题 3.2 段落 4.HTML元素 4.1 元素语法 4.2 嵌套元素 4.3 HTML空元素 4.4 HTML提示,使用小写标签 5.HTML属性 5.1 属性实例 5.2 HTML 属性常用引用属性值 5.3 使用小写属性 5.4 HTML属性参考手册 6.HTML标题 6.1 HTML水…...

算法【洪水填充】
洪水填充是一种很简单的技巧,设置路径信息进行剪枝和统计,类似感染的过程。路径信息不撤销,来保证每一片的感染过程可以得到区分。看似是暴力递归过程,其实时间复杂度非常好,遍历次数和样本数量的规模一致。 下面通过…...

PostgreSQL的repmgr工具介绍
PostgreSQL的repmgr工具介绍 repmgr(Replication Manager)是一个专为 PostgreSQL 设计的开源工具,用于管理和监控 PostgreSQL 的流复制及实现高可用性。它提供了一组工具和实用程序,简化了 PostgreSQL 复制集群的配置、维护和故障…...

面试官:synchronized的锁升级过程是怎样的?
大家好,我是大明哥,一个专注「死磕 Java」系列创作的硬核程序员。 回答 在 JDK 1.6之前,synchronized 是一个重量级、效率比较低下的锁,但是在JDK 1.6后,JVM 为了提高锁的获取与释放效,,对 synchronized 进…...

Linux中的时间
1、date命令 参数作用参数作用参数作用%Y年xxxx%m月xx%d日xx%H小时(00~23)%M分钟(00~59)%S秒(00~59)%I小时(00~12)%t跳格[Tab键]%j今…...

用Boot写mybatis的增删改查
一、总览 项目结构: 图一 1、JavaBean文件 2、数据库操作 3、Java测试 4、SpringBoot启动类 5、SpringBoot数据库配置 二、配置数据库 在项目资源包中新建名为application.yml的文件,如图一。 建好文件我们就要开始写…...

电脑主机内存
在计算机的组成结构当中内存是非常重要的一部分,它用来存储程序和数据。对于计算机来说有了内存才能保证计算机的正常工作。 内部存储器就是我们所说的内存条,一般是用来即时存储数据。不做数据的长期保留。 外部存储器就是我们常说的固态或者硬盘。固态…...

文件操作与隐写
一、文件类型的识别 1、文件头完好情况: (1)file命令 使用file命令识别:识别出file.doc为jpg类型 (2)winhex 通过winhex工具查看文件头类型,根据文件头部内容去判断文件的类型 eg:JPG类型 &a…...

SQLException: No Suitable Driver Found - 完美解决方法详解
🚨 SQLException: No Suitable Driver Found - 完美解决方法详解 🚨 **🚨 SQLException: No Suitable Driver Found - 完美解决方法详解 🚨****摘要 📝****引言 🎯****正文 📚****1. 问题概述 ❗…...

pycharm破解教程
下载pycharm https://www.jetbrains.com/pycharm/download/other.html 破解网站 https://hardbin.com/ipfs/bafybeih65no5dklpqfe346wyeiak6wzemv5d7z2ya7nssdgwdz4xrmdu6i/ 点击下载破解程序 安装pycharm 自己选择安装路径 安装完成后运行破解程序 等到Done图标出现 选择Ac…...

如何使用 ef core 的 code first(fluent api)模式实现自定义类型转换器?
如何使用 ef core 的 code first 模式实现自定义类型转换器 前言 1. 项目结构2. 实现步骤2.1 定义转换器2.1.1 DateTime 转换器2.1.2 JsonDocument 转换器 2.2 创建实体类并配置数据结构类型2.3 定义 Utility 工具类2.4 配置 DbContext2.4.1 使用 EF Core 配置 DbContext 的两种…...

MapSet之相关概念
系列文章: 1. 先导片--Map&Set之二叉搜索树 2. Map&Set之相关概念 目录 1.搜索 1.1 概念和场景 1.2 模型 2.Map的使用 2.1 关于Map的说明 2.2 关于Map.Entry的说明 2.3 Map的常用方法说明 3.Set的说明 3.1关于Set说明 3.2 常见方法说明 1.搜…...

【大数据】浅谈Pyecharts:数据可视化的强大工具
文章目录 一、引言二、Pyecharts是什么三、Pyecharts的发展历程四、如何使用Pyecharts1. 安装Pyecharts2. 创建图表(1)导入Pyecharts模块:(2)创建图表实例:(3)添加数据:&…...

[深度学习][LLM]:浮点数怎么表示,什么是混合精度训练?
混合精度训练 混合精度训练1. 浮点表示法:[IEEE](https://zh.wikipedia.org/wiki/电气电子工程师协会)二进制浮点数算术标准(IEEE 754)1.1 浮点数剖析1.2 举例说明例子 1:例子 2: 1.3 浮点数比较1.4 浮点数的舍入 2. 混合精度训练2.1 为什么需…...

openssl双向认证自签名证书生成
编写配置文件openssl.cnf [ req ] distinguished_name req_distinguished_name req_extensions req_ext[ req_distinguished_name ] countryName Country Name (2 letter code) countryName_default US stateOrProvinceName State or Province Name…...

如何使用 Python 读取 Excel 文件:从零开始的超详细教程
“日出东海落西山 愁也一天 喜也一天 遇事不钻牛角尖” 文章目录 前言文章有误敬请斧正 不胜感恩!||Day03为什么要用 Python 读取 Excel 文件?准备工作:安装所需工具安装 Python安装 Pandas安装 openpyxl 使用 Pandas 读取 Excel 文件什么是 …...