Spark 核心API
核心 API
spark core API 指的是 spark 预定义好的算子。无论是 spark streaming 或者 Spark SQL 都是基于这些最基础的 API 构建起来的。理解这些核心 API 也是写出高效 Spark 代码的基础。
Transformation
转化类的算子是最多的,学会使用这些算子就应付多数的数据加工需求了。他们有啥呢?可以如下分发:
- 转化算子: map、flatMap、filter
- 聚合算子:reduceByKey、reducerBy、groupBy、groupByKey、conbinerByKey、mapValues、flatMapValues
- 连接算子: cogroup、join、union、leftOuterJoin、rightOuterJoin、union
- 排序算子:sortBy、sortByKey
看起来好多,其实就这四种数据加工操作。他们之间又有实现上依赖关系。如下图所示:
转化算子
在做数据加工的时候,我经常会将某个字段的值进行加工,例如,格式化日期、正则匹配、数据计算、逻辑判断、过滤。 都可以使用转化算子进行加工。举个例子,将过来出 158 开头的手机号,显示出来的电话中间四位替换为*
。
import org.apache.spark.{SparkConf, SparkContext}object CSDN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName(CSDN.getClass.getCanonicalName)val sc = new SparkContext(conf)sc.parallelize(List("15899887112", "15799887112", "15999887152", "15799887192")).filter(x => x.startsWith("158")).map(x => x.substring(0 , 3) + "****" + x.substring(7 , x.length)).foreach(println);}}
总结一下,map 做的事情就是 A -> B ,filter 是过滤的功能。
flatMap 的功能比较难理解,他是这样的,A -> [B , B , B] ,flatMap 返回的是一个数组。还是用一个例子来说明吧。有如下例子,
group | player |
---|---|
Lakers | James,Davis |
Celtics | Atum,Borrow |
转化为
player | group |
---|---|
James | Lakers |
Davis | Lakers |
Atum | Celtics |
Borrow | Celtics |
代码是:
val conf = new SparkConf().setMaster("local").setAppName(CSDN.getClass.getCanonicalName)val sc = new SparkContext(conf)sc.parallelize(List(("Lakers" , "James,Davis"),("Celtics" , "Atum,Borrow"))).flatMap(x => {x._2.split(",").map(xx => (x._1 , xx))}).foreach(println)
还有两个和 map 和 flatMap 长的差不多的,分别是 mapValue 和 flatMapValues 两个函数。这两个函数是 PairRDDFunctions 的匿名类中的函数,从 PairRDDFunctions 的名称中可以知道,PairRDDFunctions 是真的键值对的,也就是说 RDD 中的数据是键值对的时候,我们可以调 PairRDDFunctions 的函数,scala 这个功能好像类的被动技能。这是对 RDD 功能一种扩展。说了写废话,还是说回 mapValue 和 flatMapValue ,当这个两个算子接收到 我们字段的函数后,作用到的是 key-value 的 value 上面的, map 和 flapMap 是作用到整个数据上的。例如,我们的数据是 ( James , 37) ,我自定义的函数是 self_define_function , map 和 flatMap 的效果是 self_define_function((James , 37)) , 而 mapValue 和 flatMapValues 则是 (James , self_define_function(value))。
聚合算子
聚合算子包括 combinerByKeyWithClassTag、reduceBykey、reduceBy,然后把数据连接启动的算子:cogroup、join、leftOuterJoin、rightOuterJoin,还有 union 这几个东西。
combinerByKeyWithClassTag 是一个基础类,当明白了它,reduceByKey 和 reduceBy 都会明白了。conbinerByKey 和 Accumulator(累加器) 的计算逻辑一样的。就看一下它的入参吧。
combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null)
createCombiner : 是一个函数,此函数的入参是 V 返回的是一个 C , V 和 C 是泛型。此函数的功能是创建一个初始值。
mergeValue :也是一个函数,此函数的入参是 C 和 V 返回的是 V ,此函数会接收各个分区每条数据 V ,然后经过加工,返回的还是一个 C 。
mergeCombiner: 又是一个函数,它是合并各个分区 combiner 后的值。
partitioner: 是分区器,它是用来位每条记录计算分区用的。
mapSideCombiner:这个是设置是否在 shuffle 的过程执行,执行 map-side 的局部聚合。
serializer:是数据序列化器,数据在不同的通过网络间传输的时候,需将数据序列化后传输的,这样可以提高效率。
下面
此算子是 PairRDDFunctions 的,所以它是处理 key-value 类型数据的算子。以 word count 为例子。
这需要假设我设置了 mapSideCombine = true 从可以的。
val conf = new SparkConf().setMaster("local[*]").setAppName("");
val sc = new SparkContext(conf)
sc.parallelize(List(
"Java","Spark","Scala","Python","Shell","Lisp"
)).map((_,1)).
combineByKeyWithClassTag((x:Int) => x
,(x:Int,y:Int) => x + y
,(x:Int,y:Int) => x + y
,new HashPartitioner(3)
,true
,null
)
.foreach(println)
在上面的的 combinerBykeyWithClassTag 的用户相当于 reduceByKey(+) 的用法,里面的 + 其实 (x:Int,y:Int) => x + y 的简写。
搞聚合的还有一个 groupByKey 和 groupBy() 这两个东西,既然咱们知道了 combinerByKeyWithClassTag 这个函数,其实通过看源码就可以看到 groupByKey 的功能了。
// 进入 PairRDDFunction 对象的 groupByKey 方法里面def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {val createCombiner = (v: V) => CompactBuffer(v)val mergeValue = (buf: CompactBuffer[V], v: V) => buf += vval mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2// 底层就是使用的 combinerByKeyWithClassTag 这个函数val bufs = combineByKeyWithClassTag[CompactBuffer[V]](createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)bufs.asInstanceOf[RDD[(K, Iterable[V])]]}
从源码中,可以看到 groupByKey 底层还是用的 combineByKeyWithClassTag,我来看看它里面三个非常重要的函数:
val createCombiner = (v: V) => CompactBuffer(v)
这是初始化 combiner 函数,返回的是一个 CompactBuffer ,这是一个底层保存是数组,这个看以看成是一个 list 。val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
,这个是 mergeValue 的函数,它的做法是将 value 的值放到 CompactBuff 列表的。val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
,这是 mergeCombiner 的函数,此函数是将两个 CompactBuffer 合并成一个 CompactBuffer 中。
这样算下来,groupByKey 其实是将相同 key 下面的 value 放入到一个 CompactBuffer 中,然后然后在像求什么值,在进行计算就行了。可以使用 mapValues 此函数。这个函数也是 PairRDDFunction 的。
现在再来 groupBy 吧,上源码:
// 打开 RDD 的 groupBy 方法def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = withScope {val cleanF = sc.clean(f)// 先使用 map 将 RDD 转化为一个 PairRDD ,然后就可以使用 groupByKey 了this.map(t => (cleanF(t), t)).groupByKey(p)}
从代码中可以看到,先是将 RDD 转为 PairRDD ,然后将再使用 groupBykey。转化为 PairRDD 中,使用到 f 这个我们自定义的函数,此函数接收一个 RDD 中的数据,然后返回的是个 key 值,f 其实是定义 key 的函数。
下面看一个例子,
player | group |
---|---|
James | Lakers |
Davis | Lakers |
Atum | Celtics |
Borrow | Celtics |
转化为
group | player |
---|---|
Lakers | James,Davis |
Celtics | Atum,Borrow |
代码为:
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val value = sc.parallelize(List(("Lakers", "James"), ("Lakers", "Davis"), ("Celtics", "Atum"), ("Celtics", "Borrow")))value.groupByKey().mapValues(x => x.mkString(",")).foreach(x => println(s"key: ${x._1} , value:${x._2}"))sc.stop()// 第二种写法val value:RDD[(String,String)] = sc.parallelize(List(("Lakers", "James"), ("Lakers", "Davis"), ("Celtics", "Atum"), ("Celtics", "Borrow")))value.groupBy(x=>x._1).mapValues(x => x.mkString(",")).foreach(x => println(s"key: ${x._1} , value:${x._2}"))sc.stop()
排序算子
排序算子比较少,就两个一个 sortByKey ,另外一个就是 sortBy ,先来看 sortByKey 。
sortByKey 也是一个 PairRDDFunction 的函数,其处理的是 key-value 中的 key ,也就是根据 key 值来进行的排序,看一个例子吧。
sc.parallelize(List("A" , "C" , "B" , "E" , "F" )).map((_,0))// 这里必须设置分区数量为1,否则,打印出来的元素就不排序了。.sortByKey(true , 1)sc.stop()
其实,sortBy 就是基于 sortByKey ,来看看源码就知道了。
def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {this.keyBy[K](f).sortByKey(ascending, numPartitions).values}
keyBy(func) 可以理解为 RDD.map(x => (func(x) , x)) 其实就是将 RDD 转化为一个 PairRDD , 这样就能用 sortByKey 了,最后把 PairRDD 转化为原来的 RDD 。
连接类的算子
首先要讲的就是 cogroup 算子,它也是一个基础的算子,像 join、lelfOuterJoin、rightOuterJoin、intersection、fullOuterjoin 都是依赖 cogroup 实现的。
看一下 congroup 实现订单表和商品维表关联取出商品价格的情况。
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val order:RDD[String] = sc.parallelize(List("order1,product1,1", "order1,product2,2", "order1,product3,4"))val product:RDD[String] = sc.parallelize(List("product1,10", "product2,30", "product3,87"))val productTuple:RDD[(String,String)] = product.map(x => {val strings = x.split(",")(strings(0), strings(1))})val orderTuple:RDD[(String,String)] = order.map(x => {val strings = x.split(",")(strings(1), x)})orderTuple.cogroup(productTuple).mapValues(x => {x._1.map(xx => {val strings = xx.split(",")var rs = ""var price:Int = 0if(!x._2.isEmpty){price = x._2.head.toIntrs = xx.concat(s",${price.toInt},${price.toInt*strings(2).toInt}")}rs})}).values.foreach(println)sc.stop()
再看一下 join 的实现吧
// join 算子是 PairRDDFunction 的def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {this.cogroup(other, partitioner).flatMapValues( pair =>// 笛卡尔积for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w))}
从上面的代码可以知道,是对 pair 进行笛卡尔积操作,而且前后都不为 Seq() ,也就是不为空。
再来看一下 fullOuterJoin 吧。
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))] = self.withScope {this.cogroup(other, partitioner).flatMapValues {case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))}}
从上面的代码里面,发现 fullOuterJoin 的实现和 join 实现差不多,但是多了对应左右列表为空的处理。这和 SQL 这的 full join 的语义是相同的。这个的 case 是 scala 里面的偏函数,在 scala 的源码中应用非常的广泛。
知道了 join 和 fullOuerJoin ,就再来看看 leftOuterJoin 和 rightOuterJoin :
// leftOuterJoin 也是 PairRDDFuntion 的函数def leftOuterJoin[W](other: RDD[(K, W)],partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {this.cogroup(other, partitioner).flatMapValues { pair =>if (pair._2.isEmpty) {pair._1.iterator.map(v => (v, None))} else {for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))}}}
// rightOuterJoin 也是 PairRDDFunction 的函数def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] = self.withScope {this.cogroup(other, partitioner).flatMapValues { pair =>if (pair._1.isEmpty) {pair._2.iterator.map(w => (None, w))} else {for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)}}}
从上面的代码展示来看看,[left | right]OuterJoin 和 SQL 中的 left join 和 right join 的语义也是相同的。
在这里可以得到一个结论,join、leftOuterJoin、rightOuterJoin、fullOuterJoin 的计算效率其实是相同的,都取决于 cogroup 的效率。
最后看一个 RDD 中的函数 intersection
def intersection(other: RDD[T]): RDD[T] = withScope {this.map(v => (v, null)).cogroup(other.map(v => (v, null))).filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }.keys}
从上面的代码中可以看到,是去掉了左右没有关联到的数据。这和 SQL 里面的 inner join 的语义是一致的。其实是和 join 逻辑相似,但是并没有将左右边的元素进行笛卡尔积的计算。
在实现维表关联的场景下,还有一个重要的算子,就是 broadcast 算子。来看一个例子。
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)val sc = new SparkContext(conf)//设置文件切分大小sc.hadoopConfiguration.setLong("fs.local.block.size",128*1024*1024)//数据合并,有大量的数据移动val productRDD: RDD[(String, String)] = sc.parallelize(List("product1;10")).map { line =>val field = line.split(";")(field(0), line)}//广播变量val productBC: Broadcast[collection.Map[String, String]] = sc.broadcast(productRDD.collectAsMap())//map task 完成数据准备val orderInfoRDD: RDD[(String, String)] = sc.parallelize(List("order1;10;product1")).map { line =>val field = line.split(";")(field(2), line)}//map 端的joinval resultRDD: RDD[(String, (String, String))] = orderInfoRDD.map {case (pid, orderInfo) => {val product: collection.Map[String, String] = productBC.value(pid, (orderInfo, product.getOrElse(pid, null)))}}resultRDD.foreach(println)Thread.sleep(100)sc.stop()
广播是非常好的优化方式,他会将维表的一个副本复制到各个分区里面,然后就可以和进行拉宽做了。
控制类的算子
控制类算子是一类和优化相关的算子。
例如,当我们重复利用一些计算结果的时候,可以将中间的计算结果保存到缓存中。例如,我计算某个商品在某些城市的销售额,我们希望计算每天、每周、每月的销售额,我们希望计算这些,怎么计算呢?我们可以计算先计算出每天的销售额,然后在这个基础上计算出每周和每月的销售额。
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val dimDataRDD:RDD[String] = sc.parallelize(List("2024-02-29,1,02","2024-03-01,2,03","2024-03-01,2,03","2024-03-02,2,03","2024-03-03,2,03","2024-03-04,2,03"))val dimDateBroadcast = sc.broadcast(dimDataRDD.map(x => {val strings = x.split(",")(strings(0), x)}).collectAsMap())val orderRDD:RDD[String] = sc.parallelize(List("2024-03-01,order1,product1,city1,10", "2024-03-01,order1,product1,city3,11", "2024-03-01,order1,product1,city4,11", "2024-03-01,order1,product1,city5,34", "2024-03-01,order1,product1,city3,13", "2024-03-01,order1,product1,city3,33", "2024-03-02,order1,product1,city3,19", "2024-03-02,order1,product1,city4,13", "2024-03-02,order1,product1,city5,34", "2024-03-02,order1,product1,city3,19", "2024-03-02,order1,product1,city1,38", "2024-03-03,order1,product1,city5,34", "2024-03-03,order1,product1,city3,19", "2024-03-03,order1,product1,city1,38", "2024-03-04,order1,product1,city5,34", "2024-03-04,order1,product1,city3,19", "2024-03-04,order1,product1,city1,38", "2024-02-29,order1,product1,city1,38", "2024-02-28,order1,product1,city1,12"))val byDay = orderRDD.map(x => {val strings = x.split(",")((strings(0), strings(3)), strings(4).toInt)}).reduceByKey((s: Int, x: Int) => {s + x})println("====by day city sum(amt)===")byDay.foreach(println)val byDayWithMonthAndWeek = byDay.map(x => {val dimss = dimDateBroadcast.valueval rs = dimss.getOrElse(x._1._1, Nil) match {case Nil => ("", "")case str: String => {val strings = str.split(",")(strings(1), strings(2))}}//日期+城市 周 月(x._1, rs._1, rs._2, x._2)})byDayWithMonthAndWeek.cache()println("====by city , week sum(amt)===")byDayWithMonthAndWeek.keyBy(x => (x._1._2 ,x._2)).combineByKey((a:((String,String) , String , String , Int)) => a._4,(s:Int , x:((String,String) , String , String , Int))=>{s + x._4},(x:Int , y:Int) => {x + y}).foreach(println)println("====by city , month sum(amt)===")byDayWithMonthAndWeek.keyBy(x => (x._1._2 ,x._3)).combineByKey((a:((String,String) , String , String , Int)) => a._4,(s:Int , x:((String,String) , String , String , Int))=>{s + x._4},(x:Int , y:Int) => {x + y}).foreach(println)
上面的代码完成了最初的逻辑。我们来看看里面的执行过程。
从上面的图中可以看到,有四个 job 执行,从函数来看,
0 号代表广播流
1 号代表了输出的是天、城市粒度下销售额
2 号代表输出的是城市、周粒度下的销售额汇总
3 号代表了输出的是城市、月粒度下的销售额汇总
咱们一个一个点进去看看
从 2 和 3 中可以看到前面 Stage6 和 stage3 都是 skip 的,在 Stage7 和 Stage8 中的 map 是有一个绿色的标识的,此标识就是代表了使用缓存。如果讲 cache 去掉的话。在来看看效果。
从上面的图中,可以看到 Stage7 和 Stage4 中的 map 没有绿色的标识了,说明缓存已经没了。
下面一个是 persist(), 我们先一个 cache() 的源码:
def cache(): this.type = persist()def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
从上面的代码,可以知道,cache 是 persist 实现的,而且 persist 里面可以设置不同的保存级别:
val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
它起的名字还是见名知意的。所以它可以保存缓存到内存、磁盘、堆外,并且可以序列化。
其实这些都不够安全,最安全的办法就是缓存保存到 HDFS 中。这样就最保险了。也是就有 checkpoint()
spark 的 checkpoint 值是将中间结果缓存,达到中间数据重复使用的效果,和 Flink 对比,Flink 的 checkpint 本质是一种分布式事务,可以协调各个算子完成同一批数据处理,通过checkpoin 就能实现 exactly-once 的语义,但是 spark 就不能这样。
下一个重要的控制算子就是 Accumulator ,它是一个全局性的累计器。可以保存全局性的累计值。下面是是用 accumulator 实现的 workcount。
object TestAccumulate {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test_accumulate").setMaster("local[*]")val sc = new SparkContext(conf)val value:RDD[String] = sc.makeRDD(List("Hi,", "Hello", "ww", "hhh"))val sum:LongAccumulator = sc.longAccumulator("sum")val accu:MyAccumulator = new MyAccumulator()sc.register(accu , name = "accu")value.foreach(x => {accu.add(x)sum.add(1)})val value1: mutable.Map[String, Int] = accu.valueprintln(s"size ${value1.size}")println(s"sum ${sum.value}")for( (key ,value) <- value1){println(s"key is ${key} , and value is ${value}")}sc.stop()}}
class MyAccumulator extends AccumulatorV2[String , mutable.Map[String , Int]]{// 定义一个集合,来记录以 H 开头的字符串的个数var map:mutable.Map[String , Int] = mutable.Map[String , Int]()override def isZero: Boolean = trueoverride def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {val rs:MyAccumulator = new MyAccumulator()rs.map = this.maprs}override def reset(): Unit = {this.map.clear()}override def add(v: String): Unit = {if(v.startsWith("H")){this.map.put(v , map.getOrElse(v , 0) + 1)}}override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {val map1:mutable.Map[String , Int] = mapval map2:mutable.Map[String , Int] = other.valuemap = map1.foldLeft(map2) {(map2, kv) => {map2(kv._1) = map2.getOrElse(kv._1, 0) + kv._2map2}}}override def value: mutable.Map[String, Int] = this.map
}
Accumulator 的计算逻辑和 combinerByKey 的逻辑十分的相似。Spark 还为我们预定义了三个累加器,longAccumulator、doubleAcculator、collectionAccumulator 三个,其实现方式也是继承了AccumulatorV2 类,然后,在 SparkContext 中注册就可以使用了。
coalesce 是将 RDD 中的分区重新划分分区,这个的作用可以处理数据倾斜的问题,其实数据倾斜的根源就是在于分区中有多有的少,我们可以使用 key 值的组合,然后重新分区达到各个分区数据量差不多的情况。
action 算子
action 算子是真正出发计算的算子,在 action 算子之前, 所有的算子就像流水线上的一个工序,按照我们想要的结果设置好了加工模具,action 才能够决一个 job 的开始。一个 Spark 任务中,可以有多个 job ,一个 job 里面可以多个 Stage 。
其中,kafka 是后来使用 KafkaUtil 加入的 Kafka 的消费算子,其他都是 RDD 中自带的算子,这些算子中的共同特点是源码中都使用了 SparkContext#runJob()
下面来看看 foreach 函数:
def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}
下面来说是 KafkaUtils 这是 Spark 扩展,它的功能是让 Spark 可以消费和生产 Kafka 里面的数据,这样 Spark 就能处理流式计算了。
数据源算子
数据源算子是设置数据源的算子。
textfile 是从文件系统中取出数据,可以是 disk 中,或者从 HDFS 中拉出来。
parallism 是可以从 List 消费数据,这个算子经常用来测试功能。
相关文章:

Spark 核心API
核心 API spark core API 指的是 spark 预定义好的算子。无论是 spark streaming 或者 Spark SQL 都是基于这些最基础的 API 构建起来的。理解这些核心 API 也是写出高效 Spark 代码的基础。 Transformation 转化类的算子是最多的,学会使用这些算子就应付多数的数…...

OpenLayers线性渐变和中心渐变(径向渐变)
目录 1.前言2.添加一个面要素3.线性渐变3.1 第一个注意点3.2 第二个注意点 4.中心渐变(径向渐变)5.总结 1.前言 OpenLayers官网有整个图层的渐变示例,但是没有单个要素的渐变示例,我们这里来补充一下。OpenLayers中的渐变是通过fi…...

[210. 课程表 II] 拓扑排序模板(DFS+BFS)
Problem: 210. 课程表 II 文章目录 思路解题方法Code 思路 本题是经典拓扑排序模板,通过DFS和BFS两种方式进行实现。 解题方法 DFS DFS方法的重点在于如何标记节点状态,初做题者如果只用未访问和已访问两种状态很容易陷入死结。正确的做法是使用三种状…...

我的第一个python web 网站
# -*- coding: utf-8 -*-import http.server import socketserver from datetime import datetimePORT 8000import sys# ...class MyHandler(http.server.SimpleHTTPRequestHandler):def do_GET(self):if self.path /:# 如果路径是根路径,返回页面内容self.send_r…...

产品展示型wordpress外贸网站模板
孕婴产品wordpress外贸网站模板 吸奶器、待产包、孕妇枕头、护理垫、纸尿裤、孕妇装、孕婴产品wordpress外贸网站模板。 https://www.jianzhanpress.com/?p4112 床品毛巾wordpress独立站模板 床单、被套、毛巾、抱枕、靠垫、围巾、布艺、枕头、乳胶枕、四件套、浴巾wordpre…...

四信全球化拓展再启新篇!LoRa传感器与云平台领航智能感知时代
随着科技浪潮的不断推进,物联网已逐渐融入我们的生活。刚刚结束的MWC24盛会上,四信带来了一系列前沿技术成果,不仅将5G技术成功扩展至当前市场主流类型的终端,更携手联通、ASR等业界巨头,在连接、5G RedCap、AI、LoRa以…...

阿里云k8s环境下,因slb限额导致的发布事故
一、背景 阿里云k8s容器,在发布java应用程序的时候,客户端访问出现500错误。 后端服务是健康且可用的,网关层大量500错误请求,slb没有流入和流出流量。 经过回滚,仍未能解决错误。可谓是一次血的教训,特…...

【STM32+OPENMV】矩形识别
一、准备工作 有关OPENMV最大色块追踪及与STM32通信内容,详情见【STM32HAL】与OpenMV通信 二、所用工具 1、芯片:STM32F103C8T6 2、CUBEMX配置软件 3、KEIL5 4、OPENMV 三、实现功能 寻找黑色矩形,并将最大矩形的四个边缘坐标发送给STM…...

在吗?腾讯云服务器优惠价格表曝光_2023年3月报价请过目!
腾讯云服务器多少钱一年?61元一年起,2核2G3M配置,腾讯云2核4G5M轻量应用服务器165元一年、756元3年,4核16G12M服务器32元1个月、312元一年,8核32G22M服务器115元1个月、345元3个月,腾讯云服务器网txyfwq.co…...

Revit-二开之创建Plane-(7)
2016版本的Plane 2017版本的Plane 2018版本及以上版本的Plane 由此可见2017版本是一个分水岭 #if REVIT2016Plane plane = new Plane(uiDoc.Document.ActiveView...

【操作系统学习笔记】文件管理1.2
【操作系统学习笔记】文件管理1.2 参考书籍: 王道考研 视频地址: Bilibili 文件的逻辑结构 无结构文件 文件内部的数据就是一系列的二进制流或字符流组成,又称流式文件,例如 .text 文件 有结构文件 由一组相似的记录组成,又称记录式文件…...

算法归纳【数组篇】
目录 二分查找1. 前提条件:2. 二分查找边界 2.移除元素有序数组的平方长度最小的子数组59.螺旋矩阵II54. 螺旋矩阵 二分查找 参考链接 https://programmercarl.com/0704.%E4%BA%8C%E5%88%86%E6%9F%A5%E6%89%BE.html#%E6%80%9D%E8%B7%AF 1. 前提条件: 数…...

【随笔】程序员如何选择职业赛道,目前各个赛道的现状如何,那个赛道前景巨大
大家好,我是全栈小5,欢迎阅读文章! 此篇是【话题达人】系列文章,这一次的话题是《程序员如何选择职业赛道》 目录 背景热度柱状图赛道热度C/C云原生人工智能前沿技术软件工程后端JavaJavascriptPHPPython区块链大数据移动开发嵌入…...

进程之舞:操作系统中的启动、状态转换与唤醒艺术
✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua,在这里我会分享我的知识和经验。&#x…...

Java面试(4)之 Spring Bean生命周期过程
一, 整个加载的完整链路图 更详细的生命周期函数链路图(仅供参考) 二, Bean实例化的四种方式: 1, 无参构造器(默认且常用)6 2, 静态工厂方法方式(factory-method指定实例化的静态方法) 3, 实例工厂方法方式(factory-bean指定bean的name,factory-method指定实例化方法) 4, 实…...

JavaSE——面向对象高级一(1/4)-static修饰成员变量、应用场景,static修饰成员方法、应用场景
目录 static修饰成员变量 类变量的应用场景 static修饰成员方法 static修饰成员方法的应用场景 static 叫静态,可可以修饰成员变量、成员方法。 成员变量按照有无static修饰,分为两种: 类变量实例变量(对象的变量ÿ…...

轻量脚本语言Lua的配置与c++调用
文章目录 lua配置下载运行lua命令lua脚本的执行C++调用lua环境配置错误和警告测试c++程序lua脚本结果Lua是一种功能强大且快速的编程语言,易于学习和使用,并且可以嵌入到应用程序中。 Lua被设计成一种轻量级的可嵌入脚本语言。它被用于各种各样的应用程序,从游戏到web应用程…...

力扣每日一道系列 --- LeetCode 160. 相交链表
📷 江池俊: 个人主页 🔥个人专栏: ✅数据结构探索 ✅LeetCode每日一道 🌅 有航道的人,再渺小也不会迷途。 LeetCode 160. 相交链表 思路: 首先计算两个链表的长度,然后判断两个链…...

设计模式-建造者模式实践案例
建造者模式(Builder Pattern)是一种创建型设计模式,它提供了一种创建对象的最佳方式。当一个对象需要多个部分或许多步骤来创建,并且需要将创建过程与表示分离时,建造者模式非常有用。建造者模式旨在找到一个解决方案&…...

freeRTOS_20240308
1.使用ADC采样光敏电阻数值,如何根据这个数值调节LED灯亮度。 HAL_TIM_PWM_Start(&htim3,TIM_CHANNEL_3); while (1){/* USER CODE END WHILE *//* USER CODE BEGIN 3 */HAL_ADC_Start(&hadc);adc_val HAL_ADC_GetValue(&hadc);printf("adc_va…...

利用chatgpt写论文使用教程
ChatGPT是人工智能技术的一种,可帮助人们综合运用和分析各种语言技巧,从而优化实验结果、加速研究流程以及提高文章质量。以下是利用ChatGPT写论文的使用教程: 综上所述,利用ChatGPT写论文涉及到一些技巧和方法,需要合…...

SMiC矩阵将于3月6日正式上线,开启数字化经济新纪元
在数字化浪潮的推动下,全球瞩目的SMiC矩阵将于2024年3月6日正式上线。这一里程碑式的事件标志着数字化经济迈入了一个全新的时代,为思洣客、合作伙伴和整个经济生态带来了前所未有的机遇和挑战。 SMiC矩阵作为引领数字化经济的新力量,始终致…...

备战蓝桥杯---动态规划的一些思想2
话不多说,直接看题: 1.换根DP: 我们肯定不能对每一个根节点暴力求,我们不妨先求f[1],我们发现当他的儿子作为根节点时深度和为f[1](n-cnt[i])-cnt[i](cnt[i]表示以i为根的节点数),这样子两遍DFS…...

卫星导航 | 坐标系---地理坐标系与UTM坐标系
卫星导航 | 坐标系---地理坐标系与UTM坐标系 世界坐标系地理坐标系UTM坐标系 全球卫星导航系统(Global Navigation Satelite System,GNSS),简称卫星导航,是室外机器人定位的一个主要信息来源。 卫星导航能给机器人提供什么信息? 正常工作时&…...

JavaEE之volatile关键字
一.内存可见性问题 什么是内存可见性问题 计算机运行的程序/代码,往往需要访问数据。这些数据往往存在于内存中。 cup使用此变量时,就会把内存中的数据先读出来,加载到cpu寄存器中,再去参与运算。 但是,关键是cpu读…...

代码学习记录10
随想录日记part10 t i m e : time: time: 2024.03.03 主要内容:今天的主要内容是深入了解数据结构中栈和队列,并通过三个 l e e t c o d e leetcode leetcode 题目深化认识。 20. 有效的括号1047. 删除字符串中的所有…...

java——2024-03-03
String类的对象能被修改吗?如果不能需要用什么修改?StringBuilder和StringBuffer的区别?equals和区别谈谈对面向对象的理解重载和重写的区别说一下ArrayList,LinkedList底层实现以及区别什么是哈希冲突?hashMap和conCu…...

Ubuntu安装conda以后,给jupyter安装C++内核
前言 大家都知道,jupyter notebook 可以支持python环境,可以在不断点调试的情况下,打印出当前结果,如果代码错了也不影响前面的内容。于是我就想有没有C环境的,结果还真有。 参考文章: 【分享】Ubuntu安装…...

【谈判】核心思想(抓大放小)
谈判交换(抓大放小) 一、明确目的 事:must: 非要不可,才会签字 want: 有很好, give: 放掉 三者,会变化 二、明确对象 人:我跟谁谈? 时: 国际形势、国家的政策、他的心…...

洛谷P5908 猫猫和企鹅 做题反思(2024.3.7)
猫猫和企鹅 题目传送门 题目描述 王国里有 n n n 个居住区,它们之间有 n − 1 n-1 n−1 条道路相连,并且保证从每个居住区出发都可以到达任何一个居住区,并且每条道路的长度都为 1 1 1。 除 1 1 1 号居住区外,每个居住区住…...