RabbitMQ学习总结-消息的可靠性
保证MQ消息的可靠性,主要从三个方面:发送者确认可靠性,MQ确认可靠性,消费者确认可靠性。
1.发送者可靠性:主要依赖于发送者重试机制,发送者确认机制;
发送者重试机制,其实就是配置文件配置重试规则,当消息发送失败后,会根据配置的重试次数,进行多次发送重试,如代码:
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
发送者确认机制:则是依赖于消息的回执,这其中包括发送者回执,和消费者回执两种,但是这种回执都比较耗性能,会导致消息消费的很慢。并且,这也是需要在配置文件中做配置的:
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制
并且还要有代码的实现,这种方式极大的影响了性能,:
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
2.MQ自身的可靠性:交换机/队列/消息都实现持久化,消息不会丢失,如果是在项目中通过代码创建的交换机/队列/消息,spring默认就是持久化的,如果在mq的客户端手工配置,那就要选定各个参数了。持久化后的消息会直接进入磁盘,不在经过内存了,正常来讲有IO的操作会慢才对,但是在实际的操作中却是非常快。
MQ队列最怕的就是消息积压,导致内存溢出。在3.12版本以后,MQ直接默认就是Laz懒惰队列的模式了,这个模式会直接加载到磁盘,当用到消息的时候,会从磁盘加载到内存,磁盘空间很大,支持数百万级别的存储,所以内存溢出的可能性就会大大降低。我们可以在mq客户端手动设置为lazy队列,也可以在代码中直接实现,代码如下:
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}
3.消费者的可靠性:
3.1消费者消费消息后,向MQ发送回执,让MQ知道消息是否正常被消费了,目前回执有三种:
ack:成功处理了消息,MQ从队列中就会删除消息,正常。
nack:失败处理了消息,MQ需要再次投递消息,这会出现一直重试的问题。
reject:消息失败,并拒绝了消息,并且从队列中删除了消息。这个消息被删除了,岂不是数据就丢失了。
对于以上三种回执,基本回执都是固定的,AMQP提供了消息确认的方式,不用写代码,配置就可以,配置有三种:none-配置它失败了,消息会被删除,auto-失败了,消息会回到MQ重新投递,不会丢失,不会被删除,manual-太麻烦,算了。
不过,对于auto的配置,对于返回的异常,会有两种判断:1,如果是业务异常,会自动返回nack
如果是消息处理或者校验异常,会直接进行reject
spring:rabbitmq:listener:simple:acknowledge-mode: auto
3.2 生产者有重试机制,消费者也有重试机制,但是,对于消费者的重试,如果一直失败,那就要有一定的策略,可以把这个失败的消息放到另一个交换机上,后续人工进行干预,这样可以保证消息不丢失。
对于消费者的重试配置:
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
如何把消息发送到另一个交换机上呢?
在消费者服务定义一个处理失败消息队列的交换机,这样就可以把消息存储过去了
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
4.业务的幂等性判断
4.1.对于一条MQ消息,为了防止被重复消费,可以做一个唯一的msgID,当消费的时候可以先检查下这个ID,如果已经消费过了,那就不能再消费了,一定程度上可以避免被重复消费,代码如下:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
4.2.还有一种保证消费唯一性的就是业务上的判断,当需要消费消息的时候,可以先提前去查询下需要消费消息的状态,如果状态已经发生了改变,自然也不用再去消费这条消息了
5.对于以上所有保证消息可靠性的方案,其实都不能完全保证,最终需要一个兜底的方案,兜底方案我们可以采取一个定时任务的方式,定时轮询检查消息是否消费。比如时间间隔多少秒进行一次轮询检查,这种方式我们可以理解为主动查询。这种兜底很大程度上可以保证业务上的一致性。
相关文章:
RabbitMQ学习总结-消息的可靠性
保证MQ消息的可靠性,主要从三个方面:发送者确认可靠性,MQ确认可靠性,消费者确认可靠性。 1.发送者可靠性:主要依赖于发送者重试机制,发送者确认机制; 发送者重试机制,其实就是配置…...
2024蓝桥杯每日一题(BFS)
备战2024年蓝桥杯 -- 每日一题 Python大学A组 试题一:母亲的奶牛 试题二:走迷宫 试题三:八数码1 试题四:全球变暖 试题五:八数码2 试题一:母亲的奶牛 【题目描述】 农夫约…...

力扣思路题:最长特殊序列1
int findLUSlength(char * a, char * b){int alenstrlen(a),blenstrlen(b);if (strcmp(a,b)0)return -1;return alen>blen?alen:blen; }...
c# 的ref 和out
在C#中,ref和out是用于方法参数的关键字,它们都允许在方法调用中对参数进行修改。 ref关键字用于传递参数的引用。当使用ref关键字声明一个参数时,实际上是在告诉编译器此参数在调用方法之前必须被赋值。ref参数传递的是参数的引用地址&…...

ONLYOFFICE文档8.0全新发布:私有部署、卓越安全的协同办公解决方案
ONLYOFFICE文档8.0全新发布:私有部署、卓越安全的协同办公解决方案 文章目录 ONLYOFFICE文档8.0全新发布:私有部署、卓越安全的协同办公解决方案摘要📑引言 🌟正文📚一、ONLYOFFICE文档概述 📊二、ONLYOFFI…...
Mar 14 | Datawhale 01~04 打卡 | Leetcode面试下
第一阶段主要就是学习字符串的处理和二叉树的遍历。前一段时间学习了二叉树的遍历,记忆还比较深刻,这几个题还是比较轻松的做出来了;但是像Longest Palindromic Substring这样的题除了简单的字符串处理(回文判断)&…...

【计算机网络】什么是http?
目录 前言 1. 什么是HTTP协议? 2. 为什么使用HTTP协议? 3. HTTP协议通信过程 4. 什么是url? 5. HTTP报文 5.1 请求报文 5.2 响应报文 6. HTTP请求方式 7. HTTP头部字段 8. HTTP状态码 9. 连接管理 长连接与短连接 管线化连接…...

【python开发】并发编程(上)
并发编程(上) 一、进程和线程(一)多线程(二)多进程(三)GIL锁 二、多线程开发(一)t.start()(二)t.join()(三)t.…...
用c语言实现扫雷游戏
文章目录 概要整体架构流程代码的实现小结 概要 学习了c语言后,我们可以通过c语言写一些小程序,然后我们这篇文章主要是,扫雷游戏的一步一步游戏。 整体架构流程 扫雷网页版 根据上面网页版的基础扫雷可以看出是一个99的格子,…...
LeetCode 2882.删去重复的行
DataFrame customers ------------------- | Column Name | Type | ------------------- | customer_id | int | | name | object | | email | object | ------------------- 在 DataFrame 中基于 email 列存在一些重复行。 编写一个解决方案,删除这些重复行&#…...

对OceanBase进行 sysbench 压测前,如何用 obdiag巡检
有一些用户想对 OceanBase 进行 sysbench 压测,并向我询问是否需要对数据库的各种参数进行调整。我想起有一个工具 obdiag ,具备对集群进行巡检的功能。因此,我正好借此机会试用一下这个工具。 obdiag 功能的比较丰富,详细情况可参…...
每天学习几道面试题|Kafka架构设计类
文章目录 1. Kafka 是如何保证高可用性和容错性的?2. Kafka 的存储机制是怎样的?它是如何处理大量数据的?3. Kafka 如何处理消费者的消费速率低于生产者的生产速率?4. Kafka 集群中的 Controller 是什么?它的作用是什么…...

.rmallox勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
导言: 近年来,勒索病毒的威胁日益增加,其中一种名为.rmallox的勒索病毒备受关注。这种病毒通过加密文件并勒索赎金来威胁受害者。本文将介绍.rmallox勒索病毒的特点,以及如何恢复被其加密的数据文件,并提供预防措施&a…...
安卓性能优化面试题 11-15
11. 简述APK安装包瘦身方案 ?(1):剔 除掉冗余的代码与不必要的jar包;具体来讲的话,我们可以使用SDK集成的ProGuard混淆工具,它可以在编译时检查并删除未使用的类、字段、方法 和属性,它会遍历所有代码找到无用处的代码,所有那些不可达的代码都会在生成最终apk文件之前被…...

Python错题集-9PermissionError:[Errno 13] (权限错误)
1问题描述 Traceback (most recent call last): File "D:\pycharm\projects\5-《Python数学建模算法与应用》程序和数据\02第2章 Python使用入门\ex2_38_1.py", line 9, in <module> fpd.ExcelWriter(data2_38_3.xlsx) #创建文件对象 File "D:…...
QT TCP通信介绍
QT是一个跨平台的C应用程序开发框架,它提供了一套完整的工具和库,用于开发各种类型的应用程序,包括图形用户界面(GUI)应用程序、命令行工具、网络应用程序等。QT提供了丰富的功能和类来简化网络通信的开发,其中包括TCP通信。 TCP…...

保姆级教学!微信小程序设计全攻略!
微信小程序开启了互联网软件的新使用模式。在各种微信小程序争相抢占流量的同时,如何设计微信小程序?让用户感到舒适是设计师在产品设计初期应该考虑的问题。那么如何做好微信小程序的设计呢?即时设计总结了以下设计指南,希望对准…...
日期差值的计算
1、枚举所有数值进行日期判断 时间复杂度是o(n)的,比较慢,单实例能凑合用,多实例的话时间复杂度有点高。 核心代码就是判断某个八位数能否表示一个日期。 static int[] month {0,31,28,31,30,31,30,31,31,30,31,30,31};static String a, b…...

为什么需要Occupancy?
1.能够得到3D的占用信息 在基于BEV (鸟瞰图) 的2D预测模型中,我们通常仅具有二维平面(x和y坐标)上的信息。这种方法对于很多应用场景来说已经足够,但它并不考虑物体在垂直方向(z轴)上的分布。这限制了模型的…...

SSA优化最近邻分类预测(matlab代码)
SSA-最近邻分类预测matlab代码 麻雀搜索算法(Sparrow Search Algorithm, SSA)是一种新型的群智能优化算法,在2020年提出,主要是受麻雀的觅食行为和反捕食行为的启发。 数据为Excel分类数据集数据。 数据集划分为训练集、验证集、测试集,比例为8&#…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...

RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...

1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...

并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...

解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...
k8s从入门到放弃之HPA控制器
k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率(或其他自定义指标)来调整这些对象的规模,从而帮助应用程序在负…...