RabbitMQ5-死信队列
目录
死信的概念
死信的来源
死信实战
死信之TTl
死信之最大长度
死信之消息被拒
死信的概念
死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
死信的来源
-
消息 TTL 过期
TTL是Time To Live的缩写, 也就是生存时间
-
队列达到最大长度
队列满了,无法再添加数据到 mq 中
-
消息被拒绝
(basic.reject 或 basic.nack) 并且 requeue=false
死信实战

死信之TTl
消费者 C1:
package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}}
生产者:
package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间 10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}
消费者 C2:
package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}
启动 C1 ,之后关闭消费者,模拟其接收不到消息,再启动 Producer:

启动 C2 消费者,它消费死信队列里面的消息:

死信之最大长度
消费者 C1:
package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//设置正常队列的长度限制params.put("x-max-length",6);//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}}
生产者:
package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}
消费者 C2:
package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

死信之消息被拒
拒收消息 "info5"
消费者 C1:
package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");if (message.equals("info5")) {System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);} else {System.out.println("Consumer01 接收到消息" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//开启手动应答channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});}}
生产者:
package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}
消费者 C2:
package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

相关文章:
RabbitMQ5-死信队列
目录 死信的概念 死信的来源 死信实战 死信之TTl 死信之最大长度 死信之消息被拒 死信的概念 死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或直接到queue 里了,consumer 从 queue 取出消息进…...
macOS使用LLVM官方发布的tar.xz来安装Clang编译器
之前笔者写过一篇博文ubuntu使用LLVM官方发布的tar.xz来安装Clang编译器介绍了Ubuntu下使用官方发布的tar.xz包来安装Clang编译。官方发布的版本中也有MacOS版本的tar.xz,那MacOS应该也是可以安装的。 笔者2015款MBP笔记本,CPU是intel的,出厂…...
【算法学习】归并排序算法思想的应用—求逆序对数量
Hey,大家好!👋 今天我们来聊聊一个有趣的话题——如何在归并排序的基础上,高效解决求逆序对数量的问题。如果你对算法感兴趣,或者正在准备算法面试,这篇文章一定会对你有所帮助!🚀 …...
一组开源、免费、Metro风格的 WPF UI 控件库
前言 今天大姚给大家分享一个开源、免费、Metro风格的 WPF UI 控件库:MahApps.Metro。 项目介绍 MahApps.Metro 是一个开源、免费、Metro风格的 WPF UI 控件库,提供了现代化、平滑和美观的控件和样式,帮助开发人员轻松创建具有现代感的 Win…...
Spring Security 应用详解
Spring Security 应用详解 集成SpringBootSpring Boot 介绍创建maven工程spring 容器配置Servlet Context配置安全配置测试 工作原理结构总览认证流程认证流程AuthenticationProviderUserDetailsServicePasswordEncoder 授权流程授权流程授权决策 自定义认证自定义登录页面认证…...
业务对象和对象的区别
"业务对象"和"对象"这两个术语在日常编程和软件工程中经常被使用,但它们之间存在一些区别,主要体现在它们的目的、范围和抽象层次上。 ### 对象(Object) 1. **定义**: - 对象是面向对象编程&#…...
81,【5】BUUCTF WEB [b01lers2020]Life on Mars
进入靶场 怎莫颠颠的,一下子就想到展博了 先把左边的挨个点一遍 在最后一个有点收获 不过也没其他收获了 这种进去给个正常网页的题目,基本都靠url获取信息了 抓包看看有没有其他信息 竟然没有任何信息 自闭了 看别人的wp去咯 为什么别人抓到的包里…...
华硕笔记本装win10哪个版本好用分析_华硕笔记本装win10专业版图文教程
华硕笔记本装win10哪个版本好用?华硕笔记本还是建议安装win10专业版。Win分为多个版本,其中家庭版(Home)和专业版(Pro)是用户选择最多的两个版本。win10专业版在功能以及安全性方面有着明显的优势ÿ…...
Linux进程 -fork(初识),进程状态和进程优先级
目录 一、通过系统调用创建进程-fork 1.fork的介绍 2.fork的理解 3.fork常规用法 4.fork的三个问题 5.创建多个子进程 二、进程状态 (1)Linux内核源代码 (2)进程的状态 R运行状态(运行态) S 睡眠状态&…...
数据从前端传到后端入库过程分析
数据从前端传到后端入库过程分析 概述 积累了一些项目经验,成长为一个老程序员了,自认为对各种业务和技术都能得心应手的应对了,殊不知很多时候我们借助了搜索引擎的能力,当然现在大家都是通过AI来武装自己。 今天要分析的话题是…...
macOS如何进入 Application Support 目录(cd: string not in pwd: Application)
错误信息 cd: string not in pwd: Application 表示在当前目录下找不到名为 Application Support 的目录。可能的原因如下: 拼写错误或路径错误:确保你输入的目录名称正确。目录名称是区分大小写的,因此请确保使用正确的大小写。正确的目录名…...
第38周:猫狗识别 (Tensorflow实战第八周)
目录 前言 一、前期工作 1.1 设置GPU 1.2 导入数据 输出 二、数据预处理 2.1 加载数据 2.2 再次检查数据 2.3 配置数据集 2.4 可视化数据 三、构建VGG-16网络 3.1 VGG-16网络介绍 3.2 搭建VGG-16模型 四、编译 五、训练模型 六、模型评估 七、预测 总结 前言…...
【2024年华为OD机试】 (A卷,200分)- 计算网络信号、信号强度(JavaScriptJava PythonC/C++)
一、问题描述 题目解析 问题描述 我们有一个 m x n 的二维网格地图,每个格子可能是以下几种情况之一: 0:表示该位置是空旷的。x(正整数):表示该位置是信号源,信号强度为 x。-1:表示该位置是阻隔物,信号无法直接穿透。信号源只有一个,阻隔物可能有多个。信号在传播…...
【go语言】数组和切片
一、数组 1.1 什么是数组 数组是一组数:数组需要是相同类型的数据的集合;数组是需要定义大小的;数组一旦定义了大小是不可以改变的。 1.2 数组的声明 数组和其他变量定义没有什么区别,唯一的就是这个是一组数,需要给…...
2025美赛MCM数学建模A题:《石头台阶的“记忆”:如何用数学揭开历史的足迹》(全网最全思路+模型)
✨个人主页欢迎您的访问 ✨期待您的三连 ✨ 《石头台阶的“记忆”:如何用数学揭开历史的足迹》 目录 《石头台阶的“记忆”:如何用数学揭开历史的足迹》 ✨摘要✨ ✨引言✨ 1. 引言的结构 2. 撰写步骤 (1)研究背景 &#…...
使用 Docker Compose 一键启动 Redis、MySQL 和 RabbitMQ
目录 一、Docker Compose 简介 二、服务配置详解 1. Redis 配置 2. MySQL 配置 3. RabbitMQ 配置 三、数据持久化与时间同步 四、部署与管理 五、总结 目录挂载与卷映射的区别 现代软件开发中,微服务架构因其灵活性和可扩展性而备受青睐。为了支持微服务的…...
新增自定义数据功能|UWA Gears V1.0.7
UWA Gears 是UWA最新发布的无SDK性能分析工具。针对移动平台,提供了实时监测和截帧分析功能,帮助您精准定位性能热点,提升应用的整体表现。 本次版本更新新增了自定义数据功能,支持灵活定义和捕获关键性能指标,满足特…...
docker 简要笔记
文章目录 一、前提内容1、docker 环境准备2、docker-compose 环境准备3、流程说明 二、打包 docker 镜像1、基础镜像2、国内镜像源3、基础的dockerfile4、打包镜像 四、构建运行1、docker 部分2、docker-compose 部分2.1、构建docker-compose.yml2.1.1、同目录构建2.1.2、利用镜…...
在Ubuntu上使用Apache+MariaDB安装部署Nextcloud并修改默认存储路径
一、前言 Nextcloud 是一款开源的私有云存储解决方案,允许用户轻松搭建自己的云服务。它不仅支持文件存储和共享,还提供了日历、联系人、任务管理、笔记等丰富的功能。本文将详细介绍如何在 Ubuntu 22.04 LTS 上使用 Apache 和 MariaDB 安装部署 Nextcl…...
【JavaEE】-- 计算机是如何工作的
文章目录 1. 冯诺依曼体系(VonNeumann Architecture)2. CPU 基本工作流程2.1 寄存器(Register)和 内存(RAM)2.2 控制单元 CU(ControlUnit)2.3 指令(Instruction) 3. 操作系统(OperatingSystem)3.1 操作系统的定位3.2 什么是进程/任务(Process…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...
网络编程(UDP编程)
思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...
tree 树组件大数据卡顿问题优化
问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...
基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
STM32---外部32.768K晶振(LSE)无法起振问题
晶振是否起振主要就检查两个1、晶振与MCU是否兼容;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容(CL)与匹配电容(CL1、CL2)的关系 2. 如何选择 CL1 和 CL…...
nnUNet V2修改网络——暴力替换网络为UNet++
更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 U-Net存在两个局限,一是网络的最佳深度因应用场景而异,这取决于任务的难度和可用于训练的标注数…...
GraphQL 实战篇:Apollo Client 配置与缓存
GraphQL 实战篇:Apollo Client 配置与缓存 上一篇:GraphQL 入门篇:基础查询语法 依旧和上一篇的笔记一样,主实操,没啥过多的细节讲解,代码具体在: https://github.com/GoldenaArcher/graphql…...
网页端 js 读取发票里的二维码信息(图片和PDF格式)
起因 为了实现在报销流程中,发票不能重用的限制,发票上传后,希望能读出发票号,并记录发票号已用,下次不再可用于报销。 基于上面的需求,研究了OCR 的方式和读PDF的方式,实际是可行的ÿ…...
