【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)
之前学过的kv类型上面的算子
groupby
groupByKey
reduceBykey
sortBy
sortByKey
join[cogroup left inner right]
shuffle的
mapValues
keys
values
flatMapValues
普通算子,管道形式的算子
shuffle的过程是因为数据产生了打乱重分,分组、排序、join等算子需要将数据重新排版。
shuffle的过程是上游的数据处理完毕写出到自己的磁盘上,然后下游的数据从磁盘上面拉取。
重新排版打乱重分是需要存在规则的。
中间数据的流向规则叫做分区器 partitioner,这个分区器一般是存在于shuffle类算子中的,我们可以这么说,shuffle类算子一定会带有分区器,分区器也可以单独存在,人为定义分发规则。
groupBy groupBykey reduceBykey
自带的分区器HashPartitioner。
sortby sortBykey
rangePartitioner
hashPartitioner
规则 按照key的hashCode %下游分区 = 分区编号
处理key-value类型数据,如果key为0,就分配去0号分区。否则调用nonNegativeMod函数。
保证取余的结果为正向结果。
hash取余的方式,不管数据分发到下游的什么分区中,最终结果都是相同的数据放入到一起。
演示结果:
scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> sc.makeRDD(arr,3)
res78: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[73] at makeRDD at <console>:27scala> res78.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res79: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[74] at mapPartitionsWithIndex at <console>:26scala> res79.collect
res80: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))scala> res78.map(t=>(t,t))
res81: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[75] at map at <console>:26scala> res78.partitioner
res82: Option[org.apache.spark.Partitioner] = Nonescala> res81.reduceByKey(_+_)
res84: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[76] at reduceByKey at <console>:26scala> res84.partitioner
res85: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)scala> res84.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res88: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[77] at mapPartitionsWithIndex at <console>:26scala> res88.collect
res89: Array[(Int, (Int, Int))] = Array((0,(6,6)), (0,(3,3)), (0,(9,9)), (1,(4,4)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(5,5)), (2,(2,2)))
演示的逻辑,就是按照key.hashcode进行分区,int类型的hashcode值是自己的本身。
并且hash分区器的规则致使我们可以任意的修改下游的分区数量。
scala> res81.reduceByKey(_+_,100)
res91: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[78] at reduceByKey at <console>:26scala> res91.partitions.size
res92: Int = 100scala> res81.reduceByKey(_+_,2)
res93: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[79] at reduceByKey at <console>:26scala> res93.partitions.size
res94: Int = 2
rangePartitioner
hashPartitioner规则非常简单,直接规定来一个数据按照hashcode规则的分配,规则比较简答,但是会出现数据倾斜。
range分区规则中存在两个方法。
rangeBounds界限,在使用这个分区器之前先做一个界限划定。
首先使用水塘抽样算法,在未知的数据集中抽取能够代表整个数据集的样本,根据样本进行规则设定。
然后在使用getPartitions。
首先存在水塘抽样,规定数据的流向以后再执行整体逻辑,会先触发计算。
sortBykey是转换类的算子,不会触发计算。
但是我们发现它触发计算了,因为首先在计算之前先进行水塘抽样,能够规定下游的数据规则,然后再进行数据的计算。
scala> arr
res101: Array[Int] = Array(1, 9, 2, 8, 3, 7, 4, 6, 5)scala> arr.map(t=> (t,t))
res102: Array[(Int, Int)] = Array((1,1), (9,9), (2,2), (8,8), (3,3), (7,7), (4,4), (6,6), (5,5))scala> sc.makeRDD(res102)
res104: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[94] at makeRDD at <console>:27scala> res104.sortByKey()
res105: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[97] at sortByKey at <console>:26scala> res105.partitioner
res106: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@fe1f9dea)scala> res104.sortByKey(true,3)
res107: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[100] at sortByKey at <console>:26scala> res107.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res109: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:26scala> res109.collect
res110: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (1,(4,4)), (1,(5,5)), (1,(6,6)), (2,(7,7)), (2,(8,8)), (2,(9,9)))
range分区器,它是先做抽样然后指定下游分区的数据界限。
它可以修改分区数量,但是分区数量不能大于元素个数,必须要保证每个分区中都有元素。
scala> res104.sortByKey(true,3)
res111: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[104] at sortByKey at <console>:26scala> res104.sortByKey(true,300)
res112: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[107] at sortByKey at <console>:26scala> res111.partitions.size
res114: Int = 3scala> res112.part
自定义分区器
工作的过程中我们会遇见数据分类的情况,想要根据自己的需求定义分区的规则,让符合规则的数据发送到不同的分区中,这个时候我们就需要自定义分区器了。
定义分区器,让数据发送到不同的分区,从而不同的task任务输出的文件结果内容也不同
# 自己创建数据data/a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
# 需求就是将数据按照规则进行分发到不同的分区中
# 存储的时候一个文件存储hello其他的文件存储tom jack
分区器的定义需要实现分区器的接口
class MyPartitioner extends Partitioner{override def numPartitions: Int = ???
// 设定下游存在几个分区override def getPartition(key: Any): Int = ???
// 按照key设定分区的位置
}
整体代码:
package com.hainiu.sparkimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object Test1 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("parse")conf.setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.textFile("data/a.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)val rdd1 = rdd.partitionBy(new MyPartitioner)val fs = FileSystem.get(new Configuration())val out = "data/res"if(fs.exists(new Path(out)))fs.delete(new Path(out),true)rdd1.saveAsTextFile(out)}
}
class MyPartitioner extends Partitioner{override def numPartitions: Int = 2override def getPartition(key: Any): Int = {if(key.toString.equals("hello"))0else1}
}
相关文章:

【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)
之前学过的kv类型上面的算子 groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的 mapValues keys values flatMapValues 普通算子,管道形式的算子 shuffle的过程是因为数据产生了打乱重分,分组、排序、join等…...
CSS3_BFC(十二)
BFC MDN对BFC的解释:块格式化上下文(Block Formating Context, BFC)是web页面的可视CSS渲染的一部分,是块盒子的布局过程发生的区域,也是浮动元素与其他元素交互的区域。 1、开启BFC flow-root对内容的影响是最低的&am…...

C0032.在Clion中使用MSVC编译器编译opencv的配置方法
使用MSVC编译器编译opencv的配置方法...

微信小程序中会议列表页面的前后端实现
题外话:想通过集成腾讯IM来解决即时聊天的问题,如果含语音视频,腾讯组件一年5万起步,贵了!后面我们改为自己实现这个功能,这里只是个总结而已。 图文会诊需求 首先是个图文列表界面 同个界面可以查看具体…...

WEB攻防-通用漏洞文件上传二次渲染.htaccess变异免杀
知识点: 1、文件上传-二次渲染 2、文件上传-简单免杀变异 3、文件上传-.htaccess妙用 4、文件上传-PHP语言特性 1、上传后门时,文件内容带.就不行 这时可以上传一个转换后的ip地址,ip地址对应网站包含后门代码 转换后的int会在访问的时候…...

vue实现列表滑动下拉加载数据
一、实现效果 二、实现思路 使用滚动事件监听器来检测用户是否滚动到底部,然后加载更多数据 监听滚动事件。检测用户是否滚动到底部。加载更多数据。 三、案例代码 <div class"drawer-content"><div ref"loadMoreTrigger" class&q…...

全面解析:HTML页面的加载全过程(四)--浏览器渲染之样式计算
主线程遍历得到的 DOM 树,依次为树中的每个节点计算出它最终的样式,称之为 Computed Style。 通过前面生成的DOM 树和 CSSOM 树,遍历 DOM 树,为每一个 DOM 节点,计算它的所有 CSS 属性,最后会得到一棵带有…...
#Verilog HDL# 谈谈代码中如何跨层次引用
目录 一 先谈作用问题 二 再谈跨层次问题 2.1 向下引用 2.2 向上引用 一 先谈作用问题 大多数编程语言都有一个称为作用域(scope)的特征,它定义了代码的某些部分对于变量和方法的可见性。作用域定义了一个命名空间,以避免同一命名空间内不同对象名称之间的冲突。 V…...
LeetCode 每日一题 2024/11/18-2024/11/24
记录了初步解题思路 以及本地实现代码;并不一定为最优 也希望大家能一起探讨 一起进步 目录 11/18 661. 图片平滑器11/19 3243. 新增道路查询后的最短距离 I11/20 3244. 新增道路查询后的最短距离 II11/21 3248. 矩阵中的蛇11/22 3233. 统计不是特殊数字的数字数量1…...

客户流失分析综述
引言 客户流失这个术语通常用来描述在特定时间或合同期内停止与公司进行业务往来的客户倾向性[1]。传统上,关于客户流失的研究始于客户关系管理(CRM)[2]。在运营服务时,防止客户流失至关重要。过去,客户获取相对于流失…...

基于51单片机的红包抽奖proteus仿真
地址: https://pan.baidu.com/s/1nYZlLb64kdZAWSydT_uHfA 提取码:1234 仿真图: 芯片/模块的特点: AT89C52/AT89C51简介: AT89C52/AT89C51是一款经典的8位单片机,是意法半导体(STMicroelectro…...

cangjie (仓颉) vscode环境搭建
sdk下载 下载中心-仓颉编程语言官网 可选择半年更新版,不用申请。目前版本:0.53.13 ,选择不同平台压缩包下载解压到任意位置即可 补充下载,vscode插件解压后,在vscode扩展中选择从vsix安装,安装后新增名为…...

阿里云私服地址
1.解压apache-maven-3.6.1-bin 2.配置本地仓库:修改conf/dettings.xml中的<localReoisitory>为一个指定目录。56行 <localRepository>D:\apache-maven-3.6.1-bin\apache-maven-3.6.1\mvn_repo</localRepository> 3.配置阿里云私服:…...

HTMLCSS:3D金字塔加载动画
效果演示 这段代码通过CSS3的3D变换和动画功能,创建了一个旋转的金字塔加载动画,每个侧面都有不同的颜色渐变,底部还有一个模糊的阴影效果,增加了视觉的立体感。 HTML <div class"pyramid-loader"><div cl…...
shell编程(2)(3)
目录 一、永久环境变量 按用户设置永久环境变量 文件路径: 示例步骤: 删除永久环境变量 二、脚本程序传递参数怎么实现 三、用编程进行数学运算 shell中利用expr进行运算 运算与变量结合 1. 变量赋值和基本运算 2. 使用expr进行运算 3. 变量…...
DFT专家分析scan insertion时使用EDT的策略
作为一名芯片设计DFT工程师专家,在做scan insertion(扫描插入)时使用EDT(Embedded Deterministic Test,嵌入式确定性测试)的参数配置策略,需要综合考虑多个方面的因素,以确保测试的高…...

Apple Vision Pro开发003-PolySpatial2.0新建项目
unity6.0下载链接:Unity 实时开发平台 | 3D、2D、VR 和 AR 引擎 一、新建项目 二、导入开发包 com.unity.polyspatial.visionos 输入版本号 2.0.4 com.unity.polyspatial(单独导入),或者直接安装 三、对应设置 其他的操作与之前的版本相同…...

分公司如何纳税
分公司不进行纳税由总公司汇总纳税“子公司具有法人资格,依法独立承担民事责任;分公司不具有法人资格,其民事责任由公司承担。”企业设立分支机构,使其不具有法人资格,且不实行独立核算,则可由总公司汇总缴纳企业所得税…...
在 Ubuntu 系统上安装 npm 环境以及 nvm(Node Version Manager)
在 Ubuntu 系统上安装 npm 环境以及 nvm(Node Version Manager) 步骤 1: 更新系统包步骤 2: 安装 nvm步骤 3: 安装 Node.js 和 npm步骤 4: 设置默认 Node.js 版本(可选)总结 在 Ubuntu 系统上安装 npm 环境以及 nvm(No…...

深度优先搜索(dfs)题目合集
深度优先搜索(dfs)题目合集 全排列问题 dfs原理和模版深度优先搜索原理(纯个人理解)参考程序dfs通用模版 素数环组合的输出 剪枝新dfs模版参考程序新的dfs模版 自然数的拆分 利用形参进行回溯 全排列问题 dfs原理和模版 P1706 全…...

手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...

如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...

算法笔记2
1.字符串拼接最好用StringBuilder,不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...

人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...
JavaScript基础-API 和 Web API
在学习JavaScript的过程中,理解API(应用程序接口)和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能,使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...
多模态图像修复系统:基于深度学习的图片修复实现
多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...

Ray框架:分布式AI训练与调参实践
Ray框架:分布式AI训练与调参实践 系统化学习人工智能网站(收藏):https://www.captainbed.cn/flu 文章目录 Ray框架:分布式AI训练与调参实践摘要引言框架架构解析1. 核心组件设计2. 关键技术实现2.1 动态资源调度2.2 …...

vxe-table vue 表格复选框多选数据,实现快捷键 Shift 批量选择功能
vxe-table vue 表格复选框多选数据,实现快捷键 Shift 批量选择功能 查看官网:https://vxetable.cn 效果 代码 通过 checkbox-config.isShift 启用批量选中,启用后按住快捷键和鼠标批量选取 <template><div><vxe-grid v-bind"gri…...