当前位置: 首页 > news >正文

Spark中多分区写文件前可以不排序么

背景

Spark 3.5.0
目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。

分析

这部分主要分为三个部分:
一个是V1Writes规则的重改;
另一个是FileFormatWriter中的dataWriter的选择;
还有一个是Spark中为什么会加上Sort
这三部分是需要结合在一起分析讨论的

V1Writes规则的重改

直接转到代码部分:

object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan = {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>val newQuery = prepareQuery(write, write.query)val attrMap = AttributeMap(write.query.output.zip(newQuery.output))val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild = writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}

其中 prepareQuery是对满足条件的计划前加上Sort逻辑排序,其中prepareQuery关键的代码如下:

    val requiredOrdering = write.requiredOrdering.map(_.transform {case a: Attribute => attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering = empty2NullPlan.outputOrderingval orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global = false, empty2NullPlan)}

write.requiredOrdering中涉及到的类为InsertIntoHadoopFsRelationCommandInsertIntoHiveTable,且这两个物理计划中的requiredOrdering实现都是:

V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)

getSortOrder方法关键代码如下:

    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns).map(SortOrder(_, Ascending))}

所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则会加上Sort逻辑计划,具体的实现可以参考SPARK-37287
如果spark.sql.maxConcurrentOutputFileWriters为0(默认值为0)且 sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则不会加上Sort逻辑计划

FileFormatWriter 中的dataWriter的选择

InsertIntoHadoopFsRelationCommandInsertIntoHiveTable 这两个物理计划中,最终写入文件/数据的时候,会调用到FileFormatWriter.write方法,这里有个concurrentOutputWriterSpecFunc函数变量的设置:

      val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec = WriteFilesSpec(description = description,committer = committer,concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)

设置concurrentOutputWriterSpecFunc的代码如下:

  private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))} else {None}}

如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则ConcurrentOutputWriterSpec为None
如果 spark.sql.maxConcurrentOutputFileWriters大于0sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则为Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())

其中executeWrite会调用WriteFilesExec.doExecuteWrite方法,从而调用FileFormatWriter.executeTask,这里就涉及到dataWriter选择:

    val dataWriter =if (sparkPartitionId != 0 && !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) =>new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ =>new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}

这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter,默认情况下为DynamicPartitionDataSingleWriter
否则就会为DynamicPartitionDataConcurrentWriter
这两者的区别,见下文

Spark中为什么会加上Sort

至于Spark在写入文件的时候会加上Sort,这个是跟写入的实现有关的,也就是DynamicPartitionDataSingleWriterDynamicPartitionDataConcurrentWriter的区别:

  • DynamicPartitionDataSingleWriter 在任何时刻,只有一个writer在写文件,这能保证写入的稳定性,不会在写入文件的时候消耗大量的内存,但是速度会慢
  • DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件,能加快写入文件的速度,但是因为多个文件的同时写入,可能会导致OOM

对于DynamicPartitionDataSingleWriter 会根据partition或者bucket作为最细粒度来作为writer的标准,如果相邻的两条记录所属不同的partition或者bucket,则会切换writer,所以说如果不根据partition或者bucket排序的话,会导致writer频繁的切换,这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。

参考

  1. [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
  2. [SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort

相关文章:

Spark中多分区写文件前可以不排序么

背景 Spark 3.5.0 目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。 分析 这部分主要分为三个部分: 一个是V1Writes规则的重改; 另一个是FileFormatWriter中…...

突破编程_C++_面试(变量与常量)

面试题 1 : C 中的变量存储类别有哪些,并简要描述它们的特点? 在C中,变量的存储类别决定了变量的生命周期和可见性。以下是C中的几种变量存储类别及其特点: 自动存储期 也称为局部存储类别。这类变量在函数或代码块…...

k8s的一些关键信息(归类摘抄,非提炼)

零:举例说明 当用户提交一个 Deployment 对象到 Kubernetes 集群时,控制平面的 API Server 接收到该请求,并将其转发给 Controller Manager。Controller Manager 中的 Deployment Controller 监听到该请求,并根据用户定义的配置信…...

海外媒体发稿:8个提升影响力的日韩地区媒体发稿推广策略-华媒舍

在今天的数字化时代,媒体发稿推广成为企业和个人增加影响力的重要方式。特别是在日韩地区,这个拥有庞大媒体市场和活跃社交媒体用户的地区,正确的推广策略将对影响力的提升起到关键作用。我们将介绍8个提升影响力的日韩地区媒体发稿推广策略。…...

面试官:能不能给 Promise 增加取消功能和进度通知功能... 我:???

扯皮 这段时间闲着没事就去翻翻红宝书,已经看到 Promise 篇了,今天又让我翻到两个陌生的知识点。 因为 Promise 业务场景太多了自我感觉掌握的也比较透彻,之前也跟着 Promise A 的规范手写过完整的 Promise,所以这部分内容基本上…...

详解MySQL增删查改

众所周知&#xff0c;MySQL是非常重要的数据库语言&#xff0c;下面我们来回顾一下mysql的增删查改吧 MySQL创建数据库&#xff1a; CREATE DATABASE 数据库名;MySQL删除数据库&#xff1a; DROP DATABASE <database_name>; --直接删除&#xff0c;不检查是否存在 DROP…...

Mysql开启bin-log日志

目录 一、安装配置 二、mysqlbinlog命令 一、安装配置 yum -y install mariadb mariadb-server#安装mysql数据库#默认配置文件/etc/my.cnfvim /etc/my.cnflog-binmariadb-bin #开启二进制日志 systemctl restart mariadb#会在/car/lib/mysql/产生二进制日志文件&#xff0…...

Java:性能优化细节01-10

Java&#xff1a;性能优化细节01-10 在Java程序开发过程中&#xff0c;性能优化是一个重要的考虑因素。常见的误解是将性能问题归咎于Java语言本身&#xff0c;然而实际上&#xff0c;性能瓶颈更多地源于程序设计和代码实现方式的不当。因此&#xff0c;培养良好的编码习惯不仅…...

CVE-2022-24652 漏洞复现

CVE-2022-24652 开题 后台管理是thinkphp的&#xff0c;但是工具没检测出漏洞。 登陆后界面如下&#xff0c;上传头像功能值得引起注意 这其实就是CVE-2022-24652&#xff0c;危险类型文件的不加限制上传&#xff0c;是文件上传漏洞。漏洞路由/user/upload/upload 参考文章&a…...

LeetCode、338. 比特位计数【简单,位运算】

文章目录 前言LeetCode、338. 比特位计数【中等&#xff0c;位运算】题目链接与分类思路位运算移位处理前缀思想实现 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里云平台优质作者、专注于Java…...

借助Aspose.BarCode条码控件,C# 中的文本转 QR 码生成器

二维码用于在较小的空间内存储大量数据。它们易于使用&#xff0c;可以通过智能手机或其他设备扫描来打开网站、观看视频或访问其他编码信息。在这篇博文中&#xff0c;我们将学习如何使用 C# 以编程方式生成基于文本的 QR 码。我们将提供分步指南和代码片段&#xff0c;帮助您…...

vue打包优化,webpack的8大配置方案

vue-cli 生成的项目通常集成Webpack &#xff0c;在打包的时候&#xff0c;需要webpack来做一些事情。这里我们希望它可以压缩代码体积&#xff0c;提高运行效率。 文章目录 &#xff08;1&#xff09;代码压缩&#xff1a;&#xff08;2&#xff09;图片压缩&#xff1a;&…...

B端系统从0到1:有几步,其中需求分析要做啥?

一款B系统从无到有都经历了啥&#xff0c;而其中的需求分析又要做什么&#xff1f;贝格前端工场给老铁们做一下分析&#xff0c;文章写作不易&#xff0c;如果咱们有界面设计和前端开发需求&#xff0c;别忘了私信我呦&#xff0c;开始了。 一、B端系统从0到1都有哪些要走的步骤…...

django中查询优化

在Django中&#xff0c;查询优化是一个重要的主题&#xff0c;因为不正确的查询可能会导致性能问题&#xff0c;尤其是在处理大量数据时。以下是一些在Django中进行查询优化的建议&#xff1a; 一&#xff1a;使用select_related和prefetch_related: select_related用于优化一…...

【JavaScript】输入输出语法

目录 一、输出语法 二、输入语法 一、输出语法 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>D…...

多模态基础--- word Embedding

1 word Embedding 原始的单词编码方式&#xff1a; one-hot&#xff0c;维度太大&#xff0c;不同单词之间相互独立&#xff0c;没有远近关系区分。 wordclass&#xff0c;将同一类单词编码在一起&#xff0c;此时丢失了类别和类别间的相关信息&#xff0c;比如class1和class3…...

Mysql 日志

0 引言 MySQL日志主要分为4类&#xff0c;使用这些日志文件&#xff0c;可以查看MySQL内部发生的事情。这4类日志分别是&#xff1a; ● 错误日志&#xff1a;记录MySQL服务的启动、运行或停止MySQL服务时出现的问题。 ● 查询日志&#xff1a;记录建立的客户端连接和执行的…...

【开源】SpringBoot框架开发服装店库存管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 角色管理模块2.3 服装档案模块2.4 服装入库模块2.5 服装出库模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 角色表3.2.2 服装档案表3.2.3 服装入库表3.2.4 服装出库表 四、系统展示五、核心代码5.…...

云原生之容器编排实践-在K8S集群中使用Registry2搭建私有镜像仓库

背景 基于前面搭建的3节点 Kubernetes 集群&#xff0c;今天我们使用 Registry2 搭建私有镜像仓库&#xff0c;这在镜像安全性以及离线环境下运维等方面具有重要意义。 Note: 由于是测试环境&#xff0c;以下创建了一个 local-storage 的 StorageClass &#xff0c;并使用本地…...

标准IO 2月4日学习笔记

IO输入输出&#xff0c;操作对象是文件 Linux文件类型: b block 块设备文件 按块扫描设备信息的文件 存储设备 c character 字符设备文件 按字符扫描设备信息的文件 d direct…...

模型参数、模型存储精度、参数与显存

模型参数量衡量单位 M&#xff1a;百万&#xff08;Million&#xff09; B&#xff1a;十亿&#xff08;Billion&#xff09; 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的&#xff0c;但是一个参数所表示多少字节不一定&#xff0c;需要看这个参数以什么…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

Linux部署私有文件管理系统MinIO

最近需要用到一个文件管理服务&#xff0c;但是又不想花钱&#xff0c;所以就想着自己搭建一个&#xff0c;刚好我们用的一个开源框架已经集成了MinIO&#xff0c;所以就选了这个 我这边对文件服务性能要求不是太高&#xff0c;单机版就可以 安装非常简单&#xff0c;几个命令就…...

在 Visual Studio Code 中使用驭码 CodeRider 提升开发效率:以冒泡排序为例

目录 前言1 插件安装与配置1.1 安装驭码 CodeRider1.2 初始配置建议 2 示例代码&#xff1a;冒泡排序3 驭码 CodeRider 功能详解3.1 功能概览3.2 代码解释功能3.3 自动注释生成3.4 逻辑修改功能3.5 单元测试自动生成3.6 代码优化建议 4 驭码的实际应用建议5 常见问题与解决建议…...

Spring Boot + MyBatis 集成支付宝支付流程

Spring Boot MyBatis 集成支付宝支付流程 核心流程 商户系统生成订单调用支付宝创建预支付订单用户跳转支付宝完成支付支付宝异步通知支付结果商户处理支付结果更新订单状态支付宝同步跳转回商户页面 代码实现示例&#xff08;电脑网站支付&#xff09; 1. 添加依赖 <!…...

Electron简介(附电子书学习资料)

一、什么是Electron&#xff1f; Electron 是一个由 GitHub 开发的 开源框架&#xff0c;允许开发者使用 Web技术&#xff08;HTML、CSS、JavaScript&#xff09; 构建跨平台的桌面应用程序&#xff08;Windows、macOS、Linux&#xff09;。它将 Chromium浏览器内核 和 Node.j…...

claude3.7高阶玩法,生成系统架构图,国内直接使用

文章目录 零、前言一、操作指南操作指导 二、提示词模板三、实战图书管理系统通过4o模型生成系统描述通过claude3.7生成系统架构图svg代码转换成图片 在线考试系统通过4o模型生成系统描述通过claude3.7生成系统架构图svg代码转换成图片 四、感受 零、前言 现在很多AI大模型可以…...

Redis专题-实战篇一-基于Session和Redis实现登录业务

GitHub项目地址&#xff1a;https://github.com/whltaoin/redisLearningProject_hm-dianping 基于Session实现登录业务功能提交版本码&#xff1a;e34399f 基于Redis实现登录业务提交版本码&#xff1a;60bf740 一、导入黑马点评后端项目 项目架构图 1. 前期阶段2. 后续阶段导…...

Java高级 |【实验八】springboot 使用Websocket

隶属文章&#xff1a;Java高级 | &#xff08;二十二&#xff09;Java常用类库-CSDN博客 系列文章&#xff1a;Java高级 | 【实验一】Springboot安装及测试 |最新-CSDN博客 Java高级 | 【实验二】Springboot 控制器类相关注解知识-CSDN博客 Java高级 | 【实验三】Springboot 静…...