RabbitMQ深入 —— 死信队列
前言
前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~
文章目录
前言
死信队列
1 基本概念
2 设置消息时间TTL过期的死信队列
3 队列达到最大长度发生死信
4 消息被拒引发死信
总结
死信队列
1 基本概念
死信就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
死信具有一定的延迟性,它可以作为延迟消息来处理。
死信出现的原因:
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.I
2 设置消息时间TTL过期的死信队列
首先我们在消费者Consumer1中声明普通交换机、死信交换机、普通队列和死信队列之间的关系,同时在声明之后令Consumer1拒收消息,在RabbitMQ中观察消息生产者发出消息的流转情况。
设置死信队列的消费者1
在死信队列中我们设置了普通交换机、死信交换机、普通队列和死信队列。同时在正常队列中通过channel信道对象中的queueDeclare方法中的一个Map类型的参数,设置了死信交换机和普通交换机之间的关系,配置好TTL、RoutingKey并声明其死信交换机。
package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//设置过期时间arguments.put("x-message-ttl",10000);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_QUEUE,"dead");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}
需要注意的是,这里在正常队列中设置过期时间TTL一般不太常用,我们通常会在publish处设置消息的TTL,因此这里arguments对象有关 "x-message-ttl" 参数的配置可以注释掉。
实际处理消息的消费者2
在处理死信队列消息的消费者处,我们只需要设置消费者接收消息是来自死信队列即可。
package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer2 {//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();System.out.println("等待接收消息");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer2接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}
生产者
在这里我们借助AMQP. BasicProperties对象的build方法来设置相应的死信TTL。
package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;public class Publish {public static final String NORMAL_EXCHANGE = "normal";public static final String NORMAL_QUEUE = "normalQueue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//在Consumer已经声明过交换机了,所以在这里不能声明//死信消息,设置TTLAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String message = "info"+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,message.getBytes());}}
}
未运行Consumer2前我们看到普通队列在我们设置的TTL:10s之后将消息流转到死信队列中。
最后启动Consumer2后确实也收到了死信队列中的消息
3 队列达到最大长度发生死信
在这一部分中我们需要注释掉之前在生产者中设置的消息的TTL,同时在消费者1中开启正常队列的最大消息堆积容量。
arguments.put("x-max-length",6);
这样子我们就可以模拟队列达到最大长度后产生死信的情况了。
4 消息被拒引发死信
要想开启消费者拒收消息的功能,首先需要在消息接收的basicConsumer方法中关闭自动应答,同时自行设置手动应答的逻辑。在下面接收消息的回调函数中,在basicAck中设置应答,在basicReject实现消息拒收。
package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead1");System.out.println("等待接收消息》》》》》》》》》》》");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer1接收的消息是:"+msg+":此消息是被拒绝的");//这里第二个参数设置了是否要将拒收的消息塞回原队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));//成功应答,这里设置不批量操作channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};//开启手动应答//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,false,deliverCallback,(consumerTag)->{});}
}
总结
时间过期、消息被拒、队列容量限制这三个机制会引发消息被转发死信队列,那么死信队列除了在这三种情况下继续保存消息之外,还有什么作用呢?下一篇文章荔枝会梳理延时队列,相信看完下一篇文章大家能有所收获~
今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~
如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!
如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!
相关文章:

RabbitMQ深入 —— 死信队列
前言 前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~ 文章目录 前言 死信队…...
【React + Umi】自定义离开页面拦截弹框事件
在 react umi 中对离开页面的行为进行自定义弹窗拦截控制。以下为可选的方案分析。 wrapper 首先,因为项目框架是 umi,最先想到了 umi 路由的 wrapper 装饰器,但仔细一想又不太对, wrapper 争对于跳转到某个特定页面的前置行为…...

S1FD40A180H-ASEMI快恢复二极管S1FD40A180H
编辑:ll S1FD40A180H-ASEMI快恢复二极管S1FD40A180H 型号:S1FD40A180H 品牌:ASEMI 封装:TO-247 特性:大功率、快恢复二极管 正向电流:40A 反向耐压:1800V 恢复时间:<300n…...

网络编程 day1
1->x.mind网络编程基础 2->简述字节序的概念,并用共用体(联合体)的方式计算本机的字节序 1.字节序是指不同类型的CPU主机,内存存储多字节整数序列的方式 2.小端字节序:低序字节存储在低地址上 3.大端字节序&a…...

《深入PostgreSQL的存储引擎:原理与性能》
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🐅🐾猫头虎建议程序员必备技术栈一览表📖: 🛠️ 全栈技术 Full Stack: 📚…...

python开发之个微群聊机器人的开发
简要描述: 退出群聊 请求URL: http://域名地址/quitChatRoom 请求方式: POST 请求头Headers: Content-Type:application/jsonAuthorization:login接口返回 参数: 参数名必选类型说明wI…...

【Redis7】--4.事务、管道、发布和订阅
文章目录 事务1.Redis事务2.Redis事务特性3.Redis事务命令3.1MULTI3.2EXEC3.3DISCARD3.4WATCH3.5UNWATCH 4.不保证原子性4.1"全体连坐"4.2"冤头债主" 5.事务执行流程 管道1.pipeline的使用2.pipeline小总结 发布和订阅1.常用命令1.1SUBSCRIBE1.2PUBLISH1.3…...
【Vue】el 和 data短小精湛的细节!
hello,我是小索奇,精心制作的Vue教程持续更新哈,花费了大量的时间和精力,总结拓展了很多疑难点,想要学习&巩固&避坑就一起学习叭~ el 与 data 的两种写法 el共有2种写法 el表达式主要用来在模板中展示数据,它…...

前端screenfull实现界面全屏展示功能
还是先引入依赖 我们要先执行 npm config set registry https://registry.npmjs.org/将本地npm registry地址设置为官方的npm registry地址 不然这个东西安装会有点问题 然后我们执行命令安装 npm install screenfull安装完之后 我们终端执行一下 npm config delete registr…...
Dockerfile 制作常用命令总结
1.FROM( from ) : FROM : from 表示选择一个镜像作为基础镜像(在一个Dockerfile 中可以使用多条from,来构建多个镜像) 2.ENV : 用来在镜像创建出的容器中声明环境变量,如: ENV PYTHONIOENCODINGutf-8 …...

uniapp项目实践总结(十七)实现滚动触底加载
导语:在日测的开发过程中,经常会碰到页面需要渲染大量数据的情况,这时候就需要用到滚动加载功能,下面总结一下方法。 目录 原理分析实战演练案例展示 原理分析 使用scrolltolower事件来监听滚动到底部,然后加载下一…...

SAP入门到放弃系列之QM质量检验流程概述
目录 一、流程概述二、操作步骤概述2.1 主数据维护2.2 业务操作 一、流程概述 质量检验流程-Inspection Process Flow,通常由于预先设定的一些规则条件自动触发或者手工触发,例如库存地之间的调拨、生产完工入库检验、采购入库的检验、客户交货前检验等等。另外还有…...

Ansys Zemax | 光学系统设计中如何使用玻璃替换方法来优化玻璃
在光学系统中选择最优玻璃材料时,Conrady d-D以及模型玻璃等传统的玻璃选择方法提供的帮助有限。本文介绍了如何使用玻璃替换方法进行直接玻璃优化,以及在考虑玻璃的可用性、成本及耐候性等因素时,如何进一步严格挑选玻璃。 简介 玻璃替换方法…...

springboot基础--实现默认登录页面
1、搭建项目 依赖中 多加入thymeleaf依赖 <dependencies><!--thymeleaf的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><!--we…...

TDesign WXS语法
目录 一、输出函数返回值如何获取? 二、WXS语法 三、WXS案例 一、输出函数返回值如何获取? 写在js的方法中 wxml中{{方法名()}}输出: 发现不显示?? 所以不能使用这种方式!! 二、WXS语法 1.…...

Iterator设计模式
目录 1、示例 1.1 Aggregate接口 1.2 Iterator接口 1.3 Book类 1.4 BookShelf类 1.6 BookShelfIterator 类 1.7 Main类 2、解释Iterator模式中的角色 2.1 Iterator模式的存在意义 2.2 抽象类和接口 2.3 Aggregate 和 Iterator的对应 2.4 容易弄错"下一个"…...

ROS 入门
目录 简介 ROS诞生背景 ROS的设计目标 ROS与ROS2 安装ROS 1.配置ubuntu的软件和更新 2.设置安装源 3.设置key 4.安装 5.配置环境变量 安装可能出现的问题 安装构建依赖 卸载 ROS架构 1.设计者 2.维护者 3. 立足系统架构: ROS 可以划分为三层 ROS通信机制 话…...

第四章 Linux网络编程
ARP 协议 ARP 协议(Address Resolution Protocol)通过 IP 地址查找对应的 MAC 地址。 当一个主机需要发送数据给另一个主机时,它首先会检查本地的 ARP 缓存表(ARP cache)中是否存在目标主机的 MAC 地址。如果存在&…...

无涯教程-JavaScript - OFFSET函数
描述 OFFSET函数返回对范围的引用,该范围是一个单元格或单元格范围中指定的行数和列数。 返回的引用可以是单个单元格或单元格范围。您可以指定要返回的行数和列数。 语法 OFFSET (reference, rows, cols, [height], [width]) 争论 Argument描述Required/OptionalReferenc…...
rust切片
切片类型写为[T]。 切片是序列的一个片段。 它是动态大小类型,所以要使用切片类型,就必须使用它的指针类型。引用是最常用的指针类型。 [T; n]能隐式转换成[T]。 一、定义切片 (一)不可变切片 &[T],共享切片&…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...

基于Java+VUE+MariaDB实现(Web)仿小米商城
仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...

聚六亚甲基单胍盐酸盐市场深度解析:现状、挑战与机遇
根据 QYResearch 发布的市场报告显示,全球市场规模预计在 2031 年达到 9848 万美元,2025 - 2031 年期间年复合增长率(CAGR)为 3.7%。在竞争格局上,市场集中度较高,2024 年全球前十强厂商占据约 74.0% 的市场…...

【若依】框架项目部署笔记
参考【SpringBoot】【Vue】项目部署_no main manifest attribute, in springboot-0.0.1-sn-CSDN博客 多一个redis安装 准备工作: 压缩包下载:http://download.redis.io/releases 1. 上传压缩包,并进入压缩包所在目录,解压到目标…...
深入浅出WebGL:在浏览器中解锁3D世界的魔法钥匙
WebGL:在浏览器中解锁3D世界的魔法钥匙 引言:网页的边界正在消失 在数字化浪潮的推动下,网页早已不再是静态信息的展示窗口。如今,我们可以在浏览器中体验逼真的3D游戏、交互式数据可视化、虚拟实验室,甚至沉浸式的V…...

Redis上篇--知识点总结
Redis上篇–解析 本文大部分知识整理自网上,在正文结束后都会附上参考地址。如果想要深入或者详细学习可以通过文末链接跳转学习。 1. 基本介绍 Redis 是一个开源的、高性能的 内存键值数据库,Redis 的键值对中的 key 就是字符串对象,而 val…...