当前位置: 首页 > article >正文

RabbitMQ 幂等性与消息可靠性保障

一、引言

RabbitMQ 是一个广泛应用于软件开发、数据传输、微服务等领域的高效、可靠的开源消息队列系统1。在分布式系统中,保证消息的可靠传递和幂等性是至关重要的,它能够确保系统在各种复杂情况下的稳定性和数据的准确性。

二、消息可靠性保障

(一)生产者端

  • 发送方确认机制(Publisher Confirm)
    • 原理:生产者发送消息后,MQ 会返回一个确认消息给生产者,告知消息是否被成功接收。通过在配置文件中开启publisher - confirm - type来启用该功能,一般设置为correlated,表示成功发布消息到交换机后会触发回调方法。生产者可以为每个消息设置一个唯一的CorrelationData作为消息的标识符,在回调方法中根据这个标识符来确定消息的发送结果。
    • 示例配置:在 Spring Boot 项目的application.yml文件中配置如下:
spring:rabbitmq:addresses: 127.0.0.1host: 5672username: guestpassword: guestvirtual - host: /# 开启消息确认publisher - confirm - type: correlated
  • 事务机制
    • 原理:生产者将发送消息的操作放在一个事务中,如果消息发送过程出现异常,可以回滚事务,确保消息不会丢失或出现不一致的情况。然而,事务机制会阻塞生产者线程,严重影响性能,在生产环境中一般不建议使用4。
    • 示例代码:在使用 RabbitMQ 的 Java 客户端时,可以通过以下方式开启事务:
channel.txSelect();
try {// 发送消息的代码channel.basicPublish(exchange, routingKey, body);channel.txCommit();
} catch (Exception e) {channel.txRollback();
}
  • 失败重试机制2
    • 自带重试机制:如果发送方一开始就连不上 MQ,Spring Boot 中利用 Spring 的retry机制来实现重试。可以在配置文件中配置相关参数,如重试起始间隔时间、最大重试次数、最大重试间隔时间和间隔时间乘数等。
    • 示例配置
spring:rabbitmq:template:retry:initial - interval: 1000msmax - attempts: 10max - interval: 10000msmultiplier: 2
  • 业务重试:针对消息没有到达交换机的情况,在消息发送失败回调中进行处理。首先创建一张表记录发送到中间件的消息,包括消息的状态、第一次重试时间和重试次数等字段。在消息发送时往表中插入记录,在确认回调方法中根据消息的msgId更新消息状态。另外开启定时任务,定时检查状态为发送中且超过重试时间的消息,根据重试次数决定是否重新发送消息。

(二)MQ 中间件端

  • 消息持久化
    • 队列持久化:创建队列时将其设置为持久化,这样可以保证 Broker 在重启后队列的元数据不会丢失。在定义队列时,将durable参数设置为true即可实现队列持久化。
    • 消息持久化:将消息的deliveryMode设置为2,可以将消息持久化到磁盘。这样只有消息成功持久化到磁盘之后,Broker 才会发送通知给生产者进行确认。
  • 交换机持久化:创建交换机时设置为持久化,保证交换机在 Broker 重启后不会丢失。在定义交换机时,将durable参数设置为true
  • 镜像队列:为了防止 MQ 服务器宕机或磁盘损坏导致消息丢失,可以引入镜像队列。镜像队列会将消息复制到多个节点上,即使某个节点出现故障,其他节点仍然可以提供服务,从而提高系统的可靠性。

(三)消费者端

  • 消费者确认机制(Consumer Acknowledgement)
    • 原理:当消费者处理消息结束后,需要向 RabbitMQ 发送一个回执,以告知消息的处理状态。ACK 表示消费者成功处理了消息,RabbitMQ 会从队列中删除该消息;NACK 表示消息处理失败,RabbitMQ 需要再次投递该消息;REJECT 表示消息处理失败并且被拒绝,RabbitMQ 会从队列中删除该消息,但一般很少使用 REJECT,通常只在消息格式存在问题时使用。
    • 确认模式:RabbitMQ 支持三种不同的确认模式,通过acknowledge - mode属性进行配置。manual模式下,消费者接收到消息后需要手动发送确认给发送者;auto模式下,Spring AMQP 利用 AOP 对消息处理逻辑做环绕增强,业务正常执行时自动返回 ACK,出现异常时根据异常类型返回 NACK 或 REJECT;none模式下,消费者接收到消息后不需要发送任何确认给发送者,这种模式无法保证消息的可靠性,一般不使用。
  • 失败重试机制
    • 本地重试:Spring 框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制地将消息重新入队到 MQ 队列。可以通过配置相关参数来控制重试的次数、间隔时间等。在达到最大重试次数后,Spring AMQP 会抛出AmqpRejectAndDontRequeueException异常,并将消息从队列中删除。
    • 重试策略自定义:Spring AMQP 允许开发人员自定义重试次数耗尽后的消息处理策略,通过实现MessageRecovery接口来定义不同的策略,如RejectAndDontRequeueRecoverer(重试次数耗尽后直接拒绝消息并丢弃)、ImmediateRequeueMessageRecoverer(重试次数耗尽后返回 NACK 给生产者使消息重新入队)、RepublishMessageRecoverer(重试次数耗尽后将失败消息投递到指定的交换机和队列中)。

三、幂等性保障

  • 通过唯一标识符保证操作的幂等性
    • 原理:为每个操作生成唯一的标识符(如 ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP 的MessageConverter自带了MessageID的功能,只要开启这个功能,就可以为每个消息生成唯一的 ID,也可以在业务中基于 ID 判断是否是重复消息。
    • 示例代码
@Bean
public MessageConverter messageConverter() {// 定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
  • 通过业务判断保证操作的幂等性
    • 基于数据库唯一约束:在数据库表中为相关业务字段设置唯一约束,例如在订单表中,以订单编号作为唯一键。当消费者接收到消息并处理业务时,将相关数据插入或更新到数据库中,如果出现唯一约束冲突,则说明该消息是重复的,直接返回成功即可,避免重复执行相同的业务逻辑。
    • 利用 Redis 缓存:在消费者消费消息之前,先将消息的 ID 放到 Redis 中。可以使用SETNX命令(SET if Not eXists)来设置键值对,如果键已经存在,说明之前有人消费过该消息。可以根据键对应的值来判断消息的处理状态,如0表示正在处理或处理失败,1表示处理成功。当消息成功消费之后,将 ID 对应的值设置为1。为了防止出现死锁等情况,可以给键设置一个生存时间。

四、总结

通过在生产者端、MQ 中间件端和消费者端采取一系列的措施,RabbitMQ 可以有效地保证消息的可靠性和幂等性。在实际应用中,需要根据具体的业务场景和需求,合理地配置和使用这些功能,以确保系统的稳定性和数据的一致性。同时,还需要注意一些性能方面的问题,例如事务机制对性能的影响、重试机制可能导致的资源消耗等,在保证系统可靠性的前提下,尽可能地提高系统的性能和效率。

相关文章:

RabbitMQ 幂等性与消息可靠性保障

一、引言 RabbitMQ 是一个广泛应用于软件开发、数据传输、微服务等领域的高效、可靠的开源消息队列系统1。在分布式系统中,保证消息的可靠传递和幂等性是至关重要的,它能够确保系统在各种复杂情况下的稳定性和数据的准确性。 二、消息可靠性保障 &…...

neo4j图数据库基本概念和向量使用

一.节点 1.新建节点 create (n:GroupProduct {name:都邦高保额团意险,description: "保险产品名称"} ) return n CREATE:Neo4j 的关键字,用于创建新节点或关系。 (n:GroupProduct): n 是节点的临时别名(变量名&#…...

修复笔记:获取 torch._dynamo 的详细日志信息

一、问题描述 在运行项目时,遇到与 torch._dynamo 相关的报错,并且希望获取更详细的日志信息以便于进一步诊断问题。 二、相关环境变量设置 通过设置环境变量,可以获得更详细的日志信息: set TORCH_LOGSdynamo set TORCHDYNAM…...

Windows平台下的Qt发布版程序打包成exe可执行文件(带图标)|Qt|C++

首先先找一个可执行文件的图标 可以去阿里的矢量图库里找 iconfont-阿里巴巴矢量图标库 找到想要的图标下载下来 此时的图标是png格式的,我们要转到icon格式的文件 要使用到一个工具Drop Icons_2.1.1.rar - 蓝奏云 生成icon文件后把icon文件放到你项目的根目录下…...

PDF解析新范式:Free2AI工具实测

在数字化浪潮中,PDF文件已成为企业、政府及个人存储与传递信息的核心载体。然而,PDF内容的提取与处理始终是行业痛点——无论是合同解析、研究报告整理,还是大规模知识库构建,传统方法常面临效率低、成本高、准确率不足等问题。Free2AI基于智能体技术与大模型算力,为PDF内…...

CSS--图片链接垂直居中展示的方法

原文网址&#xff1a;CSS--图片链接垂直居中展示的方法-CSDN博客 简介 本文介绍CSS图片链接垂直居中展示的方法。 图片链接 问题复现 源码 <html xml:lang"cn" lang"cn"><head><meta http-equiv"Content-Type" content&quo…...

聊聊Spring AI autoconfigure模块的拆分

序 本文主要研究一下Spring AI autoconfigure模块的拆分 v1.0.0-M6版本 (base) ➜ spring-ai-spring-boot-autoconfigure git:(v1.0.0-M6) tree -L 9 . ├── pom.xml ├── src │ ├── main │ │ ├── java │ │ │ └── org │ │ │ └…...

【Elastsearch】如何获取已创建的api keys

『这种方法其实无法获取秘钥&#xff0c;只是获取了秘钥的名字等信息』 在Elasticsearch中&#xff0c;可以通过API获取已创建的API密钥&#xff08;API keys&#xff09;。以下是具体步骤和示例&#xff1a; 1.使用GET请求获取API密钥 Elasticsearch提供了GETAPI&#xff0c;用…...

Flutter异步原理-Future

前言 在 Dart 中&#xff0c;谈到异步就离不开 Future。无论是 .then()、还是 await&#xff0c;它们背后运作的都是一个私有实现类&#xff1a;_Future &#xff0c;我们平时使用的 Future 只是一个抽象接口&#xff0c;其真正的实现逻辑由_Future 承担。 class _Future<…...

TRAE 配置blender MCP AI自动3D建模

BlenderMCP - Blender模型上下文协议集成 BlenderMCP通过模型上下文协议(MCP)将Blender连接到Claude AI&#xff0c;允许Claude直接与Blender交互并控制Blender。这种集成实现了即时辅助的3D建模、场景创建和操纵。 1.第一步下载 MCP插件(addon.py):Blender插件&#xff0c;在…...

VUE2课程计划表练习

主要练习数据变量对象 以下是修正后的完整代码&#xff1a; //javascript export default {data() {return {list: [{ id: 1, subject: Vue.js 前端实战开发, content: 学习指令&#xff0c;例如 v-if、v-for、v-model 等, place: 自习室, status: false }// 可以在这里添加更…...

虚拟文件系统

虚拟文件系统&#xff08;Virtual File System&#xff0c;VFS&#xff09;是操作系统内核中的一个抽象层&#xff0c;它为不同的文件系统&#xff08;如ext4、NTFS、FAT32等&#xff09;提供统一的访问接口。通过VFS&#xff0c;用户和应用程序无需关心底层文件系统的具体差异…...

2025年软件工程与数据挖掘国际会议(SEDM 2025)

2025 International Conference on Software Engineering and Data Mining 一、大会信息 会议简称&#xff1a;SEDM 2025 大会地点&#xff1a;中国太原 收录检索&#xff1a;提交Ei Compendex,CPCI,CNKI,Google Scholar等 二、会议简介 2025年软件开发与数据挖掘国际会议于…...

基于大模型预测的足月胎膜早破行阴道分娩全流程研究报告

目录 一、引言 1.1 研究背景与意义 1.2 研究目的与方法 1.3 研究创新点 二、胎膜早破(足月)行阴道分娩概述 2.1 胎膜早破定义与分类 2.2 足月胎膜早破行阴道分娩的现状与挑战 2.3 大模型预测引入的必要性 三、大模型预测原理与技术 3.1 大模型介绍 3.2 数据收集与…...

学习记录:DAY28

DispatcherController 功能完善与接口文档编写 前言 没什么动力说废话了。 今天来完善 DispatcherController 的功能&#xff0c;然后写写接口文档。 日程 早上&#xff1a;本来只有早八&#xff0c;但是早上摸鱼了&#xff0c;罪过罪过。下午&#xff1a;把 DispatcherContro…...

软件系统中功能模型 vs 数据模型 对比解析

功能模型 vs 数据模型 对比解析 一、功能模型&#xff08;Functional Model&#xff09; 定义&#xff1a;描述系统 做什么&#xff08;业务逻辑与操作流程&#xff09; 核心关注&#xff1a;行为、交互、业务流程 建模工具&#xff1a; 用例图&#xff08;UML Use Case Dia…...

.NET高频技术点(持续更新中)

1. .NET 框架概述 .NET 框架的发展历程.NET Core 与 .NET Framework 的区别.NET 5 及后续版本的统一平台 2. C# 语言特性 异步编程&#xff08;async/await&#xff09;LINQ&#xff08;Language Integrated Query&#xff09;泛型与集合委托与事件属性与索引器 3. ASP.NET…...

pandas中的数据聚合函数:`pivot_table` 和 `groupby`有啥不同?

pivot_table 和 groupby 是 pandas 中两种常用的数据聚合方法&#xff0c;它们都能实现数据分组和汇总&#xff0c;但在使用方式和输出结构上有显著区别。 0. 基本介绍 groupby分组聚合 groupby 是 Pandas 库中的一个功能强大的方法&#xff0c;用于根据一个或多个列对数据进…...

微调大模型如何准备数据集——常用数据集,Alpaca和ShareGPT

微调大模型如何准备数据集——常用数据集,Alpaca和ShareGPT 数据集准备常用数据集自定义数据集AlpacaShareGPT数据集准备 常用数据集 预训练数据集 Wiki Demo (en)RefinedWeb (en)RedPajama V2 (en)Wikipedia (en)Wikipedia (zh)Pile (en)...

【Gradio】helloworld程序

前言 发现这个库用来做可视化的demo还不错&#xff0c;简单学习一下。 官网 https://www.gradio.app/ 安装 pip install gradio -i https://pypi.tuna.tsinghua.edu.cn/simple/helloWorld 示例 import gradio as grdef greet(name):return "hello"nameifacegr…...

机器学习例题——预测facebook签到位置(K近邻算法)和葡萄酒质量预测(线性回归)

一、预测facebook签到位置 代码展示&#xff1a; import pandas as pd from sklearn.preprocessing import StandardScaler from sklearn.model_selection import train_test_split from sklearn.neighbors import KNeighborsClassifier from sklearn.model_selection import…...

对golang中CSP的理解

概念&#xff1a; CSP模型&#xff0c;即通信顺序进程模型&#xff0c;是由英国计算机科学家C.A.R. Hoare于1978年提出的。该模型强调进程之间通过通道&#xff08;channel&#xff09;进行通信&#xff0c;并通过消息传递来协调并发执行的进程。CSP模型的核心思想是“不要通过…...

使用 pgrep 杀掉所有指定进程

使用 pgrep 杀掉所有指定进程 pgrep 是一个查找进程 ID 的工具&#xff0c;结合 pkill 或 kill 命令可以方便地终止指定进程。以下是几种方法&#xff1a; 方法1&#xff1a;使用 pkill&#xff08;最简单&#xff09; pkill 进程名例如杀掉所有名为 “firefox” 的进程&…...

Missashe考研日记-day36(改版说明)

Missashe考研日记-day36 改版说明 经过一天的思考、纠结和尝试&#xff0c;博主决定对更新内容进行改版&#xff0c;如下&#xff1a;1.不再每天都发一篇日记&#xff0c;改为一周发一篇包含一周七天学习进度的周记&#xff0c;但为了标题和以前相同&#xff08;强迫症&#…...

基于Jetson Nano与PyTorch的无人机实时目标跟踪系统搭建指南

引言&#xff1a;边缘计算赋能智能监控 在AIoT时代&#xff0c;将深度学习模型部署到嵌入式设备已成为行业刚需。本文将手把手指导读者在NVIDIA Jetson Nano&#xff08;4GB版本&#xff09;开发板上&#xff0c;构建基于YOLOv5SORT算法的实时目标跟踪系统&#xff0c;集成无人…...

【LunarVim】CMake LSP配置

在 LunarVim 中为 CMakeLists.txt 文件启用代码提示&#xff08;如补全和语义高亮&#xff09;&#xff0c;需要安装支持 CMake 的 LSP&#xff08;语言服务器&#xff09;和适当的插件。以下是完整配置指南&#xff1a; 1、配置流程 1.1 安装cmake-language-server 通过 Ma…...

Mkdocs页面如何嵌入PDF

嵌入PDF 嵌入PDF代码 &#xff0c;注意PDF的相对地址 <iframe src"../个人简历.pdf (相对地址)" width"100%" height"800px" style"border: 1px solid #ccc; overflow: auto;"></iframe>我的完整代码&#xff1a; <d…...

从零开始学Flink:开启实时计算的魔法之旅

在凌晨三点的数据监控大屏前&#xff0c;某电商平台的技术负责人突然发现一个异常波动&#xff1a;支付成功率骤降15%。传统的数据仓库此时还在沉睡&#xff0c;而基于Flink搭建的实时风控系统早已捕捉到这个信号&#xff0c;自动触发预警机制。当运维团队赶到时&#xff0c;系…...

融合静态图与动态智能:重构下一代智能系统架构

引言&#xff1a;智能系统的分裂 当前的大模型系统架构正处于两个极端之间&#xff1a; 动态智能体系统&#xff1a;依赖语言模型动态决策、自由组合任务&#xff0c;智能灵活但稳定性差&#xff1b; 静态流程图系统&#xff1a;具备强工程能力&#xff0c;可控可靠&#xf…...

滑动窗口-窗口中的最大/小值-单调队列

求窗口的最大值 #include <iostream> //滑动窗口最大值用单调队列q[]&#xff0c;q存储候选最大值的下标 //队列头是最大值的下标 using namespace std; const int N100010; int nums[N],q[N]; int hh0,tt-1;// hh 是队头指针&#xff0c;tt 是队尾指针&#xff0c;初始…...