当前位置: 首页 > news >正文

深入解析:Nacos AP 模式的实现原理与应用场景

优质博文:IT-BLOG-CN

一、CAP 基础

‌Nacos作为注册中心同时支持CPAP模式。‌ Nacos通过不同的协议和机制来实现这两种模式,以满足不同的需求场景。

Nacos中,默认情况下使用的是AP模式,通过Distro协议来实现。AP模式主要关注高可用性,在网络分区时仍然保持服务,但可能会允许短暂的数据不一致。

此外,Nacos也支持CP模式,通过Raft协议来实现。CP模式在网络分区时牺牲可用性以保证数据一致性,适用于对数据准确性要求高的场景,如金融行业。

具体实现上,Nacos通过客户端设置spring.cloud.nacos.discovery.ephemeral的值为false来启用CP模式,默认为true则为AP模式。此外,NacosgRPC通信端口和集群节点之间的通信端口也有所不同,分别用于APCP模式的实现‌。

二、Nacos AP 实现原理

Distro协议Distro是阿里巴巴开源的一个动态服务发现、配置管理和服务管理平台。目前流行的Nacos服务管理框架就采用了Distro协议。Distro协议被定位为临时数据的一致性协议 :该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话,该会话只要存在,数据就不会丢失。

Distro协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。

Distro协议具有以下特点:
【1】数据分片Sharding
   ☑️ 将数据根据某种规则(如哈希)分片,每个节点负责一部分数据的存储和管理。
   ☑️ 这种分片策略可以有效地分散负载,避免单点瓶颈。

【2】数据复制Replication
   ☑️ 在每个节点上维护一份完整的数据副本,确保数据在节点之间的一致性。
   ☑️ 使用异步复制的方式,在数据更新时将更新信息广播给其他节点。

【3】一致性管理:
   ☑️ 采用最终一致性模型,确保数据在一定时间内达到一致。
   ☑️ 使用版本号或时间戳来管理数据的更新,确保数据的最新状态能够传播到所有节点。

【4】心跳检测和故障转移:
   ☑️ 定期进行心跳检测,确保节点的健康状态。
   ☑️ 在节点故障时,其他节点能够快速接管其数据和职责,确保系统的高可用性。

Distro协议服务端节点发现使用寻址机制来实现服务端节点的管理。在Nacos中,寻址模式有三种:

Nacos服务启动的时候ServerMemberManager这个类专门对集群节点进行管理的,这个类在init方法中就会对集群进行初始化

protected void init() throws NacosException {Loggers.CORE.info("Nacos-related cluster resource initialization");// 得到当前nacos服务的端口号this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);// 得到当前nacos服务的地址this.localAddress = InetUtils.getSelfIP() + ":" + port;// 解析地址得到当前nacos服务所对应的集群节点对象this.self = MemberUtil.singleParse(this.localAddress);// 给当前nacos服务设置一个版本号this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);// 把自己放到serverList中serverList.put(self.getAddress(), self);// 该方法做了两件事// 1.注册了一个集群节点信息变更事件// 2.注册了订阅IPChangeEvent事件的事件订阅者registerClusterEvent();// 初始化节点地址寻址模式// 在这里就可以通过配置的节点地址去初始化整个nacos集群节点集合了initAndStartLookup();if (serverList.isEmpty()) {throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");}Loggers.CORE.info("The cluster resource is initialized");
}

注册了订阅IPChangeEvent事件的事件订阅者registerClusterEvent:该方法做了两件事:
【1】注册了一个集群节点信息变更事件。
【2】注册了订阅IPChangeEvent事件的事件订阅者。

private void registerClusterEvent() {// 注册一个集群节点信息变更事件NotifyCenter.registerToPublisher(MembersChangeEvent.class,EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));// 注册一个事件订阅者,订阅的事件类型是IPChangeEventNotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {@Overridepublic void onEvent(InetUtils.IPChangeEvent event) {String newAddress = event.getNewIP() + ":" + port;ServerMemberManager.this.localAddress = newAddress;EnvUtil.setLocalAddress(localAddress);Member self = ServerMemberManager.this.self;self.setIp(event.getNewIP());String oldAddress = event.getOldIP() + ":" + port;ServerMemberManager.this.serverList.remove(oldAddress);ServerMemberManager.this.serverList.put(newAddress, self);ServerMemberManager.this.memberAddressInfos.remove(oldAddress);ServerMemberManager.this.memberAddressInfos.add(newAddress);}@Overridepublic Class<? extends Event> subscribeType() {return InetUtils.IPChangeEvent.class;}});
}

通过上述方法分析可知:注册一个MembersChangeEvent事件,而对应的事件订阅者是ServerListManager。同时该方法还会注册一个IPChangeEvent事件的事件订阅者,IPChangeEvent这个事件就是当前节点IP发生变更之后发布的,该事件发布之后会被这个注册的订阅者所捕获,该订阅者做的事情也很简单,就是对集群节点集合中对应当前节点的ip进行更新就行了。

这一行代码就是初始化集群的关键: 创建nacos集群节点寻址器

private void initAndStartLookup() throws NacosException {this.lookup = LookupFactory.createLookUp(this);this.lookup.start();
}

先是通过LookupFactory创建一个节点寻址器,然后调用start方法启动这个节点寻址器。

通过上图可知Nacos配置集群节点地址的时候有两种方式:读取本地配置文件通过配置服务器

/*** 创建nacos集群节点寻址器** @param memberManager {@link ServerMemberManager}* @return {@link MemberLookup}* @throws NacosException NacosException*/
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {// 条件成立:当前nacos节点是集群模式if (!EnvUtil.getStandaloneMode()) {// 从配置环境中获取nacos集群节点的寻址方式String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);LookupType type = chooseLookup(lookupType);// 得到对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookupLOOK_UP = find(type);currentLookupType = type;}// 条件成立:当前nacos节点是单机模式else {LOOK_UP = new StandaloneMemberLookup();}LOOK_UP.injectMemberManager(memberManager);Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());return LOOK_UP;
}

创建nacos集群节点寻址器(参考流程图)

/*** 创建nacos集群节点寻址器** @param memberManager {@link ServerMemberManager}* @return {@link MemberLookup}* @throws NacosException NacosException*/
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {// 条件成立:当前nacos节点是集群模式if (!EnvUtil.getStandaloneMode()) {// 从配置环境中获取nacos集群节点的寻址方式String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);LookupType type = chooseLookup(lookupType);// 得到对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookupLOOK_UP = find(type);currentLookupType = type;}// 条件成立:当前nacos节点是单机模式else {LOOK_UP = new StandaloneMemberLookup();}LOOK_UP.injectMemberManager(memberManager);Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());return LOOK_UP;
}

怎么确定是使用FileConfigMemberLookup还是AddressServerMemberLookup,主要看是否配置了lookupType,如果没有配置lookupType就按照默认的先看是否配置了集群配置文件。

private static LookupType chooseLookup(String lookupType) {if (StringUtils.isNotBlank(lookupType)) {LookupType type = LookupType.sourceOf(lookupType);if (Objects.nonNull(type)) {return type;}}// 代码来到这里说明没有配置lookupType,此时会默认去寻找user.home/nacos/conf/cluster.conf文件File file = new File(EnvUtil.getClusterConfFilePath());// 条件成立:集群配置文件存在,或者环境变量配置了集群节点地址if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) {// 返回文件寻址模式return LookupType.FILE_CONFIG;}// 返回服务器寻址模式return LookupType.ADDRESS_SERVER;
}

确定了对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookup后获取对应的节点地址。如果当前节点是集群模式,那么会去判断${user.home}/nacos/conf/cluster.conf这个文件是否存在或者环境变量中是否配置了集群节点地址,如果两者有一个成立就是文件寻址模式,反之是服务器寻址模式。

private static MemberLookup find(LookupType type) {// 条件成立:集群配置方式是文件配置的方式if (LookupType.FILE_CONFIG.equals(type)) {// 创建一个FileConfigMemberLookup对象并返回LOOK_UP = new FileConfigMemberLookup();return LOOK_UP;}// 条件成立:集群配置方式是通过服务器获取节点地址的方式if (LookupType.ADDRESS_SERVER.equals(type)) {LOOK_UP = new AddressServerMemberLookup();return LOOK_UP;}// unpossible to run herethrow new IllegalArgumentException();
}

如何初始化集群:
【1】单机模式: StandaloneMemberLookup
【2】文件模式: FileConfigMemberLookup利用监控cluster.conf文件的变动实现节点的管理。核心代码如下:

public class FileConfigMemberLookup extends AbstractMemberLookup {/*** 文件监听回调*/private FileWatcher watcher = new FileWatcher() {/*** 当对应的目录或者文件发生变更的时候会回调该方法* @param event {@link FileChangeEvent}*/@Overridepublic void onChange(FileChangeEvent event) {readClusterConfFromDisk();}@Overridepublic boolean interest(String context) {return StringUtils.contains(context, "cluster.conf");}};@Overridepublic void start() throws NacosException {if (start.compareAndSet(false, true)) {// 从文件中读取集群节点地址readClusterConfFromDisk();try {// 使用notify机制监控文件的变化,并自动触发读取cluster.confWatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);} catch (Throwable e) {Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());}}}@Overridepublic void destroy() throws NacosException {WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);}/*** 从cluster.conf文件中读取集群节点地址*/private void readClusterConfFromDisk() {Collection<Member> tmpMembers = new ArrayList<>();try {// 获取到cluster.conf文件中配置的节点地址列表List<String> tmp = EnvUtil.readClusterConf();// 把这些节点地址分别转换成对应的集群节点对象tmpMembers = MemberUtil.readServerConf(tmp);} catch (Throwable e) {Loggers.CLUSTER.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());}afterLookup(tmpMembers);}
}

start方法中先调用readClusterConfFromDisk方法,这个方法会读取${user.home}/nacos/conf/cluster.conf这个文件中配置的节点地址,读取到之后把这些节点地址转化为对应的Member对象,一个Member对象就代表一个节点,接着会调用父类AbstractMemberLookupafterLookup方法。

public void afterLookup(Collection<Member> members) {this.memberManager.memberChange(members);
}

调用的是集群节点管理器的menberChange方法,同时把上面从cluster.conf文件中读取到的Member节点集合作为方法参数:

/*** 当nacos节点启动 或者 每次配置的集群节点地址发生改变的时候就会调用到该方法* @param members   当前最新的集群节点地址* @return  返回true表示集群节点数量发生改变了,反之表示没改变*/
synchronized boolean memberChange(Collection<Member> members) {if (members == null || members.isEmpty()) {return false;}// 配置的集群节点地址是否包含当前nacos节点boolean isContainSelfIp = members.stream().anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));if (isContainSelfIp) {isInIpList = true;} else {isInIpList = false;members.add(this.self);Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);}// If the number of old and new clusters is different, the cluster information// must have changed; if the number of clusters is the same, then compare whether// there is a difference; if there is a difference, then the cluster node changes// are involved and all recipients need to be notified of the node change eventboolean hasChange = members.size() != serverList.size();ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();Set<String> tmpAddressInfo = new ConcurrentHashSet<>();for (Member member : members) {final String address = member.getAddress();if (!serverList.containsKey(address)) {hasChange = true;// 如果cluster.conf或address-server中的cluster信息被更改,而对应的nacos-server还没有启动,则成员的状态应该设置为DOWN,// 如果相应的nacos-server已经启动,则在几秒钟后检测到该成员的状态将被设置为UPmember.setState(NodeState.DOWN);} else {//fix issue # 4925member.setState(serverList.get(address).getState());}// Ensure that the node is created only oncetmpMap.put(address, member);if (NodeState.UP.equals(member.getState())) {tmpAddressInfo.add(address);}}// 更新serverList为最新的集群节点集合serverList = tmpMap;// 更新memberAddressInfos为最新的集群节点地址memberAddressInfos = tmpAddressInfo;// 获取更新之后集群所有的节点对象Collection<Member> finalMembers = allMembers();Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);// Persist the current cluster node information to cluster.conf// <important> need to put the event publication into a synchronized block to ensure// that the event publication is sequential// 条件成立:1.集群中有节点增加了//          2.集群中有节点下线了//          3.手动增加或者删除了节点配置地址信息if (hasChange) {// 把最新的节点写入到配置中(cluster.conf或者address-server)MemberUtil.syncToFile(finalMembers);// 发布一个MembersChangeEvent事件Event event = MembersChangeEvent.builder().members(finalMembers).build();NotifyCenter.publishEvent(event);}return hasChange;
}

如果是集群节点的数量发生改变的话,就会发布一个MembersChangeEvent事件,而这个事件对应的订阅者是ServerListManager这个类,在这个类中也保存了整个nacos集群所有的节点集合,在回调它的订阅方法时很简单就是把这个集合属性重新赋值,代码如下:

/*** 当集群中的其他节点发生变化的时候会当前nacos节点就会发布一个MembersChangeEvent事件,然后会调用该方法更新最新的集群信息集合* @param event MembersChangeEvent*/
@Override
public void onEvent(MembersChangeEvent event) {// 把最新的集群节点集合重新赋值到serversthis.servers = new ArrayList<>(event.getMembers());
}

读取配置文件的过程已经分析完后,对cluster.conf文件注册一个监听器,当cluster.conf文件发生变更时就会触发FileWatcher中的onChange回调方法,onChange回调方法会再次调用上面说的readClusterConfFromDisk方法重新读取一遍cluster.conf文件中的节点地址。

// 使用notify机制监控文件的变化,并自动触发读取cluster.conf
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException {checkState();if (NOW_WATCH_JOB_CNT == MAX_WATCH_FILE_JOB) {return false;}WatchDirJob job = MANAGER.get(paths);if (job == null) {job = new WatchDirJob(paths);job.start();MANAGER.put(paths, job);NOW_WATCH_JOB_CNT++;}job.addSubscribe(watcher);return true;
}/*** 文件监听回调*/
private FileWatcher watcher = new FileWatcher() {/*** 当对应的目录或者文件发生变更的时候会回调该方法* @param event {@link FileChangeEvent}*/@Overridepublic void onChange(FileChangeEvent event) {readClusterConfFromDisk();}@Overridepublic boolean interest(String context) {return StringUtils.contains(context, "cluster.conf");}
};

【3】服务器模式: AddressServerMemberLookup使用地址服务器存储节点信息,服务端节点定时拉取信息进行管理

核心代码: 使用地址服务器存储节点信息,会创建AddressServerMemberLookup,服务端定时拉取信息进行管理;

public class AddressServerMemberLookup extends AbstractMemberLookup {private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {};public String domainName;public String addressPort;public String addressUrl;public String envIdUrl;public String addressServerUrl;private volatile boolean isAddressServerHealth = true;private int addressServerFailCount = 0;private int maxFailCount = 12;private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);private volatile boolean shutdown = false;@Overridepublic void start() throws NacosException {if (start.compareAndSet(false, true)) {this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));initAddressSys();run();}}/**** 获取服务器地址*/private void initAddressSys() {String envDomainName = System.getenv("address_server_domain");if (StringUtils.isBlank(envDomainName)) {domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");} else {domainName = envDomainName;}String envAddressPort = System.getenv("address_server_port");if (StringUtils.isBlank(envAddressPort)) {addressPort = EnvUtil.getProperty("address.server.port", "8080");} else {addressPort = envAddressPort;}String envAddressUrl = System.getenv("address_server_url");if (StringUtils.isBlank(envAddressUrl)) {addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");} else {addressUrl = envAddressUrl;}addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;envIdUrl = "http://" + domainName + ":" + addressPort + "/env";Loggers.CORE.info("ServerListService address-server port:" + addressPort);Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);}@SuppressWarnings("PMD.UndefineMagicConstantRule")private void run() throws NacosException {// With the address server, you need to perform a synchronous member node pull at startup// Repeat three times, successfully jump outboolean success = false;Throwable ex = null;int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);for (int i = 0; i < maxRetry; i++) {try {//拉取集群节点信息syncFromAddressUrl();success = true;break;} catch (Throwable e) {ex = e;Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));}}if (!success) {throw new NacosException(NacosException.SERVER_ERROR, ex);}//创建定时任务GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);}@Overridepublic void destroy() throws NacosException {shutdown = true;}@Overridepublic Map<String, Object> info() {Map<String, Object> info = new HashMap<>(4);info.put("addressServerHealth", isAddressServerHealth);info.put("addressServerUrl", addressServerUrl);info.put("envIdUrl", envIdUrl);info.put("addressServerFailCount", addressServerFailCount);return info;}private void syncFromAddressUrl() throws Exception {RestResult<String> result = restTemplate.get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());if (result.ok()) {isAddressServerHealth = true;Reader reader = new StringReader(result.getData());try {afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));} catch (Throwable e) {Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",ExceptionUtil.getAllExceptionMsg(e));}addressServerFailCount = 0;} else {addressServerFailCount++;if (addressServerFailCount >= maxFailCount) {isAddressServerHealth = false;}Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());}}// 定时任务class AddressServerSyncTask implements Runnable {@Overridepublic void run() {if (shutdown) {return;}try {//拉取服务列表syncFromAddressUrl();} catch (Throwable ex) {addressServerFailCount++;if (addressServerFailCount >= maxFailCount) {isAddressServerHealth = false;}Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));} finally {GlobalExecutor.scheduleByCommon(this, 5_000L);}}}
}

初始全量同步: Distro协议节点启动时会从其他节点全量同步数据。在Nacos中,整体流程如下:
【1】启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方法加载数据。

/**** 数据加载过程*/
@Override
public void run() {try {//加载数据load();if (!checkCompleted()) {GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());} else {loadCallback.onSuccess();Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");}} catch (Exception e) {loadCallback.onFailed(e);Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);}
}/**** 加载数据,并同步* @throws Exception*/
private void load() throws Exception {while (memberManager.allMembersWithoutSelf().isEmpty()) {Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");TimeUnit.SECONDS.sleep(1);}while (distroComponentHolder.getDataStorageTypes().isEmpty()) {Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");TimeUnit.SECONDS.sleep(1);}//同步数据for (String each : distroComponentHolder.getDataStorageTypes()) {if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {//从远程机器上同步所有数据loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));}}
}

【2】调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据。

/**** 从远程机器上同步所有数据*/
private boolean loadAllDataSnapshotFromRemote(String resourceType) {DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == transportAgent || null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",resourceType, transportAgent, dataProcessor);return false;}//遍历集群成员节点,不包括自己for (Member each : memberManager.allMembersWithoutSelf()) {try {Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());//从远程节点加载数据,调用http请求接口: distro/datums;DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());//处理数据boolean result = dataProcessor.processSnapshot(distroData);Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),result);if (result) {return true;}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);}}return false;
}

【3】从namingProxy代理获取所有的数据data
 ● 构造http请求,调用httpGet方法从指定的server获取数据。
 ● 从获取的结果result中获取数据bytes

/**** 从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;* @param targetServer target server.* @return*/
@Override
public DistroData getDatumSnapshot(String targetServer) {try {//从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;byte[] allDatum = NamingProxy.getAllData(targetServer);//将数据封装成DistroDatareturn new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);} catch (Exception e) {throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);}
}/*** Get all datum from target server.* NamingProxy.getAllData* 执行HttpGet请求,并获取返回数据* @param server target server address* @return all datum byte array* @throws Exception exception*/
public static byte[] getAllData(String server) throws Exception {//参数封装Map<String, String> params = new HashMap<>(8);//组装URL,并执行HttpGet请求,获取结果集RestResult<String> result = HttpClient.httpGet("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,new ArrayList<>(), params);//返回数据if (result.ok()) {return result.getData().getBytes();}throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()+ UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "+ result.getMessage());
}

【4】处理数据同步到本地processData
 ● 从data反序列化出datumMap
 ● 把数据存储到dataStore,也就是本地缓存dataMap
 ● 监听器不包括key,就创建一个空的service,并且绑定监听器。

/*** 数据处理并更新本地缓存* @param data* @return* @throws Exception*/
private boolean processData(byte[] data) throws Exception {if (data.length > 0) {//从data反序列化出datumMapMap<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);// 把数据存储到dataStore,也就是本地缓存dataMapfor (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {dataStore.put(entry.getKey(), entry.getValue());//监听器不包括key,就创建一个空的service,并且绑定监听器if (!listeners.containsKey(entry.getKey())) {// pretty sure the service not exist:if (switchDomain.isDefaultInstanceEphemeral()) {// create empty service//创建一个空的serviceLoggers.DISTRO.info("creating service {}", entry.getKey());Service service = new Service();String serviceName = KeyBuilder.getServiceName(entry.getKey());String namespaceId = KeyBuilder.getNamespace(entry.getKey());service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(Constants.DEFAULT_GROUP);// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();// The Listener corresponding to the key value must not be empty// 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManagerRecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();if (Objects.isNull(listener)) {return false;}//为空的绑定监听器listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);}}}//循环所有datumMapfor (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {if (!listeners.containsKey(entry.getKey())) {// Should not happen:Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());continue;}try {//执行监听器的onChange监听方法for (RecordListener listener : listeners.get(entry.getKey())) {listener.onChange(entry.getKey(), entry.getValue().value);}} catch (Exception e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);continue;}// Update data store if listener executed successfully:// 监听器listener执行成功后,就更新dataStoredataStore.put(entry.getKey(), entry.getValue());}}return true;
}

【5】监听器listener执行成功后,就更新data store

增量同步: 新增数据使用异步广播同步:服务注册的InstanceController.register()就是数据入口,它会调用ServiceManager.registerInstance(),执行数据同步的时候,调用addInstance(),在该方法中会执行DistroConsistencyServiceImpl.put(),该方法是增量同步的入口,会调用distroProtocol.sync()方法,代码如下:

/**** 数据保存* @param key   key of data, this key should be globally unique* @param value value of data* @throws NacosException*/
@Override
public void put(String key, Record value) throws NacosException {//将数据存入到dataStore中onPut(key, value);//使用distroProtocol同步数据distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),DataOperation.CHANGE,globalConfig.getTaskDispatchPeriod() / 2);
}

【1】DistroProtocol使用sync()方法接收增量数据

public void sync(DistroKey distroKey, DataOperation action, long delay) {//向除了自己外的所有节点广播for (Member each : memberManager.allMembersWithoutSelf()) {DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),each.getAddress());DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);//从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask任务添加进来//执行延时任务发布distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());}}
}

【2】向其他节点发布广播任务:调用distroTaskEngineHolder发布延迟任务

/**** 构造函数指定任务处理器* @param distroComponentHolder*/
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);//指定任务处理器defaultDelayTaskProcessordelayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}

【3】调用DistroDelayTaskProcessor.process()方法进行任务投递:将延迟任务转换为异步变更任务

/**** 任务处理过程* @param task     task.* @return*/
@Override
public boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {//将延迟任务变更成异步任务,异步任务对象是一个线程DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);//将任务添加到NacosExecuteTaskExecuteEngine中,并执行distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;}return false;
}

【4】执行变更任务DistroSyncChangeTask.run()方法:向指定节点发送消息

/**** 执行数据同步*/
@Override
public void run() {Loggers.DISTRO.info("[DISTRO-START] {}", toString());try {//获取本地缓存数据String type = getDistroKey().getResourceType();DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());distroData.setType(DataOperation.CHANGE);//向其他节点同步数据boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());if (!result) {handleFailedTask();}Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);} catch (Exception e) {Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);handleFailedTask();}
}

 ● 调用DistroHttpAgent.syncData()方法发送数据
 ● 调用NamingProxy.syncData()方法发送数据

/**** 向其他节点同步数据* @param data         data* @param targetServer target server* @return*/
@Override
public boolean syncData(DistroData data, String targetServer) {if (!memberManager.hasMember(targetServer)) {return true;}//获取数据字节数组byte[] dataContent = data.getContent();//通过Http协议同步数据return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}

【5】异常任务调用handleFailedTask()方法进行处理
 ● 调用DistroFailedTaskHandler处理失败任务
 ● 调用DistroHttpCombinedKeyTaskFailedHandler将失败任务重新投递成延迟任务

private void handleFailedTask() {String type = getDistroKey().getResourceType();DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);if (null == failedTaskHandler) {return;}// 重试失败任务failedTaskHandler.retry(getDistroKey(), ApplyAction.CHANGE);
}

相关文章:

深入解析:Nacos AP 模式的实现原理与应用场景

优质博文&#xff1a;IT-BLOG-CN 一、CAP 基础 ‌Nacos作为注册中心同时支持CP和AP模式。‌ Nacos通过不同的协议和机制来实现这两种模式&#xff0c;以满足不同的需求场景。 在Nacos中&#xff0c;默认情况下使用的是AP模式&#xff0c;通过Distro协议来实现。AP模式主要关…...

snmpnetstat使用说明

1.snmpnetstat介绍 snmpnetstat命令是用来查看snmp服务器的端口连接信息。 2.snmpnetstat安装 1.snmpnetstat安装 命令: yum -y install net-snmp net-snmp-utils [root@logstash ~]# yum -y install net-snmp net-snmp-utils Loaded plugins: fastestmirror Loading mi…...

linux线程 | 同步与互斥 | 互斥(下)

前言&#xff1a;本篇文章主要讲述linux线程的互斥的知识。 讲解流程为先讲解锁的工作原理&#xff0c; 再自己封装一下锁并且使用一下。 做完这些就要输出一堆理论性的东西&#xff0c; 但博主会总结两条结论&#xff01;&#xff01;最后就是讲一下死锁。 那么&#xff0c; 废…...

2024-10-17 问AI: [AI面试题] 讨论 AI 的挑战和局限性

文心一言 AI&#xff08;人工智能&#xff09;虽然取得了显著的进步&#xff0c;但仍面临多方面的挑战和局限性。以下是对AI挑战和局限性的详细讨论&#xff1a; 一、数据质量与可靠性 数据质量&#xff1a;AI系统依赖于高质量的数据进行训练和学习。如果数据质量低劣或包含…...

go基础(一)

包声明引入包函数变量语句&表达式注释 package main//包声明import "fmt"//引入包 //函数 func main() {/* 这是我的第一个简单的程序 */fmt.Println("Hello, World!") }基础语法 标记 go程序可以由多个标记组成&#xff0c;可以是关键字&#xff0…...

python忽略warnings 的方法

我在训练深度学习模型的时候一直出现这样的警告&#xff0c;但是不影响运行&#xff1a; UserWarning: Failed to load image Python extension: [WinError 127] 找不到指定的程序。 warn(f"Failed to load image Python extension: {e}") 要避免在 Python 程序运…...

2024年底蓝奏云最新可用API接口列表 支持优享版 无需手动抓取cookie

Lanzou Pro V1 接口列表 API状态版本路由获取文件与目录✅^1.0.1/v1/getFilesAndDirectories?url{}&page{}获取目录✅^1.0.0/v1/getDirectory?url{}获取文件✅^1.0.1/v1/getFiles?url{}&page{}搜索文件✅^1.0.0/v1/searchFile?url{}&wd{}依Id解析✅^1.0.2/v1/…...

Linux常用命令详细解析(含完整命令演示过程)

目录 1. 目录结构介绍 2. Linux命令基础 2.1 命令和命令行 2.2 格式 3. 常用命令 3.1 产看目录命令——ls 3.2 通配符 3.3 改变工作目录命令——cd 3.4 查看当前路径命令——pwd 3.5 创建新的目录命令——mkdir 3.6 创建文件目录命令——touch 3.7 查看…...

《使用Gin框架构建分布式应用》阅读笔记:p101-p107

《用Gin框架构建分布式应用》学习第7天&#xff0c;p101-p107总结&#xff0c;总计7页。 一、技术总结 1.StatusBadRequest vs StatusInternalServerError 写代码的时候有一个问题&#xff0c;什么时候使用 StatusBadRequest(400错误)&#xff0c;什么时候使用 StatusIntern…...

014集——c#实现打开、另存对话框(CAD—C#二次开发入门)

如下图所示&#xff0c;运行后实现如下功能&#xff1a; 打开对话框&#xff0c;选择一个文件&#xff0c;并获取文件名变量。 打开另存对话框&#xff0c;输入路径和文件名&#xff0c;获取另存文件名变量。 部分代码如下&#xff1a; public static void Ofd(this Database…...

全面升级:亚马逊测评环境方案的最新趋势与实践

在亚马逊测评领域深耕多年&#xff0c;见证了无数环境方案的更迭与演变&#xff0c;每一次变化都体现了国人不畏艰难、勇于创新的精神。面对平台的政策调整&#xff0c;总能找到相应的对策。那么&#xff0c;当前是否存在一套相对稳定且高效的技术方案呢&#xff1f;答案是肯定…...

Java中的异步编程模型

1.什么是异步编程&#xff1f; 异步编程是一种编程模式&#xff0c;允许程序在等待某些操作&#xff08;例如文件I/O或网络请求&#xff09;完成时&#xff0c;不必停下来等待&#xff0c;而是继续执行其他任务。当异步操作完成时&#xff0c;回调函数或任务调度器会处理结果&…...

opencv 按位操作

opencv位运算说明 按位与&#xff0c;按位或&#xff0c;按位非&#xff0c;按位异或 在 OpenCV 中&#xff0c;按位操作函数的接口一般包括两个或多个图像数组&#xff08;矩阵&#xff09;作为输入&#xff0c;常常还会有一个可选的掩码参数。下面我列出每个函数的具体接口…...

【Bug】STM32串口空闲中断接收不定长数据异常

Bug 使用标准库配置STM32F103C8T6的串口1开启接收中断和空闲中断&#xff0c;通过空闲中断来判断数据发送是否结束&#xff0c;收到数据后切换板载LED灯所接引脚电平&#xff0c;发现LED出现三种情况&#xff0c;熄灭、微亮、正常亮&#xff0c;但是LED灯所接的GPIO引脚为PC13…...

使用Radzen Blazor组件库开发的基于ABP框架炫酷UI主题

一、项目简介 使用过ABP框架的童鞋应该知道它也自带了一款免费的Blazor UI主题&#xff0c;它的页面是长这样的&#xff1a; 个人感觉不太美观&#xff0c;于是网上搜了很多Blazor开源组件库&#xff0c;发现有一款样式非常不错的组件库&#xff0c;名叫&#xff1a;Radzen&am…...

Java入门4——输入输出+实用的函数

在本篇博客&#xff0c;采用代码解释的方法&#xff0c;帮助大家熟悉Java的语法 一、输入和输出 在Java当中&#xff0c;我们一般有这样输入输出&#xff1a; import java.util.Scanner;public class javaSchool {public static void main(String[] args) {Scanner scanner …...

《当尼采哭泣》

这是一个相互救赎的故事。故事铺垫比较冗长&#xff0c;看到一半的时候一度看不下去。直到看到最后两章才最终感觉值得一看。很多表层现象&#xff0c;就像露出水面的冰山。解决表面的问题&#xff0c;需要深挖冰山水下的部分。一个人碰到的最难解决的问题不在外部&#xff0c;…...

TOMCAT Using CATALINA——OPTS,闪退解决方法(两种)

【Java实践】安装tomcat启动startup.bat出现闪退问题_安装tomcat点击startup闪退-CSDN博客...

Android音视频 MediaCodec框架-启动编码(4)

Android音视频 MediaCodec框架-启动编码 简述 上一节我们介绍了MediaCodec框架创建编码器流程&#xff0c;编解码的流程其实基本是一样的&#xff0c;只是底层的最终的实现组件不同&#xff0c;所以我们只看启动编码流程。 MediaCodec启动编码 从MediaCodec的start方法开始…...

# Go 语言中的 Interface 和 Struct

go package mainimport ("fmt" )// Girl 是一个接口&#xff0c;定义了所有"女性"类型都应该实现的方法 type Girl interface {call()introduce() }// Wife 结构体代表妻子 type wife struct {name stringage intyearsWed int }// call 方法…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度​

一、引言&#xff1a;多云环境的技术复杂性本质​​ 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时&#xff0c;​​基础设施的技术债呈现指数级积累​​。网络连接、身份认证、成本管理这三大核心挑战相互嵌套&#xff1a;跨云网络构建数据…...

Android Wi-Fi 连接失败日志分析

1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分&#xff1a; 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析&#xff1a; CTR…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

docker 部署发现spring.profiles.active 问题

报错&#xff1a; org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...

算法:模拟

1.替换所有的问号 1576. 替换所有的问号 - 力扣&#xff08;LeetCode&#xff09; ​遍历字符串​&#xff1a;通过外层循环逐一检查每个字符。​遇到 ? 时处理​&#xff1a; 内层循环遍历小写字母&#xff08;a 到 z&#xff09;。对每个字母检查是否满足&#xff1a; ​与…...

C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)

名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...