当前位置: 首页 > news >正文

【从零开始学习RabbitMQ | 第二篇】如何确保MQ的可靠性和消费者可靠性

目录

前言:

MQ可靠性: 

数据持久化:

Lazy Queue: 

消费者可靠性:

消费者确认机制:

消费失败处理:

MQ保证幂等性:

方法一:

总结:


前言:

在上一篇文章中,我们介绍了如何确保生产者的可靠性,确保消息一定可以到达MQ。

【从零开始学习RabbitMQ | 第一篇】如何确保生产者的可靠性-CSDN博客icon-default.png?t=N7T8https://liyuanxin.blog.csdn.net/article/details/139261125?spm=1001.2014.3001.5502

但是MQ自己也是会丢失消息的,比如MQ的突然宕机或者消息过多造成的阻塞,因此我们这篇文章来介绍一下如何确保MQ的可靠性和消费者可靠性

默认情况下,RabbitMQ会把接收到的消息保存在内存中来降低消息收发的延迟,但这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或者处理过慢的时候,会导致消息的积压,引发阻塞。 

想要保证MQ的可靠性,主要依赖于两种方式:

  • 数据持久化
  • Lazy Queue 

MQ可靠性: 

数据持久化:

RabbitMQ实现持久化主要有三个部分:

  1. 持久化队列(Durable Queues): 持久化队列指的是队列本身被存储在磁盘上,这样即使RabbitMQ服务器重启,队列也不会丢失。要创建持久化队列,需要在声明队列时设置durable属性为true。持久化队列中的消息默认不是持久化的,需要单独设置每条消息为持久化。

  2. 持久化消息(Persistent Messages): 持久化消息意味着消息本身被存储在磁盘上,因此即使RabbitMQ服务器重启,消息也不会丢失。在发送消息时,需要设置消息的deliveryMode属性为2(即PERSISTENT),这样RabbitMQ就会将消息存储到磁盘上。

  3. 持久化交换机(Durable Exchanges): 持久化交换器与持久化队列类似,指的是交换器被存储在磁盘上,这样服务器重启后交换器依然存在。声明交换器时,也需要设置durable属性为true。持久化交换器确保了消息路由的结构在服务器重启后能够保留。

当我们把消息持久化到磁盘中的时候,就避免了消息挤压造成的阻塞。但是当我们把消息持久化到磁盘中的时候,这个时候是不能接收新的消息的。

Lazy Queue: 

因此实际上把消息持久化到磁盘中不是一个很好的解决方案,在RabbitMQ的3.6.0版本后,增加了Lazy  Queue这个概念。其实就是懒惰队列:

懒惰队列的特性如下:

  • 接收到消息后直接存入磁盘而不是内存(内存只保留最近的消息,默认2048条)。
  • 消费者要消费消息的时候才会从磁盘中读取并加载到内存中。
  • 支持数百万条的消息存储。

在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

消费者可靠性:

消费者确认机制:

 为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息的结束之后,将会向RabbitMQ发送一个回执,告知自己消息的处理状态 ,消费者一共可以向Rabbitmq回执三种处理状态,分别是:

  • ACK:成功处理消息,RabbitMQ从队列中删除该消息
  • NACK:消息处理失败,RabbitMQ再次投递消息
  • REJECT: 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

而消费者确认机制并不用我们手写,Spring AMQP已经为我们实现了消费者消息确认机制,并允许我们通过配置文件的形式选择ACK处理方式,有三种:

  • none:不处理,消息投递给消费者之后就会ack,消息会从MQ中删除,不安全
  • manual:手动模式。需要自己调用api发送ack或者reject。
  • auto:托管给Spring AMQP 利用 AOP实现消费者确认机制。当业务正常执行的时候返回ack

当业务异常的时候,根据异常不同返回不同的结果:

  1. 如果是业务异常,自动返回nack
  2. 如果是消息处理或者校验异常,自动返回reject

Spring AMQP的消费者确认机制默认开启auto。

消费失败处理:

如果我们把消费者确认机制配置为auto的时候,当消费者出现异常之后,消息会不断的重新入队到队列,再重新发送给消费者。同样的消息再次引发异常,再次入队,再次发送给消费者。

这样会导致MQ的消息处理数量飙升,带来不必要的压力,因此除了把处理失败的消息重新进行处理之外,我们还应该有其他的处理方案。

我们可以利用Spring 的retry机制,在消费者出现异常的时候使用本地重试,而不是无限制的requeue到MQ队列中。

    listener:simple:prefetch: 1retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次等待时长倍数,下次等待时长= Initial - interval * multipliermax-attempts: 3 #最大重试次数stateless: true #true无状态,false有状态,如果业务中包含事务,就改为fasle

当我们开启重试模式之后,如果重试次数耗尽并且消息依然失败,那么就需要有MessageRecoverer接口来处理,这个接口包含三种不同的实现:

  • RejectAndDontRequeueRecoverer :重试耗尽之后,直接reject,丢弃消息。
  • ImmedIateRequeueMessageRecoverer : 重试耗尽之后,返回nack,消息重新入队
  • RepublishMessageRecoerer:重试耗尽之后,将失败消息投递到指定的交换机

如果消息丢失不会对业务产生重大影响,可以选择RejectAndDontRequeueRecoverer

如果希望消息有机会被重新处理,可以选择ImmediateRequeueMessageRecoverer

RepublishMessageRecoverer则适用于需要对失败消息进行进一步分析或记录的情况。

MQ保证幂等性:

通过上述的各种保证措施,我们基本上可以确保业务至少被执行一次。那对于一些需要保证幂等性的业务,我们要如何保证这个消息只被消费一次呢?

【从零开始学习重要知识点 | 第一篇】快速了解什么是幂等性以及常见解决方案_幂等性问题如何解决分布式锁-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/fckbb/article/details/136331113?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522171689607216777224421340%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=171689607216777224421340&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-1-136331113-null-null.142%5Ev100%5Epc_search_result_base1&utm_term=%E5%B9%82%E7%AD%89%E6%80%A7%20%E6%88%91%E6%98%AF%E4%B8%80%E7%9B%98%E7%89%9B%E8%82%89&spm=1018.2226.3001.4187

方法一:

给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。

其实他和我们上面黏贴的文章中介绍的token机制的思想很像。而在MQ中也不需要我们手动去设置唯一ID,可以在消费者的消息转换器中开启:

 我们可以看一看这个setCreateMessageIds这个方法中是如何构造id的:

这里逻辑就很清晰了,实际上就是在发消息的时候做一个if判断,如果我们设置了构造id,并且当前消息配置类的id为空,我们就为这个消息构造一个id。

 但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作

总结:

        在本文中,我们探讨了RabbitMQ作为领先的开源消息代理,如何通过一系列高级特性和策略来确保消息队列和消费者的高度可靠性。

首先,我们讨论了RabbitMQ的持久化机制,它允许消息和队列在服务器重启后依然保持不变。通过将消息标记为持久化,并在队列上启用持久化选项,我们可以确保关键数据不会因系统故障而丢失。

最后,我们介绍了消息确认机制,这是确保消息被成功处理的关键。消费者在处理完消息后发送确认回执,RabbitMQ只有在收到确认后才会从队列中移除消息。如果消费者在处理过程中失败,未确认的消息将重新入队,供其他消费者处理。

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!

相关文章:

【从零开始学习RabbitMQ | 第二篇】如何确保MQ的可靠性和消费者可靠性

目录 前言: MQ可靠性: 数据持久化: Lazy Queue: 消费者可靠性: 消费者确认机制: 消费失败处理: MQ保证幂等性: 方法一: 总结: 前言: …...

常用批处理命令及批处理文件编写技巧

一常用批处理命令 1.查看命令用法:命令 /? //如:cd /? 2.切换盘符目录:cd /d D:\test 或直接输入 d: //进入上次d盘所在的目录 3.切换目录:cd test 4.清屏:cls 5.“arp -a” //它会列出当前设备缓存中的所有…...

android NetworkMonitor记录

是否能上网的状态 上网url地址的设置: NetworkMonitor.java makeCaptivePortalHttpsUrls config_captive_portal_https_urls DEFAULT_CAPTIVE_PORTAL_HTTPS_URLS http准备监测 isCaptivePortal sendHttpAndHttpsParallelWithFallbackProbes httpsProbe.start();…...

OSPF优化——OSPF减少LSA更新量2

二、特殊区域——优化非骨干区域的LSA数量 不是骨干区域、不能存在虚链路 1、不能存在 ASBR 1)末梢区域 该区域将拒绝 4、5LSA的进人,同时由该区域连接骨干0区域的ABR 向该区域,发布一条3类的缺省路由; 该区域内每台路由器均需配置&#xf…...

【AMS】Android 8.0+ 绕开启动后台Service限制

一、背景 应客户要求,需要在开机时,拉起应用A。但因为开机时,同时被拉起的应用过多,导致Launcher在开机那一刻较为卡顿。为解决这一问题,采取了延迟拉起的做法。在开机后,延迟一定时间,由系统服务,拉起应用A。 于是乎,就出现这么个报错: Not allowed to start ser…...

【多态】(超级详细!)

【多态】(超级详细!) 前言一、 多态的概念二、重写1. 方法重写的规则2. 重写和重载的区别 三、多态实现的条件四、 向上转型五、动态绑定 前言 面向对象的三大特征:封装性、继承性、多态性。 extends继承或者implements实现&…...

vue的组件化

vue的组件化 vue的组件化,就是根据功能、业务逻辑、数据流向等因素进行划分把页面拆分成多个组件。组件是资源独立的,组件也可以相互嵌套。目的是提高代码的可读性、可维护性和可复用性。 组件化思想体现 ​ 组件封装步骤 1.公共组件 公共组件全局注…...

spark的简单学习一

一 RDD 1.1 RDD的概述 1.RDD(Resilient Distributed Dataset,弹性分布式数据集)是Apache Spark中的一个核心概念。它是Spark中用于表示不可变、可分区、里面的元素可并行计算的集合。RDD提供了一种高度受限的共享内存模型,即RD…...

【第5章】SpringBoot整合Druid

文章目录 前言一、启动器二、配置1.JDBC 配置2.连接池配置3. 监控配置 三、配置多数据源1. 添加配置2. 创建数据源 四、配置 Filter1. 配置Filter2. 可配置的Filter 五、获取 Druid 的监控数据六、案例1. 问题2. 引入库3. 配置4. 配置类5. 测试类6. 测试结果 七、案例 ( 推荐 )…...

力扣654. 最大二叉树

Problem: 654. 最大二叉树 文章目录 题目描述思路复杂度Code 题目描述 思路 对于构造二叉树这类问题一般都是利用先、中、后序遍历,再将原始问题分解得出结果 1.定义递归函数build,每次将一个数组中的最大值作为当前子树的根节点构造二叉树;…...

基于Netty实现WebSocket客户端

本文是基于Netty快速上手WebSocket客户端,不涉及WebSocket的TLS/SSL加密传输。 WebSocket原理参考【WebSocket简介-CSDN博客】,测试用的WebSocket服务端也是用Netty实现的,参考【基于Netty实现WebSocket服务端-CSDN博客】 一、基于Netty快速…...

homebrew安装mysql的一些问题

本文目录 一、Homebrew镜像安装二、mac安装mysql2.1、修改mysql密码 本文基于mac环境下进行的安装 一、Homebrew镜像安装 Homebrew国内如何自动安装,运行命令/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)" 会…...

产线问题排查

CPU过高 使用top命令查看占用CPU过高的进程。 导出CPU占用高进程的线程栈。 jstack pid >> java.txt Java 内存过高的问题排查 1.分析OOM异常的原因,堆溢出?栈溢出?本地内存溢出? 2.如果是堆溢出,导出堆dump&…...

华为WLAN实验继续-2,多个AP如何部署

----------------------------------------如果添加新的AP,如何实现多AP的服务----------- 新增加一个AP2启动之后发现无法获得IP地址 在AP2上查看其MAC地址,并与将其加入到AC中去 打开AC,将AP2的MAC加入到AC中 sys Enter system view, re…...

手把手教你写Java项目(1)——流程

个人练手项目的一般流程: 个人练手项目的流程通常相对简单和灵活,但仍然遵循一定的步骤来确保项目的顺利进行。流程相对较为详细,不是所有流程都要实现,一些仅供参考。主要是让大家对项目有初步的了解,不至于无法入手…...

微信小程序post请求

一、普通请求 wx.request({url: http://43.143.124.247:8282/sendEmail,method: POST,data: {user: that.data.currarr[0][that.data.mulu[0]] that.data.currarr[1][that.data.mulu[1]] that.data.sushe,pwd: 3101435196qq.com},header: {Content-Type: application/x-www-…...

frm一级4个1大神复习经验分享系列(二)

先说一下自己的情况,8月份中旬开始备考,中间一直是跟着网课走,notes和官方书都没看,然后10月份下旬开始刷题一直到考试。下面分享一些自己备考的经验和走过的弯路。 一级 一级整体学习下来的感受是偏重于基础的理论知识。FRM一级侧…...

理解磁盘分区与管理:U启、PE、DiskGenius、MBR与GUID

目录 U启和PE的区别: U启(U盘启动): PE(预安装环境): 在DiskGenius中分区完成之后是否还需要格式化: 1.建立文件系统: 2.清除数据: 3.检查并修复分区: 分区表格式中,MBR和GUID的区别: 1…...

GPT-4o和GPT-4有什么区别?我们还需要付费开通GPT-4?

GPT-4o 是 OpenAI 最新推出的大模型,有它的独特之处。那么GPT-4o 与 GPT-4 之间的主要区别具体有哪些呢?今天我们就来聊聊这个问题。 目前来看,主要是下面几个差异。 响应速度 GPT-4o 的一个显著优势是其处理速度。它能够更快地回应用户的查…...

《C++ Primer Plus》第十二章复习题和编程练习

目录 一、复习题二、编程练习 一、复习题 1. 假设String类有如下私有成员: // String 类声明 class String { private: char* str;int len;// ... };a. 下述默认构造函数有什么问题? String::String() { } // 默认构造函数b. 下述构造函数有什么问题…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

高危文件识别的常用算法:原理、应用与企业场景

高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...

Python如何给视频添加音频和字幕

在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

均衡后的SNRSINR

本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt​ 根发送天线, n r n_r nr​ 根接收天线的 MIMO 系…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别

【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势&#xf…...

《Docker》架构

文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器,docker,镜像,k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...

热烈祝贺埃文科技正式加入可信数据空间发展联盟

2025年4月29日,在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上,可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞,强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...

恶补电源:1.电桥

一、元器件的选择 搜索并选择电桥,再multisim中选择FWB,就有各种型号的电桥: 电桥是用来干嘛的呢? 它是一个由四个二极管搭成的“桥梁”形状的电路,用来把交流电(AC)变成直流电(DC)。…...