SpringAMQP开启“可靠性”机制
前言
上一篇介绍了如何在 《SpringBoot 中集成和使用消息队列》,看过这一篇就基本上可以在SpringBoot中使用消息队列了,但是消息队列他归根结底是一个客户端服务器模式的中间件,面对复杂的网络环境和分布式使用环境,难免会出现各种问题。出现问题不可怕,重点在于如何预防和处理,本章就重点介绍一下如何预防和处理使用SpringAMQP时可能出现的问题。
一、消息堆积
1、什么是消息堆积?
消息堆积指的是消费者这边的处理能力低于生产者这边生产消息的能力,导致大量的消息积压在MQ的一种现象。消息堆积可能导致短时间内队列达到最大容量,导致使新消息无法进入队列;对于时间敏感的消息可能成为死信。
2、使用 work 模式同时开启prefetch
work模式:简单来说就是让多个消息队列绑定到一个队列,共同消费队列中的消息。
默认情况下,消息队列是通过轮询的方式将消息推送给消费者的,完全不考虑消费者的消费能力。举个例子:假设生产者生产了50条消息,消费者1的处理能力是1秒50条,消费者2的消费能力是1秒5条,实际这五十条消息会通过轮询各分配给两个消费者25条,如果消费者还没处理完就会阻塞等待,处理完之后再继续推送。
所以默认情况并没有考虑到消费者是否已经处理完消息,可能也会造成消息堆积。怎么解决呢?可以通过修改配置文件:将prefetch设置为1,即每次给消费者投递一条消息,处理完了再投递下一条,这样可以尽可能发挥每个消费者的最大处理能力。
spring:rabbitmq:listener:simple:prefetch: 1 #每次投递一条消息,消费完在投递下一条
3、对消息进行异步处理
再者就是可以在代码上进行优化,比如在消息处理的时候使用线程池进行异步消费,这样可以缩短每个消息的处理时间,降低消息堆积的可能性。
二、发送者可靠性
1、发送者重连机制
有时候可能因为网络波动,可能会出现客户端连接MQ失败的情况。这里可以通过重试机制来提高消息发送的成功率。
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后等初始待时间multiplier: 1 # 失败后下次等待时长倍数max-attempts: 3 # 最大重试次数
2、发送者确认机制
在生产者这边,是可以开启确认机制的,就是MQ他在接收到消息成功后会返回一个ack给生产者,接收失败就返回nack,生产者这边就可以根据返回的结果,如果失败了就可以进行重发。RabbitMQ这边提供了两种确认机制:
- Publisher Confirm:当生产者向消息队列发送消息时,如果有设备或网络故障导致消息丢失或其他错误,AMQP 协议会自动触发 Confirm 机制,将消息发送失败的信息返回给生产者。生产者可以根据返回的信息进行相应的处理,例如重发、记录日志等。
- Publisher Return:消息路由失败时触发,一般不开启,因为路由失败是自己业务的问题
spring:rabbitmq:publisher-confirm-type: CORRELATED # none: 关闭confirm机制 # simple: 同步阻塞等待MQ回执消息 # correlated: MQ异步回调方式返回回执消息publisher-returns: true
对于ReturnCallback整个项目中配置一次即可:
@Slf4j
@Configuration
public class MqCommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallback(路由失败时触发)rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("收到消息的 return callback,exchange:{}, key:{}, msg:{}, code:{}, text:{}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());}});}
}
ConfirmCallback 每次发送消息都需要编写
@Test
void testConfirmCallback() throws InterruptedException {// 1.创建CorrelationData,并指定消息IDCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 基本不会触发log.error("消息回调失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 这里执行回调if (result.isAck()) {// 消息发送成功log.debug("消息发送成功,收到 ack");} else {// 消息发送失败log.debug("消息发送失败,收到 nack");// 重传等业务逻辑...}}});rabbitTemplate.convertAndSend("forum.direct","red","hello",cd);
}
虽然上述确认机制可以基本保证生产者发送消息的可靠性,但是会增加系统额外的负担和资源开销,因为生产者确认也需要通过MQ来执行回调,如果需要使用,不需要开启publisher return(自己代码写的有问题),对于nack也可以有限次重试,失败多了直接记录异常即可。
三、MQ可靠性
对于MQ本身是提供了持久化的功能的,可以给保证MQ重启数据不丢失。并且在持久化情况下开启生产者确认时,RabbitMQ只有在消息持久化完成之后才会给生产者返回ACK回执。
四、消费者可靠性
1、消息者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理结束消息之后,向RabbitMQ发送一个回执,告知RabbitMQ自己消息的处理状态:
- ack:成功处理消息,RabbitMQ从队列中删除消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝消息,RabbitMQ从队列中删除该消息
spring:rabbitmq:listener:simple:acknowledge-mode: AUTO# none:不处理。消息投递给消费者后立即ack,消息立即从MQ中删除,不安全# manual:手动模式。需要自己在业务代码中调用api,发送ack或reject# auto:自动模式。SpringAMQP使用AOP对我们的消息处理逻辑进行了环绕增强,当业务正常执行时自动返回ack,异常时,如果是业务异常会返回nack,如果是消息处理或校验异常会返回reject
2、失败重试机制
当消费者处理消息出现异常后,MQ这边会再次将消息投递给消费者,如果无限失败就会无限重试,对于MQ和消费者来讲压力就比较大,可以利用SpringAMQP的retry进制,当消费者出现异常时限制重试次数:
spring:rabbitmq:listener:simple:acknowledge-mode: AUTOretry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时间为1smultiplier: 1 # 下次失败的等待时长的倍数max-attempts: 3 # 最大重试次数stateless: true # true 无状态,false 有状态
开启重试机制后,如果重试次数耗尽,消息依然失败,就需要被MessageRecoverer接口来处理:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,默认的方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定交换机(建议)
下面演示第三中策略的接口配置实现:
@Configuration
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.exchange");}@Beanpublic Queue errorQueue() {return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue,DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");}
}
这样即使重试次数耗尽,消息也不会丢失,而是投递到了 error.queue 的队列里面。
五、保证幂等性
1、什么是幂等性?
幂等性就是重复执行相同的操作,系统的状态不会发送变化。比如查询和删除这些操作本身就是幂等的,它们多次操作不会给系统造成状态不一致的影响。
上述机制可以保证消息“至少”被消费1次,但是由于网络的复杂性,可能生产者收不到ack,导致消息的重发,或者MQ这边没有收到消费者的ack,导致消息的重复投递,这些都可能造成消息的重复消费,所以这个时候就要考虑幂等性问题了。
2、使用唯一 ID
生产者这边在给RabbitMQ投递消息的时候,附带一个唯一消息的ID,RabbitMQ这边它是自带去重功能的,就是相同ID的消息它是只存储一份的.
消费者这里,他就可以消费完一条消息后,先将消息ID存起来,然后后面的消息根据ID进行判断是否是重复消息,如果重复直接丢弃就行了.
给消息设置ID的方法:
@Configuration
public class Config {@Beanpublic MessageConverter messageConverter() {Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();jjmc.setCreateMessageIds(true);return jjmc;}
}
3、针对业务进行判断
以支付扣减余额和修改订单状态为例:
- 首先支付服务会在余额扣减成功后利用MQ将消息通知给修改订单状态的服务.
- 修改订单状态之前,先查询订单状态,只有已支付的订单才做修改,这样就可以在业务上保证幂等.
六、实现延迟消息
1、借助死信对列
因为对于那些超时为处理的消息,MQ会投递到死信对列,我们就可以借助这个特性,先将消息投递到到一个普通的对列中,然后如果超时就直接投到了死信对列,然后就让消费者监听死信对列,就可以实现延迟消息了。(PS:对列通过dead-letter-exchange属性绑定的交换机就称为 死信交换机。)
发送延迟消息:
void testSendTTLMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setExpiration(5000).build(); // 5秒钟的延迟消息rabbitTemplate.convertAndSend("simple.direct1","testKey",message);
}
2、使用RabbitMQ官方提供的插件
在RabbitMQ中,官方是推出了一种原生支持延迟消息的插件的。原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后暂存一段时间,到期后投递到队列。下面讲解插件使用:
消费者声明
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue",durable = "true"),exchange = @Exchange(name = "delay.direct",delayed = "true",type = ExchangeTypes.DIRECT),key = "delay"))public void listenDelayMessage(String msg) {log.info("接收到delay.queue的延迟消息 {}"+msg);}
生产者发送
@Testvoid testSendDelayMessage() throws InterruptedException {rabbitTemplate.convertAndSend("delay.direct", "delay", "hello,delay!", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 延迟10秒message.getMessageProperties().setDelay(5000);return message;}});log.info("延迟消息发送成功!");}
如果感觉这一大串太麻烦,可以将 new MessagePostProcessor() 分离出来:
// 封装专门用来发送延迟消息的处理器
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}
@Testvoid testSendDelayMessage2() throws InterruptedException {rabbitTemplate.convertAndSend("delay.direct", "delay", "hello,delay!", new DelayMessageProcessor(5000));log.info("延迟消息发送成功!");Thread.sleep(1000);}
相关文章:

SpringAMQP开启“可靠性”机制
前言 上一篇介绍了如何在 《SpringBoot 中集成和使用消息队列》,看过这一篇就基本上可以在SpringBoot中使用消息队列了,但是消息队列他归根结底是一个客户端服务器模式的中间件,面对复杂的网络环境和分布式使用环境,难免会出现各…...

戴尔Dell R740服务器开机冒烟亮黄灯故障维修
今天分享的是一台过保修期的DELL PowerEdge R740服务器开机冒烟的维修案例。先上图: 接到用户报修后工程师立即响应,由于用户也是刚开工第一天服务器开机就出现了这种祥龙吐雾的祥兆,导致工厂业务流程无法正常使用,这台机器在东莞…...

【阅读笔记】空域保边降噪《Side Window Filtering》
1、保边滤波背景 保边滤波器的代表包括双边滤波、引导滤波,但是这类滤波器有一个问题,它们均将待处理的像素点放在了方形滤波窗口的中心。但如果待处理的像素位于图像纹理或者边缘,方形滤波核卷积的处理结果会导致这个边缘变模糊。 基于这个…...

vue3前端excel导出;组件表格,自定义表格导出;Vue3 + xlsx + xlsx-style
当画面有自定义的表格或者样式过于复杂的表格时,导出功能可以由前端实现 1. 使用的插件 : sheet.js-xlsx 文档地址:https://docs.sheetjs.com/ 中文地址:https://geekdaxue.co/read/SheetJS-docs-zh/README.md xlsx-style&#…...

npm install一直卡在 sill idealTree buildDeps
当npm install命令在安装过程中卡在sill idealTree buildDeps阶段时,可能的原因包括网络延迟、镜像源问题或缓存问题。以下是一些可能的解决方法: 检查镜像源: 打开命令提示符(cmd)窗口。 输入命令npm config get…...

spring boot rabbitmq常用配置
直接上代码 package com.example.demo;import org.aopalliance.aop.Advice; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframewo…...

MySQL学习记录——십삼 视图及用户、权限管理
文章目录 1、视图2、用户管理3、权限管理 1、视图 视图把查询出来的结果以表结构的形式存储起来,视图和基表有关系,两者的数据变化都会互相影响。 在查询时,假如要经常查询一条记录,select …,那么为了方便ÿ…...

PyCharm 自动添加文件头注释
PyCharm 自动添加文件头注释 1. File and Code Templates2. Python FileReferences 1. File and Code Templates File -> Settings -> Editor -> File and Code Templates -> Python Script Reformat according to style & Enable Live Templates Created by…...

用HTML Canvas和JavaScript创建美丽的花朵动画效果
目录 一、程序代码 二、代码原理 三、运行效果 一、程序代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>炫酷花朵</title><style>* {margin: 0;padding: 0;overflow: hidden;bac…...

java----js常用的api
java----js常用的api 时间函数获取当前时间: DateUtil.today()时间偏移字符换时间格式化 map.computeIfAbsent添加list 时间函数 获取当前时间: DateUtil.today() String todayDateUtil.today()String today “2024-02-01”; 时间偏移 往前退役30天 DateUtil.offsetDay(D…...

unity 使用VS Code 开发,VS Code配置注意事项
vscode 对应的插件(unity开发) 插件:.Net Install Tool,c#,c# Dev Kit,IntelliCode For C# Dev Kit,Unity,Unity Code Snippets 本人现在是用了这些插件 unity需要安装Visual Studio Editor 1、.Net Install Tool 设置 需要在设置里面配置…...

领域驱动设计(Domain Driven Design)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、场景和要求二、领域模型关键词1.领域2.子域3.通用语言4.限界上下文5.领域模型6.实体和值对象7.聚合根8.领域服务9.领域事件 总结 前言 Domain Driven Desi…...

CF778A String Game 题解
文章目录 CF778A String Game 题解题面翻译Input DataOutput DataInput Sample 1Output Sample 1题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 样例 #2样例输入 #2样例输出 #2 提示算法:二分代码: CF778A String Game 题解 link 题面翻译 …...

【工具插件类教学】Unity运行时监控变量,属性,事件等的值和调用Runtime Monitoring
目录 一、介绍 二、安装方式 三、入门 1.实例化和静态成员...

实际生产中的一次非典型的基于jmeter的接口自动化实践
实际工作中遇到过一次自动化巡检的需求,作为测试人员没法从源代码入手,加之数据库也不熟悉,故采取接口自动化的方式来实现巡检,算是一种歪门邪道吧,应该不是接口自动化的常规使用方式。jmeter在这里的作用实际上也只是…...

新能源汽车软件开发设计规范
新能源汽车 软件开发设计规范 版本: 1.0 编 制: 校 对: 审 核: 会 签: …...

Linux:grep进阶(11)
Linux:shell脚本:基础使用(4)《正则表达式-grep工具》_shell grep 全角字符串-CSDN博客https://blog.csdn.net/w14768855/article/details/132338954?ops_request_misc%257B%2522request%255Fid%2522%253A%252217083360171680022…...

【实战】二、Jest难点进阶(一) —— 前端要学的测试课 从Jest入门到TDD BDD双实战(五)
文章目录 一、Jest 前端自动化测试框架基础入门二、Jest难点进阶1.snapshot 快照测试 学习内容来源:Jest入门到TDD/BDD双实战_前端要学的测试课 相对原教程,我在学习开始时(2023.08)采用的是当前最新版本: 项版本babe…...

8.2 新特性 - 透明的读写分离
文章目录 前言1. 安装部署1.1 下载安装包1.2 MySQL Shell1.3 配置 MySQL 实例1.4 启动 ReplicaSet1.5 启动 8.2 Router 2. 测试路由总结 前言 MySQL 8.0 官方推出过一个高可用方案 ReplicaSet 主要由 Router、MySQL Shell、MySQL Server 三个组件组成。 MySQL Shell 负责管理…...

关于三维GIS开发成长路线的一些思考
三维GIS是将GIS三维化表达,从一个三维GIS开发门外汉的角度来看,三维GIS开发成长路线分几个层面: 第一层面 做三维开发,最基本的Cesium、ThreeJS、MapBox这些要能做到接口级熟悉,熟悉接口是用来干嘛的,接口…...

git操作---> 使用git push,和使用git push origin HEAD:[分支名]有什么区别呢?
git push origin HEAD:branch2: 这个命令显式地指定了你要推送的本地引用(HEAD),以及远程仓库的目标引用(origin/branch2)。 HEAD 是一个引用,指向你当前所在的本地分支的最新提交。 这个命令的意图是将当…...

基于Java的大学社团管理平台
功能介绍 平台采用B/S结构,后端采用主流的Springboot框架进行开发,前端采用主流的Vue.js进行开发。 整个平台包括前台和后台两个部分。 前台功能包括:首页、社团详情、申请加入、用户中心模块。后台功能包括:社团管理、分类管理…...

1.函数模板基础
1.1函数模板作用: 建立一个通用函数,其函数返回值类型和形参类型可以不具体指定,用一个虚拟的类型来代表,提高复用性 1.2语法: //第一种 template <typename T> 函数声明或定义//第二种 template <class T&…...

22-k8s中pod的调度-亲和性affinity
一、概述 在k8s当中,“亲和性”分为三种,节点亲和性、pod亲和性、pod反亲和性; 亲和性分类名称解释说明nodeAffinity节点亲和性通过【节点】标签匹配,用于控制pod调度到哪些node节点上,以及不能调度到哪些node节点上&…...

通俗易懂,Spring Bean生命周期管理的理解
目录 1、实例化阶段 2、初始化阶段 3、销毁阶段 总结 在Spring框架中,Bean是最基本的组件,它是Spring框架中的一个Java对象。 下面通过Bean来理解bean的生命周期: Bean(initMethod "customInit", destroyMethod "cust…...

找座位 - 华为OD统一考试(C卷)
OD统一考试(C卷) 分值: 100分 题解: Java / Python / C++ 题目描述 在一个大型体育场内举办了一场大型活动,由于疫情防控的需要,要求每位观众的必须间隔至少一个空位才允许落座。 现在给出一排观众座位分布图,座位中存在已落座的观众,请计算出,在不移动现有观众座位…...

npm run dev运行出现NODE_OPTIONS=--max_old_space_size=4096 vite --mode dev --host?
问题描述 PS E:\AWorkDataease\DataEase\core\core-frontend> npm run dev dataease0.0.0 dev NODE_OPTIONS–max_old_space_size4096 vite --mode dev --host 0.0.0.0 ‘NODE_OPTIONS’ 不是内部或外部命令,也不是可运行的程序 或批处理文件。 解决方案 遇到…...

钠离子电池技术
一、什么是钠离子电池 1、发展背景 在现有电池技术中,锂离子电池(LIB)具有无与伦比的能量密度和多功能性。自其首次商业化以来,便携式设备一直在推动其高速增长。近年,电动汽车和固定式储能应用开始兴起。由于锂离子…...

第三十六天| 435. 无重叠区间、763.划分字母区间、56. 合并区间
Leetcode 435. 无重叠区间 题目链接:435 无重叠区间 题干:给定一个区间的集合 intervals ,其中 intervals[i] [starti, endi] 。返回 需要移除区间的最小数量,使剩余区间互不重叠 。 思考:贪心法。和452 用最少数量的…...

React setState同步还是异步
React18 setState是同步还是异步?_react18 同步-CSDN博客 React18之前或者React18使用了ReactDOM.render,setState在React调度流程中是异步更新,在原生事件和setTimeout中是同步更新。React18使用ReactDOM.createRoot,那么默认都是…...