【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)
之前学过的kv类型上面的算子
groupby
groupByKey
reduceBykey
sortBy
sortByKey
join[cogroup left inner right]
shuffle的
mapValues
keys
values
flatMapValues
普通算子,管道形式的算子
shuffle的过程是因为数据产生了打乱重分,分组、排序、join等算子需要将数据重新排版。
shuffle的过程是上游的数据处理完毕写出到自己的磁盘上,然后下游的数据从磁盘上面拉取。
重新排版打乱重分是需要存在规则的。
中间数据的流向规则叫做分区器 partitioner,这个分区器一般是存在于shuffle类算子中的,我们可以这么说,shuffle类算子一定会带有分区器,分区器也可以单独存在,人为定义分发规则。
groupBy groupBykey reduceBykey
自带的分区器HashPartitioner。
sortby sortBykey
rangePartitioner
hashPartitioner
规则 按照key的hashCode %下游分区 = 分区编号
处理key-value类型数据,如果key为0,就分配去0号分区。否则调用nonNegativeMod函数。
保证取余的结果为正向结果。
hash取余的方式,不管数据分发到下游的什么分区中,最终结果都是相同的数据放入到一起。
演示结果:
scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> sc.makeRDD(arr,3)
res78: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[73] at makeRDD at <console>:27scala> res78.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res79: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[74] at mapPartitionsWithIndex at <console>:26scala> res79.collect
res80: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))scala> res78.map(t=>(t,t))
res81: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[75] at map at <console>:26scala> res78.partitioner
res82: Option[org.apache.spark.Partitioner] = Nonescala> res81.reduceByKey(_+_)
res84: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[76] at reduceByKey at <console>:26scala> res84.partitioner
res85: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)scala> res84.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res88: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[77] at mapPartitionsWithIndex at <console>:26scala> res88.collect
res89: Array[(Int, (Int, Int))] = Array((0,(6,6)), (0,(3,3)), (0,(9,9)), (1,(4,4)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(5,5)), (2,(2,2)))
演示的逻辑,就是按照key.hashcode进行分区,int类型的hashcode值是自己的本身。
并且hash分区器的规则致使我们可以任意的修改下游的分区数量。
scala> res81.reduceByKey(_+_,100)
res91: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[78] at reduceByKey at <console>:26scala> res91.partitions.size
res92: Int = 100scala> res81.reduceByKey(_+_,2)
res93: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[79] at reduceByKey at <console>:26scala> res93.partitions.size
res94: Int = 2
rangePartitioner
hashPartitioner规则非常简单,直接规定来一个数据按照hashcode规则的分配,规则比较简答,但是会出现数据倾斜。
range分区规则中存在两个方法。
rangeBounds界限,在使用这个分区器之前先做一个界限划定。
首先使用水塘抽样算法,在未知的数据集中抽取能够代表整个数据集的样本,根据样本进行规则设定。
然后在使用getPartitions。
首先存在水塘抽样,规定数据的流向以后再执行整体逻辑,会先触发计算。
sortBykey是转换类的算子,不会触发计算。
但是我们发现它触发计算了,因为首先在计算之前先进行水塘抽样,能够规定下游的数据规则,然后再进行数据的计算。
scala> arr
res101: Array[Int] = Array(1, 9, 2, 8, 3, 7, 4, 6, 5)scala> arr.map(t=> (t,t))
res102: Array[(Int, Int)] = Array((1,1), (9,9), (2,2), (8,8), (3,3), (7,7), (4,4), (6,6), (5,5))scala> sc.makeRDD(res102)
res104: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[94] at makeRDD at <console>:27scala> res104.sortByKey()
res105: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[97] at sortByKey at <console>:26scala> res105.partitioner
res106: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@fe1f9dea)scala> res104.sortByKey(true,3)
res107: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[100] at sortByKey at <console>:26scala> res107.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res109: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:26scala> res109.collect
res110: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (1,(4,4)), (1,(5,5)), (1,(6,6)), (2,(7,7)), (2,(8,8)), (2,(9,9)))
range分区器,它是先做抽样然后指定下游分区的数据界限。
它可以修改分区数量,但是分区数量不能大于元素个数,必须要保证每个分区中都有元素。
scala> res104.sortByKey(true,3)
res111: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[104] at sortByKey at <console>:26scala> res104.sortByKey(true,300)
res112: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[107] at sortByKey at <console>:26scala> res111.partitions.size
res114: Int = 3scala> res112.part
自定义分区器
工作的过程中我们会遇见数据分类的情况,想要根据自己的需求定义分区的规则,让符合规则的数据发送到不同的分区中,这个时候我们就需要自定义分区器了。
定义分区器,让数据发送到不同的分区,从而不同的task任务输出的文件结果内容也不同
# 自己创建数据data/a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
# 需求就是将数据按照规则进行分发到不同的分区中
# 存储的时候一个文件存储hello其他的文件存储tom jack
分区器的定义需要实现分区器的接口
class MyPartitioner extends Partitioner{override def numPartitions: Int = ???
// 设定下游存在几个分区override def getPartition(key: Any): Int = ???
// 按照key设定分区的位置
}
整体代码:
package com.hainiu.sparkimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object Test1 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("parse")conf.setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.textFile("data/a.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)val rdd1 = rdd.partitionBy(new MyPartitioner)val fs = FileSystem.get(new Configuration())val out = "data/res"if(fs.exists(new Path(out)))fs.delete(new Path(out),true)rdd1.saveAsTextFile(out)}
}
class MyPartitioner extends Partitioner{override def numPartitions: Int = 2override def getPartition(key: Any): Int = {if(key.toString.equals("hello"))0else1}
}
相关文章:

【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)
之前学过的kv类型上面的算子 groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的 mapValues keys values flatMapValues 普通算子,管道形式的算子 shuffle的过程是因为数据产生了打乱重分,分组、排序、join等…...

CSS3_BFC(十二)
BFC MDN对BFC的解释:块格式化上下文(Block Formating Context, BFC)是web页面的可视CSS渲染的一部分,是块盒子的布局过程发生的区域,也是浮动元素与其他元素交互的区域。 1、开启BFC flow-root对内容的影响是最低的&am…...

C0032.在Clion中使用MSVC编译器编译opencv的配置方法
使用MSVC编译器编译opencv的配置方法...

微信小程序中会议列表页面的前后端实现
题外话:想通过集成腾讯IM来解决即时聊天的问题,如果含语音视频,腾讯组件一年5万起步,贵了!后面我们改为自己实现这个功能,这里只是个总结而已。 图文会诊需求 首先是个图文列表界面 同个界面可以查看具体…...

WEB攻防-通用漏洞文件上传二次渲染.htaccess变异免杀
知识点: 1、文件上传-二次渲染 2、文件上传-简单免杀变异 3、文件上传-.htaccess妙用 4、文件上传-PHP语言特性 1、上传后门时,文件内容带.就不行 这时可以上传一个转换后的ip地址,ip地址对应网站包含后门代码 转换后的int会在访问的时候…...

vue实现列表滑动下拉加载数据
一、实现效果 二、实现思路 使用滚动事件监听器来检测用户是否滚动到底部,然后加载更多数据 监听滚动事件。检测用户是否滚动到底部。加载更多数据。 三、案例代码 <div class"drawer-content"><div ref"loadMoreTrigger" class&q…...

全面解析:HTML页面的加载全过程(四)--浏览器渲染之样式计算
主线程遍历得到的 DOM 树,依次为树中的每个节点计算出它最终的样式,称之为 Computed Style。 通过前面生成的DOM 树和 CSSOM 树,遍历 DOM 树,为每一个 DOM 节点,计算它的所有 CSS 属性,最后会得到一棵带有…...

#Verilog HDL# 谈谈代码中如何跨层次引用
目录 一 先谈作用问题 二 再谈跨层次问题 2.1 向下引用 2.2 向上引用 一 先谈作用问题 大多数编程语言都有一个称为作用域(scope)的特征,它定义了代码的某些部分对于变量和方法的可见性。作用域定义了一个命名空间,以避免同一命名空间内不同对象名称之间的冲突。 V…...

LeetCode 每日一题 2024/11/18-2024/11/24
记录了初步解题思路 以及本地实现代码;并不一定为最优 也希望大家能一起探讨 一起进步 目录 11/18 661. 图片平滑器11/19 3243. 新增道路查询后的最短距离 I11/20 3244. 新增道路查询后的最短距离 II11/21 3248. 矩阵中的蛇11/22 3233. 统计不是特殊数字的数字数量1…...

客户流失分析综述
引言 客户流失这个术语通常用来描述在特定时间或合同期内停止与公司进行业务往来的客户倾向性[1]。传统上,关于客户流失的研究始于客户关系管理(CRM)[2]。在运营服务时,防止客户流失至关重要。过去,客户获取相对于流失…...

基于51单片机的红包抽奖proteus仿真
地址: https://pan.baidu.com/s/1nYZlLb64kdZAWSydT_uHfA 提取码:1234 仿真图: 芯片/模块的特点: AT89C52/AT89C51简介: AT89C52/AT89C51是一款经典的8位单片机,是意法半导体(STMicroelectro…...

cangjie (仓颉) vscode环境搭建
sdk下载 下载中心-仓颉编程语言官网 可选择半年更新版,不用申请。目前版本:0.53.13 ,选择不同平台压缩包下载解压到任意位置即可 补充下载,vscode插件解压后,在vscode扩展中选择从vsix安装,安装后新增名为…...

阿里云私服地址
1.解压apache-maven-3.6.1-bin 2.配置本地仓库:修改conf/dettings.xml中的<localReoisitory>为一个指定目录。56行 <localRepository>D:\apache-maven-3.6.1-bin\apache-maven-3.6.1\mvn_repo</localRepository> 3.配置阿里云私服:…...

HTMLCSS:3D金字塔加载动画
效果演示 这段代码通过CSS3的3D变换和动画功能,创建了一个旋转的金字塔加载动画,每个侧面都有不同的颜色渐变,底部还有一个模糊的阴影效果,增加了视觉的立体感。 HTML <div class"pyramid-loader"><div cl…...

shell编程(2)(3)
目录 一、永久环境变量 按用户设置永久环境变量 文件路径: 示例步骤: 删除永久环境变量 二、脚本程序传递参数怎么实现 三、用编程进行数学运算 shell中利用expr进行运算 运算与变量结合 1. 变量赋值和基本运算 2. 使用expr进行运算 3. 变量…...

DFT专家分析scan insertion时使用EDT的策略
作为一名芯片设计DFT工程师专家,在做scan insertion(扫描插入)时使用EDT(Embedded Deterministic Test,嵌入式确定性测试)的参数配置策略,需要综合考虑多个方面的因素,以确保测试的高…...

Apple Vision Pro开发003-PolySpatial2.0新建项目
unity6.0下载链接:Unity 实时开发平台 | 3D、2D、VR 和 AR 引擎 一、新建项目 二、导入开发包 com.unity.polyspatial.visionos 输入版本号 2.0.4 com.unity.polyspatial(单独导入),或者直接安装 三、对应设置 其他的操作与之前的版本相同…...

分公司如何纳税
分公司不进行纳税由总公司汇总纳税“子公司具有法人资格,依法独立承担民事责任;分公司不具有法人资格,其民事责任由公司承担。”企业设立分支机构,使其不具有法人资格,且不实行独立核算,则可由总公司汇总缴纳企业所得税…...

在 Ubuntu 系统上安装 npm 环境以及 nvm(Node Version Manager)
在 Ubuntu 系统上安装 npm 环境以及 nvm(Node Version Manager) 步骤 1: 更新系统包步骤 2: 安装 nvm步骤 3: 安装 Node.js 和 npm步骤 4: 设置默认 Node.js 版本(可选)总结 在 Ubuntu 系统上安装 npm 环境以及 nvm(No…...

深度优先搜索(dfs)题目合集
深度优先搜索(dfs)题目合集 全排列问题 dfs原理和模版深度优先搜索原理(纯个人理解)参考程序dfs通用模版 素数环组合的输出 剪枝新dfs模版参考程序新的dfs模版 自然数的拆分 利用形参进行回溯 全排列问题 dfs原理和模版 P1706 全…...

性能监控利器:Ubuntu 22.04 上的 Zabbix 安装与配置指南
简介 今天我们来聊聊如何在 Ubuntu 22.04 上安装和配置 Zabbix。我们会用到 PostgreSQL 作为数据库后端,Nginx 作为 Web 服务器,并用 Let’s Encrypt SSL 证书来保驾护航。 什么是 Zabbix? Zabbix 是一个开源的网络监控和管理解决方案&…...

性能测试的宏观分析:全面提升系统表现的关键
在当今快速发展的软件行业中,系统性能的优劣直接影响用户体验和业务成功。性能测试作为确保系统高效运行的重要环节,其方法和策略不断演进。其中,宏观分析作为一种全面评估系统性能的手段,日益受到关注。本文将深入探讨宏观分析在…...

ctfshow
1,web21 Basic认证采用Base64加密方式,Base64解码字符串发现是 用户名:密码 的格式进行Base64编码。 密码shark63 2,web22 用 子域名扫描器 扫出flag.ctf.show拿到flag,但这个域名已经没了所以就直接交的官方提供的flag。 3,web23 这段PHP代码是一个简单…...

【分享一个vue指令】鼠标放置提示指令v-tooltip
描述 自定义指令 v-tooltip mounted(el, binding):当元素被挂载到DOM上时,这个钩子会被调用。 el 是指令绑定的元素,binding 包含了指令的值,即 binding.value,这里是 clickOutside 字符串。tooltip 变量用于存储创建…...

掌握 Spring 事务管理:深入理解 @Transactional 注解
在业务方法上使用Transactional开启声明式事务时,很有可能由于使用方式有误,导致事务没有生效。 环境准备 表结构 CREATE TABLE admin (id bigint(20) unsigned NOT NULL AUTO_INCREMENT,username varchar(255) DEFAULT NULL,password varchar(255) …...

字符三角形
字符三角形 C语言代码C语言代码Java语言代码Python语言代码 💐The Begin💐点点关注,收藏不迷路💐 给定一个字符,用它构造一个底边长5个字符,高3个字符的等腰字符三角形。 输入 输入只有一行, …...

【LLM】一文学会SPPO
博客昵称:沈小农学编程 作者简介:一名在读硕士,定期更新相关算法面试题,欢迎关注小弟! PS:哈喽!各位CSDN的uu们,我是你的小弟沈小农,希望我的文章能帮助到你。欢迎大家在…...

如何通过ChatGPT提高自己的编程水平
在编程学习的过程中,开发者往往会遇到各种各样的技术难题和学习瓶颈。传统的学习方法依赖书籍、教程、视频等,但随着技术的不断发展,AI助手的崛起为编程学习带来了全新的机遇。ChatGPT,作为一种强大的自然语言处理工具,…...

NVR管理平台EasyNVR多品牌NVR管理工具的流媒体视频融合与汇聚管理方案
随着信息技术的飞速发展,视频监控已经成为现代社会安全管理和业务运营不可或缺的一部分。无论是智慧城市、智能交通、还是大型企业、校园安防,视频监控系统的应用都日益广泛。NVR管理平台EasyNVR,作为功能强大的流媒体服务器软件,…...

python之使用django框架开发web项目
本问将对django框架在python的web项目中的使用进行介绍,有不对之处,烦请指正。 首先使用创建一个django工程(本示例中使用pycharm2024+python3.12),名称和项目保存路径根据自己的需要自行修改,新手直接默认本机环境就好(关于conda将会另开一篇进行讲解。),最后点击cre…...