RabbitMQ08_保证消息可靠性
保证消息可靠性
- 一、生产者可靠性
- 1、生产者重连机制(防止网络波动)
- 2、生产者确认机制
- Publisher Return 确认机制
- Publisher Confirm 确认机制
- 二、MQ 可靠性
- 1、数据持久化
- 交换机、队列持久化
- 消息持久化
- 2、Lazy Queue 惰性队列
- 三、消费者可靠性
- 1、消费者确认机制
- 2、失败重试机制
- 3、业务幂等性
一、生产者可靠性
1、生产者重连机制(防止网络波动)
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制(默认是false)initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时长= initial-interval * multipliermax-attempts: 3 #最大重试次数
2、生产者确认机制
Publisher Return 确认机制
消息投递到MQ但是MQ路由失败,MQ返回路由失败原因
spring:rabbitmq:publisher-returns: true # 开启publisher return机制
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
Publisher Confirm 确认机制
临时消息投递到了MQ且入队成功,返回ACK
持久消息投递到了MQ且入队完成持久化,返回ACK
消息投递异常,返回NACK
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-confirm-type 的三种类型
- none 关闭confirm机制
- simple 同步阻塞等待MQ回执消息
- correlated MQ异步回调返回回执消息
@Test
void testPublisherConfirm() throws InterruptedException {CorrelationData cd = new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Overridepublic void onFailure(Throwable ex) {// Future发生异常时的处理逻辑,基本不会触发log.error("handle message ack fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ log.debug("发送消息成功,收到 ack!");} else {log.error("发送消息失败,收到 nack, reason : {}", result.getReason());//TODO 重试发送}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "red1", "hello", cd);
}
二、MQ 可靠性
1、数据持久化
交换机、队列持久化
默认创建时就是持久化的(Durability = Durable)

消息持久化
RabbitTemplate 的 convertAndSend() 方法发送的消息默认就是持久化的(delivery mode = 2)
如果非要发送一个非持久化的消息,需要在调用 rabbitTemplate.convertAndSend() 方法时,显式地设置消息的 MessageProperties,并将 deliveryMode 设置为 1 (非持久化)
2、Lazy Queue 惰性队列
Lazy Queue是一种以惰性模式运行的队列,它尽可能地将消息存储在磁盘上,而不是内存中。只有当消费者需要消费消息时,这些消息才会被加载到内存中,效率比传统队列高。
3.12版本后,所有队列都是Lazy Queue模式,无法更改。
三、消费者可靠性
1、消费者确认机制
消费者回执消息类型
ack消费者处理成功,RabbitMQ 将从队列中删除消息nack消费者处理失败,RabbitMQ 需再次投递消息reject消费者拒绝处理,RabbitMQ 将从队列中删除消息
SpringAMQP 消息监听器的三种确认模式
none不处理。即消费者收到消息后立刻返回ack,消息会丢失,非常不安全。manual手动模式。业务代码手动调用api发送 ack 或 reject,存在业务入侵,但更灵活。auto自动模式(默认)。通过 AOP 对消息处理方法做环绕增强,正常返回ack,出现业务异常返回nack,出现消息处理或校验异常返回reject
spring:rabbitmq:listener:simple:acknowledge-mode: auto
2、失败重试机制
消费者处理消息出现异常时利用本地重试,而不是无限的requeue到mq,让mq重新投递给消费者
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试(默认是关闭的)initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现。
失败消息处理策略
- RejectAndDontRequeueRecoverer:默认实现,重试耗尽后直接reject,丢弃消息。
- ImmediateRequeueMessageRecoverer:重试耗尽后返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后将失败消息投递到指定的交换机
以第三种失败消息处理策略为例,配置方式如下:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
3、业务幂等性
由于存在各种确认和重试机制,消费者有重复消费消息的可能性,因此要保证业务的幂等性。
保证业务幂等性的方式如下:
- 方案一:发送消息时生成唯一消息ID,投递给消费者,消费者接收到消息,业务处理成功后将消息ID保存到数据库,下次根据消息ID去数据库查询判断是否已处理,如果已处理则放弃处理。
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
- 方案二:结合业务逻辑,基于业务本身做判断。
相关文章:
RabbitMQ08_保证消息可靠性
保证消息可靠性 一、生产者可靠性1、生产者重连机制(防止网络波动)2、生产者确认机制Publisher Return 确认机制Publisher Confirm 确认机制 二、MQ 可靠性1、数据持久化交换机、队列持久化消息持久化 2、Lazy Queue 惰性队列 三、消费者可靠性1、消费者…...
恶意Bot流量识别分析实践
1、摘要 随着互联网的发展,自动化工具和脚本(Bots)的使用越来越普遍。虽然一些善意 Bots 对于网站的正常运行和数据采集至关重要,但恶意 Bots 可能会对网站带来负面影响,如爬取敏感信息、恶意注册、刷流量等。因此&am…...
Java2 实用教程(第6版)习题2 第四题
【源文件的命名与书中的不同】 四、阅读程序题 1、上机运行下列程序,注意观察输出的结果。 public class E2_1 {public static void main(String args[]){for(int i20302;i<20322;i){System.out.println((char) i);}} } 运行结果: 低 住 佐 佑 佒…...
HashMap和ConcurrentHashMap的区别
1.是什么 HashMap和ConcurrentHashMap都是Java集合框架中的成员,它们用于存储键值对,但它们在并发场景下的表现和行为有很大的不同。以下是它们之间的一些主要区别: 1. 并发安全性 HashMap: HashMap不是线程安全的。如果多个线程同时访问Has…...
css 下拉框展示:当hover的时候展示下拉框 z-index的用法解释
代码如下: <template><div class"outer"><div class"left"></div><div class"aTest2"><div class"box">显示方框</div><div class"aTest3"></div></…...
spring装配笔记
spring装配是个大课题,能懂一点是一点吧。 关于代码链路,最后的方式就是倒序摸索,正序那么多逻辑,没有一百万也差不多少,所以就用倒序。 .(点号)和#井号是一个意思,下面代码可能不详细区分,复…...
vscode【实用插件】Notes 便捷做笔记
安装 在 vscode 插件市场的搜索 Notes点 安装 安装成功后,vscode 左侧栏会出现 使用 初次使用 需先选择一个本地目录 重启 vscode 后,得到 切换笔记目录 新建笔记 快捷键为 Alt N 默认会创建 .md 文件 配合插件 Markdown Preview Enhanced 预览 .md…...
中间件:maxwell、canal
文章目录 1、底层原理:基于mysql的bin log日志实现的:把自己伪装成slave2、bin log 日志有三种模式:2.1、statement模式:2.2、row模式:2.3、mixed模式: 3、maxwell只支持 row 模式:4、maxwell介…...
postman控制变量和常用方法
1、添加环境: 2、环境添加变量: 3、配置不同的环境:local、dev、sit、uat、pro 4、 接口调用 5、清除cookie方法: 6、下载文件方法:...
Spring Boot 中整合 Kafka
在 Spring Boot 中整合 Kafka 非常简单,Spring Kafka 提供了丰富的支持,使得我们可以轻松地实现 Kafka 的生产者和消费者。下面是一个简单的 Spring Boot 整合 Kafka 的示例。 1. 添加依赖 首先,在 pom.xml 中添加 Spring Kafka 的依赖&#…...
什么是开放式耳机?具有什么特色?非常值得入手的蓝牙耳机推荐
开放式耳机是当下较为热门的一种耳机类型。它具有以下特点: 设计结构: 呈现开放式的构造,不会完全堵住耳道。如此一来,外界声音能够较容易地被使用者听到,在使用耳机时可以保持对周围环境的察觉。比如在户外…...
编译 FFmpeg 以支持 AV1 编解码器以及其他硬件加速选项(如 NVENC、VAAPI 等)
步骤 1: 安装必要的依赖 sudo apt update sudo apt install -y \autoconf automake build-essential cmake git libass-dev libfreetype6-dev \libsdl2-dev libtool libva-dev libvdpau-dev libxcb1-dev libxcb-shm0-dev \libxcb-xfixes0-dev pkg-config texinfo wget zlib1g-…...
解释一下Java中的多线程。如何创建一个新的线程?
在Java中,多线程是一种机制,允许一个程序同时执行多个任务或处理。每个任务被称为一个线程。 这种并行执行可以极大地提高应用程序的效率和响应速度。 例如,在开发一个桌面应用程序时,你可以使用一个线程来更新用户界面…...
Java语言程序设计基础篇_编程练习题**18.30 (找出单词)
题目:**18.30 (找出单词) 编写一个程序,递归地找出某个目录下的所有文件中某个单词出现的次数。从命令行如下传递参数: java Exercise18_30 dirName word 习题思路 (读取路径方法)和18.28题差不多,把找…...
MyBatis中 #{} 和 ${} 的区别
1. #{id}(参数占位符) 作用: 使用 #{id} 时,MyBatis 会将 id 参数绑定为 JDBC 的参数。这种方式能够有效防止 SQL 注入攻击,因为它会进行参数的预处理,将参数值作为数据类型的绑定,而不是直接插入到 SQL 语…...
Android Perfetto 学习
1、如何抓取性能日志 方式1、通过手机里的System Tracing抓取 1、点击Settings->System->Developer options->System Tracing->Record trace 打开 2、操作完成后,点击Settings->System->Developer options->System Tracing->Record trace…...
ES数据的删除与备份
背景 需要删除索引下满足指定条件的文档数据,并将删除的数据进行备份。 操作步骤 新建索引 该索引结构与映射关系与原索引一致 查看原索引设置 GET /tb/_settings结果: {"tb" : {"settings" : {"index" : {"ro…...
论文解读《Object-Centric Learning with Slot Attention》
系列文章目录 文章目录 系列文章目录论文细节理解 1. 研究背景2. 论文贡献3. 方法框架3.1 Slot Attention模块3.2 无监督对象发现架构 4. 研究思路5. 实验6. 限制 论文细节理解 supervised property prediction tasks是什么? Supervised property prediction tasks…...
YOLOv8+注意力机制+PyQt5玉米病害检测系统完整资源集合
资源包含可视化的玉米病害检测系统,基于最新的YOLOv8注意力机制训练的玉米病害检测模型,和基于PyQt5制作的可视玉米病害系统,包含登陆页面和检测页面,该系统可自动检测和识别图片或视频当中出现的七类玉米病害:矮花叶病…...
tcp、udp通信调试工具Socket Tool
tcp、udp通信调试工具Socket Tool ]...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
LLMs 系列实操科普(1)
写在前面: 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容,原视频时长 ~130 分钟,以实操演示主流的一些 LLMs 的使用,由于涉及到实操,实际上并不适合以文字整理,但还是决定尽量整理一份笔…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能
1. 开发环境准备 安装DevEco Studio 3.1: 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK 项目配置: // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...
tauri项目,如何在rust端读取电脑环境变量
如果想在前端通过调用来获取环境变量的值,可以通过标准的依赖: std::env::var(name).ok() 想在前端通过调用来获取,可以写一个command函数: #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能
指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...
