当前位置: 首页 > news >正文

大数据——spark一文全知道

1、spark概述

spark是专为大规模数据处理而设计的快速通用计算引擎,与Hadoop的MapReduce功能类似,但它是基于内存的分布式计算框架,存储还是采用HDFS。

MapReduce和Spark的区别

  • MapReduce的MapReduce之间需要通过磁盘进行数据传递,Spark直接存在内存中,所以速度更快。
  • MapReduce的Task调度和启动开销大,而Spark的Task在线程中开销小一些。
  • MapReduce编程不够灵活,Spark的API丰富。
  • MapReduce的Map和Reduce都要一次shuffle,而Spark可以减少shuffle。

两者框架的区别:

功能Hadoop组件Spark组件
批处理MapReduce、Hive或者PigSpark Core、Spark SQL
交互式计算Impala、prestoSpark SQL
流式计算StormSpark Streaming
机器学习MahoutSpark ML、Spark MLLib

Spark具有以下优点:

  • 基于内存速度快;
  • Java、Python和R语言可以开发spark易用性好;
  • spark框架组件丰富,通用性高;
  • 可以运行在多种存储结构上,兼容性高。

Spark的缺点:

  • 内存消耗大。

2、Spark数据集

Spark的数据集合采用RDD(Resilient Distributed Dataset)弹性分布式数据集,它是一个不可变、可分区和可并行计算的集合。

  • 不可变:RDD1到RDD2时,RDD1任然存在;
  • 可分区:可分为多个partition;
  • 并行计算;
  • Dataset是指数据集,主要用于存放数据;
  • Distributed是指分布式存储,并且可以进行分布式计算;
  • Resilient弹性的特点:
    • 数据可以保存在磁盘中,也可以在内存中;
    • 数据分布式存储也是弹性的:
      • RDD分在多个节点上存储,与HDFS的分布式存储原理类似:HDFS文件以128M为基准切分为多个block存储在各个节点上,而RDD则会被切分为多个partition,这些partition在不同的节点上;
      • spark读取HDFS时,会把HDFS上的block读到内存上对应为partition;
      • spark计算结束时,会把数据存储到HDFS上,可以对应到Hive或者HBase上,以HDFS为例:RDD的每一个partition的大小小于128M时,一个partition对应HDFS的block;大于128M时,则会切分为多个block。

3、RDD的数据操作

RDD的数据操作也叫做算子,一共包括三类算子:transformation、action和persist,其中前两种进行数据处理,persist进行数据存储操作。

  • transformation:是将一个已经存在的数据集转化为一个新的数据集,map就是一个transformation操作,把数据集的每一个元素传给函数并返回新的RDD
  • action:获取数据进行运算后的结果,reduce就是一个action操作,一般聚合RDD所有元素的操作,并返回最终计算结果。
  • persist:缓存数据,可以把数据缓存在内存上,也可以缓存在磁盘上,甚至可以到磁盘其他节点上。

我们要了解所有的transformation的操作都是lazy:即不会立刻计算结果,而是记录下数据集的transformation操作,只有调用了action操作之后才会计算所有的transformation,这样会让spark运行效率更高。
在这里插入图片描述
pyspark启动
进入SPARK_HOME/sbin⽬录下执⾏

pyspark

在这里插入图片描述
sparkUI
可以在spark UI中看到当前的Spark作业 在浏览器访问当前centos的4040端⼝192.168.19.137:4040
在这里插入图片描述
启动RDD
在这里插入图片描述

3.1 transformation算子

  • map(func):将func函数作用到数据集的每一个元素上,返回一个新的RDD
rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
rdd2 = rdd1.map(lambda x:x+1)
print(rdd2.collect())

[2, 3, 4, 5, 6, 7, 8, 9, 10]

  • filter(func):筛选func函数中为true的元素,返回一个新的RDD
rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
rdd2 = rdd1.map(lambda x:x*2)
rdd3 = rdd2.filter(lambda x:x>10)
print(rdd3.collect())

[12, 14, 16, 18]

  • flatMap(func):先执行map操作,然后将所有对象合并为一个对象
rdd1 = sc.parallelize(["a b c","d e f","h i j"])
rdd2 = rdd1.flatMap(lambda x:x.split(' '))
rdd3 = rdd1.map(lambda x:x.split(' '))
print('flatmap',rdd2.collect())
print('map',rdd3.collect())

flatmap [‘a’, ‘b’, ‘c’, ‘d’, ‘e’, ‘f’, ‘h’, ‘i’, ‘j’]
map [[‘a’, ‘b’, ‘c’], [‘d’, ‘e’, ‘f’], [‘h’, ‘i’, ‘j’]]

  • union(rdd):两个RDD并集
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
rdd3 = rdd1.union(rdd2)
print(rdd3.collect())

[(‘a’, 1), (‘b’, 2), (‘c’, 1), (‘b’, 3)]

  • intersection(rdd):两个RDD求交集
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.intersection(rdd2)
print(rdd4.collect())

[(‘c’, 1), (‘b’, 3)]

  • groupByKey():以元祖中的第0个元素为key,进行分组,返回新的RDD,返回的结果中value是Iterable需要list进行转化
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.groupByKey()
print(rdd4.collect())
print(list(rdd4.collect()[0][1]))

[(‘b’, <pyspark.resultiterable.ResultIterable object at 0x7f23ab41a4a8>),
(‘c’, <pyspark.resultiterable.ResultIterable object at 0x7f23ab41a4e0>),
(‘a’, <pyspark.resultiterable.ResultIterable object at 0x7f23ab41a438>)]
[2, 3]

  • reduceByKey(func):将key相同的键值对,按照func进行计算,返回新的RDD
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd2 = rdd.reduceByKey(lambda x,y:x+y)
print(rdd2.collect())

[(‘a’, 2), (‘b’, 1)]

  • sortByKey(ascending=True, numPartitions=None, keyfunc=
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb',5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white',9)])
rdd1 = sc.parallelize(tmp2)
rdd2 = rdd1.sortByKey(True,3,keyfunc=lambda k:k.lower())
print(rdd2.collect())

[(‘a’, 3), (‘fleece’, 7), (‘had’, 2), (‘lamb’, 5), (‘little’, 4), (‘Mary’, 1), (‘was’, 8), (‘white’, 9), (‘whose’, 6)]

  • mapPatitions(func):分块进行map,默认的map是一行行数据进行,该函数是一块块进行的,适合数据量大的情况。
  • sparkContext.broadcast(要共享的数据):当某个数据需要反复查询时,不用把数据放进task中,可以通过⼴播变量, 通知当前worker上所有的task, 来共享这个数据,避免数据的多次复制,可以⼤⼤降低内存的开销。

3.2 action算子

  • collect():返回⼀个list,list中包含 RDD中的所有元素,建议数量较小时使用,数据较大不会全部显示
rdd1 = sc.parallelize([1,2,3,4,5])
print(rdd1.collect())

[1, 2, 3, 4, 5]

  • reduce(func):将RDD中元素两两传递给输⼊函数,同时产⽣⼀个新的值,新产⽣的值与RDD中下⼀个元素再被传递给输⼊函数直到最后只有⼀个值为⽌。
rdd1 = sc.parallelize([1,2,3,4,5])
result = rdd1.reduce(lambda x,y:x+y)
print(result)

15

  • first():返回RDD中的第一个元素
rdd1 = sc.parallelize([1,2,3,4,5])
result = rdd1.first()
print(result)

1

  • take(num):返回RDD的前num个元素
rdd1 = sc.parallelize([1,2,3,4,5])
result = rdd1.take(3)
print(result)

[1, 2, 3]

  • count():返回RDD元素个数
rdd1 = sc.parallelize([1,2,3,4,5])
result = rdd1.count()
print(result)

5

4、Spark架构

在这里插入图片描述

  • Client:客户端进程
  • Driver:一个Spark作业负责一个Driver进程,负责向Master注册和注销,包括:StageScheduler、TaskSchedule和DAGSchedule。
    • StageSchedule:负责生成Stage。
      • Stage:一个Spark作业一般包含一到多个Stage。
    • DAGSchedule:负责将Spark作业分解成一个多个Stage,将Stage根据RDD的Partition个数决定Task个数,然后放到TaskSchedule中。
    • TaskSchedule:将Task分配到ExecutorBackend上执行,并监控Task状态。
      • Task:一个Stage包含一个多个Task,多个Task实现并行运行。
  • Application:Spark应用程序,批处理作业的集合。其中main方法时入口,定义了RDD和RDD的操作。
  • Master:Standalone模式中的主控节点,负责接收Client提交的作业,管理Worker,并让Worker启动Driver和Executro。
  • Worker:Standalone模式中的salve节点上的守护节点,负责管理本节点的资源,定期向Master汇报心跳,接收Master命令,启动Driver和Executor。

Spark作业的Stage划分

  • 窄依赖:父RDD的每个Partition最多被一个子RDD的Partition所使用,即一个父RDD对应一个子RDD。map、filter、union、join对输入做协同划分。
  • 宽依赖:子RDD依赖所有父RDD分区。groupByKey、join对输入做非协同划分。

窄依赖的所有RDD作为一个Stage,遇到宽依赖结束。

在这里插入图片描述
在这里插入图片描述

相关文章:

大数据——spark一文全知道

1、spark概述 spark是专为大规模数据处理而设计的快速通用计算引擎&#xff0c;与Hadoop的MapReduce功能类似&#xff0c;但它是基于内存的分布式计算框架&#xff0c;存储还是采用HDFS。 MapReduce和Spark的区别 MapReduce的MapReduce之间需要通过磁盘进行数据传递&#xf…...

Linux命令200例:telnet用于远程登录的网络协议(常用)

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌。CSDN专家博主&#xff0c;阿里云社区专家博主&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;数年电商行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &…...

使用 eBPF 在云中实现网络可观测性

可观测性是一种了解和解释应用当前状态的能力&#xff0c;也是一种知道何时出现问题的方法。随着在 Kubernetes 和 OpenShift 上以微服务形式进行云部署的应用程序越来越多&#xff0c;可观察性受到了广泛关注。许多应用程序都有严格的承诺&#xff0c;比如在停机时间、延迟和吞…...

linux安装部署gitlab全教程,包含配置中文

linux安装部署gitlab全教程&#xff0c;包含配置中文 大家好&#xff0c;我是酷酷的韩~ 1.前期准备 安装包下载地址 https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/yum/el7/ 我这里选择的这个gitlab-ce-15.7.3-ce.0.el7.x86_64.rpm 还有一些相关依赖包(地址等审核过我放到…...

软考高级系统架构设计师系列论文八十:论企业信息化战略规划技术

软考高级系统架构设计师系列论文八十:论企业信息化战略规划技术 一、企业信息化相关知识点二、摘要三、正文四、总结一、企业信息化相关知识点 软考高级系统架构设计师:企业信息化战略与实施...

使用ChatGPT构建一个AIML聊天机器人是什么体验

​ 使用ChatGPT构建一个AIML聊天机器人是什么体验&#xff0c;使用ChatGPT将C#代码转换为Swift代码以实现Swift版的Aiml聊天机器人&#xff0c;AIML&#xff08;全名为Artificial Intelligence Markup Language&#xff09;是一种基于XML模式匹配的人工智能标记语言&#xff0c…...

[JavaWeb]【九】web后端开发-SpringBootWeb案例(菜单)

目录 一、准备工作 1.1 需求 1.2 环境搭建 1.2.1 准备数据库&表 1.2.2 创建springboot工程 1.2.3 配置application.properties & 准备对应实体类 1.2.3.1 application.properties 1.2.3.2 实体类 1.2.3.2.1 Emp类 1.2.3.2.2 Dept类 1.2.4 准备对应的Mapper、…...

vue 主组件把日期选择器给子组件props传obj值, 与子组件监听 watch对象或对象属性

1 主组件 1.1 :passObj 这种 非v-model ; change"DateChange"触发事件 <template> <div class"date-picker-panel"><el-date-picker v-model"value2" type"datetimerange" :picker-options"pickerOptions"…...

WebDAV之π-Disk派盘 + 一刻日记

一刻日记是一款日记、笔记和备忘录应用程序,旨在提供一个简单而专注的日记写作工具。它提供了一个干净、直观的界面,允许用户记录和管理他们的日常事务、个人情感、成就和目标等内容。 一刻日记的主要功能包括: – 创建和编辑日记、用户可以撰写和编辑自己的日记,记录重要…...

springboot aop实现接口防重复操作

一、前言 有时在项目开发中某些接口逻辑比较复杂&#xff0c;响应时间长&#xff0c;那么可能导致重复提交问题。 二、如何解决 1.先定义一个防重复提交的注解。 import java.lang.annotation.*;Inherited Target(ElementType.METHOD) Retention(RetentionPolicy.RUNTIME) Do…...

ubuntu18.04复现yolo v8环境配置之CUDA与pytorch版本问题以及多CUDA版本安装及切换

最近在复现yolo v8的程序&#xff0c;特记录一下过程 环境&#xff1a;ubuntu18.04ros melodic 小知识&#xff1a;GPU并行计算能力高于CPU—B站UP主说的 Ubuntu可以安装多个版本的CUDA。如果某个程序的Pyorch需要不同版本的CUDA&#xff0c;不必删除之前的CUDA&#xff0c;…...

Yaml配置文件读取方法

在日常的代码中&#xff0c;有一些值是配置文件中定义的&#xff0c;这些值可以根据用户的要求进行调整和改变。这往往会写在yaml格式的文件中。这样开放程序给用户时&#xff0c;就可以不必开放对应的源码&#xff0c;只开放yaml格式的配置文件即可。 将配置文件中的值读入程…...

Python3 lambda 函数入门示例 Python lambda 函数

Python lambda 函数 首先&#xff0c;这个语法跟C的语法几乎一样&#xff1b; 通常称 lambda 函数为匿名函数&#xff0c;也称为 丢弃函数&#xff0c;因为应一下子就不要了&#xff0c;不会长期凝结下来形成SDK API&#xff1b;本人觉得它有点类似 inline 函数&#xff0c;或者…...

【计算机网络】HTTPs 传输流程

HTTPS和HTTP的区别 1、HTTP协议传输的数据都是未加密的&#xff0c;是明文的&#xff0c;使用HTTP协议传输隐私信息非常不安 HTTPS协议是由SSLHTTP协议构建的可进行加密传输、身份认证的网络协议&#xff0c;要比http协议安全。 2、HTTPS协议需要到CA申请证书&#xff0c;一般…...

【Linux】国产深度系统装机必备(开发、日常使用)

开发相关工具 IDE推荐官网下载JetBrains Toolbox&#xff0c;后续所有与jetbrains直接全部到toolbox中下载&#xff0c;这里默认所有的app全部放在个人用户下&#xff08;/data/home/计算机用户名/.local/share/JetBrains/Toolbox/apps&#xff09;终端可视化工具&#xff1a;…...

动态规划入门:斐波那契数列模型以及多状态(C++)

斐波那契数列模型以及多状态 动态规划简述斐波那契数列模型1.第 N 个泰波那契数&#xff08;简单&#xff09;2.三步问题&#xff08;简单&#xff09;3.使⽤最⼩花费爬楼梯&#xff08;简单&#xff09;4.解码方法&#xff08;中等&#xff09; 简单多状态1.打家劫舍&#xff…...

LeetCode438.找到字符串中所有字母异位词

因为之前写过一道找字母异位词分组的题&#xff0c;所以这道题做起来还是比较得心应手。我像做之前那道字母异位词分组一样&#xff0c;先把模板p排序&#xff0c;然后拿滑动窗口去s中从头到尾滑动&#xff0c;窗口中的这段字串也给他排序&#xff0c;然后拿这两个排完序的stri…...

【微服务】03-HttpClientFactory与gRpc

文章目录 1.HttpClientFactory &#xff1a;管理外向请求的最佳实践1.1 核心能力1.2 核心对象1.3 HttpClient创建模式 2.gRPC&#xff1a;内部服务间通讯利器2.1 什么是gRPC2.2 特点gRPC特点2.3.NET生态对gRPC的支持情况2.4 服务端核心包2.5 客户端核心包2.5 .proto文件2.6 gRP…...

iOS开发之查看静态库(.a/.framework)中包含的.o文件和函数符号(ar,nm命令)

.a/.framework其实是把编译生成的.o文件&#xff0c;打包成一个.a/.framework文件。a的意思是archive/归档的意思。 查看静态库.a文件包含的内容用下面的命令解压&#xff1a; ar x xxx.a 用ar命令打包静态库&#xff1a; 参数r是将后面的*.o或者*.a文件添加到目标文件中 参数…...

Idea常用快捷键--让你代码效率提升一倍(一)

一、代码编辑相关快捷键 1.单行复制(实现快速创建多个对象)CtrlD 2.空出下一行 ShiftEnter 3.单行注释快捷键 ctrl / 4.快速构建构造函数&#xff0c;setter&#xff0c;getter、toString方法 AltInsert 4.显示快速修复和操作的菜单 altenter 5.格式化代码&#xff1a;C…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

C++ 基础特性深度解析

目录 引言 一、命名空间&#xff08;namespace&#xff09; C 中的命名空间​ 与 C 语言的对比​ 二、缺省参数​ C 中的缺省参数​ 与 C 语言的对比​ 三、引用&#xff08;reference&#xff09;​ C 中的引用​ 与 C 语言的对比​ 四、inline&#xff08;内联函数…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

HashMap中的put方法执行流程(流程图)

1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中&#xff0c;其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下&#xff1a; 初始判断与哈希计算&#xff1a; 首先&#xff0c;putVal 方法会检查当前的 table&#xff08;也就…...