Golang实现RabbitMQ中死信队列各个情况
下面这段教程针对是你已经有一些基本的MQ的知识,比如说能够很清楚的理解queue、exchange等概念,如果你还不是很理解,我建议你先访问官网查看基本的教程。
文章目录
- 1、造成死信队列的主要原因
- 2、操作逻辑图
- 3、代码实战
- 3.1 针对原因1:消费者超出时间未应答
- 3.3 针对原因2:限制一定的长度
- 3.3 针对原因3:消费者拒绝的消息回到死信队列中
1、造成死信队列的主要原因
- 消费者超时未应答
- 队列的容量有限
- 消费者拒绝了的消息
2、操作逻辑图

3、代码实战
其实整体的思路就是分别创建一个normal_exchange、dead_exchange、normal_queue、dead_queue,然后将normal_exchange与normal_queue进行绑定,将dead_exchange与dead_queue进行绑定,这里比较关键的一个点在于说如何将normal_queue与dead_exchange进行绑定,这样才能将错误的消息传递过来。下面就是这段代码的关键。
// 声明一个normal队列_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{//"x-message-ttl": 5000, // 指定过期时间//"x-max-length": 6, // 指定长度。超过这个长度的消息会发送到dead_exchange中"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key})
3.1 针对原因1:消费者超出时间未应答
consumer1.go
package day07import (amqp "github.com/rabbitmq/amqp091-go""log""v1/utils"
)type Constant struct {NormalExchange stringDeadExchange stringNormalQueue stringDeadQueue stringNormalRoutingKey stringDeadRoutingKey string
}func Consumer1() {// 获取连接ch := utils.GetChannel()// 创建一个变量常量constant := Constant{NormalExchange: "normal_exchange",DeadExchange: "dead_exchange",NormalQueue: "normal_queue",DeadQueue: "dead_queue",NormalRoutingKey: "normal_key",DeadRoutingKey: "dead_key",}// 声明normal交换机err := ch.ExchangeDeclare(constant.NormalExchange,amqp.ExchangeDirect,true,false,false,false,nil,)utils.FailOnError(err, "Failed to declare a normal exchange")// 声明一个dead交换机err = ch.ExchangeDeclare(constant.DeadExchange,amqp.ExchangeDirect,true,false,false,false,nil,)utils.FailOnError(err, "Failed to declare a dead exchange")// 声明一个normal队列_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{"x-message-ttl": 5000, // 指定过期时间//"x-max-length": 6,"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key})utils.FailOnError(err, "Failed to declare a normal queue")// 声明一个dead队列:注意不要给死信队列设置消息时间,否者死信队列里面的信息会再次过期_, err = ch.QueueDeclare(constant.DeadQueue,true,false,false,false,nil)utils.FailOnError(err, "Failed to declare a dead queue")// 将normal_exchange与normal_queue进行绑定err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue")// 将dead_exchange与dead_queue进行绑定err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")// 消费消息msgs, err := ch.Consume(constant.NormalQueue,"",false, // 这个地方一定要关闭自动应答false,false,false,nil)utils.FailOnError(err, "Failed to consume in Consumer1")var forever chan struct{}go func() {for d := range msgs {if err := d.Reject(false); err != nil {utils.FailOnError(err, "Failed to Reject a message")}}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}
consumer2.go
package day07import (amqp "github.com/rabbitmq/amqp091-go""log""v1/utils"
)func Consumer2() {// 拿取信道ch := utils.GetChannel()// 声明一个交换机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil)utils.FailOnError(err, "Failed to Declare a exchange")// 接收消息的应答msgs, err := ch.Consume("dead_queue","",false,false,false,false,nil,)var forever chan struct{}go func() {for d := range msgs {log.Printf("[x] %s", d.Body)// 开启手动应答ßd.Ack(false)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever}
produce.go
package day07import ("context"amqp "github.com/rabbitmq/amqp091-go""strconv""time""v1/utils"
)func Produce() {// 获取信道ch := utils.GetChannel()// 声明一个交换机err := ch.ExchangeDeclare("normal_exchange",amqp.ExchangeDirect,true,false,false,false,nil)utils.FailOnError(err, "Failed to declare a exchange")ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second)defer cancer()// 发送了10条消息for i := 0; i < 10; i++ {msg := "Info:" + strconv.Itoa(i)ch.PublishWithContext(ctx,"normal_exchange","normal_key",false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(msg),})}
}
3.3 针对原因2:限制一定的长度
只需要改变consumer1.go中的对normal_queue的声明
// 声明一个normal队列_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{//"x-message-ttl": 5000, // 指定过期时间"x-max-length": 6,"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key})
3.3 针对原因3:消费者拒绝的消息回到死信队列中
这里需要完成两点工作
工作1:需要在consumer1中作出拒绝的操作
go func() {for d := range msgs {if err := d.Reject(false); err != nil {utils.FailOnError(err, "Failed to Reject a message")}}}()
工作2:如果你consume的时候开启了自动应答一定要关闭
// 消费消息msgs, err := ch.Consume(constant.NormalQueue,"",false, // 这个地方一定要关闭自动应答false,false,false,nil)
其他的部分不需要改变,按照问题1中的设计即可。
相关文章:
Golang实现RabbitMQ中死信队列各个情况
下面这段教程针对是你已经有一些基本的MQ的知识,比如说能够很清楚的理解queue、exchange等概念,如果你还不是很理解,我建议你先访问官网查看基本的教程。 文章目录1、造成死信队列的主要原因2、操作逻辑图3、代码实战3.1 针对原因1࿱…...
react源码分析:组件的创建和更新
这一章节就来讲讲ReactDOM.render()方法的内部实现与流程吧。 因为初始化的源码文件部分所涵盖的内容很多,包括创建渲染、更新渲染、Fiber树的创建与diff,element的创建与插入,还包括一些优化算法,所以我就整个的React执行流程画了…...
Android Lmkd 低内存终止守护程序
一、低内存终止守护程序 Android 低内存终止守护程序 (lmkd) 进程可监控运行中的 Android 系统的内存状态,并通过终止最不必要的进程来应对内存压力大的问题,使系统以可接受的性能水平运行。 所有应用进程都是从zygote孵化出来的,记录在AMS…...
快速掌握 Flutter 图片开发核心技能
大家好,我是 17。 在 Flutter 中使用图片是最基础能力之一。17 做了精心准备,满满的都是干货!本文介绍如何在 Flutter 中使用图片,尽量详细,示例完整,包会! 使用网络图片 使用网络图片超级简…...
复习使用git(二)
删除远程分支 git push origin --delete 分支名 撤销修改 撤销工作区的修改 已修改,但尚未添加(add),使用 git restore 文件名 撤销工作区的修改。 Note: “git checkout – 文件名”,checkout 检出的意思&#x…...
魔兽世界335服务端架设对外网开放的步骤
警告:在没有网络安全防护措施或基础知识的情况下,开放端口可能造成被黑客入侵、流量攻击、破坏数据、资料泄露等情况的发生。在你选择开放端口时,视为已经充分了解可能发生的后果、危害,清楚自己在做什么,并且自己将对…...
华为OD机试模拟题 用 C++ 实现 - 通信误码(2023.Q1)
最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 最多获得的短信条数(2023.Q1)) 文章目录 最近更新的博客使用说明通信误码题目输入输出示例一输入输出说明示例二输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,...
Vue 核心
文章目录Vue 核心一,Vue 简介(一)官网(二)介绍与描述(三)Vue 的特点(四)与其它 JS 框架的关联(五)Vue 周边库二,初识 Vue三࿰…...
Kylin V10桌面版arm3568 源码安装redis
上传redis-5.0.14.tar.gz到/home/kylin/下载;解压kylinkylin:~/下载$ tar -zxvf redis-5.0.14.tar.gz/opt下新建redis目录,并将上面解压的文件夹移到此处kylinkylin:~/下载$ sudo mv redis-5.0.14 /opt/redis/编译:kylinkylin:/opt/redis/red…...
【ICCV2022】 CAPAO:一种高效的单阶段人体姿态估计模型
CAPAO:一种高效的单阶段人体姿态估计模型 重新思考关键点表示:将关键点和姿态建模作为多人姿态估计的对象(Rethinking Keypoint Representations: Modeling Keypoints and Poses as Objects for Multi-Person Human Pose Estimation…...
ROS1学习笔记:ROS中的坐标管理系统(ubuntu20.04)
参考B站古月居ROS入门21讲:ROS中的坐标系管理系统 基于VMware Ubuntu 20.04 Noetic版本的环境 文章目录一、机器人中的坐标变换二、TF功能包三、小海龟跟随实验3.1 启动实验3.2 查看当前的TF树3.3 坐标相对位置可视化3.3.1 tf_echo3.3.2 rviz一、机器人中的坐标变换…...
requests---(2)session简介与自动写博客
目录:导读 session简介 session登录 自动写博客 获取登录cookies 抓取写博客接口 requests自动写博客 写在最后 http协议是无状态的,也就是每个请求都是独立的。那么登录后的一系列动作,都需要用cookie来验证身份是否是登录状态&#…...
基于 HAProxy + Keepalived 搭建 RabbitMQ 高可用集群
RabbitMQ 集群 通常情况下,在集群中我们把每一个服务称之为一个节点,在 RabbitMQ 集群中,节点类型可以分为两种: 内存节点:元数据存放于内存中。为了重启后能同步数据,内存节点会将磁盘节点的地址存放于磁…...
基于51单片机和proteus的智能调速风扇设计
此智能风扇是基于51单片机和proteus的仿真设计,功能如下: 1. Timer0 PWM控制电机转速 2. DHT11采集温湿度 3. LCD1602显示温湿度及电机状态 4. 按键控制电机加减速启停等 5. 串口控制电机加减速启停等 功能框图如下: Proteus仿真界面如下…...
SQL Server开启CDC的完整操作过程
这里写自定义目录标题写在前面SQL Server开启CDC1. 将指定库的实例先开启CDC2. 开启需要开启CDC的表3. 关闭CDC功能更详细信息参照官网写在前面 鉴于老旧数据的结构和项目都在sqlserver上存储,且迁移成本巨大,当下要为sqlserver的存储过程减负。要将一部…...
【Spring Cloud Alibaba】008-Sentinel
【Spring Cloud Alibaba】008-Sentinel 文章目录【Spring Cloud Alibaba】008-Sentinel一、服务雪崩1、概述2、解决方案常见的容错机制二、Sentinel:分布式系统的流量防卫兵1、**Sentinel** 概述简介特性Sentinel 的开源生态Sentinel 的历史2、Sentinel 基本概念资源…...
解读CRC校验计算
个人随笔 (Owed by: 春夜喜雨 http://blog.csdn.net/chunyexiyu) 参考:http://www.sunshine2k.de/articles/coding/crc/understanding_crc.html 参考:https://en.wikipedia.org/wiki/Cyclic_redundancy_check 参考:https://www.cnblogs.com/…...
深入理解Spring MVC下
上一篇博客从理论概念上来梳理Spring MVC相关知识,此篇博客将通过spring官网提供showcase代码为例子,详细介绍showcase代码中包含的各个例子是如何实现的。官网的showcase代码包含的主要例子包括,Demo地址:Mapping Requests&#…...
【Linux】ssh-keygen不需要回车,自动生成密钥,批量免密操作!
使用命令ssh-keygen 需要手动敲击回车,才会生成密钥,如下代码所示 [rootlocalhost ~]# ssh-keygen Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase):…...
C/C++开发,无可避免的内存管理(篇四)-智能指针备选
一、智能指针 采用C/C开发堆内存管理无论是底层开发还是上层应用,无论是开发新手,还是多年的老手,都会不自觉中招,尤其是那些不是自己一手经历的代码,要追溯问题出在哪里更是个麻烦事。C/C程序常常会遇到程序突然退出&…...
20吨燃气蒸汽锅炉实力厂家/支持上门安装调试
燃气蒸汽锅炉,认准源头实力厂家,不仅能买到品质过硬的设备,更能享受到省心便捷的上门安装调试服务,免去自行安装的繁琐与隐患,让设备快速投入平稳运行。我们作为深耕锅炉制造行业的实力厂家,具备正规生产资…...
ABC系统实战指南:革新数字电路设计的逻辑综合与形式验证技术突破
ABC系统实战指南:革新数字电路设计的逻辑综合与形式验证技术突破 【免费下载链接】abc ABC: System for Sequential Logic Synthesis and Formal Verification 项目地址: https://gitcode.com/gh_mirrors/ab/abc 在现代集成电路设计流程中,工程师…...
手把手教你用STM32实现BLDC电机的SPWM控制(附代码调试心得)
STM32实战:无刷直流电机SPWM控制全解析与代码优化指南 从理论到实践:BLDC电机控制的核心逻辑 第一次接触无刷直流电机(BLDC)控制时,我被它优雅的工作原理所吸引——没有电刷的火花和磨损,却能实现高效的能量转换。在工业自动化、无…...
MangoHud与Vulkan视频编码协议:AV1监控完全指南
MangoHud与Vulkan视频编码协议:AV1监控完全指南 【免费下载链接】MangoHud A Vulkan and OpenGL overlay for monitoring FPS, temperatures, CPU/GPU load and more. Discord: https://discordapp.com/invite/Gj5YmBb 项目地址: https://gitcode.com/gh_mirrors/…...
PySide6商业项目避坑指南:从许可证验证到Qt Designer实战
PySide6商业项目避坑指南:从许可证合规到UI开发实战 当企业开发者选择PySide6作为桌面应用开发框架时,往往会被其商业友好的LGPL许可证所吸引。但真正落地到项目开发中,从法律合规到技术实现都存在诸多需要特别注意的细节。本文将深入剖析那些…...
【八股必备】多线程面试题2
第一部分:线程基础与概念篇1. 线程模型面试官:先来个基础题,Java程序里的线程和操作系统线程是什么关系?是一回事吗?候选人:好的。在绝大多数情况下,比如我们常用的Windows、Linux系统ÿ…...
3步解除音乐枷锁:QMCDecode全场景音频解密指南
3步解除音乐枷锁:QMCDecode全场景音频解密指南 【免费下载链接】QMCDecode QQ音乐QMC格式转换为普通格式(qmcflac转flac,qmc0,qmc3转mp3, mflac,mflac0等转flac),仅支持macOS,可自动识别到QQ音乐下载目录,默认转换结果…...
A-59F 多功能语音处理模组:覆盖全场景人群,让每一次语音都清晰无噪
在门禁对讲、会议扩音、车载通话、导游喊话、监护设备、智能工牌等各类语音设备中,啸叫刺耳、环境嘈杂、回音不断、拾音模糊、通话断续是所有人共同的痛点。一款真正解决问题的核心硬件 ——A-59F 多功能语音处理模组,它集成扩音防啸叫、AI ENC 降噪、AE…...
QRazyBox:5分钟解决二维码修复难题的专业工具
QRazyBox:5分钟解决二维码修复难题的专业工具 【免费下载链接】qrazybox QR Code Analysis and Recovery Toolkit 项目地址: https://gitcode.com/gh_mirrors/qr/qrazybox 二维码已经成为现代生活中无处不在的数字桥梁,但你是否遇到过这样的情况&…...
别再折腾虚拟机了!用Docker 5分钟搞定Oracle 10g测试环境(附阿里云镜像源)
5分钟极速部署Oracle 10g:Docker化开发环境实战指南 每次需要搭建Oracle测试环境时,你是否也经历过这样的痛苦?下载几个GB的安装包、配置复杂的系统参数、等待漫长的安装过程,最后可能还会遇到各种依赖问题。作为一名长期与Oracle…...
