nacos1.4源码-服务发现、心跳机制
nacos的服务发现主要采用服务端主动推送+客户端定时拉取;心跳机制通过每5s向服务端发送心跳任务来保活,当超过15s服务端未接收到心跳任务时,将该实例设置为非健康状态;当超过30s时,删除该实例。
1.服务发现
nacos主要采用服务端主动推送+客户端定时拉取来保证AP架构的高可用性。
NacosNamingService:服务发现的接口(客户端!)
每个nacos实例有本地缓存Map,存放所有的实例
调用查询服务实例列表
/nacos`/v1/ns/instance/list
NacosNamingService.getAllInstances() -> getServiceInfo()
核心方法是getServiceInfo,代码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 从本地拿实例if (null == serviceObj) {// 如果拿到为空,则更新服务列表serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finishsynchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}scheduleUpdateIfAbsent(serviceName, clusters);// 定时拉取return serviceInfoMap.get(serviceObj.getKey());// 从map拿}
本地从服务端拉取服务实例,放到本地缓存中
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);// 调用接口}
客户端通过定时任务-->定时拉取服务端的实例,更新本地缓存(在发起服务调用的时候,才会执行定时任务!)
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}synchronized (futureMap) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}}
2.心跳机制
NamingService->register
注册实例的时候,客户端会有一个心跳任务,定时5s向服务端发送心跳!
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
BeatTask就是心跳任务
调用Instance/beat接口,发送实例心跳检查
BeatTask#run的代码如下:
@Overridepublic void run() {if (beatInfo.isStopped()) {return;}long nextTime = beatInfo.getPeriod();try {JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.warn("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());} catch (Exception unknownEx) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);} finally {executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);// 定时任务}}
服务端会更新心跳的时间!每隔5秒检查一下实例健康状态
对于集群而言,在一台服务端执行定时任务即可!不需要再所有节点都执行定时任务
当实例下线后,服务端通过InstanceController,检查健康状态(健康任务检查)。代码入口:
createEmptyService->createServiceIfAbsent->putServiceAndInit->init->scheduleCheck(定时任务)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {// 心跳时间判断
如果超过15秒,设置实例为非健康(更新健康状态)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// 如果已经过去了30s,直接删掉这个机器(删除实例)
asyncHttpRequest(url, headers, paramValues, callback, HttpMethod.DELETE);
核心代码如下:
@Overridepublic void run() {try {if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);// 获取服务端的所有实例// first set health status of instances:for (Instance instance : instances) {if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {// 心跳时间判断if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);// 如果超过15s,健康设置为falseLoggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}
3.服务事件变动
客户端每隔5秒从服务端拉取一次信息,是不是有延迟!
对于AP架构而言,有一点延迟 无所谓。Ribbon会进行重试
当服务端的服务列表变化时,服务端会主动推送给客户端信息
核心代码入口:notifier#run->handle->onChange
onChange更新注册表的相关代码!
getPushService().serviceChanged(this);// 发布 服务变化 的事件,通知客户端
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());for (String clusterName : clusterMap.keySet()) {ipMap.put(clusterName, new ArrayList<>());}for (Instance instance : instances) {try {if (instance == null) {Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}if (StringUtils.isEmpty(instance.getClusterName())) {instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}if (!clusterMap.containsKey(instance.getClusterName())) {Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",instance.getClusterName(), instance.toJson());Cluster cluster = new Cluster(instance.getClusterName(), this);cluster.init();getClusterMap().put(instance.getClusterName(), cluster);}List<Instance> clusterIPs = ipMap.get(instance.getClusterName());if (clusterIPs == null) {clusterIPs = new LinkedList<>();ipMap.put(instance.getClusterName(), clusterIPs);}clusterIPs.add(instance);} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);}}for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {//make every ip mineList<Instance> entryIPs = entry.getValue();clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);// 核心注册逻辑}setLastModifiedMillis(System.currentTimeMillis());getPushService().serviceChanged(this);// 发布 服务变化 的事件StringBuilder stringBuilder = new StringBuilder();for (Instance instance : allIPs()) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),stringBuilder.toString());}
(通过全文搜索事件,找到这个方法)PushService#onApplicationEvent,它来处理事件。代码如下:
udpPush(ackEntry);// 服务端给客户端发送udp信息,通知客户端实例变动
@Override
public void onApplicationEvent(ServiceChangeEvent event) {Service service = event.getService();String serviceName = service.getName();String namespaceId = service.getNamespaceId();Future future = GlobalExecutor.scheduleUdpSender(() -> {try {Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));if (MapUtils.isEmpty(clients)) {return;}Map<String, Object> cache = new HashMap<>(16);long lastRefTime = System.nanoTime();for (PushClient client : clients.values()) {if (client.zombie()) {Loggers.PUSH.debug("client is zombie: " + client.toString());clients.remove(client.toString());Loggers.PUSH.debug("client is zombie: " + client.toString());continue;}Receiver.AckEntry ackEntry;Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());byte[] compressData = null;Map<String, Object> data = null;if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);compressData = (byte[]) (pair.getValue0());data = (Map<String, Object>) pair.getValue1();Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());}if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);} else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));}}Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",client.getServiceName(), client.getAddrStr(), client.getAgent(),(ackEntry == null ? null : ackEntry.key));udpPush(ackEntry);// 给客户端发送udp信息}} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}}, 1000, TimeUnit.MILLISECONDS);futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}
客户端接收udp报文的代码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 从本地拿实例if (null == serviceObj) {// 如果拿到为空,则更新服务列表serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());updateServiceNow(serviceName, clusters);// upd的端口
相关文章:
nacos1.4源码-服务发现、心跳机制
nacos的服务发现主要采用服务端主动推送客户端定时拉取;心跳机制通过每5s向服务端发送心跳任务来保活,当超过15s服务端未接收到心跳任务时,将该实例设置为非健康状态;当超过30s时,删除该实例。 1.服务发现 nacos主要采…...
C++ 2D平台游戏开发案例
关于2D平台游戏的C开发案例,包括游戏设计、实现细节、图形渲染和音效处理等内容。虽然无法一次性提供3000字,但我会尽量详细描述各个部分,并确保有足够的深度和广度。 2D平台游戏开发案例 一、游戏设计 游戏概述 游戏名称:“冒险…...
【Webpack--019】TreeShaking
🤓😍Sam9029的CSDN博客主页:Sam9029的博客_CSDN博客-前端领域博主 🐱🐉若此文你认为写的不错,不要吝啬你的赞扬,求收藏,求评论,求一个大大的赞!👍* &#x…...
Docker基本操作命令
Docker 是一个开源的应用容器引擎,允许开发者打包应用以及其依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口。主要功能是为开发者提供一个简单…...
开源计算器应用的全面测试计划:确保功能性和可靠性
✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…...
uni.requestPayment 支付成功之后会走 wx.onAppRoute
uni.requestPayment 是用于发起微信支付的统一接口,而 wx.onAppRoute 是用于监听小程序的路由变化。当 uni.requestPayment 支付成功后,如果发生了页面跳转或者其他路由变化,wx.onAppRoute 会被触发。这个行为是正常的,因为支付成…...
统⼀服务入口 - Gateway
网关介绍 问题 在 spring cloud 体系中我们通过 Eureka,Nacos 解决了服务注册,服务发现的问题,使⽤Spring Cloud LoadBalance解决了负载均衡的问题,使⽤ OpenFeign 解决了远程调⽤的问题. 但是当前所有微服务的接⼝都是直接对外暴露的,可以直接通过外部访问.为了保证对外服务的…...
QGraphicsWidget Class
Header:#include < QGraphicsWidget > qmake:QT += widgets Since:Qt 4.4 Inherits:QGraphicsObject and QGraphicsLayoutItem Inherited By:QGraphicsProxyWidget This class was introduced in Qt 4.4. Public Types enum anonymous {Type }Properties autoFi…...
探讨最好用的AI工具:从日常到创新的应用
文章目录 引言常用AI工具1. 语音助手2. 图像识别软件3. 机器翻译工具4. 智能客服系统 创新AI应用1. 自动驾驶汽车2. 虚拟试衣间3. 医疗影像分析4. 个性化推荐系统 个人体验分享1. 通义灵码2. 文心一言3. 智能写作助手4. 智能家居设备5. DALLE6. Whisper7. Codex8. Gym9. ChatGP…...
Python系统教程005(字符串的格式化输出)
知识回顾 1、默认情况下,input函数接收的数据是字符串类型。 2、字符串类型的关键词是str。 3、\n和\t都是转义字符,\n用来换行,\t用来留出一段固定长度的空白。 4、type函数能够用来查看变量的数据类型 5、数据类型的转换,举…...
六款电脑远程控制软件分享,2024最热门软件合集,总有一款适合你!速来看!
想要随时随地控制自己的电脑? 无论你是办公需求,还是要远程协助他人,一款好用的远程控制软件绝对少不了。 2024年最热门的六款远程控制软件已经为你准备好,总有一款适合你,赶快往下看吧! 1. 安企神系统—…...
优质微信群不再难寻!掌握这些技巧就够了!
在当今信息爆炸的时代,微信群已成为人们交流思想、分享知识、建立人脉的重要平台。无论是专业领域的深入探讨,还是兴趣爱好的自由交流,微信群都能为你提供一个即时互动的虚拟空间。然而,面对海量的微信群信息,如何高效…...
python - mysql操作
Python MySQL 操作 1. 背景介绍 常见的Mysql驱动介绍: MySQL-python:也就是MySQLdb。是对C语言操作MySQL数据库的一个简单封装。遵循了Python DB API v2。但是只支持Python2,目前还不支持Python3。mysqlclient:是MySQL-python的…...
基于Springboot+Vue的服装生产管理信息系统设计与实现(含源码数据库)
1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 在这个…...
75.【C语言】文件操作(2)
承接74.【C语言】文件操作(1)文章 目录 5.详细阐释文件的打开和关闭 1.流 2.标准流 3.文件指针 FILE 两层含义 附:FILE的头文件 4.操作文件的步骤 1.fopen函数 编辑 简写的全称查询 输入&输出的含义 2.fclose函数 3.代码示例 补充:绝对路径和相对路径 注意…...
Redis 使用记录
封装调用redis类 import redis from conf.config import RedisConfigclass RedisConfig:redis_json config_data[redis_config]redis_pwd env.get(project_name).get(pwd)host redis_json.get("host")dialog_states_db redis_json.get("dialog_states_db&q…...
IDEA实用小技巧
1. IDEA代码提示忽略大小写 打开设置,点击Editor–>General–>Code Completion ,然后将右侧的Match Case前面的选框去掉勾选。 2. 快速查找接口RestfulToolkitX插件 该插件可以快速查找接口(快捷键为CTRL\) 还会在侧边栏…...
PEI转染试剂对血清的敏感性研究
在细胞生物学和基因工程领域,聚乙烯亚胺(PEI)作为一种常用的转染试剂,广泛应用于基因的递送。然而,PEI转染试剂对血清的敏感性一直是研究的热点问题。转染过程中,血清作为培养基的成分之一,可能…...
手机怎样改网络ip地址?内容详尽实用
随着网络技术的发展,更改手机IP地址已成为一种常见需求。本文将详细介绍如何在不同网络环境下更改手机IP地址,包括移动网络和WiFi网络,以及同时适用于两种网络的方法,内容详尽实用,干货满满。 一、适用于移动网络&…...
使用Pybind11,Python调用C++动态库
最近学习了一下pybind11,使用python来调用C动态库的模式,在某些场景下有用,这里做一个记录。 环境准备 安装python,我这里安装的是3.12版本 下载Pybind11库,这是一个仅包含头文件的轻量级库,使用起来非常…...
[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
Yolov8 目标检测蒸馏学习记录
yolov8系列模型蒸馏基本流程,代码下载:这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中,**知识蒸馏(Knowledge Distillation)**被广泛应用,作为提升模型…...
GitHub 趋势日报 (2025年06月06日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...
jmeter聚合报告中参数详解
sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample(样本数) 表示测试中发送的请求数量,即测试执行了多少次请求。 单位,以个或者次数表示。 示例:…...
关于uniapp展示PDF的解决方案
在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项: 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库: npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...
0x-3-Oracle 23 ai-sqlcl 25.1 集成安装-配置和优化
是不是受够了安装了oracle database之后sqlplus的简陋,无法删除无法上下翻页的苦恼。 可以安装readline和rlwrap插件的话,配置.bahs_profile后也能解决上下翻页这些,但是很多生产环境无法安装rpm包。 oracle提供了sqlcl免费许可,…...
GraphQL 实战篇:Apollo Client 配置与缓存
GraphQL 实战篇:Apollo Client 配置与缓存 上一篇:GraphQL 入门篇:基础查询语法 依旧和上一篇的笔记一样,主实操,没啥过多的细节讲解,代码具体在: https://github.com/GoldenaArcher/graphql…...
若依项目部署--传统架构--未完待续
若依项目介绍 项目源码获取 #Git工具下载 dnf -y install git #若依项目获取 git clone https://gitee.com/y_project/RuoYi-Vue.git项目背景 随着企业信息化需求的增加,传统开发模式存在效率低,重复劳动多等问题。若依项目通过整合主流技术框架&…...
