RabbitMQ消息可靠性(一)-- 生产者消息确认
前言
在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,消息的可靠性可以主要在三个方面去考虑:生产者消息确认,消费者消息确认,消息持久化,这篇文件说明生产者消息确认的。
一、消息确认流程图

由图可知,消息确认是分为生产者确认和消费者确认的,生产者和MQ之间的消息确认机制为生产者消息确认,MQ和消费者之间的消息确认机制为消费者消息确认
消息丢失的情景有三种情况:
- 发送消息过程中出现网络问题:生产者以为发送成功,但MQ没有收到;(需要生产者消息确认)
- 接收到消息后由于MQ服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;(需要消息持久化)
- 消费者接收到消息后处理消息出错,没有完成消息的处理,但是自动返回ack(这时候需要开启手动确认模式,消费者消息确认)
二、生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID,来区分开不同的消息,避免ack(消息确认参数)冲突。每当消息发送到MQ成功后,MQ都会返回一个结果给生产者,以保证生产者消息确认。在生产者消息确认时,又有两种返回结果方式(通常两个都要实现)来确保消息投递可靠性,分别为publisher-confirm和publisher-return,以下作出说明。
1、publisher-confirm(发送者确认)
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
2、publisher-return(发送者回执)
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
三、代码实现
1、配置文件
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true
publish-confirm-type有三个值,
- none:禁用发布确认模式,是默认值
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns:开启消息失败回调,回调函数ReturnCallback
2、配置ConfirmCallback函数和ReturnCallback函数
/*** 生产者消息回调配置类*/
@Configuration
@Slf4j
public class ProviderCallBackConfig {@Resourceprivate CachingConnectionFactory cachingConnectionFactory;@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);// 当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,//那么broker会调用basic.return方法将消息返还给生产者。// 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。rabbitTemplate.setMandatory(true);/*** TODO RabbitMQ生产者发送消息确认回调,解决消息可靠性问题* 消息确认回调,确认消息是否到达broker* data:消息唯一标识* ack:确认结果* cause:失败原因*/rabbitTemplate.setConfirmCallback((data, ack, cause) -> {if (ack) {//消息发送成功后,更新数据库消息状态等逻辑log.info("消息发送至exchange成功------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}",data, ack, cause);} else {//信息发送失败,打印日志后,可以根据业务选择是否重发消息log.info("消息发送至exchange失败------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}", data, ack, cause);}});/*** TODO RabbitMQ生产者发送消息失败回调,解决消息可靠性问题* message 消息* replyCode 回应码* replyText 回应信息* exchange 交换机* routingKey 路由键*/rabbitTemplate.setReturnsCallback((res) -> {//若发送失败,打印错误信息,然后可以根据业务选择重发消息log.error("消息发送至queue失败-------->res: {}", JSON.toJSONString(res));});return rabbitTemplate;}}
到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?
先从总体的情况分析,推送消息存在3种情况:
①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送成功
①消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):
@GetMapping("/testProviderMessageBack")@ApiOperation(value = "测试生产者消息回调")@ApiOperationSupport(order = 5)public String testProviderMessageBack() {CorrelationData data = new CorrelationData();data.setId("111");rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", "测试生产者消息回调",data);return "ok";}
调用接口,查看项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):
结论: ①这种情况触发的是 ConfirmCallback 回调函数
消息发送至exchange失败------>
消息唯一标识: CorrelationData [id=111],
确认状态: false,
造成原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
②消息推送到server,找到交换机了,但是没找到队列
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:
@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}
然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):
@GetMapping("/testProviderMessageBack2")@ApiOperation(value = "测试生产者消息回调2")@ApiOperationSupport(order = 6)public String testProviderMessageBack2() {CorrelationData data = new CorrelationData();data.setId("222");rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestLonelyDirectRouting", "测试生产者消息回调2",data);return "ok";}
消息发送至exchange成功------>
消息唯一标识: CorrelationData [id=222],
确认状态: true,
造成原因: null
消息发送至queue失败-------->
res: {"exchange":"lonelyDirectExchange","message":{"body":"5rWL6K+V55Sf5Lqn6ICF5raI5oGv5Zue6LCDMg==","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"spring_returned_message_correlation":"222"},"lastInBatch":false,"priority":0,"projectionUsed":false,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT"}},"replyCode":312,"replyText":"NO_ROUTE","routingKey":"TestLonelyDirectRouting"}
这种情况下,两个函数都被调用了,
消息是推送成功到交换机了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
③消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendDirectMessage接口,可以看到控制台输出:
结论:这种情况触发的是 ConfirmCallback 回调函数。
相关文章:
RabbitMQ消息可靠性(一)-- 生产者消息确认
前言 在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,…...
9 种方法使用 Amazon CodeWhisperer 快速构建应用
Amazon CodeWhisperer 是一款很赞的生成式人工智能编程工具。自从在工作中使用了 CodeWhisperer,我发现不仅代码编译的效率有所提高,应用开发的工作也变得快乐起来。然而,任何生成式 AI 工具的有效学习都需要初学者要有接受新工作方式的心态和…...
性能测试-性能工程落地的4个阶段(21)
性能工程按照不同的内容和目的划分为4个阶段,分别是线下单系统压测分析阶段、线下全链路压测分析阶段、生产只读业务压测及容量评估阶段、生产读写业务全链路压测及容量评估阶段。(也可以理解为一个企业性能测试体系的发展阶段) 线下单系统压测分析阶段 针对单系统的性能…...
小程序 navigateBack 携带参数返回的三种方式(详细)
如果觉着主图好看,点个赞,你早晚也会看到这么好看的景色! 第一种方式 getCurrentPages 获取当前页面栈。数组中第一个元素为首页,最后一个元素为当前页面。不要尝试修改页面栈,会导致路由以及页面状态错误。不要在 App.onLaunch 的时候调用 getCurrentPages(),此时 page …...
通过内网穿透实现远程连接群晖Drive,轻松实现异地访问群晖NAS
文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…...
vue3 + vite常用工具
1. plop 1.1 安装 yarn add plop -D1.2 使用 1.2.1 package.json 配置脚本命令 "scripts": {"dev": "vite --mode dev","build": "vue-tsc --noEmit && vite build","serve": "vite preview"…...
Vue框架分享与总结
总结开发中最常用的vue语法,以及对特定语法的理解。vue官网 文章目录 一、创建vue项目1、使用开发工具创建2、使用命令行创建3、vue框架结构4、Vue文件结构 二、Vue 常用模板语法1、v-if、v-show2、v-for3、v-on4、v-bind5、v-model 三、组件通信1、父组件给子组件传…...
声音生成评价指标——使用声音分类模型评价生成声音质量(基于resnetish、VGGish、AlexNet)
文章目录 引言正文数据预处理将wav转成log-mel频谱图进行保存创建dataset类保存数据 模型定义模型训练过程训练代码定义loss为nan从AlexNet到ResNetloss上下剧烈波动——使用学习率衰减策略学习率调整——根据准确率来调整学习率数据处理问题 模型的测试 总结 引言 这篇文章主要…...
HarmonyOS学习路之方舟开发框架—学习ArkTS语言(状态管理 六)
AppStorage:应用全局的UI状态存储 AppStorage是应用全局的UI状态存储,是和应用的进程绑定的,由UI框架在应用程序启动时创建,为应用程序UI状态属性提供中央存储。 和LocalStorage不同的是,LocalStorage是页面级的&…...
SPA首屏加载速度慢
什么是首屏加载 首屏时间(First Contentful Paint),指的是浏览器从响应用户输入网址地址,到首屏内容渲染完成的时间,此时整个网页不一定要全部渲染完成,但需要展示当前视窗需要的内容 首屏加载可以说是用…...
JVM执行流程
一、Java为什么是一种跨平台的语言? 通常,我们编写的java源代码会被JDK的编译器编译成字节码文件,再由JVM将字节码文件翻译成计算机读的懂得机器码进行执行;因为不同平台使用的JVM不一样,所以不同的JVM会把相同的字节码…...
laravel 凌晨0点 导出数据库
一、创建导出模型 <?php namespace App\Models;use Illuminate\Support\Facades\DB;class DbBackup {private $table;public function __construct(){$this->table env(DB_DATABASE);}public function run($file ){$file !$file ? public_path($this->t…...
mysql MVCC多版本并发控制
mvcc的概念 mvcc 的实现依赖于: 隐藏字段 行格式(row_id,trx_id,roll_ponter)UndologRead view innodb 存储引擎的表来说,聚集索引记录中都包含两个必要的隐藏字段,row_id(如果没有聚集索引,才会创建的) …...
new/delete, malloc/free 内存泄漏如何检测
区别: 首先new/delete是运算符,malloc/free是库函数。malloc/free只开辟内存不初始化;new/delete及开辟内存也初始化。抛出异常的方式:new/delete开辟失败使用抛出bad_alloc;malloc/free通过返回值判断。malloc和new区…...
Java开发推荐关注的网站
一、开发者社区 阿里云开发者社区:https://developer.aliyun.com/腾讯云开发者社区:https://cloud.tencent.com/developer 二、开发规范 阿里巴巴Java开发规范 github地址:https://github.com/alibaba/p3c gitcode地址:https:/…...
OpenHarmony社区运营报告(2023年8月)
本月快讯 2023年8月3日,OpenAtom OpenHarmony(以下简称“OpenHarmony”)发布了Beta2版本。OpenHarmony 4.0 Beta2在系统能力、应用框架、分布式通信、媒体功能、安全性等方面进行了全面升级。其中,ArkUI增强了界面组件能力&#x…...
Web学习笔记-React(路由)
笔记内容转载自 AcWing 的 Web 应用课讲义,课程链接:AcWing Web 应用课。 CONTENTS 1. Web分类2. Route组件3. URL中传递参数4. Search Params传递参数5. 重定向6. 嵌套路由 本节内容是如何将页面和 URL 一一对应起来。 1. Web分类 Web 页面可以分为两…...
MySQL无法查看系统默认字符集以及校验规则
show variables like character_set_database; show variables like collation_database;这个错误信息表示MySQL在尝试访问performance_schema.session_variables表时,发现该表不存在。这个问题可能是由于MySQL的版本升级导致的。解决这个问题的一种方法是运行mysql…...
不负昭华,前程似锦,新一批研发效能认证证书颁发丨IDCF
亲爱的认证学员, 恭喜你成功获得由国家工业和信息化部教育与考试中心颁发的职业技术证书——《研发效能(DevOps)工程师国家职业技术认证》。你的努力和才华得到了官方的认可,这是你职业生涯中的一个重要的里程碑。 这个证书不仅代表着你的专业知识和技…...
深入理解ES6模块化:语法、特性与最佳实践
目录 一、前言 二、ES6模块化基础 1. 模块的定义与导出 2. 模块的导入与使用 3. 模块默认导出与命名导出 4. 模块的循环引用与解决方案 三、模块化语法进阶 1. 模块的命名导出与默认导出的混合使用 2. 模块的别名导出与导入 3. 命名空间的使用与作用 4. 动态导入模块…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
jmeter聚合报告中参数详解
sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample(样本数) 表示测试中发送的请求数量,即测试执行了多少次请求。 单位,以个或者次数表示。 示例:…...
高防服务器价格高原因分析
高防服务器的价格较高,主要是由于其特殊的防御机制、硬件配置、运营维护等多方面的综合成本。以下从技术、资源和服务三个维度详细解析高防服务器昂贵的原因: 一、硬件与技术投入 大带宽需求 DDoS攻击通过占用大量带宽资源瘫痪目标服务器,因此…...
