SparkRDD及算子-python版
RDD相关知识
RDD介绍
RDD
是Spark
的核心抽象,即 弹性分布式数据集(residenta distributed dataset
)。代表一个不可变,可分区,里面元素可并行计算的集合。其具有数据流模型的特点:自动容错,位置感知性调度和可伸缩性。 在Spark
中,对数据的所有操作不外乎创建RDD
、转化已有RDD
以及调用 RDD
操作进行求值。
RDD结构图
RDD具有五大特性
-
一组分片(
Partition
),即数据集的基本组成单位(RDD
是由一系列的partition
组成的)。将数据加载为RDD
时,一般会遵循数据的本地性(一般一个HDFS
里的block
会加载为一个partition
)。 -
RDD
之间的依赖关系。依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD
都有依赖。为了容错(重算,cache
,checkpoint
),也就是说在内存中的RDD
操作时出错或丢失会进行重算。 -
由一个函数计算每一个分片。
Spark
中的RDD
的计算是以分片为单位的,每个RDD
都会实现compute
函数以达到这个目的。compute
函数会对迭代器进行复合,不需要保存每次计算的结果。 -
(可选)如果
RDD
里面存的数据是key-value
形式,则可以传递一个自定义的Partitioner
进行重新分区。 -
(可选)
RDD
提供一系列最佳的计算位置,即数据的本地性。
RDD之间的依赖关系
RDD
之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。
窄依赖:父RDD
和子RDD
partition
之间的关系是一对一的。或者父RDD
一个partition
只对应一个子RDD
的partition
情况下的父RDD
和子RDD
partition
关系是多对一的,也可以理解为没有触发shuffle
。
宽依赖:父RDD
与子RDD
partition
之间的关系是一对多。 父RDD
的一个分区的数据去到子RDD
的不同分区里面。也可以理解为触发了shuffle
。
特别说明:对于join
操作有两种情况,如果join
操作的使用每个partition
仅仅和已知的Partition
进行join
,此时的join
操作就是窄依赖;其他情况的join
操作就是宽依赖。
RDD创建
-
从
Hadoop
文件系统(或与Hadoop
兼容的其他持久化存储系统,如Hive
、Cassandra
、HBase
)输入(例如HDFS
)创建。 -
通过集合进行创建。
算子
算子可以分为Transformation
转换算子和Action
行动算子。 RDD
是懒执行的,如果没有行动操作出现,所有的转换操作都不会执行。
RDD
直观图,如下:
RDD 的 五大特性
-
一组分片(
Partition
),即数据集的基本组成单位。对于RDD
来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD
时指定RDD
的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core
的数目。 -
一个计算每个分区的函数。
Spark
中RDD
的计算是以分片为单位的,每个RDD
都会实现compute
函数以达到这个目的。compute
函数会对迭代器进行复合,不需要保存每次计算的结果。 -
RDD
之间的依赖关系。RDD
的每次转换都会生成一个新的RDD
,所以RDD
之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark
可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD
的所有分区进行重新计算。 -
一个
Partitioner
,即RDD
的分片函数。当前Spark
中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner
,另外一个是基于范围的RangePartitioner
。只有对于于key-value
的RDD
,才会有Partitioner
,非key-value
的RDD
的Parititioner
的值是None
。Partitioner
函数不但决定了RDD
本身的分片数量,也决定了parent RDD Shuffle
输出时的分片数量。 -
一个列表,存储存取每个
Partition
的优先位置(preferred location
)。对于一个HDFS
文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
相关API介绍
SparkContext
创建;
sc = SparkContext("local", "Simple App")
说明:"local"
是指让Spark
程序本地运行,"Simple App"
是指Spark
程序的名称,这个名称可以任意(为了直观明了的查看,最好设置有意义的名称)。
- 集合并行化创建
RDD
;
data = [1,2,3,4]
rdd = sc.parallelize(data)
collect
算子:在驱动程序中将数据集的所有元素作为数组返回(注意数据集不能过大);
rdd.collect()
- 停止
SparkContext
。
sc.stop()
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容result = rdd.collect()# 5.打印 rdd 的内容print(result)# 6.停止 SparkContextsc.stop()#********** End **********#
读取外部数据集创建RDD
编写读取本地文件创建Spark RDD
的程序。
相关知识
为了完成本关任务,你需要掌握:1.如何读取本地文件系统中的文件来创建Spark RDD
。
textFile 介绍
PySpark
可以从Hadoop
支持的任何存储源创建分布式数据集,包括本地文件系统,HDFS
,Cassandra
,HBase
,Amazon S3
等。Spark
支持文本文件,SequenceFiles
和任何其他Hadoop InputFormat
。
文本文件RDD
可以使用创建SparkContex
的textFile
方法。此方法需要一个 URI
的文件(本地路径的机器上,或一个hdfs://,s3a://
等 URI),并读取其作为行的集合。这是一个示例调用:
distFile = sc.textFile("data.txt")
# -*- coding: UTF-8 -*- from pyspark import SparkContextif __name__ == '__main__':#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 文本文件 RDD 可以使用创建 SparkContext 的t extFile 方法。 #此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI), #并读取其作为行的集合# 2.读取本地文件,URI为:/root/wordcount.txtrdd = sc.textFile("/root/wordcount.txt")# 3.使用 rdd.collect() 收集 rdd 的内容。 #rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容result = rdd.collect()# 4.打印 rdd 的内容print(result)# 5.停止 SparkContextsc.stop()#********** End **********#
map
算子
本关任务:使用Spark
的 map
算子按照相关需求完成转换操作。
相关知识
为了完成本关任务,你需要掌握:如何使用map
算子。
map
将原来RDD
的每个数据项通过map
中的用户自定义函数 f
映射转变为一个新的元素。
图中每个方框表示一个RDD
分区,左侧的分区经过自定义函数 f:T->U
映射为右侧的新 RDD
分区。但是,实际只有等到 Action
算子触发后,这个 f
函数才会和其他函数在一个 Stage
中对数据进行运算。
map 案例
-
sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) rdd_map = rdd.map(lambda x: x * 2) print(rdd_map.collect())
输出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
说明:rdd1
的元素( 1 , 2 , 3 , 4 , 5 , 6
)经过 map
算子( x -> x*2
)转换成了 rdd2
( 2 , 4 , 6 , 8 , 10
)。
编程要求
请仔细阅读右侧代码,根据方法内的提示,在Begin - End
区域内进行代码补充,具体任务如下:
需求:使用 map
算子,将rdd
的数据 (1, 2, 3, 4, 5)
按照下面的规则进行转换操作,规则如下:
- 偶数转换成该数的平方;
- 奇数转换成该数的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个1到5的列表Listdata = [1, 2, 3, 4, 5]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:需求:偶数转换成该数的平方奇数转换成该数的立方"""# 5.使用 map 算子完成以上需求rdd_map = rdd.map(lambda x: x * x if x % 2 == 0 else x * x * x)# 6.使用rdd.collect() 收集完成 map 转换的元素print(rdd_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
mapPartitions
算子
mapPartitions
mapPartitions
函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭 代器对整个分区的元素进行操作。
图中每个方框表示一个RDD
分区,左侧的分区经过自定义函数 f:T->U
映射为右侧的新RDD
分区。
mapPartitions 与 map
map
:遍历算子,可以遍历RDD
中每一个元素,遍历的单位是每条记录。
mapPartitions
:遍历算子,可以改变RDD
格式,会提高RDD
并行度,遍历单位是Partition
,也就是在遍历之前它会将一个Partition
的数据加载到内存中。
那么问题来了,用上面的两个算子遍历一个RDD
谁的效率高? mapPartitions
算子效率高。
mapPartitions 案例
-
def f(iterator): list = [] for x in iterator: list.append(x*2) return listif __name__ == "__main__": sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) partitions = rdd.mapPartitions(f) print(partitions.collect())
输出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
mapPartitions()
:传入的参数是rdd
的 iterator
(元素迭代器),返回也是一个iterator
(迭代器)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext#********** Begin **********#
def f(iterator):list = []for x in iterator:list.append((x, len(x)))return list#********** End **********#
if __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表Listdata = ["dog", "salmon", "salmon", "rat", "elephant"]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:需求:将字符串与该字符串的长度组合成一个元组,例如:dog --> (dog,3)salmon --> (salmon,6)"""# 5.使用 mapPartitions 算子完成以上需求partitions = rdd.mapPartitions(f)# 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素print(partitions.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
filter
算子。
filter
filter
函数功能是对元素进行过滤,对每个元素应用f
函数,返 回值为 true
的元素在RDD
中保留,返回值为false
的元素将被过滤掉。内部实现相当于生成。
FilteredRDD(this,sc.clean(f))
下面代码为函数的本质实现:
-
def filter(self, f): """ Return a new RDD containing only the elements that satisfy a predicate.>>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() [2, 4] """ def func(iterator): return filter(fail_on_stopiteration(f), iterator) return self.mapPartitions(func, True)
上图中每个方框代表一个 RDD
分区, T
可以是任意的类型。通过用户自定义的过滤函数 f
,对每个数据项操作,将满足条件、返回结果为 true
的数据项保留。例如,过滤掉 V2
和 V3
保留了 V1
,为区分命名为 V’1
。
filter 案例
-
sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) rdd_filter = rdd.filter(lambda x: x>2) print(rdd_filter.collect())
输出:
[1, 2, 3, 4, 5, 6]
[3, 4, 5, 6]
说明:rdd1( [ 1 , 2 , 3 , 4 , 5 , 6 ] )
经过 filter
算子转换成 rdd2( [ 3 ,4 , 5 , 6 ] )
。
使用 filter
算子,将 rdd
中的数据 (1, 2, 3, 4, 5, 6, 7, 8)
按照以下规则进行过滤,规则如下:
- 过滤掉
rdd
中的所有奇数。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:需求:过滤掉rdd中的奇数"""# 5.使用 filter 算子完成以上需求rdd_filter = rdd.filter(lambda x: x % 2 == 0)# 6.使用rdd.collect() 收集完成 filter 转换的元素print(rdd_filter.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
flatMap
算子
flatMap
将原来RDD
中的每个元素通过函数f
转换为新的元素,并将生成的RDD
中每个集合的元素合并为一个集合,内部创建:
FlatMappedRDD(this,sc.clean(f))
上图表示RDD
的一个分区,进行flatMap
函数操作,flatMap
中传入的函数为f:T->U
,T
和U
可以是任意的数据类型。将分区中的数据通过用户自定义函数f
转换为新的数据。外部大方框可以认为是一个RDD
分区,小方框代表一个集合。V1
、V2
、V3
在一个集合作为RDD
的一个数据项,可能存储为数组或其他容器,转换为V’1
、V’2
、V’3
后,将原来的数组或容器结合拆散,拆散的数据形成RDD
中的数据项。
flatMap 案例
sc = SparkContext("local", "Simple App")
data = [["m"], ["a", "n"]]
rdd = sc.parallelize(data)
print(rdd.collect())
flat_map = rdd.flatMap(lambda x: x)
print(flat_map.collect())
输出:
[['m'], ['a', 'n']]
['m', 'a', 'n']
flatMap
:将两个集合转换成一个集合
需求:使用 flatMap
算子,将rdd
的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9])
按照下面的规则进行转换操作,规则如下:
- 合并
RDD
的元素,例如:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表Listlist = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(list)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect()) """使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:需求:合并RDD的元素,例如:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)([2,3],[4,5],[6]) --> (1,2,3,4,5,6)"""# 5.使用 filter 算子完成以上需求flat_map = rdd.flatMap(lambda x: x)# 6.使用rdd.collect() 收集完成 filter 转换的元素print(flat_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
distinct
算子distinct
distinct
将RDD
中的元素进行去重操作。上图中的每个方框代表一个
RDD
分区,通过distinct
函数,将数据去重。 例如,重复数据V1
、V1
去重后只保留一份V1
。distinct 案例
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.collect()) distinct = rdd.distinct()
-
输出
['python', 'python', 'python', 'java', 'java'] ['python', 'java']
-
print(distinct.collect())
sortByKey
算子sortByKey
-
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x): if numPartitions is None: numPartitions = self._defaultReducePartitions()memory = self._memory_limit() serializer = self._jrdd_deserializerdef sortPartition(iterator): sort = ExternalSorter(memory * 0.9, serializer).sorted return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))if numPartitions == 1: if self.getNumPartitions() > 1: self = self.coalesce(1) return self.mapPartitions(sortPartition, True)# first compute the boundary of each part via sampling: we want to partition # the key-space into bins such that the bins have roughly the same # number of (key, value) pairs falling into them rddSize = self.count() if not rddSize: return self # empty RDD maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner f\fraction = min(maxSampleSize / max(rddSize, 1), 1.0) samples = self.sample(False, f\fraction, 1).map(lambda kv: kv[0]).collect() samples = sorted(samples, key=keyfunc)# we have numPartitions many parts but one of the them has # an implicit boundary bounds = [samples[int(len(samples) * (i + 1) / numPartitions)] for i in range(0, numPartitions - 1)]def rangePartitioner(k): p = bisect.bisect_left(bounds, keyfunc(k)) if ascending: return p else: return numPartitions - 1 - preturn self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
-
说明:
ascending
参数是指排序(升序还是降序),默认是升序。numPartitions
参数是重新分区,默认与上一个RDD
保持一致。keyfunc
参数是排序规则。sortByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("c",1),("b",1)]
rdd = sc.parallelize(data)
key = rdd.sortByKey()
print(key.collect())
-
输出:
-
[('a', 1), ('a', 2), ('b', 1), ('c', 1)]
-
需求:使用 sortBy
算子,将 rdd
中的数据进行排序(升序)。
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个内容为[('B',1),('A',2),('C',3)]的列表ListList = [('B',1),('A',2),('C',3)]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:需求:元素排序,例如:[(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]"""# 5.使用 sortByKey 算子完成以上需求key = rdd.sortByKey()# 6.使用rdd.collect() 收集完成 sortByKey 转换的元素print(key.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
mapValues
算子
mapValues
mapValues
:针对(Key, Value)
型数据中的 Value
进行 Map
操作,而不对 Key
进行处理。
上图中的方框代表 RDD
分区。 a=>a+2
代表对 (V1,1)
这样的 Key Value
数据对,数据只对 Value
中的 1
进行加 2
操作,返回结果为 3
。
mapValues 案例
-
sc = SparkContext("local", "Simple App") data = [("a",1),("a",2),("b",1)] rdd = sc.parallelize(data) values = rdd.mapValues(lambda x: x + 2) print(values.collect())
输出:
-
[('a', 3), ('a', 4), ('b', 3)]
需求:使用mapValues
算子,将rdd
的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)
按照下面的规则进行转换操作,规则如下:
- 偶数转换成该数的平方
- 奇数转换成该数的立方
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App") # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表ListList = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:需求:元素(key,value)的value进行以下操作:偶数转换成该数的平方奇数转换成该数的立方"""# 5.使用 mapValues 算子完成以上需求values = rdd.mapValues(lambda x: x + 2)# 6.使用rdd.collect() 收集完成 mapValues 转换的元素print(values.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
reduceByKey
算子reduceByKey
reduceByKey
算子,只是两个值合并成一个值,比如叠加。函数实现
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
上图中的方框代表 RDD
分区。通过自定义函数 (A,B) => (A + B)
,将相同 key
的数据 (V1,2)
和 (V1,1)
的 value
做加法运算,结果为( V1,3)
。
reduceByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:x+y).collect())
输出:
[('a', 3), ('b', 1)]
需求:使用 reduceByKey
算子,将 rdd(key-value类型)
中的数据进行值累加。
例如:
("soma",4), ("soma",1), ("soma",2) -> ("soma",7)
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App") # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表ListList = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]# 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(List) # 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:需求:元素(key-value)的value累加操作,例如:(1,1),(1,1),(1,2) --> (1,4)(1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)"""# 5.使用 reduceByKey 算子完成以上需求reduce = rdd.reduceByKey(lambda x,y:x+y)# 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素print(reduce.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
Action
的常用算子
count
count()
:返回 RDD
的元素个数。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.count())
输出:
5
first
first()
:返回 RDD
的第一个元素(类似于take(1)
)。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.first())
输出:
python
take
take(n)
:返回一个由数据集的前 n
个元素组成的数组。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.take(2))
输出:
['python', 'python']
reduce
reduce()
:通过func
函数聚集 RDD
中的所有元素,该函数应该是可交换的和关联的,以便可以并行正确计算。
示例:
-
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.reduce(lambda x,y:x+y))
输出:
4
collect
collect()
:在驱动程序中,以数组的形式返回数据集的所有元素。
示例:
-
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.collect())
输出:
[1,1,1,1]
具体任务如下:
需求1:使用 count
算子,统计下 rdd
中元素的个数;
需求2:使用 first
算子,获取 rdd
首个元素;
需求3:使用 take
算子,获取 rdd
前三个元素;
需求4:使用 reduce
算子,进行累加操作;
需求5:使用 collect
算子,收集所有元素。
from pyspark import SparkContext
if __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,该对象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表ListList = [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rddrdd = sc.parallelize(List)# 4.收集rdd的所有元素并print输出print(rdd.collect())# 5.统计rdd的元素个数并print输出print(rdd.count())# 6.获取rdd的第一个元素并print输出print(rdd.first())# 7.获取rdd的前3个元素并print输出print(rdd.take(3))# 8.聚合rdd的所有元素并print输出print(rdd.reduce(lambda x,y:x+y))# 9.停止 SparkContextsc.stop()# ********** End **********#
相关文章:

SparkRDD及算子-python版
RDD相关知识 RDD介绍 RDD 是Spark的核心抽象,即 弹性分布式数据集(residenta distributed dataset)。代表一个不可变,可分区,里面元素可并行计算的集合。其具有数据流模型的特点:自动容错,位置…...

嵌入式设备与PC上位机通信协议设计的几点原则
嵌入式设备在运行中需要设置参数,这个工作经常由PC机来实现,需要为双方通信设计协议,有代表性协议是如下三种: 从上表可以看到,一般嵌入式设备内存和运算性能都有限,因此固定二进制是首选通信协议。 一&am…...

Go 内置运算符
一、算数运算符 1、算数运算符使用 package mainimport ("fmt" )func main(){fmt.PrintIn("103",103) //10313fmt.PrintIn("10-3",10-3) //10-37fmt.PrintIn("10*3",10*3) //10*330//除法注意:如果运算的数都是…...

Table和HashBasedTable的使用案例
------------------- 1.普通使用 package org.example.testhashbasedtable;import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table;import java.util.Map;public class TestHashBasedTable {public static void main(String[] args) {Ta…...

【执行批处理后 executeBatch() 没反应,一个参数相信就能搞定】
一、场景是在使用EasyExcel读取全表时,每次手动提交事务6w多条,总计190w数据量的情况下发生的。 博主比较fw,卡住了两天😶 此问题还有一个比较bug的地方,就是当你在 executeBatch() 上下打断点时还能够执行出来&…...

【LeetCode】每日一题 2023_11_25 二叉树中的伪回文路径(dfs,数组/位运算)
文章目录 刷题前唠嗑题目:二叉树中的伪回文路径题目描述代码与解题思路偷看大佬题解 结语 刷题前唠嗑 LeetCode?启动!!! 这个月第一次周末早起~ 题目:二叉树中的伪回文路径 题目链接:1457. 二…...

什么是海外私人IP代理?是纯净独享的代理吗?
相信许多互联网工作者都遇到过IP禁令,比如网络抓取项目,使用共享代理服务器向网站发出第一个请求,但却您收到了禁令,这大部分是由于你的共享IP经过多人使用被禁用所致。 那么到底什么是私人代理呢?它们是否适合您的情…...
Vue组件库推荐:Element UI深度解析
在Vue开发中,使用组件库可以极大地提高开发效率,减少重复工作量。Element UI作为一款优秀的Vue组件库,被广泛应用于各类项目中。本文将对Element UI进行深度解析,为开发者提供详细的使用说明和具体的代码示例。 1,Ele…...

Mysql 8.0主从复制模式安装(兼容Mysql 5.7)
Mysql V8.0.35安装 官网地址:MySQL :: Download MySQL Community Server 下载【Mysql 8.0.35】压缩包 解压压缩包,仅保留6个安装文件即可 mysql-community-client-8.0.31-1.el7.x86_64.rpm mysql-community-client-plugins-8.0.31-1.el7.x86_64.rpm my…...

基于Django+Tensorflow卷积神经网络鸟类识别系统
欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介系统概述系统功能核心技术系统架构系统优势 二、功能三、系统四. 总结 总结 一项目简介 介绍一个基于DjangoTensorflow卷积神经网络鸟类识别系统是一个非…...

史上最全前端知识点+高频面试题合集,十二大专题,命中率高达95%
前言: 下面分享一些关于阿里,美团,深信服等公司的面经,供大家参考一下。大家也可以去收集一些其他的面试题,可以通过面试题来看看自己有哪里不足。也可以了解自己想去的公司会问什么问题,进行有针对的复习。…...

我叫:基数排序【JAVA】
1.自我介绍 基数排序(radix sort)属于“分配式排序” (distribution sort),又称“桶子法” (bucket sort)或bin sort,它是通过键值的各个位的值,将要排序的元素分配至某些“桶”中,是‘桶排序’的扩展 2.基本思想 将所有待比较数值统一为同样的数位长度,数位较短的数…...

ArkUI开发进阶—@Builder函数@BuilderParam装饰器的妙用与场景应用【鸿蒙专栏-05】
ArkUI开发进阶—@Builder函数@BuilderParam装饰器的妙用与场景应用 HarmonyOS,作为一款全场景分布式操作系统,为了推动更广泛的应用开发,采用了一种先进而灵活的编程语言——ArkTS。ArkTS是在TypeScript(TS)的基础上发展而来,为HarmonyOS提供了丰富的应用开发工具,使开…...

智慧城市内涝积水监测仪功能,提升城市预防功能
内涝积水监测仪不仅改变了人们应对城市内涝的老办法,还让智慧城市往前迈了一大步。这个监测仪是怎么做到的呢?就是靠它精准的数据监测和预警,让城市管理有了更科学高效的解决妙招。它就像有了个聪明又负责任的助手,让城市管理更加…...

ISCTF2023 部分wp
学一年了还在入门( web where_is_the_flag ISCTF{41631519-1c64-40f6-8dbb-27877a184e74} 圣杯战争 <?php // highlight_file(__FILE__); // error_reporting(0);class artifact{public $excalibuer;public $arrow;public function __toString(){echo "为Saber选择…...
springboot(ssm毕业生学历证明系统Java(codeLW)
springboot(ssm毕业生学历证明系统Java(code&LW) 开发语言:Java 框架:ssm/springboot vue JDK版本:JDK1.8(或11) 服务器:tomcat 数据库:mysql 5.7(或8.0) 数据…...

分布式锁3: zk实现分布式锁
一 zk 实现分布式锁 1.1 zk分布式操作命令 1.指令: ls / get /zookeeper create /aa "test" delete /aa set /aa "test1" 分布式锁实现原理与最佳实践 2..znode节点类型: 永…...
每日博客Day8
每日博客Day 8 每日算法 206.翻转链表 个人第一次思路: 其实我个人的第一个思路是比较暴力的,我第一遍暴力遍历链表,把链表的所有数值全部都保存到数组里面,然后翻转这个数组,再重复的覆盖当前的这个链表。但是这样…...

Redis-主从与哨兵架构
Jedis使用 Jedis连接代码示例: 1、引入依赖 <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version> </dependency> 2、访问代码 public class JedisSingleTe…...
PTA 7-3 将数组中的数逆序存放
本题要求编写程序,将给定的n个整数存入数组中,将数组中的这n个数逆序存放,再按顺序输出数组中的元素。 输入格式: 输入在第一行中给出一个正整数n(1≤n≤10)。第二行输入n个整数,用空格分开。 输出格式:…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...

剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

用docker来安装部署freeswitch记录
今天刚才测试一个callcenter的项目,所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...