当前位置: 首页 > article >正文

Nacos源码—7.Nacos升级gRPC分析四

大纲

5.服务变动时如何通知订阅的客户端

6.微服务实例信息如何同步集群节点

6.微服务实例信息如何同步集群节点

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

(2)ClientChangedEvent事件的处理源码

(3)集群节点处理数据同步请求的源码

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

ClientChangedEvent事件的作用就是向集群节点同步服务实例数据的。

//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {private final EphemeralClientOperationServiceImpl clientOperationService;public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {this.clientOperationService = clientOperationService;}@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {//根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE://注册实例return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE://注销实例return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;//参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名//参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息//参数meta.getConnectionId():这个参数很关键,它是连接IDclientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}...
}@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {private final ClientManager clientManager;public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {this.clientManager = clientManager;}@Overridepublic void registerInstance(Service service, Instance instance, String clientId) {//从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));}//从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//将请求中的instance实例信息封装为InstancePublishInfo对象InstancePublishInfo instanceInfo = getPublishInfo(instance);//往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法client.addServiceInstance(singleton, instanceInfo);//设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间client.setLastUpdatedTime();//发布客户端注册服务实例的事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));//发布服务实例元数据的事件NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}...
}//Nacos naming client based ip and port.
//The client is bind to the ip and port users registered. It's a abstract content to simulate the tcp session client.
public class IpPortBasedClient extends AbstractClient {...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));}...
}//Abstract implementation of {@code Client}.
public abstract class AbstractClient implements Client {//publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务//存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象//key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);//subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {//服务注册时,如果是第一次put进去Service对象,会返回nullif (null == publishers.put(service, instancePublishInfo)) {//监视器记录MetricsMonitor.incrementInstanceCount();}//发布客户端改变事件,用于处理集群间的数据同步NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());return true;}...
}

(2)ClientChangedEvent事件的处理源码

DistroClientDataProcessor的onEvent()方法会响应ClientChangedEvent。该方法如果判断出事件类型为ClientChangedEvent事件,那么就会执行DistroClientDataProcessor的syncToAllServer()方法,然后调用DistroProtocol的sync()方法进行集群节点同步处理。

DistroProtocol的sync()方法会遍历集群中除自身节点外的其他节点,然后对遍历到的每个节点执行DistroProtocol的syncToTarget()方法。

在DistroProtocol的syncToTarget()方法中,首先把要同步的集群节点targetServer包装成DistroKey对象,然后根据DistroKey对象创建DistroDelayTask延迟任务,接着调用NacosDelayTaskExecuteEngine的addTask()方法,往延迟任务执行引擎的tasks中添加任务。

NacosDelayTaskExecuteEngine在初始化时会启动一个定时任务,这个定时任务会定时执行ProcessRunnable的run()方法。而ProcessRunnable的run()方法会不断从任务池tasks中取出延迟任务处理,处理DistroDelayTask任务时会调用DistroDelayTaskProcessor的process()方法。

在执行DistroDelayTaskProcessor的process()方法时,会先根据DistroDelayTask任务封装一个DistroSyncChangeTask任务,然后调用NacosExecuteTaskExecuteEngine的addTask()方法。也就是调用TaskExecuteWorker的process()方法,将DistroSyncChangeTask任务添加到TaskExecuteWorker的阻塞队列中,同时创建TaskExecuteWorker时会启动线程不断从队列中取出任务处理。因此最终会执行DistroSyncChangeTask的run()方法。

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {private final ClientManager clientManager;private final DistroProtocol distroProtocol;...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//临时实例使用Distro协议,持久化实例使用Raft协议//ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客户端注销实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客户端注册实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}@Component
public class DistroProtocol {private final ServerMemberManager memberManager;private final DistroTaskEngineHolder distroTaskEngineHolder;...//Start to sync by configured delay.public void sync(DistroKey distroKey, DataOperation action) {sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());}//Start to sync data to all remote server.public void sync(DistroKey distroKey, DataOperation action, long delay) {//遍历集群中除自身节点外的其他节点for (Member each : memberManager.allMembersWithoutSelf()) {syncToTarget(distroKey, action, each.getAddress(), delay);}}//Start to sync to target server.public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {//先把要同步的集群节点targetServer包装成DistroKey对象DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);//然后根据DistroKey对象创建DistroDelayTask任务DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);//接着调用NacosDelayTaskExecuteEngine.addTask()方法//往延迟任务执行引擎DistroDelayTaskExecuteEngine中添加延迟任务DistroDelayTaskdistroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);}}...
}//延迟任务执行引擎
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {private final ScheduledExecutorService processingExecutor;protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任务池public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));//开启定时任务,即启动ProcessRunnable线程任务processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}...@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}//最后放入到任务池中tasks.put(key, newTask);} finally {lock.unlock();}}protected void processTasks() {//获取tasks中所有的任务,然后进行遍历Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//通过任务key,获取具体的任务,并且从任务池中移除掉AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//通过任务key获取对应的NacosTaskProcessor延迟任务处理器NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {//ReAdd task if process failed//调用获取到的NacosTaskProcessor延迟任务处理器的process()方法if (!processor.process(task)) {//如果失败了,会重试添加task回tasks这个map中retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}}...private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}
}//Distro delay task processor.
public class DistroDelayTaskProcessor implements NacosTaskProcessor {...@Overridepublic boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();switch (distroDelayTask.getAction()) {case DELETE://处理客户端注销实例时的延迟任务(同步数据到集群节点)//根据DistroDelayTask任务封装一个DistroSyncTask任务DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);//调用NacosExecuteTaskExecuteEngine.addTask()方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);return true;case CHANGE:case ADD://处理客户端注册实例时的延迟任务(同步数据到集群节点)//根据DistroDelayTask任务封装一个DistroSyncChangeTask任务DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);//调用NacosExecuteTaskExecuteEngine.addTask()方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;default:return false;}}
}//任务执行引擎
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {private final TaskExecuteWorker[] executeWorkers;public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {super(logger);executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];for (int mod = 0; mod < dispatchWorkerCount; ++mod) {executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());}}...@Overridepublic void addTask(Object tag, AbstractExecuteTask task) {//根据tag获取到TaskExecuteWorkerNacosTaskProcessor processor = getProcessor(tag);if (null != processor) {processor.process(task);return;}TaskExecuteWorker worker = getWorker(tag);//调用TaskExecuteWorker.process()方法把AbstractExecuteTask任务放入到队列当中去worker.process(task);}private TaskExecuteWorker getWorker(Object tag) {int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();return executeWorkers[idx];}    ...
}public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {private final BlockingQueue<Runnable> queue;//任务存储容器public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {...this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);new InnerWorker(name).start();}@Overridepublic boolean process(NacosTask task) {if (task instanceof AbstractExecuteTask) {//把NacosTask任务放入到阻塞队列中putTask((Runnable) task);}return true;}private void putTask(Runnable task) {try {//把NacosTask任务放入到阻塞队列中queue.put(task);} catch (InterruptedException ire) {log.error(ire.toString(), ire);}}...private class InnerWorker extends Thread {InnerWorker(String name) {setDaemon(false);setName(name);}@Overridepublic void run() {while (!closed.get()) {try {//一直取阻塞队列中的任务Runnable task = queue.take();long begin = System.currentTimeMillis();//调用NacosTask中的run方法task.run();long duration = System.currentTimeMillis() - begin;if (duration > 1000L) {log.warn("task {} takes {}ms", task, duration);}} catch (Throwable e) {log.error("[TASK-FAILED] " + e.toString(), e);}}}}
}

执行DistroSyncChangeTask的run()方法,其实就是执行AbstractDistroExecuteTask的run()方法。AbstractDistroExecuteTask的run()方法会先获取请求数据,然后调用DistroClientTransportAgent的syncData()方法同步集群节点,也就是调用ClusterRpcClientProxy的sendRequest()方法发送数据同步请求,最终会调用RpcClient的request()方法 -> GrpcConnection的request()方法。

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {...@Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type = getDistroKey().getResourceType();//获取请求数据DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return;}//默认调用DistroClientTransportAgent.syncData()方法同步集群节点getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}private DistroData getDistroData(String type) {DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null != result) {result.setType(OPERATION);}return result;}...
}public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {...@Overridepublic void run() {//Nacos:Naming:v2:ClientDataString type = getDistroKey().getResourceType();//获取DistroClientTransportAgent对象DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);if (null == transportAgent) {Loggers.DISTRO.warn("No found transport agent for type [{}]", type);return;}Loggers.DISTRO.info("[DISTRO-START] {}", toString());//默认返回trueif (transportAgent.supportCallbackTransport()) {//默认执行子类的doExecuteWithCallback()方法doExecuteWithCallback(new DistroExecuteCallback());} else {executeDistroTask();}}protected abstract void doExecuteWithCallback(DistroCallback callback);...
}public class DistroClientTransportAgent implements DistroTransportAgent {private final ClusterRpcClientProxy clusterRpcClientProxy;private final ServerMemberManager memberManager;...@Overridepublic boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}//创建请求对象DistroDataRequest request = new DistroDataRequest(data, data.getType());//找到集群节点Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);return false;}try {//向集群节点发送RPC异步请求Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);}return false;}...
}@Service
public class ClusterRpcClientProxy extends MemberChangeListener {...//send request to member.public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {RpcClient client = RpcClientFactory.getClient(memberClientKey(member));if (client != null) {//调用RpcClient.request()方法return client.request(request, timeoutMills);} else {throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);}}...
}public abstract class RpcClient implements Closeable {//在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,//会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性protected volatile Connection currentConnection;...//send request.public Response request(Request request, long timeoutMills) throws NacosException {int retryTimes = 0;Response response;Exception exceptionThrow = null;long start = System.currentTimeMillis();while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {...//发起gRPC请求,调用GrpcConnection.request()方法response = this.currentConnection.request(request, timeoutMills);...}...}...
}

(3)集群节点处理数据同步请求的源码

通过DistroClientTransportAgent的syncData()方法发送的数据同步请求,会被DistroDataRequestHandler的handle()方法处理。然后会调用DistroDataRequestHandler的handleSyncData()方法,接着调用DistroProtocol的onReceive()方法,于是最终会调用到DistroClientDataProcessor.processData()方法。

在执行DistroClientDataProcessor的processData()方法时,如果是同步服务实例新增、修改后的数据,则执行DistroClientDataProcessor的handlerClientSyncData()方法。该方法会和处理服务注册时一样,发布一个客户端注册服务实例的事件。如果是同步服务实例删除后的数据,则调用EphemeralIpPortClientManager的clientDisconnected()方法。首先移除客户端对象信息,然后发布一个客户端注销服务实例的事件。

其中客户端注销服务实例的事件ClientDisconnectEvent,首先会被ClientServiceIndexesManager的onEvent()方法进行处理,处理时会调用ClientServiceIndexesManager的handleClientDisconnect()方法,移除ClientServiceIndexesManager订阅者列表的元素和注册表的元素。然后会被DistroClientDataProcessor的onEvent()方法进行处理,进行集群节点之间的数据同步。

@Component
public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {private final DistroProtocol distroProtocol;...@Overridepublic DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {try {switch (request.getDataOperation()) {case VERIFY:return handleVerify(request.getDistroData(), meta);case SNAPSHOT:return handleSnapshot();case ADD:case CHANGE:case DELETE://服务实例新增、修改、删除的同步,都会由DistroDataRequestHandler.handleSyncData()方法处理return handleSyncData(request.getDistroData());case QUERY:return handleQueryData(request.getDistroData());default:return new DistroDataResponse();}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);DistroDataResponse result = new DistroDataResponse();result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("handle distro request with exception");return result;}}private DistroDataResponse handleSyncData(DistroData distroData) {DistroDataResponse result = new DistroDataResponse();//调用DistroProtocol.onReceive()方法if (!distroProtocol.onReceive(distroData)) {result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("[DISTRO-FAILED] distro data handle failed");}return result;}...
}@Component
public class DistroProtocol {...//Receive synced distro data, find processor to process.public boolean onReceive(DistroData distroData) {Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());//Nacos:Naming:v2:ClientDataString resourceType = distroData.getDistroKey().getResourceType();//获取DistroClientDataProcessor处理对象DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);return false;}//调用DistroClientDataProcessor.processData()方法return dataProcessor.processData(distroData);}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE://服务实例添加和改变时的执行逻辑ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);handlerClientSyncData(clientSyncData);return true;case DELETE://服务实例删除时的执行逻辑String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);//调用EphemeralIpPortClientManager.clientDisconnected()方法clientManager.clientDisconnected(deleteClientId);return true;default:return false;}}private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());Client client = clientManager.getClient(clientSyncData.getClientId());upgradeClient(client, clientSyncData);}private void upgradeClient(Client client, ClientSyncData clientSyncData) {List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();Set<Service> syncedService = new HashSet<>();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);//如果和当前不一样才发布事件if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);//发布客户端注册服务实例的事件,与客户端进行服务注册时一样NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}}...
}@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {//key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();...@Overridepublic boolean clientDisconnected(String clientId) {Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);//移除客户端信息IpPortBasedClient client = clients.remove(clientId);if (null == client) {return true;}//发布客户端注销服务实例的事件NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));client.release();return true;}...
}@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {Client client = event.getClient();for (Service each : client.getAllSubscribeService()) {//移除订阅者列表的元素removeSubscriberIndexes(each, client.getClientId());}for (Service each : client.getAllPublishedService()) {//移除注册表的元素removePublisherIndexes(each, client.getClientId());}}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//临时实例使用Distro协议,持久化实例使用Raft协议//ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客户端注销实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客户端注册实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}

(4)总结

一.执行引擎的总结

延时任务执行引擎的实现原理是引擎有一个Map类型的tasks任务池,这个任务池可以根据key映射对应的任务处理器。引擎会定时从任务池中获取任务,执行任务处理器的处理方法处理任务。

任务执行引擎的实现原理是会创建多个任务执行Worker,每个任务执行Worker都会有一个阻塞队列。向任务执行引擎添加任务时会将任务添加到其中一个Woker的阻塞队列中,Worker在初始化时就会启动一个线程不断取出阻塞队列中的任务来处理。所以任务执行引擎会通过阻塞队列 + 异步任务的方式来实现。

二.用于向集群节点同步数据的客户端改变事件的处理流程总结

步骤一:先创建DistroDelayTask延迟任务放入到延迟任务执行引擎的任务池,DistroDelayTask延迟任务会由DistroDelayTaskProcessor处理器处理。

步骤二:DistroDelayTaskProcessor处理时会创建DistroSyncChangeTask任务,然后再将任务分发添加到执行引擎中的任务执行Worker的阻塞队列中。

步骤三:任务执行Worker会从队列中获取并执行DistroSyncChangeTask任务,也就是执行引擎会触发调用AbstractDistroExecuteTask的run()方法,从而调用DistroSyncChangeTask的doExecuteWithCallback()方法。

步骤四:doExecuteWithCallback()方法会获取最新的微服务实例列表,然后通过DistroClientTransportAgent的syncData()方法发送数据同步请求。

相关文章:

Nacos源码—7.Nacos升级gRPC分析四

大纲 5.服务变动时如何通知订阅的客户端 6.微服务实例信息如何同步集群节点 6.微服务实例信息如何同步集群节点 (1)服务端处理服务注册时会发布一个ClientChangedEvent事件 (2)ClientChangedEvent事件的处理源码 (3)集群节点处理数据同步请求的源码 (1)服务端处理服务注册…...

TIME - MoE 模型代码 3.2——Time-MoE-main/time_moe/datasets/time_moe_dataset.py

源码&#xff1a;GitHub - Time-MoE/Time-MoE: [ICLR 2025 Spotlight] Official implementation of "Time-MoE: Billion-Scale Time Series Foundation Models with Mixture of Experts" 这段代码定义了一个用于时间序列数据处理的 TimeMoEDataset 类&#xff0c;支…...

【某OTA网站】phantom-token 1004

新版1004 phantom-token 请求头中包含phantom-token 定位到 window.signature 熟悉的vmp 和xhs一样 最新环境检测点 最新检测 canvas 下的 toDataURL方法较严 过程中 会用setAttribute给canvas 设置width height 从而使toDataURL返回不同的值 如果写死toDataURL的返回值…...

OrangePi Zero 3学习笔记(Android篇)2 - 第一个C程序

目录 1. 创建项目文件夹 2. 创建c/cpp文件 3. 创建Android.mk/Android.bp文件 3.1 Android.mk 3.2 Android.bp 4. 编译 5. adb push 6. 打包到image中 在AOSP里面添加一个C或C程序&#xff0c;这个程序在Android中需要通过shell的方式运行。 1. 创建项目文件夹 首先需…...

DeepResearch深度搜索实现方法调研

DeepResearch深度搜索实现方法调研 Deep Research 有三个核心能力 能力一&#xff1a;自主规划解决问题的搜索路径&#xff08;生成子问题&#xff0c;queries&#xff0c;检索&#xff09;能力二&#xff1a;在探索路径时动态调整搜索方向&#xff08;刘亦菲最好的一部电影是…...

使用大语言模型进行机器人规划(Robot planning with LLMs)

李升伟 编译 长期规划在机器人学领域可以从经典控制方法与大型语言模型在现实世界知识能力的结合中获益。 在20世纪80年代&#xff0c;机器人学和人工智能&#xff08;AI&#xff09;领域的专家提出了莫雷奇悖论&#xff0c;观察到人类看似简单的涉及移动和感知的任务&#x…...

【论文阅读】基于客户端数据子空间主角度的聚类联邦学习分布相似性高效识别

Efficient distribution similarity identification in clustered federated learning via principal angles between client data subspaces -- 基于客户端数据子空间主角度的聚类联邦学习分布相似性高效识别 论文来源TLDR背景与问题两个子空间之间的主角&#xff08;Principa…...

Elasticsearch知识汇总之ElasticSearch部署

五 ElasticSearch部署 部署Elasticsearch&#xff0c;可以在任何 Linux、MacOS 或 Windows 机器上运行 Elasticsearch。在Docker 容器 中运行 Elasticsearch 。使用Elastic Cloud on Kubernetes 设置和管理 Elasticsearch、Kibana、Elastic Agent 以及 Kubernetes 上的 Elasti…...

ROBOVERSE:面向可扩展和可泛化机器人学习的统一平台、数据集和基准

25年4月来自UC Berkeley、北大、USC、UMich、UIUC、Stanford、CMU、UCLA 和 北京通用 AI 研究院&#xff08;BIGAI&#xff09;的论文“ROBOVERSE: Towards a Unified Platform, Dataset and Benchmark for Scalable and Generalizable Robot Learning”。 数据扩展和标准化评…...

LVGL的核心:lv_timer_handler

文章目录 &#x1f9e0; 一句话总结 LVGL 的运行核心&#xff1a;&#x1f501; 1. while(1) 主循环中的 lv_task_handler()⏱️ 2. lv_timer_handler() 定时器调度核心✅ 并发控制✅ 关键行为流程&#xff1a;&#x1f300; 任务执行逻辑&#xff1a;&#x1f9ee; 计算下一次…...

(41)VTK C++开发示例 ---qt使用vtk最小示例

文章目录 1. 概述2. CMake链接VTK3. main.cpp文件4. 演示效果 更多精彩内容&#x1f449;内容导航 &#x1f448;&#x1f449;VTK开发 &#x1f448; 1. 概述 本文演示了在Qt中使用VTK的最小示例程序&#xff0c;使用VTK创建显示一个锥体&#xff1b; 采用Cmake作为构建工具&a…...

⭐️⭐️⭐️【课时1:大模型是什么?】学习总结 ⭐️⭐️⭐️ for《大模型Clouder认证:基于百炼平台构建智能体应用》认证

一、学习目标 概要 通过学习《课时1:大模型是什么?》,全面了解大模型的基础概念、核心特点、发展脉络及阿里云在大模型领域的布局,为后续基于百炼平台构建智能体应用的实践操作打下坚实的理论基础。 具体目标列表 理解人工智能到大模型的演变逻辑,明确大模型在AI发展历…...

OS7.【Linux】基本指令入门(6)

目录 1.zip和unzip 配置指令 使用 两个名词:打包和压缩 打包 压缩 Linux下的操作演示 压缩和解压缩文件 压缩和解压缩目录 -d选项 2.tar Linux下的打包和压缩方案简介 czf选项 xzf选项 -C选项 tzf选项 3.bc 4.uname 不带选项的uname -a选项 -r选项 -v选项…...

国标GB28181视频平台EasyCVR安防系统部署知识:如何解决异地监控集中管理和组网问题

在企业、连锁机构及园区管理等场景中&#xff0c;异地监控集中管控与快速组网需求日益迫切。弱电项目人员和企业管理者亟需整合分散监控资源&#xff0c;实现跨区域统一管理与实时查看。 一、解决方案 案例一&#xff1a;运营商专线方案​ 利用运营商专线&#xff0c;连接各分…...

O2O上门服务如何颠覆传统足浴行业?真实案例分析

在湖南经营传统足浴店的张总最近遇到了件让他哭笑不得的事。原本他的门店生意还算稳定&#xff0c;虽然这两年行情不好&#xff0c;但靠着老顾客还能勉强维持。可谁想到&#xff0c;一次好心帮忙&#xff0c;竟让他发现了行业的新天地。 几年前&#xff0c;张总的一位做砂石生意…...

金仓数据库永久增量备份技术原理与操作

先用一张图说明一下常见的备份方式 为什么需要永久增量备份 传统的数据库备份方案通常是间隔7天对数据库做一次全量备份&#xff08;完整备份&#xff09;&#xff0c;每天会基于全量备份做一次增量备份&#xff0c;如此循环&#xff0c;这种备份方案在全备数据量过大场景下…...

19、HashTable(哈希)、位图的实现和布隆过滤器的介绍

一、了解哈希【散列表】 1、哈希的结构 在STL中&#xff0c;HashTable是一个重要的底层数据结构, 无序关联容器包括unordered_set, unordered_map内部都是基于哈希表实现 哈希表又称散列表&#xff0c;一种以「key-value」形式存储数据的数据结构。哈希函数&#xff1a;负责将…...

函数级重构:如何写出高可读性的方法?

1. 引言:为什么方法级别的重构如此重要? 在软件开发中,方法(函数)是程序逻辑的基本单元。一个高质量的方法不仅决定了程序是否能正常运行,更直接影响到: 代码的可读性:能否让其他开发者快速理解可维护性:未来修改是否容易出错可测试性:是否便于编写单元测试协作效率…...

mysql中int(1) 和 int(10) 有什么区别?

困惑 最近遇到个问题&#xff0c;有个表的要加个user_id字段&#xff0c;user_id字段可能很大&#xff0c;于是我提mysql工单​​alter table xxx ADD user_id int(1)​​。领导看到我的sql工单&#xff0c;于是说&#xff1a;这int(1)怕是不够用吧&#xff0c;接下来是一通解…...

FreeRTOS如何实现100%的硬实时性?

实时系统在嵌入式应用中至关重要&#xff0c;其核心在于确保任务在指定时间内完成。根据截止时间满足的严格程度&#xff0c;实时系统分为硬实时和软实时。硬实时系统要求任务100%满足截止时间&#xff0c;否则可能导致灾难性后果&#xff0c;例如汽车安全系统或医疗设备。软实…...

深度学习 ----- 数据预处理

常用的高级数据预处理的方法总结 &#x1f9e0; 一、图像数据高级预处理方法汇总表 方法原理常用参数适用场景图像增强&#xff08;Augmentation&#xff09;改变图像外观/几何结构&#xff0c;提升泛化能力翻转、旋转、缩放、色调扰动等分类、检测、分割等Mixup / CutMix合成…...

Cluster Interconnect in Oracle RAC

Cluster Interconnect in Oracle RAC (文档 ID 787420.1)​编辑转到底部 In this Document Purpose Scope Details Physical Layout of the Private Interconnect Why Do We Need a Private Interconnect ? Interconnect Failure Interconnect High Availability Private Inte…...

【Spring Boot 注解】@SpringBootApplication

文章目录 SpringBootApplication注解一、简介二、使用1.指定要扫描的包 SpringBootApplication注解 一、简介 SpringBootApplication 是 Spring Boot 提供的一个注解&#xff0c;通常用于启动类&#xff08;主类&#xff09;上&#xff0c;它是三个注解的组合&#xff1a; 1.…...

angular的cdk组件库

目录 一、虚拟滚动 一、虚拟滚动 <!-- itemSize相当于每个项目的高度为30px --><!-- 需要给虚拟滚动设置宽高&#xff0c;否则无法正常显示 --> <cdk-virtual-scroll-viewport [itemSize]"40" class"view_scroll"><div class"m…...

element-ui日期时间选择器禁止输入日期

需求解释&#xff1a;时间日期选择器&#xff0c;下方日期有禁止选择范围&#xff0c;所以上面的日期输入框要求禁止输入&#xff0c;但时间输入框可以输入&#xff0c;也就是下图效果&#xff0c;其中日历中的禁止选择可以通过【picker-options】这个属性实现&#xff0c;此属…...

HarmonyOS Next~HarmonyOS应用测试全流程解析:从一级类目上架到二级类目专项测试

HarmonyOS Next&#xff5e;HarmonyOS应用测试全流程解析&#xff1a;从一级类目上架到二级类目专项测试 引言&#xff1a;HarmonyOS生态下的质量保障挑战 在万物互联的智能时代&#xff0c;HarmonyOS作为分布式操作系统&#xff0c;为开发者带来了前所未有的创新空间&#x…...

网络安全体系架构:核心框架与关键机制解析

以下是关于网络安全体系架构设计相关内容的详细介绍&#xff1a; 一、开放系统互联安全体系结构 开放系统互联&#xff08;OSI&#xff09;安全体系结构是一种基于分层模型的安全架构&#xff0c;旨在为开放系统之间的通信提供安全保障。它定义了安全服务、安全机制以及它们在…...

一种安全不泄漏、高效、免费的自动化脚本平台

在数字化转型加速的今天&#xff0c;自动化脚本工具已成为提升效率的重要助手。然而&#xff0c;用户在选择这类工具时&#xff0c;往往面临两大核心关切&#xff1a;安全性与成本。冰狐智能辅助&#xff08;IceFox Intelligent Assistant&#xff09;作为一款新兴的自动化脚本…...

[论文阅读]Deeply-Supervised Nets

摘要 我们提出的深度监督网络&#xff08;DSN&#xff09;方法在最小化分类误差的同时&#xff0c;使隐藏层的学习过程更加直接和透明。我们尝试通过研究深度网络中的新公式来提升分类性能。我们关注卷积神经网络&#xff08;CNN&#xff09;架构中的三个方面&#xff1a;&…...

多模态大语言模型arxiv论文略读(六十二)

MileBench: Benchmarking MLLMs in Long Context ➡️ 论文标题&#xff1a;MileBench: Benchmarking MLLMs in Long Context ➡️ 论文作者&#xff1a;Dingjie Song, Shunian Chen, Guiming Hardy Chen, Fei Yu, Xiang Wan, Benyou Wang ➡️ 研究机构: The Chinese Univers…...