当前位置: 首页 > news >正文

Spark-RDD持久化

一、Spark的三种持久化机制

1、cache

它是persist的一种简化方式,作用是将RDD缓存到内存中,以便后续快速访问,提高计算效率。cache操作是懒执行的,即执行action算子时才会触发。

2、persist

它提供了不同的存储级别(仅磁盘、仅内存、内存或磁盘、内存或磁盘+副本数、序列化后存入内存或磁盘、堆外)可以根据不同的应用场景进行选择。

3、checkpoint

它将数据永久保存,用于减少长血缘关系带来的容错成本。checkpoint不仅保存了数据,还保存了计算该数据的算子操作。当需要恢复数据时,可以通过这些操作重新计算,而不仅仅是依赖于原始数据。且在作业完成后仍然保留,可以用于后续的计算任务。

二、用法示例

1、cache

//制作数据
val data: RDD[Int] = sc.parallelize( 1 to 10000)
//简单加工
val tempRdd: RDD[(String, Int)] = data.map(num=>if(num%2==0)("even",num)else("odd",num))
//缓存
tempRdd.cache()
//调用action算子运行
tempRdd.foreach(println)

 我们看下tempRdd的存储情况:

2、persist

//制作数据
val data: RDD[Int] = sc.parallelize( 1 to 10000)
//简单加工
val tempRdd: RDD[(String, Int)] = data.map(num=>if(num%2==0)("even",num)else("odd",num))
//持久化
tempRdd.persist(StorageLevel.MEMORY_AND_DISK)
//调用action算子运行
tempRdd.foreach(println)

 

3、checkpoint

//使用checkpoint之前需要用sc先设置检查点目录
sc.setCheckpointDir("./local-spark/checkpoint-data")
//制作数据
val data:RDD[Int] = sc.parallelize( 1 to 10000)
//简单加工
val tempRdd:RDD[(String, Int)] = data.map(num=>if(num%2==0)("even",num)else("odd",num))
//持久化
tempRdd.persist(StorageLevel.MEMORY_AND_DISK)
//创建checkpoint 会触发job
tempRdd.checkpoint()
//调用action算子运行
tempRdd.foreach(println)

从历史服务界面可以观察到,该程序启动了两个job(在源码分析中我们就会知道原因)

 

我们再看下两个job的DAG

发现重复的计算跑了两次,因此我们在使用checkpoint前一般都会添加一个persist来进行加速

下面是添加完persist后再进行checkpoint的DAG,虽然也是两个Job,但是tempRdd上的那个点变了颜色,这意味着tempRdd之前的步骤就不用重复计算了

 

三、源码分析

1、cache

//使用默认存储级别(`MEMORY_ONLY`)持久化此RDD
def cache(): this.type = persist()
//其实背后就是使用的persist
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

2、persist

RDD

abstract class RDD[T: ClassTag](@transient private var _sc: SparkContext,@transient private var deps: Seq[Dependency[_]]) extends Serializable with Logging {//设置此RDD的存储级别,以便在第一次计算后跨操作持久化其值。/只有当RDD尚未设置存储级别时,这才能用于分配新的存储级别。本地检查点是一个例外。def persist(newLevel: StorageLevel): this.type = {if (isLocallyCheckpointed) {//这意味着用户之前调用了localCheckpoint(),它应该已经将此RDD标记为持久化。//在这里,我们应该用用户明确请求的存储级别(在将其调整为使用磁盘后)覆盖旧的存储级别。persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)} else {persist(newLevel, allowOverride = false)}}//标记此RDD以使用指定级别进行持久化//newLevel 目标存储级别//allowOverride 是否用新级别覆盖任何现有级别private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {// 如果想要重新调整一个RDD的存储级别,就必须将allowOverride 置为 trueif (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")}// 如果这是第一次将此RDD标记为持久化,请在SparkContext中注册它以进行清理和核算。只做一次。if (storageLevel == StorageLevel.NONE) {sc.cleaner.foreach(_.registerRDDForCleanup(this))//注册此RDD以持久化在内存和/或磁盘存储中sc.persistRDD(this)}//设置该RDD的storageLevel 以便在Task计算时直接获取数据,来加速计算storageLevel = newLevelthis}//迭代器嵌套计算,如果该RDD是持久化的,就直接获取数据封装成iterator给后续RDD使用final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {getOrCompute(split, context)} else {computeOrReadCheckpoint(split, context)}}//获取或计算RDD分区private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {val blockId = RDDBlockId(id, partition.index)var readCachedBlock = true// 此方法在executors上调用,因此需要调用SparkEnv.get而不是sc.env 获取blockManager//接下来我们看下BlockManager的getOrElseUpdate方法//最后一个参数是一个匿名函数,如果缓存中没有块,需要调用它来获取块SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {readCachedBlock = falsecomputeOrReadCheckpoint(partition, context)}) match {// Block hit.case Left(blockResult) =>if (readCachedBlock) {val existingMetrics = context.taskMetrics().inputMetricsexistingMetrics.incBytesRead(blockResult.bytes)new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {override def next(): T = {existingMetrics.incRecordsRead(1)delegate.next()}}} else {new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])}// Need to compute the block.case Right(iter) =>new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])}}//当缓存中没有块时调用它来制作块private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] ={if (isCheckpointedAndMaterialized) {//如果checkpointed和materialized 那么直接返回firstParent[T].iterator(split, context)} else {//继续计算,通过迭代器嵌套计算,知道读取到有持久化的块或者进行Shuffle或者最初的数据源compute(split, context)}}}

SparkContext

class SparkContext(config: SparkConf) extends Logging {//跟踪所有持久的RDDprivate[spark] val persistentRdds = {val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()map.asScala}private[spark] def persistRDD(rdd: RDD[_]) {persistentRdds(rdd.id) = rdd}}

BlockManager

private[spark] class BlockManager(val executorId: String,rpcEnv: RpcEnv,val master: BlockManagerMaster,val serializerManager: SerializerManager,val conf: SparkConf,memoryManager: MemoryManager,mapOutputTracker: MapOutputTracker,shuffleManager: ShuffleManager,val blockTransferService: BlockTransferService,securityManager: SecurityManager,externalBlockStoreClient: Option[ExternalBlockStoreClient])extends BlockDataManager with BlockEvictionHandler with Logging {//如果给定的块存在,则检索它,//否则调用提供的`makeIterator `方法来计算该块,持久化它,并返回其值。def getOrElseUpdate[T](blockId: BlockId,level: StorageLevel,classTag: ClassTag[T],makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {// 尝试从本地或远程存储读取块。如果它存在,那么我们就不需要通过local-get-or-put路径。get[T](blockId)(classTag) match {case Some(block) =>return Left(block)case _ =>// 没有获取到块,需要计算,如果该RDD设置了持久化就对其持久化}// 最初,我们在这个块上没有锁.doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {case None =>// doPut() 没有将工作交还给我们,因此该块已经存在或已成功存储。//因此,我们现在在块上持有读取锁。val blockResult = getLocalValues(blockId).getOrElse {// 由于我们在doPut()和get()调用之间保持了读取锁,因此该块不应该被驱逐,因此get()不返回该块表示存在一些内部错误releaseLock(blockId)throw new SparkException(s"get() failed for block $blockId even though we held a lock")}// 我们已经通过doPut()调用在块上持有读取锁,getLocalValue()再次获取锁,因此我们需要在这里调用releaseLock(),这样锁获取的净次数为1(因为调用者只会调用release())一次)。releaseLock(blockId)Left(blockResult)case Some(iter) =>// put失败,可能是因为数据太大,无法放入内存,无法放入磁盘。因此,我们需要将输入迭代器传递回调用者,以便他们可以决定如何处理这些值(例如,在不缓存的情况下处理它们)。Right(iter)}}//根据给定级别将给定块放入其中一个块存储中,必要时复制值//如果该块已存在,则此方法不会覆盖它。private def doPutIterator[T](blockId: BlockId,iterator: () => Iterator[T],level: StorageLevel,classTag: ClassTag[T],tellMaster: Boolean = true,keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>val startTimeNs = System.nanoTime()var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None// 块的大小(字节)var size = 0L//如果RDD持久化选择有内存if (level.useMemory) {// 先把它放在内存中,即使它也将useDisk设置为true;如果内存存储无法容纳它,我们稍后会将其放入磁盘。//如果RDD持久化选择需要反序列化 if (level.deserialized) {//尝试将给定块作为值放入内存存储中memoryStore.putIteratorAsValues(blockId, iterator(), level.memoryMode, classTag) match {case Right(s) =>size = scase Left(iter) =>// 没有足够的空间展开此块;如果持久化也选择了磁盘,请下载到磁盘if (level.useDisk) {logWarning(s"Persisting block $blockId to disk instead.")diskStore.put(blockId) { channel =>val out = Channels.newOutputStream(channel)serializerManager.dataSerializeStream(blockId, out, iter)(classTag)}size = diskStore.getSize(blockId)} else {iteratorFromFailedMemoryStorePut = Some(iter)}}} else { // RDD持久化没有选择反序列化//尝试将给定块作为字节放入内存存储中memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {case Right(s) =>size = scase Left(partiallySerializedValues) =>// 没有足够的空间展开此块;如果持久化也选择了磁盘,请下载到磁盘if (level.useDisk) {logWarning(s"Persisting block $blockId to disk instead.")diskStore.put(blockId) { channel =>val out = Channels.newOutputStream(channel)partiallySerializedValues.finishWritingToStream(out)}size = diskStore.getSize(blockId)} else {iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)}}}//RDD持久化时也选择了磁盘} else if (level.useDisk) {diskStore.put(blockId) { channel =>val out = Channels.newOutputStream(channel)serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)}size = diskStore.getSize(blockId)}val putBlockStatus = getCurrentBlockStatus(blockId, info)val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValidif (blockWasSuccessfullyStored) {// 现在该块位于内存或磁盘存储中,请将其告知主机info.size = sizeif (tellMaster && info.tellMaster) {reportBlockStatus(blockId, putBlockStatus)}addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)logDebug(s"Put block $blockId locally took ${Utils.getUsedTimeNs(startTimeNs)}")//如果RDD持久化选择的副本数大于1if (level.replication > 1) {val remoteStartTimeNs = System.nanoTime()val bytesToReplicate = doGetLocalBytes(blockId, info)val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {scala.reflect.classTag[Any]} else {classTag}try {replicate(blockId, bytesToReplicate, level, remoteClassTag)} finally {bytesToReplicate.dispose()}logDebug(s"Put block $blockId remotely took ${Utils.getUsedTimeNs(remoteStartTimeNs)}")}}assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)iteratorFromFailedMemoryStorePut}}}

3、checkpoint

RDD

//将此RDD标记为检查点。它将被保存到使用`SparkContext#setCheckpointDir`设置的检查点目录中的一个文件中,并且对其父RDD的所有引用都将被删除。必须在此RDD上执行任何作业之前调用此函数。强烈建议将此RDD持久化在内存中,否则将其保存在文件上将需要重新计算。
def checkpoint(): Unit = RDDCheckpointData.synchronized {// 注意:由于下游的复杂性,我们在这里使用全局锁来确保子RDD分区指向正确的父分区。今后我们应该重新考虑这个问题。if (context.checkpointDir.isEmpty) {//SparkContext中尚未设置检查点目录 , 因此使用之前需要用sc先设置检查点目录throw new SparkException("Checkpoint directory has not been set in the SparkContext")} else if (checkpointData.isEmpty) {checkpointData = Some(new ReliableRDDCheckpointData(this))}
}

ReliableRDDCheckpointData

private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])extends RDDCheckpointData[T](rdd) with Logging {//........省略..........//将此RDD具体化,并将其内容写入可靠的DFS。在该RDD上调用的第一个action 完成后立即调用。protected override def doCheckpoint(): CheckpointRDD[T] = {//将RDD写入检查点文件,并返回表示RDD的ReliableCheckpointRDDval newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)// 如果引用超出范围,可以选择清理检查点文件if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {rdd.context.cleaner.foreach { cleaner =>cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)}}logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")newRDD}}

ReliableCheckpointRDD

private[spark] object ReliableCheckpointRDD extends Logging {def writeRDDToCheckpointDirectory[T: ClassTag](originalRDD: RDD[T],checkpointDir: String,blockSize: Int = -1): ReliableCheckpointRDD[T] = {val checkpointStartTimeNs = System.nanoTime()val sc = originalRDD.sparkContext// 为检查点创建输出路径val checkpointDirPath = new Path(checkpointDir)val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)if (!fs.mkdirs(checkpointDirPath)) {throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")}// 保存到文件,并将其重新加载为RDDval broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))// 这很昂贵,因为它不必要地再次计算RDD ,因此一般都会在检查点前调用持久化sc.runJob(originalRDD,writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)if (originalRDD.partitioner.nonEmpty) {//将分区器写入给定的RDD检查点目录。这是在尽最大努力的基础上完成的;写入分区器时的任何异常都会被捕获、记录并忽略。writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)}val checkpointDurationMs =TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)logInfo(s"Checkpointing took $checkpointDurationMs ms.")//从以前写入可靠存储的检查点文件中读取的RDDval newRDD = new ReliableCheckpointRDD[T](sc, checkpointDirPath.toString, originalRDD.partitioner)if (newRDD.partitions.length != originalRDD.partitions.length) {throw new SparkException(s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")}newRDD}}

什么时候对RDD进行checkpoint

当该RDD所属的Job执行后再对该RDD进行checkpoint

class SparkContext(config: SparkConf) extends Logging {def runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit): Unit = {//执行任务dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)//递归调用父RDD查看是否要进行checkpointrdd.doCheckpoint()}}abstract class RDD[T: ClassTag](... ) extends Serializable with Logging {//递归函数private[spark] def doCheckpoint(): Unit = {RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {if (!doCheckpointCalled) {doCheckpointCalled = trueif (checkpointData.isDefined) {if (checkpointAllMarkedAncestors) {// 我们可以收集所有需要检查点的RDD,然后并行检查它们。首先检查父母,因为我们的血统在检查自己后会被截断dependencies.foreach(_.rdd.doCheckpoint())}checkpointData.get.checkpoint()} else {dependencies.foreach(_.rdd.doCheckpoint())}}}}
}

总结

1、RDD执行checkpoint方法,对该RDD进行标记

2、RDD所在的Job执行

3、执行完会用这个Job最后的RDD递归向父寻找,找到所有的被标记需要checkpoint的RDD,再次调用runJob启动任务,将这个RDD进行checkpoint

所以我们在对RDD进行checkpoint前一般会对其persist

相关文章:

Spark-RDD持久化

一、Spark的三种持久化机制 1、cache 它是persist的一种简化方式,作用是将RDD缓存到内存中,以便后续快速访问,提高计算效率。cache操作是懒执行的,即执行action算子时才会触发。 2、persist 它提供了不同的存储级别&#xff0…...

vue2中使用tailwindCss 详细教程

1、先看官方文档:https://www.tailwindcss.cn/ 2、先安装:npm install -D tailwindcss ---------------通过 npm 安装 tailwindcss,然后创建你自己的 create your tailwind.config.js 配置文件。 npm install -D tailwindcss 3、初始化文件—npx tailwindcss init npx ta…...

机器视觉工程师一直做调试,维护岗位,想转岗软件方面C#从零开始,快则三年不到,慢则一辈子不会

其实不是每一家做视觉检测,或者是做设备必须要机器视觉工程师开发,其实公司对标准软件更感兴趣,主要非常高的性价比,省时省钱省人。所以这里有个问题,就是公司平台的重要性,首先他对开发是刚需,…...

【初阶数据结构】详解二叉树 - 树和二叉树(三)(递归的魅力时刻)

文章目录 前言1. 二叉树链式结构的意义2. 手搓一棵二叉树3. 二叉树的遍历(重要)3.1 遍历的规则3.2 先序遍历3.3 中序遍历3.4 后序遍历3.5 遍历的代码实现3.5.1 先序遍历代码实现3.5.2 中序遍历代码实现3.5.3 后序遍历代码实现 4. 统计二叉树结点的个数5.…...

【QT】QWidget 重要属性

文章目录 enabledgeometrywindowTitlewindowIconqrc 机制windowOpacitycursorfontQFont toolTip 和 toolTipDurationfocusPolicyQt::FocusPolicy styleSheet enabled 作用:设置控件是否可使用. true 表⽰可用, false 表⽰禁用. 对应的API bool isEnabled(); // 获…...

什么是数据库连接池?为什么需要使用连接池?

什么是数据库连接池?为什么需要使用连接池? 什么是数据库连接池? 数据库连接池是一种创建和管理数据库连接的技术。在传统的应用程序中,每当需要与数据库进行交互时,都会创建一个新的数据库连接。 这种做法虽然简单…...

2024ICPC网络赛第一场C. Permutation Counting 4(线性代数)

题目链接 题目大意:给你n个范围[ l i , r i l_i,r_i li​,ri​],每个位置可以在这个范围中选择一个数,然后形成排列1到n的排列p。问p的所有情况的个数的奇偶性。 一个很妙的行列式转化,纯纯的线性代数。 首先,我们把…...

01.前端面试题之ts:说说如何在Vue项目中应用TypeScript?

文章目录 一、前言二、使用Componentcomputed、data、methodspropswatchemit 三 、总结 一、前言 与link类似 在VUE项目中应用typescript,我们需要引入一个库vue-property-decorator, 其是基于vue-class-component库而来,这个库vue官方推出…...

【HTTP】方法(method)以及 GET 和 POST 的区别

文章目录 方法(method)登录上传GET 和 POST 有什么区别(面试)区别不准确的说法 方法(method) 首行中的第一部分。首行是由方法、URL 和版本号组成 方法描述了这次请求想干什么,最主要的是&…...

Ubuntu NFS 搭建及配置

在 Ubuntu 上搭建和配置 NFS(Network File System)服务器,可以让其他设备通过网络访问共享的文件夹。以下是步骤指南: 1. 安装 NFS 服务器 首先,安装 NFS 服务器软件包: sudo apt update sudo apt insta…...

双十一好物推荐,这些值得入手的宝藏产品

随着双十一的钟声即将敲响,这个万众期待的购物盛宴就要来临!为了让大家避免在众多的商品中不知所措,妮妮精心筹备了一份购物清单,分享那些我亲身感受超棒,觉得十分值得购买的物品。 这些商品不但价格合理,而…...

秋招内推2025--招联金融

【投递方式】 直接扫下方二维码,或点击内推官网https://wecruit.hotjob.cn/SU61025e262f9d247b98e0a2c2/mc/position/campus,使用内推码 igcefb 投递) 【招聘岗位】 后台开发 前端开发 数据开发 数据运营 算法开发 技术运维 软件测试 产品策…...

C++类和对象——第二关

目录 类的默认成员函数: (一)构造函数 (二)析构函数 (三)拷贝构造函数 类的默认成员函数: 类里面有6个特殊的成员函数分别包揽不同的功能; (一)构造函数…...

服务器数据恢复—raid5阵列热备盘上线失败导致阵列崩溃的数据恢复案例

服务器磁盘阵列数据恢复环境: 服务器中有两组分别由4块SAS硬盘组建的raid5磁盘阵列,两组raid5阵列划分LUN,组成LVM结构,格式化为EXT3文件系统。 服务器磁盘阵列故障: 服务器中一组raid5阵列中有一块硬盘离线&#xff…...

Python与SQL Server数据库结合导出Excel并做部分修改

Python与SQL Server数据库结合导出Excel并做部分修改 需求:在数据库中提取需要的字段内容;并根据字段内容来提取与拆分数据做为新的列最后导出到Excel文件 # -*- coding: utf-8 -*- import pandas as pd import re import pymssql import timestart_ti…...

常见的TTL,RS232,RS485,IIC,SPI,UART之间的联系和区别

简单总结 图片来源 RS232,RS485可参考,IIC,SPI,UART可参考 烧录程序中常听到的一句话就是USB转TTL,但严格来说算是USB传输数据的协议转换成TTL(Transistor-Transistor Logic)协议传输数据。首先,usb是常见…...

【数据结构】栈和队列(Stack Queue)

引言 在对顺序表,链表有了充分的理解之后,现在让我们学习栈和队列!!! 【链表】 👈链表 【顺序表】👈顺序表 目录 💯栈 1.栈的概念及结构 2.栈的实现 ⭐初始化栈 ⭐入栈 ⭐…...

Vue.js基础

Vue.js https://v2.cn.vuejs.org/https://cn.vuejs.org/初识Vue 官网:Vue (读音 /vjuː/,类似于 view) 是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是,Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注视图层&#xf…...

罐区紧急切断阀安装位置规范

在化工生产与储存的复杂环境中,罐区紧急切断阀的安装位置规范不仅是保障生产安全的关键一环,更是预防重大事故、减少损失的有效手段。在深入理解了罐区布局、物料特性及潜在风险后,对于紧急切断阀的安装位置,我们应遵循以下更为细…...

JavaScript 中的事件模型

JavaScript 中的事件模型是浏览器如何处理用户交互(如点击、键盘输入、鼠标移动等)或其他事件(如加载完成、定时器等)的机制。理解事件模型有助于我们处理这些事件并响应它们。JavaScript 的事件模型主要包括以下几个部分&#xf…...

测试微信模版消息推送

进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...

tomcat入门

1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...

日常一水C

多态 言简意赅:就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过,当子类和父类的函数名相同时,会隐藏父类的同名函数转而调用子类的同名函数,如果要调用父类的同名函数,那么就需要对父类进行引用&#…...

Chrome 浏览器前端与客户端双向通信实战

Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...

【LeetCode】算法详解#6 ---除自身以外数组的乘积

1.题目介绍 给定一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O…...

【深度学习新浪潮】什么是credit assignment problem?

Credit Assignment Problem(信用分配问题) 是机器学习,尤其是强化学习(RL)中的核心挑战之一,指的是如何将最终的奖励或惩罚准确地分配给导致该结果的各个中间动作或决策。在序列决策任务中,智能体执行一系列动作后获得一个最终奖励,但每个动作对最终结果的贡献程度往往…...

篇章二 论坛系统——系统设计

目录 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 1. 数据库设计 1.1 数据库名: forum db 1.2 表的设计 1.3 编写SQL 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 通过需求分析获得概念类并结合业务实现过程中的技术需要&#x…...

RabbitMQ 各类交换机

为什么要用交换机? 交换机用来路由消息。如果直发队列,这个消息就被处理消失了,那别的队列也需要这个消息怎么办?那就要用到交换机 交换机类型 1,fanout:广播 特点 广播所有消息​​:将消息…...