深入解析:Nacos AP 模式的实现原理与应用场景
优质博文:IT-BLOG-CN
一、CAP 基础
Nacos
作为注册中心同时支持CP
和AP
模式。 Nacos
通过不同的协议和机制来实现这两种模式,以满足不同的需求场景。
在Nacos
中,默认情况下使用的是AP
模式,通过Distro
协议来实现。AP
模式主要关注高可用性,在网络分区时仍然保持服务,但可能会允许短暂的数据不一致。
此外,Nacos
也支持CP
模式,通过Raft
协议来实现。CP
模式在网络分区时牺牲可用性以保证数据一致性,适用于对数据准确性要求高的场景,如金融行业。
具体实现上,Nacos
通过客户端设置spring.cloud.nacos.discovery.ephemeral
的值为false
来启用CP
模式,默认为true
则为AP
模式。此外,Nacos
的gRPC
通信端口和集群节点之间的通信端口也有所不同,分别用于AP
和CP
模式的实现。
二、Nacos AP 实现原理
Distro
协议。Distro
是阿里巴巴开源的一个动态服务发现、配置管理和服务管理平台。目前流行的Nacos
服务管理框架就采用了Distro
协议。Distro
协议被定位为临时数据的一致性协议 :该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session
会话,该会话只要存在,数据就不会丢失。
Distro
协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。
Distro协议具有以下特点:
【1】数据分片Sharding
:
☑️ 将数据根据某种规则(如哈希)分片,每个节点负责一部分数据的存储和管理。
☑️ 这种分片策略可以有效地分散负载,避免单点瓶颈。
【2】数据复制Replication
:
☑️ 在每个节点上维护一份完整的数据副本,确保数据在节点之间的一致性。
☑️ 使用异步复制的方式,在数据更新时将更新信息广播给其他节点。
【3】一致性管理:
☑️ 采用最终一致性模型,确保数据在一定时间内达到一致。
☑️ 使用版本号或时间戳来管理数据的更新,确保数据的最新状态能够传播到所有节点。
【4】心跳检测和故障转移:
☑️ 定期进行心跳检测,确保节点的健康状态。
☑️ 在节点故障时,其他节点能够快速接管其数据和职责,确保系统的高可用性。
Distro
协议服务端节点发现使用寻址机制来实现服务端节点的管理。在Nacos
中,寻址模式有三种:
在Nacos
服务启动的时候ServerMemberManager
这个类专门对集群节点进行管理的,这个类在init
方法中就会对集群进行初始化
protected void init() throws NacosException {Loggers.CORE.info("Nacos-related cluster resource initialization");// 得到当前nacos服务的端口号this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);// 得到当前nacos服务的地址this.localAddress = InetUtils.getSelfIP() + ":" + port;// 解析地址得到当前nacos服务所对应的集群节点对象this.self = MemberUtil.singleParse(this.localAddress);// 给当前nacos服务设置一个版本号this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);// 把自己放到serverList中serverList.put(self.getAddress(), self);// 该方法做了两件事// 1.注册了一个集群节点信息变更事件// 2.注册了订阅IPChangeEvent事件的事件订阅者registerClusterEvent();// 初始化节点地址寻址模式// 在这里就可以通过配置的节点地址去初始化整个nacos集群节点集合了initAndStartLookup();if (serverList.isEmpty()) {throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");}Loggers.CORE.info("The cluster resource is initialized");
}
注册了订阅IPChangeEvent
事件的事件订阅者registerClusterEvent
:该方法做了两件事:
【1】注册了一个集群节点信息变更事件。
【2】注册了订阅IPChangeEvent
事件的事件订阅者。
private void registerClusterEvent() {// 注册一个集群节点信息变更事件NotifyCenter.registerToPublisher(MembersChangeEvent.class,EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));// 注册一个事件订阅者,订阅的事件类型是IPChangeEventNotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {@Overridepublic void onEvent(InetUtils.IPChangeEvent event) {String newAddress = event.getNewIP() + ":" + port;ServerMemberManager.this.localAddress = newAddress;EnvUtil.setLocalAddress(localAddress);Member self = ServerMemberManager.this.self;self.setIp(event.getNewIP());String oldAddress = event.getOldIP() + ":" + port;ServerMemberManager.this.serverList.remove(oldAddress);ServerMemberManager.this.serverList.put(newAddress, self);ServerMemberManager.this.memberAddressInfos.remove(oldAddress);ServerMemberManager.this.memberAddressInfos.add(newAddress);}@Overridepublic Class<? extends Event> subscribeType() {return InetUtils.IPChangeEvent.class;}});
}
通过上述方法分析可知:注册一个MembersChangeEvent
事件,而对应的事件订阅者是ServerListManager
。同时该方法还会注册一个IPChangeEvent
事件的事件订阅者,IPChangeEvent
这个事件就是当前节点IP
发生变更之后发布的,该事件发布之后会被这个注册的订阅者所捕获,该订阅者做的事情也很简单,就是对集群节点集合中对应当前节点的ip
进行更新就行了。
这一行代码就是初始化集群的关键: 创建nacos
集群节点寻址器
private void initAndStartLookup() throws NacosException {this.lookup = LookupFactory.createLookUp(this);this.lookup.start();
}
先是通过LookupFactory
创建一个节点寻址器,然后调用start
方法启动这个节点寻址器。
通过上图可知Nacos
配置集群节点地址的时候有两种方式:读取本地配置文件 和 通过配置服务器
/*** 创建nacos集群节点寻址器** @param memberManager {@link ServerMemberManager}* @return {@link MemberLookup}* @throws NacosException NacosException*/
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {// 条件成立:当前nacos节点是集群模式if (!EnvUtil.getStandaloneMode()) {// 从配置环境中获取nacos集群节点的寻址方式String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);LookupType type = chooseLookup(lookupType);// 得到对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookupLOOK_UP = find(type);currentLookupType = type;}// 条件成立:当前nacos节点是单机模式else {LOOK_UP = new StandaloneMemberLookup();}LOOK_UP.injectMemberManager(memberManager);Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());return LOOK_UP;
}
创建nacos
集群节点寻址器(参考流程图)
/*** 创建nacos集群节点寻址器** @param memberManager {@link ServerMemberManager}* @return {@link MemberLookup}* @throws NacosException NacosException*/
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {// 条件成立:当前nacos节点是集群模式if (!EnvUtil.getStandaloneMode()) {// 从配置环境中获取nacos集群节点的寻址方式String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);LookupType type = chooseLookup(lookupType);// 得到对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookupLOOK_UP = find(type);currentLookupType = type;}// 条件成立:当前nacos节点是单机模式else {LOOK_UP = new StandaloneMemberLookup();}LOOK_UP.injectMemberManager(memberManager);Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());return LOOK_UP;
}
怎么确定是使用FileConfigMemberLookup
还是AddressServerMemberLookup
,主要看是否配置了lookupType
,如果没有配置lookupType
就按照默认的先看是否配置了集群配置文件。
private static LookupType chooseLookup(String lookupType) {if (StringUtils.isNotBlank(lookupType)) {LookupType type = LookupType.sourceOf(lookupType);if (Objects.nonNull(type)) {return type;}}// 代码来到这里说明没有配置lookupType,此时会默认去寻找user.home/nacos/conf/cluster.conf文件File file = new File(EnvUtil.getClusterConfFilePath());// 条件成立:集群配置文件存在,或者环境变量配置了集群节点地址if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) {// 返回文件寻址模式return LookupType.FILE_CONFIG;}// 返回服务器寻址模式return LookupType.ADDRESS_SERVER;
}
确定了对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookup
后获取对应的节点地址。如果当前节点是集群模式,那么会去判断${user.home}/nacos/conf/cluster.conf
这个文件是否存在或者环境变量中是否配置了集群节点地址,如果两者有一个成立就是文件寻址模式,反之是服务器寻址模式。
private static MemberLookup find(LookupType type) {// 条件成立:集群配置方式是文件配置的方式if (LookupType.FILE_CONFIG.equals(type)) {// 创建一个FileConfigMemberLookup对象并返回LOOK_UP = new FileConfigMemberLookup();return LOOK_UP;}// 条件成立:集群配置方式是通过服务器获取节点地址的方式if (LookupType.ADDRESS_SERVER.equals(type)) {LOOK_UP = new AddressServerMemberLookup();return LOOK_UP;}// unpossible to run herethrow new IllegalArgumentException();
}
如何初始化集群:
【1】单机模式: StandaloneMemberLookup
【2】文件模式: FileConfigMemberLookup
利用监控cluster.conf
文件的变动实现节点的管理。核心代码如下:
public class FileConfigMemberLookup extends AbstractMemberLookup {/*** 文件监听回调*/private FileWatcher watcher = new FileWatcher() {/*** 当对应的目录或者文件发生变更的时候会回调该方法* @param event {@link FileChangeEvent}*/@Overridepublic void onChange(FileChangeEvent event) {readClusterConfFromDisk();}@Overridepublic boolean interest(String context) {return StringUtils.contains(context, "cluster.conf");}};@Overridepublic void start() throws NacosException {if (start.compareAndSet(false, true)) {// 从文件中读取集群节点地址readClusterConfFromDisk();try {// 使用notify机制监控文件的变化,并自动触发读取cluster.confWatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);} catch (Throwable e) {Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());}}}@Overridepublic void destroy() throws NacosException {WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);}/*** 从cluster.conf文件中读取集群节点地址*/private void readClusterConfFromDisk() {Collection<Member> tmpMembers = new ArrayList<>();try {// 获取到cluster.conf文件中配置的节点地址列表List<String> tmp = EnvUtil.readClusterConf();// 把这些节点地址分别转换成对应的集群节点对象tmpMembers = MemberUtil.readServerConf(tmp);} catch (Throwable e) {Loggers.CLUSTER.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());}afterLookup(tmpMembers);}
}
在start
方法中先调用readClusterConfFromDisk
方法,这个方法会读取${user.home}/nacos/conf/cluster.conf
这个文件中配置的节点地址,读取到之后把这些节点地址转化为对应的Member
对象,一个Member
对象就代表一个节点,接着会调用父类AbstractMemberLookup
的afterLookup
方法。
public void afterLookup(Collection<Member> members) {this.memberManager.memberChange(members);
}
调用的是集群节点管理器的menberChange
方法,同时把上面从cluster.conf
文件中读取到的Member
节点集合作为方法参数:
/*** 当nacos节点启动 或者 每次配置的集群节点地址发生改变的时候就会调用到该方法* @param members 当前最新的集群节点地址* @return 返回true表示集群节点数量发生改变了,反之表示没改变*/
synchronized boolean memberChange(Collection<Member> members) {if (members == null || members.isEmpty()) {return false;}// 配置的集群节点地址是否包含当前nacos节点boolean isContainSelfIp = members.stream().anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));if (isContainSelfIp) {isInIpList = true;} else {isInIpList = false;members.add(this.self);Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);}// If the number of old and new clusters is different, the cluster information// must have changed; if the number of clusters is the same, then compare whether// there is a difference; if there is a difference, then the cluster node changes// are involved and all recipients need to be notified of the node change eventboolean hasChange = members.size() != serverList.size();ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();Set<String> tmpAddressInfo = new ConcurrentHashSet<>();for (Member member : members) {final String address = member.getAddress();if (!serverList.containsKey(address)) {hasChange = true;// 如果cluster.conf或address-server中的cluster信息被更改,而对应的nacos-server还没有启动,则成员的状态应该设置为DOWN,// 如果相应的nacos-server已经启动,则在几秒钟后检测到该成员的状态将被设置为UPmember.setState(NodeState.DOWN);} else {//fix issue # 4925member.setState(serverList.get(address).getState());}// Ensure that the node is created only oncetmpMap.put(address, member);if (NodeState.UP.equals(member.getState())) {tmpAddressInfo.add(address);}}// 更新serverList为最新的集群节点集合serverList = tmpMap;// 更新memberAddressInfos为最新的集群节点地址memberAddressInfos = tmpAddressInfo;// 获取更新之后集群所有的节点对象Collection<Member> finalMembers = allMembers();Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);// Persist the current cluster node information to cluster.conf// <important> need to put the event publication into a synchronized block to ensure// that the event publication is sequential// 条件成立:1.集群中有节点增加了// 2.集群中有节点下线了// 3.手动增加或者删除了节点配置地址信息if (hasChange) {// 把最新的节点写入到配置中(cluster.conf或者address-server)MemberUtil.syncToFile(finalMembers);// 发布一个MembersChangeEvent事件Event event = MembersChangeEvent.builder().members(finalMembers).build();NotifyCenter.publishEvent(event);}return hasChange;
}
如果是集群节点的数量发生改变的话,就会发布一个MembersChangeEvent
事件,而这个事件对应的订阅者是ServerListManager
这个类,在这个类中也保存了整个nacos
集群所有的节点集合,在回调它的订阅方法时很简单就是把这个集合属性重新赋值,代码如下:
/*** 当集群中的其他节点发生变化的时候会当前nacos节点就会发布一个MembersChangeEvent事件,然后会调用该方法更新最新的集群信息集合* @param event MembersChangeEvent*/
@Override
public void onEvent(MembersChangeEvent event) {// 把最新的集群节点集合重新赋值到serversthis.servers = new ArrayList<>(event.getMembers());
}
读取配置文件的过程已经分析完后,对cluster.conf
文件注册一个监听器,当cluster.conf
文件发生变更时就会触发FileWatcher
中的onChange
回调方法,onChange
回调方法会再次调用上面说的readClusterConfFromDisk
方法重新读取一遍cluster.conf
文件中的节点地址。
// 使用notify机制监控文件的变化,并自动触发读取cluster.conf
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException {checkState();if (NOW_WATCH_JOB_CNT == MAX_WATCH_FILE_JOB) {return false;}WatchDirJob job = MANAGER.get(paths);if (job == null) {job = new WatchDirJob(paths);job.start();MANAGER.put(paths, job);NOW_WATCH_JOB_CNT++;}job.addSubscribe(watcher);return true;
}/*** 文件监听回调*/
private FileWatcher watcher = new FileWatcher() {/*** 当对应的目录或者文件发生变更的时候会回调该方法* @param event {@link FileChangeEvent}*/@Overridepublic void onChange(FileChangeEvent event) {readClusterConfFromDisk();}@Overridepublic boolean interest(String context) {return StringUtils.contains(context, "cluster.conf");}
};
【3】服务器模式: AddressServerMemberLookup
使用地址服务器存储节点信息,服务端节点定时拉取信息进行管理
核心代码: 使用地址服务器存储节点信息,会创建AddressServerMemberLookup
,服务端定时拉取信息进行管理;
public class AddressServerMemberLookup extends AbstractMemberLookup {private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {};public String domainName;public String addressPort;public String addressUrl;public String envIdUrl;public String addressServerUrl;private volatile boolean isAddressServerHealth = true;private int addressServerFailCount = 0;private int maxFailCount = 12;private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);private volatile boolean shutdown = false;@Overridepublic void start() throws NacosException {if (start.compareAndSet(false, true)) {this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));initAddressSys();run();}}/**** 获取服务器地址*/private void initAddressSys() {String envDomainName = System.getenv("address_server_domain");if (StringUtils.isBlank(envDomainName)) {domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");} else {domainName = envDomainName;}String envAddressPort = System.getenv("address_server_port");if (StringUtils.isBlank(envAddressPort)) {addressPort = EnvUtil.getProperty("address.server.port", "8080");} else {addressPort = envAddressPort;}String envAddressUrl = System.getenv("address_server_url");if (StringUtils.isBlank(envAddressUrl)) {addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");} else {addressUrl = envAddressUrl;}addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;envIdUrl = "http://" + domainName + ":" + addressPort + "/env";Loggers.CORE.info("ServerListService address-server port:" + addressPort);Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);}@SuppressWarnings("PMD.UndefineMagicConstantRule")private void run() throws NacosException {// With the address server, you need to perform a synchronous member node pull at startup// Repeat three times, successfully jump outboolean success = false;Throwable ex = null;int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);for (int i = 0; i < maxRetry; i++) {try {//拉取集群节点信息syncFromAddressUrl();success = true;break;} catch (Throwable e) {ex = e;Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));}}if (!success) {throw new NacosException(NacosException.SERVER_ERROR, ex);}//创建定时任务GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);}@Overridepublic void destroy() throws NacosException {shutdown = true;}@Overridepublic Map<String, Object> info() {Map<String, Object> info = new HashMap<>(4);info.put("addressServerHealth", isAddressServerHealth);info.put("addressServerUrl", addressServerUrl);info.put("envIdUrl", envIdUrl);info.put("addressServerFailCount", addressServerFailCount);return info;}private void syncFromAddressUrl() throws Exception {RestResult<String> result = restTemplate.get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());if (result.ok()) {isAddressServerHealth = true;Reader reader = new StringReader(result.getData());try {afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));} catch (Throwable e) {Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",ExceptionUtil.getAllExceptionMsg(e));}addressServerFailCount = 0;} else {addressServerFailCount++;if (addressServerFailCount >= maxFailCount) {isAddressServerHealth = false;}Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());}}// 定时任务class AddressServerSyncTask implements Runnable {@Overridepublic void run() {if (shutdown) {return;}try {//拉取服务列表syncFromAddressUrl();} catch (Throwable ex) {addressServerFailCount++;if (addressServerFailCount >= maxFailCount) {isAddressServerHealth = false;}Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));} finally {GlobalExecutor.scheduleByCommon(this, 5_000L);}}}
}
初始全量同步: Distro
协议节点启动时会从其他节点全量同步数据。在Nacos
中,整体流程如下:
【1】启动一个定时任务线程DistroLoadDataTask
加载数据,调用load()
方法加载数据。
/**** 数据加载过程*/
@Override
public void run() {try {//加载数据load();if (!checkCompleted()) {GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());} else {loadCallback.onSuccess();Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");}} catch (Exception e) {loadCallback.onFailed(e);Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);}
}/**** 加载数据,并同步* @throws Exception*/
private void load() throws Exception {while (memberManager.allMembersWithoutSelf().isEmpty()) {Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");TimeUnit.SECONDS.sleep(1);}while (distroComponentHolder.getDataStorageTypes().isEmpty()) {Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");TimeUnit.SECONDS.sleep(1);}//同步数据for (String each : distroComponentHolder.getDataStorageTypes()) {if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {//从远程机器上同步所有数据loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));}}
}
【2】调用loadAllDataSnapshotFromRemote()
方法从远程机器同步所有的数据。
/**** 从远程机器上同步所有数据*/
private boolean loadAllDataSnapshotFromRemote(String resourceType) {DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == transportAgent || null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",resourceType, transportAgent, dataProcessor);return false;}//遍历集群成员节点,不包括自己for (Member each : memberManager.allMembersWithoutSelf()) {try {Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());//从远程节点加载数据,调用http请求接口: distro/datums;DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());//处理数据boolean result = dataProcessor.processSnapshot(distroData);Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),result);if (result) {return true;}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);}}return false;
}
【3】从namingProxy
代理获取所有的数据data
。
● 构造http
请求,调用httpGet
方法从指定的server
获取数据。
● 从获取的结果result
中获取数据bytes
。
/**** 从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;* @param targetServer target server.* @return*/
@Override
public DistroData getDatumSnapshot(String targetServer) {try {//从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;byte[] allDatum = NamingProxy.getAllData(targetServer);//将数据封装成DistroDatareturn new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);} catch (Exception e) {throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);}
}/*** Get all datum from target server.* NamingProxy.getAllData* 执行HttpGet请求,并获取返回数据* @param server target server address* @return all datum byte array* @throws Exception exception*/
public static byte[] getAllData(String server) throws Exception {//参数封装Map<String, String> params = new HashMap<>(8);//组装URL,并执行HttpGet请求,获取结果集RestResult<String> result = HttpClient.httpGet("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,new ArrayList<>(), params);//返回数据if (result.ok()) {return result.getData().getBytes();}throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()+ UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "+ result.getMessage());
}
【4】处理数据同步到本地processData
。
● 从data
反序列化出datumMap
。
● 把数据存储到dataStore
,也就是本地缓存dataMap
。
● 监听器不包括key
,就创建一个空的service
,并且绑定监听器。
/*** 数据处理并更新本地缓存* @param data* @return* @throws Exception*/
private boolean processData(byte[] data) throws Exception {if (data.length > 0) {//从data反序列化出datumMapMap<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);// 把数据存储到dataStore,也就是本地缓存dataMapfor (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {dataStore.put(entry.getKey(), entry.getValue());//监听器不包括key,就创建一个空的service,并且绑定监听器if (!listeners.containsKey(entry.getKey())) {// pretty sure the service not exist:if (switchDomain.isDefaultInstanceEphemeral()) {// create empty service//创建一个空的serviceLoggers.DISTRO.info("creating service {}", entry.getKey());Service service = new Service();String serviceName = KeyBuilder.getServiceName(entry.getKey());String namespaceId = KeyBuilder.getNamespace(entry.getKey());service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(Constants.DEFAULT_GROUP);// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();// The Listener corresponding to the key value must not be empty// 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManagerRecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();if (Objects.isNull(listener)) {return false;}//为空的绑定监听器listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);}}}//循环所有datumMapfor (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {if (!listeners.containsKey(entry.getKey())) {// Should not happen:Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());continue;}try {//执行监听器的onChange监听方法for (RecordListener listener : listeners.get(entry.getKey())) {listener.onChange(entry.getKey(), entry.getValue().value);}} catch (Exception e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);continue;}// Update data store if listener executed successfully:// 监听器listener执行成功后,就更新dataStoredataStore.put(entry.getKey(), entry.getValue());}}return true;
}
【5】监听器listener
执行成功后,就更新data store
。
增量同步: 新增数据使用异步广播同步:服务注册的InstanceController.register()
就是数据入口,它会调用ServiceManager.registerInstance()
,执行数据同步的时候,调用addInstance()
,在该方法中会执行DistroConsistencyServiceImpl.put()
,该方法是增量同步的入口,会调用distroProtocol.sync()
方法,代码如下:
/**** 数据保存* @param key key of data, this key should be globally unique* @param value value of data* @throws NacosException*/
@Override
public void put(String key, Record value) throws NacosException {//将数据存入到dataStore中onPut(key, value);//使用distroProtocol同步数据distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),DataOperation.CHANGE,globalConfig.getTaskDispatchPeriod() / 2);
}
【1】DistroProtocol
使用sync()
方法接收增量数据
public void sync(DistroKey distroKey, DataOperation action, long delay) {//向除了自己外的所有节点广播for (Member each : memberManager.allMembersWithoutSelf()) {DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),each.getAddress());DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);//从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask任务添加进来//执行延时任务发布distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());}}
}
【2】向其他节点发布广播任务:调用distroTaskEngineHolder
发布延迟任务
/**** 构造函数指定任务处理器* @param distroComponentHolder*/
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);//指定任务处理器defaultDelayTaskProcessordelayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
【3】调用DistroDelayTaskProcessor.process()
方法进行任务投递:将延迟任务转换为异步变更任务
/**** 任务处理过程* @param task task.* @return*/
@Override
public boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {//将延迟任务变更成异步任务,异步任务对象是一个线程DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);//将任务添加到NacosExecuteTaskExecuteEngine中,并执行distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;}return false;
}
【4】执行变更任务DistroSyncChangeTask.run()
方法:向指定节点发送消息
/**** 执行数据同步*/
@Override
public void run() {Loggers.DISTRO.info("[DISTRO-START] {}", toString());try {//获取本地缓存数据String type = getDistroKey().getResourceType();DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());distroData.setType(DataOperation.CHANGE);//向其他节点同步数据boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());if (!result) {handleFailedTask();}Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);} catch (Exception e) {Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);handleFailedTask();}
}
● 调用DistroHttpAgent.syncData()
方法发送数据
● 调用NamingProxy.syncData()
方法发送数据
/**** 向其他节点同步数据* @param data data* @param targetServer target server* @return*/
@Override
public boolean syncData(DistroData data, String targetServer) {if (!memberManager.hasMember(targetServer)) {return true;}//获取数据字节数组byte[] dataContent = data.getContent();//通过Http协议同步数据return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
【5】异常任务调用handleFailedTask()
方法进行处理
● 调用DistroFailedTaskHandler
处理失败任务
● 调用DistroHttpCombinedKeyTaskFailedHandler
将失败任务重新投递成延迟任务
private void handleFailedTask() {String type = getDistroKey().getResourceType();DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);if (null == failedTaskHandler) {return;}// 重试失败任务failedTaskHandler.retry(getDistroKey(), ApplyAction.CHANGE);
}
相关文章:

深入解析:Nacos AP 模式的实现原理与应用场景
优质博文:IT-BLOG-CN 一、CAP 基础 Nacos作为注册中心同时支持CP和AP模式。 Nacos通过不同的协议和机制来实现这两种模式,以满足不同的需求场景。 在Nacos中,默认情况下使用的是AP模式,通过Distro协议来实现。AP模式主要关…...
snmpnetstat使用说明
1.snmpnetstat介绍 snmpnetstat命令是用来查看snmp服务器的端口连接信息。 2.snmpnetstat安装 1.snmpnetstat安装 命令: yum -y install net-snmp net-snmp-utils [root@logstash ~]# yum -y install net-snmp net-snmp-utils Loaded plugins: fastestmirror Loading mi…...

linux线程 | 同步与互斥 | 互斥(下)
前言:本篇文章主要讲述linux线程的互斥的知识。 讲解流程为先讲解锁的工作原理, 再自己封装一下锁并且使用一下。 做完这些就要输出一堆理论性的东西, 但博主会总结两条结论!!最后就是讲一下死锁。 那么, 废…...
2024-10-17 问AI: [AI面试题] 讨论 AI 的挑战和局限性
文心一言 AI(人工智能)虽然取得了显著的进步,但仍面临多方面的挑战和局限性。以下是对AI挑战和局限性的详细讨论: 一、数据质量与可靠性 数据质量:AI系统依赖于高质量的数据进行训练和学习。如果数据质量低劣或包含…...
go基础(一)
包声明引入包函数变量语句&表达式注释 package main//包声明import "fmt"//引入包 //函数 func main() {/* 这是我的第一个简单的程序 */fmt.Println("Hello, World!") }基础语法 标记 go程序可以由多个标记组成,可以是关键字࿰…...
python忽略warnings 的方法
我在训练深度学习模型的时候一直出现这样的警告,但是不影响运行: UserWarning: Failed to load image Python extension: [WinError 127] 找不到指定的程序。 warn(f"Failed to load image Python extension: {e}") 要避免在 Python 程序运…...

2024年底蓝奏云最新可用API接口列表 支持优享版 无需手动抓取cookie
Lanzou Pro V1 接口列表 API状态版本路由获取文件与目录✅^1.0.1/v1/getFilesAndDirectories?url{}&page{}获取目录✅^1.0.0/v1/getDirectory?url{}获取文件✅^1.0.1/v1/getFiles?url{}&page{}搜索文件✅^1.0.0/v1/searchFile?url{}&wd{}依Id解析✅^1.0.2/v1/…...

Linux常用命令详细解析(含完整命令演示过程)
目录 1. 目录结构介绍 2. Linux命令基础 2.1 命令和命令行 2.2 格式 3. 常用命令 3.1 产看目录命令——ls 3.2 通配符 3.3 改变工作目录命令——cd 3.4 查看当前路径命令——pwd 3.5 创建新的目录命令——mkdir 3.6 创建文件目录命令——touch 3.7 查看…...

《使用Gin框架构建分布式应用》阅读笔记:p101-p107
《用Gin框架构建分布式应用》学习第7天,p101-p107总结,总计7页。 一、技术总结 1.StatusBadRequest vs StatusInternalServerError 写代码的时候有一个问题,什么时候使用 StatusBadRequest(400错误),什么时候使用 StatusIntern…...

014集——c#实现打开、另存对话框(CAD—C#二次开发入门)
如下图所示,运行后实现如下功能: 打开对话框,选择一个文件,并获取文件名变量。 打开另存对话框,输入路径和文件名,获取另存文件名变量。 部分代码如下: public static void Ofd(this Database…...

全面升级:亚马逊测评环境方案的最新趋势与实践
在亚马逊测评领域深耕多年,见证了无数环境方案的更迭与演变,每一次变化都体现了国人不畏艰难、勇于创新的精神。面对平台的政策调整,总能找到相应的对策。那么,当前是否存在一套相对稳定且高效的技术方案呢?答案是肯定…...
Java中的异步编程模型
1.什么是异步编程? 异步编程是一种编程模式,允许程序在等待某些操作(例如文件I/O或网络请求)完成时,不必停下来等待,而是继续执行其他任务。当异步操作完成时,回调函数或任务调度器会处理结果&…...
opencv 按位操作
opencv位运算说明 按位与,按位或,按位非,按位异或 在 OpenCV 中,按位操作函数的接口一般包括两个或多个图像数组(矩阵)作为输入,常常还会有一个可选的掩码参数。下面我列出每个函数的具体接口…...
【Bug】STM32串口空闲中断接收不定长数据异常
Bug 使用标准库配置STM32F103C8T6的串口1开启接收中断和空闲中断,通过空闲中断来判断数据发送是否结束,收到数据后切换板载LED灯所接引脚电平,发现LED出现三种情况,熄灭、微亮、正常亮,但是LED灯所接的GPIO引脚为PC13…...

使用Radzen Blazor组件库开发的基于ABP框架炫酷UI主题
一、项目简介 使用过ABP框架的童鞋应该知道它也自带了一款免费的Blazor UI主题,它的页面是长这样的: 个人感觉不太美观,于是网上搜了很多Blazor开源组件库,发现有一款样式非常不错的组件库,名叫:Radzen&am…...

Java入门4——输入输出+实用的函数
在本篇博客,采用代码解释的方法,帮助大家熟悉Java的语法 一、输入和输出 在Java当中,我们一般有这样输入输出: import java.util.Scanner;public class javaSchool {public static void main(String[] args) {Scanner scanner …...

《当尼采哭泣》
这是一个相互救赎的故事。故事铺垫比较冗长,看到一半的时候一度看不下去。直到看到最后两章才最终感觉值得一看。很多表层现象,就像露出水面的冰山。解决表面的问题,需要深挖冰山水下的部分。一个人碰到的最难解决的问题不在外部,…...
TOMCAT Using CATALINA——OPTS,闪退解决方法(两种)
【Java实践】安装tomcat启动startup.bat出现闪退问题_安装tomcat点击startup闪退-CSDN博客...

Android音视频 MediaCodec框架-启动编码(4)
Android音视频 MediaCodec框架-启动编码 简述 上一节我们介绍了MediaCodec框架创建编码器流程,编解码的流程其实基本是一样的,只是底层的最终的实现组件不同,所以我们只看启动编码流程。 MediaCodec启动编码 从MediaCodec的start方法开始…...
# Go 语言中的 Interface 和 Struct
go package mainimport ("fmt" )// Girl 是一个接口,定义了所有"女性"类型都应该实现的方法 type Girl interface {call()introduce() }// Wife 结构体代表妻子 type wife struct {name stringage intyearsWed int }// call 方法…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
leetcodeSQL解题:3564. 季节性销售分析
leetcodeSQL解题:3564. 季节性销售分析 题目: 表:sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...

算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...