当前位置: 首页 > news >正文

Spark中多分区写文件前可以不排序么

背景

Spark 3.5.0
目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。

分析

这部分主要分为三个部分:
一个是V1Writes规则的重改;
另一个是FileFormatWriter中的dataWriter的选择;
还有一个是Spark中为什么会加上Sort
这三部分是需要结合在一起分析讨论的

V1Writes规则的重改

直接转到代码部分:

object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan = {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>val newQuery = prepareQuery(write, write.query)val attrMap = AttributeMap(write.query.output.zip(newQuery.output))val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild = writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}

其中 prepareQuery是对满足条件的计划前加上Sort逻辑排序,其中prepareQuery关键的代码如下:

    val requiredOrdering = write.requiredOrdering.map(_.transform {case a: Attribute => attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering = empty2NullPlan.outputOrderingval orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global = false, empty2NullPlan)}

write.requiredOrdering中涉及到的类为InsertIntoHadoopFsRelationCommandInsertIntoHiveTable,且这两个物理计划中的requiredOrdering实现都是:

V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)

getSortOrder方法关键代码如下:

    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns).map(SortOrder(_, Ascending))}

所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则会加上Sort逻辑计划,具体的实现可以参考SPARK-37287
如果spark.sql.maxConcurrentOutputFileWriters为0(默认值为0)且 sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则不会加上Sort逻辑计划

FileFormatWriter 中的dataWriter的选择

InsertIntoHadoopFsRelationCommandInsertIntoHiveTable 这两个物理计划中,最终写入文件/数据的时候,会调用到FileFormatWriter.write方法,这里有个concurrentOutputWriterSpecFunc函数变量的设置:

      val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec = WriteFilesSpec(description = description,committer = committer,concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)

设置concurrentOutputWriterSpecFunc的代码如下:

  private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))} else {None}}

如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则ConcurrentOutputWriterSpec为None
如果 spark.sql.maxConcurrentOutputFileWriters大于0sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则为Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())

其中executeWrite会调用WriteFilesExec.doExecuteWrite方法,从而调用FileFormatWriter.executeTask,这里就涉及到dataWriter选择:

    val dataWriter =if (sparkPartitionId != 0 && !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) =>new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ =>new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}

这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter,默认情况下为DynamicPartitionDataSingleWriter
否则就会为DynamicPartitionDataConcurrentWriter
这两者的区别,见下文

Spark中为什么会加上Sort

至于Spark在写入文件的时候会加上Sort,这个是跟写入的实现有关的,也就是DynamicPartitionDataSingleWriterDynamicPartitionDataConcurrentWriter的区别:

  • DynamicPartitionDataSingleWriter 在任何时刻,只有一个writer在写文件,这能保证写入的稳定性,不会在写入文件的时候消耗大量的内存,但是速度会慢
  • DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件,能加快写入文件的速度,但是因为多个文件的同时写入,可能会导致OOM

对于DynamicPartitionDataSingleWriter 会根据partition或者bucket作为最细粒度来作为writer的标准,如果相邻的两条记录所属不同的partition或者bucket,则会切换writer,所以说如果不根据partition或者bucket排序的话,会导致writer频繁的切换,这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。

参考

  1. [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
  2. [SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort

相关文章:

Spark中多分区写文件前可以不排序么

背景 Spark 3.5.0 目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。 分析 这部分主要分为三个部分: 一个是V1Writes规则的重改; 另一个是FileFormatWriter中…...

突破编程_C++_面试(变量与常量)

面试题 1 : C 中的变量存储类别有哪些,并简要描述它们的特点? 在C中,变量的存储类别决定了变量的生命周期和可见性。以下是C中的几种变量存储类别及其特点: 自动存储期 也称为局部存储类别。这类变量在函数或代码块…...

k8s的一些关键信息(归类摘抄,非提炼)

零:举例说明 当用户提交一个 Deployment 对象到 Kubernetes 集群时,控制平面的 API Server 接收到该请求,并将其转发给 Controller Manager。Controller Manager 中的 Deployment Controller 监听到该请求,并根据用户定义的配置信…...

海外媒体发稿:8个提升影响力的日韩地区媒体发稿推广策略-华媒舍

在今天的数字化时代,媒体发稿推广成为企业和个人增加影响力的重要方式。特别是在日韩地区,这个拥有庞大媒体市场和活跃社交媒体用户的地区,正确的推广策略将对影响力的提升起到关键作用。我们将介绍8个提升影响力的日韩地区媒体发稿推广策略。…...

面试官:能不能给 Promise 增加取消功能和进度通知功能... 我:???

扯皮 这段时间闲着没事就去翻翻红宝书,已经看到 Promise 篇了,今天又让我翻到两个陌生的知识点。 因为 Promise 业务场景太多了自我感觉掌握的也比较透彻,之前也跟着 Promise A 的规范手写过完整的 Promise,所以这部分内容基本上…...

详解MySQL增删查改

众所周知&#xff0c;MySQL是非常重要的数据库语言&#xff0c;下面我们来回顾一下mysql的增删查改吧 MySQL创建数据库&#xff1a; CREATE DATABASE 数据库名;MySQL删除数据库&#xff1a; DROP DATABASE <database_name>; --直接删除&#xff0c;不检查是否存在 DROP…...

Mysql开启bin-log日志

目录 一、安装配置 二、mysqlbinlog命令 一、安装配置 yum -y install mariadb mariadb-server#安装mysql数据库#默认配置文件/etc/my.cnfvim /etc/my.cnflog-binmariadb-bin #开启二进制日志 systemctl restart mariadb#会在/car/lib/mysql/产生二进制日志文件&#xff0…...

Java:性能优化细节01-10

Java&#xff1a;性能优化细节01-10 在Java程序开发过程中&#xff0c;性能优化是一个重要的考虑因素。常见的误解是将性能问题归咎于Java语言本身&#xff0c;然而实际上&#xff0c;性能瓶颈更多地源于程序设计和代码实现方式的不当。因此&#xff0c;培养良好的编码习惯不仅…...

CVE-2022-24652 漏洞复现

CVE-2022-24652 开题 后台管理是thinkphp的&#xff0c;但是工具没检测出漏洞。 登陆后界面如下&#xff0c;上传头像功能值得引起注意 这其实就是CVE-2022-24652&#xff0c;危险类型文件的不加限制上传&#xff0c;是文件上传漏洞。漏洞路由/user/upload/upload 参考文章&a…...

LeetCode、338. 比特位计数【简单,位运算】

文章目录 前言LeetCode、338. 比特位计数【中等&#xff0c;位运算】题目链接与分类思路位运算移位处理前缀思想实现 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里云平台优质作者、专注于Java…...

借助Aspose.BarCode条码控件,C# 中的文本转 QR 码生成器

二维码用于在较小的空间内存储大量数据。它们易于使用&#xff0c;可以通过智能手机或其他设备扫描来打开网站、观看视频或访问其他编码信息。在这篇博文中&#xff0c;我们将学习如何使用 C# 以编程方式生成基于文本的 QR 码。我们将提供分步指南和代码片段&#xff0c;帮助您…...

vue打包优化,webpack的8大配置方案

vue-cli 生成的项目通常集成Webpack &#xff0c;在打包的时候&#xff0c;需要webpack来做一些事情。这里我们希望它可以压缩代码体积&#xff0c;提高运行效率。 文章目录 &#xff08;1&#xff09;代码压缩&#xff1a;&#xff08;2&#xff09;图片压缩&#xff1a;&…...

B端系统从0到1:有几步,其中需求分析要做啥?

一款B系统从无到有都经历了啥&#xff0c;而其中的需求分析又要做什么&#xff1f;贝格前端工场给老铁们做一下分析&#xff0c;文章写作不易&#xff0c;如果咱们有界面设计和前端开发需求&#xff0c;别忘了私信我呦&#xff0c;开始了。 一、B端系统从0到1都有哪些要走的步骤…...

django中查询优化

在Django中&#xff0c;查询优化是一个重要的主题&#xff0c;因为不正确的查询可能会导致性能问题&#xff0c;尤其是在处理大量数据时。以下是一些在Django中进行查询优化的建议&#xff1a; 一&#xff1a;使用select_related和prefetch_related: select_related用于优化一…...

【JavaScript】输入输出语法

目录 一、输出语法 二、输入语法 一、输出语法 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>D…...

多模态基础--- word Embedding

1 word Embedding 原始的单词编码方式&#xff1a; one-hot&#xff0c;维度太大&#xff0c;不同单词之间相互独立&#xff0c;没有远近关系区分。 wordclass&#xff0c;将同一类单词编码在一起&#xff0c;此时丢失了类别和类别间的相关信息&#xff0c;比如class1和class3…...

Mysql 日志

0 引言 MySQL日志主要分为4类&#xff0c;使用这些日志文件&#xff0c;可以查看MySQL内部发生的事情。这4类日志分别是&#xff1a; ● 错误日志&#xff1a;记录MySQL服务的启动、运行或停止MySQL服务时出现的问题。 ● 查询日志&#xff1a;记录建立的客户端连接和执行的…...

【开源】SpringBoot框架开发服装店库存管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 角色管理模块2.3 服装档案模块2.4 服装入库模块2.5 服装出库模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 角色表3.2.2 服装档案表3.2.3 服装入库表3.2.4 服装出库表 四、系统展示五、核心代码5.…...

云原生之容器编排实践-在K8S集群中使用Registry2搭建私有镜像仓库

背景 基于前面搭建的3节点 Kubernetes 集群&#xff0c;今天我们使用 Registry2 搭建私有镜像仓库&#xff0c;这在镜像安全性以及离线环境下运维等方面具有重要意义。 Note: 由于是测试环境&#xff0c;以下创建了一个 local-storage 的 StorageClass &#xff0c;并使用本地…...

标准IO 2月4日学习笔记

IO输入输出&#xff0c;操作对象是文件 Linux文件类型: b block 块设备文件 按块扫描设备信息的文件 存储设备 c character 字符设备文件 按字符扫描设备信息的文件 d direct…...

如何在1Panel上偷渡HTTP/3

本文 首发于 Anyeの小站&#xff0c;转载请取得作者同意。 前言 简介 HTTP/3 的基础即谷歌多年探索的基于 UDP 的 QUIC 协议。与 TCP 相比&#xff0c;使用 UDP 可以提供更大的灵活性&#xff0c;并且可以使 QUIC 完全于用户空间中实现——对协议实现的更新不像 TCP 那样需要绑…...

Qt实用技巧:QCustomPlot做北斗GPS显示绝对位置运动轨迹和相对位置运动轨迹图的时,使图按照输入点顺序连曲线

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/136131310 红胖子网络科技博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…...

116 C++ 可变参数函数,initializer_list (初始化列表), 省略号形参

一 可变参数函数 有时候我们传递的参数是不固定的。 这种能接受非固定个数参数的函数就是可变参数函数 怎么实现呢&#xff1f;就要用到 initializer_list 标准库类型 该类型能够使用的前提条件是&#xff1a;所有的实参类型相同。 二&#xff0c;initializer_list(初始化列…...

强国有我社会实践公益活动在合肥市庐阳区开展

2月18日是开工第一天&#xff0c;阳光灿烂、春光明媚。合肥市四十五中2022级星辰&#xff08;5&#xff09;班部分同学在监护人的陪伴下来到庐阳区双岗街道万小店社区残疾人工作站&#xff0c;和工作站兄弟姐妹们共同开展“强国复兴有我”社会实践公益活动。合肥市庐阳区为民社…...

Nginx 正向代理、反向代理

文章目录 前言1. 正向代理1.1 概念1.2 逻辑图1.3 使用场景 2. 反向代理2.1 概念2.2 逻辑图2.3 使用场景 前言 正向代理主要是用来解决访问限制问题&#xff1b;反向代理则是提供负载均衡、安全防护等作用 1. 正向代理 1.1 概念 正向代理是一个位于客户端和目标服务器之间的代理…...

软考学习--计算机组成原理与体系结构

计算机组成原理与体系结构 数据的表示 进制转换 R 进制转换为 10 进制–按权展开法 10进制转换为2进制 原码 反码 补码 移码 原码 &#xff1a;数字的二进制表示反码 &#xff1a; 正数的反码等于原码&#xff0c;负数的反码等于原码取反补码&#xff1a; 正数的补码等…...

fish终端下conda activate失败

【问题】fish终端下激活conda环境报错&#xff1a; >> conda activate base CondaError: Run conda init before conda activate ## 然而运行 conda init fish 仍旧无法解决【解决】 参考&#xff1a;https://github.com/conda/conda/issues/11079 方法一&#xf…...

FPGA之移位寄存器

SLICEM中的LUT可以配置为32位移位寄存器,而无需使用slice中可用的触发器。以这种方式使用,每个LUT 可以将串 行数据延迟 1 到 32 个时钟周期。移入D &#xff08;DI1 LUT 引脚&#xff09;和移出 Q31&#xff08;MC31 LUT 引脚&#xff09;线路将LUT级联&#xff0c;以形成更大…...

Android Compose Material3 ModalNavigationDrawer 抽屉的使用(处理了一些坑)

Android Compose Material3 ModalNavigationDrawer 抽屉的使用&#xff08;处理了一些坑&#xff09; val drawerState rememberDrawerState(initialValue DrawerValue.Closed) val scope rememberCoroutineScope()ModalNavigationDrawer(drawerState drawerState,drawerC…...

golang select两个channel性能稳定,三个channel时性能会发生抖动,为什么?

golang select两个channel性能稳定&#xff0c;三个channel时性能会发生抖动&#xff0c;为什么&#xff1f; 答题思路 select —> 让 Goroutine同时等待多个 Channel 可读或者可写 —> Goroutine —> 调度器调度 —> 资源竞争 —> 不稳定、抖动 在 Go 中&#…...