RabbitMQ实现死信队列
目录
- 死信队列是什么
- 怎样实现一个死信队列
- 说明
- 实现过程
- 导入依赖
- 添加配置
- 编写mq配置类
- 添加业务队列的消费者
- 添加死信队列的消费者
- 添加消息发送者
- 添加消息测试类
- 测试
- 死信队列的应用场景
- 总结
死信队列是什么
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
怎样实现一个死信队列
说明
配置死信队列大概可以分为三个步骤:
1.配置业务队列,绑定到业务交换机上
2.为业务队列配置死信交换机和路由key
3.为死信交换机配置死信队列
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
实现过程
导入依赖
<!--RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
添加配置
spring: #rabbitmqrabbitmq:host: 83.136.16.134password: guestusername: guestlistener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual
编写mq配置类
代码里面有详细说明,这里不在赘述。
package com.miaosha.study.mq;import com.sun.org.apache.regexp.internal.RE;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @Author: laz* @CreateTime: 2023-02-27 09:16* @Version: 1.0*/
@Configuration
public class RabbitmqConfig {/*** 业务交换机*/public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";/*** 业务队列a*/public static final String BUSINESS_QUEUEA_NAME = "business.queue.a";/*** 业务交换机b*/public static final String BUSINESS_QUEUEB_NAME = "business.queue.b";/*** 死信交换机*/public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";/*** 死信队列a*/public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.queue.a";/*** 死信队列b*/public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.queue.b";/*** 死信队列路由键a*/public static final String DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME = "dead.letter.queue.a.rounting.key";/*** 死信队列路由键b*/public static final String DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME = "dead.letter.queue.b.rounting.key";/*** 申明业务交换机* @return*/@Beanpublic FanoutExchange businessExchange(){return new FanoutExchange(BUSINESS_EXCHANGE_NAME);}/*** 申明死信交换机* @return*/@Beanpublic DirectExchange deadletterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);}/*** 申明业务队列a* @return*/@Beanpublic Queue queuea(){Map<String,Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//绑定的死信路由键map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();}/*** 申明业务队列b* @return*/@Beanpublic Queue queueb(){Map<String,Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//绑定的死信路由键map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();}/*** 申明死信队列a* @return*/@Beanpublic Queue deadletterQueuea(){return new Queue(DEAD_LETTER_QUEUEA_NAME);}/*** 申明死信队列b* @return*/@Beanpublic Queue deadletterQueueb(){return new Queue(DEAD_LETTER_QUEUEB_NAME);}/*** 队列a绑定到业务交换机* @return*/@Beanpublic Binding businessBindinga(){return BindingBuilder.bind(queuea()).to(businessExchange());}/*** 队列b绑定到业务交换机* @return*/@Beanpublic Binding businessBindingb(){return BindingBuilder.bind(queueb()).to(businessExchange());}/*** 死信队列a绑定到死信交换机* @return*/@Beanpublic Binding deadletterBindinga(){return BindingBuilder.bind(deadletterQueuea()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);}/*** 死信队列b绑定到死信交换机* @return*/@Beanpublic Binding deadletterBindingB(){return BindingBuilder.bind(deadletterQueueb()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);}
}
添加业务队列的消费者
package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEA_NAME;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEB_NAME;/*** @Author: laz* @CreateTime: 2023-02-27 09:53* @Version: 1.0*/
@Slf4j
@Component
public class RabbitmqReceiver {/*** 监听业务队列a* @param message*/@RabbitListener(queues = BUSINESS_QUEUEA_NAME)public void queuea(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("业务队列A接受到消息【{}】",msg);boolean ack = true;Exception exception = null;try {//这里模拟业务逻辑出现异常的情况if (msg.contains("fail")){throw new RuntimeException("dead letter exception");}} catch (Exception e){ack = false;exception = e;}//当ack为false时(业务逻辑出现异常),说明当前消息消费异常,这里直接放入死信队列if (!ack){log.error("业务队列A消费发生异常,error msg:{}", exception.getMessage());/*** void basicNack(long deliveryTag, boolean multiple, boolean requeue)* 参数一:当前消息的唯一id* 参数二:是否针对多条消息* 参数三:是否从新入队列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}/*** 监听业务队列b* @param msg*/@RabbitListener(queues = BUSINESS_QUEUEB_NAME)public void queueb(Message msg,Channel channel) throws Exception{String str = new String(msg.getBody());log.info("业务队列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);}
}
添加死信队列的消费者
package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.*;/*** @Author: laz* @CreateTime: 2023-02-27 09:58* @Version: 1.0*/
@Slf4j
@Component
public class DeadLetterReceiver {/*** 监听业务队列a* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)public void queuea(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信队列A接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}/*** 监听业务队列b* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)public void queueb(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信队列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}
}
添加消息发送者
package com.miaosha.study.mq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_EXCHANGE_NAME;/*** @Author: laz* @CreateTime: 2023-02-27 09:49* @Version: 1.0*/
@Component
public class RabbitmqSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg){rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);}
}
添加消息测试类
package com.miaosha.study.controller;import com.miaosha.study.mq.RabbitmqSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author: laz* @CreateTime: 2023-02-27 09:59* @Version: 1.0*/
@RestController
@RequestMapping("mq")
public class TestController {@Autowiredprivate RabbitmqSender rabbitmqSender;@RequestMapping("testDeadLetterQueue/{msg}")public void testDeadLetterQueue(@PathVariable("msg")String msg){rabbitmqSender.sendMsg(msg);}
}
测试
运行项目,访问:http://localhost:8081/mq/testDeadLetterQueue/msg

可以看到,此时只有业务消费者消费了消息,死信队列并没有消费到消息。
然后根据消费者里面的逻辑,我们发送一条 ‘fail’的消息,再次测试
访问:http://localhost:8081/mq/testDeadLetterQueue/fail

可以看到,死信队列a已收到消息。到此实现死信队列的流程就通了。
注意:我们的死信消息MessageProperties中的内容比较多,代表的含义分别是:
| 字段名 | 含义 |
|---|---|
x-first-death-exchange | 第一次被抛入的死信交换机的名称 |
x-first-death-reason | 第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量 |
x-first-death-queue | 第一次成为死信前所在队列名称 |
x-death | 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新 |
死信队列的应用场景
一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。
总结
死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
死信消息的生命周期:
- 业务消息被投入业务队列
- 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
- 被nck或reject的消息由RabbitMQ投递到死信交换机中
- 死信交换机将消息投入相应的死信队列
- 死信队列的消费者消费死信消息
本篇文章到此结束!希望对您有所帮助。
相关文章:
RabbitMQ实现死信队列
目录死信队列是什么怎样实现一个死信队列说明实现过程导入依赖添加配置编写mq配置类添加业务队列的消费者添加死信队列的消费者添加消息发送者添加消息测试类测试死信队列的应用场景总结死信队列是什么 “死信”是RabbitMQ中的一种消息机制,当你在消费消息时&#…...
【Linux】安装Tomcat教程
目录 1.上传安装包 2.解压安装包 3.启动Tomcat 4.查看启动日志 5.查看进程 6.开放端口 7.停止Tomcat 1.上传安装包 使用FinalShell自带的上传工具将Tomcat的二进制发布包上传到Linux(与前面上传JDK安装包步骤 一致)。 2.解压安装包 将上传上来的安装包解压到指定目录…...
学习笔记之Vuex(五)
Vuex(五)Vuex一、什么是Vuex二、Vuex工作原理三、搭建Vuex环境四、求和案例分析4.1 求和案例——vue实现4.2 求和案例——vuex实现(五)Vuex 一、什么是Vuex 1.概念 在Vue中实现集中式状态(数据)管理的一…...
SSM知识快速复习
SSM知识快速复习SpringIOCDIIOC容器在Spring中的实现常用注解Autowired注解的原理AOP相关术语作用动态代理实现原理事务Transactional事务属性:只读事务属性:超时事务属性:回滚策略事务属性:事务隔离级别事务属性:事务…...
【Linux】安装MySQL
目录 1.检测当前系统是否安装过MySQL相关数据库 2. 卸载现有的MySQL数据库 3.上传解压 4.顺序安装rpm包 5.启动MySQL 6.查看临时密码 7.登录MySQL 8.开放端口 1.检测当前系统是否安装过MySQL相关数据库 需要通过rpm相关指令,来查询当前系统中是否存在已安…...
【深度学习】手把手教你开发自己的深度学习模板
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录前言1数据相关1.1 数据初探1.2.数据处理1.3 数据变形2 定义网络,优化函数3. 训练前言 入坑2年后,重新梳理之前的知识,发现其实需…...
一个诡异的 Pulsar InterruptedException 异常
背景 今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。 和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了&…...
Java岗面试题--Java并发(volatile 专题)
目录1. 面试题一:谈谈 volatile 的使用及其原理补充:内存屏障volatile 的原理2. 面试题二:volatile 为什么不能保证原子性3. 面试题三:volatile 的内存语义4. 面试题四:volatile 的实现机制5. 面试题五:vol…...
Java---打家劫舍ⅠⅡ
目录 打家劫舍Ⅰ 题目分析 代码一 代码二 打家劫舍Ⅱ 打家劫舍Ⅰ 你是一个专业的小偷,计划偷窃沿街的房屋。每间房内都藏有一定的现金,影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统,如果两间相邻的房屋在同一晚上被…...
MySQL Lesson4
1:关于查询结果集的去重(distinct) select distinct job from emp; **distinct只能出现在所有字段的最前面。所表示的含有是所有的结果联合起来去重。 select distinct deptno,job from emp order by deptno; select count(distinct job)from…...
浅谈权限获取方法之文件上传
概述 文件上传漏洞是发生在有上传功能的应用中,如果应用程序对用户的上传文件没有控制或者存在缺陷,攻击者可以利用应用上传功能存在的缺陷,上传木马、病毒等有危害的文件到服务器上面,控制服务器。 漏洞成因及危害 文件上传漏…...
资产设备防拆标签安全防护和资产定位解决方案
随着社会经济的发展和高新技术的日新月异,对各方面的安全要求也在不断地提高,以物联网安防、入侵报警和出入口控制、应急系统等为主的安全防范系统日益成为各类文物场所智能化弱电工程不可或缺的组成部分,是重点资产管理场所内加强管理和安全…...
企业电子招标采购源码之电子招标投标全流程!
随着各级政府部门的大力推进,以及国内互联网的建设,电子招投标已经逐渐成为国内主流的招标投标方式,但是依然有很多人对电子招投标的流程不够了解,在具体操作上存在困难。虽然各个交易平台的招标投标在线操作会略有不同࿰…...
【考研408】计算机网络笔记
文章目录计算机网络体系结构计算机网络概述计算机网络的组成计算机网络的功能计算机网络的分类计算机网络的性能指标课后习题计算机网络体系结构与参考模型计算机网络协议、接口、服务的概念ISO/OSI参考模型和TCP/IP模型课后习题物理层通信基础基本概念奈奎斯特定理与香农定理编…...
[C++]继承
🥁作者: 华丞臧 📕专栏:【C】 各位读者老爷如果觉得博主写的不错,请诸位多多支持(点赞收藏关注)。如果有错误的地方,欢迎在评论区指出。 推荐一款刷题网站 👉LeetCode 文章目录一、继承…...
优化知识管理方法丨整理零碎信息,提高数据价值
信息流时代,知识成集合倍数增长,看似我们学习了很多知识,但知识零碎无系统,知识之间缺乏联系,没有深度,所以虽然你很努力,但你发现自己的能力增长特别缓慢,你需要整理知识将零散的知…...
Windows操作系统的体系结构、运行环境和运行状态
我是荔园微风,作为一名在IT界整整25年的老兵,今天我们来重新审视一下Windows这个我们熟悉的不能再熟悉的系统。说Windows操作系统的运行环境和运行状态,首先要介绍一下Windows操作系统的体系结构,然后再要说到最重要的两个概念:核…...
【工作笔记】Http响应头过长
起因 突然有测试小伙伴反馈进公司官网主页会白屏,但只是个例不是普遍现象 查监控发现没监控到异常问题 查了很久(这个很久单指对于线上问题来说)才定位是请求的异常,因为这套系统的异常用的是 ExceptionHandler,这也导…...
hive建分区表,分桶表,内部表,外部表
hive建分区表,分桶表,内部表,外部表 一、概念介绍 Hive是基于Hadoop的一个工具,用来帮助不熟悉 MapReduce的人使用SQL对存储在Hadoop中的大规模数据进行数据提取、转化、加载。Hive数据仓库工具能将结构化的数据文件映射为一张数…...
【分享】灌溉制度设计小程序VB源代码
说明 根据作物需水特性和当地气候、土壤、农业技术及灌水技术等因素制定的灌水方案。主要内容包括灌水次数、灌水时间、灌水定额和灌溉定额。灌溉制度是规划、设计灌溉工程和进行灌区运行管理的基本资料,是编制和执行灌区用水计划的重要依据。 1—计划湿润土层允…...
C++:std::is_convertible
C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
OpenLayers 分屏对比(地图联动)
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能,和卷帘图层不一样的是,分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
Mobile ALOHA全身模仿学习
一、题目 Mobile ALOHA:通过低成本全身远程操作学习双手移动操作 传统模仿学习(Imitation Learning)缺点:聚焦与桌面操作,缺乏通用任务所需的移动性和灵活性 本论文优点:(1)在ALOHA…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
DeepSeek源码深度解析 × 华为仓颉语言编程精粹——从MoE架构到全场景开发生态
前言 在人工智能技术飞速发展的今天,深度学习与大模型技术已成为推动行业变革的核心驱动力,而高效、灵活的开发工具与编程语言则为技术创新提供了重要支撑。本书以两大前沿技术领域为核心,系统性地呈现了两部深度技术著作的精华:…...
数据库正常,但后端收不到数据原因及解决
从代码和日志来看,后端SQL查询确实返回了数据,但最终user对象却为null。这表明查询结果没有正确映射到User对象上。 在前后端分离,并且ai辅助开发的时候,很容易出现前后端变量名不一致情况,还不报错,只是单…...
