【从零开始学习RabbitMQ | 第二篇】如何确保MQ的可靠性和消费者可靠性
目录
前言:
MQ可靠性:
数据持久化:
Lazy Queue:
消费者可靠性:
消费者确认机制:
消费失败处理:
MQ保证幂等性:
方法一:
总结:
前言:
在上一篇文章中,我们介绍了如何确保生产者的可靠性,确保消息一定可以到达MQ。
【从零开始学习RabbitMQ | 第一篇】如何确保生产者的可靠性-CSDN博客https://liyuanxin.blog.csdn.net/article/details/139261125?spm=1001.2014.3001.5502
但是MQ自己也是会丢失消息的,比如MQ的突然宕机或者消息过多造成的阻塞,因此我们这篇文章来介绍一下如何确保MQ的可靠性和消费者可靠性
默认情况下,RabbitMQ会把接收到的消息保存在内存中来降低消息收发的延迟,但这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或者处理过慢的时候,会导致消息的积压,引发阻塞。
想要保证MQ的可靠性,主要依赖于两种方式:
- 数据持久化
- Lazy Queue
MQ可靠性:
数据持久化:
RabbitMQ实现持久化主要有三个部分:
-
持久化队列(Durable Queues): 持久化队列指的是队列本身被存储在磁盘上,这样即使RabbitMQ服务器重启,队列也不会丢失。要创建持久化队列,需要在声明队列时设置
durable
属性为true
。持久化队列中的消息默认不是持久化的,需要单独设置每条消息为持久化。 -
持久化消息(Persistent Messages): 持久化消息意味着消息本身被存储在磁盘上,因此即使RabbitMQ服务器重启,消息也不会丢失。在发送消息时,需要设置消息的
deliveryMode
属性为2
(即PERSISTENT
),这样RabbitMQ就会将消息存储到磁盘上。 -
持久化交换机(Durable Exchanges): 持久化交换器与持久化队列类似,指的是交换器被存储在磁盘上,这样服务器重启后交换器依然存在。声明交换器时,也需要设置
durable
属性为true
。持久化交换器确保了消息路由的结构在服务器重启后能够保留。
当我们把消息持久化到磁盘中的时候,就避免了消息挤压造成的阻塞。但是当我们把消息持久化到磁盘中的时候,这个时候是不能接收新的消息的。
Lazy Queue:
因此实际上把消息持久化到磁盘中不是一个很好的解决方案,在RabbitMQ的3.6.0版本后,增加了Lazy Queue这个概念。其实就是懒惰队列:
懒惰队列的特性如下:
- 接收到消息后直接存入磁盘而不是内存(内存只保留最近的消息,默认2048条)。
- 消费者要消费消息的时候才会从磁盘中读取并加载到内存中。
- 支持数百万条的消息存储。
在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。
消费者可靠性:
消费者确认机制:
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息的结束之后,将会向RabbitMQ发送一个回执,告知自己消息的处理状态 ,消费者一共可以向Rabbitmq回执三种处理状态,分别是:
- ACK:成功处理消息,RabbitMQ从队列中删除该消息
- NACK:消息处理失败,RabbitMQ再次投递消息
- REJECT: 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
而消费者确认机制并不用我们手写,Spring AMQP已经为我们实现了消费者消息确认机制,并允许我们通过配置文件的形式选择ACK处理方式,有三种:
- none:不处理,消息投递给消费者之后就会ack,消息会从MQ中删除,不安全
- manual:手动模式。需要自己调用api发送ack或者reject。
- auto:托管给Spring AMQP 利用 AOP实现消费者确认机制。当业务正常执行的时候返回ack
当业务异常的时候,根据异常不同返回不同的结果:
- 如果是业务异常,自动返回nack
- 如果是消息处理或者校验异常,自动返回reject
Spring AMQP的消费者确认机制默认开启auto。
消费失败处理:
如果我们把消费者确认机制配置为auto的时候,当消费者出现异常之后,消息会不断的重新入队到队列,再重新发送给消费者。同样的消息再次引发异常,再次入队,再次发送给消费者。
这样会导致MQ的消息处理数量飙升,带来不必要的压力,因此除了把处理失败的消息重新进行处理之外,我们还应该有其他的处理方案。
我们可以利用Spring 的retry机制,在消费者出现异常的时候使用本地重试,而不是无限制的requeue到MQ队列中。
listener:simple:prefetch: 1retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次等待时长倍数,下次等待时长= Initial - interval * multipliermax-attempts: 3 #最大重试次数stateless: true #true无状态,false有状态,如果业务中包含事务,就改为fasle
当我们开启重试模式之后,如果重试次数耗尽并且消息依然失败,那么就需要有MessageRecoverer接口来处理,这个接口包含三种不同的实现:
- RejectAndDontRequeueRecoverer :重试耗尽之后,直接reject,丢弃消息。
- ImmedIateRequeueMessageRecoverer : 重试耗尽之后,返回nack,消息重新入队
- RepublishMessageRecoerer:重试耗尽之后,将失败消息投递到指定的交换机
如果消息丢失不会对业务产生重大影响,可以选择RejectAndDontRequeueRecoverer
。
如果希望消息有机会被重新处理,可以选择ImmediateRequeueMessageRecoverer
。
而RepublishMessageRecoverer
则适用于需要对失败消息进行进一步分析或记录的情况。
MQ保证幂等性:
通过上述的各种保证措施,我们基本上可以确保业务至少被执行一次。那对于一些需要保证幂等性的业务,我们要如何保证这个消息只被消费一次呢?
【从零开始学习重要知识点 | 第一篇】快速了解什么是幂等性以及常见解决方案_幂等性问题如何解决分布式锁-CSDN博客https://blog.csdn.net/fckbb/article/details/136331113?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522171689607216777224421340%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=171689607216777224421340&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-1-136331113-null-null.142%5Ev100%5Epc_search_result_base1&utm_term=%E5%B9%82%E7%AD%89%E6%80%A7%20%E6%88%91%E6%98%AF%E4%B8%80%E7%9B%98%E7%89%9B%E8%82%89&spm=1018.2226.3001.4187
方法一:
给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。
其实他和我们上面黏贴的文章中介绍的token机制的思想很像。而在MQ中也不需要我们手动去设置唯一ID,可以在消费者的消息转换器中开启:
我们可以看一看这个setCreateMessageIds这个方法中是如何构造id的:
这里逻辑就很清晰了,实际上就是在发消息的时候做一个if判断,如果我们设置了构造id,并且当前消息配置类的id为空,我们就为这个消息构造一个id。
但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作。
总结:
在本文中,我们探讨了RabbitMQ作为领先的开源消息代理,如何通过一系列高级特性和策略来确保消息队列和消费者的高度可靠性。
首先,我们讨论了RabbitMQ的持久化机制,它允许消息和队列在服务器重启后依然保持不变。通过将消息标记为持久化,并在队列上启用持久化选项,我们可以确保关键数据不会因系统故障而丢失。
最后,我们介绍了消息确认机制,这是确保消息被成功处理的关键。消费者在处理完消息后发送确认回执,RabbitMQ只有在收到确认后才会从队列中移除消息。如果消费者在处理过程中失败,未确认的消息将重新入队,供其他消费者处理。
如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
相关文章:

【从零开始学习RabbitMQ | 第二篇】如何确保MQ的可靠性和消费者可靠性
目录 前言: MQ可靠性: 数据持久化: Lazy Queue: 消费者可靠性: 消费者确认机制: 消费失败处理: MQ保证幂等性: 方法一: 总结: 前言: …...

常用批处理命令及批处理文件编写技巧
一常用批处理命令 1.查看命令用法:命令 /? //如:cd /? 2.切换盘符目录:cd /d D:\test 或直接输入 d: //进入上次d盘所在的目录 3.切换目录:cd test 4.清屏:cls 5.“arp -a” //它会列出当前设备缓存中的所有…...
android NetworkMonitor记录
是否能上网的状态 上网url地址的设置: NetworkMonitor.java makeCaptivePortalHttpsUrls config_captive_portal_https_urls DEFAULT_CAPTIVE_PORTAL_HTTPS_URLS http准备监测 isCaptivePortal sendHttpAndHttpsParallelWithFallbackProbes httpsProbe.start();…...

OSPF优化——OSPF减少LSA更新量2
二、特殊区域——优化非骨干区域的LSA数量 不是骨干区域、不能存在虚链路 1、不能存在 ASBR 1)末梢区域 该区域将拒绝 4、5LSA的进人,同时由该区域连接骨干0区域的ABR 向该区域,发布一条3类的缺省路由; 该区域内每台路由器均需配置…...
【AMS】Android 8.0+ 绕开启动后台Service限制
一、背景 应客户要求,需要在开机时,拉起应用A。但因为开机时,同时被拉起的应用过多,导致Launcher在开机那一刻较为卡顿。为解决这一问题,采取了延迟拉起的做法。在开机后,延迟一定时间,由系统服务,拉起应用A。 于是乎,就出现这么个报错: Not allowed to start ser…...

【多态】(超级详细!)
【多态】(超级详细!) 前言一、 多态的概念二、重写1. 方法重写的规则2. 重写和重载的区别 三、多态实现的条件四、 向上转型五、动态绑定 前言 面向对象的三大特征:封装性、继承性、多态性。 extends继承或者implements实现&…...

vue的组件化
vue的组件化 vue的组件化,就是根据功能、业务逻辑、数据流向等因素进行划分把页面拆分成多个组件。组件是资源独立的,组件也可以相互嵌套。目的是提高代码的可读性、可维护性和可复用性。 组件化思想体现 组件封装步骤 1.公共组件 公共组件全局注…...

spark的简单学习一
一 RDD 1.1 RDD的概述 1.RDD(Resilient Distributed Dataset,弹性分布式数据集)是Apache Spark中的一个核心概念。它是Spark中用于表示不可变、可分区、里面的元素可并行计算的集合。RDD提供了一种高度受限的共享内存模型,即RD…...

【第5章】SpringBoot整合Druid
文章目录 前言一、启动器二、配置1.JDBC 配置2.连接池配置3. 监控配置 三、配置多数据源1. 添加配置2. 创建数据源 四、配置 Filter1. 配置Filter2. 可配置的Filter 五、获取 Druid 的监控数据六、案例1. 问题2. 引入库3. 配置4. 配置类5. 测试类6. 测试结果 七、案例 ( 推荐 )…...

力扣654. 最大二叉树
Problem: 654. 最大二叉树 文章目录 题目描述思路复杂度Code 题目描述 思路 对于构造二叉树这类问题一般都是利用先、中、后序遍历,再将原始问题分解得出结果 1.定义递归函数build,每次将一个数组中的最大值作为当前子树的根节点构造二叉树;…...
基于Netty实现WebSocket客户端
本文是基于Netty快速上手WebSocket客户端,不涉及WebSocket的TLS/SSL加密传输。 WebSocket原理参考【WebSocket简介-CSDN博客】,测试用的WebSocket服务端也是用Netty实现的,参考【基于Netty实现WebSocket服务端-CSDN博客】 一、基于Netty快速…...

homebrew安装mysql的一些问题
本文目录 一、Homebrew镜像安装二、mac安装mysql2.1、修改mysql密码 本文基于mac环境下进行的安装 一、Homebrew镜像安装 Homebrew国内如何自动安装,运行命令/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)" 会…...
产线问题排查
CPU过高 使用top命令查看占用CPU过高的进程。 导出CPU占用高进程的线程栈。 jstack pid >> java.txt Java 内存过高的问题排查 1.分析OOM异常的原因,堆溢出?栈溢出?本地内存溢出? 2.如果是堆溢出,导出堆dump&…...

华为WLAN实验继续-2,多个AP如何部署
----------------------------------------如果添加新的AP,如何实现多AP的服务----------- 新增加一个AP2启动之后发现无法获得IP地址 在AP2上查看其MAC地址,并与将其加入到AC中去 打开AC,将AP2的MAC加入到AC中 sys Enter system view, re…...
手把手教你写Java项目(1)——流程
个人练手项目的一般流程: 个人练手项目的流程通常相对简单和灵活,但仍然遵循一定的步骤来确保项目的顺利进行。流程相对较为详细,不是所有流程都要实现,一些仅供参考。主要是让大家对项目有初步的了解,不至于无法入手…...
微信小程序post请求
一、普通请求 wx.request({url: http://43.143.124.247:8282/sendEmail,method: POST,data: {user: that.data.currarr[0][that.data.mulu[0]] that.data.currarr[1][that.data.mulu[1]] that.data.sushe,pwd: 3101435196qq.com},header: {Content-Type: application/x-www-…...
frm一级4个1大神复习经验分享系列(二)
先说一下自己的情况,8月份中旬开始备考,中间一直是跟着网课走,notes和官方书都没看,然后10月份下旬开始刷题一直到考试。下面分享一些自己备考的经验和走过的弯路。 一级 一级整体学习下来的感受是偏重于基础的理论知识。FRM一级侧…...
理解磁盘分区与管理:U启、PE、DiskGenius、MBR与GUID
目录 U启和PE的区别: U启(U盘启动): PE(预安装环境): 在DiskGenius中分区完成之后是否还需要格式化: 1.建立文件系统: 2.清除数据: 3.检查并修复分区: 分区表格式中,MBR和GUID的区别: 1…...

GPT-4o和GPT-4有什么区别?我们还需要付费开通GPT-4?
GPT-4o 是 OpenAI 最新推出的大模型,有它的独特之处。那么GPT-4o 与 GPT-4 之间的主要区别具体有哪些呢?今天我们就来聊聊这个问题。 目前来看,主要是下面几个差异。 响应速度 GPT-4o 的一个显著优势是其处理速度。它能够更快地回应用户的查…...

《C++ Primer Plus》第十二章复习题和编程练习
目录 一、复习题二、编程练习 一、复习题 1. 假设String类有如下私有成员: // String 类声明 class String { private: char* str;int len;// ... };a. 下述默认构造函数有什么问题? String::String() { } // 默认构造函数b. 下述构造函数有什么问题…...

2024 年科技裁员综合清单
推荐阅读: 独立国家的共同财富 美国千禧一代的收入低于父辈 创造大量就业机会却毁掉了财富 这四件事是创造国家财富的关键 全球财富报告证实联盟自始至终无能 美国人已陷入无休止债务循环中,这正在耗尽他们的财务生命 2024 年,科技行业…...

Linux系统编程学习笔记
1 前言 1.1 环境 平台:uabntu20.04 工具:vim,gcc,make 1.2 GCC Linux系统下的GCC(GNU Compiler Collection)是GNU推出的功能强大、性能优越的多平台编译器,是GNU的代表作品之一。gcc是可以在多种硬体平台上编译出可执…...
vue3 excel 文件导出
//文件导出 在index.ts 中 export function downloadHandle(url: string,params?:object, filename?: string, method: string GET){ try { downloadLoadingInstance ElLoading.service({ text: "正在生成下载数据,请稍候", background: "rgba…...
优雅的代码规范
在软件开发中,优雅的代码规范可以帮助我们写出既美观又实用的代码。 以下是提升代码质量的建议性规范: 命名清晰: 使用描述性强的命名,让代码自我解释。 简洁性: 力求简洁,避免冗余,用最少的代…...

JVM、JRE 和 JDK 的区别,及如何解决学习中可能会遇到的问题
在学习Java编程的过程中,理解JVM、JRE和JDK之间的区别是非常重要的。它们是Java开发和运行环境的核心组件,各自扮演不同的角色。 一、JVM(Java Virtual Machine) 定义 JVM(Java虚拟机)是一个虚拟化的计算…...

【开源】加油站管理系统 JAVA+Vue.js+SpringBoot+MySQL
目录 一、项目介绍 论坛模块 加油站模块 汽油模块 二、项目截图 三、核心代码 一、项目介绍 Vue.jsSpringBoot前后端分离新手入门项目《加油站管理系统》,包括论坛模块、加油站模块、汽油模块、加油模块和部门角色菜单模块,项目编号T003。 【开源…...
详解 Scala 的泛型
一、协变与逆变 1. 说明 协变:Son 是 Father 的子类,则 MyList[Son] 也作为 MyList[Father] 的 “子类”逆变:Son 是 Father 的子类,则 MyList[Son] 作为 MyList[Father] 的 “父类”不变:Son 是 Father 的子类&…...
【本周面试问题总结】
01.如何判断链表中是否有环 ①穷举遍历:从头节点开始,依次遍历单链表中的每一个节点。每遍历到一个新节点,将新节点和此前节点进行比较,若已经存在则说明已被遍历过,链表有环。 ②快慢指针:创建两个指针&am…...

SaltStack
SaltStack 官方文档 1.简介 作用:批量处理状态管理(配置管理)事件驱动(通过事件触发操作)管理私有云/公有云 yum仓库:http://repo.saltstack.com 安装1.master和minionrpm --import https://repo.saltproj…...

【Rust日报】Rust 中的形式验证
文章 - 未来的愿景:Rust 中的形式验证 这篇文章回顾了形式化验证的基本概念,作者展示了如何使用 Hoare triples 来描述和推理程序的正确性,以及如何使用分离逻辑来解决验证的复杂性。文章还解释了为什么 Rust 适用于形式化验证,以…...