elasticsearch源码分析-03选举集群状态
选举集群状态
es中存储的数据有一下几种,state元数据、lucene索引文件、translog事务日志
元数据信息可以分为:
- 集群层面的元信息-对应着metaData数据结构,主要是clusterUUid、settings、templates等
- 索引层面的元信息-对应着indexMetaData数据结构,主要存储分片数量、mappings索引字段映射等
- 分片层面的元信息-对应着shardStateMetaData,主要是version、indexUUid、主分片等
每个节点可能会有不同的集群状态,需要选择正确的元数据作为权威源数据。状态信息的管理在gatewayService中,它实现了ClusterStateListener接口,当选择完主节点后会发布一个集群状态task,触发回调方法clusterChanged
//恢复分片分配状态
performStateRecovery(enforceRecoverAfterTime, reason);
集群层和索引层元数据恢复在gateway模块完成
public void clusterChanged(final ClusterChangedEvent event) {if (lifecycle.stoppedOrClosed()) {return;}final ClusterState state = event.state();//只有主节点才能执行if (state.nodes().isLocalNodeElectedMaster() == false) {// not our job to recoverreturn;}//已经执行过了集群状态和索引状态恢复了if (state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {// already recoveredreturn;}//这段省略主要是检查是否达到恢复状态条件......//恢复状态performStateRecovery(enforceRecoverAfterTime, reason);
}
首先判断只有主节点可以执行状态选举,然后判断是否已经在执行了状态恢复任务了,如果是则直接返回;如果没有则执行恢复状态任务
最终会调用recoveryRunnable.run()
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);recoveryRunnable = () ->gateway.performStateRecovery(new GatewayRecoveryListener());
执行gateway的performStateRecovery方法
首先回去所有master资格的节点信息
//具有master资格的node节点final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
获取其他master节点的元数据
//获取集群及信息final TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
这里我们看下TransportNodesListGatewayMetaState的构造函数
public TransportNodesListGatewayMetaState(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,ActionFilters actionFilters, GatewayMetaState metaState) {super(ACTION_NAME, threadPool, clusterService, transportService, actionFilters,Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);this.metaState = metaState;
}//注册action处理类
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader,new TransportHandler());
回到list方法,会调用doExecute方法
public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {PlainActionFuture<NodesGatewayMetaState> future = PlainActionFuture.newFuture();execute(new Request(nodesIds).timeout(timeout), future);return future;
}protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {//执行new AsyncAction(task, request, listener).start();
}
发送所有节点获取元数据
void start() {final DiscoveryNode[] nodes = request.concreteNodes();if (nodes.length == 0) {//没有需要获取数据的node// nothing to notifythreadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));return;}TransportRequestOptions.Builder builder = TransportRequestOptions.builder();if (request.timeout() != null) {builder.withTimeout(request.timeout());}//循环发送请求给所有节点for (int i = 0; i < nodes.length; i++) {final int idx = i;final DiscoveryNode node = nodes[i];final String nodeId = node.getId();try {TransportRequest nodeRequest = newNodeRequest(request);if (task != null) {nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());}//发送请求transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),new TransportResponseHandler<NodeResponse>() {@Overridepublic NodeResponse read(StreamInput in) throws IOException {return newNodeResponse(in);}//处理返回@Overridepublic void handleResponse(NodeResponse response) {onOperation(idx, response);}@Overridepublic void handleException(TransportException exp) {onFailure(idx, node.getId(), exp);}@Overridepublic String executor() {return ThreadPool.Names.SAME;}});} catch (Exception e) {onFailure(idx, nodeId, e);}}
}
对端接收请求后处理在上面注册的NodeTransportHandler,构造每个节点元数据返回
//node请求处理class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {@Overridepublic void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {channel.sendResponse(nodeOperation(request, task));}}protected NodeGatewayMetaState nodeOperation(NodeRequest request) {return new NodeGatewayMetaState(clusterService.localNode(), metaState.getMetadata());}
我们继续回到每个节点发送请求的返回处理
//处理返回
@Override
public void handleResponse(NodeResponse response) {onOperation(idx, response);
}private void onOperation(int idx, NodeResponse nodeResponse) {//记录node的返回结果responses.set(idx, nodeResponse);//当所有节点都返回结果了无论是失败还是成功了if (counter.incrementAndGet() == responses.length()) {finishHim();}
}private void finishHim() {NodesResponse finalResponse;try {finalResponse = newResponse(request, responses);} catch (Exception e) {logger.debug("failed to combine responses from nodes", e);listener.onFailure(e);return;}//触发监听回调listener.onResponse(finalResponse);
}
及获取到了其他节点的元数据,继续回到performStateRecovery
需要获取的master角色节点数
//需要分配数量
final int requiredAllocation = Math.max(1, minimumMasterNodes);
开始通过版本号选择集群层元数据,比较版本号,选择版本号最大的集群状态
//集群元数据
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {if (nodeState.metadata() == null) {continue;}found++;if (electedGlobalState == null) {electedGlobalState = nodeState.metadata();//比较版本号大的胜出} else if (nodeState.metadata().version() > electedGlobalState.version()) {electedGlobalState = nodeState.metadata();}for (final ObjectCursor<IndexMetadata> cursor : nodeState.metadata().indices().values()) {indices.addTo(cursor.value.getIndex(), 1);}
}
检查是否有足够数量节点返回了集群状态
//没有获取足够的节点返回消息
if (found < requiredAllocation) {listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");return;
}
构造集群状态,删除索引信息,下面会选择索引级元数据
//更新全局状态,清理索引,我们在下一阶段选择它们final Metadata.Builder metadataBuilder = Metadata.builder(electedGlobalState).removeAllIndices();
遍历所有节点选择返回的索引元数据版本最高的节点作为索引级元数据,然后将索引级元数据添加到metadataBuilder中
for (int i = 0; i < keys.length; i++) {if (keys[i] != null) {final Index index = (Index) keys[i];IndexMetadata electedIndexMetadata = null;int indexMetadataCount = 0;for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {if (nodeState.metadata() == null) {continue;}final IndexMetadata indexMetadata = nodeState.metadata().index(index);if (indexMetadata == null) {continue;}if (electedIndexMetadata == null) {electedIndexMetadata = indexMetadata;//比较版本号,选择最大版本号} else if (indexMetadata.getVersion() > electedIndexMetadata.getVersion()) {electedIndexMetadata = indexMetadata;}indexMetadataCount++;}if (electedIndexMetadata != null) {if (indexMetadataCount < requiredAllocation) {logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetadataCount, requiredAllocation);} // TODO if this logging statement is correct then we are missing an else here//设置索引级元数据metadataBuilder.put(electedIndexMetadata, false);}}
}
构造恢复后的集群级元数据和索引级元数据
//恢复后的集群状态
ClusterState recoveredState = Function.<ClusterState>identity().andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings())).apply(ClusterState.builder(clusterService.getClusterName()).metadata(metadataBuilder).build());listener.onSuccess(recoveredState);
调用GatewayRecoveryListener的onSuccess向集群提交任务
class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {@Overridepublic void onSuccess(final ClusterState recoveredState) {logger.trace("successful state recovery, importing cluster state...");clusterService.submitStateUpdateTask("local-gateway-elected-state",new RecoverStateUpdateTask() {@Overridepublic ClusterState execute(final ClusterState currentState) {final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));}});}@Overridepublic void onFailure(final String msg) {logger.info("state recovery failed: {}", msg);resetRecoveredFlags();}}
调用RecoverStateUpdateTask的execute方法
@Override
public ClusterState execute(final ClusterState currentState) {if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {logger.debug("cluster is already recovered");return currentState;}//状态信息恢复完成final ClusterState newState = Function.<ClusterState>identity().andThen(ClusterStateUpdaters::updateRoutingTable).andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock).apply(currentState);//开始分配分片return allocationService.reroute(newState, "state recovered");
}
集群元数据和索引级元数据恢复完成开始分配分片
- 元数据的持久化
具有master资格的节点和数据节点可以持久化集群状态,当接收到集群状态变更时会将其持久化到磁盘GatewayClusterApplier实现了ClusterStateApplier,当集群状态变更时会调用applyClusterState方法
@Override
public void applyClusterState(ClusterChangedEvent event) {if (event.state().blocks().disableStatePersistence()) {incrementalClusterStateWriter.setIncrementalWrite(false);return;}try {// Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term// that's higher than the last accepted term.// TODO: can we get rid of this hack?if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {incrementalClusterStateWriter.setCurrentTerm(event.state().term());}//更新磁盘上的元数据incrementalClusterStateWriter.updateClusterState(event.state());incrementalClusterStateWriter.setIncrementalWrite(true);} catch (WriteStateException e) {logger.warn("Exception occurred when storing new meta data", e);}
}
将集群级元数据和索引级元数据落盘
void updateClusterState(ClusterState newState) throws WriteStateException {//元数据Metadata newMetadata = newState.metadata();final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);//全局元数据long globalStateGeneration = writeGlobalState(writer, newMetadata);//索引级元数据Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState);Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);writeManifest(writer, manifest);previousManifest = manifest;previousClusterState = newState;final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +"wrote metadata for [{}] indices and skipped [{}] unchanged indices",durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());} else {logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());}
}
- 加载磁盘元数据
在node实例的start方法中会调用gatewayMetaState.start方法
//集群元数据
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class),injector.getInstance(PersistedClusterStateService.class));
然后会调用loadFullState方法
//加载元数据
manifestClusterStateTuple = metaStateService.loadFullState();public Tuple<Manifest, Metadata> loadFullState() throws IOException {//加载最新的状态文件final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());if (manifest == null) {return loadFullStateBWC();}//构建元数据final Metadata.Builder metadataBuilder;if (manifest.isGlobalGenerationMissing()) {metadataBuilder = Metadata.builder();} else {final Metadata globalMetadata = METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(),nodeEnv.nodeDataPaths());if (globalMetadata != null) {metadataBuilder = Metadata.builder(globalMetadata);} else {throw new IOException("failed to find global metadata [generation: " + manifest.getGlobalGeneration() + "]");}}//索引级元数据for (Map.Entry<Index, Long> entry : manifest.getIndexGenerations().entrySet()) {final Index index = entry.getKey();final long generation = entry.getValue();final String indexFolderName = index.getUUID();final IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, generation,nodeEnv.resolveIndexFolder(indexFolderName));if (indexMetadata != null) {metadataBuilder.put(indexMetadata, false);} else {throw new IOException("failed to find metadata for existing index " + index.getName() + " [location: " + indexFolderName +", generation: " + generation + "]");}}return new Tuple<>(manifest, metadataBuilder.build());
}
从磁盘读取构建索引级元数据和集群级元数据,用于构建集群状态对象ClusterState
相关文章:
elasticsearch源码分析-03选举集群状态
选举集群状态 es中存储的数据有一下几种,state元数据、lucene索引文件、translog事务日志 元数据信息可以分为: 集群层面的元信息-对应着metaData数据结构,主要是clusterUUid、settings、templates等索引层面的元信息-对应着indexMetaData数…...
MySQL 重要参数优化
max_connections = 3000 innodb_buffer_pool_size = 8G max_allowed_packet = 32M innodb_file_io_threads = 8 innodb_thread_concurrency = 16 innodb_flush_log_at_trx_commit = 2 innodb_log_buffer_size = 16M 参数说明 max_connections = 3000 运行MySQL的最大连…...
软件测试之接口测试(Postman/Jmeter)
🍅 视频学习:文末有免费的配套视频可观看 🍅 点击文末小卡片,免费获取软件测试全套资料,资料在手,涨薪更快 一、什么是接口测试 通常做的接口测试指的是系统对外的接口,比如你需要从别的系统来…...
14 卡尔曼滤波及代码实现
文章目录 14 卡尔曼滤波及代码实现14.0 基本概念14.1 公式推导14.2 代码实现 14 卡尔曼滤波及代码实现 14.0 基本概念 卡尔曼滤波是一种利用线性系统状态方程,通过系统输入输出观测数据,对系统状态进行最优估计的算法。由于观测数据包括系统中的噪声和…...
计算机视觉 图像融合技术概览
在许多计算机视觉应用中(例如机器人运动和医学成像),需要将来自多幅图像的相关信息集成到一幅图像中。这种图像融合将提供更高的可靠性、准确性和数据质量。 多视图融合可以提高图像的分辨率,同时恢复场景的 3D 表示。多模态融合结合了来自不同传感器的图像,称为多传感器融…...
计算机网络课程实训:局域网方案设计与实现(基于ensp)
文章目录 前言基本要求操作分公司1分公司2总部核心交换机配置实现内部服务器的搭建acl_deny部分用户与服务器出口出口防火墙配置 前言 本篇文章是小编实训部分内容,内容可能会有错误,另外ensp对电脑兼容性及其挑剔,在使用之前一定要安装好。…...
【安全开发】内网扫描器
文章目录 前言现实现的功能较少后序开发会逐步加入简单漏洞探探测和代理功能。 一、开发过程1.项目结构2.main.go3.core模块3.1 scanner.go3.2 service.go 4.bruteforc4.1 bruteforce.go 二、使用步骤 前言 为什么要写这个? fscna被杀的概率太高(哪天二…...
ESP32-C3模组上跑通MQTT(5)
接前一篇文章:ESP32-C3模组上跑通MQTT(4) 本文内容参考: 《ESP32-C3 物联网工程开发实战》 一分钟了解MQTT协议 ESP32 MQTT API指南-CSDN博客 ESP-IDF MQTT 示例入门_mqtt outbox-CSDN博客 ESP32用自签CA进行MQTT的TLS双向认证通信_esp32 mqtt ssl-CSDN博客 特此致谢!…...
Arduino - LED 矩阵
Arduino - LED 矩阵 Arduino - LED Matrix LED matrix display, also known as LED display, or dot matrix display, are wide-used. In this tutorial, we are going to learn: LED矩阵显示器,也称为LED显示器,或点阵显示器,应用广泛。在…...
设计模式 - Observer Pattern 观察者模式
文章目录 定义观察者模式的实现构成构成UML图 观察者模式的代码实现场景代码实现 总结优点缺点应用场景 其他设计模式文章: 定义 观察者模式是行为型模式的一种,它定义对象间的一种一对多的依赖关系,使得每当一个对象改变状态,它…...
【面试系列】C++ 高频面试题
欢迎来到我的博客,很高兴能够在这里和您见面!欢迎订阅相关专栏: ⭐️ 全网最全IT互联网公司面试宝典:收集整理全网各大IT互联网公司技术、项目、HR面试真题. ⭐️ AIGC时代的创新与未来:详细讲解AIGC的概念、核心技术、…...
程序猿大战Python——实现简单的图书馆系统操作
步骤1:安装和导入库 首先,确保已经安装了 pymysql 库。如果没有安装,请执行以下命令: pip install pymysql 然后,导入必要的库: import pymysql 步骤2:创建数据库和表的函数 编写一个函数来…...
液体粒子计数器的原理及常见型号选择 lighthouse代理商北京中邦兴业
液体颗粒计数用于测量液体样品中颗粒的大小和分布。通过用激光二极管照射液体样品并检测散射光来测量颗粒分布和尺寸。散射光的性质与粒子大小的大小有关。液体颗粒计数器可用于批量取样或在线(连续监测)应用,如水处理厂,或用于…...
Java知识点整理 16 — Spring Bean
在之前的文章 Java知识点整理 8 — Spring 简介 中介绍了 Spring 的两大核心概念 IoC 和 AOP,但对 Spring Bean 的介绍不全面,本文将补充 Spring 中 Bean 的概念。 一. 什么是 Spring Bean 在 Spring 官方文档中,对 bean 的定义为…...
Nvidia Jetson/RK3588+AI双目立体相机,适合各种割草机器人、扫地机器人、AGV等应用
双目立体视觉是基于视差原理,依据成像设备从不同位置获取的被测物体的图像,匹配对应点的位置偏移,得到视差数据,进而计算物体的空间三维信息。为您带来高图像质量的双目立体相机,具有高分辨率、低功耗、远距离等优点&a…...
springboot使用feign调用不依赖cloud
在使用spring boot调用第三方api中,常用的是okhttp、apache http client等,但是直接使用下来还是有点繁琐,需要手动转换实体。 在springcloud中有个openfeign调用,第一次体验到调用接口还能这么丝滑。注解写道接口上,…...
springboot中使用springboot cache
前言:SpringBoot中使用Cache缓存可以提高对缓存的开发效率 此图片是SpringBootCache常用注解 Springboot Cache中常用注解 第一步:引入依赖 <!--缓存--><dependency><groupId>org.springframework.boot</groupId><artifactId…...
Promise,async/await的运用
一,了解Promise Promise是异步编程的一种解决方案,它是一个对象,可以获取异步操作的消息,它的出现避免了地狱回调。 (1)Promise的实例有三个状态: Pending(进行中) Re…...
图论·多源最短路径Floyddijsktra
例题地址 多源最短路径 多个源点多个终点可以使用Floyd算法直接求各源点到终点的最短距离,也可以直接多次使用dijsktra算法求单源点到终点的最短距离 Floyd算法 使用条件 多源最短路径权值正负皆可 核心思想:动态规划 子问题: 设(A,B)…...
微服务 | Springboot整合GateWay+Nacos实现动态路由
1、简介 路由转发 执行过滤器链。 网关,旨在为微服务架构提供一种简单有效的统一的API路由管理方式。同时,基于Filter链的方式提供了网关的基本功能,比如:鉴权、流量控制、熔断、路径重写、黑白名单、日志监控等。 基本功能…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...
jmeter聚合报告中参数详解
sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample(样本数) 表示测试中发送的请求数量,即测试执行了多少次请求。 单位,以个或者次数表示。 示例:…...
淘宝扭蛋机小程序系统开发:打造互动性强的购物平台
淘宝扭蛋机小程序系统的开发,旨在打造一个互动性强的购物平台,让用户在购物的同时,能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机,实现旋转、抽拉等动作,增…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
若依登录用户名和密码加密
/*** 获取公钥:前端用来密码加密* return*/GetMapping("/getPublicKey")public RSAUtil.RSAKeyPair getPublicKey() {return RSAUtil.rsaKeyPair();}新建RSAUti.Java package com.ruoyi.common.utils;import org.apache.commons.codec.binary.Base64; im…...
pgsql:还原数据库后出现重复序列导致“more than one owned sequence found“报错问题的解决
问题: pgsql数据库通过备份数据库文件进行还原时,如果表中有自增序列,还原后可能会出现重复的序列,此时若向表中插入新行时会出现“more than one owned sequence found”的报错提示。 点击菜单“其它”-》“序列”,…...
