当前位置: 首页 > news >正文

19 客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程

说起Nacos的服务订阅机制,大家会觉得比较难理解,那我们就来详细分析一下,那我们先从Nacos订阅的概述说起

Nacos订阅概述

Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)。
在这里插入图片描述

定时任务开启

其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。

​ NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)throws NacosException {if (null == listener) {return;}String clusterString = StringUtils.join(clusters, ",");changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);
}

这里我们先来看subscribe方法,大家可能有些眼熟它是clientProxy类型调用的方法,实际上就是NamingClientProxyDelegate.subscribe(),所以其实这里和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法.

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 定时调度UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的ServiceInfoServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result) {// 如果为null,则进行订阅逻辑处理,基于gRPC协议result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// ServiceInfo本地缓存处理serviceInfoHolder.processServiceInfo(result);return result;
}

但是这里我们要关注这里的任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {if (futureMap.get(serviceKey) != null) {return;}//构建UpdateTaskScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}

定时任务延迟一秒执行:

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

所以在这里我们得出结论,核心为:调用订阅方法和发起定时任务。

定时任务执行内容

在这里插入图片描述
当我们知道了整体流程以后,我们再来看对应源码:

@Override
public void run() {long delayTime = DEFAULT_DELAY;try {// 判断是服务是否订阅和未开启过定时任务,如果订阅过直接不在执行if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);return;}// 获取缓存的service信息ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);// 如果为空if (serviceObj == null) {// 根据serviceName从注册中心服务端获取Service信息serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}// 过期服务,服务的最新更新时间小于等于缓存刷新(最后一次拉取数据的时间)时间,从注册中心重新查询if (serviceObj.getLastRefTime() <= lastRefTime) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);}//刷新更新时间lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}// 下次更新缓存时间设置,默认6秒// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败数量为0(可能会出现失败情况,没有ServiceInfo,连接失败)resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);} finally {// 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟// 即当无异常情况下缓存实例的刷新时间是6秒executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}

业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。

总结:

  1. 订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;

  2. 通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;

  3. 通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;

  4. UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。

  5. 重新计算定时任务时间,循环执行流程。

相关文章:

19 客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程 说起Nacos的服务订阅机制&#xff0c;大家会觉得比较难理解&#xff0c;那我们就来详细分析一下&#xff0c;那我们先从Nacos订阅的概述说起 Nacos订阅概述 Nacos的订阅机制&#xff0c;如果用一句话来描述就是&#xff1a;Nacos客户端通…...

教师论文|科技专著管理系统

技术&#xff1a;Java、JSP等摘要&#xff1a;随着计算机和互联网技术的发展&#xff0c;社会的信息化程度越来越高&#xff0c;各行各业只有适应这种发展趋势&#xff0c;才能增强自己的适应能力和竞争能力&#xff0c;不断发展壮大。大学作为教育的基地&#xff0c;是社会进步…...

骨传导耳机是什么意思,骨传导耳机的好处具体有哪些

​在这个全民都是手机的时代&#xff0c;各种蓝牙耳机&#xff0c;入耳式耳机&#xff0c;真无线耳机等各种款式琳琅满目。而骨传导耳机是一种全新的科技产物&#xff0c;顾名思义就是通过头骨振动将声音传至外耳内的耳机。由于无需入耳&#xff0c;不会对耳朵造成任何影响。那…...

elasticsearch—使用汇总

文档结构1、概念简介2、使用创景3、核心组件4、环境部署5、操作实践官方网站&#xff1a;https://www.elastic.co/cn/elasticsearch/ 官方手册&#xff1a;https://www.elastic.co/guide/en/elasticsearch/reference/8.6/getting-started.html 参考教程&#xff1a; A&#xff…...

聊一聊代码重构——我们为什么要代码重构

代码重构 事情的起因是在去年下半年&#xff0c;我们终于无法承受往年的历史包袱而决定开始进行代码重构。 在以前我们尝试过进行代码重构但是从来没有系统性的考虑过如何重构。在对代码重构的过程中很多经验都是来自《重构&#xff1a;改善既有代码的设计》这本书&#xff0c;…...

【Python学习笔记】第二十九节 Python2 和Python3发生了哪些变化

Python 版本分为两大流派&#xff0c;一个是 Python 2.x 版本&#xff0c;另外一个是 Python 3.x 版本&#xff0c;Python 官方同时提供了对这两个版本的支持和维护。2020 年 1 月 1 日&#xff0c;Python 官方终止了对 Python 2.7 版本&#xff08;最后一个 Python 2.x 版本&a…...

[oeasy]python0099_雅达利大崩溃_IBM的开放架构_兼容机_oem

雅达利大崩溃 回忆上次内容 个人计算机浪潮已经来临 苹果公司迅速发展微软公司脱离mits准备做纯软件公司IBM用大型机思路制作的5100惨败 Commodore 64 既做计算机又做游戏机 计算机行业和游戏行业 跟随着底层技术不断迭代已经进入了战乱纷纷的年代最终又会如何呢&#xff1f…...

学术论文投稿之同行评审过程中可能会遭遇哪些偏见?

同行评审过程的顺利进行&#xff0c;在很大程度上取决于学术界的积极参与和相互信任&#xff0c;以及需要参与各方都以负责任的态度行事。作为审稿专家&#xff0c;向作者提供公正、客观的评价是至关重要的。同行评审过程中&#xff0c;若有任何偏离客观性的行为&#xff0c;均…...

Python写一个自动发送直播弹幕的工具,非常简单

哈喽大家好&#xff0c;今天给大家用Python整一个可以在直播间自动发弹幕的工具&#xff0c;来为喜欢的主播疯狂扣6 &#xff01; 事情原由昨晚回家&#xff0c;表弟在看LOL直播&#xff0c;看得我气不打一处来&#xff0c;差点就想锤他。 身为程序员的表弟&#xff0c;看直…...

学生档案管理系统的设计与实现

技术&#xff1a;Java、JSP等摘要&#xff1a;本设计是为托普学院学生档案的管理实现电子化而设计的&#xff0c;系统开发采用J2EE技术&#xff0c;数据库采用了SQL Server 2005&#xff0c;因而系统具有很好的扩展性、可移植性&#xff0c;实现了教学资源的信息化管理。主要功…...

JavaEE学习笔记-SpringBoot快速上手、部分注解解释

SpringBoot快速上手 一、快速创建SpringBoot应用1.1利用IDEA提供的Spring Initializr创建Spring Boot应用1.2Spring Boot生成的项目结构介绍1.3初步测试后端是否OK(建立一个controll类)二、热部署2.1 添加依赖2.2 Setting处项目自动化设置2.3 具体项目设置2.4 待选步骤三、注…...

【Python学习笔记】第二十六节 Python PyMySQL

一、什么是 PyMySQL&#xff1f;PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库。可以用它来连接Python和MySQL。如果你追求速度&#xff0c;这是一个很好的选择&#xff0c;因为它比mysql-connector-python快。PyMySQL 遵循 Python 数据库 API v2.0 规范&#x…...

Android问题笔记 -关于Kotlin插件版本的问题

专栏分享点击跳转>Unity3D特效百例点击跳转>案例项目实战源码点击跳转>游戏脚本-辅助自动化点击跳转>Android控件全解手册点击跳转>Scratch编程案例 &#x1f449;关于作者 众所周知&#xff0c;人生是一个漫长的流程&#xff0c;不断克服困难&#xff0c;不断…...

【同步工具类:Phaser】

同步工具类:Phaser介绍特性动态调整线程个数层次Phaser源码分析state 变量解析构造函数对state变量赋值阻塞方法arrive()awaitAdvance()业务场景实现CountDownLatch功能代码测试结果实现 CyclicBarrier功能代码展示测试结果总结介绍 一个可重复使用的同步屏障&#xff0c;功能…...

Linux命令·rmdir

今天学习一下linux中命令&#xff1a; rmdir命令。rmdir是常用的命令&#xff0c;该命令的功能是删除空目录&#xff0c;一个目录被删除之前必须是空的。&#xff08;注意&#xff0c;rm - r dir命令可代替rmdir&#xff0c;但是有很大危险性。&#xff09;删除某目录时也必须具…...

从0开始自制解释器——综述

作为一个程序员&#xff0c;自制自己的编译器一直是一个梦想。之前也曾为了这个梦想学习过类似龙书、虎书这种大部头的书&#xff0c;但是光看理论总有一些云里雾里的感觉。看完只觉得脑袋昏昏沉沉并没有觉得有多少长进。当初看过《疯狂的程序员》这本书&#xff0c;书里说&…...

【spring】spring5特性

1、整个 Spring5 框架的代码基于 Java8&#xff0c;运行时兼容 JDK9&#xff0c;许多不建议使用的类和方 法在代码库中删除 日志框架 2、Spring 5.0 框架自带了通用的日志封装 &#xff08;1&#xff09;Spring5 已经移除 Log4jConfigListener&#xff0c;官方建议使用 Log4j…...

曹云金回归、于谦电影杀青,德云社想不火都难

说起民间最大的相声社团&#xff0c;首屈一指的要属德云社&#xff0c;之所以说德云社最大&#xff0c;主要是优秀相声演员够多。德云社在郭德纲的带领下&#xff0c;如今已经是人才济济&#xff0c;听说最近队伍会进一步壮大&#xff0c;前徒弟曹云金也要回归了。 当年曹云金作…...

从入门到精通:数据库设计规范指南

当我们开始设计数据库时&#xff0c;我们需要确保它是可靠和可扩展的。为了实现这一目标&#xff0c;我们需要遵循一些数据库设计规范。本文将介绍一些数据库设计规范&#xff0c;以确保您的数据库能够满足当前和未来的业务需求。 目录 一、命名规则 二、数据类型 三、索引…...

js 求解《初级算法》8.字符串转换整数(atoi)

一、题目描述 请你来实现一个 myAtoi(string s) 函数&#xff0c;使其能将字符串转换成一个 32 位有符号整数 算法如下&#xff1a; 读入字符串并丢弃无用的前导空格 检查下一个字符&#xff08;假设还未到字符末尾&#xff09;为正还是负号&#xff0c;读取该字符&#xff…...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...