RabbitMQ 幂等性与消息可靠性保障
一、引言
RabbitMQ 是一个广泛应用于软件开发、数据传输、微服务等领域的高效、可靠的开源消息队列系统1。在分布式系统中,保证消息的可靠传递和幂等性是至关重要的,它能够确保系统在各种复杂情况下的稳定性和数据的准确性。
二、消息可靠性保障
(一)生产者端
- 发送方确认机制(Publisher Confirm)
- 原理:生产者发送消息后,MQ 会返回一个确认消息给生产者,告知消息是否被成功接收。通过在配置文件中开启
publisher - confirm - type
来启用该功能,一般设置为correlated
,表示成功发布消息到交换机后会触发回调方法。生产者可以为每个消息设置一个唯一的CorrelationData
作为消息的标识符,在回调方法中根据这个标识符来确定消息的发送结果。 - 示例配置:在 Spring Boot 项目的
application.yml
文件中配置如下:
- 原理:生产者发送消息后,MQ 会返回一个确认消息给生产者,告知消息是否被成功接收。通过在配置文件中开启
spring:rabbitmq:addresses: 127.0.0.1host: 5672username: guestpassword: guestvirtual - host: /# 开启消息确认publisher - confirm - type: correlated
- 事务机制
- 原理:生产者将发送消息的操作放在一个事务中,如果消息发送过程出现异常,可以回滚事务,确保消息不会丢失或出现不一致的情况。然而,事务机制会阻塞生产者线程,严重影响性能,在生产环境中一般不建议使用4。
- 示例代码:在使用 RabbitMQ 的 Java 客户端时,可以通过以下方式开启事务:
channel.txSelect();
try {// 发送消息的代码channel.basicPublish(exchange, routingKey, body);channel.txCommit();
} catch (Exception e) {channel.txRollback();
}
- 失败重试机制2
- 自带重试机制:如果发送方一开始就连不上 MQ,Spring Boot 中利用 Spring 的
retry
机制来实现重试。可以在配置文件中配置相关参数,如重试起始间隔时间、最大重试次数、最大重试间隔时间和间隔时间乘数等。 - 示例配置:
- 自带重试机制:如果发送方一开始就连不上 MQ,Spring Boot 中利用 Spring 的
spring:rabbitmq:template:retry:initial - interval: 1000msmax - attempts: 10max - interval: 10000msmultiplier: 2
- 业务重试:针对消息没有到达交换机的情况,在消息发送失败回调中进行处理。首先创建一张表记录发送到中间件的消息,包括消息的状态、第一次重试时间和重试次数等字段。在消息发送时往表中插入记录,在确认回调方法中根据消息的
msgId
更新消息状态。另外开启定时任务,定时检查状态为发送中且超过重试时间的消息,根据重试次数决定是否重新发送消息。
(二)MQ 中间件端
- 消息持久化
- 队列持久化:创建队列时将其设置为持久化,这样可以保证 Broker 在重启后队列的元数据不会丢失。在定义队列时,将
durable
参数设置为true
即可实现队列持久化。 - 消息持久化:将消息的
deliveryMode
设置为2
,可以将消息持久化到磁盘。这样只有消息成功持久化到磁盘之后,Broker 才会发送通知给生产者进行确认。
- 队列持久化:创建队列时将其设置为持久化,这样可以保证 Broker 在重启后队列的元数据不会丢失。在定义队列时,将
- 交换机持久化:创建交换机时设置为持久化,保证交换机在 Broker 重启后不会丢失。在定义交换机时,将
durable
参数设置为true
。 - 镜像队列:为了防止 MQ 服务器宕机或磁盘损坏导致消息丢失,可以引入镜像队列。镜像队列会将消息复制到多个节点上,即使某个节点出现故障,其他节点仍然可以提供服务,从而提高系统的可靠性。
(三)消费者端
- 消费者确认机制(Consumer Acknowledgement)
- 原理:当消费者处理消息结束后,需要向 RabbitMQ 发送一个回执,以告知消息的处理状态。ACK 表示消费者成功处理了消息,RabbitMQ 会从队列中删除该消息;NACK 表示消息处理失败,RabbitMQ 需要再次投递该消息;REJECT 表示消息处理失败并且被拒绝,RabbitMQ 会从队列中删除该消息,但一般很少使用 REJECT,通常只在消息格式存在问题时使用。
- 确认模式:RabbitMQ 支持三种不同的确认模式,通过
acknowledge - mode
属性进行配置。manual
模式下,消费者接收到消息后需要手动发送确认给发送者;auto
模式下,Spring AMQP 利用 AOP 对消息处理逻辑做环绕增强,业务正常执行时自动返回 ACK,出现异常时根据异常类型返回 NACK 或 REJECT;none
模式下,消费者接收到消息后不需要发送任何确认给发送者,这种模式无法保证消息的可靠性,一般不使用。
- 失败重试机制
- 本地重试:Spring 框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制地将消息重新入队到 MQ 队列。可以通过配置相关参数来控制重试的次数、间隔时间等。在达到最大重试次数后,Spring AMQP 会抛出
AmqpRejectAndDontRequeueException
异常,并将消息从队列中删除。 - 重试策略自定义:Spring AMQP 允许开发人员自定义重试次数耗尽后的消息处理策略,通过实现
MessageRecovery
接口来定义不同的策略,如RejectAndDontRequeueRecoverer
(重试次数耗尽后直接拒绝消息并丢弃)、ImmediateRequeueMessageRecoverer
(重试次数耗尽后返回 NACK 给生产者使消息重新入队)、RepublishMessageRecoverer
(重试次数耗尽后将失败消息投递到指定的交换机和队列中)。
- 本地重试:Spring 框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制地将消息重新入队到 MQ 队列。可以通过配置相关参数来控制重试的次数、间隔时间等。在达到最大重试次数后,Spring AMQP 会抛出
三、幂等性保障
- 通过唯一标识符保证操作的幂等性
- 原理:为每个操作生成唯一的标识符(如 ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP 的
MessageConverter
自带了MessageID
的功能,只要开启这个功能,就可以为每个消息生成唯一的 ID,也可以在业务中基于 ID 判断是否是重复消息。 - 示例代码:
- 原理:为每个操作生成唯一的标识符(如 ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP 的
@Bean
public MessageConverter messageConverter() {// 定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
- 通过业务判断保证操作的幂等性
- 基于数据库唯一约束:在数据库表中为相关业务字段设置唯一约束,例如在订单表中,以订单编号作为唯一键。当消费者接收到消息并处理业务时,将相关数据插入或更新到数据库中,如果出现唯一约束冲突,则说明该消息是重复的,直接返回成功即可,避免重复执行相同的业务逻辑。
- 利用 Redis 缓存:在消费者消费消息之前,先将消息的 ID 放到 Redis 中。可以使用
SETNX
命令(SET if Not eXists
)来设置键值对,如果键已经存在,说明之前有人消费过该消息。可以根据键对应的值来判断消息的处理状态,如0
表示正在处理或处理失败,1
表示处理成功。当消息成功消费之后,将 ID 对应的值设置为1
。为了防止出现死锁等情况,可以给键设置一个生存时间。
四、总结
通过在生产者端、MQ 中间件端和消费者端采取一系列的措施,RabbitMQ 可以有效地保证消息的可靠性和幂等性。在实际应用中,需要根据具体的业务场景和需求,合理地配置和使用这些功能,以确保系统的稳定性和数据的一致性。同时,还需要注意一些性能方面的问题,例如事务机制对性能的影响、重试机制可能导致的资源消耗等,在保证系统可靠性的前提下,尽可能地提高系统的性能和效率。
相关文章:
RabbitMQ 幂等性与消息可靠性保障
一、引言 RabbitMQ 是一个广泛应用于软件开发、数据传输、微服务等领域的高效、可靠的开源消息队列系统1。在分布式系统中,保证消息的可靠传递和幂等性是至关重要的,它能够确保系统在各种复杂情况下的稳定性和数据的准确性。 二、消息可靠性保障 &…...
neo4j图数据库基本概念和向量使用
一.节点 1.新建节点 create (n:GroupProduct {name:都邦高保额团意险,description: "保险产品名称"} ) return n CREATE:Neo4j 的关键字,用于创建新节点或关系。 (n:GroupProduct): n 是节点的临时别名(变量名&#…...

修复笔记:获取 torch._dynamo 的详细日志信息
一、问题描述 在运行项目时,遇到与 torch._dynamo 相关的报错,并且希望获取更详细的日志信息以便于进一步诊断问题。 二、相关环境变量设置 通过设置环境变量,可以获得更详细的日志信息: set TORCH_LOGSdynamo set TORCHDYNAM…...

Windows平台下的Qt发布版程序打包成exe可执行文件(带图标)|Qt|C++
首先先找一个可执行文件的图标 可以去阿里的矢量图库里找 iconfont-阿里巴巴矢量图标库 找到想要的图标下载下来 此时的图标是png格式的,我们要转到icon格式的文件 要使用到一个工具Drop Icons_2.1.1.rar - 蓝奏云 生成icon文件后把icon文件放到你项目的根目录下…...
PDF解析新范式:Free2AI工具实测
在数字化浪潮中,PDF文件已成为企业、政府及个人存储与传递信息的核心载体。然而,PDF内容的提取与处理始终是行业痛点——无论是合同解析、研究报告整理,还是大规模知识库构建,传统方法常面临效率低、成本高、准确率不足等问题。Free2AI基于智能体技术与大模型算力,为PDF内…...

CSS--图片链接垂直居中展示的方法
原文网址:CSS--图片链接垂直居中展示的方法-CSDN博客 简介 本文介绍CSS图片链接垂直居中展示的方法。 图片链接 问题复现 源码 <html xml:lang"cn" lang"cn"><head><meta http-equiv"Content-Type" content&quo…...
聊聊Spring AI autoconfigure模块的拆分
序 本文主要研究一下Spring AI autoconfigure模块的拆分 v1.0.0-M6版本 (base) ➜ spring-ai-spring-boot-autoconfigure git:(v1.0.0-M6) tree -L 9 . ├── pom.xml ├── src │ ├── main │ │ ├── java │ │ │ └── org │ │ │ └…...
【Elastsearch】如何获取已创建的api keys
『这种方法其实无法获取秘钥,只是获取了秘钥的名字等信息』 在Elasticsearch中,可以通过API获取已创建的API密钥(API keys)。以下是具体步骤和示例: 1.使用GET请求获取API密钥 Elasticsearch提供了GETAPI,用…...
Flutter异步原理-Future
前言 在 Dart 中,谈到异步就离不开 Future。无论是 .then()、还是 await,它们背后运作的都是一个私有实现类:_Future ,我们平时使用的 Future 只是一个抽象接口,其真正的实现逻辑由_Future 承担。 class _Future<…...

TRAE 配置blender MCP AI自动3D建模
BlenderMCP - Blender模型上下文协议集成 BlenderMCP通过模型上下文协议(MCP)将Blender连接到Claude AI,允许Claude直接与Blender交互并控制Blender。这种集成实现了即时辅助的3D建模、场景创建和操纵。 1.第一步下载 MCP插件(addon.py):Blender插件,在…...

VUE2课程计划表练习
主要练习数据变量对象 以下是修正后的完整代码: //javascript export default {data() {return {list: [{ id: 1, subject: Vue.js 前端实战开发, content: 学习指令,例如 v-if、v-for、v-model 等, place: 自习室, status: false }// 可以在这里添加更…...
虚拟文件系统
虚拟文件系统(Virtual File System,VFS)是操作系统内核中的一个抽象层,它为不同的文件系统(如ext4、NTFS、FAT32等)提供统一的访问接口。通过VFS,用户和应用程序无需关心底层文件系统的具体差异…...

2025年软件工程与数据挖掘国际会议(SEDM 2025)
2025 International Conference on Software Engineering and Data Mining 一、大会信息 会议简称:SEDM 2025 大会地点:中国太原 收录检索:提交Ei Compendex,CPCI,CNKI,Google Scholar等 二、会议简介 2025年软件开发与数据挖掘国际会议于…...
基于大模型预测的足月胎膜早破行阴道分娩全流程研究报告
目录 一、引言 1.1 研究背景与意义 1.2 研究目的与方法 1.3 研究创新点 二、胎膜早破(足月)行阴道分娩概述 2.1 胎膜早破定义与分类 2.2 足月胎膜早破行阴道分娩的现状与挑战 2.3 大模型预测引入的必要性 三、大模型预测原理与技术 3.1 大模型介绍 3.2 数据收集与…...
学习记录:DAY28
DispatcherController 功能完善与接口文档编写 前言 没什么动力说废话了。 今天来完善 DispatcherController 的功能,然后写写接口文档。 日程 早上:本来只有早八,但是早上摸鱼了,罪过罪过。下午:把 DispatcherContro…...
软件系统中功能模型 vs 数据模型 对比解析
功能模型 vs 数据模型 对比解析 一、功能模型(Functional Model) 定义:描述系统 做什么(业务逻辑与操作流程) 核心关注:行为、交互、业务流程 建模工具: 用例图(UML Use Case Dia…...

.NET高频技术点(持续更新中)
1. .NET 框架概述 .NET 框架的发展历程.NET Core 与 .NET Framework 的区别.NET 5 及后续版本的统一平台 2. C# 语言特性 异步编程(async/await)LINQ(Language Integrated Query)泛型与集合委托与事件属性与索引器 3. ASP.NET…...

pandas中的数据聚合函数:`pivot_table` 和 `groupby`有啥不同?
pivot_table 和 groupby 是 pandas 中两种常用的数据聚合方法,它们都能实现数据分组和汇总,但在使用方式和输出结构上有显著区别。 0. 基本介绍 groupby分组聚合 groupby 是 Pandas 库中的一个功能强大的方法,用于根据一个或多个列对数据进…...
微调大模型如何准备数据集——常用数据集,Alpaca和ShareGPT
微调大模型如何准备数据集——常用数据集,Alpaca和ShareGPT 数据集准备常用数据集自定义数据集AlpacaShareGPT数据集准备 常用数据集 预训练数据集 Wiki Demo (en)RefinedWeb (en)RedPajama V2 (en)Wikipedia (en)Wikipedia (zh)Pile (en)...
【Gradio】helloworld程序
前言 发现这个库用来做可视化的demo还不错,简单学习一下。 官网 https://www.gradio.app/ 安装 pip install gradio -i https://pypi.tuna.tsinghua.edu.cn/simple/helloWorld 示例 import gradio as grdef greet(name):return "hello"nameifacegr…...
机器学习例题——预测facebook签到位置(K近邻算法)和葡萄酒质量预测(线性回归)
一、预测facebook签到位置 代码展示: import pandas as pd from sklearn.preprocessing import StandardScaler from sklearn.model_selection import train_test_split from sklearn.neighbors import KNeighborsClassifier from sklearn.model_selection import…...

对golang中CSP的理解
概念: CSP模型,即通信顺序进程模型,是由英国计算机科学家C.A.R. Hoare于1978年提出的。该模型强调进程之间通过通道(channel)进行通信,并通过消息传递来协调并发执行的进程。CSP模型的核心思想是“不要通过…...
使用 pgrep 杀掉所有指定进程
使用 pgrep 杀掉所有指定进程 pgrep 是一个查找进程 ID 的工具,结合 pkill 或 kill 命令可以方便地终止指定进程。以下是几种方法: 方法1:使用 pkill(最简单) pkill 进程名例如杀掉所有名为 “firefox” 的进程&…...
Missashe考研日记-day36(改版说明)
Missashe考研日记-day36 改版说明 经过一天的思考、纠结和尝试,博主决定对更新内容进行改版,如下:1.不再每天都发一篇日记,改为一周发一篇包含一周七天学习进度的周记,但为了标题和以前相同(强迫症&#…...
基于Jetson Nano与PyTorch的无人机实时目标跟踪系统搭建指南
引言:边缘计算赋能智能监控 在AIoT时代,将深度学习模型部署到嵌入式设备已成为行业刚需。本文将手把手指导读者在NVIDIA Jetson Nano(4GB版本)开发板上,构建基于YOLOv5SORT算法的实时目标跟踪系统,集成无人…...

【LunarVim】CMake LSP配置
在 LunarVim 中为 CMakeLists.txt 文件启用代码提示(如补全和语义高亮),需要安装支持 CMake 的 LSP(语言服务器)和适当的插件。以下是完整配置指南: 1、配置流程 1.1 安装cmake-language-server 通过 Ma…...

Mkdocs页面如何嵌入PDF
嵌入PDF 嵌入PDF代码 ,注意PDF的相对地址 <iframe src"../个人简历.pdf (相对地址)" width"100%" height"800px" style"border: 1px solid #ccc; overflow: auto;"></iframe>我的完整代码: <d…...
从零开始学Flink:开启实时计算的魔法之旅
在凌晨三点的数据监控大屏前,某电商平台的技术负责人突然发现一个异常波动:支付成功率骤降15%。传统的数据仓库此时还在沉睡,而基于Flink搭建的实时风控系统早已捕捉到这个信号,自动触发预警机制。当运维团队赶到时,系…...

融合静态图与动态智能:重构下一代智能系统架构
引言:智能系统的分裂 当前的大模型系统架构正处于两个极端之间: 动态智能体系统:依赖语言模型动态决策、自由组合任务,智能灵活但稳定性差; 静态流程图系统:具备强工程能力,可控可靠…...
滑动窗口-窗口中的最大/小值-单调队列
求窗口的最大值 #include <iostream> //滑动窗口最大值用单调队列q[],q存储候选最大值的下标 //队列头是最大值的下标 using namespace std; const int N100010; int nums[N],q[N]; int hh0,tt-1;// hh 是队头指针,tt 是队尾指针,初始…...