【从零开始学习RabbitMQ | 第二篇】如何确保MQ的可靠性和消费者可靠性
目录
前言:
MQ可靠性:
数据持久化:
Lazy Queue:
消费者可靠性:
消费者确认机制:
消费失败处理:
MQ保证幂等性:
方法一:
总结:
前言:
在上一篇文章中,我们介绍了如何确保生产者的可靠性,确保消息一定可以到达MQ。
【从零开始学习RabbitMQ | 第一篇】如何确保生产者的可靠性-CSDN博客
https://liyuanxin.blog.csdn.net/article/details/139261125?spm=1001.2014.3001.5502
但是MQ自己也是会丢失消息的,比如MQ的突然宕机或者消息过多造成的阻塞,因此我们这篇文章来介绍一下如何确保MQ的可靠性和消费者可靠性

默认情况下,RabbitMQ会把接收到的消息保存在内存中来降低消息收发的延迟,但这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或者处理过慢的时候,会导致消息的积压,引发阻塞。

想要保证MQ的可靠性,主要依赖于两种方式:
- 数据持久化
- Lazy Queue
MQ可靠性:
数据持久化:
RabbitMQ实现持久化主要有三个部分:
-
持久化队列(Durable Queues): 持久化队列指的是队列本身被存储在磁盘上,这样即使RabbitMQ服务器重启,队列也不会丢失。要创建持久化队列,需要在声明队列时设置
durable属性为true。持久化队列中的消息默认不是持久化的,需要单独设置每条消息为持久化。 -
持久化消息(Persistent Messages): 持久化消息意味着消息本身被存储在磁盘上,因此即使RabbitMQ服务器重启,消息也不会丢失。在发送消息时,需要设置消息的
deliveryMode属性为2(即PERSISTENT),这样RabbitMQ就会将消息存储到磁盘上。 -
持久化交换机(Durable Exchanges): 持久化交换器与持久化队列类似,指的是交换器被存储在磁盘上,这样服务器重启后交换器依然存在。声明交换器时,也需要设置
durable属性为true。持久化交换器确保了消息路由的结构在服务器重启后能够保留。
当我们把消息持久化到磁盘中的时候,就避免了消息挤压造成的阻塞。但是当我们把消息持久化到磁盘中的时候,这个时候是不能接收新的消息的。
Lazy Queue:
因此实际上把消息持久化到磁盘中不是一个很好的解决方案,在RabbitMQ的3.6.0版本后,增加了Lazy Queue这个概念。其实就是懒惰队列:
懒惰队列的特性如下:
- 接收到消息后直接存入磁盘而不是内存(内存只保留最近的消息,默认2048条)。
- 消费者要消费消息的时候才会从磁盘中读取并加载到内存中。
- 支持数百万条的消息存储。
在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。
消费者可靠性:
消费者确认机制:
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息的结束之后,将会向RabbitMQ发送一个回执,告知自己消息的处理状态 ,消费者一共可以向Rabbitmq回执三种处理状态,分别是:
- ACK:成功处理消息,RabbitMQ从队列中删除该消息
- NACK:消息处理失败,RabbitMQ再次投递消息
- REJECT: 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
而消费者确认机制并不用我们手写,Spring AMQP已经为我们实现了消费者消息确认机制,并允许我们通过配置文件的形式选择ACK处理方式,有三种:
- none:不处理,消息投递给消费者之后就会ack,消息会从MQ中删除,不安全
- manual:手动模式。需要自己调用api发送ack或者reject。
- auto:托管给Spring AMQP 利用 AOP实现消费者确认机制。当业务正常执行的时候返回ack
当业务异常的时候,根据异常不同返回不同的结果:
- 如果是业务异常,自动返回nack
- 如果是消息处理或者校验异常,自动返回reject

Spring AMQP的消费者确认机制默认开启auto。
消费失败处理:
如果我们把消费者确认机制配置为auto的时候,当消费者出现异常之后,消息会不断的重新入队到队列,再重新发送给消费者。同样的消息再次引发异常,再次入队,再次发送给消费者。
这样会导致MQ的消息处理数量飙升,带来不必要的压力,因此除了把处理失败的消息重新进行处理之外,我们还应该有其他的处理方案。
我们可以利用Spring 的retry机制,在消费者出现异常的时候使用本地重试,而不是无限制的requeue到MQ队列中。
listener:simple:prefetch: 1retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次等待时长倍数,下次等待时长= Initial - interval * multipliermax-attempts: 3 #最大重试次数stateless: true #true无状态,false有状态,如果业务中包含事务,就改为fasle
当我们开启重试模式之后,如果重试次数耗尽并且消息依然失败,那么就需要有MessageRecoverer接口来处理,这个接口包含三种不同的实现:
- RejectAndDontRequeueRecoverer :重试耗尽之后,直接reject,丢弃消息。
- ImmedIateRequeueMessageRecoverer : 重试耗尽之后,返回nack,消息重新入队
- RepublishMessageRecoerer:重试耗尽之后,将失败消息投递到指定的交换机
如果消息丢失不会对业务产生重大影响,可以选择RejectAndDontRequeueRecoverer。
如果希望消息有机会被重新处理,可以选择ImmediateRequeueMessageRecoverer。
而RepublishMessageRecoverer则适用于需要对失败消息进行进一步分析或记录的情况。
MQ保证幂等性:
通过上述的各种保证措施,我们基本上可以确保业务至少被执行一次。那对于一些需要保证幂等性的业务,我们要如何保证这个消息只被消费一次呢?
【从零开始学习重要知识点 | 第一篇】快速了解什么是幂等性以及常见解决方案_幂等性问题如何解决分布式锁-CSDN博客
https://blog.csdn.net/fckbb/article/details/136331113?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522171689607216777224421340%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=171689607216777224421340&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-1-136331113-null-null.142%5Ev100%5Epc_search_result_base1&utm_term=%E5%B9%82%E7%AD%89%E6%80%A7%20%E6%88%91%E6%98%AF%E4%B8%80%E7%9B%98%E7%89%9B%E8%82%89&spm=1018.2226.3001.4187
方法一:
给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。

其实他和我们上面黏贴的文章中介绍的token机制的思想很像。而在MQ中也不需要我们手动去设置唯一ID,可以在消费者的消息转换器中开启:

我们可以看一看这个setCreateMessageIds这个方法中是如何构造id的:

这里逻辑就很清晰了,实际上就是在发消息的时候做一个if判断,如果我们设置了构造id,并且当前消息配置类的id为空,我们就为这个消息构造一个id。
但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作。
总结:
在本文中,我们探讨了RabbitMQ作为领先的开源消息代理,如何通过一系列高级特性和策略来确保消息队列和消费者的高度可靠性。
首先,我们讨论了RabbitMQ的持久化机制,它允许消息和队列在服务器重启后依然保持不变。通过将消息标记为持久化,并在队列上启用持久化选项,我们可以确保关键数据不会因系统故障而丢失。
最后,我们介绍了消息确认机制,这是确保消息被成功处理的关键。消费者在处理完消息后发送确认回执,RabbitMQ只有在收到确认后才会从队列中移除消息。如果消费者在处理过程中失败,未确认的消息将重新入队,供其他消费者处理。
如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!

相关文章:
【从零开始学习RabbitMQ | 第二篇】如何确保MQ的可靠性和消费者可靠性
目录 前言: MQ可靠性: 数据持久化: Lazy Queue: 消费者可靠性: 消费者确认机制: 消费失败处理: MQ保证幂等性: 方法一: 总结: 前言: …...
常用批处理命令及批处理文件编写技巧
一常用批处理命令 1.查看命令用法:命令 /? //如:cd /? 2.切换盘符目录:cd /d D:\test 或直接输入 d: //进入上次d盘所在的目录 3.切换目录:cd test 4.清屏:cls 5.“arp -a” //它会列出当前设备缓存中的所有…...
android NetworkMonitor记录
是否能上网的状态 上网url地址的设置: NetworkMonitor.java makeCaptivePortalHttpsUrls config_captive_portal_https_urls DEFAULT_CAPTIVE_PORTAL_HTTPS_URLS http准备监测 isCaptivePortal sendHttpAndHttpsParallelWithFallbackProbes httpsProbe.start();…...
OSPF优化——OSPF减少LSA更新量2
二、特殊区域——优化非骨干区域的LSA数量 不是骨干区域、不能存在虚链路 1、不能存在 ASBR 1)末梢区域 该区域将拒绝 4、5LSA的进人,同时由该区域连接骨干0区域的ABR 向该区域,发布一条3类的缺省路由; 该区域内每台路由器均需配置…...
【AMS】Android 8.0+ 绕开启动后台Service限制
一、背景 应客户要求,需要在开机时,拉起应用A。但因为开机时,同时被拉起的应用过多,导致Launcher在开机那一刻较为卡顿。为解决这一问题,采取了延迟拉起的做法。在开机后,延迟一定时间,由系统服务,拉起应用A。 于是乎,就出现这么个报错: Not allowed to start ser…...
【多态】(超级详细!)
【多态】(超级详细!) 前言一、 多态的概念二、重写1. 方法重写的规则2. 重写和重载的区别 三、多态实现的条件四、 向上转型五、动态绑定 前言 面向对象的三大特征:封装性、继承性、多态性。 extends继承或者implements实现&…...
vue的组件化
vue的组件化 vue的组件化,就是根据功能、业务逻辑、数据流向等因素进行划分把页面拆分成多个组件。组件是资源独立的,组件也可以相互嵌套。目的是提高代码的可读性、可维护性和可复用性。 组件化思想体现 组件封装步骤 1.公共组件 公共组件全局注…...
spark的简单学习一
一 RDD 1.1 RDD的概述 1.RDD(Resilient Distributed Dataset,弹性分布式数据集)是Apache Spark中的一个核心概念。它是Spark中用于表示不可变、可分区、里面的元素可并行计算的集合。RDD提供了一种高度受限的共享内存模型,即RD…...
【第5章】SpringBoot整合Druid
文章目录 前言一、启动器二、配置1.JDBC 配置2.连接池配置3. 监控配置 三、配置多数据源1. 添加配置2. 创建数据源 四、配置 Filter1. 配置Filter2. 可配置的Filter 五、获取 Druid 的监控数据六、案例1. 问题2. 引入库3. 配置4. 配置类5. 测试类6. 测试结果 七、案例 ( 推荐 )…...
力扣654. 最大二叉树
Problem: 654. 最大二叉树 文章目录 题目描述思路复杂度Code 题目描述 思路 对于构造二叉树这类问题一般都是利用先、中、后序遍历,再将原始问题分解得出结果 1.定义递归函数build,每次将一个数组中的最大值作为当前子树的根节点构造二叉树;…...
基于Netty实现WebSocket客户端
本文是基于Netty快速上手WebSocket客户端,不涉及WebSocket的TLS/SSL加密传输。 WebSocket原理参考【WebSocket简介-CSDN博客】,测试用的WebSocket服务端也是用Netty实现的,参考【基于Netty实现WebSocket服务端-CSDN博客】 一、基于Netty快速…...
homebrew安装mysql的一些问题
本文目录 一、Homebrew镜像安装二、mac安装mysql2.1、修改mysql密码 本文基于mac环境下进行的安装 一、Homebrew镜像安装 Homebrew国内如何自动安装,运行命令/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)" 会…...
产线问题排查
CPU过高 使用top命令查看占用CPU过高的进程。 导出CPU占用高进程的线程栈。 jstack pid >> java.txt Java 内存过高的问题排查 1.分析OOM异常的原因,堆溢出?栈溢出?本地内存溢出? 2.如果是堆溢出,导出堆dump&…...
华为WLAN实验继续-2,多个AP如何部署
----------------------------------------如果添加新的AP,如何实现多AP的服务----------- 新增加一个AP2启动之后发现无法获得IP地址 在AP2上查看其MAC地址,并与将其加入到AC中去 打开AC,将AP2的MAC加入到AC中 sys Enter system view, re…...
手把手教你写Java项目(1)——流程
个人练手项目的一般流程: 个人练手项目的流程通常相对简单和灵活,但仍然遵循一定的步骤来确保项目的顺利进行。流程相对较为详细,不是所有流程都要实现,一些仅供参考。主要是让大家对项目有初步的了解,不至于无法入手…...
微信小程序post请求
一、普通请求 wx.request({url: http://43.143.124.247:8282/sendEmail,method: POST,data: {user: that.data.currarr[0][that.data.mulu[0]] that.data.currarr[1][that.data.mulu[1]] that.data.sushe,pwd: 3101435196qq.com},header: {Content-Type: application/x-www-…...
frm一级4个1大神复习经验分享系列(二)
先说一下自己的情况,8月份中旬开始备考,中间一直是跟着网课走,notes和官方书都没看,然后10月份下旬开始刷题一直到考试。下面分享一些自己备考的经验和走过的弯路。 一级 一级整体学习下来的感受是偏重于基础的理论知识。FRM一级侧…...
理解磁盘分区与管理:U启、PE、DiskGenius、MBR与GUID
目录 U启和PE的区别: U启(U盘启动): PE(预安装环境): 在DiskGenius中分区完成之后是否还需要格式化: 1.建立文件系统: 2.清除数据: 3.检查并修复分区: 分区表格式中,MBR和GUID的区别: 1…...
GPT-4o和GPT-4有什么区别?我们还需要付费开通GPT-4?
GPT-4o 是 OpenAI 最新推出的大模型,有它的独特之处。那么GPT-4o 与 GPT-4 之间的主要区别具体有哪些呢?今天我们就来聊聊这个问题。 目前来看,主要是下面几个差异。 响应速度 GPT-4o 的一个显著优势是其处理速度。它能够更快地回应用户的查…...
《C++ Primer Plus》第十二章复习题和编程练习
目录 一、复习题二、编程练习 一、复习题 1. 假设String类有如下私有成员: // String 类声明 class String { private: char* str;int len;// ... };a. 下述默认构造函数有什么问题? String::String() { } // 默认构造函数b. 下述构造函数有什么问题…...
靠谱的江西靠谱单招机构哪家靠谱
每年单招报名前后,总有不少家长和同学跑来问我:“江西到底哪家单招机构靠谱?”说实话,这个问题没有标准答案,但如果你愿意听点实在的,我可以分享一下这几年自己观察到的和从往届学员那里听到的信息。为什么…...
收藏! Harness 让你轻松驾驭大模型,小白也能写出高效代码
本文探讨了 AI 编程 Agent 的核心要素,强调 Harness(工具、流程和反馈系统)的重要性远超单纯依赖模型。通过实例说明,优化编辑格式等 Harness 设计可显著提升 Agent 成功率。文章提出,为 AI 准备更好的工作台ÿ…...
中年以后,真正有效的抗衰老运动,其实就这 4 种
过了 30 岁,肌肉每年流失 1%-2%,基础代谢下降,精力大不如前——这不是错觉,是生理规律。 但运动的选择,决定了你是「老得快」还是「逆生长」。分享 4 种被科学验证的抗衰老运动,中年人越早开始越好。 1️⃣…...
大模型岗位锐评:小白程序员转型指南 学习资源包免费领!收藏必备
本文深度剖析大模型领域的五大梯队岗位,从底层架构工程师到应用开发工程师,详细介绍了各岗位的日常工作、新手友好度、优势与避雷点。文章强调大模型领域人才缺口巨大,传统程序员具备转型优势,并提供了系统学习路线及实战资源&…...
3个核心优势:为什么Robo 3T仍然是MongoDB开发者的首选工具
3个核心优势:为什么Robo 3T仍然是MongoDB开发者的首选工具 【免费下载链接】robomongo Native cross-platform MongoDB management tool 项目地址: https://gitcode.com/gh_mirrors/ro/robomongo 还在为MongoDB的命令行操作感到困扰?想象一下&…...
别再手动敲命令了!用Kuboard-Spray v1.2.4图形化搞定K8s集群(附CentOS 7.9避坑实录)
图形化利器Kuboard-Spray v1.2.4:三分钟搭建生产级K8s集群的避坑指南 当你在凌晨三点盯着满屏的kubeadm init报错信息时,是否想过Kubernetes集群部署还能更简单?去年我们团队在客户现场部署一套生产环境时,传统命令行方式让我们在…...
别急着升级Android Studio!手把手教你降级AGP 8.3.0-alpha01到8.1.3,解决版本不兼容报错
别急着升级Android Studio!手把手教你降级AGP 8.3.0-alpha01到8.1.3,解决版本不兼容报错 接手一个Kotlin项目时,最令人头疼的莫过于刚打开就遭遇版本不兼容的红色报错。尤其当错误提示显示"项目使用了不兼容的Android Gradle插件版本(A…...
RK3576开发板AP6275S无线模块调试:从驱动到应用实战
1. 项目概述:从零上手RK3576的无线模块调试最近在折腾一块基于瑞芯微RK3576的国产工业评估板——眺望电子的EVM-RK3576。这块板子接口资源相当丰富,双千兆网口、CAN、RS485、USB3.0等一应俱全,对于做工业网关、边缘计算盒子或者多媒体终端的开…...
AI插件深度对比 | Copilot、Tabnine、Codeium谁是王者
Copilot 的代码补全能力确实厉害,我试过在写 Python 函数的时候,只要输入注释,它就能自动生成函数体。比如写 “# 计算斐波那契数列”,它能直接给出递归和迭代两种实现方式。不过有时候生成的代码有点冗长,需要手动精简…...
免费额度哪家强?ESP32玩家实测八大国产大模型API(含通义千问、Kimi、DeepSeek)
ESP32开发者指南:八大国产大模型API横向评测与实战选型 当ESP32遇上大语言模型,会擦出怎样的火花?在物联网设备上直接运行AI交互功能,已经成为越来越多开发者的新选择。但面对众多国产大模型API,如何选择最适合ESP32项…...
