当前位置: 首页 > news >正文

Elasticsearch 8.9 Master节点处理请求源码

大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)

在这里插入图片描述

这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上

  • 一、Master节点处理请求的逻辑
    • 1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
    • 2、Master节点处理来自客户端的请求(以创建索引请求举例)
      • (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
      • (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
  • 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
    • 1、首先通过nodeClient执行doExecute()
    • 2、创建一个task任务异步执行TransportAction
    • 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
    • 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

一、Master节点处理请求的逻辑

不是所有的请求都需要Master节点处理,但是有些请求必须让Master节点处理,比如创建index,下面的3就是用创建索引做的示例

1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest

主节点在 Elasticsearch 集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的 MasterNodeRequest 实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的 MasterNodeRequest 实现类的类型,执行相应的操作

/*** A based request for master based operation.* 在master上*/
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;protected MasterNodeRequest() {}protected MasterNodeRequest(StreamInput in) throws IOException {super(in);masterNodeTimeout = in.readTimeValue();}@Overridepublic void writeTo(StreamOutput out) throws IOException {super.writeTo(out);out.writeTimeValue(masterNodeTimeout);}/*** A timeout value in case the master has not been discovered yet or disconnected.*/@SuppressWarnings("unchecked")public final Request masterNodeTimeout(TimeValue timeout) {this.masterNodeTimeout = timeout;return (Request) this;}/*** A timeout value in case the master has not been discovered yet or disconnected.*/public final Request masterNodeTimeout(String timeout) {return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));}public final TimeValue masterNodeTimeout() {return this.masterNodeTimeout;}
}

这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接

2、Master节点处理来自客户端的请求(以创建索引请求举例)

(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)

至于请求如何找到RestCreateIndexAction的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码

@ServerlessScope(Scope.PUBLIC)
public class RestCreateIndexAction extends BaseRestHandler {//省略代码  @Overridepublic List<Route> routes() {return List.of(new Route(PUT, "/{index}"));}@Overridepublic String getName() {return "create_index_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;if (request.getRestApiVersion() == RestApiVersion.V_7) {createIndexRequest = prepareRequestV7(request);} else {createIndexRequest = prepareRequest(request);}return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));}//省略代码  
}    

(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法

TransportMasterNodeAction 主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
当节点(数据节点)发送请求到主节点时,请求会被传递给相应的 TransportMasterNodeAction 实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。


/*** 需要在主节点上执行的操作的基类。* A base class for operations that needs to be performed on the master node.**/
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {//省略代码     }
/*** 创建索引操作*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {//省略代码createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));}
}    

二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)

这个场景为主节点和数据节点分离的情况

1、首先通过nodeClient执行doExecute()

client.admin().indices().createcreate方法调用IndicesAdmin类的create方法,再调用execute方法的入参是 CreateIndexAction.INSTANCE

static class IndicesAdmin implements IndicesAdminClient {@Overridepublic void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {execute(CreateIndexAction.INSTANCE, request, listener);}@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(ActionType<Response> action,Request request) {return client.execute(action, request);}}

调用的是AbstractClientexecute方法

  /*** This is the single execution point of *all* clients.* 这是所有客户端的单个执行点。*/@Overridepublic final <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,Request request,ActionListener<Response> listener) {try {doExecute(action, request, listener);} catch (Exception e) {assert false : new AssertionError(e);listener.onFailure(e);}}

doExecute方法调用的是NodeClient类的方法

  @Overridepublic <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,Request request,ActionListener<Response> listener) {// Discard the task because the Client interface doesn't use it.try {executeLocally(action, request, listener);} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {listener.onFailure(e);}}/***在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。*/public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,Request request,ActionListener<Response> listener) {//注册并执行任务return taskManager.registerAndExecute("transport",transportAction(action),request,localConnection,new SafelyWrappedActionListener<>(listener));}   

之后调用TaskManager.java的方法

2、创建一个task任务异步执行TransportAction

public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(String type,TransportAction<Request, Response> action,Request request,Transport.Connection localConnection,ActionListener<Response> taskListener) { //检查请求是否有父任务,如果有,则注册子连接。final Releasable unregisterChildNode;if (request.getParentTask().isSet()) {unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);} else {unregisterChildNode = null;}//创建一个新的跟踪上下文try (var ignored = threadPool.getThreadContext().newTraceContext()) {final Task task;//注册一个任务,并捕获可能的取消任务异常。try {task = register(type, action.actionName, request);} catch (TaskCancelledException e) {Releasables.close(unregisterChildNode);throw e;}//执行操作,并在操作完成时调用相应的监听器。action.execute(task, request, new ActionListener<>() {@Overridepublic void onResponse(Response response) {try {release();} finally {taskListener.onResponse(response);}}//根据操作的成功或失败情况,取消子任务并释放资源。@Overridepublic void onFailure(Exception e) {try {if (request.getParentTask().isSet()) {cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());}release();} finally {taskListener.onFailure(e);}}@Overridepublic String toString() {return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";}private void release() {Releasables.close(unregisterChildNode, () -> unregister(task));}});//返回任务对象。return task;}}

下面是TransportAction.java类中的方法

    /*** Use this method when the transport action should continue to run in the context of the current task* 当传输操作应继续在当前任务的上下文中运行时,请使用此方法*/public final void execute(Task task, Request request, ActionListener<Response> listener) {final ActionRequestValidationException validationException;//对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。try {validationException = request.validate();} catch (Exception e) {assert false : new AssertionError("validating of request [" + request + "] threw exception", e);logger.warn("validating of request [" + request + "] threw exception", e);listener.onFailure(e);return;}if (validationException != null) {listener.onFailure(validationException);return;}//检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。if (task != null && request.getShouldStoreResult()) {listener = new TaskResultStoringActionListener<>(taskManager, task, listener);}//创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);requestFilterChain.proceed(task, actionName, request, listener);}
 @Overridepublic void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {int i = index.getAndIncrement();try {if (i < this.action.filters.length) {this.action.filters[i].apply(task, actionName, request, listener, this);} else if (i == this.action.filters.length) {//`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。this.action.doExecute(task, request, listener);} else {listener.onFailure(new IllegalStateException("proceed was called too many times"));}} catch (Exception e) {logger.trace("Error during transport action execution.", e);listener.onFailure(e);}}

this.action.doExecute(task, request, listener);action对应的是TransportMasterNodeAction

3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法

TransportMasterNodeAction继承HandledTransportAction
HandledTransportAction继承自TransportAction

public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {@Overrideprotected void doExecute(Task task, final Request request, ActionListener<Response> listener) {//省略代码new AsyncSingleAction(task, request, listener).doStart(state);}
}    
 protected void doStart(ClusterState clusterState) {threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));}
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception {//调用子类实现masterOperation(task, request, state, listener);}
//子类实现   
protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception;

4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

其中创建索引的actionTransportCreateIndexAction

 @Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}

之后调用createIndexService.createIndex创建索引

相关文章:

Elasticsearch 8.9 Master节点处理请求源码

大家看可以看ElasticSearch源码&#xff1a;Rest请求与Master节点处理流程&#xff08;1&#xff09; 这个图非常好&#xff0c;下午的讲解代码在各个类和方法之间流转&#xff0c;都体现这个图上 一、Master节点处理请求的逻辑1、节点(数据节点)要和主节点进行通讯&#xff0…...

Python---练习:while循环案例:猜数字

需求&#xff1a; 计算机从1 ~ 10之间随机生成一个数字&#xff0c;然后提示输入数字&#xff0c;如果我们输入的数字与随机数相等&#xff0c;则提示恭喜你&#xff0c;答对了。如果输入的数字比随机数大&#xff0c;则提示&#xff0c;猜大了。反之&#xff0c;则提示猜小了…...

CRM自动化意味着什么?企业如何从中受益?

客户关系管理&#xff08;CRM&#xff09;软件不再仅仅适用于大公司或销售周期长的行业&#xff0c;它越来越成为各种规模企业的重要工具。 在日常工作中&#xff0c;当你陷入流程的所有细节时&#xff0c;可能会产生不必要的工作。因此&#xff0c;如果你想要CRM提供的组织和…...

Python大数据之PySpark

PySpark入门 1、 Spark与PySpark 1、 Spark与PySpark...

网工记背命令(7)----静态路由(负载分担,主备备份)

1.静态路由负载分担 如图所示&#xff0c;属于不同网段的主机通过几台 Switch 相连&#xff0c;要求不配置动态路由协议&#xff0c;使不同网 段的任意两台主机之间能够互通&#xff0c;从拓扑图中可以看出&#xff0c;从 PCA 到 PCC 有两条路径可以过去&#xff0c;分别是 PC…...

error: unable to read askpass response from

报错信息 解决方法&#xff1a; 中文&#xff1a;文件-->设置-->版本控制-->Git-->勾选使用凭证帮助程序 英文&#xff1a;File -> Settings -> Version Control -> Git / Check "User credential Helper" 因为我的webstrom是中文版的&#…...

运行stable-diffusion-xl-refiner-1.0遇到version `GLIBCXX_3.4.29‘ not found的问题

一、问题背景 https://huggingface.co/stabilityai/stable-diffusion-xl-refiner-1.0 在运行示例程序时候遇到GLIBCXX_3.4.29‘ not found diffusers to > 0.18.0 import torch from diffusers import StableDiffusionXLImg2ImgPipeline from diffusers.utils import loa…...

Ubuntu - 安装 Elasticsearch(ES)

注意&#xff1a;以下步骤基于 Elasticsearch 7.x 版本。版本可能会随时间而变化&#xff0c;请查看 Elasticsearch 官方网站以获取最新的版本信息。 添加 Elasticsearch APT 仓库&#xff1a; 打开终端&#xff0c;并使用以下命令添加 Elasticsearch APT 仓库到系统&#xf…...

字节码进阶之java Instrumentation原理详解

文章目录 0. 前言1. 基础2. Java Instrumentation API使用示例 3. Java Agent4. 字节码操作库5. 实际应用6. 注意事项和最佳实践 0. 前言 Java Instrumentation是Java API的一部分&#xff0c;它允许开发人员在运行时修改类的字节码。使用此功能&#xff0c;可以实现许多高级操…...

Android 13.0 锁屏页面禁止下拉状态栏

1.概述 在13.0的系统产品定制化中,在默认的锁屏界面的时候原生系统是可以下拉状态栏的,但是定制的产品是需要禁用下拉状态栏的,所以需要在锁屏页面的时候禁用下拉状态栏,需要从两部分查看下拉状态栏流程然后禁用状态栏 接下来就来分析下看这个功能怎么实现 2.锁屏页面禁止…...

Windows10 Docker 安装教程

Docker Desktop是什么&#xff1f; Docker Desktop是适用于Windows的Docker桌面&#xff0c;是Docker设计用于在Windows 10上运行。它是一个本地 Windows 应用程序&#xff0c;为构建、交付和运行dockerized应用程序提供易于使用的开发环境。Docker Desktop for Windows 使用 …...

JWT认证

目录 前言 JWT组成部分 JWT工作原理 在Express中使用JWT 安装JWT相关的包 导入JWT相关的包 定义密钥 登录成功后调用jwt.sign()生成JWT字符串 将JWT字符串还原为JSON对象 捕获解析JWT失败后产生的错误 结尾 前言 Session 认证机制需要配合 Cookie 才能实现。由于 Co…...

【网络安全 --- xss-labs靶场通关(1-10关)】详细的xss-labs靶场通关思路及技巧讲解,让你对xss漏洞的理解更深刻

靶场安装&#xff1a; 靶场安装请参考以下博客&#xff0c;既详细有提供工具&#xff1a; 【网络安全 --- xss-labs靶场】xss-labs靶场安装详细教程&#xff0c;让你巩固对xss漏洞的理解及绕过技巧和方法&#xff08;提供资源&#xff09;-CSDN博客【网络安全 --- xss-labs通…...

Mathematics-Vocabulary·数学专业英语词汇

点击查看: Mathematics-Vocabulary数学专业英语词汇点击查看: Mathematics-Vocabulary-Offline数学专业英语词汇离线版本 Chinese-English translation英译汉The study of mathematics in English requires understanding the subject-specific vocabulary and terminology. Ma…...

画程序流程图

一。在线程序流程图。类图和时序图 Integrations | Mermaid 二。VSCODE画UML图和各种种 1.下载plantuml.jarReleases plantuml/plantuml GitHubGenerate diagrams from textual description. Contribute to plantuml/plantuml development by creating an account on GitHu…...

C++ 模板进阶

非类型模板参数 模板参数分为&#xff1a;类型模板参数与非类型模板参数 类型模板参数即&#xff1a;出现在模板参数列表中&#xff0c;跟在class或者typename之后的参数类型名称非类型模板参数即&#xff1a;用一个常量作为类(函数)模板的一个参数&#xff0c;在类(函数)模…...

jenkins 安装与使用、用户权限划分

jenkins 安装与使用 安装插件&#xff1a; 开启该插件功能 验证用户管理 创建web01~02 使用web01登录 用户权限划分 安装 Role-Based Strategy 插件后&#xff0c;系统管理 中多了如图下所示的一个功能&#xff0c;用户权限的划分就是靠他来做的 创建角色 重新访问 创建项目…...

Hadoop3教程(三十三):(生产调优篇)慢磁盘监控与小文件归档

文章目录 &#xff08;161&#xff09;慢磁盘监控&#xff08;162&#xff09;小文件归档小文件过多的问题如何对小文件进行归档 参考文献 &#xff08;161&#xff09;慢磁盘监控 慢磁盘&#xff0c;是指写入数据时特别慢的一类磁盘。这种磁盘并不少见&#xff0c;当机器运行…...

物联网知识复习

物联网的内涵和体系结构 物联网的基本内涵 物联网的基本内涵在于物联&#xff0c;物物相连或者物和人相连的互联网。 也就是说&#xff0c;它是要由物主动发起的&#xff0c;物物互联的互联网。 它的第一层意思是说物和物相连&#xff1b;第二层意思是说物和人相连。 物联网的…...

Golang爬虫入门指南

引言 网络爬虫是一种自动化程序&#xff0c;用于从互联网上收集信息。随着互联网的迅速发展&#xff0c;爬虫技术在各行各业中越来越受欢迎。Golang作为一种高效、并发性好的编程语言&#xff0c;也逐渐成为爬虫开发的首选语言。本文将介绍使用Golang编写爬虫的基础知识和技巧…...

vscode里如何用git

打开vs终端执行如下&#xff1a; 1 初始化 Git 仓库&#xff08;如果尚未初始化&#xff09; git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…...

线程与协程

1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指&#xff1a;像函数调用/返回一样轻量地完成任务切换。 举例说明&#xff1a; 当你在程序中写一个函数调用&#xff1a; funcA() 然后 funcA 执行完后返回&…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

华硕a豆14 Air香氛版,美学与科技的馨香融合

在快节奏的现代生活中&#xff0c;我们渴望一个能激发创想、愉悦感官的工作与生活伙伴&#xff0c;它不仅是冰冷的科技工具&#xff0c;更能触动我们内心深处的细腻情感。正是在这样的期许下&#xff0c;华硕a豆14 Air香氛版翩然而至&#xff0c;它以一种前所未有的方式&#x…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...