当前位置: 首页 > news >正文

RabbitMQ消息可靠性(一)-- 生产者消息确认

前言

在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,消息的可靠性可以主要在三个方面去考虑:生产者消息确认,消费者消息确认,消息持久化,这篇文件说明生产者消息确认的。


一、消息确认流程图

由图可知,消息确认是分为生产者确认和消费者确认的,生产者和MQ之间的消息确认机制为生产者消息确认,MQ和消费者之间的消息确认机制为消费者消息确认

消息丢失的情景有三种情况:

  1. 发送消息过程中出现网络问题:生产者以为发送成功,但MQ没有收到;(需要生产者消息确认)
  2. 接收到消息后由于MQ服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;(需要消息持久化)
  3. 消费者接收到消息后处理消息出错,没有完成消息的处理,但是自动返回ack(这时候需要开启手动确认模式,消费者消息确认)

二、生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID,来区分开不同的消息,避免ack(消息确认参数)冲突。每当消息发送到MQ成功后,MQ都会返回一个结果给生产者,以保证生产者消息确认。在生产者消息确认时,又有两种返回结果方式(通常两个都要实现)来确保消息投递可靠性,分别为publisher-confirm和publisher-return,以下作出说明。

1、publisher-confirm(发送者确认)

消息成功投递到交换机,返回ack

消息未投递到交换机,返回nack

2、publisher-return(发送者回执)

消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。


三、代码实现

1、配置文件

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true

publish-confirm-type有三个值,

  1. none:禁用发布确认模式,是默认值
  2. simple:同步等待confirm结果,直到超时
  3. correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

publisher-returns:开启消息失败回调,回调函数ReturnCallback

2、配置ConfirmCallback函数和ReturnCallback函数

/*** 生产者消息回调配置类*/
@Configuration
@Slf4j
public class ProviderCallBackConfig {@Resourceprivate CachingConnectionFactory cachingConnectionFactory;@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);// 当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,//那么broker会调用basic.return方法将消息返还给生产者。// 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。rabbitTemplate.setMandatory(true);/*** TODO RabbitMQ生产者发送消息确认回调,解决消息可靠性问题* 消息确认回调,确认消息是否到达broker* data:消息唯一标识* ack:确认结果* cause:失败原因*/rabbitTemplate.setConfirmCallback((data, ack, cause) -> {if (ack) {//消息发送成功后,更新数据库消息状态等逻辑log.info("消息发送至exchange成功------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}",data, ack, cause);} else {//信息发送失败,打印日志后,可以根据业务选择是否重发消息log.info("消息发送至exchange失败------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}", data, ack, cause);}});/*** TODO RabbitMQ生产者发送消息失败回调,解决消息可靠性问题* message      消息* replyCode    回应码* replyText    回应信息* exchange     交换机* routingKey   路由键*/rabbitTemplate.setReturnsCallback((res) -> {//若发送失败,打印错误信息,然后可以根据业务选择重发消息log.error("消息发送至queue失败-------->res: {}", JSON.toJSONString(res));});return rabbitTemplate;}}

到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?

先从总体的情况分析,推送消息存在3种情况:

①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送成功

①消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):

@GetMapping("/testProviderMessageBack")@ApiOperation(value = "测试生产者消息回调")@ApiOperationSupport(order = 5)public String testProviderMessageBack() {CorrelationData data = new CorrelationData();data.setId("111");rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", "测试生产者消息回调",data);return "ok";}

调用接口,查看项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):

结论: ①这种情况触发的是 ConfirmCallback 回调函数

消息发送至exchange失败------>

消息唯一标识: CorrelationData [id=111],

确认状态: false,

造成原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

②消息推送到server,找到交换机了,但是没找到队列  
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:

    @BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}

然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):

@GetMapping("/testProviderMessageBack2")@ApiOperation(value = "测试生产者消息回调2")@ApiOperationSupport(order = 6)public String testProviderMessageBack2() {CorrelationData data = new CorrelationData();data.setId("222");rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestLonelyDirectRouting", "测试生产者消息回调2",data);return "ok";}

消息发送至exchange成功------>

消息唯一标识: CorrelationData [id=222],

确认状态: true,

造成原因: null

消息发送至queue失败-------->

res: {"exchange":"lonelyDirectExchange","message":{"body":"5rWL6K+V55Sf5Lqn6ICF5raI5oGv5Zue6LCDMg==","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"spring_returned_message_correlation":"222"},"lastInBatch":false,"priority":0,"projectionUsed":false,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT"}},"replyCode":312,"replyText":"NO_ROUTE","routingKey":"TestLonelyDirectRouting"}

这种情况下,两个函数都被调用了,

消息是推送成功到交换机了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

③消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendDirectMessage接口,可以看到控制台输出:

结论:这种情况触发的是 ConfirmCallback 回调函数。

相关文章:

RabbitMQ消息可靠性(一)-- 生产者消息确认

前言 在项目中&#xff0c;引入了RabbitMQ这一中间件&#xff0c;必然也需要在业务中增加对数据安全性的一层考虑&#xff0c;来保证RabbitMQ消息的可靠性&#xff0c;否则一个个消息丢失可能导致整个业务的数据出现不一致等问题&#xff0c;对系统带来巨大的影响&#xff0c;…...

9 种方法使用 Amazon CodeWhisperer 快速构建应用

Amazon CodeWhisperer 是一款很赞的生成式人工智能编程工具。自从在工作中使用了 CodeWhisperer&#xff0c;我发现不仅代码编译的效率有所提高&#xff0c;应用开发的工作也变得快乐起来。然而&#xff0c;任何生成式 AI 工具的有效学习都需要初学者要有接受新工作方式的心态和…...

性能测试-性能工程落地的4个阶段(21)

性能工程按照不同的内容和目的划分为4个阶段,分别是线下单系统压测分析阶段、线下全链路压测分析阶段、生产只读业务压测及容量评估阶段、生产读写业务全链路压测及容量评估阶段。(也可以理解为一个企业性能测试体系的发展阶段) 线下单系统压测分析阶段 针对单系统的性能…...

小程序 navigateBack 携带参数返回的三种方式(详细)

如果觉着主图好看,点个赞,你早晚也会看到这么好看的景色! 第一种方式 getCurrentPages 获取当前页面栈。数组中第一个元素为首页,最后一个元素为当前页面。不要尝试修改页面栈,会导致路由以及页面状态错误。不要在 App.onLaunch 的时候调用 getCurrentPages(),此时 page …...

通过内网穿透实现远程连接群晖Drive,轻松实现异地访问群晖NAS

文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…...

vue3 + vite常用工具

1. plop 1.1 安装 yarn add plop -D1.2 使用 1.2.1 package.json 配置脚本命令 "scripts": {"dev": "vite --mode dev","build": "vue-tsc --noEmit && vite build","serve": "vite preview"…...

Vue框架分享与总结

总结开发中最常用的vue语法&#xff0c;以及对特定语法的理解。vue官网 文章目录 一、创建vue项目1、使用开发工具创建2、使用命令行创建3、vue框架结构4、Vue文件结构 二、Vue 常用模板语法1、v-if、v-show2、v-for3、v-on4、v-bind5、v-model 三、组件通信1、父组件给子组件传…...

声音生成评价指标——使用声音分类模型评价生成声音质量(基于resnetish、VGGish、AlexNet)

文章目录 引言正文数据预处理将wav转成log-mel频谱图进行保存创建dataset类保存数据 模型定义模型训练过程训练代码定义loss为nan从AlexNet到ResNetloss上下剧烈波动——使用学习率衰减策略学习率调整——根据准确率来调整学习率数据处理问题 模型的测试 总结 引言 这篇文章主要…...

HarmonyOS学习路之方舟开发框架—学习ArkTS语言(状态管理 六)

AppStorage&#xff1a;应用全局的UI状态存储 AppStorage是应用全局的UI状态存储&#xff0c;是和应用的进程绑定的&#xff0c;由UI框架在应用程序启动时创建&#xff0c;为应用程序UI状态属性提供中央存储。 和LocalStorage不同的是&#xff0c;LocalStorage是页面级的&…...

SPA首屏加载速度慢

什么是首屏加载 首屏时间&#xff08;First Contentful Paint&#xff09;&#xff0c;指的是浏览器从响应用户输入网址地址&#xff0c;到首屏内容渲染完成的时间&#xff0c;此时整个网页不一定要全部渲染完成&#xff0c;但需要展示当前视窗需要的内容 首屏加载可以说是用…...

JVM执行流程

一、Java为什么是一种跨平台的语言&#xff1f; 通常&#xff0c;我们编写的java源代码会被JDK的编译器编译成字节码文件&#xff0c;再由JVM将字节码文件翻译成计算机读的懂得机器码进行执行&#xff1b;因为不同平台使用的JVM不一样&#xff0c;所以不同的JVM会把相同的字节码…...

laravel 凌晨0点 导出数据库

一、创建导出模型 <?php namespace App\Models;use Illuminate\Support\Facades\DB;class DbBackup {private $table;public function __construct(){$this->table env(DB_DATABASE);}public function run($file ){$file !$file ? public_path($this->t…...

mysql MVCC多版本并发控制

mvcc的概念 mvcc 的实现依赖于&#xff1a; 隐藏字段 行格式&#xff08;row_id,trx_id,roll_ponter&#xff09;UndologRead view innodb 存储引擎的表来说&#xff0c;聚集索引记录中都包含两个必要的隐藏字段&#xff0c;row_id(如果没有聚集索引&#xff0c;才会创建的) …...

new/delete, malloc/free 内存泄漏如何检测

区别&#xff1a; 首先new/delete是运算符&#xff0c;malloc/free是库函数。malloc/free只开辟内存不初始化&#xff1b;new/delete及开辟内存也初始化。抛出异常的方式&#xff1a;new/delete开辟失败使用抛出bad_alloc&#xff1b;malloc/free通过返回值判断。malloc和new区…...

Java开发推荐关注的网站

一、开发者社区 阿里云开发者社区&#xff1a;https://developer.aliyun.com/腾讯云开发者社区&#xff1a;https://cloud.tencent.com/developer 二、开发规范 阿里巴巴Java开发规范 github地址&#xff1a;https://github.com/alibaba/p3c gitcode地址&#xff1a;https:/…...

OpenHarmony社区运营报告(2023年8月)

本月快讯 2023年8月3日&#xff0c;OpenAtom OpenHarmony&#xff08;以下简称“OpenHarmony”&#xff09;发布了Beta2版本。OpenHarmony 4.0 Beta2在系统能力、应用框架、分布式通信、媒体功能、安全性等方面进行了全面升级。其中&#xff0c;ArkUI增强了界面组件能力&#x…...

Web学习笔记-React(路由)

笔记内容转载自 AcWing 的 Web 应用课讲义&#xff0c;课程链接&#xff1a;AcWing Web 应用课。 CONTENTS 1. Web分类2. Route组件3. URL中传递参数4. Search Params传递参数5. 重定向6. 嵌套路由 本节内容是如何将页面和 URL 一一对应起来。 1. Web分类 Web 页面可以分为两…...

MySQL无法查看系统默认字符集以及校验规则

show variables like character_set_database; show variables like collation_database;这个错误信息表示MySQL在尝试访问performance_schema.session_variables表时&#xff0c;发现该表不存在。这个问题可能是由于MySQL的版本升级导致的。解决这个问题的一种方法是运行mysql…...

不负昭华,前程似锦,新一批研发效能认证证书颁发丨IDCF

亲爱的认证学员&#xff0c; 恭喜你成功获得由国家工业和信息化部教育与考试中心颁发的职业技术证书——《研发效能(DevOps)工程师国家职业技术认证》。你的努力和才华得到了官方的认可&#xff0c;这是你职业生涯中的一个重要的里程碑。 这个证书不仅代表着你的专业知识和技…...

深入理解ES6模块化:语法、特性与最佳实践

目录 一、前言 二、ES6模块化基础 1. 模块的定义与导出 2. 模块的导入与使用 3. 模块默认导出与命名导出 4. 模块的循环引用与解决方案 三、模块化语法进阶 1. 模块的命名导出与默认导出的混合使用 2. 模块的别名导出与导入 3. 命名空间的使用与作用 4. 动态导入模块…...

React Native 开发环境搭建(全平台详解)

React Native 开发环境搭建&#xff08;全平台详解&#xff09; 在开始使用 React Native 开发移动应用之前&#xff0c;正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南&#xff0c;涵盖 macOS 和 Windows 平台的配置步骤&#xff0c;如何在 Android 和 iOS…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

Java 二维码

Java 二维码 **技术&#xff1a;**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别

【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而&#xff0c;传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案&#xff0c;能够实现大范围覆盖并远程采集数据。尽管具备这些优势&#xf…...

CSS | transition 和 transform的用处和区别

省流总结&#xff1a; transform用于变换/变形&#xff0c;transition是动画控制器 transform 用来对元素进行变形&#xff0c;常见的操作如下&#xff0c;它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...

作为测试我们应该关注redis哪些方面

1、功能测试 数据结构操作&#xff1a;验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化&#xff1a;测试aof和aof持久化机制&#xff0c;确保数据在开启后正确恢复。 事务&#xff1a;检查事务的原子性和回滚机制。 发布订阅&#xff1a;确保消息正确传递。 2、性…...

Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、Spring MVC与MyBatis技术解析 一、第一轮基础概念问题 1. Spring框架的核心容器是什么&#xff1f;它的作用是什么&#xff1f; Spring框架的核心容器是IoC&#xff08;控制反转&#xff09;容器。它的主要作用是管理对…...

门静脉高压——表现

一、门静脉高压表现 00:01 1. 门静脉构成 00:13 组成结构&#xff1a;由肠系膜上静脉和脾静脉汇合构成&#xff0c;是肝脏血液供应的主要来源。淤血后果&#xff1a;门静脉淤血会同时导致脾静脉和肠系膜上静脉淤血&#xff0c;引发后续系列症状。 2. 脾大和脾功能亢进 00:46 …...