Nacos 注册中心 - 健康检查机制源码
目录
1. 健康检查介绍
2. 客户端健康检查
2.1 临时实例的健康检查
2.2 永久实例的健康检查
3. 服务端健康检查
3.1 临时实例的健康检查
3.2 永久实例服务端健康检查
1. 健康检查介绍
当一个服务实例注册到 Nacos 中后,其他服务就可以从 Nacos 中查询出该服务实例信息,就可以调用使用了。
然而服务提供者如果此时挂掉了,此时其他服务拿到信息后就会调用不通,所以Nacos中的服务信息应该有一个更新机制(即删除掉挂掉的服务)
那么服务注册信息应该如何维护呢,那就是判断某个服务实例是否有问题,如果检测到服务实例出现问题了就将他剔除掉。
那么如何判断 服务实例 是否有问题呢?这就是健康检查要做的事情,即检查服务实例的健康状态。不健康则剔除下线。
下面看看客户端和服务端为了实现健康检查功能都各自做了哪些事情。
2. 客户端健康检查
2.1 临时实例的健康检查
从 Nacos 2.x 开始,临时实例的服务注册由原来的 HTTP 更换为了 GRPC 长连接方式。Nacos Client 和 Nacos Server 之间建立的 RPC 长连接,服务注册、服务取消注册等接口都是通过 GRPC 消息与服务端通信的。
GRPC 长连接是一直存在的,只要连接一直存在就代表Nacos Client 和 Nacos Server 之间的连接是通的,Nacos Client 则一直在线。如果Nacos Client 由于网络问题等其他问题挂掉了,那么这条长连接也会断开连接。
那么服务实例如何算健康呢,就是长连接一直存在没有断那就算健康的。
如果连接断掉了,那么该客户端上注册的全部服务实例都是不健康的了。
GRPC 长连接如果想一直保证连接状态,就需要定时发送心跳包,以确保连接处于活动的状态。否则一段时间不操作的话就会自动断开连接。
接下来看看 NacosClient 如何开启 RPC 长连接的
NacosClient 操作注册中心的 API 是通过 NamingService 进行的
在 NacosNamingService 的构造器中调用了 init 初始化方法:init 方法最后
进入最后一行代码:
再看最后 new NamingGrpcClientProxy 的源码:
public class NamingGrpcClientProxy {public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) {// 省略部分代码// 创建 RPC Clientthis.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);// 启动 RPC Clientstart(serverListFactory, serviceInfoHolder);}private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {rpcClient.serverListFactory(serverListFactory);rpcClient.registerConnectionListener(redoService);rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));// 启动rpcClient.start();}
看看 RpcClient.start 做了什么
public abstract class RpcClient {protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();public final void start() {// 省略部分代码// connection event consumer.clientEventExecutor.submit(() -> {while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {ConnectionEvent take = eventLinkedBlockingQueue.take();if (take.isConnected()) {notifyConnected();} else if (take.isDisConnected()) {notifyDisConnected();}}});Connection connectToServer = null;// 状态设置为启动中rpcClientStatus.set(RpcClientStatus.STARTING);int startUpRetryTimes = rpcClientConfig.retryTimes();while (startUpRetryTimes > 0 && connectToServer == null) {startUpRetryTimes--;ServerInfo serverInfo = nextRpcServer();// 建立连接connectToServer = connectToServer(serverInfo);}this.currentConnection = connectToServer;// 状态设置为 运行中rpcClientStatus.set(RpcClientStatus.RUNNING);eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));// 省略部分代码}}
省略了一些代码,这次只关注核心的两个点
1. 找到下一个 RPC Server 建立连接
因为 Nacos 支持集群部署,此时的 RPC Server List 其实就是这些集群节点。也就是找到集群下一个节点建议连接,如果连接失败就走到下一轮循环再获取到下一个节点继续连接(再重试次数内循环)
2. eventLinkedBlockingQueue 队列中加入一项
eventLinkedBlockingQueue 队列里存的是 ConnectionEvent,ConnectionEvent 代表一个连接事件,事件有 已连接事件、断开连接事件。
public class ConnectionEvent {public static final int CONNECTED = 1;public static final int DISCONNECTED = 0;int eventType;public ConnectionEvent(int eventType) {this.eventType = eventType;}public boolean isConnected() {return eventType == CONNECTED;}public boolean isDisConnected() {return eventType == DISCONNECTED;}
}
可见上面的源码,连接建立成功后,就会往队列压入一个 已连接事件 CONNECTED
队列事件的消费者在哪里呢?
便是在 start 方法的最开头定义的,while 循环不断从队列中获取到数据然后根据事件类型,进行各自的通知。
public final void start() {// 省略部分代码// connection event consumer.clientEventExecutor.submit(() -> {while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {ConnectionEvent take = eventLinkedBlockingQueue.take();if (take.isConnected()) {// 通知连接notifyConnected();} else if (take.isDisConnected()) {// 通知断开连接notifyDisConnected();}}});}
notifyConnected 和 notifyDisConnected 实现差不多,所以这里只看一个的实现源码。
protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();
protected void notifyConnected() {// 省略部分源码if (connectionEventListeners.isEmpty()) {return;}// 循环全部监听器 一个个回调for (ConnectionEventListener connectionEventListener : connectionEventListeners) { connectionEventListener.onConnected(); }
}
connectionEventListeners 里的数据是什么时候加入的呢?
那就是在 NamingGrpcClientProxy.start 方法
public class NamingGrpcRedoService implements ConnectionEventListener {private volatile boolean connected = false;@Overridepublic void onConnected() {// 建立连接,改变连接状态connected = true;}@Overridepublic void onDisConnect() {connected = false;// 将 redoService 上的全部缓存数据一改synchronized (registeredInstances) {registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));}synchronized (subscribes) {subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));}}
当GRPC 长连接断开后就会进入 onDisConnect 事件回调中,这里改变了setRegistered 状态
上篇说过,redoService的作用 Nacos 注册中心 - 服务注册源码
此时会走入服务卸载流程。
2.2 永久实例的健康检查
永久实例客户端只负责提交一个请求即完成了全部操作。
健康检查工作由 服务端做。
3. 服务端健康检查
在收到客户端建立连接事件回调后,会调用 init 方法
public class IpPortBasedClient extends AbstractClient {public void init() {if (ephemeral) {beatCheckTask = new ClientBeatCheckTaskV2(this);HealthCheckReactor.scheduleCheck(beatCheckTask);} else {healthCheckTaskV2 = new HealthCheckTaskV2(this);HealthCheckReactor.scheduleCheck(healthCheckTaskV2);}}
}
如果当前是临时实例:使用 ClientBeatCheckTaskV2 处理健康检查
如果当前是永久实例:使用 HealthCheckTaskV2处理健康检查
然后将任务放到线程池中执行定时执行
3.1 临时实例的健康检查
看看 ClientBeatCheckTaskV2 如何实现:
public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {// 省略部分代码private final IpPortBasedClient client;private final InstanceBeatCheckTaskInterceptorChain interceptorChain;// 执行健康检查@Overridepublic void doHealthCheck() {// 拿到当前客户端上注册的全部服务Collection<Service> services = client.getAllPublishedService();for (Service each : services) {HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(each);// 将全部服务用拦截器链一个个执行interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));} }@Overridepublic void run() {doHealthCheck();}
}
看看拦截器链如何实现
这里的拦截器链是一个典型的责任链模式
public abstract class AbstractNamingInterceptorChain<T extends Interceptable>implements NacosNamingInterceptorChain<T> {@Overridepublic void doInterceptor(T object) {for (NacosNamingInterceptor<T> each : interceptors) {if (!each.isInterceptType(object.getClass())) {continue;}// 当前是责任节点,直接由该责任节点处理if (each.intercept(object)) {object.afterIntercept();// 不往后执行了return;}}// 如果没有责任节点执行,就调用 passInterceptobject.passIntercept();}
}
首先第一个问题:拦截器都是些什么呢?
可见具体的有三个实现类
这三个类代表者三个地方的判断,判断是否开启了 健康心跳检查功能? 如果没开,那就被拦截了呀,就走不到后面的心跳检查代码了
ServiceEnableBeatCheckInterceptor
从 Service 的元数据上判断
public class ServiceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {@Overridepublic boolean intercept(InstanceBeatCheckTask object) {NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);// 获取当前 Service 的元数据Optional<ServiceMetadata> metadata = metadataManager.getServiceMetadata(object.getService());// 如果元数据存在,并且其数据 enableClientBeat 配置了if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {// 直接取 enableClientBeat 值return Boolean.parseBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT));}return false;}}
InstanceBeatCheckResponsibleInterceptor
并不是一个客户端要负责集群中全部节点的心跳处理的,而是只负责自己注册的。
public class InstanceBeatCheckResponsibleInterceptor extends AbstractBeatCheckInterceptor {@Overridepublic boolean intercept(InstanceBeatCheckTask object) {// 是否是当前责任节点return !ApplicationUtils.getBean(DistroMapper.class).responsible(object.getClient().getResponsibleId());}}
InstanceEnableBeatCheckInterceptor
这个就是实例级别的健康检查判断
public class InstanceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {@Overridepublic boolean intercept(InstanceBeatCheckTask object) {NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);HealthCheckInstancePublishInfo instance = object.getInstancePublishInfo();// 获取到实例上的元数据Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getMetadataId());// 从元数据上取if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {// 元数据存在取该值return ConvertUtils.toBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());}// 从 extendDatum 中取数据if (instance.getExtendDatum().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {return ConvertUtils.toBoolean(instance.getExtendDatum().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());}return false;}}
如果都没被上面三个拦截器拦截掉,那就代表 当前实例是 开启了 健康检查,所以后面就要开始进行 检查操作 了
检查操作由 object.passIntercept(); 做
object.passIntercept(); 是什么呢?
就是刚才的 开始拦截方法最后一行
@Override
public void doInterceptor(T object) {for (NacosNamingInterceptor<T> each : interceptors) {if (!each.isInterceptType(object.getClass())) {continue;}// 拦截器是否拦截?if (each.intercept(object)) {object.afterIntercept();// 拦截了,直接返回return;}}// 未被拦截到object.passIntercept();
}
object 是什么?
就是之前传过来的 InstanceBeatCheckTask
接下来看看 InstanceBeatCheckTask
public class InstanceBeatCheckTask implements Interceptable {// 全部检查项目private static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>();private final IpPortBasedClient client;private final Service service;private final HealthCheckInstancePublishInfo instancePublishInfo;static {// 添加检查项目CHECKERS.add(new UnhealthyInstanceChecker());CHECKERS.add(new ExpiredInstanceChecker());// SPI 机制添加CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));}@Overridepublic void passIntercept() {// 遍历全部检查项目for (InstanceBeatChecker each : CHECKERS) {// 开始检查each.doCheck(client, service, instancePublishInfo);}}
}
全部检查项目 都是什么呢?
下面分别介绍
UnhealthyInstanceChecker
不健康实例检查器
public class UnhealthyInstanceChecker implements InstanceBeatChecker {// 开始做检查@Overridepublic void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {if (instance.isHealthy() && isUnhealthy(service, instance)) {// 当前实例不健康了 -> 改变健康状态为 不健康changeHealthyStatus(client, service, instance);}}// 判断实例是否健康private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {// 获取超时时间 默认 15 秒;可通过配置更改。long beatTimeout = getTimeout(service, instance);// 当前时间距离上一次发送心跳包时间 超过了 规定的超时时间 则返回 true,代表节点不健康了return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;}// 改变健康状态private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {instance.setHealthy(false);// 省略部分代码}}
ExpiredInstanceChecker
过期实例检查器
public class ExpiredInstanceChecker implements InstanceBeatChecker {@Overridepublic void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();if (expireInstance && isExpireInstance(service, instance)) {// 如果实例过期了,则直接剔除实例deleteIp(client, service, instance);}}private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {// 获取超时时间 默认 30 秒;可通过配置更改。long deleteTimeout = getTimeout(service, instance);// 当前时间距离上一次发送心跳包时间 超过了 规定的超时时间 则返回 true,代表节点过期了,需要进行节点剔除操作return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;}/*** 服务直接剔除掉*/private void deleteIp(Client client, Service service, InstancePublishInfo instance) {client.removeServiceInstance(service);// 客户端下线NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));// 元数据改变NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));// 注销实例NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(),service.getName(), instance.getIp(), instance.getPort()));}
3.2 永久实例服务端健康检查
永久实例的健康检查是服务端主动探测方式,服务端定时外部请求客户端来看是否健康。
public class IpPortBasedClient extends AbstractClient {public void init() {if (ephemeral) {beatCheckTask = new ClientBeatCheckTaskV2(this);HealthCheckReactor.scheduleCheck(beatCheckTask);} else {healthCheckTaskV2 = new HealthCheckTaskV2(this);HealthCheckReactor.scheduleCheck(healthCheckTaskV2);}}
}
入口类是 HealthCheckTaskV2
public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealthCheckTask {// 省略部分代码private final IpPortBasedClient client;@Overridepublic void doHealthCheck() {// 获取到当前客户端上注册的全部节点for (Service each : client.getAllPublishedService()) {// 如果开启了健康检查if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {// 拿到实例注册信息InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);// 拿到集群元数据ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);// 调用 HealthCheckProcessorV2Delegate.process()ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
}}}
@Overridepublic void run() {doHealthCheck();}}
最终调用了 HealthCheckProcessorV2Delegate.process 方法
看看如何实现
HealthCheckProcessorV2Delegate 这个类就是一个委托类
public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {// 类型,健康检查实现类private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
@Autowiredpublic void addProcessor(Collection<HealthCheckProcessorV2> processors) {healthCheckProcessorMap.putAll(processors.stream().filter(processor -> processor.getType() != null).collect(Collectors.toMap(HealthCheckProcessorV2::getType, processor -> processor)));}@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {// 从元数据中获取到当前的健康检查类型 (HTTP、MySQL、TCP、None)String type = metadata.getHealthyCheckType();// 根据类型找到具体的 健康检查类HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);if (processor == null) {// 找不到 就使用 None 健康检查processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);}// 开始进行健康检查processor.process(task, service, metadata);}
}
健康检查有如下几类,还可通过 SPI 方式扩展
下面一个一个介绍
NoneHealthCheckProcessor
None 代表不做健康检查,所以这个类的Process 为空实现
public class NoneHealthCheckProcessor implements HealthCheckProcessorV2 {public static final String TYPE = HealthCheckType.NONE.name();@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {}@Overridepublic String getType() {return TYPE;}
}
TcpHealthCheckProcessor
TCP 健康检查,用于通过 TCP 方式检查是否健康,本质上是通过建立 Socket 连接,发送 Socket 信息实现
public class TcpHealthCheckProcessor implements HealthCheckProcessorV2, Runnable {@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {// 省略}// 省略部分代码private class TaskProcessor implements Callable<Void> {
@Overridepublic Void call() {// 发送 Socket 请求SocketChannel channel = null;HealthCheckInstancePublishInfo instance = beat.getInstance();BeatKey beatKey = keyMap.get(beat.toString());if (beatKey != null && beatKey.key.isValid()) {if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {instance.finishCheck();return null;}beatKey.key.cancel();beatKey.key.channel().close();}channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);ClusterMetadata cluster = beat.getMetadata();int port = cluster.isUseInstancePortForCheck() ? instance.getPort() : cluster.getHealthyCheckPort();channel.connect(new InetSocketAddress(instance.getIp(), port));SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key));beat.setStartTime(System.currentTimeMillis());GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);return null;}}
}
MysqlHealthCheckProcessor
本质是 发送一个 sql。(sql从配置中获取),整个过程没报异常就算健康
public class MysqlHealthCheckProcessor implements HealthCheckProcessorV2 {public static final String TYPE = HealthCheckType.MYSQL.name();private final HealthCheckCommonV2 healthCheckCommon;private final SwitchDomain switchDomain;public static final int CONNECT_TIMEOUT_MS = 500;private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";private static final String MYSQL_SLAVE_READONLY = "ON";private static final ConcurrentMap<String, Connection> CONNECTION_POOL = new ConcurrentHashMap<String, Connection>();public MysqlHealthCheckProcessor(HealthCheckCommonV2 healthCheckCommon, SwitchDomain switchDomain) {this.healthCheckCommon = healthCheckCommon;this.switchDomain = switchDomain;}@Overridepublic String getType() {return TYPE;}@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {// 省略}private class MysqlCheckTask implements Runnable {@Overridepublic void run() {Statement statement = null;ResultSet resultSet = null;String clusterName = instance.getCluster();String key =service.getGroupedServiceName() + ":" + clusterName + ":" + instance.getIp() + ":" + instance.getPort();Connection connection = CONNECTION_POOL.get(key);Mysql config = (Mysql) metadata.getHealthChecker();if (connection == null || connection.isClosed()) {String url = "jdbc:mysql://" + instance.getIp() + ":" + instance.getPort() + "?connectTimeout="+ CONNECT_TIMEOUT_MS + "&socketTimeout=" + CONNECT_TIMEOUT_MS + "&loginTimeout=" + 1;connection = DriverManager.getConnection(url, config.getUser(), config.getPwd());CONNECTION_POOL.put(key, connection);}statement = connection.createStatement();statement.setQueryTimeout(1);resultSet = statement.executeQuery(config.getCmd());int resultColumnIndex = 2;if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {resultSet.next();if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {throw new IllegalStateException("current node is slave!");}}healthCheckCommon.checkOk(task, service, "mysql:+ok");healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,switchDomain.getMysqlHealthParams());
}
}
HttpHealthCheckProcessor
本质是发送一个 HTTP 请求,返回状态码 200 就算健康
public class HttpHealthCheckProcessor implements HealthCheckProcessorV2 {public static final String TYPE = HealthCheckType.HTTP.name();private static final NacosAsyncRestTemplate ASYNC_REST_TEMPLATE = HttpClientManager.getProcessorNacosAsyncRestTemplate();private final HealthCheckCommonV2 healthCheckCommon;
@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient().getInstancePublishInfo(service);if (null == instance) {return;}try {if (!instance.tryStartCheck()) {SRV_LOG.warn("http check started before last one finished, service: {} : {} : {}:{}",service.getGroupedServiceName(), instance.getCluster(), instance.getIp(), instance.getPort());healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());return;}Http healthChecker = (Http) metadata.getHealthChecker();int ckPort = metadata.isUseInstancePortForCheck() ? instance.getPort() : metadata.getHealthyCheckPort();URL host = new URL(HTTP_PREFIX + instance.getIp() + ":" + ckPort);URL target = new URL(host, healthChecker.getPath());Map<String, String> customHeaders = healthChecker.getCustomHeaders();Header header = Header.newInstance();header.addAll(customHeaders);// 发送 HTTP 请求ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,new HttpHealthCheckCallback(instance, task, service));MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();} catch (Throwable e) {instance.setCheckRt(switchDomain.getHttpHealthParams().getMax());healthCheckCommon.checkFail(task, service, "http:error:" + e.getMessage());healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,switchDomain.getHttpHealthParams());}}@Overridepublic String getType() {return TYPE;}private class HttpHealthCheckCallback implements Callback<String> {private final HealthCheckTaskV2 task;private final Service service;private final HealthCheckInstancePublishInfo instance;private long startTime = System.currentTimeMillis();public HttpHealthCheckCallback(HealthCheckInstancePublishInfo instance, HealthCheckTaskV2 task,Service service) {this.instance = instance;this.task = task;this.service = service;}@Overridepublic void onReceive(RestResult<String> result) {instance.setCheckRt(System.currentTimeMillis() - startTime);int httpCode = result.getCode();if (HttpURLConnection.HTTP_OK == httpCode) {healthCheckCommon.checkOk(task, service, "http:" + httpCode);healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,switchDomain.getHttpHealthParams());} else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode|| HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {// server is busy, need verification laterhealthCheckCommon.checkFail(task, service, "http:" + httpCode);healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());} else {//probably means the state files has been removed by administratorhealthCheckCommon.checkFailNow(task, service, "http:" + httpCode);healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,switchDomain.getHttpHealthParams());}}@Overridepublic void onError(Throwable throwable) {Throwable cause = throwable;instance.setCheckRt(System.currentTimeMillis() - startTime);int maxStackDepth = 50;for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {if (HttpUtils.isTimeoutException(cause)) {healthCheckCommon.checkFail(task, service, "http:" + cause.getMessage());healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,switchDomain.getHttpHealthParams());return;}cause = cause.getCause();}if (throwable instanceof ConnectException) {healthCheckCommon.checkFailNow(task, service, "http:unable2connect:" + throwable.getMessage());} else {healthCheckCommon.checkFail(task, service, "http:error:" + throwable.getMessage());}healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,switchDomain.getHttpHealthParams());}@Overridepublic void onCancel() {}}
}
可见,不同方式的健康检查差异还是挺大的,那么如果将检查结果告知 Nacos 呢,那就是调用
// 健康检查结果:健康
healthCheckCommon.checkOk(task, service, "");// 健康检查结果:不健康
healthCheckCommon.checkFail(task, service, "");
健康检查成功
检查成功主要做的事情就是 重置失败次数、结束检查
public void checkOk(HealthCheckTaskV2 task, Service service, String msg) {try {HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient().getInstancePublishInfo(service);if (instance == null) {// 实例不存在,不做处理return;}if (!instance.isHealthy()) {// 如果实例不健康的,将状态改为 健康// 代码省略}} finally {// 重置失败次数instance.resetFailCount();// 结束检查instance.finishCheck();}
}
健康检查失
public void checkFail(HealthCheckTaskV2 task, Service service, String msg) {
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient().getInstancePublishInfo(service);if (instance == null) {return;}try {if (instance.isHealthy()) {// 如果实例是健康的,将状态改为 不健康// 代码省略}} finally {// 重置健康次数instance.resetOkCount();// 结束检查instance.finishCheck();}
}
相关文章:

Nacos 注册中心 - 健康检查机制源码
目录 1. 健康检查介绍 2. 客户端健康检查 2.1 临时实例的健康检查 2.2 永久实例的健康检查 3. 服务端健康检查 3.1 临时实例的健康检查 3.2 永久实例服务端健康检查 1. 健康检查介绍 当一个服务实例注册到 Nacos 中后,其他服务就可以从 Nacos 中查询出该服务…...

Transformer在计算机视觉中的应用-VIT、TNT模型
上期介绍了Transformer的结构、特点和作用等方面的知识,回头看下来这一模型并不难,依旧是传统机器翻译模型中常见的seq2seq网络,里面加入了注意力机制,QKV矩阵的运算使得计算并行。 当然,最大的重点不是矩阵运算&…...

快速入门Zookeeper技术.黑马教程
快速入门Zookeeper技术.黑马教程一、初识 Zookeeper二、ZooKeeper 安装与配置三、ZooKeeper 命令操作1.Zookeeper 数据模型2.Zookeeper 服务端常用命令3.Zookeeper 客户端常用命令四、ZooKeeper JavaAPI 操作五、ZooKeeper JavaAPI 操作1.Curator 介绍2.Curator API 常用操作2.…...
网易C++实习一面
说下C11新特性 auto有没有效率上的问题?为什么?发生在什么时候? 说下单例模式 什么时候需要加锁,什么时候不需要加锁? 像printf这样的函数,自己本身不修改数据,但是其他人会修改数据&#x…...

进程和线程的区别和联系
进程和线程的区别和联系1. 认识线程2. 进程和线程的关系3. 进程和线程的区别4. 线程共享了进程哪些资源1. 上下文切换2. 线程共享了进程哪些资源1.代码区2. 数据区3. 堆区1. 认识线程 线程是进程的一个实体,它被包含在进程中,一个进程至少包含一个线程,一个进程也可以包含多个…...

Java学习笔记——集合
目录集合与数组的对比集合体系结构Collection——常见成员方法Collection——迭代器基本使用Collection——迭代器原理分析Collection——迭代器删除方法增强for——基本格式增强for——注意点Collection——练习集合与数组的对比 package top.xxxx.www.CollectionDemo;import …...

差分运放公式推导-运算放大器
不知道大家有没遇到这种情况,在计算电路的时候,有时候会突然的忘记一些公式啊啥的,需要回去翻看笔记或者查资料,知其然而不知其所以然。今天跟大家一起来一起推导一遍差分运放的计算过程。 计算过程其实归根结底还是根据运放的虚…...
金丹二层 —— 字符串长度求解的四种方法
前言: 1.CSDN由于我的排版不怎么好看,我的有道云笔记比较美观,请移步有道云笔记 2.修炼必备 1)入门必备:VS2019社区版,下载地址:Visual Studio 较旧的下载 - 2019、2017、2015 和以前的版本 (m…...

深入剖析Linux——进程信号
致前行的人: 要努力,但不着急,繁花锦簇,硕果累累都需要过程! 目录 1.信号概念 1.1生活角度的信号 2. 技术应用角度的信号 3.Linux操作系统中查看信号 4.常用信号发送 4.1通过键盘发送信号 4.2调用系统函数发送信号 4.3…...

API-Server的监听器Controller的List分页失效
前言 最近做项目,还是K8S的插件监听器(理论上插件都是通过API-server通信),官方的不同写法居然都能出现争议,争议点就是对API-Server的请求的耗时,说是会影响API-Server。实际上通过源码分析两着有差别&am…...
jupyter notebook 进阶使用:nbextensions,终极避坑
jupyter notebook 进阶使用:nbextensions,终极避坑吐槽安装 jupyter_contrib_nbextensions1. Install the python package(安装python包)方法一,PIP:方法二,Conda(推荐)&…...
C 语言编程 — Doxygen + Graphviz 静态项目分析
目录 文章目录目录安装配置解析Project related configuration optionsBuild related configuration optionsConfiguration options related to warning and progress messagesConfiguration options related to the input filesConfiguration options related to source brows…...

Mybatis报BindingException:Invalid bound statement (not found)异常
一、前言 本文的mybatis是与springboot整合时出现的异常,若使用的不是基于springboot,解决思路也大体一样的。 二、从整合mybatis的三个步骤排查问题 但在这之前,我们先要知道整合mybatis的三个重要的工作,如此才能排查&#x…...

HttpRunner3.x(1)-框架介绍
HttpRunner 是一款面向 HTTP(S) 协议的通用测试框架,只需编写维护一份 YAML/JSON 脚本,即可实现自动化测试、性能测试、线上监控、持续集成等多种测试需求。主要特征继承的所有强大功能requests ,只需以人工方式获得乐趣即可处理HTTP…...

pytest学习和使用20-pytes如何进行分布式测试?(pytest-xdist)
20-pytes如何进行分布式测试?(pytest-xdist)1 什么是分布式测试?2 为什么要进行分布式测试?2.1 场景1:自动化测试场景2.2 场景2:性能测试场景3 分布式测试有什么特点?4 分布式测试关…...

三、Python 操作 MongoDB ----非 ODM
文章目录一、连接器的安装和配置二、新增文档三、查询文档四、更新文档五、删除文档一、连接器的安装和配置 pymongo: MongoDB 官方提供的 Python 工具包。官方文档: https://pymongo.readthedocs.io/en/stable/ pip安装,命令如下࿱…...
求最大公约数和最小公倍数---辗转相除法(欧几里得算法)
目录 一.GCD和LCM 1.最大公约数 2.最小公倍数 二.暴力求解 1.最大公约数 2.最小公倍数 三.辗转相除法 1.最大公约数 2.最小公倍数 一.GCD和LCM 1.最大公约数 最大公约数(Greatest Common Divisor,简称GCD)指的是两个或多个整数共有…...

音视频开发_获取媒体文件的详细信息
一、前言 做音视频开发过程中,经常需要获取媒体文件的详细信息。 比如:获取视频文件的总时间、帧率、尺寸、码率等等信息。 获取音频文件的的总时间、帧率、码率,声道等信息。 这篇文章贴出2个我封装好的函数,直接调用就能获取媒体信息返回,copy过去就能使用,非常方便。…...

Springboot集成Swagger
一、Swagger简介注意点! 在正式发布的时候要关闭swagger(出于安全考虑,而且节省内存空间)之前开发的时候,前端只用管理静态页面, http请求到后端, 模板引擎JSP,故后端是主力如今是前…...

Vue全新一代状态管理库 Pinia【一篇通】
文章目录前言1. Pinia 是什么?1.1 为什么取名叫 Pinia?1.2. 为什么要使用 Pinia ?2. 安装 Pinia2.1.创建 Store2.1.1. Option 类型 Store2.1.2 Setup 函数类型 Store2.1.3 模板中使用3. State 的使用事项(Option Store )3.1 读取 State3.2 …...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...