当前位置: 首页 > news >正文

Spark 核心API

核心 API

spark core API 指的是 spark 预定义好的算子。无论是 spark streaming 或者 Spark SQL 都是基于这些最基础的 API 构建起来的。理解这些核心 API 也是写出高效 Spark 代码的基础。

Transformation

转化类的算子是最多的,学会使用这些算子就应付多数的数据加工需求了。他们有啥呢?可以如下分发:

  1. 转化算子: map、flatMap、filter
  2. 聚合算子:reduceByKey、reducerBy、groupBy、groupByKey、conbinerByKey、mapValues、flatMapValues
  3. 连接算子: cogroup、join、union、leftOuterJoin、rightOuterJoin、union
  4. 排序算子:sortBy、sortByKey

看起来好多,其实就这四种数据加工操作。他们之间又有实现上依赖关系。如下图所示:
函数的依赖关系

转化算子

在做数据加工的时候,我经常会将某个字段的值进行加工,例如,格式化日期、正则匹配、数据计算、逻辑判断、过滤。 都可以使用转化算子进行加工。举个例子,将过来出 158 开头的手机号,显示出来的电话中间四位替换为*


import org.apache.spark.{SparkConf, SparkContext}object CSDN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName(CSDN.getClass.getCanonicalName)val sc = new SparkContext(conf)sc.parallelize(List("15899887112", "15799887112", "15999887152", "15799887192")).filter(x => x.startsWith("158")).map(x => x.substring(0 , 3) + "****" + x.substring(7 , x.length)).foreach(println);}}

总结一下,map 做的事情就是 A -> B ,filter 是过滤的功能。

flatMap 的功能比较难理解,他是这样的,A -> [B , B , B] ,flatMap 返回的是一个数组。还是用一个例子来说明吧。有如下例子,

groupplayer
LakersJames,Davis
CelticsAtum,Borrow

转化为

playergroup
JamesLakers
DavisLakers
AtumCeltics
BorrowCeltics

代码是:

val conf = new SparkConf().setMaster("local").setAppName(CSDN.getClass.getCanonicalName)val sc = new SparkContext(conf)sc.parallelize(List(("Lakers" , "James,Davis"),("Celtics" , "Atum,Borrow"))).flatMap(x => {x._2.split(",").map(xx => (x._1 , xx))}).foreach(println)

还有两个和 map 和 flatMap 长的差不多的,分别是 mapValue 和 flatMapValues 两个函数。这两个函数是 PairRDDFunctions 的匿名类中的函数,从 PairRDDFunctions 的名称中可以知道,PairRDDFunctions 是真的键值对的,也就是说 RDD 中的数据是键值对的时候,我们可以调 PairRDDFunctions 的函数,scala 这个功能好像类的被动技能。这是对 RDD 功能一种扩展。说了写废话,还是说回 mapValue 和 flatMapValue ,当这个两个算子接收到 我们字段的函数后,作用到的是 key-value 的 value 上面的, map 和 flapMap 是作用到整个数据上的。例如,我们的数据是 ( James , 37) ,我自定义的函数是 self_define_function , map 和 flatMap 的效果是 self_define_function((James , 37)) , 而 mapValue 和 flatMapValues 则是 (James , self_define_function(value))。

聚合算子

聚合算子包括 combinerByKeyWithClassTag、reduceBykey、reduceBy,然后把数据连接启动的算子:cogroup、join、leftOuterJoin、rightOuterJoin,还有 union 这几个东西。

combinerByKeyWithClassTag 是一个基础类,当明白了它,reduceByKey 和 reduceBy 都会明白了。conbinerByKey 和 Accumulator(累加器) 的计算逻辑一样的。就看一下它的入参吧。

combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null)

createCombiner : 是一个函数,此函数的入参是 V 返回的是一个 C , V 和 C 是泛型。此函数的功能是创建一个初始值。
mergeValue :也是一个函数,此函数的入参是 C 和 V 返回的是 V ,此函数会接收各个分区每条数据 V ,然后经过加工,返回的还是一个 C 。
mergeCombiner: 又是一个函数,它是合并各个分区 combiner 后的值。
partitioner: 是分区器,它是用来位每条记录计算分区用的。
mapSideCombiner:这个是设置是否在 shuffle 的过程执行,执行 map-side 的局部聚合。
serializer:是数据序列化器,数据在不同的通过网络间传输的时候,需将数据序列化后传输的,这样可以提高效率。

下面

partition1创建C
mergeValue将V累计到C上
partition2创建C
partition3创建C
combiner1获取累计值
combiner2获取累计值
mergeCombiner将combiner1和combiner2合并
获得结果

此算子是 PairRDDFunctions 的,所以它是处理 key-value 类型数据的算子。以 word count 为例子。
这需要假设我设置了 mapSideCombine = true 从可以的。

val conf = new SparkConf().setMaster("local[*]").setAppName("");
val sc = new SparkContext(conf)
sc.parallelize(List(
"Java","Spark","Scala","Python","Shell","Lisp"
)).map((_,1)).
combineByKeyWithClassTag((x:Int) => x
,(x:Int,y:Int) => x + y
,(x:Int,y:Int) => x + y
,new HashPartitioner(3)
,true
,null
)
.foreach(println)

在上面的的 combinerBykeyWithClassTag 的用户相当于 reduceByKey(+) 的用法,里面的 + 其实 (x:Int,y:Int) => x + y 的简写。

搞聚合的还有一个 groupByKey 和 groupBy() 这两个东西,既然咱们知道了 combinerByKeyWithClassTag 这个函数,其实通过看源码就可以看到 groupByKey 的功能了。

// 进入 PairRDDFunction 对象的 groupByKey 方法里面def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {val createCombiner = (v: V) => CompactBuffer(v)val mergeValue = (buf: CompactBuffer[V], v: V) => buf += vval mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2// 底层就是使用的 combinerByKeyWithClassTag 这个函数val bufs = combineByKeyWithClassTag[CompactBuffer[V]](createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)bufs.asInstanceOf[RDD[(K, Iterable[V])]]}

从源码中,可以看到 groupByKey 底层还是用的 combineByKeyWithClassTag,我来看看它里面三个非常重要的函数:

  1. val createCombiner = (v: V) => CompactBuffer(v) 这是初始化 combiner 函数,返回的是一个 CompactBuffer ,这是一个底层保存是数组,这个看以看成是一个 list 。
  2. val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v,这个是 mergeValue 的函数,它的做法是将 value 的值放到 CompactBuff 列表的。
  3. val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2,这是 mergeCombiner 的函数,此函数是将两个 CompactBuffer 合并成一个 CompactBuffer 中。

这样算下来,groupByKey 其实是将相同 key 下面的 value 放入到一个 CompactBuffer 中,然后然后在像求什么值,在进行计算就行了。可以使用 mapValues 此函数。这个函数也是 PairRDDFunction 的。

现在再来 groupBy 吧,上源码:

// 打开 RDD 的 groupBy 方法def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = withScope {val cleanF = sc.clean(f)// 先使用 map 将 RDD 转化为一个 PairRDD ,然后就可以使用 groupByKey 了this.map(t => (cleanF(t), t)).groupByKey(p)}

从代码中可以看到,先是将 RDD 转为 PairRDD ,然后将再使用 groupBykey。转化为 PairRDD 中,使用到 f 这个我们自定义的函数,此函数接收一个 RDD 中的数据,然后返回的是个 key 值,f 其实是定义 key 的函数。

下面看一个例子,

playergroup
JamesLakers
DavisLakers
AtumCeltics
BorrowCeltics

转化为

groupplayer
LakersJames,Davis
CelticsAtum,Borrow

代码为:

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val value = sc.parallelize(List(("Lakers", "James"), ("Lakers", "Davis"), ("Celtics", "Atum"), ("Celtics", "Borrow")))value.groupByKey().mapValues(x => x.mkString(",")).foreach(x => println(s"key: ${x._1} , value:${x._2}"))sc.stop()// 第二种写法val value:RDD[(String,String)] = sc.parallelize(List(("Lakers", "James"), ("Lakers", "Davis"), ("Celtics", "Atum"), ("Celtics", "Borrow")))value.groupBy(x=>x._1).mapValues(x => x.mkString(",")).foreach(x => println(s"key: ${x._1} , value:${x._2}"))sc.stop()
排序算子

排序算子比较少,就两个一个 sortByKey ,另外一个就是 sortBy ,先来看 sortByKey 。
sortByKey 也是一个 PairRDDFunction 的函数,其处理的是 key-value 中的 key ,也就是根据 key 值来进行的排序,看一个例子吧。

    sc.parallelize(List("A" , "C" , "B" , "E" , "F" )).map((_,0))// 这里必须设置分区数量为1,否则,打印出来的元素就不排序了。.sortByKey(true , 1)sc.stop()

其实,sortBy 就是基于 sortByKey ,来看看源码就知道了。

  def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {this.keyBy[K](f).sortByKey(ascending, numPartitions).values}

keyBy(func) 可以理解为 RDD.map(x => (func(x) , x)) 其实就是将 RDD 转化为一个 PairRDD , 这样就能用 sortByKey 了,最后把 PairRDD 转化为原来的 RDD 。

连接类的算子

首先要讲的就是 cogroup 算子,它也是一个基础的算子,像 join、lelfOuterJoin、rightOuterJoin、intersection、fullOuterjoin 都是依赖 cogroup 实现的。

看一下 congroup 实现订单表和商品维表关联取出商品价格的情况。

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val order:RDD[String] = sc.parallelize(List("order1,product1,1", "order1,product2,2", "order1,product3,4"))val product:RDD[String] = sc.parallelize(List("product1,10", "product2,30", "product3,87"))val productTuple:RDD[(String,String)] = product.map(x => {val strings = x.split(",")(strings(0), strings(1))})val orderTuple:RDD[(String,String)] = order.map(x => {val strings = x.split(",")(strings(1), x)})orderTuple.cogroup(productTuple).mapValues(x => {x._1.map(xx => {val strings = xx.split(",")var rs = ""var price:Int = 0if(!x._2.isEmpty){price = x._2.head.toIntrs = xx.concat(s",${price.toInt},${price.toInt*strings(2).toInt}")}rs})}).values.foreach(println)sc.stop()

再看一下 join 的实现吧

// join 算子是 PairRDDFunction 的def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {this.cogroup(other, partitioner).flatMapValues( pair =>// 笛卡尔积for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w))}

从上面的代码可以知道,是对 pair 进行笛卡尔积操作,而且前后都不为 Seq() ,也就是不为空。

再来看一下 fullOuterJoin 吧。

  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))] = self.withScope {this.cogroup(other, partitioner).flatMapValues {case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))}}

从上面的代码里面,发现 fullOuterJoin 的实现和 join 实现差不多,但是多了对应左右列表为空的处理。这和 SQL 这的 full join 的语义是相同的。这个的 case 是 scala 里面的偏函数,在 scala 的源码中应用非常的广泛。

知道了 join 和 fullOuerJoin ,就再来看看 leftOuterJoin 和 rightOuterJoin :

// leftOuterJoin 也是 PairRDDFuntion 的函数def leftOuterJoin[W](other: RDD[(K, W)],partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {this.cogroup(other, partitioner).flatMapValues { pair =>if (pair._2.isEmpty) {pair._1.iterator.map(v => (v, None))} else {for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))}}}
// rightOuterJoin 也是 PairRDDFunction 的函数def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] = self.withScope {this.cogroup(other, partitioner).flatMapValues { pair =>if (pair._1.isEmpty) {pair._2.iterator.map(w => (None, w))} else {for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)}}}

从上面的代码展示来看看,[left | right]OuterJoin 和 SQL 中的 left join 和 right join 的语义也是相同的。
在这里可以得到一个结论,join、leftOuterJoin、rightOuterJoin、fullOuterJoin 的计算效率其实是相同的,都取决于 cogroup 的效率。

最后看一个 RDD 中的函数 intersection

def intersection(other: RDD[T]): RDD[T] = withScope {this.map(v => (v, null)).cogroup(other.map(v => (v, null))).filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }.keys}

从上面的代码中可以看到,是去掉了左右没有关联到的数据。这和 SQL 里面的 inner join 的语义是一致的。其实是和 join 逻辑相似,但是并没有将左右边的元素进行笛卡尔积的计算。

在实现维表关联的场景下,还有一个重要的算子,就是 broadcast 算子。来看一个例子。

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)val sc = new SparkContext(conf)//设置文件切分大小sc.hadoopConfiguration.setLong("fs.local.block.size",128*1024*1024)//数据合并,有大量的数据移动val productRDD: RDD[(String, String)] = sc.parallelize(List("product1;10")).map { line =>val field = line.split(";")(field(0), line)}//广播变量val productBC: Broadcast[collection.Map[String, String]] = sc.broadcast(productRDD.collectAsMap())//map task 完成数据准备val orderInfoRDD: RDD[(String, String)] = sc.parallelize(List("order1;10;product1")).map { line =>val field = line.split(";")(field(2), line)}//map 端的joinval resultRDD: RDD[(String, (String, String))] = orderInfoRDD.map {case (pid, orderInfo) => {val product: collection.Map[String, String] = productBC.value(pid, (orderInfo, product.getOrElse(pid, null)))}}resultRDD.foreach(println)Thread.sleep(100)sc.stop()

广播是非常好的优化方式,他会将维表的一个副本复制到各个分区里面,然后就可以和进行拉宽做了。

控制类的算子

控制类算子是一类和优化相关的算子。
控制类算子
例如,当我们重复利用一些计算结果的时候,可以将中间的计算结果保存到缓存中。例如,我计算某个商品在某些城市的销售额,我们希望计算每天、每周、每月的销售额,我们希望计算这些,怎么计算呢?我们可以计算先计算出每天的销售额,然后在这个基础上计算出每周和每月的销售额。

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val dimDataRDD:RDD[String] = sc.parallelize(List("2024-02-29,1,02","2024-03-01,2,03","2024-03-01,2,03","2024-03-02,2,03","2024-03-03,2,03","2024-03-04,2,03"))val dimDateBroadcast = sc.broadcast(dimDataRDD.map(x => {val strings = x.split(",")(strings(0), x)}).collectAsMap())val orderRDD:RDD[String] = sc.parallelize(List("2024-03-01,order1,product1,city1,10", "2024-03-01,order1,product1,city3,11", "2024-03-01,order1,product1,city4,11", "2024-03-01,order1,product1,city5,34", "2024-03-01,order1,product1,city3,13", "2024-03-01,order1,product1,city3,33", "2024-03-02,order1,product1,city3,19", "2024-03-02,order1,product1,city4,13", "2024-03-02,order1,product1,city5,34", "2024-03-02,order1,product1,city3,19", "2024-03-02,order1,product1,city1,38", "2024-03-03,order1,product1,city5,34", "2024-03-03,order1,product1,city3,19", "2024-03-03,order1,product1,city1,38", "2024-03-04,order1,product1,city5,34", "2024-03-04,order1,product1,city3,19", "2024-03-04,order1,product1,city1,38", "2024-02-29,order1,product1,city1,38", "2024-02-28,order1,product1,city1,12"))val byDay = orderRDD.map(x => {val strings = x.split(",")((strings(0), strings(3)), strings(4).toInt)}).reduceByKey((s: Int, x: Int) => {s + x})println("====by day city sum(amt)===")byDay.foreach(println)val byDayWithMonthAndWeek = byDay.map(x => {val dimss = dimDateBroadcast.valueval rs = dimss.getOrElse(x._1._1, Nil) match {case Nil => ("", "")case str: String => {val strings = str.split(",")(strings(1), strings(2))}}//日期+城市 周 月(x._1, rs._1, rs._2, x._2)})byDayWithMonthAndWeek.cache()println("====by city , week  sum(amt)===")byDayWithMonthAndWeek.keyBy(x => (x._1._2 ,x._2)).combineByKey((a:((String,String) , String , String , Int)) => a._4,(s:Int , x:((String,String) , String , String , Int))=>{s + x._4},(x:Int , y:Int) => {x + y}).foreach(println)println("====by city , month  sum(amt)===")byDayWithMonthAndWeek.keyBy(x => (x._1._2 ,x._3)).combineByKey((a:((String,String) , String , String , Int)) => a._4,(s:Int , x:((String,String) , String , String , Int))=>{s + x._4},(x:Int , y:Int) => {x + y}).foreach(println)

上面的代码完成了最初的逻辑。我们来看看里面的执行过程。

总体的结果
从上面的图中可以看到,有四个 job 执行,从函数来看,

0 号代表广播流
1 号代表了输出的是天、城市粒度下销售额
2 号代表输出的是城市、周粒度下的销售额汇总
3 号代表了输出的是城市、月粒度下的销售额汇总

咱们一个一个点进去看看
0 号 job
1号 job

2号代表的task
3代表的task

从 2 和 3 中可以看到前面 Stage6 和 stage3 都是 skip 的,在 Stage7 和 Stage8 中的 map 是有一个绿色的标识的,此标识就是代表了使用缓存。如果讲 cache 去掉的话。在来看看效果。

在这里插入图片描述
在这里插入图片描述
从上面的图中,可以看到 Stage7 和 Stage4 中的 map 没有绿色的标识了,说明缓存已经没了。

下面一个是 persist(), 我们先一个 cache() 的源码:

  def cache(): this.type = persist()def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

从上面的代码,可以知道,cache 是 persist 实现的,而且 persist 里面可以设置不同的保存级别:

  val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

它起的名字还是见名知意的。所以它可以保存缓存到内存、磁盘、堆外,并且可以序列化。
其实这些都不够安全,最安全的办法就是缓存保存到 HDFS 中。这样就最保险了。也是就有 checkpoint()
spark 的 checkpoint 值是将中间结果缓存,达到中间数据重复使用的效果,和 Flink 对比,Flink 的 checkpint 本质是一种分布式事务,可以协调各个算子完成同一批数据处理,通过checkpoin 就能实现 exactly-once 的语义,但是 spark 就不能这样。

下一个重要的控制算子就是 Accumulator ,它是一个全局性的累计器。可以保存全局性的累计值。下面是是用 accumulator 实现的 workcount。

object TestAccumulate {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test_accumulate").setMaster("local[*]")val sc = new SparkContext(conf)val value:RDD[String] = sc.makeRDD(List("Hi,", "Hello", "ww", "hhh"))val sum:LongAccumulator = sc.longAccumulator("sum")val accu:MyAccumulator = new MyAccumulator()sc.register(accu , name = "accu")value.foreach(x => {accu.add(x)sum.add(1)})val value1: mutable.Map[String, Int] = accu.valueprintln(s"size ${value1.size}")println(s"sum ${sum.value}")for( (key ,value) <- value1){println(s"key is ${key} , and value is ${value}")}sc.stop()}}
class MyAccumulator extends AccumulatorV2[String , mutable.Map[String , Int]]{// 定义一个集合,来记录以 H 开头的字符串的个数var map:mutable.Map[String , Int] = mutable.Map[String , Int]()override def isZero: Boolean = trueoverride def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {val rs:MyAccumulator = new MyAccumulator()rs.map = this.maprs}override def reset(): Unit = {this.map.clear()}override def add(v: String): Unit = {if(v.startsWith("H")){this.map.put(v , map.getOrElse(v , 0) + 1)}}override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {val map1:mutable.Map[String , Int] = mapval map2:mutable.Map[String , Int] = other.valuemap = map1.foldLeft(map2) {(map2, kv) => {map2(kv._1) = map2.getOrElse(kv._1, 0) + kv._2map2}}}override def value: mutable.Map[String, Int] = this.map
}

Accumulator 的计算逻辑和 combinerByKey 的逻辑十分的相似。Spark 还为我们预定义了三个累加器,longAccumulator、doubleAcculator、collectionAccumulator 三个,其实现方式也是继承了AccumulatorV2 类,然后,在 SparkContext 中注册就可以使用了。

coalesce 是将 RDD 中的分区重新划分分区,这个的作用可以处理数据倾斜的问题,其实数据倾斜的根源就是在于分区中有多有的少,我们可以使用 key 值的组合,然后重新分区达到各个分区数据量差不多的情况。

action 算子

action 算子是真正出发计算的算子,在 action 算子之前, 所有的算子就像流水线上的一个工序,按照我们想要的结果设置好了加工模具,action 才能够决一个 job 的开始。一个 Spark 任务中,可以有多个 job ,一个 job 里面可以多个 Stage 。

action 算子

其中,kafka 是后来使用 KafkaUtil 加入的 Kafka 的消费算子,其他都是 RDD 中自带的算子,这些算子中的共同特点是源码中都使用了 SparkContext#runJob()
下面来看看 foreach 函数:

  def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}

下面来说是 KafkaUtils 这是 Spark 扩展,它的功能是让 Spark 可以消费和生产 Kafka 里面的数据,这样 Spark 就能处理流式计算了。

数据源算子

数据源算子是设置数据源的算子。

在这里插入图片描述

textfile 是从文件系统中取出数据,可以是 disk 中,或者从 HDFS 中拉出来。
parallism 是可以从 List 消费数据,这个算子经常用来测试功能。

相关文章:

Spark 核心API

核心 API spark core API 指的是 spark 预定义好的算子。无论是 spark streaming 或者 Spark SQL 都是基于这些最基础的 API 构建起来的。理解这些核心 API 也是写出高效 Spark 代码的基础。 Transformation 转化类的算子是最多的&#xff0c;学会使用这些算子就应付多数的数…...

OpenLayers线性渐变和中心渐变(径向渐变)

目录 1.前言2.添加一个面要素3.线性渐变3.1 第一个注意点3.2 第二个注意点 4.中心渐变&#xff08;径向渐变&#xff09;5.总结 1.前言 OpenLayers官网有整个图层的渐变示例&#xff0c;但是没有单个要素的渐变示例&#xff0c;我们这里来补充一下。OpenLayers中的渐变是通过fi…...

[210. 课程表 II] 拓扑排序模板(DFS+BFS)

Problem: 210. 课程表 II 文章目录 思路解题方法Code 思路 本题是经典拓扑排序模板&#xff0c;通过DFS和BFS两种方式进行实现。 解题方法 DFS DFS方法的重点在于如何标记节点状态&#xff0c;初做题者如果只用未访问和已访问两种状态很容易陷入死结。正确的做法是使用三种状…...

我的第一个python web 网站

# -*- coding: utf-8 -*-import http.server import socketserver from datetime import datetimePORT 8000import sys# ...class MyHandler(http.server.SimpleHTTPRequestHandler):def do_GET(self):if self.path /:# 如果路径是根路径&#xff0c;返回页面内容self.send_r…...

产品展示型wordpress外贸网站模板

孕婴产品wordpress外贸网站模板 吸奶器、待产包、孕妇枕头、护理垫、纸尿裤、孕妇装、孕婴产品wordpress外贸网站模板。 https://www.jianzhanpress.com/?p4112 床品毛巾wordpress独立站模板 床单、被套、毛巾、抱枕、靠垫、围巾、布艺、枕头、乳胶枕、四件套、浴巾wordpre…...

四信全球化拓展再启新篇!LoRa传感器与云平台领航智能感知时代

随着科技浪潮的不断推进&#xff0c;物联网已逐渐融入我们的生活。刚刚结束的MWC24盛会上&#xff0c;四信带来了一系列前沿技术成果&#xff0c;不仅将5G技术成功扩展至当前市场主流类型的终端&#xff0c;更携手联通、ASR等业界巨头&#xff0c;在连接、5G RedCap、AI、LoRa以…...

阿里云k8s环境下,因slb限额导致的发布事故

一、背景 阿里云k8s容器&#xff0c;在发布java应用程序的时候&#xff0c;客户端访问出现500错误。 后端服务是健康且可用的&#xff0c;网关层大量500错误请求&#xff0c;slb没有流入和流出流量。 经过回滚&#xff0c;仍未能解决错误。可谓是一次血的教训&#xff0c;特…...

【STM32+OPENMV】矩形识别

一、准备工作 有关OPENMV最大色块追踪及与STM32通信内容&#xff0c;详情见【STM32HAL】与OpenMV通信 二、所用工具 1、芯片&#xff1a;STM32F103C8T6 2、CUBEMX配置软件 3、KEIL5 4、OPENMV 三、实现功能 寻找黑色矩形&#xff0c;并将最大矩形的四个边缘坐标发送给STM…...

在吗?腾讯云服务器优惠价格表曝光_2023年3月报价请过目!

腾讯云服务器多少钱一年&#xff1f;61元一年起&#xff0c;2核2G3M配置&#xff0c;腾讯云2核4G5M轻量应用服务器165元一年、756元3年&#xff0c;4核16G12M服务器32元1个月、312元一年&#xff0c;8核32G22M服务器115元1个月、345元3个月&#xff0c;腾讯云服务器网txyfwq.co…...

Revit-二开之创建Plane-(7)

2016版本的Plane 2017版本的Plane 2018版本及以上版本的Plane 由此可见2017版本是一个分水岭 #if REVIT2016Plane plane = new Plane(uiDoc.Document.ActiveView...

【操作系统学习笔记】文件管理1.2

【操作系统学习笔记】文件管理1.2 参考书籍: 王道考研 视频地址: Bilibili 文件的逻辑结构 无结构文件 文件内部的数据就是一系列的二进制流或字符流组成&#xff0c;又称流式文件&#xff0c;例如 .text 文件 有结构文件 由一组相似的记录组成&#xff0c;又称记录式文件…...

算法归纳【数组篇】

目录 二分查找1. 前提条件&#xff1a;2. 二分查找边界 2.移除元素有序数组的平方长度最小的子数组59.螺旋矩阵II54. 螺旋矩阵 二分查找 参考链接 https://programmercarl.com/0704.%E4%BA%8C%E5%88%86%E6%9F%A5%E6%89%BE.html#%E6%80%9D%E8%B7%AF 1. 前提条件&#xff1a; 数…...

【随笔】程序员如何选择职业赛道,目前各个赛道的现状如何,那个赛道前景巨大

大家好&#xff0c;我是全栈小5&#xff0c;欢迎阅读文章&#xff01; 此篇是【话题达人】系列文章&#xff0c;这一次的话题是《程序员如何选择职业赛道》 目录 背景热度柱状图赛道热度C/C云原生人工智能前沿技术软件工程后端JavaJavascriptPHPPython区块链大数据移动开发嵌入…...

进程之舞:操作系统中的启动、状态转换与唤醒艺术

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua&#xff0c;在这里我会分享我的知识和经验。&#x…...

Java面试(4)之 Spring Bean生命周期过程

一, 整个加载的完整链路图 更详细的生命周期函数链路图(仅供参考) 二, Bean实例化的四种方式: 1, 无参构造器(默认且常用)6 2, 静态工厂方法方式(factory-method指定实例化的静态方法) 3, 实例工厂方法方式(factory-bean指定bean的name,factory-method指定实例化方法) 4, 实…...

JavaSE——面向对象高级一(1/4)-static修饰成员变量、应用场景,static修饰成员方法、应用场景

目录 static修饰成员变量 类变量的应用场景 static修饰成员方法 static修饰成员方法的应用场景 static 叫静态&#xff0c;可可以修饰成员变量、成员方法。 成员变量按照有无static修饰&#xff0c;分为两种&#xff1a; 类变量实例变量&#xff08;对象的变量&#xff…...

轻量脚本语言Lua的配置与c++调用

文章目录 lua配置下载运行lua命令lua脚本的执行C++调用lua环境配置错误和警告测试c++程序lua脚本结果Lua是一种功能强大且快速的编程语言,易于学习和使用,并且可以嵌入到应用程序中。 Lua被设计成一种轻量级的可嵌入脚本语言。它被用于各种各样的应用程序,从游戏到web应用程…...

力扣每日一道系列 --- LeetCode 160. 相交链表

&#x1f4f7; 江池俊&#xff1a; 个人主页 &#x1f525;个人专栏&#xff1a; ✅数据结构探索 ✅LeetCode每日一道 &#x1f305; 有航道的人&#xff0c;再渺小也不会迷途。 LeetCode 160. 相交链表 思路&#xff1a; 首先计算两个链表的长度&#xff0c;然后判断两个链…...

设计模式-建造者模式实践案例

建造者模式&#xff08;Builder Pattern&#xff09;是一种创建型设计模式&#xff0c;它提供了一种创建对象的最佳方式。当一个对象需要多个部分或许多步骤来创建&#xff0c;并且需要将创建过程与表示分离时&#xff0c;建造者模式非常有用。建造者模式旨在找到一个解决方案&…...

freeRTOS_20240308

1.使用ADC采样光敏电阻数值&#xff0c;如何根据这个数值调节LED灯亮度。 HAL_TIM_PWM_Start(&htim3,TIM_CHANNEL_3); while (1){/* USER CODE END WHILE *//* USER CODE BEGIN 3 */HAL_ADC_Start(&hadc);adc_val HAL_ADC_GetValue(&hadc);printf("adc_va…...

利用chatgpt写论文使用教程

ChatGPT是人工智能技术的一种&#xff0c;可帮助人们综合运用和分析各种语言技巧&#xff0c;从而优化实验结果、加速研究流程以及提高文章质量。以下是利用ChatGPT写论文的使用教程&#xff1a; 综上所述&#xff0c;利用ChatGPT写论文涉及到一些技巧和方法&#xff0c;需要合…...

SMiC矩阵将于3月6日正式上线,开启数字化经济新纪元

在数字化浪潮的推动下&#xff0c;全球瞩目的SMiC矩阵将于2024年3月6日正式上线。这一里程碑式的事件标志着数字化经济迈入了一个全新的时代&#xff0c;为思洣客、合作伙伴和整个经济生态带来了前所未有的机遇和挑战。 SMiC矩阵作为引领数字化经济的新力量&#xff0c;始终致…...

备战蓝桥杯---动态规划的一些思想2

话不多说&#xff0c;直接看题&#xff1a; 1.换根DP&#xff1a; 我们肯定不能对每一个根节点暴力求&#xff0c;我们不妨先求f[1]&#xff0c;我们发现当他的儿子作为根节点时深度和为f[1](n-cnt[i])-cnt[i](cnt[i]表示以i为根的节点数&#xff09;&#xff0c;这样子两遍DFS…...

卫星导航 | 坐标系---地理坐标系与UTM坐标系

卫星导航 | 坐标系---地理坐标系与UTM坐标系 世界坐标系地理坐标系UTM坐标系 全球卫星导航系统(Global Navigation Satelite System,GNSS)&#xff0c;简称卫星导航&#xff0c;是室外机器人定位的一个主要信息来源。 卫星导航能给机器人提供什么信息&#xff1f; 正常工作时&…...

JavaEE之volatile关键字

一.内存可见性问题 什么是内存可见性问题 计算机运行的程序/代码&#xff0c;往往需要访问数据。这些数据往往存在于内存中。 cup使用此变量时&#xff0c;就会把内存中的数据先读出来&#xff0c;加载到cpu寄存器中&#xff0c;再去参与运算。 但是&#xff0c;关键是cpu读…...

代码学习记录10

随想录日记part10 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.03 主要内容&#xff1a;今天的主要内容是深入了解数据结构中栈和队列&#xff0c;并通过三个 l e e t c o d e leetcode leetcode 题目深化认识。 20. 有效的括号1047. 删除字符串中的所有…...

java——2024-03-03

String类的对象能被修改吗&#xff1f;如果不能需要用什么修改&#xff1f;StringBuilder和StringBuffer的区别&#xff1f;equals和区别谈谈对面向对象的理解重载和重写的区别说一下ArrayList&#xff0c;LinkedList底层实现以及区别什么是哈希冲突&#xff1f;hashMap和conCu…...

Ubuntu安装conda以后,给jupyter安装C++内核

前言 大家都知道&#xff0c;jupyter notebook 可以支持python环境&#xff0c;可以在不断点调试的情况下&#xff0c;打印出当前结果&#xff0c;如果代码错了也不影响前面的内容。于是我就想有没有C环境的&#xff0c;结果还真有。 参考文章&#xff1a; 【分享】Ubuntu安装…...

【谈判】核心思想(抓大放小)

谈判交换&#xff08;抓大放小&#xff09; 一、明确目的 事&#xff1a;must: 非要不可&#xff0c;才会签字 want: 有很好&#xff0c; give: 放掉 三者&#xff0c;会变化 二、明确对象 人&#xff1a;我跟谁谈&#xff1f; 时&#xff1a; 国际形势、国家的政策、他的心…...

洛谷P5908 猫猫和企鹅 做题反思(2024.3.7)

猫猫和企鹅 题目传送门 题目描述 王国里有 n n n 个居住区&#xff0c;它们之间有 n − 1 n-1 n−1 条道路相连&#xff0c;并且保证从每个居住区出发都可以到达任何一个居住区&#xff0c;并且每条道路的长度都为 1 1 1。 除 1 1 1 号居住区外&#xff0c;每个居住区住…...