【RabbitMQ】死信(延迟队列)的使用
目录
一、介绍
1、什么是死信队列(延迟队列)
2、应用场景
3、死信队列(延迟队列)的使用
4、死信消息来源
二、案例实践
1、案例一
2、案例二(消息接收确认 )
3、总结
一、介绍
1、什么是死信队列(延迟队列)
- 死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
- 死信队列(Dead Letter Queue)和延迟队列(Delay Queue)是两种不同的队列类型,但在实际应用中它们可以结合使用。
- 死信队列是当消息在队列中因为过期、被拒绝等原因无法正常处理时,会被重新发送到另一个交换机上,这个交换机就是死信交换机。死信队列可以用于实现重试机制、日志审计等特殊应用逻辑。
延迟队列则是一种特殊的队列类型,它允许将消息延迟指定的时间后才能被消费者消费。这种队列通常用于处理那些需要在特定时间点被处理的任务,例如定时任务、限时优惠等。在RabbitMQ中,可以通过设置消息的TTL(生存时间)来实现延迟队列的功能。当消息在队列中超过了TTL,它就会被移除并被发送到指定的死信交换机,进而被路由到死信队列中。
结合使用死信队列和延迟队列可以实现一些复杂的应用逻辑。
例如:
可以将某个需要延迟处理的消息发送到延迟队列中,并在消息过期之前将其存储在死信队列中。这样,当消息从延迟队列中移除时,它会被自动发送到死信队列中,然后由消费者消费并执行相应的操作。这种结合使用的方式可以提供更高的灵活性和可靠性,使得系统能够更好地应对各种异常情况。
2、应用场景
死信队列(Dead Letter Queue)和延迟队列(Delay Queue)在以下应用场景中表现优异:
这些场景都是通过结合使用死信队列和延迟队列来提高系统的可靠性、鲁棒性和灵活性。通过合理地设置死信队列和延迟队列的参数,可以实现各种复杂的业务逻辑和异常处理机制。
- 库存解锁服务:例如,当一个商品被锁定且无法被购买时,可以将其放入死信队列中,并在一定时间后重新发送到原始队列进行处理,从而解锁库存。
- 定时关单功能:例如,用户在商城下单成功并点击去支付后,如果在指定时间内未支付,系统可以自动将订单放入死信队列中,并在一定时间后重新发送到原始队列进行处理。
- 保证订单业务的消息数据不丢失:当消息消费发生异常时,可以将消息投入死信队列中,以便后续等到环境好了之后,再消费死信队列中的消息。
- 实现重试机制:当某个消息处理失败时,可以将它放入死信队列中,并在一段时间后重新发送到交换机进行重试。这样可以提高系统的鲁棒性,确保消息能够被正确处理。
- 处理定时任务和秒杀活动:使用延迟队列可以将消息延迟到特定的时间后进行处理,例如定时任务、秒杀活动等。这样可以确保在特定的时间点执行相应的操作。
- 日志处理和审计:将日志消息发送到死信队列中,可以在日志发生异常时进行记录和审计,以便分析和排查问题。
3、死信队列(延迟队列)的使用
使用死信队列(Dead Letter Queue)和延迟队列(Delay Queue)可以提高系统的可靠性和灵活性,特别是在处理异常情况、重试机制和定时任务等方面。
需要注意的是,在使用死信队列和延迟队列时,需要考虑系统的可用性和性能。如果大量的消息被放入死信队列中,可能会导致系统资源的过度消耗。因此,需要合理配置死信队列和延迟队列的大小和数量,以及监控和管理系统的性能和资源使用情况。
- 定义死信队列和延迟队列:在RabbitMQ中,可以在定义队列时指定队列类型为死信队列或延迟队列。例如,使用RabbitMQ的命令行工具或管理界面创建死信队列或延迟队列。
- 配置交换机和队列属性:在绑定交换机和队列时,可以设置一些属性来控制消息的路由和消费。例如,可以设置消息的TTL(生存时间)来实现延迟队列的功能,或者设置消息的优先级和延迟时间等属性。
- 发送消息到队列:使用RabbitMQ的生产者将消息发送到定义的队列中。如果需要将消息放入延迟队列中,可以在发送消息时设置相应的延迟时间。
- 处理消息:消费者从队列中获取消息并进行处理。如果消息无法正常处理,可以将它放入死信队列中。在处理消息时,可以根据需要设置重试机制,以便在消息处理失败时重新发送消息到队列中进行重试。
- 监控和管理:使用RabbitMQ的管理界面或命令行工具监控和管理死信队列和延迟队列的状态和消息。可以查看队列的大小、消息的延迟时间、消费者数量等信息,并根据需要进行调整和管理。
4、死信消息来源
- 消息 TTL 过期
- 队列满了,无法再次添加数据
- 消息被拒绝(reject 或 nack),并且 requeue =false
二、案例实践
1、案例一
生产者
在生产者的Config里面添加队列 。
// =============死信队列===============// 定义QueueA Bean,返回QueueA实例@Beanpublic Queue QueueA() {Map<String, Object> config = new HashMap<>();//message在该队列queue的存活时间最大为5秒config.put("x-message-ttl", 5000);//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)config.put("x-dead-letter-exchange", "deadExchange");//x-dead-letter-routing-key参数是给这个DLX指定路由键config.put("x-dead-letter-routing-key", "deadQueue");// 返回QueueA实例return new Queue("QueueA", true, true, false, config);}// 定义DirectExchangeA Bean,返回DirectExchangeA实例@Beanpublic DirectExchange DirectExchangeA() {// 返回DirectExchangeA实例return new DirectExchange("DirectExchangeA");}// 定义bindingA Bean,将QueueA绑定到DirectExchangeA,并设置路由键为A.A@Beanpublic Binding bindingA() {// 将QueueA绑定到DirectExchangeAreturn BindingBuilder// 设置路由键为A.A.bind(QueueA()).to(DirectExchangeA()).with("A.A");}// 定义QueueB Bean,返回QueueB实例@Beanpublic Queue QueueB() {// 返回QueueB实例return new Queue("QueueB");}// 定义DirectExchangeB Bean,返回DirectExchangeB实例@Beanpublic DirectExchange DirectExchangeB() {// 返回DirectExchangeB实例return new DirectExchange("DirectExchangeB");}// 定义bindingB Bean,将QueueB绑定到DirectExchangeB,并设置路由键为B.B@Beanpublic Binding bindingB() {// 将QueueB绑定到DirectExchangeBreturn BindingBuilder// 设置路由键为B.B.bind(QueueB()).to(DirectExchangeB()).with("B.B");}
消费者
在消费者里面添加QueueA和QueueB
QueueA
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void Queue02(String msg) {log.warn("QueueA,接收到信息:" + msg);}}
QueueB
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueB")
public class ReceiverQB {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void Queue02(String msg) {log.warn("QueueB,接收到信息:" + msg);}}
编写Controller层
@RequestMapping("send5")public String send5() {rabbitTemplate.convertAndSend("DirectExchangeA", "A.A", "Hello,A");return "🐉";}
结果:
2、案例二(消息接收确认 )
-
如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限
-
ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟
-
消息确认模式有:
-
AcknowledgeMode.NONE:自动确认
-
AcknowledgeMode.AUTO:根据情况确认
-
AcknowledgeMode.MANUAL:手动确认
-
配置文件
默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manualspring:rabbitmq:listener:simple:acknowledge-mode: manual
在消费者里的更改ReceiverQA
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;import java.io.IOException;@Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "QueueA") public class ReceiverQA {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void QueueA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.warn("QueueA,接收到信息:" + msg);channel.basicAck(tag, false);}}
需要注意的 basicAck 方法需要传递两个参数
deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
3、总结
-
持久化
-
exchange要持久化
-
queue要持久化
-
message要持久化
-
-
消息确认
-
启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)
-
生产者和Server(broker)之间的消息确认
-
消费者和Server(broker)之间的消息确认
-
相关文章:

【RabbitMQ】死信(延迟队列)的使用
目录 一、介绍 1、什么是死信队列(延迟队列) 2、应用场景 3、死信队列(延迟队列)的使用 4、死信消息来源 二、案例实践 1、案例一 2、案例二(消息接收确认 ) 3、总结 一、介绍 1、什么是死信队列(延迟队列) 死信,在官网中对应的单词…...
java 解析word模板(2024-01-25)
本文主要功能是解析word模板 这是一个word解析类,因为我做的系统用到了而且没有可用的帮助类,只能自己写。之前的实现方式是freemarker 模板解析。但是这次要求用poi不在使用freemarker。实现功能比较少,主要是满足开发需求即可,没…...

flutter-相关个人记录
1、flutter 安卓打包打包报错 flutter build apk -v --no-tree-shake-icons 2、获取华为指纹证书命令 keytool -list -v -keystore ***.jks 3、IOS项目中私有方法查找隐藏文件中 1、cd 项目目录地址 2、grep -r xerbla. "xerbla"为需要查找的关键字 3…...

互斥锁/读写锁(Linux)
一、互斥锁 临界资源概念: 不能同时访问的资源,比如写文件,只能由一个线程写,同时写会写乱。 比如外设打印机,打印的时候只能由一个程序使用。 外设基本上都是不能共享的资源。 生活中比如卫生间,同一…...
Jackson序列化Bean额外属性附加--@JsonAnyGetter、@JsonUnwrapped用户
1. 场景 有一项工作,需要将数据从一个服务S中读取出来(得到的是一个JSON),将数据解析转换以后构造成一个数组的类型A的对象,写入到一个服务T中。 A.class Data public class A {String f0 ;String f1 ; }在发现需要…...
排序算法——冒泡排序算法详解
冒泡排序算法详解 1.引言2.算法概览2.1输入处理2.2核心算法步骤2.3数据结构2.4复杂度分析 3.算法优化4.边界条件和异常处理5.实验和测试6.应用和扩展7.代码示例8.总结 1.引言 冒泡排序是一种简单而直观的比较排序算法,它通过多次遍历数组,比较相邻元素并…...

宋仕强论道之华强北的缺货潮(十六)
始于2019年缺货潮让华强北又生产一大批亿万富翁,缺货的原因主要是:首先,疫情封控导致大量白领在家远程办公,需要购买电脑、打印机等办公设备,同时孩子们也要在家上网课,进一步增加对电子智能终端产品的需求…...
登录注册页面
前提:基于element-ui环境 模态登录组件 分析Login.vue <template><div class"login"><span click"handleClose">X</span></div> </template><script> export default {name: "Login",m…...

视频美颜SDK详解:动态贴纸技术的前沿探索
当下,美颜SDK的动态贴纸技术作为视频美颜的独特亮点,吸引了越来越多开发者和用户的关注。 一、技术详解 动态贴纸技术是视频美颜SDK中的一项创新性功能,它通过在实时视频中添加各种动态效果,为用户提供更加生动有趣的拍摄体验。…...
vue3 实现上传图片裁剪
在线的例子以及代码,请点击访问链接...
flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本
对应官网 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/ 测试数据 * 广播流 官方案例 scala版本* 广播状态* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance…...
vueRouter中scrollBehavior实现滚动固定位置
使用前端路由,当切换到新路由时,想要页面滚到顶部,或者是保持原先的滚动位置,就像重新加载页面那样。 vue-router 能做到,而且更好,它让你可以自定义路由切换时页面如何滚动。 注意: 这个功能只在 HTML5 h…...

解决WinForms跨线程操作控件的问题
解决WinForms跨线程操作控件的问题 介绍 在构建Windows窗体应用程序时,我们通常会遇到需要从非UI线程更新UI元素的场景。由于WinForms控件并不是线程安全的,直接这样做会抛出一个异常:“控件’control name’是从其他线程创建的,…...

从零开始:Git 上传与使用指南
Git 是一种非常强大的版本控制系统,它可以帮助您在多人协作开发项目中更好地管理代码版本,并确保每个团队成员都能及时地获取最新的代码更改。在使用 Git 进行版本控制之前,您需要先进行一些设置,以确保您的代码能够顺利地与远程仓…...
Docker compose部署Golang服务
Docker Compose 部署 在使用docker部署时,除了使用--link的方式来关联容器之外,还可以使用 docker compose 运行多个容器。 本文以项目:https://github.com/johncxf/go-api 为例。 定义 Dockerfile 我这里用于区分默认 Dockerfile 文件&a…...

Day36 435无重叠区间 763划分字母区间
435 无重叠区间 给定一个区间的集合,找到需要移除区间的最小数量,使剩余区间互不重叠。 注意: 可以认为区间的终点总是大于它的起点。 区间 [1,2] 和 [2,3] 的边界相互“接触”,但没有相互重叠。 本题与上一题类似: 如果按照左…...

【Servlet】如何编写第一个Servlet程序
个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【Servlet】 本专栏旨在分享学习Servlet的一点学习心得,欢迎大家在评论区交流讨论💌 Servlet是Java编写的服务器端…...

读懂比特币—bitcoin代码分析(五)
今天的代码分析主要是 bitcoin/src/init.cpp 文件中的三个函数:AppInitSanityChecks、AppInitLockDataDirectory、AppInitInterfaces,下面我们来说明这三个函数是用来干什么的,并逐行解读函数代码,先贴出源代码如下: …...
uniapp使用uQRCode插件生成二维码的简单使用
最近在找移动端绘制二维码的问题 ,直接上代码 下载 weapp-qrcode.js(可以通过npm install weapp-qrcode --save 下载,之后把它父子到untils目录下) npm install weapp-qrcode --save在组件页面使用 <canvas id"couponQrcode" canvas-id&qu…...

【寒假每日一题·2024】AcWing 4965. 三国游戏(补)
文章目录 一、题目1、原题链接2、题目描述 二、解题报告1、思路分析2、时间复杂度3、代码详解 一、题目 1、原题链接 4965. 三国游戏 2、题目描述 二、解题报告 1、思路分析 思路参考y总:y总讲解视频 (1)题目中的获胜情况分为三种ÿ…...
KubeSphere 容器平台高可用:环境搭建与可视化操作指南
Linux_k8s篇 欢迎来到Linux的世界,看笔记好好学多敲多打,每个人都是大神! 题目:KubeSphere 容器平台高可用:环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

地震勘探——干扰波识别、井中地震时距曲线特点
目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波:可以用来解决所提出的地质任务的波;干扰波:所有妨碍辨认、追踪有效波的其他波。 地震勘探中,有效波和干扰波是相对的。例如,在反射波…...

大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...