【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)
之前学过的kv类型上面的算子
groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的
mapValues keys values flatMapValues 普通算子,管道形式的算子
shuffle的过程是因为数据产生了打乱重分,分组、排序、join等算子需要将数据重新排版。
shuffle的过程是上游的数据处理完毕写出到自己的磁盘上,然后下游的数据从磁盘上面拉取。

重新排版打乱重分是需要存在规则的。
中间数据的流向规则叫做分区器 partitioner,这个分区器一般是存在于shuffle类算子中的,我们可以这么说,shuffle类算子一定会带有分区器,分区器也可以单独存在,人为定义分发规则。
groupBy groupBykey reduceBykey 自带的分区器HashPartitioner。

sortby sortBykey rangePartitioner

hashPartitioner
规则 按照key的hashCode %下游分区 = 分区编号

处理key-value类型数据,如果key为0,就分配去0号分区。否则调用nonNegativeMod函数。

保证取余的结果为正向结果。
hash取余的方式,不管数据分发到下游的什么分区中,最终结果都是相同的数据放入到一起。
演示结果:
scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> sc.makeRDD(arr,3)
res78: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[73] at makeRDD at <console>:27scala> res78.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res79: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[74] at mapPartitionsWithIndex at <console>:26scala> res79.collect
res80: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))scala> res78.map(t=>(t,t))
res81: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[75] at map at <console>:26scala> res78.partitioner
res82: Option[org.apache.spark.Partitioner] = Nonescala> res81.reduceByKey(_+_)
res84: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[76] at reduceByKey at <console>:26scala> res84.partitioner
res85: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)scala> res84.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res88: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[77] at mapPartitionsWithIndex at <console>:26scala> res88.collect
res89: Array[(Int, (Int, Int))] = Array((0,(6,6)), (0,(3,3)), (0,(9,9)), (1,(4,4)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(5,5)), (2,(2,2)))
演示的逻辑,就是按照key.hashcode进行分区,int类型的hashcode值是自己的本身。
并且hash分区器的规则致使我们可以任意的修改下游的分区数量。
scala> res81.reduceByKey(_+_,100)
res91: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[78] at reduceByKey at <console>:26scala> res91.partitions.size
res92: Int = 100scala> res81.reduceByKey(_+_,2)
res93: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[79] at reduceByKey at <console>:26scala> res93.partitions.size
res94: Int = 2
rangePartitioner
hashPartitioner规则非常简单,直接规定来一个数据按照hashcode规则的分配,规则比较简答,但是会出现数据倾斜。
range分区规则中存在两个方法。

rangeBounds界限,在使用这个分区器之前先做一个界限划定。

首先使用水塘抽样算法,在未知的数据集中抽取能够代表整个数据集的样本,根据样本进行规则设定。
然后在使用getPartitions。

首先存在水塘抽样,规定数据的流向以后再执行整体逻辑,会先触发计算。

sortBykey是转换类的算子,不会触发计算。

但是我们发现它触发计算了,因为首先在计算之前先进行水塘抽样,能够规定下游的数据规则,然后再进行数据的计算。
scala> arr
res101: Array[Int] = Array(1, 9, 2, 8, 3, 7, 4, 6, 5)scala> arr.map(t=> (t,t))
res102: Array[(Int, Int)] = Array((1,1), (9,9), (2,2), (8,8), (3,3), (7,7), (4,4), (6,6), (5,5))scala> sc.makeRDD(res102)
res104: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[94] at makeRDD at <console>:27scala> res104.sortByKey()
res105: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[97] at sortByKey at <console>:26scala> res105.partitioner
res106: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@fe1f9dea)scala> res104.sortByKey(true,3)
res107: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[100] at sortByKey at <console>:26scala> res107.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res109: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:26scala> res109.collect
res110: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (1,(4,4)), (1,(5,5)), (1,(6,6)), (2,(7,7)), (2,(8,8)), (2,(9,9)))
range分区器,它是先做抽样然后指定下游分区的数据界限。
它可以修改分区数量,但是分区数量不能大于元素个数,必须要保证每个分区中都有元素。
scala> res104.sortByKey(true,3)
res111: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[104] at sortByKey at <console>:26scala> res104.sortByKey(true,300)
res112: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[107] at sortByKey at <console>:26scala> res111.partitions.size
res114: Int = 3scala> res112.part
自定义分区器
工作的过程中我们会遇见数据分类的情况,想要根据自己的需求定义分区的规则,让符合规则的数据发送到不同的分区中,这个时候我们就需要自定义分区器了。

定义分区器,让数据发送到不同的分区,从而不同的task任务输出的文件结果内容也不同
# 自己创建数据data/a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
# 需求就是将数据按照规则进行分发到不同的分区中
# 存储的时候一个文件存储hello其他的文件存储tom jack
分区器的定义需要实现分区器的接口
class MyPartitioner extends Partitioner{override def numPartitions: Int = ???
// 设定下游存在几个分区override def getPartition(key: Any): Int = ???
// 按照key设定分区的位置
}
整体代码:
package com.hainiu.sparkimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object Test1 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("parse")conf.setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.textFile("data/a.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)val rdd1 = rdd.partitionBy(new MyPartitioner)val fs = FileSystem.get(new Configuration())val out = "data/res"if(fs.exists(new Path(out)))fs.delete(new Path(out),true)rdd1.saveAsTextFile(out)}
}
class MyPartitioner extends Partitioner{override def numPartitions: Int = 2override def getPartition(key: Any): Int = {if(key.toString.equals("hello"))0else1}
}
相关文章:
【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)
之前学过的kv类型上面的算子 groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的 mapValues keys values flatMapValues 普通算子,管道形式的算子 shuffle的过程是因为数据产生了打乱重分,分组、排序、join等…...
CSS3_BFC(十二)
BFC MDN对BFC的解释:块格式化上下文(Block Formating Context, BFC)是web页面的可视CSS渲染的一部分,是块盒子的布局过程发生的区域,也是浮动元素与其他元素交互的区域。 1、开启BFC flow-root对内容的影响是最低的&am…...
C0032.在Clion中使用MSVC编译器编译opencv的配置方法
使用MSVC编译器编译opencv的配置方法...
微信小程序中会议列表页面的前后端实现
题外话:想通过集成腾讯IM来解决即时聊天的问题,如果含语音视频,腾讯组件一年5万起步,贵了!后面我们改为自己实现这个功能,这里只是个总结而已。 图文会诊需求 首先是个图文列表界面 同个界面可以查看具体…...
WEB攻防-通用漏洞文件上传二次渲染.htaccess变异免杀
知识点: 1、文件上传-二次渲染 2、文件上传-简单免杀变异 3、文件上传-.htaccess妙用 4、文件上传-PHP语言特性 1、上传后门时,文件内容带.就不行 这时可以上传一个转换后的ip地址,ip地址对应网站包含后门代码 转换后的int会在访问的时候…...
vue实现列表滑动下拉加载数据
一、实现效果 二、实现思路 使用滚动事件监听器来检测用户是否滚动到底部,然后加载更多数据 监听滚动事件。检测用户是否滚动到底部。加载更多数据。 三、案例代码 <div class"drawer-content"><div ref"loadMoreTrigger" class&q…...
全面解析:HTML页面的加载全过程(四)--浏览器渲染之样式计算
主线程遍历得到的 DOM 树,依次为树中的每个节点计算出它最终的样式,称之为 Computed Style。 通过前面生成的DOM 树和 CSSOM 树,遍历 DOM 树,为每一个 DOM 节点,计算它的所有 CSS 属性,最后会得到一棵带有…...
#Verilog HDL# 谈谈代码中如何跨层次引用
目录 一 先谈作用问题 二 再谈跨层次问题 2.1 向下引用 2.2 向上引用 一 先谈作用问题 大多数编程语言都有一个称为作用域(scope)的特征,它定义了代码的某些部分对于变量和方法的可见性。作用域定义了一个命名空间,以避免同一命名空间内不同对象名称之间的冲突。 V…...
LeetCode 每日一题 2024/11/18-2024/11/24
记录了初步解题思路 以及本地实现代码;并不一定为最优 也希望大家能一起探讨 一起进步 目录 11/18 661. 图片平滑器11/19 3243. 新增道路查询后的最短距离 I11/20 3244. 新增道路查询后的最短距离 II11/21 3248. 矩阵中的蛇11/22 3233. 统计不是特殊数字的数字数量1…...
客户流失分析综述
引言 客户流失这个术语通常用来描述在特定时间或合同期内停止与公司进行业务往来的客户倾向性[1]。传统上,关于客户流失的研究始于客户关系管理(CRM)[2]。在运营服务时,防止客户流失至关重要。过去,客户获取相对于流失…...
基于51单片机的红包抽奖proteus仿真
地址: https://pan.baidu.com/s/1nYZlLb64kdZAWSydT_uHfA 提取码:1234 仿真图: 芯片/模块的特点: AT89C52/AT89C51简介: AT89C52/AT89C51是一款经典的8位单片机,是意法半导体(STMicroelectro…...
cangjie (仓颉) vscode环境搭建
sdk下载 下载中心-仓颉编程语言官网 可选择半年更新版,不用申请。目前版本:0.53.13 ,选择不同平台压缩包下载解压到任意位置即可 补充下载,vscode插件解压后,在vscode扩展中选择从vsix安装,安装后新增名为…...
阿里云私服地址
1.解压apache-maven-3.6.1-bin 2.配置本地仓库:修改conf/dettings.xml中的<localReoisitory>为一个指定目录。56行 <localRepository>D:\apache-maven-3.6.1-bin\apache-maven-3.6.1\mvn_repo</localRepository> 3.配置阿里云私服:…...
HTMLCSS:3D金字塔加载动画
效果演示 这段代码通过CSS3的3D变换和动画功能,创建了一个旋转的金字塔加载动画,每个侧面都有不同的颜色渐变,底部还有一个模糊的阴影效果,增加了视觉的立体感。 HTML <div class"pyramid-loader"><div cl…...
shell编程(2)(3)
目录 一、永久环境变量 按用户设置永久环境变量 文件路径: 示例步骤: 删除永久环境变量 二、脚本程序传递参数怎么实现 三、用编程进行数学运算 shell中利用expr进行运算 运算与变量结合 1. 变量赋值和基本运算 2. 使用expr进行运算 3. 变量…...
DFT专家分析scan insertion时使用EDT的策略
作为一名芯片设计DFT工程师专家,在做scan insertion(扫描插入)时使用EDT(Embedded Deterministic Test,嵌入式确定性测试)的参数配置策略,需要综合考虑多个方面的因素,以确保测试的高…...
Apple Vision Pro开发003-PolySpatial2.0新建项目
unity6.0下载链接:Unity 实时开发平台 | 3D、2D、VR 和 AR 引擎 一、新建项目 二、导入开发包 com.unity.polyspatial.visionos 输入版本号 2.0.4 com.unity.polyspatial(单独导入),或者直接安装 三、对应设置 其他的操作与之前的版本相同…...
分公司如何纳税
分公司不进行纳税由总公司汇总纳税“子公司具有法人资格,依法独立承担民事责任;分公司不具有法人资格,其民事责任由公司承担。”企业设立分支机构,使其不具有法人资格,且不实行独立核算,则可由总公司汇总缴纳企业所得税…...
在 Ubuntu 系统上安装 npm 环境以及 nvm(Node Version Manager)
在 Ubuntu 系统上安装 npm 环境以及 nvm(Node Version Manager) 步骤 1: 更新系统包步骤 2: 安装 nvm步骤 3: 安装 Node.js 和 npm步骤 4: 设置默认 Node.js 版本(可选)总结 在 Ubuntu 系统上安装 npm 环境以及 nvm(No…...
深度优先搜索(dfs)题目合集
深度优先搜索(dfs)题目合集 全排列问题 dfs原理和模版深度优先搜索原理(纯个人理解)参考程序dfs通用模版 素数环组合的输出 剪枝新dfs模版参考程序新的dfs模版 自然数的拆分 利用形参进行回溯 全排列问题 dfs原理和模版 P1706 全…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
力扣-35.搜索插入位置
题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)
目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 (1)输入单引号 (2)万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...
数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !
我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...
前端开发者常用网站
Can I use网站:一个查询网页技术兼容性的网站 一个查询网页技术兼容性的网站Can I use:Can I use... Support tables for HTML5, CSS3, etc (查询浏览器对HTML5的支持情况) 权威网站:MDN JavaScript权威网站:JavaScript | MDN...
