MQ高级:RabbitMQ小细节
在之前的学习中,我们只介绍了消息的发送,但是没有考虑到异常的情况,今天我们就介绍一些异常情况,和细节的部分。
目录
生产者可靠性
生产者重连
生产者确认
MQ可靠性
持久化
Lazy Queue
消费者可靠性
消费者确认机制
失败重试机制
业务幂等性
延迟消息
死信交换机
延迟消息插件
生产者可靠性
生产者重连
有时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长=initial-interval * multipliermax-attempts: 3 # 最大重试次数
在停止mq服务之后,运行代码,会发现测试失败,因为连接失败。最大重试次数设置的是3,此处就重试了3次再停止。
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMOP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,
会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
生产者确认
在开启了生产者确认机制后,在MQ成功收到消息后会返回确认消息ACK给生产者,如果有异常,会返回NACK。
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
spring:rabbitmq:publisher-confirm-type: correlated# 开启publisher confirm机制,并设置confirm类型为correlatedpublisher-returns: true# 开启publisher return机制
这里publisher-confirm-type有三种模式可选:
- none:关闭confirm机制
- simple:同步阻塞等待MO的回执消息
- correlated:MO异步回调方式返回回执消息
但是!生产者确认需要额外的网络和系统资源开销,尽量不要使用。
如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题。
对于nack消息可以有限次数重试,依然失败则记录异常消息。
MQ可靠性
解决了生产者可靠性,还需要解决MQ的可靠性。
通过生产者发送的消息被MQ存放到内存中,经过某些特殊情况或者MQ重启后,这部分数据会丢失。并且内存空间是有限的,当消费者故障或者处理太慢时,会导致消息积压,导致MQ阻塞。
持久化
为了解决这个问题,MQ引入了数据持久化,包括交换机持久化、队列持久化、消息持久化。
前两者只需要在创建的时候设置成Durable(默认)即可。
消息持久化默认是不开启的,要手动开启。
当不开启磁盘持久化,消息会全部存放在内存中。但是发送消息过多,会占满内存。之后多出来的消息会存放到Paged out中,也就是磁盘中。等待内存中的消息被处理完后,会再把磁盘中的消息加载到内存中,再继续处理。
当开启了磁盘持久化,接收到的消息会在内存和磁盘中都存一份。此时处理的消息是从内存中处理。内存会在将要满的时候清理一次,再继续完成消息处理。磁盘则会把所有消息都保存下来。
Lazy Queue
Lazy Queue惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
并且惰性队列的性能很高,比之前的几种性能都会好一些。
消费者可靠性
消费者确认机制
当不开启消费者确认机制,生产者投递了一条消息,不管消费者是否处理完了,会马上被RabbitMQ删除,当做已经处理完了。但是如果消费者出现网络波动或者其他异常情况,会导致没有接收到这条消息,生产者这边还会认为消费者已经接收到消息了。告知RabbitMQ自己消息处理状态。处理消息结束后,应该向RabbitMQ发送一个回执,
回执有三种可选值:
- ack:成功处理消息,RabbitMO从队列中删除该消息
- nack:消息处理失败,RabbitMO需要再次投递消息
- reiect:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
-
none:不处理消息确认。消息投递给消费者后立刻ack,消息会立刻从MQ删除。这种方式非常不安全,不建议使用。
-
manual:手动模式。需要在业务代码中手动调用API发送ack或reject。虽然存在业务侵入,但提供了更大的灵活性。
-
auto:自动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强。当业务正常执行时,自动返回ack。当业务出现异常时,根据异常类型自动返回不同的结果:
1.如果是业务异常,自动返回nack。
2.如果是消息处理或校验异常,自动返回reject。
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 可以设置为 none, manual, auto
失败重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,形成无限循环。这会导致MQ的消息处理量飙升,给系统带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时进行本地重试,而不是无限制地requeue到MQ队列。并且指定最大重试次数。
spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态; false有状态。如果业务中包含事务,这里改为false
在开启重试模式后,如果重试次数耗尽且消息依然失败,则需要有MessageRecoverer
接口来处理。MessageRecoverer
包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
当通过最后一种方式重试耗尽后,我们可以额外设置一个队列,比如error.queue,当发送失败的消息进入这个队列后,再通过邮件强提醒这样的机制推送给工作人员,可以有效解决消息发送失败的极端情况。
业务幂等性
在程序开发中,业务幂等性指的是同一个业务,执行一次或者执行多次对业务的状态是没有影响的。
唯一消息id
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
业务判断
方案二,是结合业务逻辑,基于业务本身做判断。以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。
如何保证支付服务与交易服务之间的订单状态一致性?
为确保支付服务与交易服务之间的订单状态一致性,我们采取了以下措施:
消息通知:
- 支付服务在用户支付成功后,通过MQ(消息队列)发送消息通知交易服务,以完成订单状态的同步。
消息可靠性策略:
- 采用生产者确认机制、消费者确认和消费者失败重试等策略,确保消息的可靠投递和处理。
- 开启MQ的持久化功能,避免因服务宕机导致消息丢失。
业务幂等性:
- 在交易服务更新订单状态时进行业务幂等性判断,防止因消息重复消费导致订单状态异常。
如果交易服务消息处理失败,有什么兜底方案?
- 定时任务:
- 在交易服务中设置定时任务,定期查询订单的支付状态。
- 即使MQ通知失败,定时任务也可以作为兜底方案,确保订单支付状态的最终一致性。
延迟消息
延迟消息是消息队列中的一种重要功能,它允许消息在被发送到消息队列后并不会立即被消费者消费,而是在经过特定的时间延迟后才能被消费者获取和处理。这种特性在很多业务场景中都非常有用,比如订单处理超时、定时提醒等。
比如,用户A下单某商品的最后一件,订单确认后,迟迟不支付,但是又占用着这个名额。等到很久以后取消这个订单,此时想买的人没买到,商家没有卖掉,而这个人又没有买。这种情况用延迟消息就能很好的解决这个问题。当用户下单商品后,会设置一个延迟消息,假设30分钟内没有下单,这个延迟消息就会被发送到MQ,提醒数据库这个人订单超时了,强制让这个人取消订单。
死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
死信消息,经过死信交换机可以变成延迟消息。
当publisher发布一个消息后,通过交换机进入队列。通过手动设置一个过期时间,让消息变成死信消息,此时消息会自动进入通过dead-letter-exchang设置的交换机dlx.direct,再一步步的进入到consumer。
延迟消息插件
RabbitMO的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机
当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
@RabbitListener(bindings=@QueueBinding(
value=@Queue(name="delay.queue", durable="true"),
exchange=@Exchange(name="delay.direct",delayed="true"),
key="delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}",msg);
}
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct")
.delayed()//设置delay的属性为true
.durable(true)//持久化
.build();
}
发送消息时需要通过消息头x-delay来设置过期时间:
@Test
void testPublisherDelayMessage() {//1.创建消息String message = "hello, delayed message";//2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}
相关文章:

MQ高级:RabbitMQ小细节
在之前的学习中,我们只介绍了消息的发送,但是没有考虑到异常的情况,今天我们就介绍一些异常情况,和细节的部分。 目录 生产者可靠性 生产者重连 生产者确认 MQ可靠性 持久化 Lazy Queue 消费者可靠性 消费者确认机制 失…...

期权卖方怎么选择权利金高的品种,期货VIX高低对行情有什么影响
VIX指数——全称为芝加哥期权交易所市场波动率指数,俗称恐慌指数。 是衡量波动性的重要指标。VIX指数上升,预期未来市场波动性会增加。VIX指数下降,预期未来市场波动性会降低。 期货VIX指数最新价格排序 期权卖方尽量选择期货VIX指数在25以…...
内存对齐的原理和使用
1. 什么是内存对齐? 内存对齐是指将数据存储在内存中时,按照数据类型的大小,将数据放在特定的内存边界上。例如,4 字节的 int 通常放在能够被 4 整除的地址上,8 字节的 double 则放在能被 8 整除的地址上。 2. 为什么…...

搭建企业级私有仓库harbor
华子目录 harbor简介实验环境准备下载软件包安装docker-cehosts解析 实验步骤配置https加密传输解压进入解压目录,修改文件配置启动harbor 测试客户端配置harbor本地加速器注意 通过docker compose管理harbor harbor简介 harbor是由wmware公司开源的企业级docker r…...
互联网前后端分离的开发场景,一般会员和数据权限的判断是放在前端还是后端?
推荐学习文档 golang应用级os框架,欢迎stargolang应用级os框架使用案例,欢迎star案例:基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识,这里有免费的golang学习笔…...

李宏毅机器学习2022-HW8-Anomaly Detection
文章目录 TaskBaselineReportQuestion2 Code Link Task 异常检测Anomaly Detection 将data经过Encoder,在经过Decoder,根据输入和输出的差距来判断异常图像。training data是100000张人脸照片,testing data有大约10000张跟training data相同…...

用户体验分享 | YashanDB V23.2.3安装部署
近期崖山新版体验过程中,总能看到用户提问:openssl版本问题、monit命令找不到问题、yashan用户权限问题、数据库重装问题 今日整理了多位用户的安装经验,希望能够帮助到大家~ 1.Lucifer三思而后行 :YashanDB 个人版数据库安装部…...
【漏洞复现】泛微OA E-Office /E-mobile/App/init.php 任意文件上传漏洞
免责声明: 本文旨在提供有关特定漏洞的信息,以帮助用户了解潜在风险。发布此信息旨在促进网络安全意识和技术进步,并非出于恶意。读者应理解,利用本文提到的漏洞或进行相关测试可能违反法律或服务协议。未经授权访问系统、网络或应用程序可能导致法律责任或严重后果…...

SpringCloudEureka实战:搭建EurekaServer
1、依赖引入 <dependencies><!-- 注册中心 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency> </dependencies> <de…...

DataLight(V1.4.5) 版本更新,新增 Ranger、Solr
DataLight(V1.4.5) 版本更新,新增 Ranger、Solr DataLight 迎来了重大的版本更新,现已发布 V1.4.5 版本。本次更新对平台进行了较多的功能拓展和优化,新增了对 Ranger 和 Solr 服务组件的支持,同时对多项已…...

深度解析:Python蓝桥杯青少组精英赛道与高端题型概览
目录 一、蓝桥杯青少组简介二、赛项组别与年龄范围三、比赛内容与题型1. 基础知识范围2. 题型设置2.1 选择题2.2 编程题 3. 考试时长 四、奖项设置与激励措施五、总结 一、蓝桥杯青少组简介 蓝桥杯全国软件和信息技术专业人才大赛(简称“蓝桥杯”)是由工…...

如何使用SCCMSecrets识别SCCM策略中潜在的安全问题
关于SCCMSecrets SCCMSecrets是一款针对SCCM策略的安全扫描与检测工具,该工具旨在提供一种有关 SCCM 策略的全面安全检测方法。 该工具可以从各种权限级别执行,并将尝试发现与策略分发相关的潜在错误配置。除了分发点上托管的包脚本外,它还将…...
Qt 信号重载问题--使用lambda表达式--解决方法
在connect()中,使用lambda表达式时遇到信号重载,无法识别使用哪个参数时,可通过以下方法处理: 1. 使用QOverload: Qt5.7才有 connect(comboBox,QOverload<int>::of(&QComboBox::currentIndexChanged), [](int index)…...
并行编程实战——TBB框架的应用之一Supra的基础
一、TBB的应用 在前面分析了TBB框架的各种基本知识和相关的基础应用。这些基础的应用很容易通过学习文档或相关的代码来较为轻松的掌握。为了能够更好的理解TBB框架的优势,这里从一个开源的应用程序来分析一下TBB在其中的更高一层的抽象应用,以方便开发…...
std::vector
std::vector是C标准库中一个非常强大的容器类,它提供了动态数组的功能。std::vector可以自动调整大小,提供了随机访问的能力,同时还支持在序列的尾部高效地添加和删除元素。 当创建一个空的std::vector对象时,它不分配任何内存&a…...
Java Web 之 Cookie 详解
在 JavaWeb 开发中,Cookie 就像网站给浏览器贴的小纸条,用于记录一些用户信息或状态,方便下次访问时识别用户身份或进行个性化服务。 也可以这么理解: 场景一:想象一下,你去一家咖啡店,店员认…...
linux系统下让.py文件开机自启动
一 创建服务文件 1、打开终端 2、切换到root用户 sudo su3、创建一个新的systemd服务文件 nano /etc/systemd/system/total_test0619.service 4、在服务文件中添加以下内容 [Unit] DescriptionRun total_test0619.py at startup[Service] Typesimple ExecStart/usr/bin/n…...
linux远程桌面:xrdp 安装失败
window 如何远程 Linux 桌面 安装xrdp yum install xrdpsystemctl start xrdp 如果找不到软件包,就安装epel源,最好改成国内镜像的 在 /etc/yum.repos.d/ 下创建epel.repo,内容如下 [epel] nameExtra Packages for Enterprise Linux 7 - $basearch …...
9.30Python基础-元组(补充)、字典、集合
Python元组(tuple)补充 1、元组的不可变性 元组(tuple)是Python中的一种内置数据类型,用于存储不可变的序列。虽然元组本身不可变,但元组内的元素如果是可变对象(如列表)ÿ…...

桥接模式和NET模式的区别
桥接模式和NET模式的区别 NAT模式: NAT:网络地址转换(模式):借助宿主机来上网,没桥接那么麻烦,只用配置DNS即可。 缺点:扎根于宿主机,不能和局域网内其它真实的主机进行…...

UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
PostgreSQL——环境搭建
一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在࿰…...
Python 高效图像帧提取与视频编码:实战指南
Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...

Qt的学习(一)
1.什么是Qt Qt特指用来进行桌面应用开发(电脑上写的程序)涉及到的一套技术Qt无法开发网页前端,也不能开发移动应用。 客户端开发的重要任务:编写和用户交互的界面。一般来说和用户交互的界面,有两种典型风格&…...
CppCon 2015 学习:Simple, Extensible Pattern Matching in C++14
什么是 Pattern Matching(模式匹配) ❝ 模式匹配就是一种“描述式”的写法,不需要你手动判断、提取数据,而是直接描述你希望的数据结构是什么样子,系统自动判断并提取。❞ 你给的定义拆解: ✴ Instead of …...

使用ch340继电器完成随机断电测试
前言 如图所示是市面上常见的OTA压测继电器,通过ch340串口模块完成对继电器的分路控制,这里我编写了一个脚本方便对4路继电器的控制,可以设置开启时间,关闭时间,复位等功能 软件界面 在设备管理器查看串口号后&…...