springboot RabbitMQ客户端连接故障恢复
最近做RabbitMQ故障演练发现RabbitMQ服务器停止后,基于springboot的消费端不可以自动的恢复,队列的消费者消失,消息一直积压到队列中,这种情况肯定是不可接收的;通过研究源代码找到了解决方案。
一、添加自动恢复配置automaticRecovery
CachingConnectionFactory factory = new CachingConnectionFactory(connectionFactory);
cachingConnectionFactoryConfigurer.configure(factory);//设置TCP连接超时时间,默认:60000ms
factory.getRabbitConnectionFactory().setConnectionTimeout(properties.getConnectionTimeout());
//启用或禁用连接自动恢复,默认:false
factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(properties.isAutomaticRecovery());
//设置连接恢复时间间隔,默认:5000ms
factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(properties.getNetworkRecoveryInterval());
//启用或禁用拓扑恢复,默认:true【拓扑恢复功能可以帮助消费者重新声明之前定义的队列、交换机和绑定等拓扑结构】
factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(properties.isTopologyRecovery());
//替换默认异常处理DefaultExceptionHandler
factory.getRabbitConnectionFactory().setExceptionHandler(new DefaultMqExceptionHandler());
//添加连接监听器
factory.addConnectionListener(new DefaultMqConnectionListener(factory));
通过上述配置如果RabbitMQ服务器发生故障,则会自动重启恢复连接及队列的消费者,如果恢复失败则会间隔5000ms再次重试;在这里提一个问题,如果服务重试一直失败,重试的上限是多少?带着这个问题我们分析下源码。
二、RabbitMQ客户端实现连接的自动恢复功能
AutorecoveringConnection#beginAutomaticRecovery是在 RabbitMQ 客户端库层面实现的连接的自动恢复功能。当 RabbitMQ 连接出现故障时,它会尝试重新建立连接,以确保消息传递的可靠性。
private synchronized void beginAutomaticRecovery() throws InterruptedException {//获取故障恢复连接的间隔时间,实际是设置的:networkRecoveryIntervalfinal long delay = this.params.getRecoveryDelayHandler().getDelay(0);if (delay > 0) {//等待指定的间隔时间this.wait(delay);}//调用恢复通知监听器this.notifyRecoveryListenersStarted();//获取恢复建立的连接对象final RecoveryAwareAMQConnection newConn = this.recoverConnection();//如果为null则直接返回if (newConn == null) {return;}//连接已经恢复建立,恢复监听器、channel等资源LOGGER.debug("Connection {} has recovered", newConn);this.addAutomaticRecoveryListener(newConn);this.recoverShutdownListeners(newConn);this.recoverBlockedListeners(newConn);this.recoverChannels(newConn);// don't assign new delegate connection until channel recovery is completethis.delegate = newConn;//判断是否恢复拓扑结构,如果开启则开启拓扑结构恢复if (this.params.isTopologyRecoveryEnabled()) {notifyTopologyRecoveryListenersStarted();recoverTopology(params.getTopologyRecoveryExecutor());}this.notifyRecoveryListenersComplete();}
addAutomaticRecoveryListener自动恢复监听器
private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newConn) {final AutorecoveringConnection c = this;// this listener will run after shutdown listeners,// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135RecoveryCanBeginListener starter = cause -> {try {if (shouldTriggerConnectionRecovery(cause)) {//开始自动回复c.beginAutomaticRecovery();}} catch (Exception e) {newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);}};synchronized (this) {newConn.addRecoveryCanBeginListener(starter);}
}
init初始化
public void init() throws IOException, TimeoutException {//建立连接,否则抛出异常this.delegate = this.cf.newConnection();//自动回复监听器this.addAutomaticRecoveryListener(delegate);
}
三、消费端实现消息的消费和处理
SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run是应用程序层面实现消息的消费和处理,它负责从RabbitMQ中接收消息并进行相应的逻辑处理:
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count...try {//消费端初始化方法initialize();//当消费端是活跃状态,或者队列非空,或者消费端未被关闭则进入主循环while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}}catch (InterruptedException e) {...}
}
消费端initialize初始化方法:
private void initialize() throws Throwable { // NOSONARtry {redeclareElementsIfNecessary();//启动消费端初始化this.consumer.start();this.start.countDown();}catch (QueuesNotAvailableException e) {if (isMissingQueuesFatal()) {throw e;}else {this.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw e;}}catch (FatalListenerStartupException ex) {if (isPossibleAuthenticationFailureFatal()) {throw ex;}else {Throwable possibleAuthException = ex.getCause().getCause();if (!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {throw ex;}else {this.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw possibleAuthException;}}}catch (Throwable t) { //NOSONARthis.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw t;}if (getTransactionManager() != null) {/** Register the consumer's channel so it will be used by the transaction manager* if it's an instance of RabbitTransactionManager.*/ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());}}
消费端异常等待处理处理:
protected void handleStartupFailure(BackOffExecution backOffExecution) {//获取等待时间间隔,参考FixedBackOff类实现long recoveryInterval = backOffExecution.nextBackOff();if (BackOffExecution.STOP == recoveryInterval) {synchronized (this) {if (isActive()) {logger.warn("stopping container - restart recovery attempts exhausted");stop();}}return;}try {if (logger.isDebugEnabled() && isActive()) {logger.debug("Recovering consumer in " + recoveryInterval + " ms.");}//当前时间加上等待时间long timeout = System.currentTimeMillis() + recoveryInterval;//如果当前时间小于等待时间,则休眠200毫秒,再次尝试while (isActive() && System.currentTimeMillis() < timeout) {Thread.sleep(RECOVERY_LOOP_WAIT_TIME);}}catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException("Irrecoverable interruption on consumer restart", e);}}
FixedBackOff回退等待时间类实现:
public class FixedBackOff implements BackOff {// 默认恢复重试间隔public static final long DEFAULT_INTERVAL = 5000L;//最大重试次数,可以认为无限大public static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE;// 默认恢复重试间隔private long interval = 5000L;//最大重试次数,可以认为无限大private long maxAttempts = Long.MAX_VALUE;public FixedBackOff() {}public FixedBackOff(long interval, long maxAttempts) {this.interval = interval;this.maxAttempts = maxAttempts;}public void setInterval(long interval) {this.interval = interval;}public long getInterval() {return this.interval;}public void setMaxAttempts(long maxAttempts) {this.maxAttempts = maxAttempts;}public long getMaxAttempts() {return this.maxAttempts;}public BackOffExecution start() {return new FixedBackOffExecution();}private class FixedBackOffExecution implements BackOffExecution {private long currentAttempts;private FixedBackOffExecution() {this.currentAttempts = 0L;}//获取下一次尝试的时间间隔,可以认为一直都是5000mspublic long nextBackOff() {++this.currentAttempts;return this.currentAttempts <= FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;}public String toString() {String attemptValue = FixedBackOff.this.maxAttempts == Long.MAX_VALUE ? "unlimited" : String.valueOf(FixedBackOff.this.maxAttempts);return "FixedBackOff{interval=" + FixedBackOff.this.interval + ", currentAttempts=" + this.currentAttempts + ", maxAttempts=" + attemptValue + '}';}}
}
总结:综上源码分析可知消费端故障恢复重试等待时间是5000ms,重试次数可以认为是无限制(Long最大值)
mainloop主循环逻辑:
private void mainLoop() throws Exception { // NOSONAR Exceptiontry {if (SimpleMessageListenerContainer.this.stopNow.get()) {this.consumer.forceCloseAndClearQueue();return;}//接收客户端发送过来的消息,至少获取一条boolean receivedOk = receiveAndExecute(this.consumer); // At least one message receivedif (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {checkAdjust(receivedOk);}long idleEventInterval = getIdleEventInterval();if (idleEventInterval > 0) {if (receivedOk) {updateLastReceive();}else {long now = System.currentTimeMillis();long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();long lastReceive = getLastReceive();if (now > lastReceive + idleEventInterval&& now > lastAlertAt + idleEventInterval&& SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {publishIdleContainerEvent(now - lastReceive);}}}}catch (ListenerExecutionFailedException ex) {// Continue to process, otherwise re-throwif (ex.getCause() instanceof NoSuchMethodException) {throw new FatalListenerExecutionException("Invalid listener", ex);}}catch (AmqpRejectAndDontRequeueException rejectEx) {/** These will normally be wrapped by an LEFE if thrown by the* listener, but we will also honor it if thrown by an* error handler.*/}}
receiveAndExecute接收和处理消息:
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONARPlatformTransactionManager transactionManager = getTransactionManager();if (transactionManager != null) {try {if (this.transactionTemplate == null) {this.transactionTemplate =new TransactionTemplate(transactionManager, getTransactionAttribute());}return this.transactionTemplate.execute(status -> { // NOSONAR null never returnedRabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(consumer.getChannel(), false),getConnectionFactory(), true);// unbound in ResourceHolderSynchronization.beforeCompletion()try {//接收处理消息return doReceiveAndExecute(consumer);}catch (RuntimeException e1) {prepareHolderForRollback(resourceHolder, e1);throw e1;}catch (Exception e2) {throw new WrappedTransactionException(e2);}});}catch (WrappedTransactionException e) { // NOSONAR exception flow controlthrow (Exception) e.getCause();}}//接收处理消息return doReceiveAndExecute(consumer);}
调用具体的消息监听器消费消息:
private void doExecuteListener(Channel channel, Object data) {if (data instanceof Message) {Message message = (Message) data;if (this.afterReceivePostProcessors != null) {for (MessagePostProcessor processor : this.afterReceivePostProcessors) {message = processor.postProcessMessage(message);if (message == null) {throw new ImmediateAcknowledgeAmqpException("Message Post Processor returned 'null', discarding message");}}}if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) {this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment));}else {invokeListener(channel, message);}}else {invokeListener(channel, data);}}
GitHub代码:https://github.com/mingyang66/spring-parent
相关文章:
springboot RabbitMQ客户端连接故障恢复
最近做RabbitMQ故障演练发现RabbitMQ服务器停止后,基于springboot的消费端不可以自动的恢复,队列的消费者消失,消息一直积压到队列中,这种情况肯定是不可接收的;通过研究源代码找到了解决方案。 一、添加自动恢复配置a…...
centos下配置SFTP且限制用户访问目录
一、SFTP使用场景 ftp是大多数网站的文件传输选择工具,但ftp并不是非常安全,并且在centos上搭建的vsftpd也非常的不稳定,偶尔会出现权限问题,例如500、或是账号密码不正确等等。 而SFTP是基于默认的22端口,是ssh内含…...
A - 最短路径
给出一张包含 n 个节点、 m 条边的无向图,请你求出图上两点 s,t 间的最短路径长度。 (请大家自行处理重边和自环) Input 第一行两个数 n,m ,分别表示节点数和边数,以空格隔开,其中1≤n≤500,1≤m≤50000 ; 之后 m 行…...
自然语言处理(三):基于跳元模型的word2vec实现
跳元模型 回顾一下第一节讲过的跳元模型 跳元模型(Skip-gram Model)是一种用于学习词向量的模型,属于Word2Vec算法中的一种。它的目标是通过给定一个中心词语来预测其周围的上下文词语。 这节我们以跳元模型为例,讲解word2vec的…...
1.1 数据库系统简介
思维导图: 1.1.数据库系统简介 前言: 数据库系统是一个软件系统,用于管理和操作数据库。它提供了一个组织良好、高效并能够方便存取的数据存储机制,并且能够支持各种数据操作、事务管理、并发控制和恢复功能。以下是数据库系统的…...
WebGL 绘制圆点
前言 这篇文章不说WebGL相关概念了,初学者先到网上看看WebGL相关概念。这里着重写一下在vue3前端框架下,如何通过webGL绘制圆点。 着色器代码(画点) 画点相关的着色器代码有顶点着色器和片元着色器,代码如下: 顶点着色器&…...
迅为RK3588开发板Android12 设置系统默认不锁屏
修改 frameworks/base/packages/SettingsProvider/res/values/defaults.xml 文件,修改为如下 所示: - <bool name"def_lockscreen_disabled">false</bool> <bool name"def_lockscreen_disabled">true</bool&…...
香港服务器速度快的原因
1. 传统域名解析过程 了解CDN系统先从域名解析说起。通常,我们在浏览器中输入域名,敲回车后,进入网站进行信息的获取。您分析过输入域名后浏览是如何请求到服务器上的信息,您了解域名解析的过程么? 1.1. 主机解析域…...
过滤器,监听器与拦截器的区别
过滤器,监听器与拦截器的区别 过滤器和监听器不是Spring MVC中的组件,而是Servlet的组件,由Servlet容器来管理。拦截器是Spring MVC中的组件,由Spring容器来管理 Servlet过滤器与Spring MVC 拦截器在Web应用中所处的层次如…...
clickhouse ssb-dbgen数据构造 及 clickhouse-benchmark简单压测
一、 测试数据构造 1. 数据样例 官方文档有给出一批数据样例。优点是比较真实,缺点是太大了,动辄上百G不适合简单小测试 Anonymized Yandex.Metrica DatasetStar Schema BenchmarkWikiStatTerabyte of Click Logs from CriteoAMPLab Big Data Benchma…...
【数据分析】统计量
1. 均值、众数描述数据的集中趋势度量,四分位差、极差描述数据的离散程度。 2. 标准差、四分位差、异众比率度量离散程度,协方差是度量相关性。 期望值分别为E[X]与E[Y]的两个实随机变量X与Y之间的协方差Cov(X,Y)定义为: 从直观上来看&…...
【通用消息通知服务】0x4 - 目前进展 阶段复盘
【通用消息通知服务】0x4 - 阶段复盘 达成 基本的API已经写完✍️了(消息查看发送, 模板crud,终端crud,发送渠道crud,计划crud,计划执行查看)拆分server, executor, planner三个入口, 方便针对性水平扩展整体架构初步形成,通过队列实现了事件驱动模型和消息订阅发…...
vue若依导出word文件,简单的实现
首先前端导包,注意exportDocx的导包位置要修改成你自己的 import {exportDocx} from /utils/docUtil/docutil.js; import {addDays} from date-fns; import {listGongyi} from "/api/system/detail";然后新建一个测试按钮 <el-col :span"1.5"><…...
【LeetCode75】第四十题 最大层内元素和
目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 这道题和LeetCode75的上一题大同小异,都是要我们对二叉树进行层序遍历。 那具体如何层序遍历我再上一题也详细介绍过了&#…...
脱离束缚:数字化工厂中ARM控制器的革命性应用!
近年来,中国数字经济体系已进入高速增长阶段。制造业作为中国经济高质量发展的重要支撑力量,在面临生产成本不断上涨、关键装备和核心零部件“受制于人”等挑战时,建设数字化工厂已成必然。 数字化工厂数据采集出现的问题 在数字工厂的建设…...
queue ide is not exists in YARN
报错内容: 2023-08-17 17:30:31.342 [ERROR] [BaseTaskScheduler-Thread-7 ] o.a.l.o.s.a.AsyncExecTaskRunnerImpl (79) [run] - Failed to execute task astJob_1_codeExec_1 org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException: errCode:…...
【C++】UDP通信:客户端向服务端发送消息并接收服务端回应的消息
目录 1 UDP简介 2 通信 3 实践 4 运行结果 1 UDP简介 (1)UDP通信是无连接的,因此不需要connect操作。 (2)UDP通信过程需要指定数据接收端的IP和端口。 (3)UDP不对收到的数据进行排序。 (4)UDP对接收到的数据报不回复确认信息。 (5)如果发生了数据丢失,不会丢一…...
RabbitMq深度学习
什么是RabbitMq? RabbitMQ是一个开源的消息队列中间件,它实现了高级消息队列协议(AMQP)。它被广泛用于分布式系统中的消息传递和异步通信。RabbitMQ提供了一种可靠的、可扩展的机制来传递消息,使不同的应用程序能够相互之间进行…...
EasyExcel自定义字段对象转换器支持转换实体和集合实体
文章目录 1. 实现ObjectConverter2. 使用3. 测试3.1 导出excel3.2 导入excel 1. 实现ObjectConverter package com.tophant.cloud.common.excel.converters;import cn.hutool.json.JSONUtil; import com.alibaba.excel.converters.Converter; import com.alibaba.excel.enums.…...
Linux重置ROOT密码(CentOS)
解释说明 在CentOS中重置root密码通常需要进入单用户模式,这是一个没有密码限制的特殊模式,允许您以root权限登录系统并更改密码。 重启系统 如果您无法登录到系统,可以通过重启系统来开始这个过程。您可以使用虚拟机控制台、物理服务器控制台…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
适应性Java用于现代 API:REST、GraphQL 和事件驱动
在快速发展的软件开发领域,REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名,不断适应这些现代范式的需求。随着不断发展的生态系统,Java 在现代 API 方…...
HybridVLA——让单一LLM同时具备扩散和自回归动作预测能力:训练时既扩散也回归,但推理时则扩散
前言 如上一篇文章《dexcap升级版之DexWild》中的前言部分所说,在叠衣服的过程中,我会带着团队对比各种模型、方法、策略,毕竟针对各个场景始终寻找更优的解决方案,是我个人和我司「七月在线」的职责之一 且个人认为,…...
鸿蒙(HarmonyOS5)实现跳一跳小游戏
下面我将介绍如何使用鸿蒙的ArkUI框架,实现一个简单的跳一跳小游戏。 1. 项目结构 src/main/ets/ ├── MainAbility │ ├── pages │ │ ├── Index.ets // 主页面 │ │ └── GamePage.ets // 游戏页面 │ └── model │ …...
微服务通信安全:深入解析mTLS的原理与实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言:微服务时代的通信安全挑战 随着云原生和微服务架构的普及,服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...
全面解析数据库:从基础概念到前沿应用
在数字化时代,数据已成为企业和社会发展的核心资产,而数据库作为存储、管理和处理数据的关键工具,在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理,到社交网络的用户数据存储,再到金融行业的交易记录处理&a…...
GraphRAG优化新思路-开源的ROGRAG框架
目前的如微软开源的GraphRAG的工作流程都较为复杂,难以孤立地评估各个组件的贡献,传统的检索方法在处理复杂推理任务时可能不够有效,特别是在需要理解实体间关系或多跳知识的情况下。先说结论,看完后感觉这个框架性能上不会比Grap…...
