当前位置: 首页 > news >正文

第 31 章 - 源码篇 - Elasticsearch 写入流程深入分析

写入源码分析

接收与处理

请求首先会被 Netty4HttpServerTransport 接收,接着交由 RestController 进行路由分发。

private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {// 从 tier 树中,找到该请求路径对应的 RestHandlerIterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);while (allHandlers.hasNext()) {final RestHandler handler;final MethodHandlers handlers = allHandlers.next();if (handlers == null) {handler = null;} else {handler = handlers.getHandler(requestMethod, restApiVersion);}if (handler == null) {if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {return;}} else {// 找到后,将本次请求转发给该 RestHandlerdispatchRequest(request, channel, handler, threadContext);return;}}
}

那么 ES 如何知道对应的路由应该由谁处理呢?
Node 初始化时,会执行 ActionModule#initRestHandlers(...)

public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {......// 注册路由registerHandler.accept(new RestIndexAction());......
}

RestIndexAction 注册的路由如下所示

public List<Route> routes() {return List.of(new Route(POST, "/{index}/_doc/{id}"),new Route(PUT, "/{index}/_doc/{id}"),Route.builder(POST, "/{index}/{type}/{id}").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build(),Route.builder(PUT, "/{index}/{type}/{id}").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build());
}

每个 RestHandlerprepareRequest(final RestRequest request, final NodeClient client) 都会声明与之绑定的 TransportAction,之后所有逻辑会交由 TransportAction 处理。
其绑定的 TransportActionTransportIndexAction
RestIndexAction#prepareRequest(...)

public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {......return channel -> client.index(indexRequest,new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));
}

AbstractClient#index(final IndexRequest request, final ActionListener<IndexResponse> listener)

@Override
public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {execute(IndexAction.INSTANCE, request, listener);
}

对于写入类型的 TransportAction 在内部又会通过协调节点(接收客户端请求的就是协调节点)先将请求转发给对应主分片所在的节点,主分片节点写入后,主分片节点又会转发给副本分片,副本分片写入后,返回给主分片,主分片再返回给协调节点,最后协调节点返回给客户端。

整体流程如下图所示:

协调节点分发请求

上文 search 读流程有提到,TansportAction 定义了基本流程,每个子类实现 doExecute(...) 方法,自定义执行逻辑,因此我们只需要看 TransportIndexAction#doExecute(...) 即可。

不存在索引则创建

当索引不存在时,则会先创建索引,接着再执行写入操作。如果索引存在,则直接执行写入操作。

TransportBulkAction#doInternalExecute(...)

protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {for (String index : autoCreateIndices) {// 创建索引createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<>() {@Overridepublic void onResponse(CreateIndexResponse result) {// 创建索引成功回调函数,if (counter.decrementAndGet() == 0) {// 执行写入操作threadPool.executor(executorName).execute(new ActionRunnable<>(listener) {@Overrideprotected void doRun() {executeBulk(task,bulkRequest,startTime,listener,executorName,responses,indicesThatCannotBeCreated);}});}}}}
}

executeBulk(..) 方法内部会创建 BulkOperation 交由该类做处理

void executeBulk(Task task,BulkRequest bulkRequest,long startTimeNanos,ActionListener<BulkResponse> listener,String executorName,AtomicArray<BulkItemResponse> responses,Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {new BulkOperation(task, bulkRequest, listener, executorName, responses, startTimeNanos, indicesThatCannotBeCreated).run();}

BulkOperation 继承自 AbstractRunnableAbstractRunnable 定义了执行的基本流程,子类需要实现 doRun() 方法,因此,只需要关注 BulkOperation#doRun() 方法。

路由计算

BulkOperation#doRun()

protected void doRun() {// 获取路由计算规则IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
}

IndexRouting#fromIndexMetadata(...)

public static IndexRouting fromIndexMetadata(IndexMetadata indexMetadata) {// 索引配置上是否设置 routing_pathif (false == indexMetadata.getRoutingPaths().isEmpty()) {if (indexMetadata.isRoutingPartitionedIndex()) {throw new IllegalArgumentException("routing_partition_size is incompatible with routing_path");}return new ExtractFromSource(indexMetadata.getRoutingNumShards(),indexMetadata.getRoutingFactor(),indexMetadata.getIndex().getName(),indexMetadata.getRoutingPaths());}// 索引配置上是否设置了分区索引相关参数if (indexMetadata.isRoutingPartitionedIndex()) {return new Partitioned(indexMetadata.getRoutingNumShards(),indexMetadata.getRoutingFactor(),indexMetadata.getRoutingPartitionSize());}// 正常写入return new Unpartitioned(indexMetadata.getRoutingNumShards(), indexMetadata.getRoutingFactor());
}

上诉 3 个路由算法,底层算法都是类似的,都是基于一致性 hash 计算对应的路由。
hash 计算函数 Murmur3HashFunction#hash(String routing)

我们可以简单将路由算法理解为如下:

  1. 先计算 hash
  2. 再根据 hash 计算路由

计算 hash 可以又分为以下几种情况:

  1. 如果索引配置了 routing_path,则 hash = Murmur3HashFunction#hash(routing_path_value)
  2. 如果路径上有路由参数,则 hash = Murmur3HashFunction#hash(routing)
  3. 否则 hash = Murmur3HashFunction#hash(_id)

根据 hash 计算路由的规则如下:
IndexRouting#hashToShardId(...)

protected final int hashToShardId(int hash) {return Math.floorMod(hash, routingNumShards) / routingFactor;
}
  • routingNumShards
    值默认依赖主分片数(number_of_shards),如果创建索引时未指定,默认按因子2拆分,并且最多可拆分为1024个分片。例如原索引主分片数为1,则可拆分为1~1024中的任意数;原索引主分片为5,则支持拆分的分片数为:10、20、40、80、160、320以及最大数640(不能超过1024)。 可通过索引的 index.number_of_routing_shards 配置,但不建议配置。
  • routingFactor
    默认为 routingNumShards / number_of_shards

简单说,你可以将 number_of_routing_shards 理解为虚拟的分片数、 number_of_shards 则为物理的分片数。其本质就是 一致性 hash。

分发请求至主分片

TransportReplicationAction#doExecute(...)

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {assert request.shardId() != null : "request shardId must be set";runReroutePhase(task, request, listener, true);
}

ReroutePhase#doRun()

protected void doRun() {if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {       // 主分片节点在协调节点上performLocalAction(state, primary, node, indexMetadata);} else {// 主分片节点不在协调节点上performRemoteAction(state, primary, node);}
}
主分片写入
接收请求

TransportReplicationAction 构造函数,注册了主分片写入的处理函数

protected TransportReplicationAction(......
) {transportService.registerRequestHandler(transportPrimaryAction,executor,forceExecutionOnPrimary,true,in -> new ConcreteShardRequest<>(requestReader, in),this::handlePrimaryRequest);
}
主分片写入

TransportShardBulkAction#dispatchedShardOperationOnPrimary(...)

@Override
protected void dispatchedShardOperationOnPrimary(BulkShardRequest request,IndexShard primary,ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
) {......// 在主分片上执行performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {...}), listener, threadPool, executor(primary));
}
异步转发请求至副本分片

转发请求至副本分片,是在主分片写入数据后,才执行的

ReplicationOperation#execute(...)

public void execute() throws Exception {......// 执行主分片写入primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, this::finishAsFailed));
}

handlePrimaryResult() 方法是写入主分片后的回调函数
ReplicationOperation#handlePrimaryResult(..)

private void handlePrimaryResult(final PrimaryResultT primaryResult) {...// 异步发送同步副本分片请求performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup, pendingReplicationActions);...
}
副本分片写入
接收请求

类似的,TransportReplicationAction 构造函数,注册了副本分片写入的处理函数

transportService.registerRequestHandler(transportReplicaAction,executor,true,true,in -> new ConcreteReplicaRequest<>(replicaRequestReader, in),this::handleReplicaRequest
);
数据写入

而后请求交给 AsyncReplicaAction#doRun() 处理

@Override
protected void doRun() throws Exception {......// 获取写入许可后,会回调至 AsyncReplicaAction#onResponse() acquireReplicaOperationPermit(replica,replicaRequest.getRequest(),this,replicaRequest.getPrimaryTerm(),replicaRequest.getGlobalCheckpoint(),replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
}

AsyncReplicaAction#onResponse()

@Override
public void onResponse(Releasable releasable) {...// 执行写入shardOperationOnReplica(...);...  
}

调用该函数后,最后代码会走到 TransportShardBulkAction#dispatchedShardOperationOnReplica(...)

@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {ActionListener.completeWith(listener, () -> {final long startBulkTime = System.nanoTime();// 执行写入final Translog.Location location = performOnReplica(request, replica);replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);return new WriteReplicaResult<>(request, location, null, replica, logger);});
}

相关文章:

第 31 章 - 源码篇 - Elasticsearch 写入流程深入分析

写入源码分析 接收与处理 请求首先会被 Netty4HttpServerTransport 接收&#xff0c;接着交由 RestController 进行路由分发。 private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {…...

node.js下载、安装、设置国内镜像源(永久)(Windows11)

目录 node-v20.18.0-x64 工具下载安装设置国内镜像源&#xff08;永久&#xff09; node-v20.18.0-x64 工具 系统&#xff1a;Windows 11 下载 官网https://nodejs.org/zh-cn/download/package-manager 版本我是跟着老师选的node-v20.18.0-x64如图选择 Windows、x64、v2…...

小于n的最大数 - 贪心算法 - C++

字节经典面试题 给定一个整数n&#xff0c;并从1~9中给定若干个可以使用的数字&#xff0c;根据上述两个条件&#xff0c;得到每一位都为给定可使用数字的、最大的小于整数n的数&#xff0c;例如&#xff0c;给定可以使用的数字为 {2,3,8} 三个数&#xff1a;给定 n3589&#x…...

【顶刊TPAMI 2025】多头编码(MHE)之极限分类 Part 3:算法实现

目录 1 三种多头编码&#xff08;MHE&#xff09;实现1.1 多头乘积&#xff08;MHP&#xff09;1.2 多头级联&#xff08;MHC&#xff09;1.3 多头采样&#xff08;MHS&#xff09;1.4 标签分解策略 论文&#xff1a;Multi-Head Encoding for Extreme Label Classification 作者…...

解决CentOS 8 YUM源更新后报错问题:无法下载AppStream仓库元数据

背景介绍 在尝试更新CentOS 8的YUM源以使用阿里云镜像时&#xff0c;遇到了Failed to download metadata for repo appstream的错误。此错误通常出现在执行yum clean all && yum makecache命令之后&#xff0c;表明系统无法从指定的URL获取AppStream仓库的元数据。本文…...

[python3]Excel解析库-openpyxl

https://openpyxl.readthedocs.io/en/stable/ openpyxl 是一个用于读写 Excel 2010 xlsx/xlsm/xltx/xltm 文件的 Python 库。它允许开发者创建、修改和保存电子表格&#xff0c;而无需依赖 Microsoft Excel 软件本身。openpyxl 支持读取和写入 Excel 的工作簿&#xff08;Work…...

Docker 远程访问完整配置教程以及核心参数理解

Docker 远程访问完整配置教程 以下是配置 Docker 支持远程访问的完整教程&#xff0c;包括参数说明、配置修改、云服务器安全组设置、主机防火墙配置&#xff0c;以及验证远程访问的详细步骤。 1. 理解 -H fd:// 参数的作用&#xff08;理解了以后容易理解后面的操作&#xff…...

王老吉药业SRM系统上线 携手隆道共启战略合作新篇章

12月27日&#xff0c;广州王老吉药业股份有限公司&#xff08;简称“王老吉药业”&#xff09;SRM项目上线启动会&#xff0c;在王老吉科普教育基地——“吉园”隆重举行。广药集团纪委主任陈耕、王老吉药业总工程师黄晓丹、隆道公司总裁吴树贵、项目经理赵耀、供应商代表郭伟及…...

MyBatis 配置文件全解析

一、MyBatis 配置文件为何至关重要&#xff1f; 在 Java 后端开发领域&#xff0c;MyBatis 作为一款广受欢迎的持久层框架&#xff0c;极大地简化了数据库操作。而 MyBatis 配置文件&#xff0c;恰似整个框架的 “神经中枢”&#xff0c;掌控着其运行的方方面面&#xff0c;对…...

unity学习6:unity的3D项目的基本界面和菜单

目录 1 unity界面的基本认识 1.1 file 文件 1.2 edit 编辑/操作 1.3 Assets 1.4 gameobject 游戏对象 1.5 组件 1.6 windows 2 这些部分之间的关系 2.1 关联1&#xff1a; Assets & Project 2.2 关联2&#xff1a;gameobject & component 2.3 关联3&#xf…...

企业二要素如何用C#实现

一、什么是企业二要素&#xff1f; 企业二要素&#xff0c;通过输入统一社会信用代码、企业名称或统一社会信用代码、法人名称&#xff0c;验证两者是否匹配一致。 二、企业二要素适用哪些场景&#xff1f; 例如&#xff1a;信用与金融领域 1.信用评级&#xff1a;信用评级…...

中科院空天院无人机视觉语言导航新基准!AeroVerse:模拟、预训练、微调和评估空中无人机具身世界模型的测试基准

作者&#xff1a; Fanglong Yao, Yuanchang Yue, Youzhi Liu, Xian Sun, Kun Fu 单位&#xff1a;中国科学院空天信息创新研究院网络信息系统技术重点实验室&#xff0c;中国科学院大学电子电气与通信工程学院 原文链接&#xff1a; AeroVerse: UAV-Agent Benchmark Suite fo…...

Python安装(新手详细版)

前言 第一次接触Python&#xff0c;可能是爬虫或者是信息AI开发的小朋友&#xff0c;都说Python 语言简单&#xff0c;那么多学一些总是有好处的&#xff0c;下面从一个完全不懂的Python 的小白来安装Python 等一系列工作的记录&#xff0c;并且遇到的问题也会写出&#xff0c…...

Oracle DG备库数据文件损坏修复方法(ORA-01578/ORA-01110)

今天负责报表的同事反馈在DG库查询时出现如下报错 ORA-01578:ORACLE数据块损坏(文件号6,块号 2494856)ORA-01110:数据文件6: /oradata/PMSDG/o1 mf users_molczgmn_.dbfORA-26040:数据块是使用 NOLOGGING 选项加载的 可以看到报错是数据文件损坏&#xff0c;提示了file id和b…...

安装Linux

在Linux系统上安装MySQL数据库&#xff0c;可以根据服务器是否有网络连接选择不同的安装方式。以下分别介绍在线安装&#xff08;通过yum&#xff09;和离线安装&#xff08;手动下载.tar包&#xff09;的详细步骤&#xff1a; 一、在线安装&#xff08;通过yum&#xff09; 检…...

【文献精读笔记】Explainability for Large Language Models: A Survey (大语言模型的可解释性综述)(四)

****非斜体正文为原文献内容&#xff08;也包含笔者的补充&#xff09;&#xff0c;灰色块中是对文章细节的进一步详细解释&#xff01; 四、提示范式&#xff08;Explanation for Prompting Paradigm&#xff09; 随着语言模型规模的扩大&#xff0c;基于提示&#xff08;prom…...

【OpenCV】使用Python和OpenCV实现火焰检测

1、 项目源码和结构&#xff08;转&#xff09; https://github.com/mushfiq1998/fire-detection-python-opencv 2、 运行环境 # 安装playsound&#xff1a;用于播放报警声音 pip install playsound # 安装opencv-python&#xff1a;cv2用于图像和视频处理&#xff0c;特别是…...

SpringCloud(二)--SpringCloud服务注册与发现

一. 引言 ​ 前文简单介绍了SpringCloud的基本简介与特征&#xff0c;接下来介绍每个组成部分的功能以及经常使用的中间件。本文仅为学习所用&#xff0c;联系侵删。 二. SpringCloud概述 2.1 定义 ​ Spring Cloud是一系列框架的有序集合&#xff0c;它巧妙地利用了Spring…...

国内Ubuntu环境Docker部署CosyVoice

国内Ubuntu环境Docker部署CosyVoice 本文旨在记录在 国内 CosyVoice项目在 Ubuntu 环境下如何使用 dockermin-conda进行一键部署。 源项目地址&#xff1a; https://github.com/FunAudioLLM/CosyVoice 如果想要使用 dockerpython 进行部署&#xff0c;可以参考我另一篇博客中的…...

嵌入式linux系统中QT信号与槽实现

第一:Qt中信号与槽简介 信号与槽是Qt编程的基础。因为有了信号与槽的编程机制,在Qt中处理界面各个组件的交互操作时变得更加直观和简单。 槽函数与一般的函数不同的是:槽函数可以与一个信号关联,当信号被发射时,关联的槽函数被自动执行。 案例操作与实现: #ifndef …...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

微服务商城-商品微服务

数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

什么是EULA和DPA

文章目录 EULA&#xff08;End User License Agreement&#xff09;DPA&#xff08;Data Protection Agreement&#xff09;一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA&#xff08;End User License Agreement&#xff09; 定义&#xff1a; EULA即…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

ArcGIS Pro制作水平横向图例+多级标注

今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作&#xff1a;ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等&#xff08;ArcGIS出图图例8大技巧&#xff09;&#xff0c;那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

Web 架构之 CDN 加速原理与落地实践

文章目录 一、思维导图二、正文内容&#xff08;一&#xff09;CDN 基础概念1. 定义2. 组成部分 &#xff08;二&#xff09;CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 &#xff08;三&#xff09;CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 &#xf…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...