RabbitMQ-保证消息可靠性
RabbitMQ-保证消息可靠性
- 1.消息可靠性
- 1.1.生产者消息确认
- 1.1.1.修改配置
- 1.1.2.定义Return回调
- 1.1.3.定义ConfirmCallback
- 1.2.消息持久化
- 1.2.1.交换机持久化
- 1.2.2.队列持久化
- 1.2.3.消息持久化
- 1.3.消费者消息确认
- 1.3.1.演示none模式
- 1.3.2.演示auto模式
- 1.4.消费失败重试机制
- 1.4.1.本地重试
- 1.4.2.失败策略
- 1.5.总结
1.消息可靠性
消息从发送,到消费者接收,会经理多个过程:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失:
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
针对这些问题,RabbitMQ分别给出了解决方案:
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
下面操作通过案例来演示每一个步骤。
项目结构如下:
1.1.生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:
1.1.1.修改配置
首先,在publisher服务中的application.yml文件,添加下面的内容:
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true
说明:
publish-confirm-type
:开启publisher-confirm,这里支持两种类型:simple
:同步等待confirm结果,直到超时correlated
:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
1.1.2.定义Return回调
每个RabbitTemplate只能配置一个R
eturnCallback,因此需要在项目加载时配置:
修改publisher服务,添加一个:
package cn.zqd.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}
1.1.3.定义ConfirmCallback
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.zqd.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);
}
1.2.消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
- 交换机持久化
- 队列持久化
- 消息持久化
1.2.1.交换机持久化
RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}
事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。
可以在RabbitMQ控制台看到持久化的交换机都会带上D
的标示:
1.2.2.队列持久化
RabbitMQ中队列默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}
事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D
的标示:
1.2.3.消息持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java代码指定:
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
1.3.消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
而SpringAMQP则允许配置三种确认模式:
•manual:手动ack,需要在业务代码结束后,调用api发送ack。
•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
- none模式下,消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
1.3.1.演示none模式
修改consumer服务的application.yml文件,添加下面内容:
spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {log.info("消费者接收到simple.queue的消息:【{}】", msg);// 模拟异常System.out.println(1 / 0);log.debug("消息处理完成!");
}
测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。
1.3.2.演示auto模式
再次把确认机制修改为auto:
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:
1.4.消费失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
怎么办呢?
1.4.1.本地重试
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
- 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
- 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,消息会被丢弃
1.4.2.失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
-
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
-
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代码:
package cn.zqd.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
1.5.总结
如何确保RabbitMQ消息的可靠性?(面试题)
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
相关文章:

RabbitMQ-保证消息可靠性
RabbitMQ-保证消息可靠性 1.消息可靠性1.1.生产者消息确认1.1.1.修改配置1.1.2.定义Return回调1.1.3.定义ConfirmCallback 1.2.消息持久化1.2.1.交换机持久化1.2.2.队列持久化1.2.3.消息持久化 1.3.消费者消息确认1.3.1.演示none模式1.3.2.演示auto模式 1.4.消费失败重试机制1.…...

Python教程——Python本地环境安装
文章目录 简介安装Python下载安装验证安装结果 手动添加环境变量安装问题 简介 python官网:https://www.python.org/ Python Windows下载地址:https://www.python.org/downloads/windows/ Python 官方文档:https://www.python.org/doc/ Pytho…...

“智慧交通”转型升级+创新发展策略
随着“互联网交通”的应用创新推陈出新,传统轨道交通行业客户服务中心已难以满足乘客对便捷高效的客户服务需求;节假日人流量激增,客户服务人手不足,交通、站点堵塞、信息更新不及时等问题是常态。因此,“智慧城市”交…...
华为OD机试 - 开放日活动、取出尽量少的球(Python)
题目描述 某部门开展Family Day开放日活动,其中有个从桶里取球的游戏,游戏规则如下: 有N个容量一样的小桶等距排开, 且每个小桶都默认装了数量不等的小球, 每个小桶装的小球数量记录在数组 bucketBallNums 中, 游戏开始时,要求所有桶的小球总数不能超过SUM, 如果…...

一些关于单链表的操作
思维导图: 一, 链表 1.1节点的结构 链表是啥样的啊?顾名思义链表就是一种用链子链接起来的表。那这种表是怎么样的啊? 这样的呗: 现在,我们知道了链表的形状了。那我们该如何用编程语言来形成这一种形状…...

CTF-PHP反序列化漏洞2-利用魔法函数
作者:Eason_LYC 悲观者预言失败,十言九中。 乐观者创造奇迹,一次即可。 一个人的价值,在于他所拥有的。可以不学无术,但不能一无所有! 技术领域:WEB安全、网络攻防 关注WEB安全、网络攻防。我的…...

Doris(23):Doris的函数—字符串函数
1 append_trailing_char_if_absent(VARCHAR str, VARCHAR trailing_char) 如果s字符串非空并且末尾不包含c字符,则将c字符附加到末尾。 trailing_char只包含一个字符,如果包含多个字符,将返回NULL select append_trailing_char_if_absent(a,c);select append_trailing_cha…...

01-Shiro550漏洞流程
1. 漏洞原理 Apache Shiro框架提供了记住密码的功能(RememberMe),用户登录成功后会生成经过加密并编码的cookie。在服务端对rememberMe的cookie值,先base64解码然后AES解密再反序列化,就导致了反序列化RCE漏洞。 那么…...

《程序员面试金典(第6版)》面试题 16.08. 整数的英语表示
题目描述 给定一个整数,打印该整数的英文描述。 示例 1: 输入: 123输出: “One Hundred Twenty Three” 示例 2: 输入: 12345输出: “Twelve Thousand Three Hundred Forty Five” 示例 3: 输入: 1234567输出: “One Million Two Hundred Thirty Four Thousand…...
ChatGPT技术原理 第四章:Transformer模型
目录 4.1 什么是Transformer 4.2 Transformer结构详解 4.3 Self-Attention机制 4.4 Multi-Head Attention机制 4.1 什么是Transformer...

基于redis和threadlocal实现登录状态校验和拦截
1.流程图 单机节点下的登录状态校验 分布式节点下的登录状态校验 2.代码实现 实现步骤分为如下几步 实现WebMvcConfigurer接口,添加拦截器定义拦截器,需要配置两个interceptor,第一个用于刷新token,写threadlocalÿ…...

14-6-进程间通信-信号量
前面学习了pipe,fifo,共享内存,信号。 本章将讲述信号量。 一、什么是信号量/信号量集? 1.什么是信号量 信号量是一个计数器。信号量用于实现进程间的同步和互斥。而可以取多个正整数的信号量被称为通用信号量。 对信号量的使用场景的解读 房间&#…...
《中国教育报》投稿邮箱编辑部征稿
《中国教育报》国家教育部主管,中国教育报刊社主办的以教育新闻为主的全国性日报。是迄今为止中国最具权威和最有影响力的教育新闻媒体。中国教育报刊社是中华人民共和国教育部直属的新闻出版机构。2018年获得第三届全国“百强报纸”。2019年入选“新媒体影响力指数…...

Photoshop如何使用绘画和图像修饰之实例演示?
文章目录 0.引言1.给图像添加渐变色效果2.快速创建一副素描画3.清除图像中多余的景物4.快速融合两张图像5.调整图像光影6.人像面部瑕疵修除7.美化眼睛 0.引言 因科研等多场景需要进行绘图处理,笔者对PS进行了学习,本文通过《Photoshop2021入门教程》及其…...

【C++】布隆过滤器
文章目录 布隆过滤器提出布隆过滤器概念布隆过滤器应用场景设计思路:布隆过滤器的插入布隆过滤器的查找布隆过滤器删除BloomFilter.h布隆过滤器优点布隆过滤器缺陷 布隆过滤器提出 我们在使用新闻客户端看新闻时,它会给我们不停地推荐新的内容,它每次推荐时要去重,去掉那些已经…...

功能齐全的 ESP32 智能手表,具有多个表盘、心率传感器硬件设计
相关设计资料下载ESP32 智能手表带心率、指南针设计资料(包含Arduino源码+原理图+Gerber+3D文件).zip 介绍 我们调查了智能手表项目的不同方面,并学会了集成和测试每个单独的部分。在本文中,我们将使用所学知识,结合使用硬件和软件组件,从头开始创建我们自己的智能手表。在…...

微服务不是本地部署的最佳选择,不妨试试模块化单体
微服务仅适用于成熟产品 关于从头开始使用微服务,马丁・福勒(Martin Fowler)总结道: 1. 几乎所有成功的微服务都是从一个过于庞大而不得不拆分的单体应用开始的。 2. 几乎所有从头开始以微服务构建的系统,最后都会因…...

解读Toolformer
【引子】读论文Toolformer: Language Models Can Teach Themselves to Use Tools,https://arxiv.org/pdf/2302.04761.pdf,再阅读了几篇关于Toolformer的网络热文,于是“无知者无畏”,开始自不量力地试图解读Toolformer。 大语言模…...

FCOS3D Fully Convolutional One-Stage Monocular 3D Object Detection 论文学习
论文地址:Fully Convolutional One-Stage Monocular 3D Object Detection Github地址:Fully Convolutional One-Stage Monocular 3D Object Detection 1. 解决了什么问题? 单目 3D 目标检测由于成本很低,对于自动驾驶任务非常重…...
Xpath学习笔记
Xpath原理:先将HTML文档转为XML文档,再用xpath查找HTML节点或元素 什么是xml? 1、xml指可扩展标记语言 2、xml是一种标记原因,类似于html 3、xml的设计宗旨是传输数据,而非显示数据 4、xml标签需要我们自己自定义 5、x…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...

听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...

Yolov8 目标检测蒸馏学习记录
yolov8系列模型蒸馏基本流程,代码下载:这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中,**知识蒸馏(Knowledge Distillation)**被广泛应用,作为提升模型…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机
这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机,因为在使用过程中发现 Airsim 对外部监控相机的描述模糊,而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置,最后在源码示例中找到了,所以感…...
C++.OpenGL (20/64)混合(Blending)
混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...