当前位置: 首页 > news >正文

Elasticsearch 8.9 Master节点处理请求源码

大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)

在这里插入图片描述

这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上

  • 一、Master节点处理请求的逻辑
    • 1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
    • 2、Master节点处理来自客户端的请求(以创建索引请求举例)
      • (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
      • (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
  • 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
    • 1、首先通过nodeClient执行doExecute()
    • 2、创建一个task任务异步执行TransportAction
    • 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
    • 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

一、Master节点处理请求的逻辑

不是所有的请求都需要Master节点处理,但是有些请求必须让Master节点处理,比如创建index,下面的3就是用创建索引做的示例

1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest

主节点在 Elasticsearch 集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的 MasterNodeRequest 实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的 MasterNodeRequest 实现类的类型,执行相应的操作

/*** A based request for master based operation.* 在master上*/
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;protected MasterNodeRequest() {}protected MasterNodeRequest(StreamInput in) throws IOException {super(in);masterNodeTimeout = in.readTimeValue();}@Overridepublic void writeTo(StreamOutput out) throws IOException {super.writeTo(out);out.writeTimeValue(masterNodeTimeout);}/*** A timeout value in case the master has not been discovered yet or disconnected.*/@SuppressWarnings("unchecked")public final Request masterNodeTimeout(TimeValue timeout) {this.masterNodeTimeout = timeout;return (Request) this;}/*** A timeout value in case the master has not been discovered yet or disconnected.*/public final Request masterNodeTimeout(String timeout) {return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));}public final TimeValue masterNodeTimeout() {return this.masterNodeTimeout;}
}

这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接

2、Master节点处理来自客户端的请求(以创建索引请求举例)

(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)

至于请求如何找到RestCreateIndexAction的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码

@ServerlessScope(Scope.PUBLIC)
public class RestCreateIndexAction extends BaseRestHandler {//省略代码  @Overridepublic List<Route> routes() {return List.of(new Route(PUT, "/{index}"));}@Overridepublic String getName() {return "create_index_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;if (request.getRestApiVersion() == RestApiVersion.V_7) {createIndexRequest = prepareRequestV7(request);} else {createIndexRequest = prepareRequest(request);}return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));}//省略代码  
}    

(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法

TransportMasterNodeAction 主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
当节点(数据节点)发送请求到主节点时,请求会被传递给相应的 TransportMasterNodeAction 实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。


/*** 需要在主节点上执行的操作的基类。* A base class for operations that needs to be performed on the master node.**/
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {//省略代码     }
/*** 创建索引操作*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {//省略代码createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));}
}    

二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)

这个场景为主节点和数据节点分离的情况

1、首先通过nodeClient执行doExecute()

client.admin().indices().createcreate方法调用IndicesAdmin类的create方法,再调用execute方法的入参是 CreateIndexAction.INSTANCE

static class IndicesAdmin implements IndicesAdminClient {@Overridepublic void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {execute(CreateIndexAction.INSTANCE, request, listener);}@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(ActionType<Response> action,Request request) {return client.execute(action, request);}}

调用的是AbstractClientexecute方法

  /*** This is the single execution point of *all* clients.* 这是所有客户端的单个执行点。*/@Overridepublic final <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,Request request,ActionListener<Response> listener) {try {doExecute(action, request, listener);} catch (Exception e) {assert false : new AssertionError(e);listener.onFailure(e);}}

doExecute方法调用的是NodeClient类的方法

  @Overridepublic <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,Request request,ActionListener<Response> listener) {// Discard the task because the Client interface doesn't use it.try {executeLocally(action, request, listener);} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {listener.onFailure(e);}}/***在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。*/public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,Request request,ActionListener<Response> listener) {//注册并执行任务return taskManager.registerAndExecute("transport",transportAction(action),request,localConnection,new SafelyWrappedActionListener<>(listener));}   

之后调用TaskManager.java的方法

2、创建一个task任务异步执行TransportAction

public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(String type,TransportAction<Request, Response> action,Request request,Transport.Connection localConnection,ActionListener<Response> taskListener) { //检查请求是否有父任务,如果有,则注册子连接。final Releasable unregisterChildNode;if (request.getParentTask().isSet()) {unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);} else {unregisterChildNode = null;}//创建一个新的跟踪上下文try (var ignored = threadPool.getThreadContext().newTraceContext()) {final Task task;//注册一个任务,并捕获可能的取消任务异常。try {task = register(type, action.actionName, request);} catch (TaskCancelledException e) {Releasables.close(unregisterChildNode);throw e;}//执行操作,并在操作完成时调用相应的监听器。action.execute(task, request, new ActionListener<>() {@Overridepublic void onResponse(Response response) {try {release();} finally {taskListener.onResponse(response);}}//根据操作的成功或失败情况,取消子任务并释放资源。@Overridepublic void onFailure(Exception e) {try {if (request.getParentTask().isSet()) {cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());}release();} finally {taskListener.onFailure(e);}}@Overridepublic String toString() {return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";}private void release() {Releasables.close(unregisterChildNode, () -> unregister(task));}});//返回任务对象。return task;}}

下面是TransportAction.java类中的方法

    /*** Use this method when the transport action should continue to run in the context of the current task* 当传输操作应继续在当前任务的上下文中运行时,请使用此方法*/public final void execute(Task task, Request request, ActionListener<Response> listener) {final ActionRequestValidationException validationException;//对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。try {validationException = request.validate();} catch (Exception e) {assert false : new AssertionError("validating of request [" + request + "] threw exception", e);logger.warn("validating of request [" + request + "] threw exception", e);listener.onFailure(e);return;}if (validationException != null) {listener.onFailure(validationException);return;}//检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。if (task != null && request.getShouldStoreResult()) {listener = new TaskResultStoringActionListener<>(taskManager, task, listener);}//创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);requestFilterChain.proceed(task, actionName, request, listener);}
 @Overridepublic void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {int i = index.getAndIncrement();try {if (i < this.action.filters.length) {this.action.filters[i].apply(task, actionName, request, listener, this);} else if (i == this.action.filters.length) {//`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。this.action.doExecute(task, request, listener);} else {listener.onFailure(new IllegalStateException("proceed was called too many times"));}} catch (Exception e) {logger.trace("Error during transport action execution.", e);listener.onFailure(e);}}

this.action.doExecute(task, request, listener);action对应的是TransportMasterNodeAction

3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法

TransportMasterNodeAction继承HandledTransportAction
HandledTransportAction继承自TransportAction

public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {@Overrideprotected void doExecute(Task task, final Request request, ActionListener<Response> listener) {//省略代码new AsyncSingleAction(task, request, listener).doStart(state);}
}    
 protected void doStart(ClusterState clusterState) {threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));}
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception {//调用子类实现masterOperation(task, request, state, listener);}
//子类实现   
protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception;

4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

其中创建索引的actionTransportCreateIndexAction

 @Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}

之后调用createIndexService.createIndex创建索引

相关文章:

Elasticsearch 8.9 Master节点处理请求源码

大家看可以看ElasticSearch源码&#xff1a;Rest请求与Master节点处理流程&#xff08;1&#xff09; 这个图非常好&#xff0c;下午的讲解代码在各个类和方法之间流转&#xff0c;都体现这个图上 一、Master节点处理请求的逻辑1、节点(数据节点)要和主节点进行通讯&#xff0…...

Python---练习:while循环案例:猜数字

需求&#xff1a; 计算机从1 ~ 10之间随机生成一个数字&#xff0c;然后提示输入数字&#xff0c;如果我们输入的数字与随机数相等&#xff0c;则提示恭喜你&#xff0c;答对了。如果输入的数字比随机数大&#xff0c;则提示&#xff0c;猜大了。反之&#xff0c;则提示猜小了…...

CRM自动化意味着什么?企业如何从中受益?

客户关系管理&#xff08;CRM&#xff09;软件不再仅仅适用于大公司或销售周期长的行业&#xff0c;它越来越成为各种规模企业的重要工具。 在日常工作中&#xff0c;当你陷入流程的所有细节时&#xff0c;可能会产生不必要的工作。因此&#xff0c;如果你想要CRM提供的组织和…...

Python大数据之PySpark

PySpark入门 1、 Spark与PySpark 1、 Spark与PySpark...

网工记背命令(7)----静态路由(负载分担,主备备份)

1.静态路由负载分担 如图所示&#xff0c;属于不同网段的主机通过几台 Switch 相连&#xff0c;要求不配置动态路由协议&#xff0c;使不同网 段的任意两台主机之间能够互通&#xff0c;从拓扑图中可以看出&#xff0c;从 PCA 到 PCC 有两条路径可以过去&#xff0c;分别是 PC…...

error: unable to read askpass response from

报错信息 解决方法&#xff1a; 中文&#xff1a;文件-->设置-->版本控制-->Git-->勾选使用凭证帮助程序 英文&#xff1a;File -> Settings -> Version Control -> Git / Check "User credential Helper" 因为我的webstrom是中文版的&#…...

运行stable-diffusion-xl-refiner-1.0遇到version `GLIBCXX_3.4.29‘ not found的问题

一、问题背景 https://huggingface.co/stabilityai/stable-diffusion-xl-refiner-1.0 在运行示例程序时候遇到GLIBCXX_3.4.29‘ not found diffusers to > 0.18.0 import torch from diffusers import StableDiffusionXLImg2ImgPipeline from diffusers.utils import loa…...

Ubuntu - 安装 Elasticsearch(ES)

注意&#xff1a;以下步骤基于 Elasticsearch 7.x 版本。版本可能会随时间而变化&#xff0c;请查看 Elasticsearch 官方网站以获取最新的版本信息。 添加 Elasticsearch APT 仓库&#xff1a; 打开终端&#xff0c;并使用以下命令添加 Elasticsearch APT 仓库到系统&#xf…...

字节码进阶之java Instrumentation原理详解

文章目录 0. 前言1. 基础2. Java Instrumentation API使用示例 3. Java Agent4. 字节码操作库5. 实际应用6. 注意事项和最佳实践 0. 前言 Java Instrumentation是Java API的一部分&#xff0c;它允许开发人员在运行时修改类的字节码。使用此功能&#xff0c;可以实现许多高级操…...

Android 13.0 锁屏页面禁止下拉状态栏

1.概述 在13.0的系统产品定制化中,在默认的锁屏界面的时候原生系统是可以下拉状态栏的,但是定制的产品是需要禁用下拉状态栏的,所以需要在锁屏页面的时候禁用下拉状态栏,需要从两部分查看下拉状态栏流程然后禁用状态栏 接下来就来分析下看这个功能怎么实现 2.锁屏页面禁止…...

Windows10 Docker 安装教程

Docker Desktop是什么&#xff1f; Docker Desktop是适用于Windows的Docker桌面&#xff0c;是Docker设计用于在Windows 10上运行。它是一个本地 Windows 应用程序&#xff0c;为构建、交付和运行dockerized应用程序提供易于使用的开发环境。Docker Desktop for Windows 使用 …...

JWT认证

目录 前言 JWT组成部分 JWT工作原理 在Express中使用JWT 安装JWT相关的包 导入JWT相关的包 定义密钥 登录成功后调用jwt.sign()生成JWT字符串 将JWT字符串还原为JSON对象 捕获解析JWT失败后产生的错误 结尾 前言 Session 认证机制需要配合 Cookie 才能实现。由于 Co…...

【网络安全 --- xss-labs靶场通关(1-10关)】详细的xss-labs靶场通关思路及技巧讲解,让你对xss漏洞的理解更深刻

靶场安装&#xff1a; 靶场安装请参考以下博客&#xff0c;既详细有提供工具&#xff1a; 【网络安全 --- xss-labs靶场】xss-labs靶场安装详细教程&#xff0c;让你巩固对xss漏洞的理解及绕过技巧和方法&#xff08;提供资源&#xff09;-CSDN博客【网络安全 --- xss-labs通…...

Mathematics-Vocabulary·数学专业英语词汇

点击查看: Mathematics-Vocabulary数学专业英语词汇点击查看: Mathematics-Vocabulary-Offline数学专业英语词汇离线版本 Chinese-English translation英译汉The study of mathematics in English requires understanding the subject-specific vocabulary and terminology. Ma…...

画程序流程图

一。在线程序流程图。类图和时序图 Integrations | Mermaid 二。VSCODE画UML图和各种种 1.下载plantuml.jarReleases plantuml/plantuml GitHubGenerate diagrams from textual description. Contribute to plantuml/plantuml development by creating an account on GitHu…...

C++ 模板进阶

非类型模板参数 模板参数分为&#xff1a;类型模板参数与非类型模板参数 类型模板参数即&#xff1a;出现在模板参数列表中&#xff0c;跟在class或者typename之后的参数类型名称非类型模板参数即&#xff1a;用一个常量作为类(函数)模板的一个参数&#xff0c;在类(函数)模…...

jenkins 安装与使用、用户权限划分

jenkins 安装与使用 安装插件&#xff1a; 开启该插件功能 验证用户管理 创建web01~02 使用web01登录 用户权限划分 安装 Role-Based Strategy 插件后&#xff0c;系统管理 中多了如图下所示的一个功能&#xff0c;用户权限的划分就是靠他来做的 创建角色 重新访问 创建项目…...

Hadoop3教程(三十三):(生产调优篇)慢磁盘监控与小文件归档

文章目录 &#xff08;161&#xff09;慢磁盘监控&#xff08;162&#xff09;小文件归档小文件过多的问题如何对小文件进行归档 参考文献 &#xff08;161&#xff09;慢磁盘监控 慢磁盘&#xff0c;是指写入数据时特别慢的一类磁盘。这种磁盘并不少见&#xff0c;当机器运行…...

物联网知识复习

物联网的内涵和体系结构 物联网的基本内涵 物联网的基本内涵在于物联&#xff0c;物物相连或者物和人相连的互联网。 也就是说&#xff0c;它是要由物主动发起的&#xff0c;物物互联的互联网。 它的第一层意思是说物和物相连&#xff1b;第二层意思是说物和人相连。 物联网的…...

Golang爬虫入门指南

引言 网络爬虫是一种自动化程序&#xff0c;用于从互联网上收集信息。随着互联网的迅速发展&#xff0c;爬虫技术在各行各业中越来越受欢迎。Golang作为一种高效、并发性好的编程语言&#xff0c;也逐渐成为爬虫开发的首选语言。本文将介绍使用Golang编写爬虫的基础知识和技巧…...

5分钟解决AutoCAD字体缺失问题:FontCenter智能字体管理插件完整指南

5分钟解决AutoCAD字体缺失问题&#xff1a;FontCenter智能字体管理插件完整指南 【免费下载链接】FontCenter AutoCAD自动管理字体插件 项目地址: https://gitcode.com/gh_mirrors/fo/FontCenter 还在为AutoCAD图纸中出现的问号和乱码文字而烦恼吗&#xff1f;FontCente…...

阿里RexUniNLU镜像部署详解:支持10+种任务的NLU全能手

阿里RexUniNLU镜像部署详解&#xff1a;支持10种任务的NLU全能手 1. 为什么选择RexUniNLU&#xff1f; 在自然语言处理领域&#xff0c;传统模型通常需要针对特定任务进行大量数据标注和微调&#xff0c;这不仅耗时耗力&#xff0c;还限制了模型的适用范围。阿里巴巴达摩院开…...

告别OFD兼容烦恼:3分钟掌握Ofd2Pdf轻松转换技巧

告别OFD兼容烦恼&#xff1a;3分钟掌握Ofd2Pdf轻松转换技巧 【免费下载链接】Ofd2Pdf Convert OFD files to PDF files. 项目地址: https://gitcode.com/gh_mirrors/ofd/Ofd2Pdf 在日常办公中&#xff0c;你是否经常遇到OFD文件打不开、无法打印或无法共享的困扰&#x…...

深度剖析:动态规划的分类及实例

如你所知&#xff0c;动态规划可以根据问题特性分为多种类型&#xff0c;以下是几种经典问题类型及对应的实例。背包问题背包问题是一种资源类问题&#xff0c;涉及在给定约束条件下如何最大化目标值。常见的是 0-1 背包、完全背包、多重背包。0-1 背包问题&#xff1a;每个物品…...

从零基础出发,全面掌握SEO优化技巧以提升网站流量

在学习SEO的过程中&#xff0c;了解内容的重要性是基础。内容不仅要有吸引力&#xff0c;而且要与目标受众的需求紧密结合。首先&#xff0c;确保内容的相关性&#xff0c;能够有效解答用户的问题是关键。其次&#xff0c;利用关键词策略&#xff0c;使目标用户能够更容易找到相…...

终极指南:如何在Kubernetes中部署NSwag实现容器化API文档服务

终极指南&#xff1a;如何在Kubernetes中部署NSwag实现容器化API文档服务 【免费下载链接】NSwag The Swagger/OpenAPI toolchain for .NET, ASP.NET Core and TypeScript. 项目地址: https://gitcode.com/gh_mirrors/ns/NSwag NSwag是.NET、ASP.NET Core和TypeScript的…...

别再硬编码了!用Avue的findObject方法动态更新表单选项(附完整代码示例)

动态表单进阶&#xff1a;Avue中findObject的实战应用与性能优化 在开发中后台管理系统时&#xff0c;表单的动态化需求几乎无处不在。想象这样一个场景&#xff1a;当用户选择不同租户时&#xff0c;角色、部门和岗位的选项需要实时变化。传统硬编码方式不仅难以维护&#xff…...

LIO-SAM在KITTI数据集上的性能调优与EVO评估深度解析:从数据预处理到结果分析

LIO-SAM在KITTI数据集上的性能调优与EVO评估深度解析&#xff1a;从数据预处理到结果分析 当谈到激光惯性里程计&#xff08;LIO&#xff09;系统在自动驾驶领域的应用时&#xff0c;KITTI数据集无疑是最具挑战性和权威性的测试平台之一。作为紧耦合激光惯性里程计算法的代表&a…...

暗黑3终极自动化指南:D3KeyHelper图形化宏工具5分钟快速上手教程

暗黑3终极自动化指南&#xff1a;D3KeyHelper图形化宏工具5分钟快速上手教程 【免费下载链接】D3keyHelper D3KeyHelper是一个有图形界面&#xff0c;可自定义配置的暗黑3鼠标宏工具。 项目地址: https://gitcode.com/gh_mirrors/d3/D3keyHelper D3KeyHelper是一款免费开…...

如何零侵入扩展《杀戮尖塔》:ModTheSpire模组加载器全解析

如何零侵入扩展《杀戮尖塔》&#xff1a;ModTheSpire模组加载器全解析 【免费下载链接】ModTheSpire External mod loader for Slay The Spire 项目地址: https://gitcode.com/gh_mirrors/mo/ModTheSpire 你是否曾经想在《杀戮尖塔》中添加新角色、新卡牌或改变游戏机制…...