当前位置: 首页 > news >正文

从源码的角度告诉你 spark是怎样完成对文件切片

目录

1.说明

2.怎样设置默认切片数

2.1 RDD默认切片设置

2.2 SparkSQL默认切片设置

3. makeRDD 切片原理

4. textFile 切片原理

4.1 切片规则

4.2 怎样设置切片大小

4.3 测试代码

 5.hadoopFile 切片原理

5.1 说明

5.2 切片规则

5.3 怎样设置切片大小

5.4 代码测试

5.5 minPartitions 在 CombineTextInputFormat 中的作用?

5.6 重点关注


1.说明

在spark中为我们提供了用来读取数据的方法
    比如 makeRDD、parallelize、textFile、hadoopFile等方法
    
这些方法按照数据源可以分为两类 文件系统、Driver内存中的集合数据
当我们使用指定的方法读取数据后,会按照指定的切片个数对文件进行切片


2.怎样设置默认切片数

在我们在使用RDD的算子时,经常会遇到可以显式的指定切片个数,或者隐式的使用默认切片个数,下面会告诉我们,怎样设置默认切片个数

2.1 RDD默认切片设置

1.驱动程序中设置
val sparkconf: SparkConf = new SparkConf().setAppName("测试默认切片数").set("spark.default.parallelism","1000").setMaster("local[100]")2.spark-shell或spark-submit 设置
spark-shell \
--master yarn \
--name "spark-shell-tmp" \
--conf spark.default.parallelism=1000 \
--driver-memory 40G \
--executor-memory 40G \
--num-executors 40 \
--executor-cores 6 \3.不指定 spark.default.parallelism 参数时,将使用默认值local模式:local[100] :  100local      :  客户端机器核数集群模式(yarn):2 或者 核数总和

源码:

查看默认切片数: 

// 获取默认切片数
val parallelism = sc.defaultParallelism

2.2 SparkSQL默认切片设置

-- 设置默认切片数
set spark.sql.shuffle.partitions=1000;默认值:当不设置时,默认为200注意:spark.default.parallelism 只有在处理RDD时才会起作用,对SparkSQL的无效spark.sql.shuffle.partitions 则是对sparks SQL专用的设置

3. makeRDD 切片原理

可用通过 makeRDD算子 将Driver中序列集合中数据转换成RDD,在转换的过程中,会根据指定的切片个数集合索引对集合切片

切片规则:

        根据集合长度切片数将集合切分成若干子集合(和集合元素内容无关)

示例代码:

  test("makeRDD - 切片逻辑") {// 初始化 spark配置实例val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")// 初始化 spark环境对象val sc: SparkContext = new SparkContext(sparkconf)val rdd: RDD[(String, String)] = sc.makeRDD(List(("张飞1", "张飞java scala spark"), ("张飞2", "张飞java scala spark"), ("刘备3", "刘备java spark"), ("刘备4", "刘备java scala spark"), ("刘备5", "刘备scala spark"), ("关羽6", "关羽java scala spark"), ("关羽7", "关羽java scala"), ("关羽8", "关羽java scala spark"), ("关羽9", "关羽java spark")))// 查看每个分区的内容rdd.mapPartitionsWithIndex((i, iter) => {println(s"分区编号$i :${iter.mkString(" ")}");iter}).collect()rdd.getNumPartitionssc.stop()}

结果:

源码阅读:

1. 通过SparkContext创建rdd
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}2. ParallelCollectionRDD类中的 getPartitions方法
override def getPartitions: Array[Partition] = {val slices = ParallelCollectionRDD.slice(data, numSlices).toArrayslices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}3. ParallelCollectionRDD对象的slice方法(核心切片逻辑)def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {// 对切片数做合法性校验if (numSlices < 1) {throw new IllegalArgumentException("Positive number of partitions required")}// TODO 通过 集合长度和切片数 获取每个切片的位置信息// 从这可以得出 对集合的切片只和 集合索引和切片数相关,和集合内容无关// 将 集合索引按照切片数 切分成若干元素def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {(0 until numSlices).iterator.map { i =>val start = ((i * length) / numSlices).toIntval end = (((i + 1) * length) / numSlices).toInt(start, end)}}// 对集合类型做判断seq match {case r: Range =>positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>// If the range is inclusive, use inclusive range for the last sliceif (r.isInclusive && index == numSlices - 1) {new Range.Inclusive(r.start + start * r.step, r.end, r.step)} else {new Range.Inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)}}.toSeq.asInstanceOf[Seq[Seq[T]]]case nr: NumericRange[T] =>// For ranges of Long, Double, BigInteger, etcval slices = new ArrayBuffer[Seq[T]](numSlices)var r = nrfor ((start, end) <- positions(nr.length, numSlices)) {val sliceSize = end - startslices += r.take(sliceSize).asInstanceOf[Seq[T]]r = r.drop(sliceSize)}slices.toSeqcase _ =>val array = seq.toArray // To prevent O(n^2) operations for List etcpositions(array.length, numSlices).map { case (start, end) =>array.slice(start, end).toSeq}.toSeq}
}

4. textFile 切片原理

textFile使用的MapReduce框架中TextInputFormat类完成对文件切片和读取切片中数据

4.1 切片规则

1.对job输入路径中的每个文件单独切片
2.判断每个文件是否支持切片
         true : 按照指定切片大小对文件切片
         false: 文件整体作为一个切片 

4.2 怎样设置切片大小

// 切片大小计算规则splitSize = Math.max(minSize, Math.min(goalSize, blockSize))// 参数说明1.minSizeset mapreduce.input.fileinputformat.split.minsize=256000000 或 set mapred.min.split.size=256000000默认值 minSize=1L2.goalSizegoalSize=所有文件大小总和/指定的切片个数3.blockSize本地目录32M|HDFS目录128M或256M(看hdfs文件块具体配置)// 需求 1.真实切片大小 < blockSizegoalSize=所有文件大小总和/指定的切片个数 < blockSize 即(创建rdd时调大切片个数)2.真实切片大小 > blockSizeset mapreduce.input.fileinputformat.split.minSize=大于blockSize值

4.3 测试代码

  test("textFile - 切片逻辑") {// 初始化 spark配置实例val sf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("Test textFile")// 初始化 spark环境对象val sc: SparkContext = new SparkContext(sf)sc.hadoopConfiguration.setInt("mapred.min.split.size", 469000000)// sc.hadoopConfiguration.setInt("mapreduce.input.fileinputformat.split.minsize", 256000000)// 读取目录下的所有文件val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir/dir3/LOL.map", 1000)// 打印分区个数println("切片个数:"+rdd.getNumPartitions)sc.stop()}

执行结果:


 5.hadoopFile 切片原理

5.1 说明

def hadoopFile[K, V](path: String,inputFormatClass: Class[_ <: InputFormat[K, V]],keyClass: Class[K],valueClass: Class[V],minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {assertNotStopped()功能:读取HDFS文件或本地文件来创建RDD(使用MapReduce框架中InputFormat类)参数:path: 指定job的输入路径inputFormatClass: 对输入文件切片和读取的实现类keyClass: key的数据类型valueClass: value的数据类型minPartitions: 最小切片数

5.2 切片规则

根据指定的切片大小进行切片,允许将多个文件合并成换一个切片对象

5.3 怎样设置切片大小

指定切片大小(默认值Long.MaxValue)
set mapred.max.split.size=切片大小 或
set mapreduce.input.fileinputformat.split.maxsize=切片大小

5.4 代码测试

  test("spark中使用 CombineTextInputFormat") {// 初始化 spark配置实例val sf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")// 初始化 spark环境对象val sc: SparkContext = new SparkContext(sf)// 读取目录下的所有文件val input = "src/main/resources/data/dir/dir3"val combineRDD: RDD[(LongWritable, Text)] = sc.hadoopFile[LongWritable, Text, org.apache.hadoop.mapred.lib.CombineTextInputFormat](input, 10000)//    val combineRDD: RDD[(LongWritable, Text)] = sc.hadoopFile[LongWritable, Text//      , org.apache.hadoop.mapred.TextInputFormat](input, 10000)sc.hadoopConfiguration.setInt("mapred.max.split.size", 128000000)//sc.hadoopConfiguration.setInt("mapreduce.input.fileinputformat.split.maxsize", 128000000)println("切片个数:" + combineRDD.getNumPartitions)//combineRDD.map(_._2.toString).foreach(println(_))//combineRDD.collect()//combineRDD.hadsc.stop()}

执行结果:

5.5 minPartitions 在 CombineTextInputFormat 中的作用?

CombineTextInputFormat切片逻辑和 最小切片数(minPartitions)  无关

查看 org.apache.hadoop.mapred.lib.CombineTextInputFormat类 getSplits方法
TODO: numSplits指定的切片个数,并没有使用public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =super.getSplits(Job.getInstance(job));InputSplit[] ret = new InputSplit[newStyleSplits.size()];for(int pos = 0; pos < newStyleSplits.size(); ++pos) {org.apache.hadoop.mapreduce.lib.input.CombineFileSplit newStyleSplit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) newStyleSplits.get(pos);ret[pos] = new CombineFileSplit(job, newStyleSplit.getPaths(),newStyleSplit.getStartOffsets(), newStyleSplit.getLengths(),newStyleSplit.getLocations());}return ret;}

5.6 重点关注

对计算任务而言,合并小文件是一把双刃剑,合并小文件后 就舍弃了数据本地化,则加了网络IO的开销,需要根据实际情况合理的选择 切片策略

CombineTextInputFormat源码参考:​​​​​​​https://blog.csdn.net/wawmg/article/details/17095125

相关文章:

从源码的角度告诉你 spark是怎样完成对文件切片

目录 1.说明 2.怎样设置默认切片数 2.1 RDD默认切片设置 2.2 SparkSQL默认切片设置 3. makeRDD 切片原理 4. textFile 切片原理 4.1 切片规则 4.2 怎样设置切片大小 4.3 测试代码 5.hadoopFile 切片原理 5.1 说明 5.2 切片规则 5.3 怎样设置切片大小 5.4 代码测试…...

剑指 Offer II 019. 最多删除一个字符得到回文

题目链接 剑指 Offer II 019. 最多删除一个字符得到回文 easy 题目描述 给定一个非空字符串 s&#xff0c;请判断如果 最多 从字符串中删除一个字符能否得到一个回文字符串。 示例 1: 输入: s “aba” 输出: true 示例 2: 输入: s “abca” 输出: true 解释: 可以删除 “c”…...

RK3568驱动OV13850摄像头模组调试过程

摄像头介绍品牌&#xff1a;Omnivision型号&#xff1a;CMK-OV13850接口&#xff1a;MIPI像素&#xff1a;1320WOV13850彩色图像传感器是一款低电压、高性能1/3.06英寸1320万像素CMOS图像传感器&#xff0c;使用OmniBSI?技术提供了单-1320万像素&#xff08;42243136)摄像头的…...

Go项目的目录结构基本布局

前言 随着项目的代码量在不断地增长&#xff0c;不同的开发人员按自己意愿随意布局和创建目录结构&#xff0c;项目维护性就很差&#xff0c;代码也非常凌乱。良好的目录与文件结构十分重要&#xff0c;尤其是团队合作的时候&#xff0c;良好的目录与文件结构可以减少很多不必要…...

CHAPTER 1 Linux Filesystem Management

Linux Filesystem Management1 文件系统是什么2 文件系统的组成3 inode详解1. inode到底是什么2. inode的内容3. inode的大小4. inode的号码5. 硬链接6. 软链接4 存储区域5 常见文件系统的类型1. 根文件系统2. 虚拟文件系统3. 真文件系统4. 伪文件系统5. 网络文件系统1 文件系统…...

RocketMQ架构篇 - 读写队列与生产者如何选择队列

读、写队列 创建主题时&#xff0c;可以指定 writeQueueNums&#xff08;写队列的个数&#xff09;、readQueueNums&#xff08;读队列的个数&#xff09;。生产者发送消息时&#xff0c;使用写队列的个数返回路由信息&#xff1b;消费者消费消息时&#xff0c;使用读队列的个…...

华为OD机试真题Python实现【通信误码】真题+解题思路+代码(20222023)

通信误码 题目 信号传播过程中会出现一些误码,不同的数字表示不同的误码 ID,取值范围为 1~65535,用一个数组记录误码出现的情况,每个误码出现的次数代表误码频度,请找出记录中包含频度最高误码的最小子数组长度。 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD…...

【单目3D目标检测】MonoDDE论文精读与代码解析

文章目录PrefacePros and ConsAbstractContributionsPreliminaryDirect depth estimationDepth from heightPespective-n-point&#xff08;PnP&#xff09;PipelineDiverse Depth EstimationsRobust Depth CombinationOutput distributionSelecting and combining reliable de…...

复习 Kotlin 从小白到大牛 第二版 笔记要点

4.2.2 常量和只读变量 常量和只读变量一旦初始化就不能再被修改。在kotlin中&#xff0c;声明常量是在标识符的前面加上val或const val 关键字。 1. val 声明的是运行时变量&#xff0c;在运行时进行初始化 2.const val 声明的是编译时常量&#xff0c;在编译时初始化 val …...

X264简介-Android使用(二)

X264简介-Android使用&#xff08;二&#xff09; 4、Ubuntu上安装ffmpeg&#xff1a; 检查更新本地软件包&#xff08;如果未更新&#xff0c;reboot Vmware&#xff09;&#xff1a; sudo apt update sudo apt upgrade官网下载的source文件安装&#xff1a; http://ffmpe…...

【独家】华为OD机试 - 统计差异值大于相似值二元组个数(C 语言解题)

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明本期…...

掌握好Framework 才是王道~

现在面试对Android开发者的要求越来越高了&#xff01;从最开始的阿里、头条、腾讯等大厂&#xff0c;到现在的互联网车企&#xff0c;面试总喜欢问道 Framework底层原理的相关问题 Android Framework的三大核心功能&#xff1a; 1、View.java:View工作原理&#xff0c;实现包…...

Codeforces Round 856 (Div. 2) A — C

Codeforces Round 856 (Div. 2) 文章目录A. Prefix and Suffix Array题目大意题目分析codeB. Not Dividing题目大意题目分析codeC. Scoring Subsequences题目大意题目分析codeA. Prefix and Suffix Array 题目大意 给出一个字符串所有的非空前后缀&#xff0c;判断原字符串是…...

2022年MathorCup数学建模B题无人仓的搬运机器人调度问题解题全过程文档加程序

2022年第十二届MathorCup高校数学建模 B题 无人仓的搬运机器人调度问题 原题再现 本题考虑在无人仓内的仓库管理问题之一&#xff0c;搬运机器人 AGV 的调度问题。更多的背景介绍请参看附件-背景介绍。对于无人仓来说&#xff0c;仓库的地图模型可以简化为图的数据结构。 仓库…...

开源项目的演进会遇到哪些“坑”?KubeVela 从发起到晋级 CNCF 孵化的全程回顾

作者&#xff1a;孙健波、曾庆国 点击查看&#xff1a;「开源人说」第五期《KubeVela&#xff1a;一场向应用交付标准的冲锋》 2023 年 2 月&#xff0c;**KubeVela [ 1] ** 经过全体 ToC 投票成功进入 CNCF Incubation&#xff0c;是云原生领域首个晋级孵化的面向应用的交付…...

MSDP实验配置

目录 配置MSDP 配置PIM SM协议 配置各PIM SM域内的静态RP 配置MSDP对等体 配置域内的MSDP对等体 AR8和AR9建立EBGP邻居 配置域间的MSDP对等体 进行实验验证 什么是MSDP MSDP&#xff08;Multicast Source Discovery Protocol&#xff09;组播源发现协议的简称 用来传递…...

惊!初中生也来卷了……

大家好&#xff0c;我是良许。 前两天在抖音直播的时候&#xff0c;突然来了一位不速之客…… 他自称是初中生&#xff0c;一开始我还有点不太相信&#xff0c;直到跟他连麦&#xff0c;听到他还略带一些稚嫩的声音&#xff0c;我才知道&#xff0c;他没有骗我…… 他说他想学…...

kafka相关配置介绍

kafka默认配置 每个kafka broker中配置文件server.properties默认必须配置的属性如下&#xff1a; broker.id0 num.network.threads2 num.io.threads8 socket.send.buffer.bytes1048576 socket.receive.buffer.bytes1048576 socket.request.max.bytes104857600 log.dirs/tmp/…...

【PyTorch】教程:torch.nn.Hardtanh

torch.nn.Hardtanh 原型 CLASS torch.nn.Hardtanh(min_val- 1.0, max_val1.0, inplaceFalse, min_valueNone, max_valueNone) 参数 min_val ([float]) – 线性区域的最小值&#xff0c;默认为 -1max_val ([float]) – 线性区域的最大值&#xff0c;默认为 1inplace ([bool]) …...

神垕古镇景区5A级十年都没有实现的三大主因

钧 瓷 内 参 第40期&#xff08;总第371期&#xff09; 2023年3月5日 神垕古镇景区5A级十年都没有实现的三大主因 这是2013年&#xff0c;禹州市市政府第一次提出创建5A级景区到今年三月份整整十年啊&#xff01; 目前神垕古镇景区是4A级景区&#xff0c;5A级一直进行中&a…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

连锁超市冷库节能解决方案:如何实现超市降本增效

在连锁超市冷库运营中&#xff0c;高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术&#xff0c;实现年省电费15%-60%&#xff0c;且不改动原有装备、安装快捷、…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

适应性Java用于现代 API:REST、GraphQL 和事件驱动

在快速发展的软件开发领域&#xff0c;REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名&#xff0c;不断适应这些现代范式的需求。随着不断发展的生态系统&#xff0c;Java 在现代 API 方…...

深入浅出WebGL:在浏览器中解锁3D世界的魔法钥匙

WebGL&#xff1a;在浏览器中解锁3D世界的魔法钥匙 引言&#xff1a;网页的边界正在消失 在数字化浪潮的推动下&#xff0c;网页早已不再是静态信息的展示窗口。如今&#xff0c;我们可以在浏览器中体验逼真的3D游戏、交互式数据可视化、虚拟实验室&#xff0c;甚至沉浸式的V…...

在Spring Boot中集成RabbitMQ的完整指南

前言 在现代微服务架构中&#xff0c;消息队列&#xff08;Message Queue&#xff09;是实现异步通信、解耦系统组件的重要工具。RabbitMQ 是一个流行的消息中间件&#xff0c;支持多种消息协议&#xff0c;具有高可靠性和可扩展性。 本博客将详细介绍如何在 Spring Boot 项目…...

【Java】Ajax 技术详解

文章目录 1. Filter 过滤器1.1 Filter 概述1.2 Filter 快速入门开发步骤:1.3 Filter 执行流程1.4 Filter 拦截路径配置1.5 过滤器链2. Listener 监听器2.1 Listener 概述2.2 ServletContextListener3. Ajax 技术3.1 Ajax 概述3.2 Ajax 快速入门服务端实现:客户端实现:4. Axi…...