当前位置: 首页 > news >正文

大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD

28.RDD_为什么需要RDD

29.RDD_定义

30.RDD_五大特性总述

31.RDD_五大特性1

32.RDD_五大特性2

33.RDD_五大特性3

34.RDD_五大特性4

35.RDD_五大特性5

36.RDD_五大特性总结

37.RDD_创建概述

38.RDD_并行化创建

演示代码:
// 获取当前 RDD 的分区数
@Since ( "1.6.0" )
final def getNumPartitions : Int =
partitions . length
// 显示出 RDD 被分配到不同分区的信息
/**Return an RDD created by coalescing all
elements within each partition into an
array.*/
def glom (): RDD [ Array [ T ]]
1
2
3
4
5
6
package com . itbaizhan . rdd
//1. 导入 SparkConf 类、 SparkContext
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByParallelize {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象。并设置本地运行和程序的
名称
val conf = new
SparkConf (). setMaster ( "local[2]" ). setAppName
( "CreateRdd1" )
//3. 构建 SparkContext 对象
val sc = new SparkContext ( conf )
//4. 通过并行化创建 RDD 对象:将本地集合 -> 分布式的
RDD 对象
1
2
3
4
5
6
7
8
9
10
11
12
79    
//val rdd: RDD[Int] =
sc.parallelize[Int](List(1, 2, 3, 4, 5, 6,
7, 8))
val rdd : RDD [ Int ] =
sc . parallelize ( List ( 1 , 2 , 3 , 4 , 5 , 6 , 7 ,
8 ), 3 )
//5. 输出默认的分区数
//5.1
setMaster("local[*]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//8, 默认当前系统的
CPU
//5.2
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//2
//5.3
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8),3)
println ( " 默认分区
数: " + rdd . getNumPartitions ) //3
//6.collect 方法:将 rdd 对象中每个分区的数据,都
发送到 Driver ,形成一个 Array 对象
val array1 : Array [ Int ] = rdd . collect ()
println ( "rdd.collect()=" + array1 . mkString ( ",
" ))
//7. 显示出 rdd 对象中元素被分布到不同分区的数据信
13
14
15
16
17
18
19
20
21
22
23
24
25
80 运行结果:
实时效果反馈
1. 以下关于并行化创建 RDD 的描述错误的是:
A
通过并行化集合创建,将本地集合对象转分布式 RDD
B
parallelize() 方法必须传递两个参数。
C
parallelize 没有给定分区数 , 默认分区数等于执行程序的当前
服务器 CPU 核数。
答案:
val array2 : Array [ Array [ Int ]] =
rdd . glom (). collect ()
println ( "rdd.glom().collect() 的内容是 :" )
/*for(eleArr<- array2){
println(eleArr.mkString(","))
}*/
array2 . foreach ( eleArr => println ( eleArr . mkStr
ing ( "," )))
}
}
26
27
28
29
30
31
32
33
默认分区数: 3
rdd.collect()=1,2,3,4,5,6,7,8
rdd.glom().collect() 的内容是 :
1,2
3,4,5
6,7,8

39.RDD_读取文件创建RDD

40.RDD_读取小文件创建RDD

扩展 wholeTextFiles 适合读取一堆小文件:
//path 指定小文件的路径目录
//minPartitions 最小分区数 可选参数
def wholeTextFiles ( path :
String , minPartitions : Int =
defaultMinPartitions ): RDD [( String , String )]
1
2
3
85 代码演示:
package com . itbaizhan . rdd
//1. 导入类
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByWholeTextFiles {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "WholeTextFiles" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 读取指定目录下的小文件
val rdd : RDD [( String , String )] =
sc . wholeTextFiles ( "data/tiny_files" )
//(filePath1, " 内容 1"),(filePath2, " 内容
2"),...,(filePathN, " 内容 N")
val tuples : Array [( String , String )] =
rdd . collect ()
tuples . foreach ( ele => println ( ele . _1 , ele . _2 ))
//6. 获取小文件中的内容
val array : Array [ String ] =
rdd . map ( _ . _2 ). collect ()
println ( "---------------------------" )
println ( array . mkString ( "|" ))
//4. 关闭 sc 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
86 运行输出结果 :
RDD_ 算子概述
定义: 分布式集合 RDD 对象的方法被称为算子
算子分类:
Transformation 转换算子
1
Action 行动算子
2
sc . stop ()
}
}
22
23
24
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file1.txt,hello Linux
hello Zookeper
hello Maven
hello hive
hello spark)
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file2.txt,Spark Core
Spark RDD
Spark Sql)
----------------
hello Linux
hello Zookeper
hello Maven
hello hive
hello spark|Spark Core
Spark RDD
Spark Sql

41.RDD_算子概述

42.RDD_转换算子map

43.RDD_转换算子flatmap

44.RDD_转换算子reducebykey

45.RDD_转换算子filter

46.RDD_转换算子distinct

47.RDD_转换算子glom

48.RDD_转换算子groupby

object RddGroupBy {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "groupBy" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 创建 Rdd
val rdd : RDD [( Char , Int )] =
sc . parallelize ( Array (( 'a' , 1 ), ( 'a' , 2 ),
( 'b' , 1 ), ( 'b' , 2 ), ( 'a' , 3 ), ( 'a' , 4 )))
//6. 通过 groupBy 算子对 rdd 对象中的数据进行分组
//groupBy 插入的函数的用意是指定按照谁进行分组
// 分组后的结果是有二元组组成的 RDD
val gbRdd : RDD [( Char , Iterable [( Char ,
Int )])] = rdd . groupBy ( tupEle => tupEle . _1 )
// 收集到 Driver
val result1 : Array [( Char ,
Iterable [( Char , Int )])] = gbRdd . collect ()
//(a,CompactBuffer((a,1), (a,2), (a,3),
(a,4))),(b,CompactBuffer((b,1), (b,2)))
println ( result1 . mkString ( "," ))
//7. 使用 map 转换算子
//(a,List((a,1), (a,2), (a,3), (a,4))),
(b,List((b,1), (b,2)))
val result2 : Array [( Char , List [( Char ,
Int )])] = gbRdd . map ( tup => ( tup . _1 ,
tup . _2 . toList )). collect ()
println ( result2 . mkString ( "," ))
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
104 实时效果反馈
1. 以下关于
rdd.groupBy(tupEle => tupEle._1)
的描述错误的是:
A
groupBy 传入的函数的意思是 : 通过这个函数 , 确定按照谁来
分组。
B
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
数只能为 2
C
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
>=2
答案:
1=>B

49.RDD_转换算子groupbyKey

50.RDD_转换算子sortby

51.RDD_转换算子sortbyKey

52.RDD_转换算子union并集

53.RDD_转换算子交集和差集

54.RDD_转换算子关联算子

55.RDD_转换算子partitionBy

56.RDD_转换算子mapPatitions

57.RDD_转换算子sample

58.RDD_行动算子foreachPartition

59.RDD_行动算子foreach

60.RDD_行动算子saveAsTestFile

61.RDD_行动算子countByKey

62.RDD_行动算子reduce

63.RDD_行动算子fold

64.RDD_行动算子first_take_count

65.RDD_行动算子top_takeOrderd

66.RDD_行动算子takeSample

二.内核进阶

67.内核进阶_DAG概述

68.内核进阶_血缘关系

69.内核进阶_宽窄依赖关系

70.内核进阶_stage划分

71.内核进阶_任务调度概述

72.内核进阶_管道计算模式上

73.内核进阶_管道计算模式下

74.内核进阶_cache缓存

75.内核进阶_checkpoint检查点

76.内核进阶_cache和checkpoint区别

77.内核进阶_并行度

78.内核进阶_广播变量

79.内核进阶_累加器一

80.内核进阶_累加器二

81.内核进阶_累加器之重复计算

82.内核进阶_项目实战PVUV需求分析

83.内核进阶_项目实战PV分析

84.内核进阶_项目实战UV分析

85.内核进阶_二次排序实战

86.内核进阶_分组取topN实战

87.内核进阶_卡口统计项目需求分析

88.内核进阶_卡口统计项目统计正常的卡口

89.内核进阶_卡口统计项目TOP5

90.内核进阶_卡口统计项目统计不同区域同时出现的车辆

91.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹一

92.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹二

93.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹三

94.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹四

相关文章:

大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD 28.RDD_为什么需要RDD 29.RDD_定义 30.RDD_五大特性总述 31.RDD_五大特性1 32.RDD_五大特性2 33.RDD_五大特性3 34.RDD_五大特性4 35.RDD_五大特性5 36.RDD_五大特性总结 37.RDD_创建概述 38.RDD_并行化创建 演示代码&#xff1a; // 获取当前 RDD 的分区数 Since ( …...

Unity 加载OSGB(webgl直接加载,无需转换格式!)

Unity webgl加载倾斜摄影数据 前言效果图后续不足 前言 Unity加载倾斜摄影数据&#xff0c;有很多的插件方便好用&#xff0c;但是发布到网页端均失败&#xff0c;因为webgl 的限制&#xff0c;IO读取失效。 前不久发现一个开源项目: UnityOSGB-main 通过两种方式在 Unity 中…...

tcp/ip网络协议,tcp/ip网络协议栈

TCP/IP网络协议和TCP/IP网络协议栈是互联网通信的基石&#xff0c;它们定义了电子设备如何连入因特网以及数据如何在它们之间传输的标准。以下是对TCP/IP网络协议和TCP/IP网络协议栈的详细解释&#xff1a; 一、TCP/IP网络协议 TCP/IP&#xff08;Transmission Control Proto…...

【Debug】the remote host closed the connection错误信息分析

出现的情况说明&#xff1a;QT软件。刚开始都可以连接成功 之后连接 断开几次 就会出现连接失败 错误信息是the remote host closed the connection。the remote host closed the connection广泛原因分析 这个错误通常意味着远端 STM32 服务器主动关闭了连接。可能的原因包括&a…...

SpringBoot扩展篇:@Scope和@Lazy源码解析

SpringBoot扩展篇&#xff1a;Scope和Lazy源码解析 1. 研究主题及Demo2. 注册BeanDefinition3. 初始化属性3.1 解决依赖注入3.2 创建代理 ContextAnnotationAutowireCandidateResolver#getLazyResolutionProxyIfNecessary3.3 代理拦截处理3.4 单例bean与原型bean创建的区别 4. …...

“AI隐患识别系统,安全多了道“智能护盾”

家人们&#xff0c;在生活和工作里&#xff0c;咱们都知道安全那可是头等大事。不管是走在马路上&#xff0c;还是在工厂车间忙碌&#xff0c;又或是住在高楼大厦里&#xff0c;身边都可能藏着一些安全隐患。以前&#xff0c;发现这些隐患大多靠咱们的眼睛和经验&#xff0c;可…...

通向AGI之路:人工通用智能的技术演进与人类未来

文章目录 引言:当机器开始思考一、AGI的本质定义与技术演进1.1 从专用到通用:智能形态的范式转移1.2 AGI发展路线图二、突破AGI的五大技术路径2.1 神经符号整合(Neuro-Symbolic AI)2.2 世界模型架构(World Models)2.3 具身认知理论(Embodied Cognition)三、AGI安全:价…...

论文阅读:InstanceDiffusion: Instance-level Control for Image Generation

CVPR2024文章 摘要&#xff1a; 文本到图像扩散模型产生高质量的图像&#xff0c;但不提供对图像中单个实例的控制。我们引入了InstanceDiffusion&#xff0c;它将精确的实例级控制添加到文本到图像扩散模型中。InstanceDiffusion 支持每个实例的自由形式的语言条件&#xff…...

7.攻防世界 wzsc_文件上传

打开题目页面如下 上传了一张带有木马的图片 返回的页面是空白的&#xff0c;不过路径变了 猜测存在根目录/upload 也可以通过dirsearch扫描根目录 命令&#xff1a; dirsearch -u http://61.147.171.105:65024/ -e* 终于得到了上传的文件的信息 但是测试发现.php文件以及.…...

以为是响应式对象丢失导致数据没有回显

背景&#xff1a;之前ruoyi生成的vue2代码&#xff0c; <el-form ref“form”&#xff0c;后面我改成vue3的写法&#xff0c;没有实例化form&#xff0c; 在vue3中是需要定义const form ref(); 导致点击了修改后&#xff0c;页面弹框显示出来&#xff0c;数据没有回显。 一直…...

来 Gitcode 免费体验 DeepSeek 蒸馏模型,开启 AI 探索新旅程

在 AI 技术飞速发展的时代&#xff0c;你是否也怀揣着对前沿科技的无限好奇与探索欲望&#xff1f;然而&#xff0c;昂贵的模型体验费用和复杂的操作流程&#xff0c;是不是让你一次次望而却步&#xff1f;现在&#xff0c;这些都不再是问题&#xff01;DeepSeek 蒸馏模型现已强…...

2.Mkdocs配置说明(mkdocs.yml)【最新版】

官方文件&#xff1a;Changing the colors - Material for MkDocs 建议详细学习一下上面的官方网站↑↑↑ 我把我目前的配置文件mkdocs.yml代码写在下面&#x1f447;&#x1f3fb; #[Info] site_name: Mkdocs教程 #your site name 显示在左上角 site_url: http://wcowin.wo…...

云轴科技ZStack+海光DCU:率先推出DeepSeek私有化部署方案

针对日益强劲的AI推理需求和企业级AI应用私有化部署场景&#xff08;Private AI&#xff09;&#xff0c;云轴科技ZStack联合海光信息&#xff0c;共同推动ZStack智塔全面支持DeepSeek V3/R1/Janus Pro系列模型&#xff0c;基于海光DCU实现高性能适配&#xff0c;为企业提供安全…...

扩增子分析|零模型2——基于βNTI的微生物随机性和确定性装配过程(箱线图和柱状图R中实现)

一、引言 我们之前发布的周集中老师团队零模型R中实战案例&#xff1a;扩增子分析|基于零模型的群落确定性和随机性构建过程——R实战_bmntd-CSDN博客。在文末只输出了一个.csv 表格。并没有提供绘图的方法&#xff0c;有小伙伴问如何在R中一键成图呢&#xff1f;还真可以&…...

专题:剑指offer

链表 JZ6 从尾到头打印链表 思路&#xff1a;先顺序输出到栈里面 然后再以此从栈顶弹出即可 /** * struct ListNode { * int val; * struct ListNode *next; * ListNode(int x) : * val(x), next(NULL) { * } * }; */ #include …...

DeepSeek 部署过程中的问题

文章目录 DeepSeek 部署过程中的问题一、部署扩展&#xff1a;docker 部署 DS1.1 部署1.2 可视化 二、问题三、GPU 设置3.1 ollama GPU 的支持情况3.2 更新 GPU 驱动3.3 安装 cuda3.4 下载 cuDNN3.5 配置环境变量 四、测试 DeepSeek 部署过程中的问题 Windows 中 利用 ollama 来…...

DeepSeek R1本地化部署 Ollama + Chatbox 打造最强 AI 工具

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;Linux 目录 一&#xff1a;&#x1f525; Ollama &#x1f98b; 下载 Ollama&#x1f98b; 选择模型&#x1f98b; 运行模型&#x1f98b; 使用 && 测试 二&#xff1a;&#x1f525; Chat…...

应急场景中的数据融合与对齐

1. 概述 在应急管理中,快速、准确地掌握现场状况、实时监控灾情并进行决策至关重要。各类数据(如卫星影像、无人机图像、激光雷达点云、地理信息系统(GIS)数据、传感器数据、社交媒体信息、移动终端数据等)具有来源广泛、格式多样、时空特性不同等特点。如何将这些异构数…...

手机上运行AI大模型(Deepseek等)

最近deepseek的大火&#xff0c;让大家掀起新一波的本地部署运行大模型的热潮&#xff0c;特别是deepseek有蒸馏的小参数量版本&#xff0c;电脑上就相当方便了&#xff0c;直接ollamaopen-webui这种类似的组合就可以轻松地实现&#xff0c;只要硬件&#xff0c;如显存&#xf…...

Mellanox网卡信息查看

1、查看Mellanox网卡的SN&#xff08;序列号&#xff09;和PN mstvpd 04:00.0或者lspci -s 04:00.0 -vvv来自https://enterprise-support.nvidia.com/s/article/MLNX2-117-2532kn 2、查看Mellanox网卡驱动、固件版本 ethtool -i ens6np0...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

Ubuntu系统下交叉编译openssl

一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机&#xff1a;Ubuntu 20.04.6 LTSHost&#xff1a;ARM32位交叉编译器&#xff1a;arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

美国西海岸的夏天&#xff0c;再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至&#xff0c;这不仅是开发者的盛宴&#xff0c;更是全球数亿苹果用户翘首以盼的科技春晚。今年&#xff0c;苹果依旧为我们带来了全家桶式的系统更新&#xff0c;包括 iOS 26、iPadOS 26…...

Spark 之 入门讲解详细版(1)

1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室&#xff08;Algorithms, Machines, and People Lab&#xff09;开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目&#xff0c;8个月后成为Apache顶级项目&#xff0c;速度之快足见过人之处&…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

GitHub 趋势日报 (2025年06月08日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...

LLM基础1_语言模型如何处理文本

基于GitHub项目&#xff1a;https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken&#xff1a;OpenAI开发的专业"分词器" torch&#xff1a;Facebook开发的强力计算引擎&#xff0c;相当于超级计算器 理解词嵌入&#xff1a;给词语画"…...

实现弹窗随键盘上移居中

实现弹窗随键盘上移的核心思路 在Android中&#xff0c;可以通过监听键盘的显示和隐藏事件&#xff0c;动态调整弹窗的位置。关键点在于获取键盘高度&#xff0c;并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...