RabbitMQ学习总结-消息的可靠性
保证MQ消息的可靠性,主要从三个方面:发送者确认可靠性,MQ确认可靠性,消费者确认可靠性。
1.发送者可靠性:主要依赖于发送者重试机制,发送者确认机制;
发送者重试机制,其实就是配置文件配置重试规则,当消息发送失败后,会根据配置的重试次数,进行多次发送重试,如代码:
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
发送者确认机制:则是依赖于消息的回执,这其中包括发送者回执,和消费者回执两种,但是这种回执都比较耗性能,会导致消息消费的很慢。并且,这也是需要在配置文件中做配置的:
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制
并且还要有代码的实现,这种方式极大的影响了性能,:
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
2.MQ自身的可靠性:交换机/队列/消息都实现持久化,消息不会丢失,如果是在项目中通过代码创建的交换机/队列/消息,spring默认就是持久化的,如果在mq的客户端手工配置,那就要选定各个参数了。持久化后的消息会直接进入磁盘,不在经过内存了,正常来讲有IO的操作会慢才对,但是在实际的操作中却是非常快。
MQ队列最怕的就是消息积压,导致内存溢出。在3.12版本以后,MQ直接默认就是Laz懒惰队列的模式了,这个模式会直接加载到磁盘,当用到消息的时候,会从磁盘加载到内存,磁盘空间很大,支持数百万级别的存储,所以内存溢出的可能性就会大大降低。我们可以在mq客户端手动设置为lazy队列,也可以在代码中直接实现,代码如下:
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}
3.消费者的可靠性:
3.1消费者消费消息后,向MQ发送回执,让MQ知道消息是否正常被消费了,目前回执有三种:
ack:成功处理了消息,MQ从队列中就会删除消息,正常。
nack:失败处理了消息,MQ需要再次投递消息,这会出现一直重试的问题。
reject:消息失败,并拒绝了消息,并且从队列中删除了消息。这个消息被删除了,岂不是数据就丢失了。
对于以上三种回执,基本回执都是固定的,AMQP提供了消息确认的方式,不用写代码,配置就可以,配置有三种:none-配置它失败了,消息会被删除,auto-失败了,消息会回到MQ重新投递,不会丢失,不会被删除,manual-太麻烦,算了。
不过,对于auto的配置,对于返回的异常,会有两种判断:1,如果是业务异常,会自动返回nack
如果是消息处理或者校验异常,会直接进行reject
spring:rabbitmq:listener:simple:acknowledge-mode: auto
3.2 生产者有重试机制,消费者也有重试机制,但是,对于消费者的重试,如果一直失败,那就要有一定的策略,可以把这个失败的消息放到另一个交换机上,后续人工进行干预,这样可以保证消息不丢失。
对于消费者的重试配置:
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
如何把消息发送到另一个交换机上呢?
在消费者服务定义一个处理失败消息队列的交换机,这样就可以把消息存储过去了
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
4.业务的幂等性判断
4.1.对于一条MQ消息,为了防止被重复消费,可以做一个唯一的msgID,当消费的时候可以先检查下这个ID,如果已经消费过了,那就不能再消费了,一定程度上可以避免被重复消费,代码如下:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
4.2.还有一种保证消费唯一性的就是业务上的判断,当需要消费消息的时候,可以先提前去查询下需要消费消息的状态,如果状态已经发生了改变,自然也不用再去消费这条消息了
5.对于以上所有保证消息可靠性的方案,其实都不能完全保证,最终需要一个兜底的方案,兜底方案我们可以采取一个定时任务的方式,定时轮询检查消息是否消费。比如时间间隔多少秒进行一次轮询检查,这种方式我们可以理解为主动查询。这种兜底很大程度上可以保证业务上的一致性。
相关文章:
RabbitMQ学习总结-消息的可靠性
保证MQ消息的可靠性,主要从三个方面:发送者确认可靠性,MQ确认可靠性,消费者确认可靠性。 1.发送者可靠性:主要依赖于发送者重试机制,发送者确认机制; 发送者重试机制,其实就是配置…...
2024蓝桥杯每日一题(BFS)
备战2024年蓝桥杯 -- 每日一题 Python大学A组 试题一:母亲的奶牛 试题二:走迷宫 试题三:八数码1 试题四:全球变暖 试题五:八数码2 试题一:母亲的奶牛 【题目描述】 农夫约…...
力扣思路题:最长特殊序列1
int findLUSlength(char * a, char * b){int alenstrlen(a),blenstrlen(b);if (strcmp(a,b)0)return -1;return alen>blen?alen:blen; }...
c# 的ref 和out
在C#中,ref和out是用于方法参数的关键字,它们都允许在方法调用中对参数进行修改。 ref关键字用于传递参数的引用。当使用ref关键字声明一个参数时,实际上是在告诉编译器此参数在调用方法之前必须被赋值。ref参数传递的是参数的引用地址&…...
ONLYOFFICE文档8.0全新发布:私有部署、卓越安全的协同办公解决方案
ONLYOFFICE文档8.0全新发布:私有部署、卓越安全的协同办公解决方案 文章目录 ONLYOFFICE文档8.0全新发布:私有部署、卓越安全的协同办公解决方案摘要📑引言 🌟正文📚一、ONLYOFFICE文档概述 📊二、ONLYOFFI…...
Mar 14 | Datawhale 01~04 打卡 | Leetcode面试下
第一阶段主要就是学习字符串的处理和二叉树的遍历。前一段时间学习了二叉树的遍历,记忆还比较深刻,这几个题还是比较轻松的做出来了;但是像Longest Palindromic Substring这样的题除了简单的字符串处理(回文判断)&…...
【计算机网络】什么是http?
目录 前言 1. 什么是HTTP协议? 2. 为什么使用HTTP协议? 3. HTTP协议通信过程 4. 什么是url? 5. HTTP报文 5.1 请求报文 5.2 响应报文 6. HTTP请求方式 7. HTTP头部字段 8. HTTP状态码 9. 连接管理 长连接与短连接 管线化连接…...
【python开发】并发编程(上)
并发编程(上) 一、进程和线程(一)多线程(二)多进程(三)GIL锁 二、多线程开发(一)t.start()(二)t.join()(三)t.…...
用c语言实现扫雷游戏
文章目录 概要整体架构流程代码的实现小结 概要 学习了c语言后,我们可以通过c语言写一些小程序,然后我们这篇文章主要是,扫雷游戏的一步一步游戏。 整体架构流程 扫雷网页版 根据上面网页版的基础扫雷可以看出是一个99的格子,…...
LeetCode 2882.删去重复的行
DataFrame customers ------------------- | Column Name | Type | ------------------- | customer_id | int | | name | object | | email | object | ------------------- 在 DataFrame 中基于 email 列存在一些重复行。 编写一个解决方案,删除这些重复行&#…...
对OceanBase进行 sysbench 压测前,如何用 obdiag巡检
有一些用户想对 OceanBase 进行 sysbench 压测,并向我询问是否需要对数据库的各种参数进行调整。我想起有一个工具 obdiag ,具备对集群进行巡检的功能。因此,我正好借此机会试用一下这个工具。 obdiag 功能的比较丰富,详细情况可参…...
每天学习几道面试题|Kafka架构设计类
文章目录 1. Kafka 是如何保证高可用性和容错性的?2. Kafka 的存储机制是怎样的?它是如何处理大量数据的?3. Kafka 如何处理消费者的消费速率低于生产者的生产速率?4. Kafka 集群中的 Controller 是什么?它的作用是什么…...
.rmallox勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
导言: 近年来,勒索病毒的威胁日益增加,其中一种名为.rmallox的勒索病毒备受关注。这种病毒通过加密文件并勒索赎金来威胁受害者。本文将介绍.rmallox勒索病毒的特点,以及如何恢复被其加密的数据文件,并提供预防措施&a…...
安卓性能优化面试题 11-15
11. 简述APK安装包瘦身方案 ?(1):剔 除掉冗余的代码与不必要的jar包;具体来讲的话,我们可以使用SDK集成的ProGuard混淆工具,它可以在编译时检查并删除未使用的类、字段、方法 和属性,它会遍历所有代码找到无用处的代码,所有那些不可达的代码都会在生成最终apk文件之前被…...
Python错题集-9PermissionError:[Errno 13] (权限错误)
1问题描述 Traceback (most recent call last): File "D:\pycharm\projects\5-《Python数学建模算法与应用》程序和数据\02第2章 Python使用入门\ex2_38_1.py", line 9, in <module> fpd.ExcelWriter(data2_38_3.xlsx) #创建文件对象 File "D:…...
QT TCP通信介绍
QT是一个跨平台的C应用程序开发框架,它提供了一套完整的工具和库,用于开发各种类型的应用程序,包括图形用户界面(GUI)应用程序、命令行工具、网络应用程序等。QT提供了丰富的功能和类来简化网络通信的开发,其中包括TCP通信。 TCP…...
保姆级教学!微信小程序设计全攻略!
微信小程序开启了互联网软件的新使用模式。在各种微信小程序争相抢占流量的同时,如何设计微信小程序?让用户感到舒适是设计师在产品设计初期应该考虑的问题。那么如何做好微信小程序的设计呢?即时设计总结了以下设计指南,希望对准…...
日期差值的计算
1、枚举所有数值进行日期判断 时间复杂度是o(n)的,比较慢,单实例能凑合用,多实例的话时间复杂度有点高。 核心代码就是判断某个八位数能否表示一个日期。 static int[] month {0,31,28,31,30,31,30,31,31,30,31,30,31};static String a, b…...
为什么需要Occupancy?
1.能够得到3D的占用信息 在基于BEV (鸟瞰图) 的2D预测模型中,我们通常仅具有二维平面(x和y坐标)上的信息。这种方法对于很多应用场景来说已经足够,但它并不考虑物体在垂直方向(z轴)上的分布。这限制了模型的…...
SSA优化最近邻分类预测(matlab代码)
SSA-最近邻分类预测matlab代码 麻雀搜索算法(Sparrow Search Algorithm, SSA)是一种新型的群智能优化算法,在2020年提出,主要是受麻雀的觅食行为和反捕食行为的启发。 数据为Excel分类数据集数据。 数据集划分为训练集、验证集、测试集,比例为8&#…...
3步掌握BilibiliDown:B站视频下载全攻略与效率提升指南
3步掌握BilibiliDown:B站视频下载全攻略与效率提升指南 【免费下载链接】BilibiliDown (GUI-多平台支持) B站 哔哩哔哩 视频下载器。支持稍后再看、收藏夹、UP主视频批量下载|Bilibili Video Downloader 😳 项目地址: https://gitcode.com/gh_mirrors/…...
DAMO-YOLO实战:用AI视觉系统做内容安全审核与统计
DAMO-YOLO实战:用AI视觉系统做内容安全审核与统计 1. 引言:当AI视觉遇见内容安全 在数字内容爆炸式增长的今天,如何高效地进行内容审核成为许多平台面临的挑战。传统人工审核不仅效率低下,而且容易因疲劳导致误判。本文将介绍如…...
Luau数据流分析技术:如何实现精准的类型推断
Luau数据流分析技术:如何实现精准的类型推断 【免费下载链接】luau A fast, small, safe, gradually typed embeddable scripting language derived from Lua 项目地址: https://gitcode.com/gh_mirrors/lu/luau Luau是一种快速、小巧、安全且支持渐进类型化…...
OpenCV图像预处理失效全解析,深度解读光照不均、反光伪影、亚像素抖动下的鲁棒代码实现
第一章:OpenCV图像预处理失效的典型工业场景综述在工业视觉检测系统中,OpenCV常被用作图像预处理的核心工具,但其默认参数与理想假设在真实产线环境中频繁失效。光照剧烈波动、镜头污损、金属反光、高速运动拖影以及低信噪比成像等物理约束&a…...
从一道经典OJ题出发:详解二叉树‘凹入表示法’的输出技巧与C++实现
从一道经典OJ题出发:详解二叉树‘凹入表示法’的输出技巧与C实现 1. 凹入表示法的独特魅力与实现挑战 在算法竞赛和数据结构面试中,二叉树的输出格式往往成为区分选手水平的关键细节。不同于常见的层序遍历或图形化展示,凹入表示法࿰…...
2分钟搞定:Windows包管理器Winget一键安装全攻略
2分钟搞定:Windows包管理器Winget一键安装全攻略 【免费下载链接】winget-install Install winget tool using PowerShell! Prerequisites automatically installed. Works on Windows 10/11 and Server 2022. 项目地址: https://gitcode.com/gh_mirrors/wi/winge…...
vLLM生产-解码分离架构:从概念到部署的吞吐优化实践
1. 为什么需要生产-解码分离架构 第一次部署大模型在线服务时,我盯着监控面板上的GPU利用率曲线直挠头——为什么计算单元总是间歇性满载又突然空闲?后来发现这是典型的Prefill-Decode耦合架构的弊端。就像餐厅里同一个厨师既要负责备菜(切配…...
Obsidian Local Images Plus 插件使用指南
Obsidian Local Images Plus 插件使用指南 【免费下载链接】obsidian-local-images-plus This repo is a reincarnation of obsidian-local-images plugin which main aim was downloading images in md notes to local storage. 项目地址: https://gitcode.com/gh_mirrors/o…...
MobaXterm远程连接频繁掉线?3个SSH保活设置让你告别断连烦恼
MobaXterm远程连接频繁掉线?3个SSH保活设置让你告别断连烦恼 当你在深夜调试代码,或是处理关键服务器运维任务时,突然弹出的"Connection closed"提示足以让人抓狂。MobaXterm作为Windows平台最受欢迎的全能终端工具,其免…...
大模型进阶:掌握Function Calling和MCP,解锁AI生产力(收藏版)
本文深入探讨了Function Calling技术如何帮助大模型获取实时信息、执行任务,以及MCP协议在大模型与外部交互中的关键作用。文章阐述了从提示工程到RAG,再到Function Calling和MCP的技术演进路径,强调了这些技术如何使大模型从信息工具转变为生…...
