RabbitMQ5-死信队列
目录
死信的概念
死信的来源
死信实战
死信之TTl
死信之最大长度
死信之消息被拒
死信的概念
死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
死信的来源
-
消息 TTL 过期
TTL是Time To Live的缩写, 也就是生存时间
-
队列达到最大长度
队列满了,无法再添加数据到 mq 中
-
消息被拒绝
(basic.reject 或 basic.nack) 并且 requeue=false
死信实战

死信之TTl
消费者 C1:
package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}}
生产者:
package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间 10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}
消费者 C2:
package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}
启动 C1 ,之后关闭消费者,模拟其接收不到消息,再启动 Producer:

启动 C2 消费者,它消费死信队列里面的消息:

死信之最大长度
消费者 C1:
package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//设置正常队列的长度限制params.put("x-max-length",6);//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}}
生产者:
package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}
消费者 C2:
package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

死信之消息被拒
拒收消息 "info5"
消费者 C1:
package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");if (message.equals("info5")) {System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);} else {System.out.println("Consumer01 接收到消息" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//开启手动应答channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});}}
生产者:
package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}
消费者 C2:
package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

相关文章:
RabbitMQ5-死信队列
目录 死信的概念 死信的来源 死信实战 死信之TTl 死信之最大长度 死信之消息被拒 死信的概念 死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或直接到queue 里了,consumer 从 queue 取出消息进…...
macOS使用LLVM官方发布的tar.xz来安装Clang编译器
之前笔者写过一篇博文ubuntu使用LLVM官方发布的tar.xz来安装Clang编译器介绍了Ubuntu下使用官方发布的tar.xz包来安装Clang编译。官方发布的版本中也有MacOS版本的tar.xz,那MacOS应该也是可以安装的。 笔者2015款MBP笔记本,CPU是intel的,出厂…...
【算法学习】归并排序算法思想的应用—求逆序对数量
Hey,大家好!👋 今天我们来聊聊一个有趣的话题——如何在归并排序的基础上,高效解决求逆序对数量的问题。如果你对算法感兴趣,或者正在准备算法面试,这篇文章一定会对你有所帮助!🚀 …...
一组开源、免费、Metro风格的 WPF UI 控件库
前言 今天大姚给大家分享一个开源、免费、Metro风格的 WPF UI 控件库:MahApps.Metro。 项目介绍 MahApps.Metro 是一个开源、免费、Metro风格的 WPF UI 控件库,提供了现代化、平滑和美观的控件和样式,帮助开发人员轻松创建具有现代感的 Win…...
Spring Security 应用详解
Spring Security 应用详解 集成SpringBootSpring Boot 介绍创建maven工程spring 容器配置Servlet Context配置安全配置测试 工作原理结构总览认证流程认证流程AuthenticationProviderUserDetailsServicePasswordEncoder 授权流程授权流程授权决策 自定义认证自定义登录页面认证…...
业务对象和对象的区别
"业务对象"和"对象"这两个术语在日常编程和软件工程中经常被使用,但它们之间存在一些区别,主要体现在它们的目的、范围和抽象层次上。 ### 对象(Object) 1. **定义**: - 对象是面向对象编程&#…...
81,【5】BUUCTF WEB [b01lers2020]Life on Mars
进入靶场 怎莫颠颠的,一下子就想到展博了 先把左边的挨个点一遍 在最后一个有点收获 不过也没其他收获了 这种进去给个正常网页的题目,基本都靠url获取信息了 抓包看看有没有其他信息 竟然没有任何信息 自闭了 看别人的wp去咯 为什么别人抓到的包里…...
华硕笔记本装win10哪个版本好用分析_华硕笔记本装win10专业版图文教程
华硕笔记本装win10哪个版本好用?华硕笔记本还是建议安装win10专业版。Win分为多个版本,其中家庭版(Home)和专业版(Pro)是用户选择最多的两个版本。win10专业版在功能以及安全性方面有着明显的优势ÿ…...
Linux进程 -fork(初识),进程状态和进程优先级
目录 一、通过系统调用创建进程-fork 1.fork的介绍 2.fork的理解 3.fork常规用法 4.fork的三个问题 5.创建多个子进程 二、进程状态 (1)Linux内核源代码 (2)进程的状态 R运行状态(运行态) S 睡眠状态&…...
数据从前端传到后端入库过程分析
数据从前端传到后端入库过程分析 概述 积累了一些项目经验,成长为一个老程序员了,自认为对各种业务和技术都能得心应手的应对了,殊不知很多时候我们借助了搜索引擎的能力,当然现在大家都是通过AI来武装自己。 今天要分析的话题是…...
macOS如何进入 Application Support 目录(cd: string not in pwd: Application)
错误信息 cd: string not in pwd: Application 表示在当前目录下找不到名为 Application Support 的目录。可能的原因如下: 拼写错误或路径错误:确保你输入的目录名称正确。目录名称是区分大小写的,因此请确保使用正确的大小写。正确的目录名…...
第38周:猫狗识别 (Tensorflow实战第八周)
目录 前言 一、前期工作 1.1 设置GPU 1.2 导入数据 输出 二、数据预处理 2.1 加载数据 2.2 再次检查数据 2.3 配置数据集 2.4 可视化数据 三、构建VGG-16网络 3.1 VGG-16网络介绍 3.2 搭建VGG-16模型 四、编译 五、训练模型 六、模型评估 七、预测 总结 前言…...
【2024年华为OD机试】 (A卷,200分)- 计算网络信号、信号强度(JavaScriptJava PythonC/C++)
一、问题描述 题目解析 问题描述 我们有一个 m x n 的二维网格地图,每个格子可能是以下几种情况之一: 0:表示该位置是空旷的。x(正整数):表示该位置是信号源,信号强度为 x。-1:表示该位置是阻隔物,信号无法直接穿透。信号源只有一个,阻隔物可能有多个。信号在传播…...
【go语言】数组和切片
一、数组 1.1 什么是数组 数组是一组数:数组需要是相同类型的数据的集合;数组是需要定义大小的;数组一旦定义了大小是不可以改变的。 1.2 数组的声明 数组和其他变量定义没有什么区别,唯一的就是这个是一组数,需要给…...
2025美赛MCM数学建模A题:《石头台阶的“记忆”:如何用数学揭开历史的足迹》(全网最全思路+模型)
✨个人主页欢迎您的访问 ✨期待您的三连 ✨ 《石头台阶的“记忆”:如何用数学揭开历史的足迹》 目录 《石头台阶的“记忆”:如何用数学揭开历史的足迹》 ✨摘要✨ ✨引言✨ 1. 引言的结构 2. 撰写步骤 (1)研究背景 &#…...
使用 Docker Compose 一键启动 Redis、MySQL 和 RabbitMQ
目录 一、Docker Compose 简介 二、服务配置详解 1. Redis 配置 2. MySQL 配置 3. RabbitMQ 配置 三、数据持久化与时间同步 四、部署与管理 五、总结 目录挂载与卷映射的区别 现代软件开发中,微服务架构因其灵活性和可扩展性而备受青睐。为了支持微服务的…...
新增自定义数据功能|UWA Gears V1.0.7
UWA Gears 是UWA最新发布的无SDK性能分析工具。针对移动平台,提供了实时监测和截帧分析功能,帮助您精准定位性能热点,提升应用的整体表现。 本次版本更新新增了自定义数据功能,支持灵活定义和捕获关键性能指标,满足特…...
docker 简要笔记
文章目录 一、前提内容1、docker 环境准备2、docker-compose 环境准备3、流程说明 二、打包 docker 镜像1、基础镜像2、国内镜像源3、基础的dockerfile4、打包镜像 四、构建运行1、docker 部分2、docker-compose 部分2.1、构建docker-compose.yml2.1.1、同目录构建2.1.2、利用镜…...
在Ubuntu上使用Apache+MariaDB安装部署Nextcloud并修改默认存储路径
一、前言 Nextcloud 是一款开源的私有云存储解决方案,允许用户轻松搭建自己的云服务。它不仅支持文件存储和共享,还提供了日历、联系人、任务管理、笔记等丰富的功能。本文将详细介绍如何在 Ubuntu 22.04 LTS 上使用 Apache 和 MariaDB 安装部署 Nextcl…...
【JavaEE】-- 计算机是如何工作的
文章目录 1. 冯诺依曼体系(VonNeumann Architecture)2. CPU 基本工作流程2.1 寄存器(Register)和 内存(RAM)2.2 控制单元 CU(ControlUnit)2.3 指令(Instruction) 3. 操作系统(OperatingSystem)3.1 操作系统的定位3.2 什么是进程/任务(Process…...
测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明
AI 领域的快速发展正在催生一个新时代,智能代理(agents)不再是孤立的个体,而是能够像一个数字团队一样协作。然而,当前 AI 生态系统的碎片化阻碍了这一愿景的实现,导致了“AI 巴别塔问题”——不同代理之间…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
rknn toolkit2搭建和推理
安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 ,不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源(最常用) conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...
