当前位置: 首页 > news >正文

Spark中多分区写文件前可以不排序么

背景

Spark 3.5.0
目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。

分析

这部分主要分为三个部分:
一个是V1Writes规则的重改;
另一个是FileFormatWriter中的dataWriter的选择;
还有一个是Spark中为什么会加上Sort
这三部分是需要结合在一起分析讨论的

V1Writes规则的重改

直接转到代码部分:

object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan = {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>val newQuery = prepareQuery(write, write.query)val attrMap = AttributeMap(write.query.output.zip(newQuery.output))val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild = writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}

其中 prepareQuery是对满足条件的计划前加上Sort逻辑排序,其中prepareQuery关键的代码如下:

    val requiredOrdering = write.requiredOrdering.map(_.transform {case a: Attribute => attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering = empty2NullPlan.outputOrderingval orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global = false, empty2NullPlan)}

write.requiredOrdering中涉及到的类为InsertIntoHadoopFsRelationCommandInsertIntoHiveTable,且这两个物理计划中的requiredOrdering实现都是:

V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)

getSortOrder方法关键代码如下:

    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns).map(SortOrder(_, Ascending))}

所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则会加上Sort逻辑计划,具体的实现可以参考SPARK-37287
如果spark.sql.maxConcurrentOutputFileWriters为0(默认值为0)且 sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则不会加上Sort逻辑计划

FileFormatWriter 中的dataWriter的选择

InsertIntoHadoopFsRelationCommandInsertIntoHiveTable 这两个物理计划中,最终写入文件/数据的时候,会调用到FileFormatWriter.write方法,这里有个concurrentOutputWriterSpecFunc函数变量的设置:

      val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec = WriteFilesSpec(description = description,committer = committer,concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)

设置concurrentOutputWriterSpecFunc的代码如下:

  private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))} else {None}}

如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则ConcurrentOutputWriterSpec为None
如果 spark.sql.maxConcurrentOutputFileWriters大于0sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则为Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())

其中executeWrite会调用WriteFilesExec.doExecuteWrite方法,从而调用FileFormatWriter.executeTask,这里就涉及到dataWriter选择:

    val dataWriter =if (sparkPartitionId != 0 && !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) =>new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ =>new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}

这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter,默认情况下为DynamicPartitionDataSingleWriter
否则就会为DynamicPartitionDataConcurrentWriter
这两者的区别,见下文

Spark中为什么会加上Sort

至于Spark在写入文件的时候会加上Sort,这个是跟写入的实现有关的,也就是DynamicPartitionDataSingleWriterDynamicPartitionDataConcurrentWriter的区别:

  • DynamicPartitionDataSingleWriter 在任何时刻,只有一个writer在写文件,这能保证写入的稳定性,不会在写入文件的时候消耗大量的内存,但是速度会慢
  • DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件,能加快写入文件的速度,但是因为多个文件的同时写入,可能会导致OOM

对于DynamicPartitionDataSingleWriter 会根据partition或者bucket作为最细粒度来作为writer的标准,如果相邻的两条记录所属不同的partition或者bucket,则会切换writer,所以说如果不根据partition或者bucket排序的话,会导致writer频繁的切换,这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。

参考

  1. [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
  2. [SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort

相关文章:

Spark中多分区写文件前可以不排序么

背景 Spark 3.5.0 目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。 分析 这部分主要分为三个部分: 一个是V1Writes规则的重改; 另一个是FileFormatWriter中…...

突破编程_C++_面试(变量与常量)

面试题 1 : C 中的变量存储类别有哪些,并简要描述它们的特点? 在C中,变量的存储类别决定了变量的生命周期和可见性。以下是C中的几种变量存储类别及其特点: 自动存储期 也称为局部存储类别。这类变量在函数或代码块…...

k8s的一些关键信息(归类摘抄,非提炼)

零:举例说明 当用户提交一个 Deployment 对象到 Kubernetes 集群时,控制平面的 API Server 接收到该请求,并将其转发给 Controller Manager。Controller Manager 中的 Deployment Controller 监听到该请求,并根据用户定义的配置信…...

海外媒体发稿:8个提升影响力的日韩地区媒体发稿推广策略-华媒舍

在今天的数字化时代,媒体发稿推广成为企业和个人增加影响力的重要方式。特别是在日韩地区,这个拥有庞大媒体市场和活跃社交媒体用户的地区,正确的推广策略将对影响力的提升起到关键作用。我们将介绍8个提升影响力的日韩地区媒体发稿推广策略。…...

面试官:能不能给 Promise 增加取消功能和进度通知功能... 我:???

扯皮 这段时间闲着没事就去翻翻红宝书,已经看到 Promise 篇了,今天又让我翻到两个陌生的知识点。 因为 Promise 业务场景太多了自我感觉掌握的也比较透彻,之前也跟着 Promise A 的规范手写过完整的 Promise,所以这部分内容基本上…...

详解MySQL增删查改

众所周知&#xff0c;MySQL是非常重要的数据库语言&#xff0c;下面我们来回顾一下mysql的增删查改吧 MySQL创建数据库&#xff1a; CREATE DATABASE 数据库名;MySQL删除数据库&#xff1a; DROP DATABASE <database_name>; --直接删除&#xff0c;不检查是否存在 DROP…...

Mysql开启bin-log日志

目录 一、安装配置 二、mysqlbinlog命令 一、安装配置 yum -y install mariadb mariadb-server#安装mysql数据库#默认配置文件/etc/my.cnfvim /etc/my.cnflog-binmariadb-bin #开启二进制日志 systemctl restart mariadb#会在/car/lib/mysql/产生二进制日志文件&#xff0…...

Java:性能优化细节01-10

Java&#xff1a;性能优化细节01-10 在Java程序开发过程中&#xff0c;性能优化是一个重要的考虑因素。常见的误解是将性能问题归咎于Java语言本身&#xff0c;然而实际上&#xff0c;性能瓶颈更多地源于程序设计和代码实现方式的不当。因此&#xff0c;培养良好的编码习惯不仅…...

CVE-2022-24652 漏洞复现

CVE-2022-24652 开题 后台管理是thinkphp的&#xff0c;但是工具没检测出漏洞。 登陆后界面如下&#xff0c;上传头像功能值得引起注意 这其实就是CVE-2022-24652&#xff0c;危险类型文件的不加限制上传&#xff0c;是文件上传漏洞。漏洞路由/user/upload/upload 参考文章&a…...

LeetCode、338. 比特位计数【简单,位运算】

文章目录 前言LeetCode、338. 比特位计数【中等&#xff0c;位运算】题目链接与分类思路位运算移位处理前缀思想实现 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里云平台优质作者、专注于Java…...

借助Aspose.BarCode条码控件,C# 中的文本转 QR 码生成器

二维码用于在较小的空间内存储大量数据。它们易于使用&#xff0c;可以通过智能手机或其他设备扫描来打开网站、观看视频或访问其他编码信息。在这篇博文中&#xff0c;我们将学习如何使用 C# 以编程方式生成基于文本的 QR 码。我们将提供分步指南和代码片段&#xff0c;帮助您…...

vue打包优化,webpack的8大配置方案

vue-cli 生成的项目通常集成Webpack &#xff0c;在打包的时候&#xff0c;需要webpack来做一些事情。这里我们希望它可以压缩代码体积&#xff0c;提高运行效率。 文章目录 &#xff08;1&#xff09;代码压缩&#xff1a;&#xff08;2&#xff09;图片压缩&#xff1a;&…...

B端系统从0到1:有几步,其中需求分析要做啥?

一款B系统从无到有都经历了啥&#xff0c;而其中的需求分析又要做什么&#xff1f;贝格前端工场给老铁们做一下分析&#xff0c;文章写作不易&#xff0c;如果咱们有界面设计和前端开发需求&#xff0c;别忘了私信我呦&#xff0c;开始了。 一、B端系统从0到1都有哪些要走的步骤…...

django中查询优化

在Django中&#xff0c;查询优化是一个重要的主题&#xff0c;因为不正确的查询可能会导致性能问题&#xff0c;尤其是在处理大量数据时。以下是一些在Django中进行查询优化的建议&#xff1a; 一&#xff1a;使用select_related和prefetch_related: select_related用于优化一…...

【JavaScript】输入输出语法

目录 一、输出语法 二、输入语法 一、输出语法 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>D…...

多模态基础--- word Embedding

1 word Embedding 原始的单词编码方式&#xff1a; one-hot&#xff0c;维度太大&#xff0c;不同单词之间相互独立&#xff0c;没有远近关系区分。 wordclass&#xff0c;将同一类单词编码在一起&#xff0c;此时丢失了类别和类别间的相关信息&#xff0c;比如class1和class3…...

Mysql 日志

0 引言 MySQL日志主要分为4类&#xff0c;使用这些日志文件&#xff0c;可以查看MySQL内部发生的事情。这4类日志分别是&#xff1a; ● 错误日志&#xff1a;记录MySQL服务的启动、运行或停止MySQL服务时出现的问题。 ● 查询日志&#xff1a;记录建立的客户端连接和执行的…...

【开源】SpringBoot框架开发服装店库存管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 角色管理模块2.3 服装档案模块2.4 服装入库模块2.5 服装出库模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 角色表3.2.2 服装档案表3.2.3 服装入库表3.2.4 服装出库表 四、系统展示五、核心代码5.…...

云原生之容器编排实践-在K8S集群中使用Registry2搭建私有镜像仓库

背景 基于前面搭建的3节点 Kubernetes 集群&#xff0c;今天我们使用 Registry2 搭建私有镜像仓库&#xff0c;这在镜像安全性以及离线环境下运维等方面具有重要意义。 Note: 由于是测试环境&#xff0c;以下创建了一个 local-storage 的 StorageClass &#xff0c;并使用本地…...

标准IO 2月4日学习笔记

IO输入输出&#xff0c;操作对象是文件 Linux文件类型: b block 块设备文件 按块扫描设备信息的文件 存储设备 c character 字符设备文件 按字符扫描设备信息的文件 d direct…...

手把手教你用VSCode快速定位并修改RuoYi框架的页面标题和图标(避坑指南)

高效定制RuoYi前端界面&#xff1a;VSCode全局搜索实战指南 刚接触RuoYi框架的开发者常会遇到这样的困扰&#xff1a;想修改浏览器标签页标题或系统Logo&#xff0c;却不知从何下手。前后端分离的项目结构让配置文件散落在各处&#xff0c;而手动翻找无异于大海捞针。本文将带你…...

需要控制重复点击按钮的通用方法

如图所示 在需要控制重复点击的地方使用通用方法去控制 省时省力 比用传统的分页定时器更方便...

FORK客户端与GitHub高效协作指南

1. 为什么选择FORK客户端与GitHub协作 作为一个常年混迹在代码仓库的老司机&#xff0c;我试过几乎所有主流的Git图形化工具。FORK客户端给我的第一印象就是——清爽。没有复杂的界面&#xff0c;没有多余的功能&#xff0c;就像它的名字一样&#xff0c;专注做好代码分支管理…...

推理神器Phi-4-mini-reasoning实测:解方程、逻辑题一键生成答案

推理神器Phi-4-mini-reasoning实测&#xff1a;解方程、逻辑题一键生成答案 1. 模型介绍与核心能力 Phi-4-mini-reasoning是一款专注于逻辑推理和数学计算的轻量级AI模型。与通用聊天模型不同&#xff0c;它被专门设计用于处理需要分步推理的任务&#xff0c;能够将复杂的解题…...

用.NET 6+和secs4net快速搭建半导体设备通信主机(附完整代码示例)

基于.NET 6与secs4net构建半导体设备通信主机的实战指南 在半导体制造领域&#xff0c;设备间的高效通信是自动化生产线的核心需求。SECS/GEM协议作为行业标准&#xff0c;为设备与主机系统间的数据交换提供了可靠框架。本文将展示如何利用.NET 6平台和secs4net库快速搭建功能完…...

UE5伤害系统避坑指南:Damage Type没用好?你的Apply Damage可能白写了

UE5伤害系统深度解析&#xff1a;如何用Damage Type构建高扩展性战斗机制 在虚幻引擎5的游戏开发中&#xff0c;伤害系统是战斗机制的核心支柱。许多开发者习惯性地将注意力集中在Damage Amount这个数值上&#xff0c;却忽视了Damage Type这个能够赋予游戏深度和多样性的强大工…...

Vivado 2019.2实战:手把手教你封装自己的UART串口IP核(含参数化配置避坑指南)

Vivado 2019.2实战&#xff1a;从零构建可配置UART IP核的完整指南 在FPGA开发中&#xff0c;UART通信是最基础也最常用的功能之一。每次新项目都重新编写UART驱动不仅效率低下&#xff0c;还容易引入错误。本文将带你完整经历将一个经过验证的UART发送模块封装成可配置IP核的全…...

AI大模型进化地图:小白也能看懂的技术架构与未来趋势(收藏版)

本文深入剖析AI模型的技术架构、能力瓶颈及商业压力&#xff0c;揭示未来AI模型的四类形态&#xff1a;通用基础大模型、深度推理模型、边缘轻量模型和垂直领域专业模型。文章通过DeepSeek-R1和Google Gemini的案例&#xff0c;量化分析不同模型类型的业务逻辑差异&#xff0c;…...

DirectX兼容性解决方案:让经典游戏在Windows 10重获新生

DirectX兼容性解决方案&#xff1a;让经典游戏在Windows 10重获新生 【免费下载链接】dxwrapper Fixes compatibility issues with older games running on Windows 10 by wrapping DirectX dlls. Also allows loading custom libraries with the file extension .asi into gam…...

订单簿处理全面解析:从技术原理到实战优化

订单簿处理全面解析&#xff1a;从技术原理到实战优化 【免费下载链接】AXOrderBook A股订单簿工具&#xff0c;使用逐笔行情进行订单簿重建、千档快照发布、各档委托队列展示等&#xff0c;包括python模型和FPGA HLS实现。 项目地址: https://gitcode.com/gh_mirrors/ax/AXO…...