RabbitMQ-死信队列(golang)
1、概念
死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的消息未被消费者拿到,这样的消息就会成为死信,存放死信消息的队列,也就被称为死信队列(Dead Letter Queue,简称DLQ)。
2、死信产生的原因
文心一言的回答如下:
- 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且requeue参数被设置为false时,消息会被视为死信。这意味着消费者明确表示无法或不愿意处理该消息,并且不希望该消息重新进入队列等待其他消费者处理。
- 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息也可以被标记为死信。这种情况下,尽管消费者尝试处理消息,但由于某些无法克服的错误,消息无法被成功消费。
- 消息过期:如果消息设置了生存时间(TTL,Time To Live),并且在这个时间内没有被消费,那么消息会过期并被视为死信。TTL是RabbitMQ中用于指定消息在队列中存活时间的参数,超过该时间的消息将被视为过期并丢弃或转发到死信队列。
- 队列长度限制:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,这些消息也会被视为死信。队列长度限制是RabbitMQ中用于控制队列大小的一种机制,当队列达到最大容量时,新到达的消息将无法被接收并可能被丢弃或转发到死信队列。
总结来说,主要原因就三个,消息被拒绝、消息过期、队列满
在一些重要的场景,比如支付场景,提交的订单超时未支付的,可以设计为进入死信队列。
3、死信队列使用实践
3.1 消息过期
设置正常队列ttl过期时间为5s,如果5s内消息没有被消费,则会自动放入死信队列中。
关键点:设置正常队列属性,ttl5s过期:
// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{"x-message-ttl": int64(5000), // 5秒TTL"x-dead-letter-exchange": "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitargs, // arguments)
全部代码如下:
package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xxxx.xx.xx.xxx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}// 声明死信队列dlx, err := ch.QueueDeclare("dead_letter_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())return}err = ch.ExchangeDeclare("my_exchange", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{"x-message-ttl": int64(5000), // 5秒TTL"x-dead-letter-exchange": "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列,注意,必须在声明队列时就要设置死信队列信息q, err := ch.QueueDeclare("normal_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitargs, // arguments)if err != nil {fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())return}// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind(q.Name, // queue nameq.Name, // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})if err != nil {fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())return}
}
队列信息包括绑定的死信队列信息、ttl等信息如下:
运行上方代码,会向队列发送一条信息,我们先不创建消费者,5s后,消息会被自动放入死信队列。
3.2 队列满
当mq队列由于消息量过多导致队列打满时,这个时候过来的消息,将会被自动放入到死信队列中。
设置队列长度属性代码如下:
args := amqp.Table{// "x-message-ttl": int64(5000), // 5秒TTL"x-max-length": 2,"x-dead-letter-exchange": "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitargs, // arguments)
队列属性如下:
发送两条信息:
继续发送第三个:
测试代码:
package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}// 声明死信队列dlx, err := ch.QueueDeclare("dead_letter_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())return}err = ch.ExchangeDeclare("my_exchange", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{// "x-message-ttl": int64(5000), // 5秒TTL"x-max-length": 2,"x-dead-letter-exchange": "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitargs, // arguments)if err != nil {fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())return}// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind(q.Name, // queue nameq.Name, // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})if err != nil {fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())return}
}
3.3 消息被拒绝
消息被拒绝的情况,当消费者无法处理某条信息时,客户端想rabbitmq服务器发送一个【负确认】应答,表示消费者未能成功处理此条消息,并且希望RabbitMQ根据配置重新发送这条消息(例如,将其重新排队)或者将其丢弃。
客户端函数:ch.Nack,函数原型:
func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {ch.m.Lock()defer ch.m.Unlock()return ch.send(&basicNack{DeliveryTag: tag,Multiple: multiple,Requeue: requeue,})
}
入参含义如下:
tag | 这是一个唯一标识符,用于标识消费者之前接收到的特定消息。当消费者调用 |
multiple | 这是一个布尔值( |
requeue | 这也是一个布尔值( |
测试过程,首先使用3.1或者3.2的代码向mq中写入几条信息:
之后使用如下代码进行消费:
package mainimport ("fmt""time"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}err = ch.ExchangeDeclare("my_exchange", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 声明正常队列// q, err := ch.QueueDeclare(// "normal_queue", // name// true, // durable// false, // delete when unused// false, // exclusive// false, // no-wait// nil, // arguments// )// if err != nil {// fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())// return// }// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind("normal_queue", // queue name"normal_queue", // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}msgs, _ := ch.Consume("normal_queue", // queue"", // consumerfalse, // auto-acktrue, // exclusivefalse, // no-localfalse, // no-waitnil, // args)go func() {for d := range msgs {// 模拟处理失败,全部放入死信队列ch.Nack(d.DeliveryTag, false, false)}}()time.Sleep(10 * time.Second)
}
运行代码后,3条消息全部进入到死信队列中:
4、总结
RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制,死信队列中的所有消息都是无法被正常消费的死信,这使得开发者可以集中对这些消息进行管理和分析。通过分析死信队列中的消息,开发者可以了解系统的运行状态、发现潜在的问题,并进行相应的优化和改进,以提升系统的稳定性和可靠性。
相关文章:

RabbitMQ-死信队列(golang)
1、概念 死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的…...

爬虫开发工具与环境搭建——环境配置
第二章:爬虫开发工具与环境搭建 第二节:环境配置 在进行爬虫开发之前,首先需要配置好开发环境。一个良好的开发环境不仅能提高开发效率,还能避免因环境不一致带来的问题。以下是环境配置的详细步骤,涵盖了Python开发…...

15.UE5等级、经验、血条,魔法恢复和消耗制作
2-17 等级、经验、血条、魔法消耗_哔哩哔哩_bilibili 目录 1.制作UI,等级,经验,血条 2.为属性面板绑定角色真实的属性,实现动态更新 3.魔法的消耗和恢复 1.制作UI,等级,经验,血条 创建控…...

【Homework】【5】Learning resources for DQ Robotics in MATLAB
Lesson 5 代码-TwoDofPlanarRobot.m 表示一个 2 自由度平面机器人。该类包含构造函数、计算正向运动学模型的函数、计算平移雅可比矩阵的函数,以及在二维空间中绘制机器人的函数。 classdef TwoDofPlanarRobot%TwoDofPlanarRobot - 表示一个 2 自由度平面机器人类…...

vue3中 ref和reactive的区别
ref的主要作用 ref 函数接受的参数数据类型可以是原始数据类型也可以是引用数据类型。在模板中使用 ref 时,我们不需要加 .value,因为当 ref 在模板中作为顶层属性被访问时,它们会被自动解包, <p>count: {{ count }}</…...

第十四章 Spring之假如让你来写AOP——雏形篇
Spring源码阅读目录 第一部分——IOC篇 第一章 Spring之最熟悉的陌生人——IOC 第二章 Spring之假如让你来写IOC容器——加载资源篇 第三章 Spring之假如让你来写IOC容器——解析配置文件篇 第四章 Spring之假如让你来写IOC容器——XML配置文件篇 第五章 Spring之假如让你来写…...

群控系统服务端开发模式-应用开发-前端个人资料开发
一、总结 其实程序开发到现在,简单的后端框架就只剩下获取登录账号信息及获取登录账号菜单这两个功能咯。详细见下图: 1、未登录时总业务流程图 2、登录后总业务流程图 二、获取登录账号信息对接 在根目录下src文件夹下store文件夹下modules文件夹下的us…...

动态规划技巧点
动规五部曲(来自b站卡哥):1、确定DP数组中i、j…的含义。2、确定DP推导式。3、DP数组初始化。4、DP数组遍历顺序。5、DP数组打印校验。 父问题、子问题有些许区别:LeetCode343.整数拆分 今天在哔哩哔哩上刷到了一个非常有意思的l…...

深度学习之pytorch常见的学习率绘制
文章目录 0. Scope1. StepLR2. MultiStepLR3. ExponentialLR4. CosineAnnealingLR5. ReduceLROnPlateau6. CyclicLR7. OneCycleLR小结参考文献 https://blog.csdn.net/coldasice342/article/details/143435848 0. Scope 在深度学习中,学习率(Learning R…...

Spring Boot集成SQL Server快速入门Demo
1.什么是SQL Server? SQL Server是由Microsoft开发和推广的以客户/服务器(c/s)模式访问、使用Transact-SQL语言的关系数据库管理系统(DBMS),它最初是由Microsoft、Sybase和Ashton-Tate三家公司共同开发的&…...

低代码牵手 AI 接口:开启智能化开发新征程
一、低代码与 AI 接口的结合趋势 低代码开发平台近年来在软件开发领域迅速崛起。随着企业数字化转型的需求不断增长,低代码开发平台以其快速构建应用程序的优势,满足了企业对高效开发的需求。例如,启效云低代码平台通过范式化和高颗粒度的可配…...

【已解决】git push一直提示输入用户名及密码、fatal: Could not read from remote repository的问题
问题描述: 在实操中,git push代码到github上一直提示输入用户名及密码,并且跳出的输入框输入用户名和密码后,报错找不到远程仓库 实际解决中,发现我环境有两个问题解决: git push一直提示输入用户名及密码…...

python语言基础-4 常用模块-4.13 其他模块
声明:本内容非盈利性质,也不支持任何组织或个人将其用作盈利用途。本内容来源于参考书或网站,会尽量附上原文链接,并鼓励大家看原文。侵删。 4.13 其他模块 除此之外python中还有大量的功能模块,如: pill…...

微信小程序=》基础=》常见问题=》性能总结
文章目录 微信小程序开发应用 实例小程序生命周期 以及 各生命周期应用实例小程序图片 展示方案 小程序打包应用方案技术细节(分包应用实例)技术细节(压缩处理)一、准备工作二、JavaScript 代码压缩三、WXML 文件优化(…...

JWT深度解析:Java Web中的安全传输与身份验证
标题:JWT深度解析:Java Web中的安全传输与身份验证 引言 JSON Web Token(JWT)是一种轻量级的身份验证和授权标准,它允许在各方之间安全地传输信息。在Java Web开发中,JWT因其无状态、可扩展性和跨域支持而…...

使用Java爬虫获取商品订单详情:从API到数据存储
在电子商务日益发展的今天,获取商品订单详情成为了许多开发者和数据分析师的需求。无论是为了分析用户行为,还是为了优化库存管理,订单数据的获取都是至关重要的。本文将详细介绍如何使用Java编写爬虫,通过API获取商品订单详情&am…...

Mybatis中批量插入foreach优化
数据库批量入库方常见方式:Java中foreach和xml中使用foreach 两者的区别: 通过Java的foreach循环批量插入: 当我们在Java通过foreach循环插入的时候,是一条一条sql执行然后将事物统一交给spring的事物来管理(Transa…...

Word VBA如何间隔选中多个(非连续)段落
实例需求:Word文档中的有多个段落,段落总数量不确定,现在需要先选中所有基数段落,即:段落1,段落3 … ,然后一次性设置粗体格式。 也许有的读者会认为这个无厘头的需求,循环遍历遍历文…...

Linux系统常用操作与命令指南
一、快捷分类 1、移动光标 h, j, k, l 左, 下, 上, 右 Ctrl-F:下翻一页 Ctrl-B:上翻一页 Ctrl-U:上翻半页 Ctrl-d:下翻半页 0:跳至行首,不管有无缩进,就是跳到第0个字…...

StructuredStreaming (一)
一、sparkStreaming的不足 1.基于微批,延迟高不能做到真正的实时 2.DStream基于RDD,不直接支持SQL 3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD) 4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件&am…...

由播客转向个人定制的音频频道(1)平台搭建
项目的背景 最近开始听喜马拉雅播客的内容,但是发现许多不方便的地方。 休息的时候收听喜马拉雅,但是还需要不断地选择喜马拉雅的内容,比较麻烦,而且黑灯操作反而伤眼睛。 喜马拉雅为代表的播客平台都是VOD 形式的࿰…...

[自然语言处理] [AI]深入理解语言与情感分类:从基础到深度学习的进展
语言是人类智能的核心组成部分,具有极高的复杂性和多样性。理解语言,尤其是语言中的隐含部分,向来是人工智能研究的一个巨大挑战。图灵测试本身便是一场关于语言生成与理解的比赛,旨在检验机器是否能够模拟人类的语言能力。随着深度学习的飞速发展,语音识别、情感分析等自…...

【GPTs】Gif-PT:DALL·E制作创意动图与精灵动画
博客主页: [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 💯GPTs指令💯前言💯Gif-PT主要功能适用场景优点缺点 💯小结 💯GPTs指令 中文翻译: 使用Dalle生成用户请求的精灵图动画&#…...

云原生周刊:Istio 1.24.0 正式发布
云原生周刊:Istio 1.24.0 正式发布 开源项目推荐 Kopf Kopf 是一个简洁高效的 Python 框架,只需几行代码即可编写 Kubernetes Operator。Kubernetes(K8s)作为强大的容器编排系统,虽自带命令行工具(kubec…...

Linux设置jar包开机启动
操作系统环境:CentOS 7 【需要 root 权限,使用 root 用户进行操作 或 普通用户使用 sudo 进行操作】 一、系统服务的方式 原理:利用系统服务管理应用程序的生命周期, systemctl 为系统服务管理工具 systemctl start applicati…...

计算机视觉和机器人技术中的下一个标记预测与视频扩散相结合
一种新方法可以训练神经网络对损坏的数据进行分类,同时预测下一步操作。 它可以为机器人制定灵活的计划,生成高质量的视频,并帮助人工智能代理导航数字环境。 Diffusion Forcing 方法可以对嘈杂的数据进行分类,并可靠地预测任务的…...

C语言之简单的获取命令行参数和环境变量
C语言之简单的获取命令行参数和环境变量 本人的开发环境为WIN10操作系统用VMWARE虚拟的UBUNTU LINUX 18.04LTS!!! 所有代码的编辑、编译、运行都在虚拟机上操作,初学的朋友要注意这一点!!! 详细…...

STL之vecor的使用(超详解)
目录 1. C/C中的数组 1.1. C语言中的数组 1.2. C中的数组 2. vector的接口 2.1. vector的迭代器 2.2. vector的初始化与销毁 2.3. vector的容量操作 2.4. vector的访问操作 2.5. vector的修改操作 💓 博客主页:C-SDN花园GGbond ⏩ 文章专栏…...

SystemVerilog学习笔记(一):数据类型
在systemverilog中,主要包含以下数据类型: 4值类型2值类型数组字符串结构体和联合体枚举自定义类型 无符号数:无符号数的符号不使用任何标志,即无符号数只能存储正数。无符号二进制数的范围从 0 到 ((2^n) - 1),n 表…...

Linux软件包管理与Vim编辑器使用指南
目录 一、Linux软件包管理器yum 1.什么是软件包? 2.什么是软件包管理器? 3.查看软件包 4.安装软件 编辑 5.卸载软件 Linux开发工具: 二、Linux编辑器---vim 1.vim的基本概念 (1) 正常/普通模式(Normal mode࿰…...