spark的学习-03
RDD的创建的两种方式:
方式一:并行化一个已存在的集合
方法:parallelize 并行的意思
将一个集合转换为RDD
方式二:读取外部共享存储系统
方法:textFile、wholeTextFile、newAPIHadoopRDD等
读取外部存储系统的数据转换为RDD
RDD的五大特征:
-
每个RDD 都由一系列的分区构成
-
RDD 的转换操作本质上就是对RDD所有分区的并行转换
-
每个RDD 都会保存与其他RDD之间的依赖关系:血链机制或者血脉机制
-
如果是二元组【KV】类型的RDD,在Shuffle过程中可以自定义分区器,默认是hash分区(hash值取模进行分区)
-
可选的,Spark程序运行时,Task的分配可以指定实现本地优先计算:最优计算位置
RDD的五大特性分别是什么?
a. 每个RDD都可以由多个分区构成
b. 对RDD转换处理本质上是对RDD所有分区的并行转换处理
c. 对每个RDD都会保留与其他RDD之间的依赖关系:血脉机制
d. 可选的,对于KV结构的RDD,在经过Shuffle时,可以干预分区规则,默认是Hash分区
e. 可选的,Spark分配Task时会优先本地计算,尽量将Task分配到数据所在的节点
转换算子:
map:
# map:
list1 = [1, 2, 3, 4, 5]
# 目标是求出集合中各个元素的 3 次方
listRdd = sc.parallelize(list1)
mapRdd = listRdd.map(lambda x: math.pow(x, 3))
mapRdd.foreach(lambda x: print(x)) # foreach是触发算子
flatMap:
# flatMap:
# 目标是根据/切割,得到每个歌名
fileRdd = sc.textFile("../../datas/wordcount/songs.txt")
flatMapRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatMapRdd.foreach(lambda x:print(x))
filter:
过滤算子
# filter :
# 目标是过滤掉不符合的文本
fileRdd2 = sc.textFile("../../datas/wordcount/songs2.txt")
filterRdd = fileRdd2.filter(lambda line: re.split("\s",line)[2] != '-1' and len(re.split("\s",line)) == 4)
filterRdd.foreach(lambda x: print(x))
union:
联合
list2 = [1,2,3,4,5,6,7,8]
list3 = [6,7,8,9,10]
rdd1 = sc.parallelize(list2)
rdd2 = sc.parallelize(list3)rdd3 = rdd1.union(rdd2)rdd3.foreach(lambda x: print(x)) # 1 2 3 4 5 6 7 8 6 7 8 9 10
distinct:
去重
rdd4 = rdd3.distinct()
rdd4.foreach(lambda x: print(x)) # 1 2 3 4 5 6 7 8 9 10
分组聚合算子 groupByKey 以及 reduceByKey:
groupByKey只根据key进行分组,但不聚合 reduceByKey根据key进行分组,且进行聚合 (必须进行shuffle,可以指定分区的数量和规则) groupByKey转换算子,只对 KV键值对的RDD 起作用
rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd6 = rdd5.groupByKey() # ("word",List[10,5])
rdd6.foreach(lambda x: print(x[0], *x[1])) rdd7 = rdd5.reduceByKey(lambda total, num: total + num)
rdd7.foreach(print)
重分区算子:repartition、coalesce :
二者都可以将分区变大变小
repartition必须经过shuffle 因为底层代码中 shuffle = True,可以将分区变小或者变大
而coalesce 可以选择经过不经过shuffle,默认情况下不经过,在默认情况下,只能将分区变小,不能将分区变大。假如shuffle=True,也可以将分区变大。
使用repartition更改分区的数量:
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
# 没有指定分区,走默认,默认分区个数,因为是local 模式,所以跟核数有关,所以 分区数为2
rdd = sc.parallelize(list01)
print(rdd.getNumPartitions()) # getNumPartitions() 获取分区的数量 返回值不是RDD,所以不是转换算子,是触发算子 # 2
# 使用 repartition 将 分区数量改为4 或 1
changeRdd = rdd.repartition(4) # 经过shuffle过程,将分区数量更改为4
print(changeRdd.getNumPartitions()) # 现在就将rdd 的分区更改为4了 # 4
# 还可以更改为1 (缩小分区)
print(rdd.repartition(1).getNumPartitions()) # 1
使用coalesce 更改分区的数量:
将小分区变为大分区,必须进行shuffle过程 在coalesce的中默认shuffle=Flase,所以我们需要手动更改为True
changeRdd2 = rdd.coalesce(4,shuffle=True) #
print(changeRdd2.getNumPartitions()) # 4
将大分区改为小分区,在coalesce中可以不进行shuffle过程,所以不需要改为True
print(rdd.coalesce(1).getNumPartitions()) # 1
排序算子:sortBy、sortByKey:
fileRdd = sc.textFile("../../datas/c.txt")#fileRdd.sortBy(lambda line:line.split(",")[1],ascending=False).foreach(print)# sortByKey 对KV类型的RDD进行排序rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)#rdd5.sortByKey(ascending=False).foreach(print)# 假如你想根据value排序,怎么办?rdd5.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print) # ascending=False降序排序
触发算子:
常见的触发算子:count、foreach、take
# 较为常见的触发算子
# count foreach saveAsTextFile
# count
list1 = [1,2,3,4,5,6,7,8,9]
rdd1 = sc.parallelize(list1,2)
print(rdd1.count()) #9rdd1.foreach(lambda x: print(x))print(rdd1.take(3)) # [1 2 3]
其他触发算子:
first、take:
# first: 返回RDD集合中的第一个元素
print(rdd1.first()) # 1 print(rdd1.take(3)) # [1 2 3]
collect:
我们在上面sortBy案例中写到了collect,如果不collect就直接打印结果的话,出来的是各个分区中排序的结果,并不是全局的(sortBy是全局排序的,只不过我们之前有分区,只在分区中排序)
想看到全局的排序,可以直接将分区数量更改为1,或者直接使用collect收集
reduce:
我们在上面的案例中也使用到了reduceByKey转换算子,这个和上面的差不多,只不过reduce只进行聚合,而不需要根据key分组什么的,因为就没有key
print(rdd1.reduce(lambda sum, num:sum + num)) # 45
top 和 takeOrdered:
先对RDD中的所有元素进行升序排序,top返回最大的几个元素、takeOrdered返回最小的几个元素
都不经过shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
list2 = [2,1,5,79,435,33,576]
rdd2 = sc.parallelize(list2)
print(rdd2.top(3)) # [576, 435, 79]
# takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
print(rdd2.takeOrdered(3)) # [1, 2, 5]
join 方面的算子:
join leftOuterJoin rightOuterJoin fullOuterJoin 都为转换算子
join的过程,必然引发相同key值的数据汇总在一起,引发shuffle 操作
join:
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],numSlices=2)
rdd_singer_music = sc.parallelize([("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),("动力火车", "当")], numSlices=2)# join
joinRdd = rdd_singer_age.join(rdd_singer_music).foreach(lambda x : print(x))
# ('周杰伦', (43, '青花瓷'))
# ('蔡依林', (41, '日不落'))
# ('陈奕迅', (47, '孤勇者'))
# ('林子祥', (74, '男儿当自强'))
leftOuterJoin:
和sql中的leftjoin一样,左边的值全出,右边的值有的就显示,没有就显示null
rightOuterJoin 同理
leftJoinRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music).foreach(lambda x:print(x))
#('周杰伦', (43, '青花瓷'))
#('蔡依林', (41, '日不落'))
#('陈升', (63, None))
#('陈奕迅', (47, '孤勇者'))
#('林子祥', (74, '男儿当自强'))
fullOuterJoin:
fullJoinRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music).foreach(lambda x: print(x))
# ('动力火车', (None, '当'))
# ('周杰伦', (43, '青花瓷'))
# ('蔡依林', (41, '日不落'))
# ('陈升', (63, None))
# ('陈奕迅', (47, '孤勇者'))
# ('林子祥', (74, '男儿当自强'))
分区算子 mapPartitions -- 转换算子 foreachParition -- 触发算子
mapPartitions:
input_rdd = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), numSlices=2)
# 使用mapPartitions:对每个分区进行处理
def map_partition(part):rs = [i * 2 for i in part]return rs# 每个分区会调用一次:将这个分区的数据放入内存,性能比map更好,优化型算子,注意更容易出现内存溢出
map_part_rdd = input_rdd.mapPartitions(lambda part: map_partition(part))
foreachParition:
- 优点:性能快、节省外部连接资源 - 缺点:如果单个分区的数据量较大,容易出现内存溢出 - 场景: -数据量不是特别大,需要提高性能【将整个分区的数据放入内存】 -需要构建外部资源时【基于每个分区构建一份资源】
def save_part_to_mysql(part):# 构建MySQL连接for i in part:# 利用MySQL连接将结果写入MySQLprint(i)# 将每个分区的数据直接写入MySQL,一个分区就构建一个连接
map_part_rdd.foreachPartition(lambda part: save_part_to_mysql(part))
Spark的容错机制:(重点)
1、RDD容错机制:persist持久化机制
其中有三个算子: cache 、 persist 、 unpersist
cache:
# 功能:将RDD缓存在内存中
# 本质其实底层还是调用的 persist ,但是只缓存在内存中,如果内存不足的话,缓存就会失败
语法:cache()
persist :
与cache不同的是,persist 可以自己指定缓存的方式(级别)
# 将RDD缓存在磁盘中
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)# 将RDD缓存在内存中
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)# 将RDD优先缓存在内存中,如果内存不足,就缓存在磁盘中
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)# 使用堆外内存
StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)# 使用序列化
StorageLevel.MEMORY_AND_DISK_DESER = StorageLevel(True, True, False, True)
常用的有
MEMORY_AND_DISK_2 -- 先缓存内存,如果内存不足就缓存在磁盘
MEMORY_AND_DISK_DESER -- 使用序列化
unpersist :
功能就是将缓存释放出去
-
unpersist(blocking=True):等释放完再继续下一步 (默认为False)
-
场景:明确RDD已经不再使用,后续还有很多的代码需要执行,将RDD的数据从缓存中释放,避免占用资源
-
注意:如果不释放,这个Spark程序结束,也会释放这个程序中的所有内存
总体代码演示:
# step3: 保存结果# 对RDD进行缓存rs_rdd.cache() # 只缓存在内存中rs_rdd.persist(StorageLevel.MEMORY_AND_DISK)# 打印结果:构建RDDrs_rdd.foreach(lambda x: print(x))# 打印第一行:重新构建一遍print(rs_rdd.first())# 统计行数:重新构建一遍print(rs_rdd.count())# todo:3-关闭SparkContexttime.sleep(10000)# 如果这个RDD明确后续代码中不会再被使用,一定要释放缓存rs_rdd.unpersist(blocking=True)# unpersist(blocking=True):等RDD释放完再继续下一步
# blocking = True:阻塞
2、checkPoint 检查点
checkpoint需要在触发算子的前面设置检查点,之后设置的话可能会出现只产生文件夹,而不产生结果的情况
# 创建sc对象
conf = SparkConf().setMaster("local[2]").setAppName("第一个pysparkDemo")
sc = SparkContext(conf=conf)fileRdd = sc.textFile("../../datas/wordcount/sogou.tsv")
mapRdd = (fileRdd.filter(lambda line: len(re.split("\s+", line)) == 6) \.map(lambda line: (re.split("\s+", line)[0], re.split("\s+", line)[1], re.split("\s+", line)[2][1:-1])))sc.setCheckpointDir("../datas/chk/chk1")mapRdd.checkpoint()
# checkpoint需要在触发算子的前面设置检查点,之后设置的话可能会出现只产生文件夹,而不产生结果的情况print(mapRdd.count())time.sleep(100)sc.stop()
容错机制面试题:
RDD的cache、persist持久化机制和checkpoint检查点机制有什么区别?
-
存储位置
-
persist:将RDD缓存在内存或者磁盘中
-
chk:将RDD的数据存储在文件系统磁盘中
-
-
生命周期
-
persist:当代码中遇到了unpersist或者程序结束,缓存就会被自动清理
-
chk:检查点的数据是不会被自动清理的,只能手动删除
-
-
存储内容
-
persist:会保留RDD的血脉关系,如果缓存丢失,可以通过血脉进行恢复
-
chk:会斩断RDD的血脉关系,不会保留RDD的血脉关系的
-
相关文章:

spark的学习-03
RDD的创建的两种方式: 方式一:并行化一个已存在的集合 方法:parallelize 并行的意思 将一个集合转换为RDD 方式二:读取外部共享存储系统 方法:textFile、wholeTextFile、newAPIHadoopRDD等 读取外部存储系统的数…...

一文了解Android SELinux
在Android系统中,SELinux(Security-Enhanced Linux)是一个增强的安全机制,用于对系统进行强制访问控制(Mandatory Access Control,MAC)。它限制了应用程序和进程的访问权限,提供了更…...

数据血缘追踪是如何在ETL过程中发挥作用?
在大数据环境下,数据血缘追踪具有重要意义,它能够帮助用户了解数据的派生关系、变换过程和使用情况,进而提高数据的可信度和可操作性。通过数据血缘追踪,ETL用户可以准确追溯数据的来源,快速排查数据异常和问题。 一、…...
跟我学C++中级篇——生产中如何调试程序
一、程序的BUG和异常 程序不是发布到生产环境就万事大吉了。没有人敢保证自己写的代码没有BUG,放心,说这种话的人,基本可以断定是小白。如果在开发阶段出现问题,还是比较好解决的,但是如果真到了生产上,可…...
Python爬虫实战 | 爬取网易云音乐热歌榜单
网易云音乐热歌榜单爬虫实战 环境准备 Python 3.xrequests 库BeautifulSoup 库 安装依赖 pip install requests beautifulsoup4代码 import requests from bs4 import BeautifulSoupdef get_cloud_music_hot_songs():url "http://music.163.com/#/discover/playlist…...

apk因检测是否使用代理无法抓包绕过方式
最近学习了如何在模拟器上抓取APP的包,APP防恶意行为的措施可分为三类: (1)反模拟器调试 (2)反代理 (3)反证书检验 第一种情况: 有的app检验是否使用系统代理,…...

DevOps业务价值流:架构设计最佳实践
系统设计阶段作为需求与研发之间的桥梁,在需求设计阶段的原型设计评审环节,尽管项目组人员可能未完全到齐,但关键角色必须到位,包括技术组长和测试组长。这一安排旨在同步推进两项核心任务:一是完成系统的架构设计&…...

计算机网络——SDN
分布式控制路由 集中式控制路由...
开源数据库 - mysql - innodb源码阅读 - master线程(一)
master struct /** The master thread controlling the server. */void srv_master_thread() {DBUG_TRACE;srv_slot_t *slot; // 槽位THD *thd create_internal_thd(); // 创建内部线程ut_ad(!srv_read_only_mode); //断言 srv_read_only_mode 为 falsesrv_main_thread_proce…...

vscode ssh连接autodl失败
autodl服务器已开启,vscode弹窗显示连接失败 0. 检查状态 这里的端口和主机根据自己的连接更改 ssh -p 52165 rootregion-45.autodl.pro1. 修改config权限 按返回的路径找到config文件 右键--属性--安全--高级--禁用继承--从此对象中删除所有已继承的权限--添加…...

文件系统和日志管理 附实验:远程访问第一台虚拟机日志
文件系统和日志管理 文件系统:文件系统提供了一个接口,用户用来访问硬件设备(硬盘)。 硬件设备上对文件的管理 文件存储在硬盘上,硬盘最小的存储单位是512字节,扇区。 文件在硬盘上的最小存储单位&…...

云上拼团GO指南——腾讯云博客部署案例,双11欢乐GO
知孤云出岫-CSDN博客 目录 腾讯云双11活动介绍 一.双十一活动入口 二.活动亮点 (一)双十一上云拼团Go (二)省钱攻略 (三)上云,多类型服务器供您选择 三.会员双十一冲榜活动 (一)活动内容 &#x…...

【VScode】VScode内的ChatGPT插件——CodeMoss全解析与实用教程
在当今快速发展的编程世界中,开发者们面临着越来越多的挑战。如何提高编程效率,如何快速获取解决方案,成为了每位开发者心中的疑问。今天,我们将深入探讨一款颠覆传统编程体验的插件——CodeMoss,它将ChatGPT的强大功能…...

水库大坝安全监测预警方法
一、监测目标 为了确保水库大坝的结构安全性和运行稳定性,我们需要采取一系列措施来预防和减少因自然灾害或其他潜在因素所引发的灾害损失。这不仅有助于保障广大人民群众的生命财产安全,还能确保水资源的合理利用和可持续发展。通过加强大坝的监测和维护…...
深度学习:微调(Fine-tuning)详解
微调(Fine-tuning)详解 微调(Fine-tuning)是机器学习中的一个重要概念,特别是在深度学习和自然语言处理(NLP)领域。该过程涉及调整预训练模型的参数,以适应特定的任务或数据集。以下…...

qt QWebSocketServer详解
1、概述 QWebSocketServer 是 Qt 框架中用于处理 WebSocket 服务器端的类。它允许开发者创建 WebSocket 服务器,接受客户端的连接,并与之进行双向通信。WebSocket 是一种在单个 TCP 连接上进行全双工通讯的协议,它使得客户端和服务器之间的数…...

【数据结构】线性表——链表
写在前面 本篇笔记记录线性表——链表的主要形式,虽然链表有8种形式,但是只要精通笔记中编写的两种,即可触类旁通。 文章目录 写在前面一、链表的概念及结构二、链表的分类三、无头单向非循环链表3.1、链表的实现3.1.1、链表的结构体定义3.1…...
Fork突然报错
现象: Could not resolve hostname github.com: No address associated with hostname fatal: Could not read from remote repository. 原因:需要为fork设置代理 步骤: 1.通过winR输入%localappdata%\fork\gitInstance打开文件夹 2.找到…...

Vue Element-UI 选择隐藏表格中的局部字段信息
一、功能需求分析 为什么需要这个功能? (1)简化信息,减少混乱: 就像整理抽屉,只留下常用的东西,这样找起来更快,看起来也更整洁。在表格中,只展示需要的字段ÿ…...

easyui +vue v-slot 注意事项
https://www.jeasyui.com/demo-vue/main/index.php?pluginDataGrid&themematerial-teal&dirltr&pitemCheckBox%20Selection&sortasc 接口说明 <template><div><h2>Checkbox Selection</h2><DataGrid :data"data" style&…...

深度学习在微纳光子学中的应用
深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向: 逆向设计 通过神经网络快速预测微纳结构的光学响应,替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈
在日常iOS开发过程中,性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期,开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发,但背后往往隐藏着系统资源调度不当…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲
文章目录 前言第一部分:体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分:体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...
【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)
LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 题目描述解题思路Java代码 题目描述 题目链接:LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...

【LeetCode】算法详解#6 ---除自身以外数组的乘积
1.题目介绍 给定一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
API网关Kong的鉴权与限流:高并发场景下的核心实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中,API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关,Kong凭借其插件化架构…...