Spring Cloud Nacos源码讲解(十)- Nacos服务端服务发现处理
Nacos集群数据同步
当我们有服务进行注册以后,会写入注册信息同时会触发ClientChangedEvent事件,通过这个事件,就会开始进行Nacos的集群数据同步,当然这其中只有有一个Nacos节点来处理对应的客户端请求,其实这其中还涉及到一个负责节点和非负责节点
负责节点
这是首先我们要查看的是DistroClientDataProcessor(客户端数据一致性处理器)类型,这个类型会处理当前节点负责的Client,那我们要查看其中的syncToAllServer方法。
private void syncToAllServer(ClientEvent event) {Client client = event.getClient();// 判断客户端是否为空,是否是临时实例,判断是否是负责节点if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {// 客户端断开连接DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {// 客户端新增/修改DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}
}
distroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改,对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {private static final DataOperation OPERATION = DataOperation.CHANGE;public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {super(distroKey, distroComponentHolder);}@Overrideprotected DataOperation getDataOperation() {return OPERATION;}// 无回调@Overrideprotected boolean doExecute() {String type = getDistroKey().getResourceType();DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return true;}return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());}// 有回调@Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type = getDistroKey().getResourceType();DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return;}getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}@Overridepublic String toString() {return "DistroSyncChangeTask for " + getDistroKey().toString();}// 从DistroClientDataProcessor获取DistroDataprivate DistroData getDistroData(String type) {DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null != result) {result.setType(OPERATION);}return result;}
}
获取到的DistroData,其实是从ClientManager实时获取Client。
// DistroClientDataProcessor
@Override
public DistroData getDistroData(DistroKey distroKey) {Client client = clientManager.getClient(distroKey.getResourceKey());if (null == client) {return null;}// 把生成的同步数据放入到数组中byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());return new DistroData(distroKey, data);
}
AbstractClient继承了Client,同时给DistroClientDataProcessorClient提供Client的注册信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。
@Override
public ClientSyncData generateSyncData() {List<String> namespaces = new LinkedList<>();List<String> groupNames = new LinkedList<>();List<String> serviceNames = new LinkedList<>();List<InstancePublishInfo> instances = new LinkedList<>();for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {namespaces.add(entry.getKey().getNamespace());groupNames.add(entry.getKey().getGroup());serviceNames.add(entry.getKey().getName());instances.add(entry.getValue());}return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}
这里我们在回过头来看syncData方法,这个方法实际上是由DistroClientTransportAgent封装为DistroDataRequest调用其他Nacos节点。
@Override
public boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}DistroDataRequest request = new DistroDataRequest(data, data.getType());Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);return false;}try {Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);}return false;
}
非负责节点
当负责节点将数据发送给非负责节点以后,将要处理发送过来的Client数据。这里我们要看DistroClientDataProcessor.processData方法
@Override
public boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE:ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);//处理同步数据handlerClientSyncData(clientSyncData);return true;case DELETE:String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);clientManager.clientDisconnected(deleteClientId);return true;default:return false;}
}
然后来查看具体处理方法handlerClientSyncData
private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());// 同步客户端连接clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());// 获取Client(此时注册到的是ConnectionBasedClient)Client client = clientManager.getClient(clientSyncData.getClientId());// 更新Client数据upgradeClient(client, clientSyncData);
}
DistroClientDataProcessor的upgradeClient方法,更新Client里的注册表信息,发布对应事件
private void upgradeClient(Client client, ClientSyncData clientSyncData) {List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();Set<Service> syncedService = new HashSet<>();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}
}
**注意:**这里要注意下此时的Client实现类ConnectionBasedClient,只不过它的isNative属性为false,这是非负责节点和负责节点的主要区别。
其实判断当前nacos节点是否为负责节点的依据就是这个isNative属性,如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient,它的isNative属性为true;如果是由Distro协议,同步到这个nacos节点上的ConnectionBasedClient,它的isNative属性为false。
那其实我们都知道2.x的版本以后使用了长连接,所以通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求。
Distro协议负责集群数据统一
Distro为了确保集群间数据一致,不仅仅依赖于数据发生改变时的实时同步,后台有定时任务做数据同步。
在1.x版本中,责任节点每5s同步所有Service的Instance列表的摘要(md5)给非责任节点,非责任节点用对端传来的服务md5比对本地服务的md5,如果发生改变,需要反查责任节点。
在2.x版本中,对这个流程做了改造,责任节点会发送Client全量数据,非责任节点定时检测同步过来的Client是否过期,减少1.x版本中的反查。
责任节点每5s向其他节点发送DataOperation=VERIFY类型的DistroData,来维持非责任节点的Client数据不过期。
//DistroVerifyTimedTask
@Override
public void run() {try {// 所有其他节点List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("server list is: {}", targetServer);}for (String each : distroComponentHolder.getDataStorageTypes()) {// 遍历想这些节点发送Client.isNative=true的DistroData,type = VERIFYverifyForDataStorage(each, targetServer);}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);}
}
非责任节点每5s扫描isNative=false的client,如果client30s内没有被VERIFY的DistroData更新过续租时间,会删除这个同步过来的Client数据。
//ConnectionBasedClientManager->ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {private final ConnectionBasedClientManager clientManager;public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {this.clientManager = clientManager;}@Overridepublic void run() {long currentTime = System.currentTimeMillis();for (String each : clientManager.allClientId()) {ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);if (null != client && client.isExpire(currentTime)) {clientManager.clientDisconnected(each);}}}
}
-------------------------------------------------------------------------------------------
@Override
public boolean isExpire(long currentTime) {// 判断30s内没有续租 认为过期return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}
相关文章:
Spring Cloud Nacos源码讲解(十)- Nacos服务端服务发现处理
Nacos集群数据同步 当我们有服务进行注册以后,会写入注册信息同时会触发ClientChangedEvent事件,通过这个事件,就会开始进行Nacos的集群数据同步,当然这其中只有有一个Nacos节点来处理对应的客户端请求,其实这其中…...
C++ 修改程序进程的优先级(Linux,Windows)
文章目录1、Linux1.1 常用命令1.1.1 不占用终端运行和后台运行方式1.1.2 查询进程1.1.3 结束进程1.1.4 优先级命令1.2 C 代码示例1.2.1 代码一1.2.2 代码二2、Windows2.1 简介2.2 函数声明2.3 C 代码示例2.3.1 代码一2.3.2 代码二结语1、Linux 1.1 常用命令 1.1.1 不占用终端…...
同步和异步promise
进程和线程进程(厂房):程序的运行环境线程(工人):进行运算的东西同步和异步同步:一件事干完才去干下一件事,前面的代码不执行,后面的代码也不执行。同步的代码可能会出现…...
CHATGPT是新的“搜索引擎终结者”吗?百度是否慌了
ChatGPT 以其非凡的自然语言处理 (NLP) 能力和清晰的响应风靡全球,有望带来一场重大的技术革命。在不知不觉中,叙事转向了ChatGPT与百度的对决,因为来自OpenAI的智能和健谈的聊天机器人已经慢慢获得了“潜在的百度终结…...
力扣-订单最多的客户
大家好,我是空空star,本篇带大家了解一道简单的力扣sql练习题。 文章目录前言一、题目:586. 订单最多的客户二、解题1.正确示范①提交SQL运行结果2.正确示范②提交SQL运行结果3.正确示范③提交SQL运行结果4.正确示范④提交SQL运行结果5.其他总…...
MyBatis学习笔记(六) —— MyBatis的各种查询功能
6、MyBatis的各种查询功能 6.1、查询一个实体类对象 SelectMapper.java接口 /*** 根据用户id查询用户信息* param id* return*/ User getUserById(Param("id") int id);SelectMapper.xml <!--User getUserById(Param("id") int id)--> <selec…...
2023年最新详细教程!手把手教你搭建Hexo + GitLab个人博客
文章目录前言一、安装和配置环境1.安装 Git2.安装 Node.js二、新建博客项目1.GitLab配置CI/CD自动化部署1.1 GitLab新建项目1.2 GitLab自建Runners1.2.1 下载gitlab-runner1.2.2 注册Runners1.2.3 安装Runners并启动1.3 添加.gitlab-ci.yml文件2.拉取和推送hexo blog2.1 拉取he…...
centos7安装
centos7安装制作U盘启动盘下载镜像下载 UltralISO制作启动盘使用U盘安装系统修改模式为 UEFI调整BOOT option保存重启进入安装界面安装图形界面安装搜狗输入法制作U盘启动盘 下载镜像 去官网下载镜像,找到 mirrors链接(速度快) 选择一个中…...
java String类(超详细,含常用方法、面试题,内存图,案例)
String类一、String类的特点二、String 类的常见构造方法三、String常见的面试题1.字符串常量池2.String s "abc"与String s new String("abc")区别3.字符拼接4.常量优化机制四、String常用方法1. 比较字符串内容2. 遍历字符串3.截取字符串4.替换字符串5…...
哈希表以及哈希冲突
目录 哈希表 哈希冲突 1. 冲突发生 2. 比较常见的哈希函数 3. 负载因子调节(重点) 散列表的载荷因子概念 负载因子和冲突率的关系 冲突-解决-闭散列 线性探测 二次探测 冲突-解决-开散列 结尾 我们在前面讲解了TerrMap(Set)的底层是一个搜索…...
测试——基本概念
概念 测试和调试有以下几点区别: 测试是测试人员进行的工作,调试是开发人员调试是发现并解决问题,测试只是发现问题测试贯穿于整个项目的生命周期,而调试主要在编码阶段 测试人员一般有如下的工作: 需求分析&#x…...
SnowFlake 雪花算法和原理(分布式 id 生成算法)
一、概述 SnowFlake 算法:是 Twitter 开源的分布式 id 生成算法。核心思想:使用一个 64 bit 的 long 型的数字作为全局唯一 id。算法原理最高位是符号位,始终为0,不可用。41位的时间序列,精确到毫秒级,41位…...
【死磕数据库专栏】MySQL对数据库增删改查的基本操作
前言 本文是专栏【死磕数据库专栏】的第二篇文章,主要讲解MySQL语句最常用的增删改查操作。我一直觉得这个世界就是个程序,每天都在执行增删改查。 MySQL 中我们最常用的增删改查,对应SQL语句就是 insert 、delete、update、select…...
阿里软件测试二面:adb 连接 Android 手机的两种方式,看完你就懂了
前言 随着现在移动端技术的突飞猛进,导致现在市场上,APP 应用数不胜数,那对于测试工程师而言,对于 APP 的测试,那基本就是一个必修课了。 今天,我就来给大家介绍一下,adb 连接 Android 手机的两…...
Docker安装YApi
目录0、Docker 环境准备1、数据库准备 MongoDB2、启动 YAPI3、官网教程0、Docker 环境准备 Docker 容器之间网络互通需要使用 docker network create yapi 创建一个自定义网络 docker network create yapi1、数据库准备 MongoDB YAPI 的数据库是 MongoDB,准备镜像…...
springboot自定义参数解析器
为什么要自定义参数解析器呢? 因为很多项目每次获取用户信息,需要重复从请求头中获取token,用token再去redis或是sql中去拿到存储的计本对象,再将获取到的Json数据,转化为我们需要的对象等代码,作为一名程…...
Python Unittest ddt数据驱动
1、数据驱动介绍: ddt.ddt(类装饰器,申明当前类使用ddt框架)ddt.data(函数装饰器,用于给测试用例传递数据),支持传python所有数据类型:数字(int,…...
Vue自定义组件遇到分页传输数据不正确解决办法
测试环境 Vue3 Element Plus 遇到问题 <el-table:data"tableData">...其他el-table-column<template #default"scope">// 自定义组件<my-button name"编辑" :id"scope.row.id"/ ></template></el-table&…...
ABAP 辨析CO|CN|CA|NA|CS|NS|CP|NP
1、文档说明 本篇文档将通过举例,解析字符的比较运算符之间的用法和区别,涉及到的操作符:CO|CN|CA|NA|CS|NS|CP|NP 2、用法和区别 用法总览 以下举例,几乎都使用一个字符变量和一个硬编码字符进行对比的方式,忽略尾…...
RK3568平台开发系列讲解(设备驱动篇)Pinctrl子系统详解
🚀返回专栏总目录 文章目录 一、pinctrl子系统结构描述二、重要的概念三、主要的数据结构和接口沉淀、分享、成长,让自己和他人都能有所收获!😄 📢我们知道在许多soc内部包含有多个pin控制器,通过pin控制器的寄存器,我们可以配置一个或者一组引脚的功能和特性。Linux…...
使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...
无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
GitHub 趋势日报 (2025年06月06日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...
