当前位置: 首页 > news >正文

大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD

28.RDD_为什么需要RDD

29.RDD_定义

30.RDD_五大特性总述

31.RDD_五大特性1

32.RDD_五大特性2

33.RDD_五大特性3

34.RDD_五大特性4

35.RDD_五大特性5

36.RDD_五大特性总结

37.RDD_创建概述

38.RDD_并行化创建

演示代码:
// 获取当前 RDD 的分区数
@Since ( "1.6.0" )
final def getNumPartitions : Int =
partitions . length
// 显示出 RDD 被分配到不同分区的信息
/**Return an RDD created by coalescing all
elements within each partition into an
array.*/
def glom (): RDD [ Array [ T ]]
1
2
3
4
5
6
package com . itbaizhan . rdd
//1. 导入 SparkConf 类、 SparkContext
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByParallelize {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象。并设置本地运行和程序的
名称
val conf = new
SparkConf (). setMaster ( "local[2]" ). setAppName
( "CreateRdd1" )
//3. 构建 SparkContext 对象
val sc = new SparkContext ( conf )
//4. 通过并行化创建 RDD 对象:将本地集合 -> 分布式的
RDD 对象
1
2
3
4
5
6
7
8
9
10
11
12
79    
//val rdd: RDD[Int] =
sc.parallelize[Int](List(1, 2, 3, 4, 5, 6,
7, 8))
val rdd : RDD [ Int ] =
sc . parallelize ( List ( 1 , 2 , 3 , 4 , 5 , 6 , 7 ,
8 ), 3 )
//5. 输出默认的分区数
//5.1
setMaster("local[*]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//8, 默认当前系统的
CPU
//5.2
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//2
//5.3
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8),3)
println ( " 默认分区
数: " + rdd . getNumPartitions ) //3
//6.collect 方法:将 rdd 对象中每个分区的数据,都
发送到 Driver ,形成一个 Array 对象
val array1 : Array [ Int ] = rdd . collect ()
println ( "rdd.collect()=" + array1 . mkString ( ",
" ))
//7. 显示出 rdd 对象中元素被分布到不同分区的数据信
13
14
15
16
17
18
19
20
21
22
23
24
25
80 运行结果:
实时效果反馈
1. 以下关于并行化创建 RDD 的描述错误的是:
A
通过并行化集合创建,将本地集合对象转分布式 RDD
B
parallelize() 方法必须传递两个参数。
C
parallelize 没有给定分区数 , 默认分区数等于执行程序的当前
服务器 CPU 核数。
答案:
val array2 : Array [ Array [ Int ]] =
rdd . glom (). collect ()
println ( "rdd.glom().collect() 的内容是 :" )
/*for(eleArr<- array2){
println(eleArr.mkString(","))
}*/
array2 . foreach ( eleArr => println ( eleArr . mkStr
ing ( "," )))
}
}
26
27
28
29
30
31
32
33
默认分区数: 3
rdd.collect()=1,2,3,4,5,6,7,8
rdd.glom().collect() 的内容是 :
1,2
3,4,5
6,7,8

39.RDD_读取文件创建RDD

40.RDD_读取小文件创建RDD

扩展 wholeTextFiles 适合读取一堆小文件:
//path 指定小文件的路径目录
//minPartitions 最小分区数 可选参数
def wholeTextFiles ( path :
String , minPartitions : Int =
defaultMinPartitions ): RDD [( String , String )]
1
2
3
85 代码演示:
package com . itbaizhan . rdd
//1. 导入类
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByWholeTextFiles {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "WholeTextFiles" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 读取指定目录下的小文件
val rdd : RDD [( String , String )] =
sc . wholeTextFiles ( "data/tiny_files" )
//(filePath1, " 内容 1"),(filePath2, " 内容
2"),...,(filePathN, " 内容 N")
val tuples : Array [( String , String )] =
rdd . collect ()
tuples . foreach ( ele => println ( ele . _1 , ele . _2 ))
//6. 获取小文件中的内容
val array : Array [ String ] =
rdd . map ( _ . _2 ). collect ()
println ( "---------------------------" )
println ( array . mkString ( "|" ))
//4. 关闭 sc 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
86 运行输出结果 :
RDD_ 算子概述
定义: 分布式集合 RDD 对象的方法被称为算子
算子分类:
Transformation 转换算子
1
Action 行动算子
2
sc . stop ()
}
}
22
23
24
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file1.txt,hello Linux
hello Zookeper
hello Maven
hello hive
hello spark)
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file2.txt,Spark Core
Spark RDD
Spark Sql)
----------------
hello Linux
hello Zookeper
hello Maven
hello hive
hello spark|Spark Core
Spark RDD
Spark Sql

41.RDD_算子概述

42.RDD_转换算子map

43.RDD_转换算子flatmap

44.RDD_转换算子reducebykey

45.RDD_转换算子filter

46.RDD_转换算子distinct

47.RDD_转换算子glom

48.RDD_转换算子groupby

object RddGroupBy {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "groupBy" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 创建 Rdd
val rdd : RDD [( Char , Int )] =
sc . parallelize ( Array (( 'a' , 1 ), ( 'a' , 2 ),
( 'b' , 1 ), ( 'b' , 2 ), ( 'a' , 3 ), ( 'a' , 4 )))
//6. 通过 groupBy 算子对 rdd 对象中的数据进行分组
//groupBy 插入的函数的用意是指定按照谁进行分组
// 分组后的结果是有二元组组成的 RDD
val gbRdd : RDD [( Char , Iterable [( Char ,
Int )])] = rdd . groupBy ( tupEle => tupEle . _1 )
// 收集到 Driver
val result1 : Array [( Char ,
Iterable [( Char , Int )])] = gbRdd . collect ()
//(a,CompactBuffer((a,1), (a,2), (a,3),
(a,4))),(b,CompactBuffer((b,1), (b,2)))
println ( result1 . mkString ( "," ))
//7. 使用 map 转换算子
//(a,List((a,1), (a,2), (a,3), (a,4))),
(b,List((b,1), (b,2)))
val result2 : Array [( Char , List [( Char ,
Int )])] = gbRdd . map ( tup => ( tup . _1 ,
tup . _2 . toList )). collect ()
println ( result2 . mkString ( "," ))
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
104 实时效果反馈
1. 以下关于
rdd.groupBy(tupEle => tupEle._1)
的描述错误的是:
A
groupBy 传入的函数的意思是 : 通过这个函数 , 确定按照谁来
分组。
B
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
数只能为 2
C
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
>=2
答案:
1=>B

49.RDD_转换算子groupbyKey

50.RDD_转换算子sortby

51.RDD_转换算子sortbyKey

52.RDD_转换算子union并集

53.RDD_转换算子交集和差集

54.RDD_转换算子关联算子

55.RDD_转换算子partitionBy

56.RDD_转换算子mapPatitions

57.RDD_转换算子sample

58.RDD_行动算子foreachPartition

59.RDD_行动算子foreach

60.RDD_行动算子saveAsTestFile

61.RDD_行动算子countByKey

62.RDD_行动算子reduce

63.RDD_行动算子fold

64.RDD_行动算子first_take_count

65.RDD_行动算子top_takeOrderd

66.RDD_行动算子takeSample

二.内核进阶

67.内核进阶_DAG概述

68.内核进阶_血缘关系

69.内核进阶_宽窄依赖关系

70.内核进阶_stage划分

71.内核进阶_任务调度概述

72.内核进阶_管道计算模式上

73.内核进阶_管道计算模式下

74.内核进阶_cache缓存

75.内核进阶_checkpoint检查点

76.内核进阶_cache和checkpoint区别

77.内核进阶_并行度

78.内核进阶_广播变量

79.内核进阶_累加器一

80.内核进阶_累加器二

81.内核进阶_累加器之重复计算

82.内核进阶_项目实战PVUV需求分析

83.内核进阶_项目实战PV分析

84.内核进阶_项目实战UV分析

85.内核进阶_二次排序实战

86.内核进阶_分组取topN实战

87.内核进阶_卡口统计项目需求分析

88.内核进阶_卡口统计项目统计正常的卡口

89.内核进阶_卡口统计项目TOP5

90.内核进阶_卡口统计项目统计不同区域同时出现的车辆

91.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹一

92.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹二

93.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹三

94.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹四

相关文章:

大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD 28.RDD_为什么需要RDD 29.RDD_定义 30.RDD_五大特性总述 31.RDD_五大特性1 32.RDD_五大特性2 33.RDD_五大特性3 34.RDD_五大特性4 35.RDD_五大特性5 36.RDD_五大特性总结 37.RDD_创建概述 38.RDD_并行化创建 演示代码&#xff1a; // 获取当前 RDD 的分区数 Since ( …...

Unity 加载OSGB(webgl直接加载,无需转换格式!)

Unity webgl加载倾斜摄影数据 前言效果图后续不足 前言 Unity加载倾斜摄影数据&#xff0c;有很多的插件方便好用&#xff0c;但是发布到网页端均失败&#xff0c;因为webgl 的限制&#xff0c;IO读取失效。 前不久发现一个开源项目: UnityOSGB-main 通过两种方式在 Unity 中…...

tcp/ip网络协议,tcp/ip网络协议栈

TCP/IP网络协议和TCP/IP网络协议栈是互联网通信的基石&#xff0c;它们定义了电子设备如何连入因特网以及数据如何在它们之间传输的标准。以下是对TCP/IP网络协议和TCP/IP网络协议栈的详细解释&#xff1a; 一、TCP/IP网络协议 TCP/IP&#xff08;Transmission Control Proto…...

【Debug】the remote host closed the connection错误信息分析

出现的情况说明&#xff1a;QT软件。刚开始都可以连接成功 之后连接 断开几次 就会出现连接失败 错误信息是the remote host closed the connection。the remote host closed the connection广泛原因分析 这个错误通常意味着远端 STM32 服务器主动关闭了连接。可能的原因包括&a…...

SpringBoot扩展篇:@Scope和@Lazy源码解析

SpringBoot扩展篇&#xff1a;Scope和Lazy源码解析 1. 研究主题及Demo2. 注册BeanDefinition3. 初始化属性3.1 解决依赖注入3.2 创建代理 ContextAnnotationAutowireCandidateResolver#getLazyResolutionProxyIfNecessary3.3 代理拦截处理3.4 单例bean与原型bean创建的区别 4. …...

“AI隐患识别系统,安全多了道“智能护盾”

家人们&#xff0c;在生活和工作里&#xff0c;咱们都知道安全那可是头等大事。不管是走在马路上&#xff0c;还是在工厂车间忙碌&#xff0c;又或是住在高楼大厦里&#xff0c;身边都可能藏着一些安全隐患。以前&#xff0c;发现这些隐患大多靠咱们的眼睛和经验&#xff0c;可…...

通向AGI之路:人工通用智能的技术演进与人类未来

文章目录 引言:当机器开始思考一、AGI的本质定义与技术演进1.1 从专用到通用:智能形态的范式转移1.2 AGI发展路线图二、突破AGI的五大技术路径2.1 神经符号整合(Neuro-Symbolic AI)2.2 世界模型架构(World Models)2.3 具身认知理论(Embodied Cognition)三、AGI安全:价…...

论文阅读:InstanceDiffusion: Instance-level Control for Image Generation

CVPR2024文章 摘要&#xff1a; 文本到图像扩散模型产生高质量的图像&#xff0c;但不提供对图像中单个实例的控制。我们引入了InstanceDiffusion&#xff0c;它将精确的实例级控制添加到文本到图像扩散模型中。InstanceDiffusion 支持每个实例的自由形式的语言条件&#xff…...

7.攻防世界 wzsc_文件上传

打开题目页面如下 上传了一张带有木马的图片 返回的页面是空白的&#xff0c;不过路径变了 猜测存在根目录/upload 也可以通过dirsearch扫描根目录 命令&#xff1a; dirsearch -u http://61.147.171.105:65024/ -e* 终于得到了上传的文件的信息 但是测试发现.php文件以及.…...

以为是响应式对象丢失导致数据没有回显

背景&#xff1a;之前ruoyi生成的vue2代码&#xff0c; <el-form ref“form”&#xff0c;后面我改成vue3的写法&#xff0c;没有实例化form&#xff0c; 在vue3中是需要定义const form ref(); 导致点击了修改后&#xff0c;页面弹框显示出来&#xff0c;数据没有回显。 一直…...

来 Gitcode 免费体验 DeepSeek 蒸馏模型,开启 AI 探索新旅程

在 AI 技术飞速发展的时代&#xff0c;你是否也怀揣着对前沿科技的无限好奇与探索欲望&#xff1f;然而&#xff0c;昂贵的模型体验费用和复杂的操作流程&#xff0c;是不是让你一次次望而却步&#xff1f;现在&#xff0c;这些都不再是问题&#xff01;DeepSeek 蒸馏模型现已强…...

2.Mkdocs配置说明(mkdocs.yml)【最新版】

官方文件&#xff1a;Changing the colors - Material for MkDocs 建议详细学习一下上面的官方网站↑↑↑ 我把我目前的配置文件mkdocs.yml代码写在下面&#x1f447;&#x1f3fb; #[Info] site_name: Mkdocs教程 #your site name 显示在左上角 site_url: http://wcowin.wo…...

云轴科技ZStack+海光DCU:率先推出DeepSeek私有化部署方案

针对日益强劲的AI推理需求和企业级AI应用私有化部署场景&#xff08;Private AI&#xff09;&#xff0c;云轴科技ZStack联合海光信息&#xff0c;共同推动ZStack智塔全面支持DeepSeek V3/R1/Janus Pro系列模型&#xff0c;基于海光DCU实现高性能适配&#xff0c;为企业提供安全…...

扩增子分析|零模型2——基于βNTI的微生物随机性和确定性装配过程(箱线图和柱状图R中实现)

一、引言 我们之前发布的周集中老师团队零模型R中实战案例&#xff1a;扩增子分析|基于零模型的群落确定性和随机性构建过程——R实战_bmntd-CSDN博客。在文末只输出了一个.csv 表格。并没有提供绘图的方法&#xff0c;有小伙伴问如何在R中一键成图呢&#xff1f;还真可以&…...

专题:剑指offer

链表 JZ6 从尾到头打印链表 思路&#xff1a;先顺序输出到栈里面 然后再以此从栈顶弹出即可 /** * struct ListNode { * int val; * struct ListNode *next; * ListNode(int x) : * val(x), next(NULL) { * } * }; */ #include …...

DeepSeek 部署过程中的问题

文章目录 DeepSeek 部署过程中的问题一、部署扩展&#xff1a;docker 部署 DS1.1 部署1.2 可视化 二、问题三、GPU 设置3.1 ollama GPU 的支持情况3.2 更新 GPU 驱动3.3 安装 cuda3.4 下载 cuDNN3.5 配置环境变量 四、测试 DeepSeek 部署过程中的问题 Windows 中 利用 ollama 来…...

DeepSeek R1本地化部署 Ollama + Chatbox 打造最强 AI 工具

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;Linux 目录 一&#xff1a;&#x1f525; Ollama &#x1f98b; 下载 Ollama&#x1f98b; 选择模型&#x1f98b; 运行模型&#x1f98b; 使用 && 测试 二&#xff1a;&#x1f525; Chat…...

应急场景中的数据融合与对齐

1. 概述 在应急管理中,快速、准确地掌握现场状况、实时监控灾情并进行决策至关重要。各类数据(如卫星影像、无人机图像、激光雷达点云、地理信息系统(GIS)数据、传感器数据、社交媒体信息、移动终端数据等)具有来源广泛、格式多样、时空特性不同等特点。如何将这些异构数…...

手机上运行AI大模型(Deepseek等)

最近deepseek的大火&#xff0c;让大家掀起新一波的本地部署运行大模型的热潮&#xff0c;特别是deepseek有蒸馏的小参数量版本&#xff0c;电脑上就相当方便了&#xff0c;直接ollamaopen-webui这种类似的组合就可以轻松地实现&#xff0c;只要硬件&#xff0c;如显存&#xf…...

Mellanox网卡信息查看

1、查看Mellanox网卡的SN&#xff08;序列号&#xff09;和PN mstvpd 04:00.0或者lspci -s 04:00.0 -vvv来自https://enterprise-support.nvidia.com/s/article/MLNX2-117-2532kn 2、查看Mellanox网卡驱动、固件版本 ethtool -i ens6np0...

大数据学习栈记——Neo4j的安装与使用

本文介绍图数据库Neofj的安装与使用&#xff0c;操作系统&#xff1a;Ubuntu24.04&#xff0c;Neofj版本&#xff1a;2025.04.0。 Apt安装 Neofj可以进行官网安装&#xff1a;Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装&#xff1b;只需暴露 19530&#xff08;gRPC&#xff09;与 9091&#xff08;HTTP/WebUI&#xff09;两个端口&#xff0c;即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)

macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 &#x1f37a; 最新版brew安装慢到怀疑人生&#xff1f;别怕&#xff0c;教你轻松起飞&#xff01; 最近Homebrew更新至最新版&#xff0c;每次执行 brew 命令时都会自动从官方地址 https://formulae.…...