当前位置: 首页 > news >正文

【RabbitMQ】死信(延迟队列)的使用

目录

一、介绍

1、什么是死信队列(延迟队列)

2、应用场景

3、死信队列(延迟队列)的使用

4、死信消息来源

二、案例实践

1、案例一

2、案例二(消息接收确认 )

3、总结


一、介绍

1、什么是死信队列(延迟队列)

  •         死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
  •         死信队列(Dead Letter Queue)延迟队列(Delay Queue)是两种不同的队列类型,但在实际应用中它们可以结合使用。
  •         死信队列是当消息在队列中因为过期、被拒绝等原因无法正常处理时,会被重新发送到另一个交换机上,这个交换机就是死信交换机。死信队列可以用于实现重试机制、日志审计等特殊应用逻辑。

        延迟队列则是一种特殊的队列类型,它允许将消息延迟指定的时间后才能被消费者消费。这种队列通常用于处理那些需要在特定时间点被处理的任务,例如定时任务、限时优惠等。在RabbitMQ中,可以通过设置消息的TTL(生存时间)来实现延迟队列的功能。当消息在队列中超过了TTL,它就会被移除并被发送到指定的死信交换机,进而被路由到死信队列中。

结合使用死信队列和延迟队列可以实现一些复杂的应用逻辑。

例如:

        可以将某个需要延迟处理的消息发送到延迟队列中,并在消息过期之前将其存储在死信队列中。这样,当消息从延迟队列中移除时,它会被自动发送到死信队列中,然后由消费者消费并执行相应的操作。这种结合使用的方式可以提供更高的灵活性和可靠性,使得系统能够更好地应对各种异常情况。

2、应用场景

死信队列(Dead Letter Queue)和延迟队列(Delay Queue)在以下应用场景中表现优异:

        这些场景都是通过结合使用死信队列和延迟队列来提高系统的可靠性、鲁棒性和灵活性。通过合理地设置死信队列和延迟队列的参数,可以实现各种复杂的业务逻辑和异常处理机制。

  1. 库存解锁服务:例如,当一个商品被锁定且无法被购买时,可以将其放入死信队列中,并在一定时间后重新发送到原始队列进行处理,从而解锁库存。
  2. 定时关单功能:例如,用户在商城下单成功并点击去支付后,如果在指定时间内未支付,系统可以自动将订单放入死信队列中,并在一定时间后重新发送到原始队列进行处理。
  3. 保证订单业务的消息数据不丢失:当消息消费发生异常时,可以将消息投入死信队列中,以便后续等到环境好了之后,再消费死信队列中的消息。
  4. 实现重试机制:当某个消息处理失败时,可以将它放入死信队列中,并在一段时间后重新发送到交换机进行重试。这样可以提高系统的鲁棒性,确保消息能够被正确处理。
  5. 处理定时任务和秒杀活动:使用延迟队列可以将消息延迟到特定的时间后进行处理,例如定时任务、秒杀活动等。这样可以确保在特定的时间点执行相应的操作。
  6. 日志处理和审计:将日志消息发送到死信队列中,可以在日志发生异常时进行记录和审计,以便分析和排查问题。

3、死信队列(延迟队列)的使用

使用死信队列(Dead Letter Queue)和延迟队列(Delay Queue)可以提高系统的可靠性和灵活性,特别是在处理异常情况、重试机制和定时任务等方面。

需要注意的是,在使用死信队列和延迟队列时,需要考虑系统的可用性和性能。如果大量的消息被放入死信队列中,可能会导致系统资源的过度消耗。因此,需要合理配置死信队列和延迟队列的大小和数量,以及监控和管理系统的性能和资源使用情况。

  1. 定义死信队列和延迟队列:在RabbitMQ中,可以在定义队列时指定队列类型为死信队列或延迟队列。例如,使用RabbitMQ的命令行工具或管理界面创建死信队列或延迟队列。
  2. 配置交换机和队列属性:在绑定交换机和队列时,可以设置一些属性来控制消息的路由和消费。例如,可以设置消息的TTL(生存时间)来实现延迟队列的功能,或者设置消息的优先级和延迟时间等属性。
  3. 发送消息到队列:使用RabbitMQ的生产者将消息发送到定义的队列中。如果需要将消息放入延迟队列中,可以在发送消息时设置相应的延迟时间。
  4. 处理消息:消费者从队列中获取消息并进行处理。如果消息无法正常处理,可以将它放入死信队列中。在处理消息时,可以根据需要设置重试机制,以便在消息处理失败时重新发送消息到队列中进行重试。
  5. 监控和管理:使用RabbitMQ的管理界面或命令行工具监控和管理死信队列和延迟队列的状态和消息。可以查看队列的大小、消息的延迟时间、消费者数量等信息,并根据需要进行调整和管理。

4、死信消息来源

  •  消息 TTL 过期
  • 队列满了,无法再次添加数据
  • 消息被拒绝(reject 或 nack),并且 requeue =false

二、案例实践

1、案例一

生产者

在生产者的Config里面添加队列 。

 //    =============死信队列===============// 定义QueueA Bean,返回QueueA实例@Beanpublic Queue QueueA() {Map<String, Object> config = new HashMap<>();//message在该队列queue的存活时间最大为5秒config.put("x-message-ttl", 5000);//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)config.put("x-dead-letter-exchange", "deadExchange");//x-dead-letter-routing-key参数是给这个DLX指定路由键config.put("x-dead-letter-routing-key", "deadQueue");// 返回QueueA实例return new Queue("QueueA", true, true, false, config);}// 定义DirectExchangeA Bean,返回DirectExchangeA实例@Beanpublic DirectExchange DirectExchangeA() {// 返回DirectExchangeA实例return new DirectExchange("DirectExchangeA");}// 定义bindingA Bean,将QueueA绑定到DirectExchangeA,并设置路由键为A.A@Beanpublic Binding bindingA() {// 将QueueA绑定到DirectExchangeAreturn BindingBuilder// 设置路由键为A.A.bind(QueueA()).to(DirectExchangeA()).with("A.A");}// 定义QueueB Bean,返回QueueB实例@Beanpublic Queue QueueB() {// 返回QueueB实例return new Queue("QueueB");}// 定义DirectExchangeB Bean,返回DirectExchangeB实例@Beanpublic DirectExchange DirectExchangeB() {// 返回DirectExchangeB实例return new DirectExchange("DirectExchangeB");}// 定义bindingB Bean,将QueueB绑定到DirectExchangeB,并设置路由键为B.B@Beanpublic Binding bindingB() {// 将QueueB绑定到DirectExchangeBreturn BindingBuilder// 设置路由键为B.B.bind(QueueB()).to(DirectExchangeB()).with("B.B");}

消费者

在消费者里面添加QueueAQueueB 

QueueA

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void Queue02(String msg) {log.warn("QueueA,接收到信息:" + msg);}}

QueueB

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueB")
public class ReceiverQB {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void Queue02(String msg) {log.warn("QueueB,接收到信息:" + msg);}}

编写Controller层

    @RequestMapping("send5")public String send5() {rabbitTemplate.convertAndSend("DirectExchangeA", "A.A", "Hello,A");return "🐉";}

结果:

2、案例二(消息接收确认 )

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

  • 消息确认模式有:

    • AcknowledgeMode.NONE:自动确认

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认

配置文件
        默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

spring:rabbitmq:listener:simple:acknowledge-mode: manual

在消费者里的更改ReceiverQA 

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {// 接收directExchange01交换机中Queue02队列消息的方法@RabbitHandlerpublic void QueueA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.warn("QueueA,接收到信息:" + msg);channel.basicAck(tag, false);}}
  • 需要注意的 basicAck 方法需要传递两个参数

    • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

    • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

3、总结

  • 持久化

    • exchange要持久化

    • queue要持久化

    • message要持久化

  • 消息确认

    • 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)

    • 生产者和Server(broker)之间的消息确认

    • 消费者和Server(broker)之间的消息确认

相关文章:

【RabbitMQ】死信(延迟队列)的使用

目录 一、介绍 1、什么是死信队列(延迟队列) 2、应用场景 3、死信队列(延迟队列)的使用 4、死信消息来源 二、案例实践 1、案例一 2、案例二&#xff08;消息接收确认 &#xff09; 3、总结 一、介绍 1、什么是死信队列(延迟队列) 死信&#xff0c;在官网中对应的单词…...

java 解析word模板(2024-01-25)

本文主要功能是解析word模板 这是一个word解析类&#xff0c;因为我做的系统用到了而且没有可用的帮助类&#xff0c;只能自己写。之前的实现方式是freemarker 模板解析。但是这次要求用poi不在使用freemarker。实现功能比较少&#xff0c;主要是满足开发需求即可&#xff0c;没…...

flutter-相关个人记录

1、flutter 安卓打包打包报错 flutter build apk -v --no-tree-shake-icons 2、获取华为指纹证书命令 keytool -list -v -keystore ***.jks 3、IOS项目中私有方法查找隐藏文件中 1、cd 项目目录地址 2、grep -r xerbla. "xerbla"为需要查找的关键字 3…...

互斥锁/读写锁(Linux)

一、互斥锁 临界资源概念&#xff1a; 不能同时访问的资源&#xff0c;比如写文件&#xff0c;只能由一个线程写&#xff0c;同时写会写乱。 比如外设打印机&#xff0c;打印的时候只能由一个程序使用。 外设基本上都是不能共享的资源。 生活中比如卫生间&#xff0c;同一…...

Jackson序列化Bean额外属性附加--@JsonAnyGetter、@JsonUnwrapped用户

1. 场景 有一项工作&#xff0c;需要将数据从一个服务S中读取出来&#xff08;得到的是一个JSON&#xff09;&#xff0c;将数据解析转换以后构造成一个数组的类型A的对象&#xff0c;写入到一个服务T中。 A.class Data public class A {String f0 ;String f1 ; }在发现需要…...

排序算法——冒泡排序算法详解

冒泡排序算法详解 1.引言2.算法概览2.1输入处理2.2核心算法步骤2.3数据结构2.4复杂度分析 3.算法优化4.边界条件和异常处理5.实验和测试6.应用和扩展7.代码示例8.总结 1.引言 冒泡排序是一种简单而直观的比较排序算法&#xff0c;它通过多次遍历数组&#xff0c;比较相邻元素并…...

宋仕强论道之华强北的缺货潮(十六)

始于2019年缺货潮让华强北又生产一大批亿万富翁&#xff0c;缺货的原因主要是&#xff1a;首先&#xff0c;疫情封控导致大量白领在家远程办公&#xff0c;需要购买电脑、打印机等办公设备&#xff0c;同时孩子们也要在家上网课&#xff0c;进一步增加对电子智能终端产品的需求…...

登录注册页面

前提&#xff1a;基于element-ui环境 模态登录组件 分析Login.vue <template><div class"login"><span click"handleClose">X</span></div> </template><script> export default {name: "Login",m…...

视频美颜SDK详解:动态贴纸技术的前沿探索

当下&#xff0c;美颜SDK的动态贴纸技术作为视频美颜的独特亮点&#xff0c;吸引了越来越多开发者和用户的关注。 一、技术详解 动态贴纸技术是视频美颜SDK中的一项创新性功能&#xff0c;它通过在实时视频中添加各种动态效果&#xff0c;为用户提供更加生动有趣的拍摄体验。…...

vue3 实现上传图片裁剪

在线的例子以及代码&#xff0c;请点击访问链接...

flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本

对应官网 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/ 测试数据 * 广播流 官方案例 scala版本* 广播状态* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance…...

vueRouter中scrollBehavior实现滚动固定位置

使用前端路由&#xff0c;当切换到新路由时&#xff0c;想要页面滚到顶部&#xff0c;或者是保持原先的滚动位置&#xff0c;就像重新加载页面那样。 vue-router 能做到&#xff0c;而且更好&#xff0c;它让你可以自定义路由切换时页面如何滚动。 注意: 这个功能只在 HTML5 h…...

解决WinForms跨线程操作控件的问题

解决WinForms跨线程操作控件的问题 介绍 在构建Windows窗体应用程序时&#xff0c;我们通常会遇到需要从非UI线程更新UI元素的场景。由于WinForms控件并不是线程安全的&#xff0c;直接这样做会抛出一个异常&#xff1a;“控件’control name’是从其他线程创建的&#xff0c;…...

从零开始:Git 上传与使用指南

Git 是一种非常强大的版本控制系统&#xff0c;它可以帮助您在多人协作开发项目中更好地管理代码版本&#xff0c;并确保每个团队成员都能及时地获取最新的代码更改。在使用 Git 进行版本控制之前&#xff0c;您需要先进行一些设置&#xff0c;以确保您的代码能够顺利地与远程仓…...

Docker compose部署Golang服务

Docker Compose 部署 在使用docker部署时&#xff0c;除了使用--link的方式来关联容器之外&#xff0c;还可以使用 docker compose 运行多个容器。 本文以项目&#xff1a;https://github.com/johncxf/go-api 为例。 定义 Dockerfile 我这里用于区分默认 Dockerfile 文件&a…...

Day36 435无重叠区间 763划分字母区间

435 无重叠区间 给定一个区间的集合&#xff0c;找到需要移除区间的最小数量&#xff0c;使剩余区间互不重叠。 注意: 可以认为区间的终点总是大于它的起点。 区间 [1,2] 和 [2,3] 的边界相互“接触”&#xff0c;但没有相互重叠。 本题与上一题类似&#xff1a; 如果按照左…...

【Servlet】如何编写第一个Servlet程序

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【Servlet】 本专栏旨在分享学习Servlet的一点学习心得&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; Servlet是Java编写的服务器端…...

读懂比特币—bitcoin代码分析(五)

今天的代码分析主要是 bitcoin/src/init.cpp 文件中的三个函数&#xff1a;AppInitSanityChecks、AppInitLockDataDirectory、AppInitInterfaces&#xff0c;下面我们来说明这三个函数是用来干什么的&#xff0c;并逐行解读函数代码&#xff0c;先贴出源代码如下&#xff1a; …...

uniapp使用uQRCode插件生成二维码的简单使用

最近在找移动端绘制二维码的问题 &#xff0c;直接上代码 下载 weapp-qrcode.js(可以通过npm install weapp-qrcode --save 下载,之后把它父子到untils目录下&#xff09; npm install weapp-qrcode --save在组件页面使用 <canvas id"couponQrcode" canvas-id&qu…...

【寒假每日一题·2024】AcWing 4965. 三国游戏(补)

文章目录 一、题目1、原题链接2、题目描述 二、解题报告1、思路分析2、时间复杂度3、代码详解 一、题目 1、原题链接 4965. 三国游戏 2、题目描述 二、解题报告 1、思路分析 思路参考y总&#xff1a;y总讲解视频 &#xff08;1&#xff09;题目中的获胜情况分为三种&#xff…...

李飞飞团队新作ESI-Bench:具身智能的ImageNet来了!

点击下方卡片&#xff0c;关注“CVer”公众号AI/CV重磅干货&#xff0c;第一时间送达【具身智能】微信群成立&#xff01;大家快扫码加入具身星球&#xff0c;将获得&#xff1a;最新具身智能技术和项目、❤️ 从入门到精通的学习路线、&#x1f916; 具身智能招聘(实习/校招/社…...

DeepSeek模型版本选择实战手册(2024最新版):从推理延迟、显存占用到LoRA兼容性全拆解

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;DeepSeek模型版本选择实战手册&#xff08;2024最新版&#xff09;&#xff1a;从推理延迟、显存占用到LoRA兼容性全拆解 选择合适的 DeepSeek 模型版本是部署高效、低成本大模型服务的关键前提。2024…...

企业级多模型聚合平台选型,如何通过用量看板实现成本精细化管理

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 企业级多模型聚合平台选型&#xff0c;如何通过用量看板实现成本精细化管理 当企业技术团队决定将大模型能力深度融入业务流程时&a…...

Taotoken的Token Plan套餐如何帮助初创公司控制AI实验成本

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 Taotoken的Token Plan套餐如何帮助初创公司控制AI实验成本 1. 成本不可预测&#xff1a;初创AI实验的常见困境 在产品原型和早期开…...

机器学习融合粒子网格法:加速器物理模拟效率提升10倍

1. 项目概述&#xff1a;当机器学习遇上粒子网格法在加速器物理这个追求极致精度的领域&#xff0c;每一次束流动力学的优化都像是在走钢丝——既要保证粒子束的稳定传输和高效加速&#xff0c;又要避免各种复杂的共振和不稳定性。传统的模拟工具&#xff0c;比如基于粒子网格法…...

电脑里突然冒出的FNPLicensingService.exe是啥?手把手教你关闭它(附Adobe/CAD/Xshell等软件排查指南)

电脑里突然冒出的FNPLicensingService.exe是啥&#xff1f;手把手教你关闭它&#xff08;附Adobe/CAD/Xshell等软件排查指南&#xff09;当你打开任务管理器&#xff0c;突然发现一个陌生的进程FNPLicensingService.exe在后台运行&#xff0c;甚至频繁请求联网&#xff0c;这难…...

【流体】基于matlab对沼气厂管道系统进行流体动力学设计和成本优化(最小化总年化成本TAC)【含Matlab源码 15560期】

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到海神之光博客之家&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49…...

Node.js 项目如何集成 Taotoken 实现稳定的大模型调用

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 Node.js 项目如何集成 Taotoken 实现稳定的大模型调用 对于 Node.js 后端服务开发者而言&#xff0c;在项目中引入大模型能力正变得…...

中小团队如何统一管理多个项目的AI模型调用与API密钥

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 中小团队如何统一管理多个项目的AI模型调用与API密钥 在中小型技术团队的日常开发中&#xff0c;多个项目并行是常态。这些项目可能…...

QiLink/道息实验室创始人简介:跨界工程师的“道息”实践录

QiLink/道息实验室创始人简介&#xff1a;跨界工程师的“道息”实践录我是徐玉生&#xff0c;一个用厨师的火候、瑜伽师的呼吸、教师的逻辑&#xff0c;搭建技术社区的“非典型工程师”。2013年&#xff0c;我同时拿到中式烹调师一级&#xff08;高级技师&#xff09;和高级瑜伽…...