RabbitMQ-死信队列(golang)
1、概念
死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的消息未被消费者拿到,这样的消息就会成为死信,存放死信消息的队列,也就被称为死信队列(Dead Letter Queue,简称DLQ)。
2、死信产生的原因
文心一言的回答如下:
- 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且requeue参数被设置为false时,消息会被视为死信。这意味着消费者明确表示无法或不愿意处理该消息,并且不希望该消息重新进入队列等待其他消费者处理。
- 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息也可以被标记为死信。这种情况下,尽管消费者尝试处理消息,但由于某些无法克服的错误,消息无法被成功消费。
- 消息过期:如果消息设置了生存时间(TTL,Time To Live),并且在这个时间内没有被消费,那么消息会过期并被视为死信。TTL是RabbitMQ中用于指定消息在队列中存活时间的参数,超过该时间的消息将被视为过期并丢弃或转发到死信队列。
- 队列长度限制:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,这些消息也会被视为死信。队列长度限制是RabbitMQ中用于控制队列大小的一种机制,当队列达到最大容量时,新到达的消息将无法被接收并可能被丢弃或转发到死信队列。
总结来说,主要原因就三个,消息被拒绝、消息过期、队列满
在一些重要的场景,比如支付场景,提交的订单超时未支付的,可以设计为进入死信队列。
3、死信队列使用实践
3.1 消息过期
设置正常队列ttl过期时间为5s,如果5s内消息没有被消费,则会自动放入死信队列中。
关键点:设置正常队列属性,ttl5s过期:
// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{"x-message-ttl": int64(5000), // 5秒TTL"x-dead-letter-exchange": "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitargs, // arguments)
全部代码如下:
package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xxxx.xx.xx.xxx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}// 声明死信队列dlx, err := ch.QueueDeclare("dead_letter_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())return}err = ch.ExchangeDeclare("my_exchange", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{"x-message-ttl": int64(5000), // 5秒TTL"x-dead-letter-exchange": "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列,注意,必须在声明队列时就要设置死信队列信息q, err := ch.QueueDeclare("normal_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitargs, // arguments)if err != nil {fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())return}// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind(q.Name, // queue nameq.Name, // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})if err != nil {fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())return}
}
队列信息包括绑定的死信队列信息、ttl等信息如下:
运行上方代码,会向队列发送一条信息,我们先不创建消费者,5s后,消息会被自动放入死信队列。
3.2 队列满
当mq队列由于消息量过多导致队列打满时,这个时候过来的消息,将会被自动放入到死信队列中。
设置队列长度属性代码如下:
args := amqp.Table{// "x-message-ttl": int64(5000), // 5秒TTL"x-max-length": 2,"x-dead-letter-exchange": "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitargs, // arguments)
队列属性如下:
发送两条信息:
继续发送第三个:
测试代码:
package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}// 声明死信队列dlx, err := ch.QueueDeclare("dead_letter_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())return}err = ch.ExchangeDeclare("my_exchange", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{// "x-message-ttl": int64(5000), // 5秒TTL"x-max-length": 2,"x-dead-letter-exchange": "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitargs, // arguments)if err != nil {fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())return}// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind(q.Name, // queue nameq.Name, // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})if err != nil {fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())return}
}
3.3 消息被拒绝
消息被拒绝的情况,当消费者无法处理某条信息时,客户端想rabbitmq服务器发送一个【负确认】应答,表示消费者未能成功处理此条消息,并且希望RabbitMQ根据配置重新发送这条消息(例如,将其重新排队)或者将其丢弃。
客户端函数:ch.Nack,函数原型:
func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {ch.m.Lock()defer ch.m.Unlock()return ch.send(&basicNack{DeliveryTag: tag,Multiple: multiple,Requeue: requeue,})
}
入参含义如下:
tag | 这是一个唯一标识符,用于标识消费者之前接收到的特定消息。当消费者调用 |
multiple | 这是一个布尔值( |
requeue | 这也是一个布尔值( |
测试过程,首先使用3.1或者3.2的代码向mq中写入几条信息:
之后使用如下代码进行消费:
package mainimport ("fmt""time"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}err = ch.ExchangeDeclare("my_exchange", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 声明正常队列// q, err := ch.QueueDeclare(// "normal_queue", // name// true, // durable// false, // delete when unused// false, // exclusive// false, // no-wait// nil, // arguments// )// if err != nil {// fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())// return// }// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind("normal_queue", // queue name"normal_queue", // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}msgs, _ := ch.Consume("normal_queue", // queue"", // consumerfalse, // auto-acktrue, // exclusivefalse, // no-localfalse, // no-waitnil, // args)go func() {for d := range msgs {// 模拟处理失败,全部放入死信队列ch.Nack(d.DeliveryTag, false, false)}}()time.Sleep(10 * time.Second)
}
运行代码后,3条消息全部进入到死信队列中:
4、总结
RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制,死信队列中的所有消息都是无法被正常消费的死信,这使得开发者可以集中对这些消息进行管理和分析。通过分析死信队列中的消息,开发者可以了解系统的运行状态、发现潜在的问题,并进行相应的优化和改进,以提升系统的稳定性和可靠性。
相关文章:

RabbitMQ-死信队列(golang)
1、概念 死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的…...

爬虫开发工具与环境搭建——环境配置
第二章:爬虫开发工具与环境搭建 第二节:环境配置 在进行爬虫开发之前,首先需要配置好开发环境。一个良好的开发环境不仅能提高开发效率,还能避免因环境不一致带来的问题。以下是环境配置的详细步骤,涵盖了Python开发…...

15.UE5等级、经验、血条,魔法恢复和消耗制作
2-17 等级、经验、血条、魔法消耗_哔哩哔哩_bilibili 目录 1.制作UI,等级,经验,血条 2.为属性面板绑定角色真实的属性,实现动态更新 3.魔法的消耗和恢复 1.制作UI,等级,经验,血条 创建控…...

【Homework】【5】Learning resources for DQ Robotics in MATLAB
Lesson 5 代码-TwoDofPlanarRobot.m 表示一个 2 自由度平面机器人。该类包含构造函数、计算正向运动学模型的函数、计算平移雅可比矩阵的函数,以及在二维空间中绘制机器人的函数。 classdef TwoDofPlanarRobot%TwoDofPlanarRobot - 表示一个 2 自由度平面机器人类…...
vue3中 ref和reactive的区别
ref的主要作用 ref 函数接受的参数数据类型可以是原始数据类型也可以是引用数据类型。在模板中使用 ref 时,我们不需要加 .value,因为当 ref 在模板中作为顶层属性被访问时,它们会被自动解包, <p>count: {{ count }}</…...

第十四章 Spring之假如让你来写AOP——雏形篇
Spring源码阅读目录 第一部分——IOC篇 第一章 Spring之最熟悉的陌生人——IOC 第二章 Spring之假如让你来写IOC容器——加载资源篇 第三章 Spring之假如让你来写IOC容器——解析配置文件篇 第四章 Spring之假如让你来写IOC容器——XML配置文件篇 第五章 Spring之假如让你来写…...

群控系统服务端开发模式-应用开发-前端个人资料开发
一、总结 其实程序开发到现在,简单的后端框架就只剩下获取登录账号信息及获取登录账号菜单这两个功能咯。详细见下图: 1、未登录时总业务流程图 2、登录后总业务流程图 二、获取登录账号信息对接 在根目录下src文件夹下store文件夹下modules文件夹下的us…...
动态规划技巧点
动规五部曲(来自b站卡哥):1、确定DP数组中i、j…的含义。2、确定DP推导式。3、DP数组初始化。4、DP数组遍历顺序。5、DP数组打印校验。 父问题、子问题有些许区别:LeetCode343.整数拆分 今天在哔哩哔哩上刷到了一个非常有意思的l…...

深度学习之pytorch常见的学习率绘制
文章目录 0. Scope1. StepLR2. MultiStepLR3. ExponentialLR4. CosineAnnealingLR5. ReduceLROnPlateau6. CyclicLR7. OneCycleLR小结参考文献 https://blog.csdn.net/coldasice342/article/details/143435848 0. Scope 在深度学习中,学习率(Learning R…...

Spring Boot集成SQL Server快速入门Demo
1.什么是SQL Server? SQL Server是由Microsoft开发和推广的以客户/服务器(c/s)模式访问、使用Transact-SQL语言的关系数据库管理系统(DBMS),它最初是由Microsoft、Sybase和Ashton-Tate三家公司共同开发的&…...

低代码牵手 AI 接口:开启智能化开发新征程
一、低代码与 AI 接口的结合趋势 低代码开发平台近年来在软件开发领域迅速崛起。随着企业数字化转型的需求不断增长,低代码开发平台以其快速构建应用程序的优势,满足了企业对高效开发的需求。例如,启效云低代码平台通过范式化和高颗粒度的可配…...

【已解决】git push一直提示输入用户名及密码、fatal: Could not read from remote repository的问题
问题描述: 在实操中,git push代码到github上一直提示输入用户名及密码,并且跳出的输入框输入用户名和密码后,报错找不到远程仓库 实际解决中,发现我环境有两个问题解决: git push一直提示输入用户名及密码…...
python语言基础-4 常用模块-4.13 其他模块
声明:本内容非盈利性质,也不支持任何组织或个人将其用作盈利用途。本内容来源于参考书或网站,会尽量附上原文链接,并鼓励大家看原文。侵删。 4.13 其他模块 除此之外python中还有大量的功能模块,如: pill…...
微信小程序=》基础=》常见问题=》性能总结
文章目录 微信小程序开发应用 实例小程序生命周期 以及 各生命周期应用实例小程序图片 展示方案 小程序打包应用方案技术细节(分包应用实例)技术细节(压缩处理)一、准备工作二、JavaScript 代码压缩三、WXML 文件优化(…...
JWT深度解析:Java Web中的安全传输与身份验证
标题:JWT深度解析:Java Web中的安全传输与身份验证 引言 JSON Web Token(JWT)是一种轻量级的身份验证和授权标准,它允许在各方之间安全地传输信息。在Java Web开发中,JWT因其无状态、可扩展性和跨域支持而…...

使用Java爬虫获取商品订单详情:从API到数据存储
在电子商务日益发展的今天,获取商品订单详情成为了许多开发者和数据分析师的需求。无论是为了分析用户行为,还是为了优化库存管理,订单数据的获取都是至关重要的。本文将详细介绍如何使用Java编写爬虫,通过API获取商品订单详情&am…...
Mybatis中批量插入foreach优化
数据库批量入库方常见方式:Java中foreach和xml中使用foreach 两者的区别: 通过Java的foreach循环批量插入: 当我们在Java通过foreach循环插入的时候,是一条一条sql执行然后将事物统一交给spring的事物来管理(Transa…...

Word VBA如何间隔选中多个(非连续)段落
实例需求:Word文档中的有多个段落,段落总数量不确定,现在需要先选中所有基数段落,即:段落1,段落3 … ,然后一次性设置粗体格式。 也许有的读者会认为这个无厘头的需求,循环遍历遍历文…...
Linux系统常用操作与命令指南
一、快捷分类 1、移动光标 h, j, k, l 左, 下, 上, 右 Ctrl-F:下翻一页 Ctrl-B:上翻一页 Ctrl-U:上翻半页 Ctrl-d:下翻半页 0:跳至行首,不管有无缩进,就是跳到第0个字…...

StructuredStreaming (一)
一、sparkStreaming的不足 1.基于微批,延迟高不能做到真正的实时 2.DStream基于RDD,不直接支持SQL 3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD) 4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件&am…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
Spring AI 入门:Java 开发者的生成式 AI 实践之路
一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...

Vue ③-生命周期 || 脚手架
生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程
STM32F1 本教程使用零知标准板(STM32F103RBT6)通过I2C驱动ICM20948九轴传感器,实现姿态解算,并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化,适合嵌入式及物联网开发者。在基础驱动上新增…...

《Docker》架构
文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器,docker,镜像,k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...
es6+和css3新增的特性有哪些
一:ECMAScript 新特性(ES6) ES6 (2015) - 革命性更新 1,记住的方法,从一个方法里面用到了哪些技术 1,let /const块级作用域声明2,**默认参数**:函数参数可以设置默认值。3&#x…...
2025年低延迟业务DDoS防护全攻略:高可用架构与实战方案
一、延迟敏感行业面临的DDoS攻击新挑战 2025年,金融交易、实时竞技游戏、工业物联网等低延迟业务成为DDoS攻击的首要目标。攻击呈现三大特征: AI驱动的自适应攻击:攻击流量模拟真实用户行为,差异率低至0.5%,传统规则引…...
【深尚想】TPS54618CQRTERQ1汽车级同步降压转换器电源芯片全面解析
1. 元器件定义与技术特点 TPS54618CQRTERQ1 是德州仪器(TI)推出的一款 汽车级同步降压转换器(DC-DC开关稳压器),属于高性能电源管理芯片。核心特性包括: 输入电压范围:2.95V–6V,输…...