RabbitMQ学习(第二天)
文章目录
- 1、生产者可靠性
- ①、生产者重连
- ②、生产者确认
- 2、MQ可靠性
- ①、数据持久化
- ②、LazyQueue(惰性队列)
- 3、消费者可靠性
- ①、消费者确认
- ②、失败重试机制
- ③、保证业务幂等性
- 总结
之前的学习中,熟悉了java中搭建和操作RabbitMQ发送接收消息,熟悉使用之后,重点要关注一下面试中常考的点,以及工作经常遇到的问题。今天的学习主要从保证消息可靠性出发,保证可靠性分三部分,分别是生产者可靠性、MQ可靠性、消费者可靠性三部分出发。
1、生产者可靠性
①、生产者重连
有时候因为网络波动,可能导致客户端连接MQ失败,通过配置可以开启连接失败后的重试机制。
yml配置文件:
spring:rabbitmq:connection-timeout: 1s # 连接MQ的连接超时间template:retry:enabled: true # 开启模板的重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
我们故意让它连接不上,结果如下:
当网络不稳定的时刻,利用重试机制可以有数据显示消息发送的成功率。不过SpringAMQP重试的重复机制是
阻塞式的重试,也就是说多次重试等待的过程是中,当前线程是被阻塞的,会影响到业务性能。
如果对于业务性能有要求,建议参考用重试机制。如果一定需要使用,请合理配置重试等待时长和重试次数,当然也
可以考虑异步用异步多线程来执行发送消息的代码。
②、生产者确认
RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,异步confirm类型publisher-returns: true # 开启publisher return机制
publisher-confim:
这里设置 correlated 表示MQ异步回调方式返回回执消息
**publisher-return: **
开启后,会返回路由失败消息。
@Slf4j
@Configuration
public class CommonConfig {@ResourceRabbitTemplate rabbitTemplate;@PostConstructpublic void setRabbitTemplate() {rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("收到消息的return callback: msg:{} exchange:{}, test:{}, key:{}, code:{}",returnedMessage.getMessage(), returnedMessage.getExchange(), returnedMessage.getReplyText(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());});}
}
我们定义一个config类,里面添加一个处理publish-return消息的处理,
测试代码:
@Testpublic void testConfirmCallBack() throws InterruptedException {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {log.debug("消息回调失败", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.debug("收到confim callback回执");if(result.isAck()) {log.debug("消息发送成功,收到ack");} else{log.debug("消息发送失败,收到nack, 原因:{}", result.getReason());}}});rabbitTemplate.convertAndSend("amq.direct", "red2", "hello", cd);Thread.sleep(2000);}
这里我们测试三种情况:
现有正确的交换机为 amq.direct,routingKey为red,cd逻辑如代码所示。
①、交换机与routingKey均正确,运行结果:
收到confim callback回执
消息发送成功,收到ack
②、交换机正确,routingKey不正确,运行结果:
收到confim callback回执
消息发送成功,收到ack
收到消息的return callback: msg:(Body:'"hello"' MessageProperties [headers={spring_returned_message_correlation=0a8a70fd-a66b-43a3-a681-324e46db7b79, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) exchange:amq.direct, test:NO_ROUTE, key:red2, code:312
③、交换机不正确,运行结果:
收到confim callback回执
消息发送失败,收到nack, 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'amq.direct.123' in vhost '/', class-id=60, method-id=40)
关于生产者确认消息要会的几点:
-
生产者确认需要额外的网络和系统资源开销,尽量不要使用
-
如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
-
对于nack消息可以有限次数重试,依然失败则记录异常消息
2、MQ可靠性
MO通常将消息保存在内存中,这样可以降低收发消息的延迟,但是会有一些问题:
- 最常见的问题就是可能因为各种原因宕机重启,而这极大可能会导致消息丢失。
- 消息堆积在内存中,如果消费者出故障,导致消息积压,引发MQ阻塞。
①、数据持久化
持久化分三部分:
- 交换机持久化
- 队列持久化,
- 消息持久化
前两个的话,Spring中创建时默认将队列和交换机都创建为持久化,当然,手动设置也可以,设置为Durable:
消息持久化也可以设置:
我们向他发送了1百万条非持久化消息:
可以看到其中凹下去的地方就是消息积压导致的。
发送了1百万条持久化消息:
可以看到此时不存在消息积压。
每次下降的时候,是在将数据写入到磁盘中,但是不会出现宕机为0的情况。
②、LazyQueue(惰性队列)
惰性队列的特征如下:
-
接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息)
-
消费者要消费消息时才会从磁盘中读取并加载到内存
-
支持数百万条的消息存储
-
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
控制台创建:
spring中创建:
@Bean
public Queue lazyQueue() {return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {log.info("接收到 lazy.queue 的消息: {}", msg);
}
向lazy.queue发送1百万条非持久消息:
可以看到,性能特别好,因为数据都是直接通过page out到磁盘。
3、消费者可靠性
①、消费者确认
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并且拒绝该消息,RabbitMQ从队列中删除消息
我们在Spring中配置,auto代表Spring帮我们自动处理这三种情况:
spring:rabbitmq:simple:acknowledge-mode: auto
一般都是直接配置auto。
但是这样就会导致一个问题,如果程序中有异常,那么mq一直重试,这样的话,会导致资源的消耗,对系统造成大的压力,因此我们后面引入了失败重试机制。
②、失败重试机制
配置以下yml内容可以开启失败重试机制:
spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true #开启失败重试机制initial-interval: 1000ms #初始的失败重试等待时长为1秒multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3stateless: true # true无状态,false有状态,如果业务中包含事务,这里为false
失败消息处理策略:
在开启重试模式后,重试次数耗尽,如果消息仍然失败,则需要MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer: 重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer: 重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定的交换机
收发消息定义:
@RabbitListener(queues = {"object.queue"})public void ObjectConsumer(String msg) {System.out.println(msg);throw new RuntimeException("......");}@Testpublic void testMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();rabbitTemplate.convertAndSend("object.queue", message);}
java代码中实现失败消息处理:
@Slf4j
@Configuration
public class CommonConfig {@ResourceRabbitTemplate rabbitTemplate;@PostConstructpublic void setRabbitTemplate() {rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("收到消息的return callback: msg:{} exchange:{}, test:{}, key:{}, code:{}",returnedMessage.getMessage(), returnedMessage.getExchange(), returnedMessage.getReplyText(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());});}
}
运行结果:
可以看到报错信息被发送到了error交换机。
③、保证业务幂等性
幂等是一个数学概念,用函数表达式来描述是:f(x) = f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次的结果是一致的。
像查询一般都是幂等的,因为它的执行对于业务状态没有影响,差多少次都没事,就是幂等的。
像用户下单,退单涉及扣钱,退款操作,如果执行多次,会造成经济损失,就是非幂等的。
为了保证幂等性,我们通过一些方法来实现:
(1)、 唯一消息id
是给每个消息都设置一个唯一id,利用id区分是否重复消费:
-
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
-
消费者接收到消息后根据自己的业务,业务处理成功后将消息ID保存到数据库库
-
如果下次又收到相同消息,去数据库查找判断是否存档,存档则为重复消费放弃处理。
@Bean
public MessageConverter messageConverter() {// 1. 定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2. 配置自动创建消息id,用于识别不同消息,也可以自己在业务中生成ID判断是否重复消费jjmc.setCreateMessageIds(true);return jjmc;
}
但是,这种方案对数据库压力太大,同时速度也相对慢很多,因此只做了解,只需要知道这种方式是jjmc内部帮你创建了一个UUID的随机值作id。
其它方式的话,需要我们结合实际项目在业务逻辑中进行操作来保证幂等性。
总结
最后,用两个问题来总结:
如何保证对服务与交易服务之间的订单状态一致性?
-
首先,支付服务会在用户支付成功后利用MQ消息通知交易服务,完成订单状态同步。
-
其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消息重确认、消费者失败重试等策略,确保消息投递和处理可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
-
最后,我们还在交易服务端新增了业务幂等判断,避免重复消费。
如果交易服务消息处理失败,有没有什么兜底方案?
- 我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即使用MQ通知失败,还可以用定时任务作为兜底方案,确保订单状态最终一致性。
即Spring Task中的@Schedule注解,定时扫描订单状态,用来弥补消息失败,兜底方案。
相关文章:

RabbitMQ学习(第二天)
文章目录 1、生产者可靠性①、生产者重连②、生产者确认 2、MQ可靠性①、数据持久化②、LazyQueue(惰性队列) 3、消费者可靠性①、消费者确认②、失败重试机制③、保证业务幂等性 总结 之前的学习中,熟悉了java中搭建和操作RabbitMQ发送接收消息,熟悉使用…...

【JS逆向基础】爬虫核心模块:request模块与包的概念
前言:这篇文章主要介绍JS逆向爬虫中最常用的request模块,然后引出一系列的模块的概念,当然Python中其他比较常用的还有很多模块,正是这些模块也可以称之为库的东西构成了Python强大的生态,使其几乎可以实现任何功能。下…...

LabVIEW燃气轮机测控系统
在能源需求不断增长以及生态环境保护备受重视的背景下,微型燃气轮机凭借其在经济性、可靠性、维护性及排放性等方面的显著优势,在航空航天、分布式发电等众多领域得到广泛应用。随着计算机技术的快速发展,虚拟仪器应运而生,LabVIE…...
【链表扫盲】FROM GPT
链表是一种线性数据结构,由节点(Node)组成,每个节点包含两个部分: 数据域(data): 存储节点值。指针域(next): 存储指向下一个节点的引用。 链表…...

QT | 常用控件
前言 💓 个人主页:普通young man-CSDN博客 ⏩ 文章专栏:C_普通young man的博客-CSDN博客 ⏩ 本人giee: 普通小青年 (pu-tong-young-man) - Gitee.com 若有问题 评论区见📝 🎉欢迎大家点赞👍收藏⭐文章 —…...
Python学习之路(八)-多线程和多进程浅析
在 Python 中,多线程(Multithreading) 和 多进程(Multiprocessing) 是实现并发编程的两种主要方式。它们各有优劣,适用于不同的场景。 一、基本概念 特性多线程(threading)多进程(multiprocessing)并发模型线程共享内存空间每个进程拥有独立内存空间GIL(全局解释器锁…...
搭建和优化CI/CD流水线
CI/CD(持续集成 / 持续交付)流水线是现代软件开发中的关键实践,它能够自动化软件的构建、测试和部署过程,提高开发效率和软件质量。以下为你介绍搭建和优化 CI/CD 流水线的详细步骤: 搭建 CI/CD 流水线 1. 选择合适的…...
kotlin 01flow-StateFlow 完整教程
一 Android StateFlow 完整教程:从入门到实战 StateFlow 是 Kotlin 协程库中用于状态管理的响应式流,特别适合在 Android 应用开发中管理 UI 状态。本教程将带全面了解 StateFlow 的使用方法。 1. StateFlow 基础概念 1.1 什么是 StateFlow? StateF…...
1.2.1 Linux音频系统发展历程简介
Linux音频系统的发展经历了从最初的简单驱动到今天多层次、模块化音频架构。简要梳理其主要历程: 早期的OSS(Open Sound System) 在90年代及2000年代初,Linux主要使用OSS来支持音频。OSS直接为硬件设备(如声卡&#…...
浏览器刷新结束页面事件,调结束事件的接口(vue)
浏览器刷新的时候,正在进行中的事件结束掉,在刷新浏览器的时候做一些操作。 如果是调接口,就不能使用axios封装的接口,需要使用原生的fetch。 找到公共的文件App.vue 使用window.addEventListener(‘beforeunload’, function (e…...
聊聊Spring AI Alibaba的SentenceSplitter
序 本文主要研究一下Spring AI Alibaba的SentenceSplitter SentenceSplitter spring-ai-alibaba-core/src/main/java/com/alibaba/cloud/ai/transformer/splitter/SentenceSplitter.java public class SentenceSplitter extends TextSplitter {private final EncodingRegis…...
前端-什么是结构语言、样式语言、脚本语言?
目录 1. 结构语言(HTML / WXML)——房子的骨架 2. 样式语言(CSS / WXSS)——房子的装修 3. 脚本语言(JavaScript)——房子的智能控制系统 总结对比表: 1. 结构语言(HTML / WXML&a…...

LLM论文笔记 28: Universal length generalization with Turing Programs
Arxiv日期:2024.10.4机构:Harvard University 关键词 图灵机 CoT 长度泛化 核心结论 Turing Programs 的提出 提出 Turing Programs,一种基于图灵机计算步骤的通用 CoT 策略。通过将算法任务分解为逐步的“磁带更新”(类似图灵…...

AI日报 · 2025年5月07日|谷歌发布 Gemini 2.5 Pro 预览版 (I/O 版本),大幅提升编码与视频理解能力
1、谷歌发布 Gemini 2.5 Pro 预览版 (I/O 版本),大幅提升编码与视频理解能力 谷歌于5月6日提前发布 Gemini 2.5 Pro 预览版 (I/O 版本),为开发者带来更强编码能力,尤其优化了前端与UI开发、代码转换及智能体工作流构建,并在WebDe…...

指定Docker镜像源,使用阿里云加速异常解决
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo异常贴图 yum-config-manager:找不到命令 因为系统默认没有安装这个命令,这个命令在yum-utils 包里,可以通过命令yum -y install yum-util…...

VITA STANDARDS LIST,VITA 标准清单下载
VITA STANDARDS LIST,VITA 标准清单下载 DesignationTitleAbstractStatusVMEbus Handbook, 4th EditionA users guide to the VME, VME64 and VME64x bus specifications - features over 70 product photos and over 160 circuit diagrams, tables and graphs. The…...

Python从入门到高手8.3节-元组的常用操作方法
目录 11.3.1 元组的常用操作方法 11.3.2 元组的查找 11.3.3 祈祷明天不再打雷下雨 11.3.1 元组的常用操作方法 元组类型是一种抽象数据类型,抽象数据类型定义了数据类型的操作方法,在本节的内容中,着重介绍元组类型的操作方法。 元组是…...

Linux系统安装PaddleDetection
一、安装cuda 1. 查看设备 先输入nvidia-smi,查看设备支持的最大cuda版本,选择官网中支持的cuda版本 https://www.paddlepaddle.org.cn/install/quick?docurl/documentation/docs/zh/install/conda/linux-conda.html 2. 下载CUDA并安装 使用快捷键…...
【漫话机器学习系列】239.训练错误率(Training Error Rate)
机器学习基础概念 | 训练错误率(Training Error Rate)详解 在机器学习模型训练过程中,评估模型性能是至关重要的一个环节。其中,训练错误率(Training Error Rate) 是最基础也最重要的性能指标之一。 本文将…...
Vue3路由模式为history,使用nginx部署上线是空白的问题
一、问题 将vue使用打包后 npm run build将dist文件的内容,放入nginx的html中,并在nginx.conf中,设置端口 启动nginx,打开发现网页内容为空白 二、解决问题 1.配置vue-route const router createRouter({history: createWe…...
Python 数据智能实战 (13):AI的安全可靠 - 电商数据智能的红线与指南
写在前面 —— 技术向善,行稳致远:在智能时代,坚守数据伦理,构建可信赖的 AI 应用 通过前面的篇章,我们已经深入探索了如何利用 Python 和大语言模型 (LLM) 挖掘电商数据的巨大潜力,从智能用户分群到语义推荐,再到个性化内容生成和模型效果评估。我们手中的工具越来越…...

OpenCV 图形API(80)图像与通道拼接函数-----仿射变换函数warpAffine()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 对图像应用仿射变换。 函数 warpAffine 使用指定的矩阵对源图像进行变换: dst ( x , y ) src ( M 11 x M 12 y M 13 , M 21 x M…...

数据结构与算法:图论——最短路径
最短路径 先给出一些leetcode算法题,以后遇见了相关题目再往上增加 最短路径的4个常用算法是Floyd、Bellman-Ford、SPFA、Dijkstra。不同应用场景下,应有选择地使用它们: 图的规模小,用Floyd。若边的权值有负数,需要…...

提示词工程:通向AGI时代的人机交互艺术
引言:从基础到精通的提示词学习之旅 欢迎来到 "AGI时代核心技能" 系列课程的第二模块——提示词工程。在这个模块中,我们将系统性地探索如何通过精心设计的提示词,释放大型语言模型的全部潜力,实现高效、精…...
FreeRTOS系统CPU使用率统计
操作系统中CPU使用率是在软件架构设计中必须要考虑的一个重要性能指标。它直接影响到程序的执行时间以及优先级更高的任务能否实时响应的问题。而CPU使用率也不能过低,避免资源浪费。 基本原理 操作系统会统计系统总共运行了多少时间,以及在此期间每个任…...

是更换Window资源管理器的时候了-> Files-community/Files
Files • 主页https://files.community/ 它已经做到了 云盘文件集成、标签页和多种布局、丰富的文件预览…… 您想要的一切现代文件管理器的强大功能, Files 都能做到。 概述 Files 是一个现代文件管理器,可帮助用户组织他们的文件和文件夹。Files 的…...

基于windows安装MySQL8.0.40
基于windows安装MySQL8.0.40 基于windows 安装 MySQL8.0.40,解压文件到D:\mysql-8.0.40-winx64 在D:\mysql-8.0.40-winx64目录下创建my.ini文件,并更新一下内容 [client] #客户端设置,即客户端默认的连接参数 # 设置mysql客户端连接服务…...

【Vue】组件自定义事件 TodoList 自定义事件数据传输
目录 一、绑定 二、解绑 组件自定义事件总结 TodoList案例对数据传输事件的修改 总结不易~ 本章节对我有很大收获, 希望对你也是!!! 本章节素材已上传Gitee:yihaohhh/我爱Vue - Gitee.com 前面我们学习的clikc、…...

基于Centos7的DHCP服务器搭建
一、准备实验环境: 克隆两台虚拟机 一台作服务器:DHCP Server 一台作客户端:DHCP Clinet 二、部署服务器 在网络模式为NAT下使用yum下载DHCP 需要管理员用户权限才能下载,下载好后关闭客户端,改NAT模式为仅主机模式…...

LabVIEW超声波液位计检定
在工业生产、运输和存储等环节,液位计的应用十分广泛,其中超声波液位计作为非接触式液位测量设备备受青睐。然而,传统立式水槽式液位计检定装置存在受建筑高度影响、量程范围受限、流程耗时长等问题,无法满足大量程超声波液位计的…...