当前位置: 首页 > news >正文

Spark Shuffle Tracking 原理分析

Shuffle Tracking

Shuffle Tracking 是 Spark 在没有 ESS(External Shuffle Service)情况,并且开启 Dynamic Allocation 的重要功能。如在 K8S 上运行 spark 没有 ESS。本文档所有的前提都是基于以上条件的。

如果开启了 ESS,那么 Executor 计算完后,把 shuffle 数据交给 ESS, Executor 没有任务时,可以安全退出,下游任务从 ESS 拉取 shuffle 数据。

1. 背景

如果 Executor 执行了上游的 Shuffle Map Task 并且把 shuffle 数据些到本地。并且现在 Executor 没有 Task 运行,那么此 Executor 是否能销毁?

现状是如果 Executor 没有 active 的 shuffle 数据,则可以被销毁。
active shuffle 的定义:如果 Shuffle Map Stage 的 task 把 shuffle 数据输出到本地。如果依赖此 shuffle 的Stage 没有计算完毕,则称此 shuffle 为 active shuffle。因为依赖此 shuffle 的 Task 可能从 Driver 端获取了 MapStatus,但是还没有拉取完 shuffle 数据。

为了达到此目的,需要跟踪每个 Stage 和每个 Task 的运行信息。并且启动定时任务,定时扫描每个 Executor,判断是否有任务运行,是否有 active 的 shuffle,如果没有则可以退出。

退出有两种,如果开启了 decommission,则到期的 executors 进入 decommission 模式,否则执行 killExecutors。

参数配置

spark.dynamicAllocation.shuffleTracking.enabled: 默认 true,是否开启 shuffle tracking。
spark.dynamicAllocation.shuffleTracking.timeout: 默认 Long.MaxValue,

2. 设计

ExecutorMonitor 为每个 Executor 创建一个 Tracker, 用于跟踪此 Executor 的状态。

private val executors = new ConcurrentHashMap[String, Tracker]()

定时任务间隔时间查找 timeout 的 executor,然后处理。

timedOutExecutors 方法的主要逻辑,就是遍历 executors。如果 executor 没有 active 的 shuffle 并且当前时间大于 executor 的超时时间 timeoutAt,则此 executor 可以被安全释放。

为什么 executor 有 active shuffle 数据就不能 kill?
在这里插入图片描述

  • Shuffle 的过程:
  1. MapTask 把 shuffle 写到本地,并且把状态汇报给 Driver.
  2. Reduce Task 从 Driver 获取 shuffle status,并从 shuffle status 获取每个 shuffle 数据的地址。
  3. 连接对应的 executor 获取 shuffle 数据。

如果在 reduce 获取完 shuffle status 后,MapTask 所在的 Executor 被 kill 掉,Reduce Task 就无法获取 shuffle 数据。

如果执行 decommission 逻辑,把 MapTask 的 shuffle 数据长传到 bos 等分布式存储是否可以?

也是不可以的,因为 reduce 可能已经把 shuffle status 拿走,获取的 shuffle status 没有记录 shuffle 数据在分布式存储上。

参考: ExecutorMonitor,ExecutorAllocationManager

Executor 状态的更新

ExecutorMonitor 实现了 SparkListner 接口,当 Job, Stage, Task 等 start 和 end 时,都会执行回调。

以 hasActiveShuffle 为例
每个 executor 用一个集合 shuffleIds 存储其上拥有的 shuffle 数据。 当其为空时,说明没有 shuffle 数据。

在 onTaskEnd 和 onBlockUpdated 时调用 addShuffle 向 shuffleIds 添加数据。

在以下时机删除 shuffleIds 里的数据。

  1. 依赖 driver 端的 ContextCleaner,当 ShuffleRDD 仅有 weakReference 时触发。
  2. rdd.cleanShuffleDependencies 方法,但是此方法仅在 org.apache.spark.ml.recommendation.ALS 使用。

timeoutAt 的计算逻辑

总结:timeoutAt 根据 idle 的时间,spark.dynamicAllocation.cachedExecutorIdleTimeout 和 spark.dynamicAllocation.shuffleTracking.timeout 这 3 个值中最大的值。

详细计算逻辑:
timeoutAt 在一些事件发生时触发计算,如 onBlockUpdated, onUnpersistRDD, updateRunningTasks, removeShuffle, updateActiveShuffles
timeoutAt 的计算逻辑:
当执行器有计算任务时 为 Long.MaxValue。
否则为 max(_cacheTimeout, _shuffleTimeout, idleTimeoutNs)
_cacheTimeout: 如果没有 cache 数据,为0,否则为参数 spark.dynamicAllocation.cachedExecutorIdleTimeout 的值(默认 Long.MaxValue)。

_shuffleTimeout: 如果没有 shuffle数据,为 0, 否则为参数 spark.dynamicAllocation.shuffleTracking.timeout 的值(默认 Long.MaxValue)。
idleTimeoutNs 为 spark.dynamicAllocation.executorIdleTimeout

3. 测试

测试命令

spark-shell  \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.initialExecutors=2 \--conf spark.dynamicAllocation.maxExecutor=400 \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.shuffle.service.enabled=false \--conf spark.dynamicAllocation.shuffleTracking.enabled=true

参考资料:

https://www.waitingforcode.com/apache-spark/what-new-apache-spark-3-shuffle-service-changes/read

相关文章:

Spark Shuffle Tracking 原理分析

Shuffle Tracking Shuffle Tracking 是 Spark 在没有 ESS(External Shuffle Service)情况,并且开启 Dynamic Allocation 的重要功能。如在 K8S 上运行 spark 没有 ESS。本文档所有的前提都是基于以上条件的。 如果开启了 ESS,那么 Executor 计算完后&a…...

Docker 干货系列 (持续更新)

dive 直接用本地镜像名称来启动,不需要走 hub dive.sh IMAGE_NAME"${1}" TMP_FILE/tmp/dive-tmp-image.tar docker save "$IMAGE_NAME" > $TMP_FILE && dive $TMP_FILE --sourcedocker-archive && rm $TMP_FILE示例&#…...

一.jwt token 前后端的逻辑

摘要 jwt token 前后端的交互逻辑,此部分只描述了一些交互逻辑,不涉及到真实应用的开发。 token的格式 tokenheader‘.’payload‘.’sign 第一次登陆的时候 判断http请求头中是否包含Authorization不包含则提示用户未登录当用户登录后,…...

day12_oop_抽象和接口

今日内容 零、 复习昨日 一、作业 二、抽象 三、接口 零、 复习昨日 final的作用 修饰类,类不能被继承修饰方法,方法不能重写[重点]修饰变量/属性,变成常量,不能更改 static修饰方法的特点 static修饰的方法,可以通过类名调用 static修饰的属性特点 在内存只有一份,被该类的所有…...

linux 将 api_key设置环境变量里

vi ~/.bashrc在最后添加api_key的环境变量 export GEMINI_API_KEYAIza**********WvpX7FwbdM刷新配置 source ~/.bashrc使用python 读取环境变量 import os gemini_api_key os.getenv(GEMINI_API_KEY) print(gemini_api_key)...

java八股文复习-----2024/03/03

1.接口和抽象类的区别 相似点: (1)接口和抽象类都不能被实例化 (2)实现接口或继承抽象类的普通子类都必须实现这些抽象方法 不同点: (1)抽象类可以包含普通方法和代码块&#x…...

UE4 Niagara 关卡3.4官方案例解析

Texture sampling is only supported on the GPU at the moment.(纹理采样目前仅在GPU上受支持) 效果:textures can be referenced within GPU particle systems。this demo maps a texture to a grid of particles(纹理可以在GPU粒子系统中被引用这个演…...

C# Onnx segment-anything 分割万物 一键抠图

目录 介绍 效果 模型信息 sam_vit_b_decoder.onnx sam_vit_b_encoder.onnx 项目 代码 下载 C# Onnx segment-anything 分割万物 一键抠图 介绍 github地址:GitHub - facebookresearch/segment-anything: The repository provides code for running infere…...

Linux配置网卡功能

提示:工具下载链接在文章最后 目录 一.network功能介绍二.配置network功能2.1 network_ip配置检查 2.2 network_br配置2.2.1 配置的网桥原先不存在检查2.2.2 配置的网桥已存在-修改网桥IP检查2.2.3 配置的网桥已存在-只添加网卡到网桥里检查 2.3 network_bond配置检查 2.4 netw…...

【C++】十大排序算法之 归并排序 快速排序

本次介绍内容参考自:十大经典排序算法(C实现) - fengMisaka - 博客园 (cnblogs.com) 排序算法是《数据结构与算法》中最基本的算法之一。 十种常见排序算法可以分为两大类: 比较类排序:通过比较来决定元素间的相对次序…...

x-pack的破解方式和免费jar包!!可直接用!!

原理介绍 我们平时为es安装x-pack组件,用elasticsearch-plugin install x-pack ,安装成功后。 1.cd $es目录/pulgins/x-pack 里面有一个x-pack-5.6.2.jar ,将jar包反编译,然后将里面的licence的程序改下。再编译成jar包。 2…...

最新版本,Midjourney保姆级教程!

一、认识Midjourney 1.1、MidJourney是什么? 随着ChatGPT的横空出世,人类正式迈入AI元年,其中MidJourney便是AI绘图工具,它能根据用户输入的文字描述(提示词)生成绘画作品,不管是灵动的人物&a…...

Android中的几种定位方式调用详解

目前,移动端大致通过三种方式来进行设备定位:GPS、基站、wifi。本文就详细的讲解一下这几种定位方式和实现方法。 前言 android中我们一般使用LocationManager来获取位置信息,这里面有四中provider: public static final Strin…...

【软件测试】接口调不通排查分析+常遇面试题总结

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 1、接口调不通&am…...

c++基础学习第三天(指针,结构体)

c基础学习第三天(指针,结构体) 文章目录 1、指针1.1、指针的基本概念1.2、指针变量的定义和使用1.3、 指针所占内存空间1.4、空指针和野指针1.5、 const修饰指针1.5.1、const修饰指针-常量指针1.5.2、const修饰常量-指针常量1.5.3、const即修…...

【数仓】zookeeper软件安装及集群配置

相关文章 【数仓】基本概念、知识普及、核心技术【数仓】数据分层概念以及相关逻辑【数仓】Hadoop软件安装及使用(集群配置)【数仓】Hadoop集群配置常用参数说明 一、环境准备 准备3台虚拟机 Hadoop131:192.168.56.131Hadoop132&#xff…...

Qt 实现橡皮擦拭显示图片

1.简介 在一些游戏中看见类似解密破案的效果,使用手触摸去擦拭图片上的灰尘,然后显示最终的图片,所以也想试试Qt实现的效果。大家有自己想做的效果,都可以尝试。 以下是效果展示图。 可以控制橡皮擦的大小,进行擦拭…...

Vue3+Element-Plus中ELMessage样式丢失处理

Vu3Element-Plus项目中,element-plus使用按需引入有时会出现样式失效和在vscode中使用会报错[找不到名称“ElMessage”。ts(2304)]错误 ELMessage弹框样式丢失处理方法 使用按需引入就不能手动再引入 import { ElMessage } from "element-plus";ElMessage.success…...

97 spring 中的泛型类型注入

前言 呵呵 同样是 最近同事碰到的一个问题 他不太懂 英语, 看到的说明是 缺少一个 RedisTemplate 的实例, 但是找到了一个 RedisTemplate 的实例 呵呵 和我这里 spring 版本似乎是不太一样, 错误信息 有一些差异 以下环境基于 jdk8 spring-5.0.4-RELEASE 测试用例 BeanCon…...

C++设计模式

单例模式 单例模式保证一个类只能创建一个对象,并提供全局访问点。通常用于全局共享例如日志、数据库连接池等。 Lazy Initialization 优点:需要时才初始化,节省空间 缺点:线程不安全 class Singleton{ private:static Singlet…...

龙虎榜——20250610

上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法

树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

mongodb源码分析session执行handleRequest命令find过程

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

生成 Git SSH 证书

🔑 1. ​​生成 SSH 密钥对​​ 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​: -t rsa&#x…...

PL0语法,分析器实现!

简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...