springboot RabbitMQ客户端连接故障恢复
最近做RabbitMQ故障演练发现RabbitMQ服务器停止后,基于springboot的消费端不可以自动的恢复,队列的消费者消失,消息一直积压到队列中,这种情况肯定是不可接收的;通过研究源代码找到了解决方案。
一、添加自动恢复配置automaticRecovery
CachingConnectionFactory factory = new CachingConnectionFactory(connectionFactory);
cachingConnectionFactoryConfigurer.configure(factory);//设置TCP连接超时时间,默认:60000ms
factory.getRabbitConnectionFactory().setConnectionTimeout(properties.getConnectionTimeout());
//启用或禁用连接自动恢复,默认:false
factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(properties.isAutomaticRecovery());
//设置连接恢复时间间隔,默认:5000ms
factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(properties.getNetworkRecoveryInterval());
//启用或禁用拓扑恢复,默认:true【拓扑恢复功能可以帮助消费者重新声明之前定义的队列、交换机和绑定等拓扑结构】
factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(properties.isTopologyRecovery());
//替换默认异常处理DefaultExceptionHandler
factory.getRabbitConnectionFactory().setExceptionHandler(new DefaultMqExceptionHandler());
//添加连接监听器
factory.addConnectionListener(new DefaultMqConnectionListener(factory));
通过上述配置如果RabbitMQ服务器发生故障,则会自动重启恢复连接及队列的消费者,如果恢复失败则会间隔5000ms再次重试;在这里提一个问题,如果服务重试一直失败,重试的上限是多少?带着这个问题我们分析下源码。
二、RabbitMQ客户端实现连接的自动恢复功能
AutorecoveringConnection#beginAutomaticRecovery是在 RabbitMQ 客户端库层面实现的连接的自动恢复功能。当 RabbitMQ 连接出现故障时,它会尝试重新建立连接,以确保消息传递的可靠性。
private synchronized void beginAutomaticRecovery() throws InterruptedException {//获取故障恢复连接的间隔时间,实际是设置的:networkRecoveryIntervalfinal long delay = this.params.getRecoveryDelayHandler().getDelay(0);if (delay > 0) {//等待指定的间隔时间this.wait(delay);}//调用恢复通知监听器this.notifyRecoveryListenersStarted();//获取恢复建立的连接对象final RecoveryAwareAMQConnection newConn = this.recoverConnection();//如果为null则直接返回if (newConn == null) {return;}//连接已经恢复建立,恢复监听器、channel等资源LOGGER.debug("Connection {} has recovered", newConn);this.addAutomaticRecoveryListener(newConn);this.recoverShutdownListeners(newConn);this.recoverBlockedListeners(newConn);this.recoverChannels(newConn);// don't assign new delegate connection until channel recovery is completethis.delegate = newConn;//判断是否恢复拓扑结构,如果开启则开启拓扑结构恢复if (this.params.isTopologyRecoveryEnabled()) {notifyTopologyRecoveryListenersStarted();recoverTopology(params.getTopologyRecoveryExecutor());}this.notifyRecoveryListenersComplete();}
addAutomaticRecoveryListener自动恢复监听器
private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newConn) {final AutorecoveringConnection c = this;// this listener will run after shutdown listeners,// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135RecoveryCanBeginListener starter = cause -> {try {if (shouldTriggerConnectionRecovery(cause)) {//开始自动回复c.beginAutomaticRecovery();}} catch (Exception e) {newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);}};synchronized (this) {newConn.addRecoveryCanBeginListener(starter);}
}
init初始化
public void init() throws IOException, TimeoutException {//建立连接,否则抛出异常this.delegate = this.cf.newConnection();//自动回复监听器this.addAutomaticRecoveryListener(delegate);
}
三、消费端实现消息的消费和处理
SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run是应用程序层面实现消息的消费和处理,它负责从RabbitMQ中接收消息并进行相应的逻辑处理:
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count...try {//消费端初始化方法initialize();//当消费端是活跃状态,或者队列非空,或者消费端未被关闭则进入主循环while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}}catch (InterruptedException e) {...}
}
消费端initialize初始化方法:
private void initialize() throws Throwable { // NOSONARtry {redeclareElementsIfNecessary();//启动消费端初始化this.consumer.start();this.start.countDown();}catch (QueuesNotAvailableException e) {if (isMissingQueuesFatal()) {throw e;}else {this.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw e;}}catch (FatalListenerStartupException ex) {if (isPossibleAuthenticationFailureFatal()) {throw ex;}else {Throwable possibleAuthException = ex.getCause().getCause();if (!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {throw ex;}else {this.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw possibleAuthException;}}}catch (Throwable t) { //NOSONARthis.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw t;}if (getTransactionManager() != null) {/** Register the consumer's channel so it will be used by the transaction manager* if it's an instance of RabbitTransactionManager.*/ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());}}
消费端异常等待处理处理:
protected void handleStartupFailure(BackOffExecution backOffExecution) {//获取等待时间间隔,参考FixedBackOff类实现long recoveryInterval = backOffExecution.nextBackOff();if (BackOffExecution.STOP == recoveryInterval) {synchronized (this) {if (isActive()) {logger.warn("stopping container - restart recovery attempts exhausted");stop();}}return;}try {if (logger.isDebugEnabled() && isActive()) {logger.debug("Recovering consumer in " + recoveryInterval + " ms.");}//当前时间加上等待时间long timeout = System.currentTimeMillis() + recoveryInterval;//如果当前时间小于等待时间,则休眠200毫秒,再次尝试while (isActive() && System.currentTimeMillis() < timeout) {Thread.sleep(RECOVERY_LOOP_WAIT_TIME);}}catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException("Irrecoverable interruption on consumer restart", e);}}
FixedBackOff回退等待时间类实现:
public class FixedBackOff implements BackOff {// 默认恢复重试间隔public static final long DEFAULT_INTERVAL = 5000L;//最大重试次数,可以认为无限大public static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE;// 默认恢复重试间隔private long interval = 5000L;//最大重试次数,可以认为无限大private long maxAttempts = Long.MAX_VALUE;public FixedBackOff() {}public FixedBackOff(long interval, long maxAttempts) {this.interval = interval;this.maxAttempts = maxAttempts;}public void setInterval(long interval) {this.interval = interval;}public long getInterval() {return this.interval;}public void setMaxAttempts(long maxAttempts) {this.maxAttempts = maxAttempts;}public long getMaxAttempts() {return this.maxAttempts;}public BackOffExecution start() {return new FixedBackOffExecution();}private class FixedBackOffExecution implements BackOffExecution {private long currentAttempts;private FixedBackOffExecution() {this.currentAttempts = 0L;}//获取下一次尝试的时间间隔,可以认为一直都是5000mspublic long nextBackOff() {++this.currentAttempts;return this.currentAttempts <= FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;}public String toString() {String attemptValue = FixedBackOff.this.maxAttempts == Long.MAX_VALUE ? "unlimited" : String.valueOf(FixedBackOff.this.maxAttempts);return "FixedBackOff{interval=" + FixedBackOff.this.interval + ", currentAttempts=" + this.currentAttempts + ", maxAttempts=" + attemptValue + '}';}}
}
总结:综上源码分析可知消费端故障恢复重试等待时间是5000ms,重试次数可以认为是无限制(Long最大值)
mainloop主循环逻辑:
private void mainLoop() throws Exception { // NOSONAR Exceptiontry {if (SimpleMessageListenerContainer.this.stopNow.get()) {this.consumer.forceCloseAndClearQueue();return;}//接收客户端发送过来的消息,至少获取一条boolean receivedOk = receiveAndExecute(this.consumer); // At least one message receivedif (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {checkAdjust(receivedOk);}long idleEventInterval = getIdleEventInterval();if (idleEventInterval > 0) {if (receivedOk) {updateLastReceive();}else {long now = System.currentTimeMillis();long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();long lastReceive = getLastReceive();if (now > lastReceive + idleEventInterval&& now > lastAlertAt + idleEventInterval&& SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {publishIdleContainerEvent(now - lastReceive);}}}}catch (ListenerExecutionFailedException ex) {// Continue to process, otherwise re-throwif (ex.getCause() instanceof NoSuchMethodException) {throw new FatalListenerExecutionException("Invalid listener", ex);}}catch (AmqpRejectAndDontRequeueException rejectEx) {/** These will normally be wrapped by an LEFE if thrown by the* listener, but we will also honor it if thrown by an* error handler.*/}}
receiveAndExecute接收和处理消息:
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONARPlatformTransactionManager transactionManager = getTransactionManager();if (transactionManager != null) {try {if (this.transactionTemplate == null) {this.transactionTemplate =new TransactionTemplate(transactionManager, getTransactionAttribute());}return this.transactionTemplate.execute(status -> { // NOSONAR null never returnedRabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(consumer.getChannel(), false),getConnectionFactory(), true);// unbound in ResourceHolderSynchronization.beforeCompletion()try {//接收处理消息return doReceiveAndExecute(consumer);}catch (RuntimeException e1) {prepareHolderForRollback(resourceHolder, e1);throw e1;}catch (Exception e2) {throw new WrappedTransactionException(e2);}});}catch (WrappedTransactionException e) { // NOSONAR exception flow controlthrow (Exception) e.getCause();}}//接收处理消息return doReceiveAndExecute(consumer);}
调用具体的消息监听器消费消息:
private void doExecuteListener(Channel channel, Object data) {if (data instanceof Message) {Message message = (Message) data;if (this.afterReceivePostProcessors != null) {for (MessagePostProcessor processor : this.afterReceivePostProcessors) {message = processor.postProcessMessage(message);if (message == null) {throw new ImmediateAcknowledgeAmqpException("Message Post Processor returned 'null', discarding message");}}}if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) {this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment));}else {invokeListener(channel, message);}}else {invokeListener(channel, data);}}
GitHub代码:https://github.com/mingyang66/spring-parent
相关文章:

springboot RabbitMQ客户端连接故障恢复
最近做RabbitMQ故障演练发现RabbitMQ服务器停止后,基于springboot的消费端不可以自动的恢复,队列的消费者消失,消息一直积压到队列中,这种情况肯定是不可接收的;通过研究源代码找到了解决方案。 一、添加自动恢复配置a…...

centos下配置SFTP且限制用户访问目录
一、SFTP使用场景 ftp是大多数网站的文件传输选择工具,但ftp并不是非常安全,并且在centos上搭建的vsftpd也非常的不稳定,偶尔会出现权限问题,例如500、或是账号密码不正确等等。 而SFTP是基于默认的22端口,是ssh内含…...

A - 最短路径
给出一张包含 n 个节点、 m 条边的无向图,请你求出图上两点 s,t 间的最短路径长度。 (请大家自行处理重边和自环) Input 第一行两个数 n,m ,分别表示节点数和边数,以空格隔开,其中1≤n≤500,1≤m≤50000 ; 之后 m 行…...

自然语言处理(三):基于跳元模型的word2vec实现
跳元模型 回顾一下第一节讲过的跳元模型 跳元模型(Skip-gram Model)是一种用于学习词向量的模型,属于Word2Vec算法中的一种。它的目标是通过给定一个中心词语来预测其周围的上下文词语。 这节我们以跳元模型为例,讲解word2vec的…...

1.1 数据库系统简介
思维导图: 1.1.数据库系统简介 前言: 数据库系统是一个软件系统,用于管理和操作数据库。它提供了一个组织良好、高效并能够方便存取的数据存储机制,并且能够支持各种数据操作、事务管理、并发控制和恢复功能。以下是数据库系统的…...

WebGL 绘制圆点
前言 这篇文章不说WebGL相关概念了,初学者先到网上看看WebGL相关概念。这里着重写一下在vue3前端框架下,如何通过webGL绘制圆点。 着色器代码(画点) 画点相关的着色器代码有顶点着色器和片元着色器,代码如下: 顶点着色器&…...

迅为RK3588开发板Android12 设置系统默认不锁屏
修改 frameworks/base/packages/SettingsProvider/res/values/defaults.xml 文件,修改为如下 所示: - <bool name"def_lockscreen_disabled">false</bool> <bool name"def_lockscreen_disabled">true</bool&…...

香港服务器速度快的原因
1. 传统域名解析过程 了解CDN系统先从域名解析说起。通常,我们在浏览器中输入域名,敲回车后,进入网站进行信息的获取。您分析过输入域名后浏览是如何请求到服务器上的信息,您了解域名解析的过程么? 1.1. 主机解析域…...

过滤器,监听器与拦截器的区别
过滤器,监听器与拦截器的区别 过滤器和监听器不是Spring MVC中的组件,而是Servlet的组件,由Servlet容器来管理。拦截器是Spring MVC中的组件,由Spring容器来管理 Servlet过滤器与Spring MVC 拦截器在Web应用中所处的层次如…...

clickhouse ssb-dbgen数据构造 及 clickhouse-benchmark简单压测
一、 测试数据构造 1. 数据样例 官方文档有给出一批数据样例。优点是比较真实,缺点是太大了,动辄上百G不适合简单小测试 Anonymized Yandex.Metrica DatasetStar Schema BenchmarkWikiStatTerabyte of Click Logs from CriteoAMPLab Big Data Benchma…...

【数据分析】统计量
1. 均值、众数描述数据的集中趋势度量,四分位差、极差描述数据的离散程度。 2. 标准差、四分位差、异众比率度量离散程度,协方差是度量相关性。 期望值分别为E[X]与E[Y]的两个实随机变量X与Y之间的协方差Cov(X,Y)定义为: 从直观上来看&…...

【通用消息通知服务】0x4 - 目前进展 阶段复盘
【通用消息通知服务】0x4 - 阶段复盘 达成 基本的API已经写完✍️了(消息查看发送, 模板crud,终端crud,发送渠道crud,计划crud,计划执行查看)拆分server, executor, planner三个入口, 方便针对性水平扩展整体架构初步形成,通过队列实现了事件驱动模型和消息订阅发…...

vue若依导出word文件,简单的实现
首先前端导包,注意exportDocx的导包位置要修改成你自己的 import {exportDocx} from /utils/docUtil/docutil.js; import {addDays} from date-fns; import {listGongyi} from "/api/system/detail";然后新建一个测试按钮 <el-col :span"1.5"><…...

【LeetCode75】第四十题 最大层内元素和
目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 这道题和LeetCode75的上一题大同小异,都是要我们对二叉树进行层序遍历。 那具体如何层序遍历我再上一题也详细介绍过了&#…...

脱离束缚:数字化工厂中ARM控制器的革命性应用!
近年来,中国数字经济体系已进入高速增长阶段。制造业作为中国经济高质量发展的重要支撑力量,在面临生产成本不断上涨、关键装备和核心零部件“受制于人”等挑战时,建设数字化工厂已成必然。 数字化工厂数据采集出现的问题 在数字工厂的建设…...

queue ide is not exists in YARN
报错内容: 2023-08-17 17:30:31.342 [ERROR] [BaseTaskScheduler-Thread-7 ] o.a.l.o.s.a.AsyncExecTaskRunnerImpl (79) [run] - Failed to execute task astJob_1_codeExec_1 org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException: errCode:…...

【C++】UDP通信:客户端向服务端发送消息并接收服务端回应的消息
目录 1 UDP简介 2 通信 3 实践 4 运行结果 1 UDP简介 (1)UDP通信是无连接的,因此不需要connect操作。 (2)UDP通信过程需要指定数据接收端的IP和端口。 (3)UDP不对收到的数据进行排序。 (4)UDP对接收到的数据报不回复确认信息。 (5)如果发生了数据丢失,不会丢一…...

RabbitMq深度学习
什么是RabbitMq? RabbitMQ是一个开源的消息队列中间件,它实现了高级消息队列协议(AMQP)。它被广泛用于分布式系统中的消息传递和异步通信。RabbitMQ提供了一种可靠的、可扩展的机制来传递消息,使不同的应用程序能够相互之间进行…...

EasyExcel自定义字段对象转换器支持转换实体和集合实体
文章目录 1. 实现ObjectConverter2. 使用3. 测试3.1 导出excel3.2 导入excel 1. 实现ObjectConverter package com.tophant.cloud.common.excel.converters;import cn.hutool.json.JSONUtil; import com.alibaba.excel.converters.Converter; import com.alibaba.excel.enums.…...

Linux重置ROOT密码(CentOS)
解释说明 在CentOS中重置root密码通常需要进入单用户模式,这是一个没有密码限制的特殊模式,允许您以root权限登录系统并更改密码。 重启系统 如果您无法登录到系统,可以通过重启系统来开始这个过程。您可以使用虚拟机控制台、物理服务器控制台…...

【Spring】一文带你彻底搞懂IOC、AOP
目录 首先简单了解一下什么是spring框架 什么是IOC? 什么是依赖注入(DI)? 控制反转和依赖注入又有什么关系? AOP是什么? SpringAOP的实现 说了这么多抽象概念,举个实例方便理解 首先简单…...

国际旅游网络的大数据分析(数学建模练习题)
题目:国际旅游网络的大数据分析 伴随着大数据时代的到来,数据分析已经深入到现代社会生活中的各个方面。 无论是国家政府部门、企事业单位还是个人,数据分析工作都是进行决策之前的 重要环节。 山东省应用统计学会是在省民政厅注册的学术类社会组织&…...

音视频技术开发周刊 | 308
每周一期,纵览音视频技术领域的干货。 新闻投稿:contributelivevideostack.com。 OpenAI首席科学家最新访谈:对模型创业两点建议、安全与对齐、Transformer够好吗? OpenAI首席科学家Ilya Sutskever最近和他的朋友Sven Strohband进…...

多旋翼飞控底层算法开发系列实验 | 多旋翼动力系统设计实验3
多旋翼动力系统设计实验3 01/多旋翼动力系统简介 多旋翼无人机的动力系统通常包括螺旋桨、电机、电调以及电池。动力系统是多旋翼最重要的组成部分,它决定了多旋翼的主要性能,如悬停时间、载重能力、飞行速度和飞行距离等。动力系统的部件…...

Redis之Sentinel(哨兵)机制
一、Sentinel是什么? Sentinel(哨岗、哨兵)是Redis的高可用性(high availability)解决方案:由一个或多个Sentinel实例(instance)组成的Sentinel系统(system)…...

加密的PDF文件,如何解密?
PDF文件带有打开密码、限制编辑,这两种密码设置了之后如何解密? 不管是打开密码或者是限制编辑,在知道密码的情况下,解密PDF密码,我们只需要在PDF编辑器中打开文件 – 属性 – 安全,将权限状态修改为无保护…...

【java】获取当前年份
目录 一、代码示例二、截图示例 一、代码示例 package com.learning;import java.text.SimpleDateFormat; import java.time.LocalDate; import java.time.Year; import java.util.Calendar; import java.util.Date;/*** 获取当前年份*/ public class GetCurrentYear {public …...

前端面试话术集锦第一篇
🚗前端面试集锦目录 💖前端面试话术集锦第一篇💖 💖前端面试话术集锦第二篇💖 文章目录 1. 前端需要注意哪些SEO2. \<img>的title和alt有什么区别3. HTTP的⼏种请求⽅法⽤途4. 从浏览器地址栏输⼊url到显示⻚⾯的步骤5. 如何进⾏⽹站性能优化6. HTTP状态码及其…...

NeRFMeshing - 精确提取NeRF中的3D网格
准确的 3D 场景和对象重建对于机器人、摄影测量和 AR/VR 等各种应用至关重要。 NeRF 在合成新颖视图方面取得了成功,但在准确表示底层几何方面存在不足。 推荐:用 NSDT编辑器 快速搭建可编程3D场景 我们已经看到了最新的进展,例如 NVIDIA 的 …...

后端面试话术集锦第五篇:rabbitmq面试话术
🚗后端面试集锦目录 💖后端面试话术集锦第 1 篇:spring面试话术💖 💖后端面试话术集锦第 2 篇:spring boot面试话术💖 💖后端面试话术集锦第 3 篇:spring cloud面试话术💖 💖后端面试话术集锦第 4 篇:ElasticSearch面试话术💖 💖后端面试话术集锦第 5 …...