当前位置: 首页 > article >正文

Nacos源码—9.Nacos升级gRPC分析七

大纲

10.gRPC客户端初始化分析

11.gRPC客户端的心跳机制(健康检查)

12.gRPC服务端如何处理客户端的建立连接请求

13.gRPC服务端如何映射各种请求与对应的Handler处理类

14.gRPC简单介绍

10.gRPC客户端初始化分析

(1)gRPC客户端代理初始化的源码

(2)gRPC客户端启动的源码

(3)gRPC客户端发起与服务端建立连接请求的源码

(1)gRPC客户端代理初始化的源码

Nacos客户端注册服务实例时会调用NacosNamingService的registerInstance()方法,接着会调用NamingClientProxyDelegate的registerService()方法,然后判断注册的服务实例是不是临时的。如果注册的服务实例是临时的,那么就使用gRPC客户端代理去进行注册。如果注册的服务实例不是临时的,那么就使用HTTP客户端代理去进行注册。

NacosNamingService的init()方法在创建客户端代理,也就是执行NamingClientProxyDelegate的构造方法时,便会创建和初始化gRPC客户端代理NamingGrpcClientProxy。

创建和初始化gRPC客户端代理NamingGrpcClientProxy时,首先会由RpcClientFactory的createClient()方法创建一个RpcClient对象,并将GrpcClient对象赋值给NamingGrpcClientProxy的rpcClient属性,然后调用NamingGrpcClientProxy的start()方法启动RPC客户端连接。

在NamingGrpcClientProxy的start()方法中,会先注册一个用于处理服务端推送请求的NamingPushRequestHandler,然后调用RpcClient的start()方法启动RPC客户端即RpcClient对象,最后将NamingGrpcClientProxy自己作为订阅者向通知中心进行注册。

public class NacosNamingService implements NamingService {...private NamingClientProxy clientProxy;private void init(Properties properties) throws NacosException {...this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);}...@Overridepublic void registerInstance(String serviceName, Instance instance) throws NacosException {registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);}@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);//调用NamingClientProxy的注册方法registerService(),其实就是NamingClientProxyDelegate.registerService()方法clientProxy.registerService(serviceName, groupName, instance);}...
}//客户端代理
public class NamingClientProxyDelegate implements NamingClientProxy {private final NamingHttpClientProxy httpClientProxy;private final NamingGrpcClientProxy grpcClientProxy;public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {...//初始化HTTP客户端代理this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);//初始化gRPC客户端代理this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);}...@Overridepublic void registerService(String serviceName, String groupName, Instance instance) throws NacosException {getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);}private NamingClientProxy getExecuteClientProxy(Instance instance) {return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;}...
}//gRPC客户端代理
public class NamingGrpcClientProxy extends AbstractNamingClientProxy {private final String namespaceId;private final String uuid;    private final Long requestTimeout;    private final RpcClient rpcClient;private final NamingGrpcRedoService redoService;//初始化gRPC客户端代理public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {super(securityProxy);this.namespaceId = namespaceId;this.uuid = UUID.randomUUID().toString();this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));Map<String, String> labels = new HashMap<String, String>();labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);//1.通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);this.redoService = new NamingGrpcRedoService(this);//2.启动gRPC客户端代理NamingGrpcClientProxystart(serverListFactory, serviceInfoHolder);}private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {rpcClient.serverListFactory(serverListFactory);//注册连接监听器rpcClient.registerConnectionListener(redoService);//1.注册一个用于处理服务端推送请求的NamingPushRequestHandlerrpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));//2.启动RPC客户端RpcClientrpcClient.start();//3.将NamingGrpcClientProxy自己作为订阅者向通知中心进行注册NotifyCenter.registerSubscriber(this);}...@Overridepublic void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance);redoService.cacheInstanceForRedo(serviceName, groupName, instance);//执行服务实例的注册doRegisterService(serviceName, groupName, instance);}//Execute register operation.public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {//创建请求参数对象InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance);//向服务端发起请求requestToServer(request, Response.class);redoService.instanceRegistered(serviceName, groupName);}private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {try {request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));//实际会调用RpcClient.request()方法发起gRPC请求Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {throw new NacosException(response.getErrorCode(), response.getMessage());}if (responseClass.isAssignableFrom(response.getClass())) {return (T) response;}NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());} catch (Exception e) {throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);}throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");}...
}public class RpcClientFactory {private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();...//create a rpc client.public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {if (!ConnectionType.GRPC.equals(connectionType)) {throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());}return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);try {//创建GrpcClient对象GrpcClient client = new GrpcSdkClient(clientNameInner);//设置线程核心数和最大数client.setThreadPoolCoreSize(threadPoolCoreSize);client.setThreadPoolMaxSize(threadPoolMaxSize);client.labels(labels);return client;} catch (Throwable throwable) {LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);throw throwable;}});}...
}

(2)gRPC客户端启动的源码

NamingGrpcClientProxy的start()方法会通过调用RpcClient的start()方法,来启动RPC客户端即RpcClient对象。

在RpcClient的start()方法中,首先会利用CAS来修改RPC客户端(RpcClient)的状态,也就是将RpcClient.rpcClientStatus属性从INITIALIZED更新为STARTING。

然后会创建一个核心线程数为2的线程池,并提交两个任务。任务一是处理连接成功或连接断开时的线程,任务二是处理重连或健康检查的线程。

接着会创建Connection连接对象,也就是在while循环中调用GrpcClient的connectToServer()方法,尝试与服务端建立连接。如果连接失败,则会抛出异常并且进行重试,由于是同步连接,所以最大重试次数是3。

最后当客户端与服务端成功建立连接后,会把对应的Connection连接对象赋值给RpcClient.currentConnection属性,并且修改RpcClient.rpcClientStatus属性即RPC客户端状态为RUNNING。

如果客户端与服务端连接失败,则会通过异步尝试进行连接,也就是调用RpcClient的switchServerAsync()方法,往RpcClient的reconnectionSignal队列中放入一个ReconnectContext对象,reconnectionSignal队列中的元素会交给任务2来处理。

public abstract class RpcClient implements Closeable {protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);protected ScheduledExecutorService clientEventExecutor;protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();//在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性protected volatile Connection currentConnection;private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);...public final void start() throws NacosException {//利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTINGboolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);if (!success) {return;}//接下来创建调度线程池执行器,并提交两个任务clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.remote.worker");t.setDaemon(true);return t;});//任务1:处理连接成功或连接断开时的线程clientEventExecutor.submit(() -> {...     });//任务2:处理重连或健康检查的线程clientEventExecutor.submit(() -> {...});//创建连接对象Connection connectToServer = null;rpcClientStatus.set(RpcClientStatus.STARTING);//重试次数为3次int startUpRetryTimes = RETRY_TIMES;//在while循环中尝试与服务端建立连接,最多循环3次while (startUpRetryTimes > 0 && connectToServer == null) {try {startUpRetryTimes--;//获取服务端信息ServerInfo serverInfo = nextRpcServer();LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo);//调用GrpcClient.connectToServer()方法建立和服务端的长连接connectToServer = connectToServer(serverInfo);} catch (Throwable e) {LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes);}}//如果连接成功,connectToServer对象就不为空if (connectToServer != null) {LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());//连接对象赋值,currentConnection其实就是一个在客户端使用的GrpcConnection对象实例this.currentConnection = connectToServer;//更改RPC客户端RpcClient的状态rpcClientStatus.set(RpcClientStatus.RUNNING);//往eventLinkedBlockingQueue队列放入ConnectionEvent事件eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));} else {//尝试进行异步连接switchServerAsync();}registerServerRequestHandler(new ConnectResetRequestHandler());    //register client detection request.registerServerRequestHandler(request -> {if (request instanceof ClientDetectionRequest) {return new ClientDetectionResponse();}return null;});}protected ServerInfo nextRpcServer() {String serverAddress = getServerListFactory().genNextServer();//获取服务端信息return resolveServerInfo(serverAddress);}private ServerInfo resolveServerInfo(String serverAddress) {Matcher matcher = EXCLUDE_PROTOCOL_PATTERN.matcher(serverAddress);if (matcher.find()) {serverAddress = matcher.group(1);}String[] ipPortTuple = serverAddress.split(Constants.COLON, 2);int defaultPort = Integer.parseInt(System.getProperty("nacos.server.port", "8848"));String serverPort = CollectionUtils.getOrDefault(ipPortTuple, 1, Integer.toString(defaultPort));return new ServerInfo(ipPortTuple[0], NumberUtils.toInt(serverPort, defaultPort));}public void switchServerAsync() {//异步注册逻辑switchServerAsync(null, false);}protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {//往reconnectionSignal队列里放入一个对象reconnectionSignal.offer(new ReconnectContext(recommendServerInfo, onRequestFail));}...
}

(3)gRPC客户端发起与服务端建立连接请求的源码

gRPC客户端与服务端建立连接的方法是GrpcClient的connectToServer()方法。该方法首先会获取进行网络通信的端口号,因为gRPC服务需要额外占用一个端口的,所以这个端口号是在Nacos的8848基础上 + 偏移量1000,变成9848。

在建立连接之前,会先检查一下服务端,如果没问题才发起连接请求,接着就会调用GrpcConnection的sendRequest()方法发起连接请求,最后返回GrpcConnection连接对象。

public abstract class GrpcClient extends RpcClient {...@Overridepublic Connection connectToServer(ServerInfo serverInfo) {try {if (grpcExecutor == null) {this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());}//获取端口号:gRPC服务需要额外占用一个端口的,这个端口是在Nacos 8848的基础上,+ 偏移量1000,所以是9848int port = serverInfo.getServerPort() + rpcPortOffset();RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);if (newChannelStubTemp != null) {//检查一下服务端,没问题才会发起RPC连接请求Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);if (response == null || !(response instanceof ServerCheckResponse)) {shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());return null;}BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());//创建连接对象GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());//create stream request and bind connection event to this connection.//创建流请求并将连接事件绑定到此连接StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);//stream observer to send response to servergrpcConn.setPayloadStreamObserver(payloadStreamObserver);grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());//send a  setup request.ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());conSetupRequest.setLabels(super.getLabels());conSetupRequest.setAbilities(super.clientAbilities);conSetupRequest.setTenant(super.getTenant());//发起连接请求grpcConn.sendRequest(conSetupRequest);//wait to register connection setupThread.sleep(100L);return grpcConn;}return null;} catch (Exception e) {LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);}return null;}private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {try {if (requestBlockingStub == null) {return null;}ServerCheckRequest serverCheckRequest = new ServerCheckRequest();Payload grpcRequest = GrpcUtils.convert(serverCheckRequest);//向服务端发送一个检查请求ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS);//receive connection unregister response here,not check response is success.return (Response) GrpcUtils.parse(response);} catch (Exception e) {LoggerUtils.printIfErrorEnabled(LOGGER, "Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);return null;}}private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub, final GrpcConnection grpcConn) {//调用BiRequestStreamStub.requestBiStream()方法连接服务端return streamStub.requestBiStream(new StreamObserver<Payload>() {@Overridepublic void onNext(Payload payload) {LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}", grpcConn.getConnectionId(), payload.toString());try {Object parseBody = GrpcUtils.parse(payload);final Request request = (Request) parseBody;if (request != null) {try {Response response = handleServerRequest(request);if (response != null) {response.setRequestId(request.getRequestId());sendResponse(response);} else {LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(), request.getRequestId());}} catch (Exception e) {LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}", grpcConn.getConnectionId(), payload.toString(), e.getMessage());Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR, "Handle server request error");errResponse.setRequestId(request.getRequestId());sendResponse(errResponse);}}} catch (Exception e) {LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());}}@Overridepublic void onError(Throwable throwable) {boolean isRunning = isRunning();boolean isAbandon = grpcConn.isAbandon();if (isRunning && !isAbandon) {LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}", grpcConn.getConnectionId(), throwable);if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {switchServerAsync();}} else {LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);}}@Overridepublic void onCompleted() {boolean isRunning = isRunning();boolean isAbandon = grpcConn.isAbandon();if (isRunning && !isAbandon) {LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server", grpcConn.getConnectionId());if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {switchServerAsync();}} else {LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);}}});}...
}

(4)总结

11.gRPC客户端的心跳机制(健康检查)

(1)线程任务一:处理连接成功或连接断开时的通知

(2)线程任务二:处理重连或健康检查

RpcClient的start()方法会调用GrpcClient的connectToServer()方法连接服务端,不管连接是否成功,最后都会往不同的阻塞队列中添加事件。

如果连接成功,那么就往RpcClient的eventLinkedBlockingQueue添加连接事件。如果连接失败,那么就往RpcClient的reconnectionSignal队列添加重连对象。而这两个阻塞队列中的数据处理,便是由执行RpcClient的start()方法时启动的两个线程任务进行处理的。

(1)线程任务一:处理连接成功或连接断开时的通知

这个任务主要在连接成功或者连接断开时,修改一些属性状态。通过eventLinkedBlockingQueue的take()方法从队列取到连接事件后,会判断连接事件是否建立连接还是断开连接。

如果是建立连接,那么就调用RpcClient的notifyConnected()方法,把执行NamingGrpcClientProxy的start()方法时所注册的NamingGrpcRedoService对象的connected属性设置为true。

如果是断开连接,那么就调用RpcClient的notifyDisConnected()方法,把执行NamingGrpcClientProxy的start()方法时所注册的NamingGrpcRedoService对象的connected属性设置为false。

public abstract class RpcClient implements Closeable {protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);protected ScheduledExecutorService clientEventExecutor;protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);//listener called where connection's status changed. 连接状态改变的监听器protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();...public final void start() throws NacosException {//利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTINGboolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);if (!success) {return;}//接下来创建调度线程池执行器,并提交两个任务clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.remote.worker");t.setDaemon(true);return t;});//任务1:处理连接成功或连接断开时的线程clientEventExecutor.submit(() -> {while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {ConnectionEvent take;try {take = eventLinkedBlockingQueue.take();if (take.isConnected()) {notifyConnected();} else if (take.isDisConnected()) {notifyDisConnected();}} catch (Throwable e) {// Do nothing}}   });//任务2:向服务端上报心跳或重连的线程clientEventExecutor.submit(() -> {...});}...//Notify when client new connected.protected void notifyConnected() {if (connectionEventListeners.isEmpty()) {return;}LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name);for (ConnectionEventListener connectionEventListener : connectionEventListeners) {try {connectionEventListener.onConnected();} catch (Throwable throwable) {LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name, connectionEventListener.getClass().getName());}}}//Notify when client disconnected.protected void notifyDisConnected() {if (connectionEventListeners.isEmpty()) {return;}LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name);for (ConnectionEventListener connectionEventListener : connectionEventListeners) {try {connectionEventListener.onDisConnect();} catch (Throwable throwable) {LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name, connectionEventListener.getClass().getName());}}}...//Register connection handler. Will be notified when inner connection's state changed.//在执行NamingGrpcClientProxy.start()方法时会将NamingGrpcRedoService对象注册到connectionEventListeners中public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) {LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Registry connection listener to current client:{}", name, connectionEventListener.getClass().getName());this.connectionEventListeners.add(connectionEventListener);}...
}public class NamingGrpcRedoService implements ConnectionEventListener {private volatile boolean connected = false;...@Overridepublic void onConnected() {connected = true;LogUtils.NAMING_LOGGER.info("Grpc connection connect");}@Overridepublic void onDisConnect() {connected = false;LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");synchronized (registeredInstances) {registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));}synchronized (subscribes) {subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));}LogUtils.NAMING_LOGGER.warn("mark to redo completed");}...
}

(2)线程任务二:处理重连或健康检查

如果RpcClient的start()方法在调用GrpcClient的connectToServer()方法连接服务端时失败了,那么会往RpcClient.reconnectionSignal队列添加重连对象的,而这个任务就会获取reconnectionSignal队列中的重连对象进行重连。

因为reconnectionSignal中的数据是当连接失败时放入的,所以如果从reconnectionSignal中获取不到重连对象,等同于连接成功。

注意:这个任务从reconnectionSignal阻塞队列中获取重连对象时,调用的是阻塞队列的take()方法,而不是阻塞队列的poll()方法。BlockingQueue的take()方法,如果读取不到数据,会一直处于阻塞状态。BlockingQueue的poll()方法,在指定的时间内读取不到数据,会返回null。

情况一:如果从reconnectionSignal队列中获取到的重连对象为null

首先判断存活时间是否大于 5s,如果大于则调用RpcClient.healthCheck()方法发起健康检查的RPC请求。健康检查的触发方法是currentConnection.request()方法,健康检查的请求类型是HealthCheckRequest。

如果健康检查成功,只需刷新存活时间即可。如果健康检查失败,则需要尝试与服务端重新建立连接。

情况二:如果从reconnectionSignal队列中获取到的重连对象不为null

那么就调用RpcClient的reconnect()方法进行重新连接,该方法会通过GrpcClient的connectToServer()方法尝试与服务端建立连接。

public abstract class RpcClient implements Closeable {protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);protected ScheduledExecutorService clientEventExecutor;protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);...public final void start() throws NacosException {//利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTINGboolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);if (!success) {return;}//接下来创建调度线程池执行器,并提交两个任务clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.remote.worker");t.setDaemon(true);return t;});//任务1:处理连接成功或连接断开时的线程clientEventExecutor.submit(() -> {...     });//任务2:向服务端上报心跳或重连的线程clientEventExecutor.submit(() -> {while (true) {try {if (isShutdown()) {break;}//这里从reconnectionSignal阻塞队列中获取任务不是调用take()方法,而是调用poll()方法,并且指定了5s的最大读取时间//BlockingQueue的take()方法,如果读取不到数据,会一直处于阻塞状态//BlockingQueue的poll()方法,在指定的时间内读取不到数据,会返回nullReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);//reconnectContext为null,说明从reconnectionSignal中获取不到数据//由于reconnectionSignal中的数据是当连接失败时放入的//所以从reconnectionSignal中获取不到数据,等同于连接成功if (reconnectContext == null) {//check alive time.//检查存活时间,默认存活时间为5s,超过5s就需要做健康检查if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {//调用RpcClient.healthCheck()方法,发起健康检查请求boolean isHealthy = healthCheck();//如果向服务端发起健康检查请求失败,则需要尝试重新建立连接if (!isHealthy) {if (currentConnection == null) {continue;}LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server healthy check fail, currentConnection = {}", name, currentConnection.getConnectionId());//判断连接状态是否关闭,如果是则结束异步任务RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {break;}//修改RpcClient的连接状态为不健康boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);//给reconnectContext属性赋值,准备尝试重连if (statusFLowSuccess) {//重新赋值,注意这里没有continue,所以逻辑会接着往下执行reconnectContext = new ReconnectContext(null, false);} else {continue;}} else {//如果向服务端发起健康检查请求成功,则刷新RpcClient的存活时间lastActiveTimeStamp = System.currentTimeMillis();continue;}} else {continue;}}if (reconnectContext.serverInfo != null) {//clear recommend server if server is not in server list.//如果服务器不在服务器列表中,则清除推荐服务器,即设置reconnectContext.serverInfo为nullboolean serverExist = false;//遍历服务端列表for (String server : getServerListFactory().getServerList()) {ServerInfo serverInfo = resolveServerInfo(server);if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {serverExist = true;reconnectContext.serverInfo.serverPort = serverInfo.serverPort;break;}}//reconnectContext.serverInfo不存在服务端列表中,就清除服务器信息,设置reconnectContext.serverInfo为nullif (!serverExist) {LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Recommend server is not in server list, ignore recommend server {}", name, reconnectContext.serverInfo.getAddress());reconnectContext.serverInfo = null;}}//进行重新连接,RpcClient.reconnect()方法中会调用GrpcClient.connectToServer()方法尝试与服务端建立连接reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);} catch (Throwable throwable) {//Do nothing}}});}private boolean healthCheck() {HealthCheckRequest healthCheckRequest = new HealthCheckRequest();if (this.currentConnection == null) {return false;}try {//利用currentConnection连接对象,发起RPC请求,请求类型是HealthCheckRequestResponse response = this.currentConnection.request(healthCheckRequest, 3000L);//not only check server is ok, also check connection is register.return response != null && response.isSuccess();} catch (NacosException e) {//ignore}return false;}...
}

(3)总结

相关文章:

Nacos源码—9.Nacos升级gRPC分析七

大纲 10.gRPC客户端初始化分析 11.gRPC客户端的心跳机制(健康检查) 12.gRPC服务端如何处理客户端的建立连接请求 13.gRPC服务端如何映射各种请求与对应的Handler处理类 14.gRPC简单介绍 10.gRPC客户端初始化分析 (1)gRPC客户端代理初始化的源码 (2)gRPC客户端启动的源码…...

从入门到精通:Drools全攻略

目录 一、Drools 初相识二、快速上手 Drools2.1 环境搭建2.2 第一个 Drools 程序 三、深入理解 Drools 核心概念3.1 规则&#xff08;Rule&#xff09;3.2 工作内存&#xff08;Working Memory&#xff09;3.3 知识库&#xff08;Knowledge Base, KieBase&#xff09;3.4 会话&…...

最大子段和(递推)

题目描述 给出一个长度为 n 的序列 a&#xff0c;选出其中连续且非空的一段使得这段和最大。 输入格式 第一行是一个整数&#xff0c;表示序列的长度 n。 第二行有 n 个整数&#xff0c;第 i 个整数表示序列的第 i 个数字 ai​。 输出格式 输出一行一个整数表示答案。 输…...

【计算机视觉】基于深度学习的实时情绪检测系统:emotion-detection项目深度解析

基于深度学习的实时情绪检测系统&#xff1a;emotion-detection项目深度解析 1. 项目概述2. 技术原理与模型架构2.1 核心算法1) 数据预处理流程2) 改进型MobileNetV2 2.2 系统架构 3. 实战部署指南3.1 环境配置3.2 数据集准备3.3 模型训练3.4 实时推理 4. 常见问题与解决方案4.…...

Windows CMD通过adb检查触摸屏Linux驱动是否被编译

检查 CONFIG_TOUCHSCREEN_GT9XX 是否启用&#xff0c;检查内核是否编译了Goodix GT9XX系列触摸屏的驱动支持 Windows CMD.exe输入&#xff1a; adb shell “zcat /proc/config.gz | grep CONFIG_TOUCHSCREEN_GT9XX” 如果返回CONFIG_TOUCHSCREEN_GT9XXy&#xff0c;表示驱动已编…...

【图像处理基石】什么是油画感?

在图像处理中&#xff0c;“油画感”通常指图像呈现出类似油画的块状纹理、笔触痕迹或色彩过渡不自然的现象&#xff0c;表现为细节模糊、边缘不锐利、颜色断层或人工纹理明显。这种问题常见于照片处理、视频帧截图或压缩后的图像&#xff0c;本质是画质受损的一种表现。以下是…...

AD PCB布线的常用命令

PCB布线顺序&#xff1a;先信号&#xff0c;再电源&#xff0c;再GNG 1.多根走线的应用 将IC上的引脚分类 更改一类引脚以及引线的颜色&#xff0c;画出走线&#xff08;将脚引出&#xff09; 选中这些走线&#xff0c;点击‘交互式总线布线’&#xff0c;便可以多根拉线 shi…...

Python操作Elasticsearch实战指南:从安装到性能调优的全链路解析

一、引言:为什么选择Python+Elasticsearch? Elasticsearch作为分布式搜索引擎,在日志分析、全文检索等场景中表现卓越。Python凭借其简洁语法和丰富生态,成为操作ES的首选语言。本文将带您从环境搭建到性能调优,系统掌握Python操作ES的核心技能。 二、环境准备:三步完成…...

【3-2】HDLC

前言 前面我们提到了 PSTN&#xff08;Public Switched Telephone Network&#xff09; &#xff0c;今天介绍一种很少见的数据链路层的协议&#xff0c;HDLC&#xff01; 文章目录 前言1. 定义2. 帧边界3. 零比特填充4. 控制字段4.1. 信息帧&#xff08;I帧&#xff09;4.2. …...

MySQL 学习(八)如何打开binlog日志

目录 一、默认状态二、如何检查 binlog 状态三、如何开启 binlog3.1 临时开启&#xff08;重启后失效&#xff09;3.2 永久开启&#xff08;需修改配置文件&#xff09;3.3 验证是否开启成功3.4 查看 binlog 内容 四、高级配置建议五、注意事项六、开启后的日常维护 知识回顾&a…...

《数据库原理》部分习题解析

《数据库原理》部分习题解析 1. 课本pg196.第1题。 &#xff08;1&#xff09;函数依赖 若对关系模式 R(U) 的任何可能的关系 r&#xff0c;对于任意两个元组 t₁ 和 t₂&#xff0c;若 t₁[X] t₂[X]&#xff0c;则必须有 t₁[Y] t₂[Y]&#xff0c;则称属性集 Y 函数依赖…...

OpenCV进阶操作:光流估计

文章目录 前言一、光流估计1、光流估计是什么&#xff1f;2、光流估计的前提&#xff1f;1&#xff09;亮度恒定2&#xff09;小运动3&#xff09;空间一致 3、OpenCV中的经典光流算法1&#xff09;Lucas-Kanade方法&#xff08;稀疏光流&#xff09;2&#xff09; Farneback方…...

uniapp+vue3开发项目之引入vuex状态管理工具

前言&#xff1a; 我们在vue2的时候常用的状态管理工具就是vuex&#xff0c;vue3开发以后&#xff0c;又多了一个pinia的选项&#xff0c;相对更轻便&#xff0c;但是vuex也用的非常多的&#xff0c;这里简单说下在uni-app中vuex的使用。 实现步骤&#xff1a; 1、安装&#x…...

SparkSQL 连接 MySQL 并添加新数据:实战指南

SparkSQL 连接 MySQL 并添加新数据&#xff1a;实战指南 在大数据处理中&#xff0c;SparkSQL 作为 Apache Spark 的重要组件&#xff0c;能够方便地与外部数据源进行交互。MySQL 作为广泛使用的关系型数据库&#xff0c;与 SparkSQL 的结合可以充分发挥两者的优势。本文将详细…...

面试题:请解释Java中的设计模式,并举例说明单例模式(Singleton Pattern)的实现方式

Java中的设计模式 设计模式是在软件开发过程中针对特定场景而使用的通用解决方案。设计模式可以帮助开发者编写出更加清晰、灵活和可维护的代码。设计模式分为三大类&#xff1a; 创建型模式&#xff1a;用于对象的创建过程&#xff0c;如单例模式、工厂模式、建造者模式等。…...

4. 文字效果/2D-3D转换 - 3D翻转卡片

4. 文字效果/2D-3D转换 - 3D翻转卡片 案例&#xff1a;3D产品展示卡片 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title></head><style type"text/css">.scene {width: 300px;height…...

嵌入式学习笔记 - 关于单片机的位数

通常我们经常说一个单片机是8位的&#xff0c;16位的&#xff0c;32位的&#xff0c;那么怎么判断一款单片机的位数是多少位呢&#xff0c;判断的依据是什么呢&#xff0c; 一 单片机的位数 单片机的位数是指单片机数据总线的宽度&#xff0c;也就是一次能处理的数据的位数&a…...

【AI News | 20250513】每日AI进展

AI Repos 1、iap-diffusion-labs 从零开始带我们构建完整的扩散模型。通过三个精心设计的实验练习&#xff0c;循序渐进地引导我们实现流匹配和扩散模型&#xff0c;从基础 SDE 到条件图像生成&#xff0c;每一步都有详尽指导和完整代码&#xff0c;让复杂理论简单易懂。主要内…...

mybatisplus 集成逻辑删除

一开始&#xff0c;没去查资料&#xff0c;后面要被AI气死了&#xff0c;先看它的的话 一开始&#xff0c;看ai的描述&#xff0c;我还以为&#xff0c;不需要改数据库&#xff0c;mybatis-puls自动拦截集成就可以实现逻辑删除&#xff0c;c&#xff0c;最后还是要给数据库加一…...

typedef unsigned short uint16_t; typedef unsigned int uint32_t;

你提到的这两行是 C/C 中的类型别名定义&#xff1a; typedef unsigned short uint16_t; typedef unsigned int uint32_t;它们的目的是让代码更具可读性和可移植性&#xff0c;尤其在处理精确位数的整数时非常有用。 ✅ 含义解释 typedef unsigned short uint16_t;…...

SimScape物理建模实例2--带控制的单质量弹簧阻尼系统

模型下载&#xff1a; 基于simscape&#xff0c;单质量系统带位置控制资源-CSDN文库 在实例1中&#xff0c;我们搭建了不带控制的单质量弹簧阻尼系统&#xff0c;该系统没有外界力量介入&#xff0c;只有弹簧的初始弹力&#xff0c;带着弹簧使劲弹来弹去。 SimScape物理建模实…...

PyGame游戏开发(含源码+演示视频+开结题报告+设计文档)

前言&#xff1a; 大二小学期python课上基于pygame做的一个游戏小demo&#xff0c;当时老师花了一天讲解了下python基础语法后&#xff08;也是整个大学四年唯一学习python的时间&#xff09;&#xff0c;便让我们自学网课一周然后交项目&#xff0c;所以做的非常仓促&#xff…...

拒绝flash插件打劫!如何在vscode上玩4399小游戏

现在电脑上玩4399都需要flash插件了 这也导致了很多人无法玩到小时候的游戏 今天介绍一款插件 功能强大 即安即玩 首先打开vscode 点开小方框&#xff08;拓展&#xff09;搜索4399 认准4399 on vscode点击安装 安装完毕后 按下 Ctrl Shift P , 输入 4399 on VSCode 或…...

五大静态博客框架对比:Hugo、Hexo、VuePress、MkDocs、Jekyll

目录 1. Hugo概述优点缺点适用场景使用体验 2. Hexo概述优点缺点适用场景使用体验 3. VuePress概述优点缺点适用场景使用体验 4. MkDocs概述优点缺点适用场景使用体验 5. Jekyll概述优点缺点适用场景使用体验 框架对比总结如何选择&#xff1f;结语 静态博客框架通过将内容&…...

learning ray之ray核心设计和架构

我们每天都在处理海量、多样且高速生成的数据&#xff0c;这对计算能力提出了前所未有的挑战。传统的单机计算模式在面对日益复杂的机器学习模型和大规模数据集时&#xff0c;往往显得力不从心。更重要的是&#xff0c;数据科学家们本应专注于模型训练、特征工程、超参数调优这…...

C语言while循环的用法(非常详细,附带实例)

while 是 C 语言中的一种循环控制结构&#xff0c;用于在特定条件为真时重复执行一段代码。 while 循环的语法如下&#xff1a; while (条件表达式) { // 循环体&#xff1a;条件为真时执行的代码 } 条件表达式&#xff1a;返回真&#xff08;非 0&#xff09;或假&#x…...

JavaScript进阶(九)

第三部分:JavaScript进阶 目录 第三部分:JavaScript进阶 一、作用域 1.1 局部作用域 1. 作用域 2. 局部作用域 函数作用域 块作用域 1.2 全局作用域 1.3 作用域链 1.4 JS垃圾回收机制 1. 什么是垃圾回收机制 2. 内存的声明周期 3. 垃圾回收的算法说明 引用计数…...

数据结构与算法分析实验11 实现顺序查找表

实现顺序查找表 1.上机名称2.上机要求3.上机环境4.程序清单(写明运行结果及结果分析)4.1 程序清单4.1.1 头文件4.1.2 实现文件4.1.3 源文件 4.2 实现展效果示 上机体会 1.上机名称 实现顺序查找表 顺序查找表的基本概念 顺序查找表是一种线性数据结构&#xff0c;通常用于存储…...

获取高德地图JS API的安全密钥和Key的方法

要使用高德地图JavaScript API&#xff0c;您需要获取API Key和安全密钥(securityJsCode)。以下是获取步骤&#xff1a; 1. 注册高德开放平台账号 首先访问高德开放平台&#xff0c;如果没有账号需要先注册。 2. 创建应用获取Key 登录后进入"控制台" 点击"应…...

Excel表的导入与导出

Excel表的导入与导出 根据excel表来建立所需的数据库表格 <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>4.0.3</version></dependency>导入easyexcel依赖项 阿里巴巴的 EasyExcel …...