Elasticsearch 8.9 Master节点处理请求源码
大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)

这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上
- 一、Master节点处理请求的逻辑
- 1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
- 2、Master节点处理来自客户端的请求(以创建索引请求举例)
- (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
- (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
- 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
- 1、首先通过nodeClient执行doExecute()
- 2、创建一个task任务异步执行TransportAction
- 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
- 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引
一、Master节点处理请求的逻辑
不是所有的请求都需要
Master节点处理,但是有些请求必须让Master节点处理,比如创建index,下面的3就是用创建索引做的示例
1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
主节点在
Elasticsearch集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的MasterNodeRequest实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的MasterNodeRequest实现类的类型,执行相应的操作
/*** A based request for master based operation.* 在master上*/
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;protected MasterNodeRequest() {}protected MasterNodeRequest(StreamInput in) throws IOException {super(in);masterNodeTimeout = in.readTimeValue();}@Overridepublic void writeTo(StreamOutput out) throws IOException {super.writeTo(out);out.writeTimeValue(masterNodeTimeout);}/*** A timeout value in case the master has not been discovered yet or disconnected.*/@SuppressWarnings("unchecked")public final Request masterNodeTimeout(TimeValue timeout) {this.masterNodeTimeout = timeout;return (Request) this;}/*** A timeout value in case the master has not been discovered yet or disconnected.*/public final Request masterNodeTimeout(String timeout) {return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));}public final TimeValue masterNodeTimeout() {return this.masterNodeTimeout;}
}
这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接
2、Master节点处理来自客户端的请求(以创建索引请求举例)
(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
至于请求如何找到
RestCreateIndexAction的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码
@ServerlessScope(Scope.PUBLIC)
public class RestCreateIndexAction extends BaseRestHandler {//省略代码 @Overridepublic List<Route> routes() {return List.of(new Route(PUT, "/{index}"));}@Overridepublic String getName() {return "create_index_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;if (request.getRestApiVersion() == RestApiVersion.V_7) {createIndexRequest = prepareRequestV7(request);} else {createIndexRequest = prepareRequest(request);}return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));}//省略代码
}
(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
TransportMasterNodeAction主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
当节点(数据节点)发送请求到主节点时,请求会被传递给相应的TransportMasterNodeAction实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。
/*** 需要在主节点上执行的操作的基类。* A base class for operations that needs to be performed on the master node.**/
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {//省略代码 }
/*** 创建索引操作*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {//省略代码createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));}
}
二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
这个场景为主节点和数据节点分离的情况
1、首先通过nodeClient执行doExecute()
client.admin().indices().create中create方法调用IndicesAdmin类的create方法,再调用execute方法的入参是 CreateIndexAction.INSTANCE
static class IndicesAdmin implements IndicesAdminClient {@Overridepublic void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {execute(CreateIndexAction.INSTANCE, request, listener);}@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(ActionType<Response> action,Request request) {return client.execute(action, request);}}
调用的是AbstractClient的execute方法
/*** This is the single execution point of *all* clients.* 这是所有客户端的单个执行点。*/@Overridepublic final <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,Request request,ActionListener<Response> listener) {try {doExecute(action, request, listener);} catch (Exception e) {assert false : new AssertionError(e);listener.onFailure(e);}}
doExecute方法调用的是NodeClient类的方法
@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,Request request,ActionListener<Response> listener) {// Discard the task because the Client interface doesn't use it.try {executeLocally(action, request, listener);} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {listener.onFailure(e);}}/***在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。*/public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,Request request,ActionListener<Response> listener) {//注册并执行任务return taskManager.registerAndExecute("transport",transportAction(action),request,localConnection,new SafelyWrappedActionListener<>(listener));}
之后调用TaskManager.java的方法
2、创建一个task任务异步执行TransportAction
public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(String type,TransportAction<Request, Response> action,Request request,Transport.Connection localConnection,ActionListener<Response> taskListener) { //检查请求是否有父任务,如果有,则注册子连接。final Releasable unregisterChildNode;if (request.getParentTask().isSet()) {unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);} else {unregisterChildNode = null;}//创建一个新的跟踪上下文try (var ignored = threadPool.getThreadContext().newTraceContext()) {final Task task;//注册一个任务,并捕获可能的取消任务异常。try {task = register(type, action.actionName, request);} catch (TaskCancelledException e) {Releasables.close(unregisterChildNode);throw e;}//执行操作,并在操作完成时调用相应的监听器。action.execute(task, request, new ActionListener<>() {@Overridepublic void onResponse(Response response) {try {release();} finally {taskListener.onResponse(response);}}//根据操作的成功或失败情况,取消子任务并释放资源。@Overridepublic void onFailure(Exception e) {try {if (request.getParentTask().isSet()) {cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());}release();} finally {taskListener.onFailure(e);}}@Overridepublic String toString() {return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";}private void release() {Releasables.close(unregisterChildNode, () -> unregister(task));}});//返回任务对象。return task;}}
下面是TransportAction.java类中的方法
/*** Use this method when the transport action should continue to run in the context of the current task* 当传输操作应继续在当前任务的上下文中运行时,请使用此方法*/public final void execute(Task task, Request request, ActionListener<Response> listener) {final ActionRequestValidationException validationException;//对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。try {validationException = request.validate();} catch (Exception e) {assert false : new AssertionError("validating of request [" + request + "] threw exception", e);logger.warn("validating of request [" + request + "] threw exception", e);listener.onFailure(e);return;}if (validationException != null) {listener.onFailure(validationException);return;}//检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。if (task != null && request.getShouldStoreResult()) {listener = new TaskResultStoringActionListener<>(taskManager, task, listener);}//创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);requestFilterChain.proceed(task, actionName, request, listener);}
@Overridepublic void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {int i = index.getAndIncrement();try {if (i < this.action.filters.length) {this.action.filters[i].apply(task, actionName, request, listener, this);} else if (i == this.action.filters.length) {//`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。this.action.doExecute(task, request, listener);} else {listener.onFailure(new IllegalStateException("proceed was called too many times"));}} catch (Exception e) {logger.trace("Error during transport action execution.", e);listener.onFailure(e);}}
this.action.doExecute(task, request, listener); 中action对应的是TransportMasterNodeAction
3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
TransportMasterNodeAction继承HandledTransportAction,
而HandledTransportAction继承自TransportAction
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {@Overrideprotected void doExecute(Task task, final Request request, ActionListener<Response> listener) {//省略代码new AsyncSingleAction(task, request, listener).doStart(state);}
}
protected void doStart(ClusterState clusterState) {threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));}
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception {//调用子类实现masterOperation(task, request, state, listener);}
//子类实现
protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception;
4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引
其中创建索引的action是TransportCreateIndexAction
@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}
之后调用createIndexService.createIndex创建索引
相关文章:
Elasticsearch 8.9 Master节点处理请求源码
大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1) 这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上 一、Master节点处理请求的逻辑1、节点(数据节点)要和主节点进行通讯࿰…...
Python---练习:while循环案例:猜数字
需求: 计算机从1 ~ 10之间随机生成一个数字,然后提示输入数字,如果我们输入的数字与随机数相等,则提示恭喜你,答对了。如果输入的数字比随机数大,则提示,猜大了。反之,则提示猜小了…...
CRM自动化意味着什么?企业如何从中受益?
客户关系管理(CRM)软件不再仅仅适用于大公司或销售周期长的行业,它越来越成为各种规模企业的重要工具。 在日常工作中,当你陷入流程的所有细节时,可能会产生不必要的工作。因此,如果你想要CRM提供的组织和…...
Python大数据之PySpark
PySpark入门 1、 Spark与PySpark 1、 Spark与PySpark...
网工记背命令(7)----静态路由(负载分担,主备备份)
1.静态路由负载分担 如图所示,属于不同网段的主机通过几台 Switch 相连,要求不配置动态路由协议,使不同网 段的任意两台主机之间能够互通,从拓扑图中可以看出,从 PCA 到 PCC 有两条路径可以过去,分别是 PC…...
error: unable to read askpass response from
报错信息 解决方法: 中文:文件-->设置-->版本控制-->Git-->勾选使用凭证帮助程序 英文:File -> Settings -> Version Control -> Git / Check "User credential Helper" 因为我的webstrom是中文版的&#…...
运行stable-diffusion-xl-refiner-1.0遇到version `GLIBCXX_3.4.29‘ not found的问题
一、问题背景 https://huggingface.co/stabilityai/stable-diffusion-xl-refiner-1.0 在运行示例程序时候遇到GLIBCXX_3.4.29‘ not found diffusers to > 0.18.0 import torch from diffusers import StableDiffusionXLImg2ImgPipeline from diffusers.utils import loa…...
Ubuntu - 安装 Elasticsearch(ES)
注意:以下步骤基于 Elasticsearch 7.x 版本。版本可能会随时间而变化,请查看 Elasticsearch 官方网站以获取最新的版本信息。 添加 Elasticsearch APT 仓库: 打开终端,并使用以下命令添加 Elasticsearch APT 仓库到系统…...
字节码进阶之java Instrumentation原理详解
文章目录 0. 前言1. 基础2. Java Instrumentation API使用示例 3. Java Agent4. 字节码操作库5. 实际应用6. 注意事项和最佳实践 0. 前言 Java Instrumentation是Java API的一部分,它允许开发人员在运行时修改类的字节码。使用此功能,可以实现许多高级操…...
Android 13.0 锁屏页面禁止下拉状态栏
1.概述 在13.0的系统产品定制化中,在默认的锁屏界面的时候原生系统是可以下拉状态栏的,但是定制的产品是需要禁用下拉状态栏的,所以需要在锁屏页面的时候禁用下拉状态栏,需要从两部分查看下拉状态栏流程然后禁用状态栏 接下来就来分析下看这个功能怎么实现 2.锁屏页面禁止…...
Windows10 Docker 安装教程
Docker Desktop是什么? Docker Desktop是适用于Windows的Docker桌面,是Docker设计用于在Windows 10上运行。它是一个本地 Windows 应用程序,为构建、交付和运行dockerized应用程序提供易于使用的开发环境。Docker Desktop for Windows 使用 …...
JWT认证
目录 前言 JWT组成部分 JWT工作原理 在Express中使用JWT 安装JWT相关的包 导入JWT相关的包 定义密钥 登录成功后调用jwt.sign()生成JWT字符串 将JWT字符串还原为JSON对象 捕获解析JWT失败后产生的错误 结尾 前言 Session 认证机制需要配合 Cookie 才能实现。由于 Co…...
【网络安全 --- xss-labs靶场通关(1-10关)】详细的xss-labs靶场通关思路及技巧讲解,让你对xss漏洞的理解更深刻
靶场安装: 靶场安装请参考以下博客,既详细有提供工具: 【网络安全 --- xss-labs靶场】xss-labs靶场安装详细教程,让你巩固对xss漏洞的理解及绕过技巧和方法(提供资源)-CSDN博客【网络安全 --- xss-labs通…...
Mathematics-Vocabulary·数学专业英语词汇
点击查看: Mathematics-Vocabulary数学专业英语词汇点击查看: Mathematics-Vocabulary-Offline数学专业英语词汇离线版本 Chinese-English translation英译汉The study of mathematics in English requires understanding the subject-specific vocabulary and terminology. Ma…...
画程序流程图
一。在线程序流程图。类图和时序图 Integrations | Mermaid 二。VSCODE画UML图和各种种 1.下载plantuml.jarReleases plantuml/plantuml GitHubGenerate diagrams from textual description. Contribute to plantuml/plantuml development by creating an account on GitHu…...
C++ 模板进阶
非类型模板参数 模板参数分为:类型模板参数与非类型模板参数 类型模板参数即:出现在模板参数列表中,跟在class或者typename之后的参数类型名称非类型模板参数即:用一个常量作为类(函数)模板的一个参数,在类(函数)模…...
jenkins 安装与使用、用户权限划分
jenkins 安装与使用 安装插件: 开启该插件功能 验证用户管理 创建web01~02 使用web01登录 用户权限划分 安装 Role-Based Strategy 插件后,系统管理 中多了如图下所示的一个功能,用户权限的划分就是靠他来做的 创建角色 重新访问 创建项目…...
Hadoop3教程(三十三):(生产调优篇)慢磁盘监控与小文件归档
文章目录 (161)慢磁盘监控(162)小文件归档小文件过多的问题如何对小文件进行归档 参考文献 (161)慢磁盘监控 慢磁盘,是指写入数据时特别慢的一类磁盘。这种磁盘并不少见,当机器运行…...
物联网知识复习
物联网的内涵和体系结构 物联网的基本内涵 物联网的基本内涵在于物联,物物相连或者物和人相连的互联网。 也就是说,它是要由物主动发起的,物物互联的互联网。 它的第一层意思是说物和物相连;第二层意思是说物和人相连。 物联网的…...
Golang爬虫入门指南
引言 网络爬虫是一种自动化程序,用于从互联网上收集信息。随着互联网的迅速发展,爬虫技术在各行各业中越来越受欢迎。Golang作为一种高效、并发性好的编程语言,也逐渐成为爬虫开发的首选语言。本文将介绍使用Golang编写爬虫的基础知识和技巧…...
5分钟解决AutoCAD字体缺失问题:FontCenter智能字体管理插件完整指南
5分钟解决AutoCAD字体缺失问题:FontCenter智能字体管理插件完整指南 【免费下载链接】FontCenter AutoCAD自动管理字体插件 项目地址: https://gitcode.com/gh_mirrors/fo/FontCenter 还在为AutoCAD图纸中出现的问号和乱码文字而烦恼吗?FontCente…...
阿里RexUniNLU镜像部署详解:支持10+种任务的NLU全能手
阿里RexUniNLU镜像部署详解:支持10种任务的NLU全能手 1. 为什么选择RexUniNLU? 在自然语言处理领域,传统模型通常需要针对特定任务进行大量数据标注和微调,这不仅耗时耗力,还限制了模型的适用范围。阿里巴巴达摩院开…...
告别OFD兼容烦恼:3分钟掌握Ofd2Pdf轻松转换技巧
告别OFD兼容烦恼:3分钟掌握Ofd2Pdf轻松转换技巧 【免费下载链接】Ofd2Pdf Convert OFD files to PDF files. 项目地址: https://gitcode.com/gh_mirrors/ofd/Ofd2Pdf 在日常办公中,你是否经常遇到OFD文件打不开、无法打印或无法共享的困扰&#x…...
深度剖析:动态规划的分类及实例
如你所知,动态规划可以根据问题特性分为多种类型,以下是几种经典问题类型及对应的实例。背包问题背包问题是一种资源类问题,涉及在给定约束条件下如何最大化目标值。常见的是 0-1 背包、完全背包、多重背包。0-1 背包问题:每个物品…...
从零基础出发,全面掌握SEO优化技巧以提升网站流量
在学习SEO的过程中,了解内容的重要性是基础。内容不仅要有吸引力,而且要与目标受众的需求紧密结合。首先,确保内容的相关性,能够有效解答用户的问题是关键。其次,利用关键词策略,使目标用户能够更容易找到相…...
终极指南:如何在Kubernetes中部署NSwag实现容器化API文档服务
终极指南:如何在Kubernetes中部署NSwag实现容器化API文档服务 【免费下载链接】NSwag The Swagger/OpenAPI toolchain for .NET, ASP.NET Core and TypeScript. 项目地址: https://gitcode.com/gh_mirrors/ns/NSwag NSwag是.NET、ASP.NET Core和TypeScript的…...
别再硬编码了!用Avue的findObject方法动态更新表单选项(附完整代码示例)
动态表单进阶:Avue中findObject的实战应用与性能优化 在开发中后台管理系统时,表单的动态化需求几乎无处不在。想象这样一个场景:当用户选择不同租户时,角色、部门和岗位的选项需要实时变化。传统硬编码方式不仅难以维护ÿ…...
LIO-SAM在KITTI数据集上的性能调优与EVO评估深度解析:从数据预处理到结果分析
LIO-SAM在KITTI数据集上的性能调优与EVO评估深度解析:从数据预处理到结果分析 当谈到激光惯性里程计(LIO)系统在自动驾驶领域的应用时,KITTI数据集无疑是最具挑战性和权威性的测试平台之一。作为紧耦合激光惯性里程计算法的代表&a…...
暗黑3终极自动化指南:D3KeyHelper图形化宏工具5分钟快速上手教程
暗黑3终极自动化指南:D3KeyHelper图形化宏工具5分钟快速上手教程 【免费下载链接】D3keyHelper D3KeyHelper是一个有图形界面,可自定义配置的暗黑3鼠标宏工具。 项目地址: https://gitcode.com/gh_mirrors/d3/D3keyHelper D3KeyHelper是一款免费开…...
如何零侵入扩展《杀戮尖塔》:ModTheSpire模组加载器全解析
如何零侵入扩展《杀戮尖塔》:ModTheSpire模组加载器全解析 【免费下载链接】ModTheSpire External mod loader for Slay The Spire 项目地址: https://gitcode.com/gh_mirrors/mo/ModTheSpire 你是否曾经想在《杀戮尖塔》中添加新角色、新卡牌或改变游戏机制…...
