当前位置: 首页 > news >正文

RabbitMQ学习总结-消息的可靠性

保证MQ消息的可靠性,主要从三个方面:发送者确认可靠性,MQ确认可靠性,消费者确认可靠性。

1.发送者可靠性:主要依赖于发送者重试机制,发送者确认机制;

发送者重试机制,其实就是配置文件配置重试规则,当消息发送失败后,会根据配置的重试次数,进行多次发送重试,如代码:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

发送者确认机制:则是依赖于消息的回执,这其中包括发送者回执,和消费者回执两种,但是这种回执都比较耗性能,会导致消息消费的很慢。并且,这也是需要在配置文件中做配置的:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

并且还要有代码的实现,这种方式极大的影响了性能,:

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

2.MQ自身的可靠性:交换机/队列/消息都实现持久化,消息不会丢失,如果是在项目中通过代码创建的交换机/队列/消息,spring默认就是持久化的,如果在mq的客户端手工配置,那就要选定各个参数了。持久化后的消息会直接进入磁盘,不在经过内存了,正常来讲有IO的操作会慢才对,但是在实际的操作中却是非常快。

MQ队列最怕的就是消息积压,导致内存溢出。在3.12版本以后,MQ直接默认就是Laz懒惰队列的模式了,这个模式会直接加载到磁盘,当用到消息的时候,会从磁盘加载到内存,磁盘空间很大,支持数百万级别的存储,所以内存溢出的可能性就会大大降低。我们可以在mq客户端手动设置为lazy队列,也可以在代码中直接实现,代码如下:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

3.消费者的可靠性:

3.1消费者消费消息后,向MQ发送回执,让MQ知道消息是否正常被消费了,目前回执有三种:

ack:成功处理了消息,MQ从队列中就会删除消息,正常。

nack:失败处理了消息,MQ需要再次投递消息,这会出现一直重试的问题。

reject:消息失败,并拒绝了消息,并且从队列中删除了消息。这个消息被删除了,岂不是数据就丢失了。

对于以上三种回执,基本回执都是固定的,AMQP提供了消息确认的方式,不用写代码,配置就可以,配置有三种:none-配置它失败了,消息会被删除,auto-失败了,消息会回到MQ重新投递,不会丢失,不会被删除,manual-太麻烦,算了。

不过,对于auto的配置,对于返回的异常,会有两种判断:1,如果是业务异常,会自动返回nack

如果是消息处理或者校验异常,会直接进行reject

spring:rabbitmq:listener:simple:acknowledge-mode: auto

3.2 生产者有重试机制,消费者也有重试机制,但是,对于消费者的重试,如果一直失败,那就要有一定的策略,可以把这个失败的消息放到另一个交换机上,后续人工进行干预,这样可以保证消息不丢失。

对于消费者的重试配置:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

如何把消息发送到另一个交换机上呢?

在消费者服务定义一个处理失败消息队列的交换机,这样就可以把消息存储过去了

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

4.业务的幂等性判断

4.1.对于一条MQ消息,为了防止被重复消费,可以做一个唯一的msgID,当消费的时候可以先检查下这个ID,如果已经消费过了,那就不能再消费了,一定程度上可以避免被重复消费,代码如下:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

4.2.还有一种保证消费唯一性的就是业务上的判断,当需要消费消息的时候,可以先提前去查询下需要消费消息的状态,如果状态已经发生了改变,自然也不用再去消费这条消息了

5.对于以上所有保证消息可靠性的方案,其实都不能完全保证,最终需要一个兜底的方案,兜底方案我们可以采取一个定时任务的方式,定时轮询检查消息是否消费。比如时间间隔多少秒进行一次轮询检查,这种方式我们可以理解为主动查询。这种兜底很大程度上可以保证业务上的一致性。

相关文章:

RabbitMQ学习总结-消息的可靠性

保证MQ消息的可靠性&#xff0c;主要从三个方面&#xff1a;发送者确认可靠性&#xff0c;MQ确认可靠性&#xff0c;消费者确认可靠性。 1.发送者可靠性&#xff1a;主要依赖于发送者重试机制&#xff0c;发送者确认机制&#xff1b; 发送者重试机制&#xff0c;其实就是配置…...

2024蓝桥杯每日一题(BFS)

备战2024年蓝桥杯 -- 每日一题 Python大学A组 试题一&#xff1a;母亲的奶牛 试题二&#xff1a;走迷宫 试题三&#xff1a;八数码1 试题四&#xff1a;全球变暖 试题五&#xff1a;八数码2 试题一&#xff1a;母亲的奶牛 【题目描述】 农夫约…...

力扣思路题:最长特殊序列1

int findLUSlength(char * a, char * b){int alenstrlen(a),blenstrlen(b);if (strcmp(a,b)0)return -1;return alen>blen?alen:blen; }...

c# 的ref 和out

在C#中&#xff0c;ref和out是用于方法参数的关键字&#xff0c;它们都允许在方法调用中对参数进行修改。 ref关键字用于传递参数的引用。当使用ref关键字声明一个参数时&#xff0c;实际上是在告诉编译器此参数在调用方法之前必须被赋值。ref参数传递的是参数的引用地址&…...

ONLYOFFICE文档8.0全新发布:私有部署、卓越安全的协同办公解决方案

ONLYOFFICE文档8.0全新发布&#xff1a;私有部署、卓越安全的协同办公解决方案 文章目录 ONLYOFFICE文档8.0全新发布&#xff1a;私有部署、卓越安全的协同办公解决方案摘要&#x1f4d1;引言 &#x1f31f;正文&#x1f4da;一、ONLYOFFICE文档概述 &#x1f4ca;二、ONLYOFFI…...

Mar 14 | Datawhale 01~04 打卡 | Leetcode面试下

第一阶段主要就是学习字符串的处理和二叉树的遍历。前一段时间学习了二叉树的遍历&#xff0c;记忆还比较深刻&#xff0c;这几个题还是比较轻松的做出来了&#xff1b;但是像Longest Palindromic Substring这样的题除了简单的字符串处理&#xff08;回文判断&#xff09;&…...

【计算机网络】什么是http?

​ 目录 前言 1. 什么是HTTP协议&#xff1f; 2. 为什么使用HTTP协议&#xff1f; 3. HTTP协议通信过程 4. 什么是url&#xff1f; 5. HTTP报文 5.1 请求报文 5.2 响应报文 6. HTTP请求方式 7. HTTP头部字段 8. HTTP状态码 9. 连接管理 长连接与短连接 管线化连接…...

【python开发】并发编程(上)

并发编程&#xff08;上&#xff09; 一、进程和线程&#xff08;一&#xff09;多线程&#xff08;二&#xff09;多进程&#xff08;三&#xff09;GIL锁 二、多线程开发&#xff08;一&#xff09;t.start()&#xff08;二&#xff09;t.join()&#xff08;三&#xff09;t.…...

用c语言实现扫雷游戏

文章目录 概要整体架构流程代码的实现小结 概要 学习了c语言后&#xff0c;我们可以通过c语言写一些小程序&#xff0c;然后我们这篇文章主要是&#xff0c;扫雷游戏的一步一步游戏。 整体架构流程 扫雷网页版 根据上面网页版的基础扫雷可以看出是一个99的格子&#xff0c;…...

LeetCode 2882.删去重复的行

DataFrame customers ------------------- | Column Name | Type | ------------------- | customer_id | int | | name | object | | email | object | ------------------- 在 DataFrame 中基于 email 列存在一些重复行。 编写一个解决方案&#xff0c;删除这些重复行&#…...

对OceanBase进行 sysbench 压测前,如何用 obdiag巡检

有一些用户想对 OceanBase 进行 sysbench 压测&#xff0c;并向我询问是否需要对数据库的各种参数进行调整。我想起有一个工具 obdiag &#xff0c;具备对集群进行巡检的功能。因此&#xff0c;我正好借此机会试用一下这个工具。 obdiag 功能的比较丰富&#xff0c;详细情况可参…...

每天学习几道面试题|Kafka架构设计类

文章目录 1. Kafka 是如何保证高可用性和容错性的&#xff1f;2. Kafka 的存储机制是怎样的&#xff1f;它是如何处理大量数据的&#xff1f;3. Kafka 如何处理消费者的消费速率低于生产者的生产速率&#xff1f;4. Kafka 集群中的 Controller 是什么&#xff1f;它的作用是什么…...

.rmallox勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复

导言&#xff1a; 近年来&#xff0c;勒索病毒的威胁日益增加&#xff0c;其中一种名为.rmallox的勒索病毒备受关注。这种病毒通过加密文件并勒索赎金来威胁受害者。本文将介绍.rmallox勒索病毒的特点&#xff0c;以及如何恢复被其加密的数据文件&#xff0c;并提供预防措施&a…...

安卓性能优化面试题 11-15

11. 简述APK安装包瘦身方案 ?(1):剔 除掉冗余的代码与不必要的jar包;具体来讲的话,我们可以使用SDK集成的ProGuard混淆工具,它可以在编译时检查并删除未使用的类、字段、方法 和属性,它会遍历所有代码找到无用处的代码,所有那些不可达的代码都会在生成最终apk文件之前被…...

Python错题集-9PermissionError:[Errno 13] (权限错误)

1问题描述 Traceback (most recent call last): File "D:\pycharm\projects\5-《Python数学建模算法与应用》程序和数据\02第2章 Python使用入门\ex2_38_1.py", line 9, in <module> fpd.ExcelWriter(data2_38_3.xlsx) #创建文件对象 File "D:…...

QT TCP通信介绍

QT是一个跨平台的C应用程序开发框架&#xff0c;它提供了一套完整的工具和库&#xff0c;用于开发各种类型的应用程序&#xff0c;包括图形用户界面(GUI)应用程序、命令行工具、网络应用程序等。QT提供了丰富的功能和类来简化网络通信的开发&#xff0c;其中包括TCP通信。 TCP…...

保姆级教学!微信小程序设计全攻略!

微信小程序开启了互联网软件的新使用模式。在各种微信小程序争相抢占流量的同时&#xff0c;如何设计微信小程序&#xff1f;让用户感到舒适是设计师在产品设计初期应该考虑的问题。那么如何做好微信小程序的设计呢&#xff1f;即时设计总结了以下设计指南&#xff0c;希望对准…...

日期差值的计算

1、枚举所有数值进行日期判断 时间复杂度是o(n)的&#xff0c;比较慢&#xff0c;单实例能凑合用&#xff0c;多实例的话时间复杂度有点高。 核心代码就是判断某个八位数能否表示一个日期。 static int[] month {0,31,28,31,30,31,30,31,31,30,31,30,31};static String a, b…...

为什么需要Occupancy?

1.能够得到3D的占用信息 在基于BEV (鸟瞰图) 的2D预测模型中&#xff0c;我们通常仅具有二维平面&#xff08;x和y坐标&#xff09;上的信息。这种方法对于很多应用场景来说已经足够&#xff0c;但它并不考虑物体在垂直方向&#xff08;z轴&#xff09;上的分布。这限制了模型的…...

SSA优化最近邻分类预测(matlab代码)

SSA-最近邻分类预测matlab代码 麻雀搜索算法(Sparrow Search Algorithm, SSA)是一种新型的群智能优化算法&#xff0c;在2020年提出&#xff0c;主要是受麻雀的觅食行为和反捕食行为的启发。 数据为Excel分类数据集数据。 数据集划分为训练集、验证集、测试集,比例为8&#…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

多种风格导航菜单 HTML 实现(附源码)

下面我将为您展示 6 种不同风格的导航菜单实现&#xff0c;每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制

使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下&#xff0c;限制某个 IP 的访问频率是非常重要的&#xff0c;可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案&#xff0c;使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程&#xff0c;系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来&#xff0c;在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂&#xff0c;网络攻击、数据泄露等事件频发&#xff0c;现行法律已难以完全适应新的风险挑战。 2025年3月28日&#xff0c;国家网信办会同相关部门起草了《网络安全…...

k8s从入门到放弃之HPA控制器

k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率&#xff08;或其他自定义指标&#xff09;来调整这些对象的规模&#xff0c;从而帮助应用程序在负…...