Elasticsearch 8.9 Master节点处理请求源码
大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)

这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上
- 一、Master节点处理请求的逻辑
- 1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
- 2、Master节点处理来自客户端的请求(以创建索引请求举例)
- (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
- (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
- 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
- 1、首先通过nodeClient执行doExecute()
- 2、创建一个task任务异步执行TransportAction
- 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
- 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引
一、Master节点处理请求的逻辑
不是所有的请求都需要
Master节点处理,但是有些请求必须让Master节点处理,比如创建index,下面的3就是用创建索引做的示例
1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
主节点在
Elasticsearch集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的MasterNodeRequest实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的MasterNodeRequest实现类的类型,执行相应的操作
/*** A based request for master based operation.* 在master上*/
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;protected MasterNodeRequest() {}protected MasterNodeRequest(StreamInput in) throws IOException {super(in);masterNodeTimeout = in.readTimeValue();}@Overridepublic void writeTo(StreamOutput out) throws IOException {super.writeTo(out);out.writeTimeValue(masterNodeTimeout);}/*** A timeout value in case the master has not been discovered yet or disconnected.*/@SuppressWarnings("unchecked")public final Request masterNodeTimeout(TimeValue timeout) {this.masterNodeTimeout = timeout;return (Request) this;}/*** A timeout value in case the master has not been discovered yet or disconnected.*/public final Request masterNodeTimeout(String timeout) {return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));}public final TimeValue masterNodeTimeout() {return this.masterNodeTimeout;}
}
这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接
2、Master节点处理来自客户端的请求(以创建索引请求举例)
(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
至于请求如何找到
RestCreateIndexAction的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码
@ServerlessScope(Scope.PUBLIC)
public class RestCreateIndexAction extends BaseRestHandler {//省略代码 @Overridepublic List<Route> routes() {return List.of(new Route(PUT, "/{index}"));}@Overridepublic String getName() {return "create_index_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;if (request.getRestApiVersion() == RestApiVersion.V_7) {createIndexRequest = prepareRequestV7(request);} else {createIndexRequest = prepareRequest(request);}return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));}//省略代码
}
(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
TransportMasterNodeAction主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
当节点(数据节点)发送请求到主节点时,请求会被传递给相应的TransportMasterNodeAction实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。
/*** 需要在主节点上执行的操作的基类。* A base class for operations that needs to be performed on the master node.**/
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {//省略代码 }
/*** 创建索引操作*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {//省略代码createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));}
}
二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
这个场景为主节点和数据节点分离的情况
1、首先通过nodeClient执行doExecute()
client.admin().indices().create中create方法调用IndicesAdmin类的create方法,再调用execute方法的入参是 CreateIndexAction.INSTANCE
static class IndicesAdmin implements IndicesAdminClient {@Overridepublic void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {execute(CreateIndexAction.INSTANCE, request, listener);}@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(ActionType<Response> action,Request request) {return client.execute(action, request);}}
调用的是AbstractClient的execute方法
/*** This is the single execution point of *all* clients.* 这是所有客户端的单个执行点。*/@Overridepublic final <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,Request request,ActionListener<Response> listener) {try {doExecute(action, request, listener);} catch (Exception e) {assert false : new AssertionError(e);listener.onFailure(e);}}
doExecute方法调用的是NodeClient类的方法
@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,Request request,ActionListener<Response> listener) {// Discard the task because the Client interface doesn't use it.try {executeLocally(action, request, listener);} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {listener.onFailure(e);}}/***在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。*/public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,Request request,ActionListener<Response> listener) {//注册并执行任务return taskManager.registerAndExecute("transport",transportAction(action),request,localConnection,new SafelyWrappedActionListener<>(listener));}
之后调用TaskManager.java的方法
2、创建一个task任务异步执行TransportAction
public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(String type,TransportAction<Request, Response> action,Request request,Transport.Connection localConnection,ActionListener<Response> taskListener) { //检查请求是否有父任务,如果有,则注册子连接。final Releasable unregisterChildNode;if (request.getParentTask().isSet()) {unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);} else {unregisterChildNode = null;}//创建一个新的跟踪上下文try (var ignored = threadPool.getThreadContext().newTraceContext()) {final Task task;//注册一个任务,并捕获可能的取消任务异常。try {task = register(type, action.actionName, request);} catch (TaskCancelledException e) {Releasables.close(unregisterChildNode);throw e;}//执行操作,并在操作完成时调用相应的监听器。action.execute(task, request, new ActionListener<>() {@Overridepublic void onResponse(Response response) {try {release();} finally {taskListener.onResponse(response);}}//根据操作的成功或失败情况,取消子任务并释放资源。@Overridepublic void onFailure(Exception e) {try {if (request.getParentTask().isSet()) {cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());}release();} finally {taskListener.onFailure(e);}}@Overridepublic String toString() {return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";}private void release() {Releasables.close(unregisterChildNode, () -> unregister(task));}});//返回任务对象。return task;}}
下面是TransportAction.java类中的方法
/*** Use this method when the transport action should continue to run in the context of the current task* 当传输操作应继续在当前任务的上下文中运行时,请使用此方法*/public final void execute(Task task, Request request, ActionListener<Response> listener) {final ActionRequestValidationException validationException;//对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。try {validationException = request.validate();} catch (Exception e) {assert false : new AssertionError("validating of request [" + request + "] threw exception", e);logger.warn("validating of request [" + request + "] threw exception", e);listener.onFailure(e);return;}if (validationException != null) {listener.onFailure(validationException);return;}//检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。if (task != null && request.getShouldStoreResult()) {listener = new TaskResultStoringActionListener<>(taskManager, task, listener);}//创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);requestFilterChain.proceed(task, actionName, request, listener);}
@Overridepublic void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {int i = index.getAndIncrement();try {if (i < this.action.filters.length) {this.action.filters[i].apply(task, actionName, request, listener, this);} else if (i == this.action.filters.length) {//`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。this.action.doExecute(task, request, listener);} else {listener.onFailure(new IllegalStateException("proceed was called too many times"));}} catch (Exception e) {logger.trace("Error during transport action execution.", e);listener.onFailure(e);}}
this.action.doExecute(task, request, listener); 中action对应的是TransportMasterNodeAction
3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
TransportMasterNodeAction继承HandledTransportAction,
而HandledTransportAction继承自TransportAction
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {@Overrideprotected void doExecute(Task task, final Request request, ActionListener<Response> listener) {//省略代码new AsyncSingleAction(task, request, listener).doStart(state);}
}
protected void doStart(ClusterState clusterState) {threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));}
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception {//调用子类实现masterOperation(task, request, state, listener);}
//子类实现
protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception;
4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引
其中创建索引的action是TransportCreateIndexAction
@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}
之后调用createIndexService.createIndex创建索引
相关文章:
Elasticsearch 8.9 Master节点处理请求源码
大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1) 这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上 一、Master节点处理请求的逻辑1、节点(数据节点)要和主节点进行通讯࿰…...
Python---练习:while循环案例:猜数字
需求: 计算机从1 ~ 10之间随机生成一个数字,然后提示输入数字,如果我们输入的数字与随机数相等,则提示恭喜你,答对了。如果输入的数字比随机数大,则提示,猜大了。反之,则提示猜小了…...
CRM自动化意味着什么?企业如何从中受益?
客户关系管理(CRM)软件不再仅仅适用于大公司或销售周期长的行业,它越来越成为各种规模企业的重要工具。 在日常工作中,当你陷入流程的所有细节时,可能会产生不必要的工作。因此,如果你想要CRM提供的组织和…...
Python大数据之PySpark
PySpark入门 1、 Spark与PySpark 1、 Spark与PySpark...
网工记背命令(7)----静态路由(负载分担,主备备份)
1.静态路由负载分担 如图所示,属于不同网段的主机通过几台 Switch 相连,要求不配置动态路由协议,使不同网 段的任意两台主机之间能够互通,从拓扑图中可以看出,从 PCA 到 PCC 有两条路径可以过去,分别是 PC…...
error: unable to read askpass response from
报错信息 解决方法: 中文:文件-->设置-->版本控制-->Git-->勾选使用凭证帮助程序 英文:File -> Settings -> Version Control -> Git / Check "User credential Helper" 因为我的webstrom是中文版的&#…...
运行stable-diffusion-xl-refiner-1.0遇到version `GLIBCXX_3.4.29‘ not found的问题
一、问题背景 https://huggingface.co/stabilityai/stable-diffusion-xl-refiner-1.0 在运行示例程序时候遇到GLIBCXX_3.4.29‘ not found diffusers to > 0.18.0 import torch from diffusers import StableDiffusionXLImg2ImgPipeline from diffusers.utils import loa…...
Ubuntu - 安装 Elasticsearch(ES)
注意:以下步骤基于 Elasticsearch 7.x 版本。版本可能会随时间而变化,请查看 Elasticsearch 官方网站以获取最新的版本信息。 添加 Elasticsearch APT 仓库: 打开终端,并使用以下命令添加 Elasticsearch APT 仓库到系统…...
字节码进阶之java Instrumentation原理详解
文章目录 0. 前言1. 基础2. Java Instrumentation API使用示例 3. Java Agent4. 字节码操作库5. 实际应用6. 注意事项和最佳实践 0. 前言 Java Instrumentation是Java API的一部分,它允许开发人员在运行时修改类的字节码。使用此功能,可以实现许多高级操…...
Android 13.0 锁屏页面禁止下拉状态栏
1.概述 在13.0的系统产品定制化中,在默认的锁屏界面的时候原生系统是可以下拉状态栏的,但是定制的产品是需要禁用下拉状态栏的,所以需要在锁屏页面的时候禁用下拉状态栏,需要从两部分查看下拉状态栏流程然后禁用状态栏 接下来就来分析下看这个功能怎么实现 2.锁屏页面禁止…...
Windows10 Docker 安装教程
Docker Desktop是什么? Docker Desktop是适用于Windows的Docker桌面,是Docker设计用于在Windows 10上运行。它是一个本地 Windows 应用程序,为构建、交付和运行dockerized应用程序提供易于使用的开发环境。Docker Desktop for Windows 使用 …...
JWT认证
目录 前言 JWT组成部分 JWT工作原理 在Express中使用JWT 安装JWT相关的包 导入JWT相关的包 定义密钥 登录成功后调用jwt.sign()生成JWT字符串 将JWT字符串还原为JSON对象 捕获解析JWT失败后产生的错误 结尾 前言 Session 认证机制需要配合 Cookie 才能实现。由于 Co…...
【网络安全 --- xss-labs靶场通关(1-10关)】详细的xss-labs靶场通关思路及技巧讲解,让你对xss漏洞的理解更深刻
靶场安装: 靶场安装请参考以下博客,既详细有提供工具: 【网络安全 --- xss-labs靶场】xss-labs靶场安装详细教程,让你巩固对xss漏洞的理解及绕过技巧和方法(提供资源)-CSDN博客【网络安全 --- xss-labs通…...
Mathematics-Vocabulary·数学专业英语词汇
点击查看: Mathematics-Vocabulary数学专业英语词汇点击查看: Mathematics-Vocabulary-Offline数学专业英语词汇离线版本 Chinese-English translation英译汉The study of mathematics in English requires understanding the subject-specific vocabulary and terminology. Ma…...
画程序流程图
一。在线程序流程图。类图和时序图 Integrations | Mermaid 二。VSCODE画UML图和各种种 1.下载plantuml.jarReleases plantuml/plantuml GitHubGenerate diagrams from textual description. Contribute to plantuml/plantuml development by creating an account on GitHu…...
C++ 模板进阶
非类型模板参数 模板参数分为:类型模板参数与非类型模板参数 类型模板参数即:出现在模板参数列表中,跟在class或者typename之后的参数类型名称非类型模板参数即:用一个常量作为类(函数)模板的一个参数,在类(函数)模…...
jenkins 安装与使用、用户权限划分
jenkins 安装与使用 安装插件: 开启该插件功能 验证用户管理 创建web01~02 使用web01登录 用户权限划分 安装 Role-Based Strategy 插件后,系统管理 中多了如图下所示的一个功能,用户权限的划分就是靠他来做的 创建角色 重新访问 创建项目…...
Hadoop3教程(三十三):(生产调优篇)慢磁盘监控与小文件归档
文章目录 (161)慢磁盘监控(162)小文件归档小文件过多的问题如何对小文件进行归档 参考文献 (161)慢磁盘监控 慢磁盘,是指写入数据时特别慢的一类磁盘。这种磁盘并不少见,当机器运行…...
物联网知识复习
物联网的内涵和体系结构 物联网的基本内涵 物联网的基本内涵在于物联,物物相连或者物和人相连的互联网。 也就是说,它是要由物主动发起的,物物互联的互联网。 它的第一层意思是说物和物相连;第二层意思是说物和人相连。 物联网的…...
Golang爬虫入门指南
引言 网络爬虫是一种自动化程序,用于从互联网上收集信息。随着互联网的迅速发展,爬虫技术在各行各业中越来越受欢迎。Golang作为一种高效、并发性好的编程语言,也逐渐成为爬虫开发的首选语言。本文将介绍使用Golang编写爬虫的基础知识和技巧…...
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...
(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
Spring AI 入门:Java 开发者的生成式 AI 实践之路
一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...
