当前位置: 首页 > news >正文

RabbitMQ深入 —— 死信队列

前言

        前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识,在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列,主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~


文章目录

前言

死信队列

1 基本概念 

2 设置消息时间TTL过期的死信队列

3 队列达到最大长度发生死信 

4 消息被拒引发死信

总结


死信队列

1 基本概念 

     死信就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信具有一定的延迟性,它可以作为延迟消息来处理。

死信出现的原因:

  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.I 

2 设置消息时间TTL过期的死信队列

首先我们在消费者Consumer1中声明普通交换机、死信交换机、普通队列和死信队列之间的关系,同时在声明之后令Consumer1拒收消息,在RabbitMQ中观察消息生产者发出消息的流转情况。

设置死信队列的消费者1

        在死信队列中我们设置了普通交换机、死信交换机、普通队列和死信队列。同时在正常队列中通过channel信道对象中的queueDeclare方法中的一个Map类型的参数,设置了死信交换机和普通交换机之间的关系,配置好TTL、RoutingKey并声明其死信交换机。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//设置过期时间arguments.put("x-message-ttl",10000);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_QUEUE,"dead");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}

需要注意的是,这里在正常队列中设置过期时间TTL一般不太常用,我们通常会在publish处设置消息的TTL,因此这里arguments对象有关 "x-message-ttl" 参数的配置可以注释掉。

实际处理消息的消费者2

在处理死信队列消息的消费者处,我们只需要设置消费者接收消息是来自死信队列即可。 

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer2 {//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();System.out.println("等待接收消息");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("Consumer2接收到的信息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+DEAD_QUEUE+"接收键:"+message.getEnvelope().getRoutingKey());};//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});}
}

​​​​生产者

在这里我们借助AMQP. BasicProperties对象的build方法来设置相应的死信TTL。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;public class Publish {public static final String NORMAL_EXCHANGE = "normal";public static final String NORMAL_QUEUE = "normalQueue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//在Consumer已经声明过交换机了,所以在这里不能声明//死信消息,设置TTLAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String message = "info"+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,message.getBytes());}}
}

未运行Consumer2前我们看到普通队列在我们设置的TTL:10s之后将消息流转到死信队列中。

最后启动Consumer2后确实也收到了死信队列中的消息

3 队列达到最大长度发生死信 

在这一部分中我们需要注释掉之前在生产者中设置的消息的TTL,同时在消费者1中开启正常队列的最大消息堆积容量。 

arguments.put("x-max-length",6);

 这样子我们就可以模拟队列达到最大长度后产生死信的情况了。

4 消息被拒引发死信

        要想开启消费者拒收消息的功能,首先需要在消息接收的basicConsumer方法中关闭自动应答,同时自行设置手动应答的逻辑。在下面接收消息的回调函数中,在basicAck中设置应答,在basicReject实现消息拒收。

package com.crj.rabbitmq.deadQueue;import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列* 消费者1:需要声明死信队列和普通队列*/
public class Consumer {//普通交换机名称public static final String NORMAL_EXCHANGE = "normal";//死信交换机名称public static final String DEAD_EXCHANGE = "dead";//普通队列的名称public static final String NORMAL_QUEUE = "normalQueue";//死信队列的名称public static final String DEAD_QUEUE = "deadQueue";public static void main(String[] args) throws Exception {//声明通道Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列和死信队列*///创建一个hashmap对象来配置连接死信队列的参数Map<String, Object> arguments = new HashMap<>();//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","dead1");//声明普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定队列和交换机channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead1");System.out.println("等待接收消息》》》》》》》》》》》");//接收消息DeliverCallback deliverCallback = (consumerTag, message)->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer1接收的消息是:"+msg+":此消息是被拒绝的");//这里第二个参数设置了是否要将拒收的消息塞回原队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer1接收到的信息:"+new String(message.getBody(),"UTF-8"));//成功应答,这里设置不批量操作channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};//开启手动应答//消费者开始消费消息channel.basicConsume(DEAD_QUEUE,false,deliverCallback,(consumerTag)->{});}
}

总结

        时间过期、消息被拒、队列容量限制这三个机制会引发消息被转发死信队列,那么死信队列除了在这三种情况下继续保存消息之外,还有什么作用呢?下一篇文章荔枝会梳理延时队列,相信看完下一篇文章大家能有所收获~

今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~

如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!

如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!

相关文章:

RabbitMQ深入 —— 死信队列

前言 前面荔枝梳理了RabbitMQ中的普通队列、交换机以及相关的知识&#xff0c;在这篇文章中荔枝将会梳理RabbitMQ的一个重要的队列 —— 死信队列&#xff0c;主要了解消息流转到死信队列的三种的方式以及相应的实现demo。希望能帮助到有需要的小伙伴~~~ 文章目录 前言 死信队…...

【React + Umi】自定义离开页面拦截弹框事件

在 react umi 中对离开页面的行为进行自定义弹窗拦截控制。以下为可选的方案分析。 wrapper 首先&#xff0c;因为项目框架是 umi&#xff0c;最先想到了 umi 路由的 wrapper 装饰器&#xff0c;但仔细一想又不太对&#xff0c; wrapper 争对于跳转到某个特定页面的前置行为…...

S1FD40A180H-ASEMI快恢复二极管S1FD40A180H

编辑&#xff1a;ll S1FD40A180H-ASEMI快恢复二极管S1FD40A180H 型号&#xff1a;S1FD40A180H 品牌&#xff1a;ASEMI 封装&#xff1a;TO-247 特性&#xff1a;大功率、快恢复二极管 正向电流&#xff1a;40A 反向耐压&#xff1a;1800V 恢复时间&#xff1a;<300n…...

网络编程 day1

1->x.mind网络编程基础 2->简述字节序的概念&#xff0c;并用共用体&#xff08;联合体&#xff09;的方式计算本机的字节序 1.字节序是指不同类型的CPU主机&#xff0c;内存存储多字节整数序列的方式 2.小端字节序&#xff1a;低序字节存储在低地址上 3.大端字节序&a…...

《深入PostgreSQL的存储引擎:原理与性能》

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f405;&#x1f43e;猫头虎建议程序员必备技术栈一览表&#x1f4d6;&#xff1a; &#x1f6e0;️ 全栈技术 Full Stack: &#x1f4da…...

python开发之个微群聊机器人的开发

简要描述&#xff1a; 退出群聊 请求URL&#xff1a; http://域名地址/quitChatRoom 请求方式&#xff1a; POST 请求头Headers&#xff1a; Content-Type&#xff1a;application/jsonAuthorization&#xff1a;login接口返回 参数&#xff1a; 参数名必选类型说明wI…...

【Redis7】--4.事务、管道、发布和订阅

文章目录 事务1.Redis事务2.Redis事务特性3.Redis事务命令3.1MULTI3.2EXEC3.3DISCARD3.4WATCH3.5UNWATCH 4.不保证原子性4.1"全体连坐"4.2"冤头债主" 5.事务执行流程 管道1.pipeline的使用2.pipeline小总结 发布和订阅1.常用命令1.1SUBSCRIBE1.2PUBLISH1.3…...

【Vue】el 和 data短小精湛的细节!

hello&#xff0c;我是小索奇&#xff0c;精心制作的Vue教程持续更新哈&#xff0c;花费了大量的时间和精力&#xff0c;总结拓展了很多疑难点&#xff0c;想要学习&巩固&避坑就一起学习叭~ el 与 data 的两种写法 el共有2种写法 el表达式主要用来在模板中展示数据,它…...

前端screenfull实现界面全屏展示功能

还是先引入依赖 我们要先执行 npm config set registry https://registry.npmjs.org/将本地npm registry地址设置为官方的npm registry地址 不然这个东西安装会有点问题 然后我们执行命令安装 npm install screenfull安装完之后 我们终端执行一下 npm config delete registr…...

Dockerfile 制作常用命令总结

1.FROM( from ) : FROM : from 表示选择一个镜像作为基础镜像&#xff08;在一个Dockerfile 中可以使用多条from&#xff0c;来构建多个镜像&#xff09; 2.ENV &#xff1a; 用来在镜像创建出的容器中声明环境变量&#xff0c;如&#xff1a; ENV PYTHONIOENCODINGutf-8 …...

uniapp项目实践总结(十七)实现滚动触底加载

导语&#xff1a;在日测的开发过程中&#xff0c;经常会碰到页面需要渲染大量数据的情况&#xff0c;这时候就需要用到滚动加载功能&#xff0c;下面总结一下方法。 目录 原理分析实战演练案例展示 原理分析 使用scrolltolower事件来监听滚动到底部&#xff0c;然后加载下一…...

SAP入门到放弃系列之QM质量检验流程概述

目录 一、流程概述二、操作步骤概述2.1 主数据维护2.2 业务操作 一、流程概述 质量检验流程-Inspection Process Flow,通常由于预先设定的一些规则条件自动触发或者手工触发&#xff0c;例如库存地之间的调拨、生产完工入库检验、采购入库的检验、客户交货前检验等等。另外还有…...

Ansys Zemax | 光学系统设计中如何使用玻璃替换方法来优化玻璃

在光学系统中选择最优玻璃材料时&#xff0c;Conrady d-D以及模型玻璃等传统的玻璃选择方法提供的帮助有限。本文介绍了如何使用玻璃替换方法进行直接玻璃优化&#xff0c;以及在考虑玻璃的可用性、成本及耐候性等因素时&#xff0c;如何进一步严格挑选玻璃。 简介 玻璃替换方法…...

springboot基础--实现默认登录页面

1、搭建项目 依赖中 多加入thymeleaf依赖 <dependencies><!--thymeleaf的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><!--we…...

TDesign WXS语法

目录 一、输出函数返回值如何获取&#xff1f; 二、WXS语法 三、WXS案例 一、输出函数返回值如何获取&#xff1f; 写在js的方法中 wxml中{{方法名()}}输出&#xff1a; 发现不显示&#xff1f;&#xff1f; 所以不能使用这种方式&#xff01;&#xff01; 二、WXS语法 1.…...

Iterator设计模式

目录 1、示例 1.1 Aggregate接口 1.2 Iterator接口 1.3 Book类 1.4 BookShelf类 1.6 BookShelfIterator 类 1.7 Main类 2、解释Iterator模式中的角色 2.1 Iterator模式的存在意义 2.2 抽象类和接口 2.3 Aggregate 和 Iterator的对应 2.4 容易弄错"下一个"…...

ROS 入门

目录 简介 ROS诞生背景 ROS的设计目标 ROS与ROS2 安装ROS 1.配置ubuntu的软件和更新 2.设置安装源 3.设置key 4.安装 5.配置环境变量 安装可能出现的问题 安装构建依赖 卸载 ROS架构 1.设计者 2.维护者 3. 立足系统架构: ROS 可以划分为三层 ROS通信机制 话…...

第四章 Linux网络编程

ARP 协议 ARP 协议&#xff08;Address Resolution Protocol&#xff09;通过 IP 地址查找对应的 MAC 地址。 当一个主机需要发送数据给另一个主机时&#xff0c;它首先会检查本地的 ARP 缓存表&#xff08;ARP cache&#xff09;中是否存在目标主机的 MAC 地址。如果存在&…...

无涯教程-JavaScript - OFFSET函数

描述 OFFSET函数返回对范围的引用,该范围是一个单元格或单元格范围中指定的行数和列数。 返回的引用可以是单个单元格或单元格范围。您可以指定要返回的行数和列数。 语法 OFFSET (reference, rows, cols, [height], [width]) 争论 Argument描述Required/OptionalReferenc…...

rust切片

切片类型写为[T]。 切片是序列的一个片段。 它是动态大小类型&#xff0c;所以要使用切片类型&#xff0c;就必须使用它的指针类型。引用是最常用的指针类型。 [T; n]能隐式转换成[T]。 一、定义切片 &#xff08;一&#xff09;不可变切片 &[T]&#xff0c;共享切片&…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频

使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

力扣热题100 k个一组反转链表题解

题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill

视觉语言模型&#xff08;Vision-Language Models, VLMs&#xff09;&#xff0c;为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展&#xff0c;机器人仍难以胜任复杂的长时程任务&#xff08;如家具装配&#xff09;&#xff0c;主要受限于人…...

基于Java+VUE+MariaDB实现(Web)仿小米商城

仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意&#xff1a;运行前…...

聚六亚甲基单胍盐酸盐市场深度解析:现状、挑战与机遇

根据 QYResearch 发布的市场报告显示&#xff0c;全球市场规模预计在 2031 年达到 9848 万美元&#xff0c;2025 - 2031 年期间年复合增长率&#xff08;CAGR&#xff09;为 3.7%。在竞争格局上&#xff0c;市场集中度较高&#xff0c;2024 年全球前十强厂商占据约 74.0% 的市场…...

【若依】框架项目部署笔记

参考【SpringBoot】【Vue】项目部署_no main manifest attribute, in springboot-0.0.1-sn-CSDN博客 多一个redis安装 准备工作&#xff1a; 压缩包下载&#xff1a;http://download.redis.io/releases 1. 上传压缩包&#xff0c;并进入压缩包所在目录&#xff0c;解压到目标…...

深入浅出WebGL:在浏览器中解锁3D世界的魔法钥匙

WebGL&#xff1a;在浏览器中解锁3D世界的魔法钥匙 引言&#xff1a;网页的边界正在消失 在数字化浪潮的推动下&#xff0c;网页早已不再是静态信息的展示窗口。如今&#xff0c;我们可以在浏览器中体验逼真的3D游戏、交互式数据可视化、虚拟实验室&#xff0c;甚至沉浸式的V…...

Redis上篇--知识点总结

Redis上篇–解析 本文大部分知识整理自网上&#xff0c;在正文结束后都会附上参考地址。如果想要深入或者详细学习可以通过文末链接跳转学习。 1. 基本介绍 Redis 是一个开源的、高性能的 内存键值数据库&#xff0c;Redis 的键值对中的 key 就是字符串对象&#xff0c;而 val…...