Elasticsearch 8.9 Master节点处理请求源码
大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)
这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上
- 一、Master节点处理请求的逻辑
- 1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
- 2、Master节点处理来自客户端的请求(以创建索引请求举例)
- (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
- (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
- 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
- 1、首先通过nodeClient执行doExecute()
- 2、创建一个task任务异步执行TransportAction
- 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
- 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引
一、Master节点处理请求的逻辑
不是所有的请求都需要
Master
节点处理,但是有些请求必须让Master
节点处理,比如创建index,下面的3就是用创建索引做的示例
1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
主节点在
Elasticsearch
集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的MasterNodeRequest
实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的MasterNodeRequest
实现类的类型,执行相应的操作
/*** A based request for master based operation.* 在master上*/
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;protected MasterNodeRequest() {}protected MasterNodeRequest(StreamInput in) throws IOException {super(in);masterNodeTimeout = in.readTimeValue();}@Overridepublic void writeTo(StreamOutput out) throws IOException {super.writeTo(out);out.writeTimeValue(masterNodeTimeout);}/*** A timeout value in case the master has not been discovered yet or disconnected.*/@SuppressWarnings("unchecked")public final Request masterNodeTimeout(TimeValue timeout) {this.masterNodeTimeout = timeout;return (Request) this;}/*** A timeout value in case the master has not been discovered yet or disconnected.*/public final Request masterNodeTimeout(String timeout) {return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));}public final TimeValue masterNodeTimeout() {return this.masterNodeTimeout;}
}
这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接
2、Master节点处理来自客户端的请求(以创建索引请求举例)
(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
至于请求如何找到
RestCreateIndexAction
的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码
@ServerlessScope(Scope.PUBLIC)
public class RestCreateIndexAction extends BaseRestHandler {//省略代码 @Overridepublic List<Route> routes() {return List.of(new Route(PUT, "/{index}"));}@Overridepublic String getName() {return "create_index_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;if (request.getRestApiVersion() == RestApiVersion.V_7) {createIndexRequest = prepareRequestV7(request);} else {createIndexRequest = prepareRequest(request);}return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));}//省略代码
}
(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
TransportMasterNodeAction
主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
当节点(数据节点)发送请求到主节点时,请求会被传递给相应的TransportMasterNodeAction
实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。
/*** 需要在主节点上执行的操作的基类。* A base class for operations that needs to be performed on the master node.**/
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {//省略代码 }
/*** 创建索引操作*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {//省略代码createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));}
}
二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
这个场景为主节点和数据节点分离的情况
1、首先通过nodeClient执行doExecute()
client.admin().indices().create
中create
方法调用IndicesAdmin
类的create
方法,再调用execute
方法的入参是 CreateIndexAction.INSTANCE
static class IndicesAdmin implements IndicesAdminClient {@Overridepublic void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {execute(CreateIndexAction.INSTANCE, request, listener);}@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(ActionType<Response> action,Request request) {return client.execute(action, request);}}
调用的是AbstractClient
的execute
方法
/*** This is the single execution point of *all* clients.* 这是所有客户端的单个执行点。*/@Overridepublic final <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,Request request,ActionListener<Response> listener) {try {doExecute(action, request, listener);} catch (Exception e) {assert false : new AssertionError(e);listener.onFailure(e);}}
doExecute
方法调用的是NodeClient
类的方法
@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,Request request,ActionListener<Response> listener) {// Discard the task because the Client interface doesn't use it.try {executeLocally(action, request, listener);} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {listener.onFailure(e);}}/***在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。*/public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,Request request,ActionListener<Response> listener) {//注册并执行任务return taskManager.registerAndExecute("transport",transportAction(action),request,localConnection,new SafelyWrappedActionListener<>(listener));}
之后调用TaskManager.java
的方法
2、创建一个task任务异步执行TransportAction
public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(String type,TransportAction<Request, Response> action,Request request,Transport.Connection localConnection,ActionListener<Response> taskListener) { //检查请求是否有父任务,如果有,则注册子连接。final Releasable unregisterChildNode;if (request.getParentTask().isSet()) {unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);} else {unregisterChildNode = null;}//创建一个新的跟踪上下文try (var ignored = threadPool.getThreadContext().newTraceContext()) {final Task task;//注册一个任务,并捕获可能的取消任务异常。try {task = register(type, action.actionName, request);} catch (TaskCancelledException e) {Releasables.close(unregisterChildNode);throw e;}//执行操作,并在操作完成时调用相应的监听器。action.execute(task, request, new ActionListener<>() {@Overridepublic void onResponse(Response response) {try {release();} finally {taskListener.onResponse(response);}}//根据操作的成功或失败情况,取消子任务并释放资源。@Overridepublic void onFailure(Exception e) {try {if (request.getParentTask().isSet()) {cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());}release();} finally {taskListener.onFailure(e);}}@Overridepublic String toString() {return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";}private void release() {Releasables.close(unregisterChildNode, () -> unregister(task));}});//返回任务对象。return task;}}
下面是TransportAction.java
类中的方法
/*** Use this method when the transport action should continue to run in the context of the current task* 当传输操作应继续在当前任务的上下文中运行时,请使用此方法*/public final void execute(Task task, Request request, ActionListener<Response> listener) {final ActionRequestValidationException validationException;//对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。try {validationException = request.validate();} catch (Exception e) {assert false : new AssertionError("validating of request [" + request + "] threw exception", e);logger.warn("validating of request [" + request + "] threw exception", e);listener.onFailure(e);return;}if (validationException != null) {listener.onFailure(validationException);return;}//检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。if (task != null && request.getShouldStoreResult()) {listener = new TaskResultStoringActionListener<>(taskManager, task, listener);}//创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);requestFilterChain.proceed(task, actionName, request, listener);}
@Overridepublic void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {int i = index.getAndIncrement();try {if (i < this.action.filters.length) {this.action.filters[i].apply(task, actionName, request, listener, this);} else if (i == this.action.filters.length) {//`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。this.action.doExecute(task, request, listener);} else {listener.onFailure(new IllegalStateException("proceed was called too many times"));}} catch (Exception e) {logger.trace("Error during transport action execution.", e);listener.onFailure(e);}}
this.action.doExecute(task, request, listener);
中action
对应的是TransportMasterNodeAction
3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
TransportMasterNodeAction
继承HandledTransportAction
,
而HandledTransportAction
继承自TransportAction
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {@Overrideprotected void doExecute(Task task, final Request request, ActionListener<Response> listener) {//省略代码new AsyncSingleAction(task, request, listener).doStart(state);}
}
protected void doStart(ClusterState clusterState) {threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));}
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception {//调用子类实现masterOperation(task, request, state, listener);}
//子类实现
protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception;
4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引
其中创建索引的action
是TransportCreateIndexAction
@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}
之后调用createIndexService.createIndex
创建索引
相关文章:

Elasticsearch 8.9 Master节点处理请求源码
大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1) 这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上 一、Master节点处理请求的逻辑1、节点(数据节点)要和主节点进行通讯࿰…...

Python---练习:while循环案例:猜数字
需求: 计算机从1 ~ 10之间随机生成一个数字,然后提示输入数字,如果我们输入的数字与随机数相等,则提示恭喜你,答对了。如果输入的数字比随机数大,则提示,猜大了。反之,则提示猜小了…...

CRM自动化意味着什么?企业如何从中受益?
客户关系管理(CRM)软件不再仅仅适用于大公司或销售周期长的行业,它越来越成为各种规模企业的重要工具。 在日常工作中,当你陷入流程的所有细节时,可能会产生不必要的工作。因此,如果你想要CRM提供的组织和…...
Python大数据之PySpark
PySpark入门 1、 Spark与PySpark 1、 Spark与PySpark...

网工记背命令(7)----静态路由(负载分担,主备备份)
1.静态路由负载分担 如图所示,属于不同网段的主机通过几台 Switch 相连,要求不配置动态路由协议,使不同网 段的任意两台主机之间能够互通,从拓扑图中可以看出,从 PCA 到 PCC 有两条路径可以过去,分别是 PC…...

error: unable to read askpass response from
报错信息 解决方法: 中文:文件-->设置-->版本控制-->Git-->勾选使用凭证帮助程序 英文:File -> Settings -> Version Control -> Git / Check "User credential Helper" 因为我的webstrom是中文版的&#…...
运行stable-diffusion-xl-refiner-1.0遇到version `GLIBCXX_3.4.29‘ not found的问题
一、问题背景 https://huggingface.co/stabilityai/stable-diffusion-xl-refiner-1.0 在运行示例程序时候遇到GLIBCXX_3.4.29‘ not found diffusers to > 0.18.0 import torch from diffusers import StableDiffusionXLImg2ImgPipeline from diffusers.utils import loa…...
Ubuntu - 安装 Elasticsearch(ES)
注意:以下步骤基于 Elasticsearch 7.x 版本。版本可能会随时间而变化,请查看 Elasticsearch 官方网站以获取最新的版本信息。 添加 Elasticsearch APT 仓库: 打开终端,并使用以下命令添加 Elasticsearch APT 仓库到系统…...

字节码进阶之java Instrumentation原理详解
文章目录 0. 前言1. 基础2. Java Instrumentation API使用示例 3. Java Agent4. 字节码操作库5. 实际应用6. 注意事项和最佳实践 0. 前言 Java Instrumentation是Java API的一部分,它允许开发人员在运行时修改类的字节码。使用此功能,可以实现许多高级操…...
Android 13.0 锁屏页面禁止下拉状态栏
1.概述 在13.0的系统产品定制化中,在默认的锁屏界面的时候原生系统是可以下拉状态栏的,但是定制的产品是需要禁用下拉状态栏的,所以需要在锁屏页面的时候禁用下拉状态栏,需要从两部分查看下拉状态栏流程然后禁用状态栏 接下来就来分析下看这个功能怎么实现 2.锁屏页面禁止…...

Windows10 Docker 安装教程
Docker Desktop是什么? Docker Desktop是适用于Windows的Docker桌面,是Docker设计用于在Windows 10上运行。它是一个本地 Windows 应用程序,为构建、交付和运行dockerized应用程序提供易于使用的开发环境。Docker Desktop for Windows 使用 …...

JWT认证
目录 前言 JWT组成部分 JWT工作原理 在Express中使用JWT 安装JWT相关的包 导入JWT相关的包 定义密钥 登录成功后调用jwt.sign()生成JWT字符串 将JWT字符串还原为JSON对象 捕获解析JWT失败后产生的错误 结尾 前言 Session 认证机制需要配合 Cookie 才能实现。由于 Co…...

【网络安全 --- xss-labs靶场通关(1-10关)】详细的xss-labs靶场通关思路及技巧讲解,让你对xss漏洞的理解更深刻
靶场安装: 靶场安装请参考以下博客,既详细有提供工具: 【网络安全 --- xss-labs靶场】xss-labs靶场安装详细教程,让你巩固对xss漏洞的理解及绕过技巧和方法(提供资源)-CSDN博客【网络安全 --- xss-labs通…...

Mathematics-Vocabulary·数学专业英语词汇
点击查看: Mathematics-Vocabulary数学专业英语词汇点击查看: Mathematics-Vocabulary-Offline数学专业英语词汇离线版本 Chinese-English translation英译汉The study of mathematics in English requires understanding the subject-specific vocabulary and terminology. Ma…...

画程序流程图
一。在线程序流程图。类图和时序图 Integrations | Mermaid 二。VSCODE画UML图和各种种 1.下载plantuml.jarReleases plantuml/plantuml GitHubGenerate diagrams from textual description. Contribute to plantuml/plantuml development by creating an account on GitHu…...

C++ 模板进阶
非类型模板参数 模板参数分为:类型模板参数与非类型模板参数 类型模板参数即:出现在模板参数列表中,跟在class或者typename之后的参数类型名称非类型模板参数即:用一个常量作为类(函数)模板的一个参数,在类(函数)模…...

jenkins 安装与使用、用户权限划分
jenkins 安装与使用 安装插件: 开启该插件功能 验证用户管理 创建web01~02 使用web01登录 用户权限划分 安装 Role-Based Strategy 插件后,系统管理 中多了如图下所示的一个功能,用户权限的划分就是靠他来做的 创建角色 重新访问 创建项目…...

Hadoop3教程(三十三):(生产调优篇)慢磁盘监控与小文件归档
文章目录 (161)慢磁盘监控(162)小文件归档小文件过多的问题如何对小文件进行归档 参考文献 (161)慢磁盘监控 慢磁盘,是指写入数据时特别慢的一类磁盘。这种磁盘并不少见,当机器运行…...

物联网知识复习
物联网的内涵和体系结构 物联网的基本内涵 物联网的基本内涵在于物联,物物相连或者物和人相连的互联网。 也就是说,它是要由物主动发起的,物物互联的互联网。 它的第一层意思是说物和物相连;第二层意思是说物和人相连。 物联网的…...
Golang爬虫入门指南
引言 网络爬虫是一种自动化程序,用于从互联网上收集信息。随着互联网的迅速发展,爬虫技术在各行各业中越来越受欢迎。Golang作为一种高效、并发性好的编程语言,也逐渐成为爬虫开发的首选语言。本文将介绍使用Golang编写爬虫的基础知识和技巧…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例
文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...