当前位置: 首页 > news >正文

【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)

之前学过的kv类型上面的算子

groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的

mapValues keys values flatMapValues 普通算子,管道形式的算子

shuffle的过程是因为数据产生了打乱重分,分组、排序、join等算子需要将数据重新排版。

shuffle的过程是上游的数据处理完毕写出到自己的磁盘上,然后下游的数据从磁盘上面拉取。

重新排版打乱重分是需要存在规则的。

中间数据的流向规则叫做分区器 partitioner,这个分区器一般是存在于shuffle类算子中的,我们可以这么说,shuffle类算子一定会带有分区器,分区器也可以单独存在,人为定义分发规则。

groupBy groupBykey reduceBykey 自带的分区器HashPartitioner。

sortby sortBykey rangePartitioner

hashPartitioner

规则 按照key的hashCode %下游分区 = 分区编号

处理key-value类型数据,如果key为0,就分配去0号分区。否则调用nonNegativeMod函数。

保证取余的结果为正向结果。

hash取余的方式,不管数据分发到下游的什么分区中,最终结果都是相同的数据放入到一起。

演示结果:

scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> sc.makeRDD(arr,3)
res78: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[73] at makeRDD at <console>:27scala> res78.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res79: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[74] at mapPartitionsWithIndex at <console>:26scala> res79.collect
res80: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))scala> res78.map(t=>(t,t))
res81: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[75] at map at <console>:26scala> res78.partitioner
res82: Option[org.apache.spark.Partitioner] = Nonescala> res81.reduceByKey(_+_)
res84: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[76] at reduceByKey at <console>:26scala> res84.partitioner
res85: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)scala> res84.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res88: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[77] at mapPartitionsWithIndex at <console>:26scala> res88.collect
res89: Array[(Int, (Int, Int))] = Array((0,(6,6)), (0,(3,3)), (0,(9,9)), (1,(4,4)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(5,5)), (2,(2,2)))

演示的逻辑,就是按照key.hashcode进行分区,int类型的hashcode值是自己的本身。

并且hash分区器的规则致使我们可以任意的修改下游的分区数量。

scala> res81.reduceByKey(_+_,100)
res91: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[78] at reduceByKey at <console>:26scala> res91.partitions.size
res92: Int = 100scala> res81.reduceByKey(_+_,2)
res93: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[79] at reduceByKey at <console>:26scala> res93.partitions.size
res94: Int = 2

rangePartitioner

hashPartitioner规则非常简单,直接规定来一个数据按照hashcode规则的分配,规则比较简答,但是会出现数据倾斜。

range分区规则中存在两个方法。

rangeBounds界限,在使用这个分区器之前先做一个界限划定。

首先使用水塘抽样算法,在未知的数据集中抽取能够代表整个数据集的样本,根据样本进行规则设定。

然后在使用getPartitions。

首先存在水塘抽样,规定数据的流向以后再执行整体逻辑,会先触发计算。

sortBykey是转换类的算子,不会触发计算。

但是我们发现它触发计算了,因为首先在计算之前先进行水塘抽样,能够规定下游的数据规则,然后再进行数据的计算。

scala> arr
res101: Array[Int] = Array(1, 9, 2, 8, 3, 7, 4, 6, 5)scala> arr.map(t=> (t,t))
res102: Array[(Int, Int)] = Array((1,1), (9,9), (2,2), (8,8), (3,3), (7,7), (4,4), (6,6), (5,5))scala> sc.makeRDD(res102)
res104: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[94] at makeRDD at <console>:27scala> res104.sortByKey()
res105: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[97] at sortByKey at <console>:26scala> res105.partitioner
res106: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@fe1f9dea)scala> res104.sortByKey(true,3)
res107: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[100] at sortByKey at <console>:26scala> res107.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res109: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:26scala> res109.collect
res110: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (1,(4,4)), (1,(5,5)), (1,(6,6)), (2,(7,7)), (2,(8,8)), (2,(9,9)))

range分区器,它是先做抽样然后指定下游分区的数据界限。

它可以修改分区数量,但是分区数量不能大于元素个数,必须要保证每个分区中都有元素。

scala> res104.sortByKey(true,3)
res111: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[104] at sortByKey at <console>:26scala> res104.sortByKey(true,300)
res112: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[107] at sortByKey at <console>:26scala> res111.partitions.size
res114: Int = 3scala> res112.part

自定义分区器

工作的过程中我们会遇见数据分类的情况,想要根据自己的需求定义分区的规则,让符合规则的数据发送到不同的分区中,这个时候我们就需要自定义分区器了。

定义分区器,让数据发送到不同的分区,从而不同的task任务输出的文件结果内容也不同

# 自己创建数据data/a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
# 需求就是将数据按照规则进行分发到不同的分区中
# 存储的时候一个文件存储hello其他的文件存储tom jack

分区器的定义需要实现分区器的接口

class MyPartitioner extends Partitioner{override def numPartitions: Int = ???
// 设定下游存在几个分区override def getPartition(key: Any): Int = ???
// 按照key设定分区的位置
}

整体代码:

package com.hainiu.sparkimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object Test1 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("parse")conf.setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.textFile("data/a.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)val rdd1 = rdd.partitionBy(new MyPartitioner)val fs = FileSystem.get(new Configuration())val out = "data/res"if(fs.exists(new Path(out)))fs.delete(new Path(out),true)rdd1.saveAsTextFile(out)}
}
class MyPartitioner extends Partitioner{override def numPartitions: Int = 2override def getPartition(key: Any): Int = {if(key.toString.equals("hello"))0else1}
}

相关文章:

【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)

之前学过的kv类型上面的算子 groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的 mapValues keys values flatMapValues 普通算子&#xff0c;管道形式的算子 shuffle的过程是因为数据产生了打乱重分&#xff0c;分组、排序、join等…...

CSS3_BFC(十二)

BFC MDN对BFC的解释&#xff1a;块格式化上下文&#xff08;Block Formating Context, BFC&#xff09;是web页面的可视CSS渲染的一部分&#xff0c;是块盒子的布局过程发生的区域&#xff0c;也是浮动元素与其他元素交互的区域。 1、开启BFC flow-root对内容的影响是最低的&am…...

C0032.在Clion中使用MSVC编译器编译opencv的配置方法

使用MSVC编译器编译opencv的配置方法...

微信小程序中会议列表页面的前后端实现

题外话&#xff1a;想通过集成腾讯IM来解决即时聊天的问题&#xff0c;如果含语音视频&#xff0c;腾讯组件一年5万起步&#xff0c;贵了&#xff01;后面我们改为自己实现这个功能&#xff0c;这里只是个总结而已。 图文会诊需求 首先是个图文列表界面 同个界面可以查看具体…...

WEB攻防-通用漏洞文件上传二次渲染.htaccess变异免杀

知识点&#xff1a; 1、文件上传-二次渲染 2、文件上传-简单免杀变异 3、文件上传-.htaccess妙用 4、文件上传-PHP语言特性 1、上传后门时&#xff0c;文件内容带.就不行 这时可以上传一个转换后的ip地址&#xff0c;ip地址对应网站包含后门代码 转换后的int会在访问的时候…...

vue实现列表滑动下拉加载数据

一、实现效果 二、实现思路 使用滚动事件监听器来检测用户是否滚动到底部&#xff0c;然后加载更多数据 监听滚动事件。检测用户是否滚动到底部。加载更多数据。 三、案例代码 <div class"drawer-content"><div ref"loadMoreTrigger" class&q…...

全面解析:HTML页面的加载全过程(四)--浏览器渲染之样式计算

主线程遍历得到的 DOM 树&#xff0c;依次为树中的每个节点计算出它最终的样式&#xff0c;称之为 Computed Style。 通过前面生成的DOM 树和 CSSOM 树&#xff0c;遍历 DOM 树&#xff0c;为每一个 DOM 节点&#xff0c;计算它的所有 CSS 属性&#xff0c;最后会得到一棵带有…...

#Verilog HDL# 谈谈代码中如何跨层次引用

目录 一 先谈作用问题 二 再谈跨层次问题 2.1 向下引用 2.2 向上引用 一 先谈作用问题 大多数编程语言都有一个称为作用域(scope)的特征,它定义了代码的某些部分对于变量和方法的可见性。作用域定义了一个命名空间,以避免同一命名空间内不同对象名称之间的冲突。 V…...

LeetCode 每日一题 2024/11/18-2024/11/24

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录 11/18 661. 图片平滑器11/19 3243. 新增道路查询后的最短距离 I11/20 3244. 新增道路查询后的最短距离 II11/21 3248. 矩阵中的蛇11/22 3233. 统计不是特殊数字的数字数量1…...

客户流失分析综述

引言 客户流失这个术语通常用来描述在特定时间或合同期内停止与公司进行业务往来的客户倾向性[1]。传统上&#xff0c;关于客户流失的研究始于客户关系管理&#xff08;CRM&#xff09;[2]。在运营服务时&#xff0c;防止客户流失至关重要。过去&#xff0c;客户获取相对于流失…...

基于51单片机的红包抽奖proteus仿真

地址&#xff1a; https://pan.baidu.com/s/1nYZlLb64kdZAWSydT_uHfA 提取码&#xff1a;1234 仿真图&#xff1a; 芯片/模块的特点&#xff1a; AT89C52/AT89C51简介&#xff1a; AT89C52/AT89C51是一款经典的8位单片机&#xff0c;是意法半导体&#xff08;STMicroelectro…...

cangjie (仓颉) vscode环境搭建

sdk下载 下载中心-仓颉编程语言官网 可选择半年更新版&#xff0c;不用申请。目前版本&#xff1a;0.53.13 &#xff0c;选择不同平台压缩包下载解压到任意位置即可 补充下载&#xff0c;vscode插件解压后&#xff0c;在vscode扩展中选择从vsix安装&#xff0c;安装后新增名为…...

阿里云私服地址

1.解压apache-maven-3.6.1-bin 2.配置本地仓库&#xff1a;修改conf/dettings.xml中的<localReoisitory>为一个指定目录。56行 <localRepository>D:\apache-maven-3.6.1-bin\apache-maven-3.6.1\mvn_repo</localRepository> 3.配置阿里云私服&#xff1a;…...

HTMLCSS:3D金字塔加载动画

效果演示 这段代码通过CSS3的3D变换和动画功能&#xff0c;创建了一个旋转的金字塔加载动画&#xff0c;每个侧面都有不同的颜色渐变&#xff0c;底部还有一个模糊的阴影效果&#xff0c;增加了视觉的立体感。 HTML <div class"pyramid-loader"><div cl…...

shell编程(2)(3)

目录 一、永久环境变量 按用户设置永久环境变量 文件路径&#xff1a; 示例步骤&#xff1a; 删除永久环境变量 二、脚本程序传递参数怎么实现 三、用编程进行数学运算 shell中利用expr进行运算 运算与变量结合 1. 变量赋值和基本运算 2. 使用expr进行运算 3. 变量…...

DFT专家分析scan insertion时使用EDT的策略

作为一名芯片设计DFT工程师专家&#xff0c;在做scan insertion&#xff08;扫描插入&#xff09;时使用EDT&#xff08;Embedded Deterministic Test&#xff0c;嵌入式确定性测试&#xff09;的参数配置策略&#xff0c;需要综合考虑多个方面的因素&#xff0c;以确保测试的高…...

Apple Vision Pro开发003-PolySpatial2.0新建项目

unity6.0下载链接:Unity 实时开发平台 | 3D、2D、VR 和 AR 引擎 一、新建项目 二、导入开发包 com.unity.polyspatial.visionos 输入版本号 2.0.4 com.unity.polyspatial&#xff08;单独导入&#xff09;&#xff0c;或者直接安装 三、对应设置 其他的操作与之前的版本相同…...

分公司如何纳税

分公司不进行纳税由总公司汇总纳税“子公司具有法人资格&#xff0c;依法独立承担民事责任;分公司不具有法人资格&#xff0c;其民事责任由公司承担。”企业设立分支机构&#xff0c;使其不具有法人资格&#xff0c;且不实行独立核算&#xff0c;则可由总公司汇总缴纳企业所得税…...

在 Ubuntu 系统上安装 npm 环境以及 nvm(Node Version Manager)

在 Ubuntu 系统上安装 npm 环境以及 nvm&#xff08;Node Version Manager&#xff09; 步骤 1: 更新系统包步骤 2: 安装 nvm步骤 3: 安装 Node.js 和 npm步骤 4: 设置默认 Node.js 版本&#xff08;可选&#xff09;总结 在 Ubuntu 系统上安装 npm 环境以及 nvm&#xff08;No…...

深度优先搜索(dfs)题目合集

深度优先搜索&#xff08;dfs&#xff09;题目合集 全排列问题 dfs原理和模版深度优先搜索原理&#xff08;纯个人理解&#xff09;参考程序dfs通用模版 素数环组合的输出 剪枝新dfs模版参考程序新的dfs模版 自然数的拆分 利用形参进行回溯 全排列问题 dfs原理和模版 P1706 全…...

全能视频下载工具:Video-Downloader让在线视频轻松保存

全能视频下载工具&#xff1a;Video-Downloader让在线视频轻松保存 【免费下载链接】Video-Downloader 下载youku,letv,sohu,tudou,bilibili,acfun,iqiyi等网站分段视频文件&#xff0c;提供mac&win独立App。 项目地址: https://gitcode.com/gh_mirrors/vi/Video-Downloa…...

重塑机械键盘体验:ZMK固件的革新之旅与实践指南

重塑机械键盘体验&#xff1a;ZMK固件的革新之旅与实践指南 【免费下载链接】zmk ZMK Firmware Repository 项目地址: https://gitcode.com/gh_mirrors/zm/zmk 在机械键盘的世界里&#xff0c;固件如同键盘的灵魂&#xff0c;决定着它的响应速度、功能拓展性和个性化程度…...

Win11Debloat:5分钟解决Windows 11卡顿的终极优化指南

Win11Debloat&#xff1a;5分钟解决Windows 11卡顿的终极优化指南 【免费下载链接】Win11Debloat A simple, lightweight PowerShell script that allows you to remove pre-installed apps, disable telemetry, as well as perform various other changes to declutter and cu…...

30 分钟搞定答辩 PPT!Paperxie AI 生成器:拯救论文人的「熬夜克星」

paperxie-免费查重复率aigc检测/开题报告/毕业论文/智能排版/文献综述/AIPPThttps://www.paperxie.cn/ppt/createhttps://www.paperxie.cn/ppt/create 一、答辩 PPT 惨案现场&#xff1a;你是不是也在为这四件事崩溃&#xff1f; 论文查重通过的那一刻&#xff0c;你以为终于能…...

《数据驱动防折叠:利用企微API与数据分析平台构建智能发送决策系统》

一、问题背景企微群发折叠与用户的历史互动行为紧密相关。对长期未交互的用户发送营销内容&#xff0c;折叠概率极高&#xff1b;而对活跃用户发送相似内容&#xff0c;则可能正常显示。因此&#xff0c;单纯从发送端进行策略优化是不够的&#xff0c;必须引入用户维度的数据&a…...

Shiny框架终极指南:输入控件与输出渲染的完美交互原理

Shiny框架终极指南&#xff1a;输入控件与输出渲染的完美交互原理 【免费下载链接】shiny Easy interactive web applications with R 项目地址: https://gitcode.com/gh_mirrors/sh/shiny Shiny是R语言生态中一款强大的交互式Web应用框架&#xff0c;它让数据科学家和分…...

告别天价桥接芯片!用高云GW5AT-LV15MG132 FPGA搞定MIPI C-PHY摄像头测试盒

国产FPGA革新摄像头测试方案&#xff1a;高云GW5AT-LV15MG132的MIPI C-PHY实战解析 在摄像头模组生产线上&#xff0c;测试环节的成本与效率直接关系到企业竞争力。传统测试方案依赖进口FPGA搭配昂贵桥接芯片&#xff0c;不仅物料清单&#xff08;BOM&#xff09;成本居高不下…...

mkcert 命令文档 - 本地 HTTPS 开发证书生成工具详解

1. 命令简介mkcert 是一个用 Go 语言编写的、零配置的本地开发用自签名证书生成工具。它能够自动创建并安装本地证书颁发机构&#xff08;CA&#xff09;到系统的信任存储中&#xff0c;并生成受本地信任的开发证书&#xff0c;大幅简化 HTTPS 本地开发环境的搭建过程&#xff…...

DeepSeek-R1-Distill-Qwen-7B优化升级:提升推理速度的技巧

DeepSeek-R1-Distill-Qwen-7B优化升级&#xff1a;提升推理速度的技巧 1. 模型概述 DeepSeek-R1-Distill-Qwen-7B是基于Qwen架构的7B参数蒸馏模型&#xff0c;由DeepSeek团队开发。该模型通过知识蒸馏技术从更大的DeepSeek-R1模型中提取关键知识&#xff0c;在保持较高推理能…...

深入S32K3XX以太网内部:用逻辑分析仪抓取MII时序,图解数据收发全过程

深入S32K3XX以太网内部&#xff1a;用逻辑分析仪抓取MII时序&#xff0c;图解数据收发全过程 在嵌入式系统开发中&#xff0c;以太网通信的底层实现往往像一个黑盒子——我们配置好寄存器&#xff0c;数据就神奇地传输了。但对于真正追求技术深度的开发者来说&#xff0c;理解信…...