【RabbitMQ】之消息的可靠性方案
目录
- 一、数据丢失场景
- 二、数据可靠性方案
- 1、生产者丢失消息解决方案
- 2、MQ 队列丢失消息解决方案
- 3、消费者丢失消息解决方案
一、数据丢失场景
MQ 消息数据完整的链路为:从 Producer 发送消息到 RabbitMQ 服务器中,再由 Broker 服务的 Exchange 根据 Routing_Key 路由到指定的 Queue 队列中,最后投送到消费者中完成消费。
所以消息在上面三个节点都可能存在消息丢失的情况:
- 生产者丢失消息:生产者将消息发送到服务器过程中,由于网络问题或服务器问题可能会导致消息发送失败而导致消息丢失;
- MQ 队列丢失消息:消息是存放在 MQ 服务器的消息队列中的,但由于 MQ 服务故障导致崩溃或服务重启,就可能会导致消息队列中的数据丢失;
- 消费者丢失消息:消费者收到消息后,处理过程中可能因为程序出错导致消息的消费失败,或中途消费者挂了导致消息没有完成消费,这些都会导致消息丢失。
二、数据可靠性方案
上面已经了解到了消息数据可能丢失的环节,所以,我们需要针对每个环节进行处理,以防止数据的丢失。
1、生产者丢失消息解决方案
对于生产者消息丢失的问题,我们有常用的两种方案:
- 1、开启消息发送事务功能;
- 2、开启 Confirm 消息确认机制。
1-1、开启消息发送事务功能
我们可以选择使用 RabbitMQ 提供的事务功能:生产者在发送数据之前开启事物,然后再发送消息。如果消息没有成功被 RabbitMQ 接收到,那么生产者会受到异常报错,这时就可以回滚事务,然后尝试重新发送;如果收到了消息,那么就可以提交事务。
伪代码如下:
channel.txSelect();// 开启事物
try{...
}catch(Exection e){channel.txRollback();// 回滚事物// 重新提交
}
这种方案有个比较大的缺点:RabbitMQ 事务一旦开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,由于比较耗性能而会造成吞吐量的下降。所以并不推荐这种方案。
1-2、开启 Confirm 消息确认机制
在生产者中开启了Confirm 模式,为每次写的消息分配一个唯一的 ID,然后再发送给 RabbitMQ 服务
- 如果成功写入到了 RabbitMQ 之中,RabbitMQ 会给你回传一个 ACK 消息,告诉你这个消息发送 OK 了;
- 如果 RabbitMQ 没能处理这个消息,就会回调你一个 NACK 接口,告诉你这个消息失败了,你可以进行重试。
同时也可以结合这个机制知道自己在内存里维护每个消息的 ID,如果超过一定时间还没接收到这个消息的回调,那么可以尝试进行重发。
伪代码如下:
//开启confirm
channel.confirm();//发送成功回调
public void ack(String messageId){
}// 发送失败回调
public void nack(String messageId){//重发该消息
}
由于事务机制是同步阻塞的,而 Confirm 机制是异步的,在发送消息之后可以接着发送下一个消息,最后通过 RabbitMQ 的回调告知成功与否,所以,生产者消息丢失方案一般都是采用 Confirm 确认机制。
2、MQ 队列丢失消息解决方案
对于 MQ 队列丢失消息的问题,我们可以开启消息的持久化,当然队列本身也要开启持久化,毕竟队列如果不存在了,哪怕消息持久化也没有用。关于 RabbitMQ 的持久化机制,可以参考我的另一篇博客:【RabbitMQ】之持久化机制
开启了消息队列的持久化后,可以将消息的持久化和生产者的 Confirm 机制配合起来,只有消息持久化到了磁盘,才会个生产者发送 ACK,这样就算是在持久化之前 RabbitMQ 挂了,数据丢了,生产者收不到 ACK 回调也会进行消息重发。
持久化有个关键的问题需要注意:
消息在正确存入 RabbitMQ 之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中。因为 RabbitMQ 并不是为每条消息都做 fsync
的处理,可能仅仅保存到 cache 中而不是物理磁盘上,在这段时间内 RabbitMQ 的 broker 发生 crash,消息保存到 cache 但是还没来得及落盘,那么这些消息将会丢失。
解决这个问题的方案是 RabbitMQ 开启镜像队列,镜像队列相当于配置了副本,当 master 在此特殊时间内 crash 掉,可以自动切换到 slave,这样有效地保障了数据的丢失。更多关于 RabbitMQ 镜像队列的知识可以参考我的另一篇博客:【RabbitMQ】之高可用集群搭建
3、消费者丢失消息解决方案
针对消费者丢失消息问题,我们可以使用 RabbitMQ 提供的 ACK 应答机制,首先需要将 自动应答标志位 autoAck
设置为 false 来关闭 RabbitMQ 的自动ack,这是为了防止 Consumer 收到消息后,还没来得及处理完成就 crash 掉了。所以我们采用手动应答的方式:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
然后在消费者执行完毕之后手动应答:channel.basicAck
。
总结:RabbitMQ 消息的可靠性涉及 producer 端的确认机制、broker 服务的持久化与镜像队列的配置、consumer 端的确认机制。要想确保消息的可靠性越高,那么性能也会随之而降,所以需要根据实际情况进行选择和取舍。
相关文章:

【RabbitMQ】之消息的可靠性方案
目录 一、数据丢失场景二、数据可靠性方案 1、生产者丢失消息解决方案2、MQ 队列丢失消息解决方案3、消费者丢失消息解决方案 一、数据丢失场景 MQ 消息数据完整的链路为:从 Producer 发送消息到 RabbitMQ 服务器中,再由 Broker 服务的 Exchange 根据…...

性能测试/负载测试/压力测试之间的区别
做测试一年多来,虽然平时的工作都能很好的完成,但最近突然发现自己在关于测试的整体知识体系上面的了解很是欠缺,所以,在工作之余也做了一些测试方面的知识的补充。不足之处,还请大家多多交流,互相学习。 …...

Mybatis ,Mybatis-plus列表多字段排序,包含sql以及warpper
根据 mybatis 根据多字段排序已经wrapper 根据多字段排序 首先根据咱们返回前端的数据列来规划好排序字段 如下: 这里的字段为返回VO的字段,要转换成数据库字段然后加入到排序中 示例,穿了 surname,cerRank 多字段,然后是倒序 false 首先创建好映射&am…...
sonarqube PHP编码规范检查
一、PSR规范整理 PHP 已有的编码规范如下 https://blog.csdn.net/qq_40876291/article/details/103848172 1.1 基本编码规范:PSR1 官网规范链接 https://www.php-fig.org/psr/psr-1/ 文件只能使用<?php和<?标记。文件必须仅使用UTF-8,而不使…...
Kylin 麒麟 Qt软件 QtCreator 中文输入法问题
Kylin 麒麟 Qt软件 QtCreator 中文输入法问题 背景: QtCreator 和程序在麒麟系统下没法进行输入,或没法进行输入法的切换。 包括麒麟自带默认搜狗输入法的切换也不行。 使用下面的命令进行安装后,可以正常在QtCreator和程序中使用输入法。 …...

租赁固定资产管理
智能租赁资产管理系统可以为企业单位提供RFID资产管理系统。移动APP资产管理,准确总结易损耗品和固定资金,从入库到仓库库存实时跟踪,控制出库和入库的全过程。同时,备件和耗材与所属资产设备有关,便于备件的申请和管理…...
【Kubernetes】Kubernetes的概念
Kubernetes 一、Kubernetes 概述1.Kubernetes 是什么?2. Kubernetes 的作用3. 为什么要用 Kubernetes?4. Kubernetes 的概念5. Kubernetes 的主要功能6. Kubernetes 集群架构与组件二、Kubernetes 的组件1. Master 组件1.1 Kube-apiserver1.2 Kube-controller-manager1.3 Kub…...

抖音短视频seo源码矩阵系统开发
一、前言: 抖音SEO源码矩阵系统开发是一项专为抖音平台设计的SEO优化系统,能够帮助用户提升抖音视频的搜索排名和曝光度。为了确保系统运行正常,需要安装FFmpeg和FFprobe工具。FFmpeg是一个用于处理多媒体数据的开源工具集,而FFpr…...
npm install pnpm -g报错解决!
目录 报错信息:(反正就是各种err) 报错分析: 错误处理: 其它pnpm报错传送门: 报错信息:(反正就是各种err) npm ERR! code EPERM npm ERR! syscall mkdir npm ERR! pa…...

vue2、vue3生命周期详解以及对比
文章目录 对比vue2-vue3vue3生命周期生命周期的主要阶段详情 vue2 生命周期生命周期钩子函数 总共11个 常用的8个按照这四个阶段我们对应有八个生命周期钩子函数vue生命周期使用场景 对比vue2-vue3 如果熟悉vue2的话,vue3信手拈来,看图 vue3生命周期 on…...

JSON动态生成表格
<!DOCTYPE html> <html><head><meta charset"utf-8"><title></title></head><body><script>var fromjava"{\"total\":3,\"students\":[{\"name\":\"张三\",\&q…...
C# Winform中使用SendMessage方法(发送消息与接收消息)
C# Winform窗口间消息通知,使用Windows API SendMessage方法跨进程实现消息发送,重写WndProc方法接收消息并消息处理 主要使用到如下三个方法函数: WndProc:主要用在拦截并处理系统消息和自定义消息 可以重写WndProc函数…...

Netty各组件基本用法、入站和出站详情、群聊系统的实现、粘包和拆包
Netty Bootstrap和ServerBootstrapFuture和ChannelFutureChannelSelectorNioEventLoop和NioEventLoopGroupByteBuf示例代码 Channel相关组件入站详情出站详情对象编解码ProtoBuf和ProtoStuffnetty实现群聊系统粘包和拆包TCP协议特点举个例子 Bootstrap和ServerBootstrap Boots…...

Day03-作业(AxiosElementUI)
作业1: 根据需求完成如下页面数据列表展示 需求:Vue挂载完成后,通过axios发送异步请求到服务端,获取学生列表数据,并通过Vue展示在页面上 获取数据url:http://yapi.smart-xwork.cn/mock/169327/student 素材: <!DOCTYPE html…...

低代码开发平台源码:基于模型驱动,内置功能强大的建模引擎,零代码也能快速创建智能化、移动化的企业应用程序
管理后台低代码PaaS平台是一款基于 Salesforce Platform 的开源替代方案,旨在为企业提供高效、灵活、易于使用的低代码开发平台。低代码PaaS平台的10大核心引擎功能:1.建模引擎 2.移动引擎 3.流程引擎 4.页面引擎 5.报表引擎 6.安全引擎 7.API引擎 8.应用集成引擎 9…...

下载JMeter的历史版本——个人推荐5.2.1版本
官网地址:https://archive.apache.org/dist/jmeter/binaries/...

2023-07-30 LeetCode每日一题(环形链表 II)
2023-07-30每日一题 一、题目编号 142. 环形链表 II二、题目链接 点击跳转到题目位置 三、题目描述 给定一个链表的头节点 head ,返回链表开始入环的第一个节点。 如果链表无环,则返回 null。 如果链表中有某个节点,可以通过连续跟踪 n…...
设计模式——简单工厂模式
1 概述 将创造对象的工作交给一个单独的类来实现 ,这个单独的类就是工厂。 2 实现 假设要做一个计算器的需求,通常我们想到的是这样写: package com.example.easyfactory;import java.util.Scanner;public class Demo1 {public static vo…...
AnimatedVectorDrawable矢量图动画的使用和修改
文章目录 一、前言二、一个矢量图文件三、参考链接 一、前言 矢量可绘制对象可以提供比较复杂的动画效果,只是绘制比较复杂,这里可以让UI使用Adobe After Effects软件制作出相关的矢量图xml文件交由开发使用。只是如果需要重复播放的动画效果时候&#…...

【C++】—— 多态的基本介绍
前言: 在之前的学习过程中,我们已经对继承进行了详细的学习和了解。今天,我将带领大家学习的是关于 多态 的基本知识。 目录 (一)多态的概念 1、概念 (二)多态的定义及实现 1、多态的构成条…...

Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...

前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

MySQL 8.0 OCP 英文题库解析(十三)
Oracle 为庆祝 MySQL 30 周年,截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始,将英文题库免费公布出来,并进行解析,帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...

嵌入式学习笔记DAY33(网络编程——TCP)
一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...