SpringAMQP开启“可靠性”机制
前言
上一篇介绍了如何在 《SpringBoot 中集成和使用消息队列》,看过这一篇就基本上可以在SpringBoot中使用消息队列了,但是消息队列他归根结底是一个客户端服务器模式的中间件,面对复杂的网络环境和分布式使用环境,难免会出现各种问题。出现问题不可怕,重点在于如何预防和处理,本章就重点介绍一下如何预防和处理使用SpringAMQP时可能出现的问题。
一、消息堆积
1、什么是消息堆积?
消息堆积指的是消费者这边的处理能力低于生产者这边生产消息的能力,导致大量的消息积压在MQ的一种现象。消息堆积可能导致短时间内队列达到最大容量,导致使新消息无法进入队列;对于时间敏感的消息可能成为死信。
2、使用 work 模式同时开启prefetch
work模式:简单来说就是让多个消息队列绑定到一个队列,共同消费队列中的消息。
默认情况下,消息队列是通过轮询的方式将消息推送给消费者的,完全不考虑消费者的消费能力。举个例子:假设生产者生产了50条消息,消费者1的处理能力是1秒50条,消费者2的消费能力是1秒5条,实际这五十条消息会通过轮询各分配给两个消费者25条,如果消费者还没处理完就会阻塞等待,处理完之后再继续推送。
所以默认情况并没有考虑到消费者是否已经处理完消息,可能也会造成消息堆积。怎么解决呢?可以通过修改配置文件:将prefetch设置为1,即每次给消费者投递一条消息,处理完了再投递下一条,这样可以尽可能发挥每个消费者的最大处理能力。
spring:rabbitmq:listener:simple:prefetch: 1 #每次投递一条消息,消费完在投递下一条
3、对消息进行异步处理
再者就是可以在代码上进行优化,比如在消息处理的时候使用线程池进行异步消费,这样可以缩短每个消息的处理时间,降低消息堆积的可能性。
二、发送者可靠性
1、发送者重连机制
有时候可能因为网络波动,可能会出现客户端连接MQ失败的情况。这里可以通过重试机制来提高消息发送的成功率。
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后等初始待时间multiplier: 1 # 失败后下次等待时长倍数max-attempts: 3 # 最大重试次数
2、发送者确认机制
在生产者这边,是可以开启确认机制的,就是MQ他在接收到消息成功后会返回一个ack给生产者,接收失败就返回nack,生产者这边就可以根据返回的结果,如果失败了就可以进行重发。RabbitMQ这边提供了两种确认机制:
- Publisher Confirm:当生产者向消息队列发送消息时,如果有设备或网络故障导致消息丢失或其他错误,AMQP 协议会自动触发 Confirm 机制,将消息发送失败的信息返回给生产者。生产者可以根据返回的信息进行相应的处理,例如重发、记录日志等。
- Publisher Return:消息路由失败时触发,一般不开启,因为路由失败是自己业务的问题
spring:rabbitmq:publisher-confirm-type: CORRELATED # none: 关闭confirm机制 # simple: 同步阻塞等待MQ回执消息 # correlated: MQ异步回调方式返回回执消息publisher-returns: true
对于ReturnCallback整个项目中配置一次即可:
@Slf4j
@Configuration
public class MqCommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallback(路由失败时触发)rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("收到消息的 return callback,exchange:{}, key:{}, msg:{}, code:{}, text:{}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());}});}
}
ConfirmCallback 每次发送消息都需要编写
@Test
void testConfirmCallback() throws InterruptedException {// 1.创建CorrelationData,并指定消息IDCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 基本不会触发log.error("消息回调失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 这里执行回调if (result.isAck()) {// 消息发送成功log.debug("消息发送成功,收到 ack");} else {// 消息发送失败log.debug("消息发送失败,收到 nack");// 重传等业务逻辑...}}});rabbitTemplate.convertAndSend("forum.direct","red","hello",cd);
}
虽然上述确认机制可以基本保证生产者发送消息的可靠性,但是会增加系统额外的负担和资源开销,因为生产者确认也需要通过MQ来执行回调,如果需要使用,不需要开启publisher return(自己代码写的有问题),对于nack也可以有限次重试,失败多了直接记录异常即可。
三、MQ可靠性
对于MQ本身是提供了持久化的功能的,可以给保证MQ重启数据不丢失。并且在持久化情况下开启生产者确认时,RabbitMQ只有在消息持久化完成之后才会给生产者返回ACK回执。
四、消费者可靠性
1、消息者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理结束消息之后,向RabbitMQ发送一个回执,告知RabbitMQ自己消息的处理状态:
- ack:成功处理消息,RabbitMQ从队列中删除消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝消息,RabbitMQ从队列中删除该消息
spring:rabbitmq:listener:simple:acknowledge-mode: AUTO# none:不处理。消息投递给消费者后立即ack,消息立即从MQ中删除,不安全# manual:手动模式。需要自己在业务代码中调用api,发送ack或reject# auto:自动模式。SpringAMQP使用AOP对我们的消息处理逻辑进行了环绕增强,当业务正常执行时自动返回ack,异常时,如果是业务异常会返回nack,如果是消息处理或校验异常会返回reject
2、失败重试机制
当消费者处理消息出现异常后,MQ这边会再次将消息投递给消费者,如果无限失败就会无限重试,对于MQ和消费者来讲压力就比较大,可以利用SpringAMQP的retry进制,当消费者出现异常时限制重试次数:
spring:rabbitmq:listener:simple:acknowledge-mode: AUTOretry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时间为1smultiplier: 1 # 下次失败的等待时长的倍数max-attempts: 3 # 最大重试次数stateless: true # true 无状态,false 有状态
开启重试机制后,如果重试次数耗尽,消息依然失败,就需要被MessageRecoverer接口来处理:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,默认的方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定交换机(建议)
下面演示第三中策略的接口配置实现:
@Configuration
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.exchange");}@Beanpublic Queue errorQueue() {return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue,DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");}
}
这样即使重试次数耗尽,消息也不会丢失,而是投递到了 error.queue 的队列里面。
五、保证幂等性
1、什么是幂等性?
幂等性就是重复执行相同的操作,系统的状态不会发送变化。比如查询和删除这些操作本身就是幂等的,它们多次操作不会给系统造成状态不一致的影响。
上述机制可以保证消息“至少”被消费1次,但是由于网络的复杂性,可能生产者收不到ack,导致消息的重发,或者MQ这边没有收到消费者的ack,导致消息的重复投递,这些都可能造成消息的重复消费,所以这个时候就要考虑幂等性问题了。
2、使用唯一 ID
生产者这边在给RabbitMQ投递消息的时候,附带一个唯一消息的ID,RabbitMQ这边它是自带去重功能的,就是相同ID的消息它是只存储一份的.
消费者这里,他就可以消费完一条消息后,先将消息ID存起来,然后后面的消息根据ID进行判断是否是重复消息,如果重复直接丢弃就行了.
给消息设置ID的方法:
@Configuration
public class Config {@Beanpublic MessageConverter messageConverter() {Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();jjmc.setCreateMessageIds(true);return jjmc;}
}
3、针对业务进行判断
以支付扣减余额和修改订单状态为例:
- 首先支付服务会在余额扣减成功后利用MQ将消息通知给修改订单状态的服务.
- 修改订单状态之前,先查询订单状态,只有已支付的订单才做修改,这样就可以在业务上保证幂等.
六、实现延迟消息
1、借助死信对列
因为对于那些超时为处理的消息,MQ会投递到死信对列,我们就可以借助这个特性,先将消息投递到到一个普通的对列中,然后如果超时就直接投到了死信对列,然后就让消费者监听死信对列,就可以实现延迟消息了。(PS:对列通过dead-letter-exchange属性绑定的交换机就称为 死信交换机。)
发送延迟消息:
void testSendTTLMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setExpiration(5000).build(); // 5秒钟的延迟消息rabbitTemplate.convertAndSend("simple.direct1","testKey",message);
}
2、使用RabbitMQ官方提供的插件
在RabbitMQ中,官方是推出了一种原生支持延迟消息的插件的。原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后暂存一段时间,到期后投递到队列。下面讲解插件使用:
消费者声明
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue",durable = "true"),exchange = @Exchange(name = "delay.direct",delayed = "true",type = ExchangeTypes.DIRECT),key = "delay"))public void listenDelayMessage(String msg) {log.info("接收到delay.queue的延迟消息 {}"+msg);}
生产者发送
@Testvoid testSendDelayMessage() throws InterruptedException {rabbitTemplate.convertAndSend("delay.direct", "delay", "hello,delay!", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 延迟10秒message.getMessageProperties().setDelay(5000);return message;}});log.info("延迟消息发送成功!");}
如果感觉这一大串太麻烦,可以将 new MessagePostProcessor() 分离出来:
// 封装专门用来发送延迟消息的处理器
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}
@Testvoid testSendDelayMessage2() throws InterruptedException {rabbitTemplate.convertAndSend("delay.direct", "delay", "hello,delay!", new DelayMessageProcessor(5000));log.info("延迟消息发送成功!");Thread.sleep(1000);}
相关文章:
SpringAMQP开启“可靠性”机制
前言 上一篇介绍了如何在 《SpringBoot 中集成和使用消息队列》,看过这一篇就基本上可以在SpringBoot中使用消息队列了,但是消息队列他归根结底是一个客户端服务器模式的中间件,面对复杂的网络环境和分布式使用环境,难免会出现各…...

戴尔Dell R740服务器开机冒烟亮黄灯故障维修
今天分享的是一台过保修期的DELL PowerEdge R740服务器开机冒烟的维修案例。先上图: 接到用户报修后工程师立即响应,由于用户也是刚开工第一天服务器开机就出现了这种祥龙吐雾的祥兆,导致工厂业务流程无法正常使用,这台机器在东莞…...

【阅读笔记】空域保边降噪《Side Window Filtering》
1、保边滤波背景 保边滤波器的代表包括双边滤波、引导滤波,但是这类滤波器有一个问题,它们均将待处理的像素点放在了方形滤波窗口的中心。但如果待处理的像素位于图像纹理或者边缘,方形滤波核卷积的处理结果会导致这个边缘变模糊。 基于这个…...

vue3前端excel导出;组件表格,自定义表格导出;Vue3 + xlsx + xlsx-style
当画面有自定义的表格或者样式过于复杂的表格时,导出功能可以由前端实现 1. 使用的插件 : sheet.js-xlsx 文档地址:https://docs.sheetjs.com/ 中文地址:https://geekdaxue.co/read/SheetJS-docs-zh/README.md xlsx-style&#…...
npm install一直卡在 sill idealTree buildDeps
当npm install命令在安装过程中卡在sill idealTree buildDeps阶段时,可能的原因包括网络延迟、镜像源问题或缓存问题。以下是一些可能的解决方法: 检查镜像源: 打开命令提示符(cmd)窗口。 输入命令npm config get…...
spring boot rabbitmq常用配置
直接上代码 package com.example.demo;import org.aopalliance.aop.Advice; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframewo…...
MySQL学习记录——십삼 视图及用户、权限管理
文章目录 1、视图2、用户管理3、权限管理 1、视图 视图把查询出来的结果以表结构的形式存储起来,视图和基表有关系,两者的数据变化都会互相影响。 在查询时,假如要经常查询一条记录,select …,那么为了方便ÿ…...

PyCharm 自动添加文件头注释
PyCharm 自动添加文件头注释 1. File and Code Templates2. Python FileReferences 1. File and Code Templates File -> Settings -> Editor -> File and Code Templates -> Python Script Reformat according to style & Enable Live Templates Created by…...

用HTML Canvas和JavaScript创建美丽的花朵动画效果
目录 一、程序代码 二、代码原理 三、运行效果 一、程序代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>炫酷花朵</title><style>* {margin: 0;padding: 0;overflow: hidden;bac…...
java----js常用的api
java----js常用的api 时间函数获取当前时间: DateUtil.today()时间偏移字符换时间格式化 map.computeIfAbsent添加list 时间函数 获取当前时间: DateUtil.today() String todayDateUtil.today()String today “2024-02-01”; 时间偏移 往前退役30天 DateUtil.offsetDay(D…...

unity 使用VS Code 开发,VS Code配置注意事项
vscode 对应的插件(unity开发) 插件:.Net Install Tool,c#,c# Dev Kit,IntelliCode For C# Dev Kit,Unity,Unity Code Snippets 本人现在是用了这些插件 unity需要安装Visual Studio Editor 1、.Net Install Tool 设置 需要在设置里面配置…...

领域驱动设计(Domain Driven Design)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、场景和要求二、领域模型关键词1.领域2.子域3.通用语言4.限界上下文5.领域模型6.实体和值对象7.聚合根8.领域服务9.领域事件 总结 前言 Domain Driven Desi…...

CF778A String Game 题解
文章目录 CF778A String Game 题解题面翻译Input DataOutput DataInput Sample 1Output Sample 1题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 样例 #2样例输入 #2样例输出 #2 提示算法:二分代码: CF778A String Game 题解 link 题面翻译 …...

【工具插件类教学】Unity运行时监控变量,属性,事件等的值和调用Runtime Monitoring
目录 一、介绍 二、安装方式 三、入门 1.实例化和静态成员...
实际生产中的一次非典型的基于jmeter的接口自动化实践
实际工作中遇到过一次自动化巡检的需求,作为测试人员没法从源代码入手,加之数据库也不熟悉,故采取接口自动化的方式来实现巡检,算是一种歪门邪道吧,应该不是接口自动化的常规使用方式。jmeter在这里的作用实际上也只是…...

新能源汽车软件开发设计规范
新能源汽车 软件开发设计规范 版本: 1.0 编 制: 校 对: 审 核: 会 签: …...

Linux:grep进阶(11)
Linux:shell脚本:基础使用(4)《正则表达式-grep工具》_shell grep 全角字符串-CSDN博客https://blog.csdn.net/w14768855/article/details/132338954?ops_request_misc%257B%2522request%255Fid%2522%253A%252217083360171680022…...

【实战】二、Jest难点进阶(一) —— 前端要学的测试课 从Jest入门到TDD BDD双实战(五)
文章目录 一、Jest 前端自动化测试框架基础入门二、Jest难点进阶1.snapshot 快照测试 学习内容来源:Jest入门到TDD/BDD双实战_前端要学的测试课 相对原教程,我在学习开始时(2023.08)采用的是当前最新版本: 项版本babe…...

8.2 新特性 - 透明的读写分离
文章目录 前言1. 安装部署1.1 下载安装包1.2 MySQL Shell1.3 配置 MySQL 实例1.4 启动 ReplicaSet1.5 启动 8.2 Router 2. 测试路由总结 前言 MySQL 8.0 官方推出过一个高可用方案 ReplicaSet 主要由 Router、MySQL Shell、MySQL Server 三个组件组成。 MySQL Shell 负责管理…...

关于三维GIS开发成长路线的一些思考
三维GIS是将GIS三维化表达,从一个三维GIS开发门外汉的角度来看,三维GIS开发成长路线分几个层面: 第一层面 做三维开发,最基本的Cesium、ThreeJS、MapBox这些要能做到接口级熟悉,熟悉接口是用来干嘛的,接口…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

C# 表达式和运算符(求值顺序)
求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如,已知表达式3*52,依照子表达式的求值顺序,有两种可能的结果,如图9-3所示。 如果乘法先执行,结果是17。如果5…...
LCTF液晶可调谐滤波器在多光谱相机捕捉无人机目标检测中的作用
中达瑞和自2005年成立以来,一直在光谱成像领域深度钻研和发展,始终致力于研发高性能、高可靠性的光谱成像相机,为科研院校提供更优的产品和服务。在《低空背景下无人机目标的光谱特征研究及目标检测应用》这篇论文中提到中达瑞和 LCTF 作为多…...
2025年低延迟业务DDoS防护全攻略:高可用架构与实战方案
一、延迟敏感行业面临的DDoS攻击新挑战 2025年,金融交易、实时竞技游戏、工业物联网等低延迟业务成为DDoS攻击的首要目标。攻击呈现三大特征: AI驱动的自适应攻击:攻击流量模拟真实用户行为,差异率低至0.5%,传统规则引…...
用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法
用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法 大家好,我是Echo_Wish。最近刷短视频、看直播,有没有发现,越来越多的应用都开始“懂你”了——它们能感知你的情绪,推荐更合适的内容,甚至帮客服识别用户情绪,提升服务体验。这背后,神经网络在悄悄发力,撑起…...

HTTPS证书一年多少钱?
HTTPS证书作为保障网站数据传输安全的重要工具,成为众多网站运营者的必备选择。然而,面对市场上种类繁多的HTTPS证书,其一年费用究竟是多少,又受哪些因素影响呢? 首先,HTTPS证书通常在PinTrust这样的专业平…...

安全领域新突破:可视化让隐患无处遁形
在安全领域,隐患就像暗处的 “幽灵”,随时可能引发严重事故。传统安全排查手段,常常难以将它们一网打尽。你是否好奇,究竟是什么神奇力量,能让这些潜藏的隐患无所遁形?没错,就是可视化技术。它如…...