当前位置: 首页 > news >正文

kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码

Kakfa集群有主题,每一个主题下又有很多分区,为了保证防止丢失数据,在分区下分Leader副本和Follower副本,而kafka的某个分区的Leader和Follower数据如何同步呢?下面就是讲解的这个

首先要知道,Follower的数据是通过Fetch线程异步从Leader拉取的数据,不懂的可以看一下Kafka——副本(Replica)机制

  • 一、Broker接收处理分区的Leader和Follower的API
  • 二、针对分区副本的Leader和Follower的处理逻辑
    • 1、如果是Follower,则准备创建Fetcher线程,好异步执行向Leader拉取数据
    • 2、遍历分区Follower副本,判断是否有向目标broker现成的Fetcher,如果是则复用,否则创建
    • 3、创建Fetcher线程的实现

一、Broker接收处理分区的Leader和Follower的API

kafkaApis.scala

     //处理Leader和follower,ISR的请求case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
 def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))//从请求头中获取关联ID//从请求体中获取LeaderAndIsrRequest对象。val correlationId = request.header.correlationIdval leaderAndIsrRequest = request.body[LeaderAndIsrRequest]//对请求进行集群操作的授权验证。authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)//检查broker的代数是否过时。if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {//省略代码} else {//调用replicaManager.becomeLeaderOrFollower方法处理请求,获取响应val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))requestHelper.sendResponseExemptThrottle(request, response)}}

经过一些检验后,调用becomeLeaderOrFollower获得响应结果,

二、针对分区副本的Leader和Follower的处理逻辑

def becomeLeaderOrFollower(correlationId: Int,leaderAndIsrRequest: LeaderAndIsrRequest,onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {//首先记录了方法开始的时间戳val startMs = time.milliseconds()//副本状态改变锁replicaStateChangeLock synchronized {//从leaderAndIsrRequest中获取一些请求信息,包括controller的ID、分区状态等val controllerId = leaderAndIsrRequest.controllerIdval requestPartitionStates = leaderAndIsrRequest.partitionStates.asScalaval response = {//处理过程中,会检查请求的controller epoch是否过时if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {//省略} else {val responseMap = new mutable.HashMap[TopicPartition, Errors]controllerEpoch = leaderAndIsrRequest.controllerEpochval partitions = new mutable.HashSet[Partition]()//要成功Leader分区的集合val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()//要成为Follower分区的集合val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()//遍历requestPartitionStates,其中包含了来自控制器(controller)的分区状态请求requestPartitionStates.foreach { partitionState =>val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)//对于每个分区状态请求,首先检查分区是否存在,如果不存在,则创建一个新的分区。val partitionOpt = getPartition(topicPartition) match {case HostedPartition.Offline =>stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +s"controller $controllerId with correlation id $correlationId " +s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +"partition is in an offline log directory")responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)Nonecase HostedPartition.Online(partition) =>Some(partition)case HostedPartition.None =>val partition = Partition(topicPartition, time, this)allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))Some(partition)}//接下来,检查分区的主题ID和Leader的epoch(版本号)等信息。partitionOpt.foreach { partition =>val currentLeaderEpoch = partition.getLeaderEpochval requestLeaderEpoch = partitionState.leaderEpochval requestTopicId = topicIdFromRequest(topicPartition.topic)val logTopicId = partition.topicIdif (!hasConsistentTopicId(requestTopicId, logTopicId)) {//如果主题ID不一致,则记录错误并将其添加到响应映射(responseMap)中。stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +s" match the topic ID for partition $topicPartition received: " +s"${requestTopicId.get}.")responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)} //如果Leader的epoch大于当前的epoch,则记录控制器(controller)的epoch,并将分区添加到要成为Leader或Follower的集合中。 else if (requestLeaderEpoch > currentLeaderEpoch) {              //分区副本确定是当前broker的,则添加到partitionsToBeLeader或者partitionsToBeFollower//如果分区副本是leader并且broker是当前broker,则加入partitionsToBeLeader//其他的放入到partitionsToBeFollower//这样保证后续操作partitionsToBeLeader或者partitionsToBeFollower只操作当前broker的if (partitionState.replicas.contains(localBrokerId)) {partitions += partitionif (partitionState.leader == localBrokerId) {partitionsToBeLeader.put(partition, partitionState)} else {partitionsToBeFollower.put(partition, partitionState)}} //省略代码.....}//创建高水位线检查点val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)//如果partitionsToBeLeader非空,则调用makeLeaders方法将指定的分区设置为Leader,并返回这些分区的集合,否则返回空集合val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)//这个是处理Leader的逻辑makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,highWatermarkCheckpoints, topicIdFromRequest)elseSet.empty[Partition]//如果partitionsToBeFollower非空,则调用makeFollowers方法将指定的分区副本设置为Follower,并返回这些分区的集合,否则返回空集合。val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)//这个是处理Follower的逻辑makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,highWatermarkCheckpoints, topicIdFromRequest)elseSet.empty[Partition]//根据partitionsBecomeFollower集合获取Follower分区的主题集合,并更新相关指标。val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSetupdateLeaderAndFollowerMetrics(followerTopicSet)//如果topicIdUpdateFollowerPartitions非空,则调用updateTopicIdForFollowers方法更新Follower分区的主题ID。if (topicIdUpdateFollowerPartitions.nonEmpty)updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)    //启动高水位检查点线程。startHighWatermarkCheckPointThread()//根据参数初始化日志目录获取器maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)//关闭空闲的副本获取器线程//todo FetcherThreadsreplicaFetcherManager.shutdownIdleFetcherThreads()replicaAlterLogDirsManager.shutdownIdleFetcherThreads()//省略代码....		}}//省略代码....		}}

因为这篇文章主要写Follower如何拉取数据,所以只需要关注上面代码中的makeFollowers就可以了

1、如果是Follower,则准备创建Fetcher线程,好异步执行向Leader拉取数据

/*1. 从领导者分区集中删除这些分区。2. 将这些 partition 标记为 follower,之后这些 partition 就不会再接收 produce 的请求了3. 停止对这些 partition 的副本同步,这样这些副本就不会再有(来自副本请求线程)的数据进行追加了4.对这些 partition 的 offset 进行 checkpoint,如果日志需要截断就进行截断操作;5.  清空 purgatory 中的 produce 和 fetch 请求6.如果代理未关闭,向这些 partition 的新 leader 启动副本同步线程* 执行这些步骤的顺序可确保转换中的副本在检查点偏移之前不会再接收任何消息,以便保证检查点之前的所有消息都刷新到磁盘*如果此函数中抛出意外错误,它将被传播到 KafkaAPIS,其中将在每个分区上设置错误消息,因为我们不知道是哪个分区导致了它。否则,返回由于此方法而成为追随者的分区集*/private def makeFollowers(controllerId: Int,controllerEpoch: Int,partitionStates: Map[Partition, LeaderAndIsrPartitionState],correlationId: Int,responseMap: mutable.Map[TopicPartition, Errors],highWatermarkCheckpoints: OffsetCheckpoints,topicIds: String => Option[Uuid]) : Set[Partition] = {val traceLoggingEnabled = stateChangeLogger.isTraceEnabled//省略代码。。。。//创建一个可变的Set[Partition]对象partitionsToMakeFollower,用于统计follower的集合val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()try {partitionStates.forKeyValue { (partition, partitionState) =>//遍历partitionStates中的每个分区,根据分区的leader是否可用来改变分区的状态。val newLeaderBrokerId = partitionState.leadertry {if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {//如果分区的leader可用,将分区设置为follower,并将其添加到partitionsToMakeFollower中。// Only change partition state when the leader is availableif (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {partitionsToMakeFollower += partition}} else {//省略代码。。} catch {//省略代码。。。}}//删除针对partitionsToMakeFollower中 partition 的副本同步线程replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")//对于每个分区,完成延迟的抓取或生产请求。partitionsToMakeFollower.foreach { partition =>completeDelayedFetchOrProduceRequests(partition.topicPartition)}//如果正在关闭服务器,跳过添加抓取器的步骤。if (isShuttingDown.get()) {//省略代码} else {//对于每个分区,获取分区的leader和抓取偏移量,并构建partitionsToMakeFollowerWithLeaderAndOffset映射。val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())val log = partition.localLogOrExceptionval fetchOffset = initialFetchOffset(log)partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset)}.toMap//添加抓取器以获取partitionsToMakeFollowerWithLeaderAndOffset中的分区。replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)}} catch {//省略代码}//省略代码。。。}

2、遍历分区Follower副本,判断是否有向目标broker现成的Fetcher,如果是则复用,否则创建

之后执行replicaFetcherManager.addFetcherForPartitions把信息添加到指定的Fetcher线程中

 // to be defined in subclass to create a specific fetcherdef createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): T
//主要目的是将分区和偏移量添加到相应的Fetcher线程中def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {lock synchronized {//首先对partitionAndOffsets进行分组,按照BrokerAndFetcherId来分组val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))}def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {//创建Fetcher线程                           val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)//把线程放入到fetcherThreadMapfetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)//线程启动fetcherThread.start()fetcherThread}for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)//将启动的FetcherThread线程添加到fetcherThreadMap中val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {// //检查是否已经存在一个与当前broker和fetcher id相匹配的Fetcher线程。case Some(currentFetcherThread) if currentFetcherThread.leader.brokerEndPoint() == brokerAndFetcherId.broker =>// reuse the fetcher thread//如果存在,则重用该线程currentFetcherThreadcase Some(f) =>//如果之前有,fetcher线程,则先关闭在创建一个新的Fetcher线程并启动f.shutdown()addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)case None =>//创建一个新的Fetcher线程,并启动addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)}//将分区添加到相应的Fetcher线程中// failed partitions are removed when added partitions to threadaddPartitionsToFetcherThread(fetcherThread, initialFetchOffsets)}}}

上面可能有疑问,为什么有重用fetcher线程?
答案:是broker并不一定会为每一个主题分区的Follower都启动一个 fetcher 线程,对于一个目的 broker,只会启动 num.replica.fetchers 个线程,具体这个 topic-partition 会分配到哪个 fetcher 线程上,是根据 topic 名和 partition id 进行计算得到,实现所示:

  // Visibility for testingprivate[server] def getFetcherId(topicPartition: TopicPartition): Int = {lock synchronized {Utils.abs(31 * topicPartition.topic.hashCode() + topicPartition.partition) % numFetchersPerBroker}}

继续往下,其中createFetcherThread的实现是下面

3、创建Fetcher线程的实现

class ReplicaFetcherManager(brokerConfig: KafkaConfig,protected val replicaManager: ReplicaManager,metrics: Metrics,time: Time,threadNamePrefix: Option[String] = None,quotaManager: ReplicationQuotaManager,metadataVersionSupplier: () => MetadataVersion,brokerEpochSupplier: () => Long)extends AbstractFetcherManager[ReplicaFetcherThread](name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,clientId = "Replica",numFetchers = brokerConfig.numReplicaFetchers) {override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +s"fetcherId=$fetcherId] ")val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)// 创建了一个ReplicaFetcherThread对象,它的构造函数接受多个参数,用于副本的获取和管理。new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,quotaManager, logContext.logPrefix, metadataVersionSupplier)}def shutdown(): Unit = {info("shutting down")closeAllFetchers()info("shutdown completed")}
}

其中new ReplicaFetcherThread返回一个创建的线程

class ReplicaFetcherThread(name: String,leader: LeaderEndPoint,brokerConfig: KafkaConfig,failedPartitions: FailedPartitions,replicaMgr: ReplicaManager,quota: ReplicaQuota,logPrefix: String,metadataVersionSupplier: () => MetadataVersion)extends AbstractFetcherThread(name = name,clientId = name,leader = leader,failedPartitions,fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr),fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,isInterruptible = false,replicaMgr.brokerTopicStats) {override def doWork(): Unit = {super.doWork()completeDelayedFetchRequests()}}

ReplicaFetcherThread继承的AbstractFetcherThread类,AbstractFetcherThread又继承自ShutdownableThread类,其中ShutdownableThread中的run方法是线程的执行函数

abstract class AbstractFetcherThread(name: String,clientId: String,val leader: LeaderEndPoint,failedPartitions: FailedPartitions,val fetchTierStateMachine: TierStateMachine,fetchBackOffMs: Int = 0,isInterruptible: Boolean = true,val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManagerextends ShutdownableThread(name, isInterruptible) with Logging {override def doWork(): Unit = {maybeTruncate()maybeFetch()}
}
public abstract class ShutdownableThread extends Thread {//省略代码public abstract void doWork();public void run() {isStarted = true;log.info("Starting");try {while (isRunning())doWork();} catch (FatalExitError e) {shutdownInitiated.countDown();shutdownComplete.countDown();log.info("Stopped");Exit.exit(e.statusCode());} catch (Throwable e) {if (isRunning())log.error("Error due to", e);} finally {shutdownComplete.countDown();}log.info("Stopped");}
}    

ShutdownableThread中的run函数调用子类的doWork()
而doWork中的执行顺序如下

//是否截断maybeTruncate()//抓取maybeFetch()//处理延时抓取请求completeDelayedFetchRequests()

其中 maybeFetch(),就是正常拼接fetch请求,并向目标broker发送请求,调用brokercase ApiKeys.FETCH => handleFetchRequest(request)

private def maybeFetch(): Unit = {//分区映射锁val fetchRequestOpt = inLock(partitionMapLock) {val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)handlePartitionsWithErrors(partitionsWithError, "maybeFetch")if (fetchRequestOpt.isEmpty) {trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)}fetchRequestOpt}fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>processFetchRequest(sessionPartitions, fetchRequest)}}

相关文章:

kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码

Kakfa集群有主题&#xff0c;每一个主题下又有很多分区&#xff0c;为了保证防止丢失数据&#xff0c;在分区下分Leader副本和Follower副本&#xff0c;而kafka的某个分区的Leader和Follower数据如何同步呢&#xff1f;下面就是讲解的这个 首先要知道&#xff0c;Follower的数据…...

Golang web 项目中实现自定义 recovery 中间件

为什么需要实现自定义 recovery 中间件&#xff1f; 在 Golang 的 Web 项目中&#xff0c;自定义 recovery 中间件是一种常见的做法&#xff0c;用于捕获并处理应用程序的运行时错误&#xff0c;以避免整个应用程序崩溃并返回对应格式的响应数据。 很多三方 web 框架&#xf…...

Direct3D绘制旋转立方体例程

初始化文件见Direct3D的初始化_direct3dcreate9_寂寂寂寂寂蝶丶的博客-CSDN博客 D3DPractice.cpp #include <windows.h> #include "d3dUtility.h" #include <d3dx9math.h>IDirect3DDevice9* Device NULL; IDirect3DVertexBuffer9* VB NULL; IDirect3…...

ElementUI浅尝辄止31:Tabs 标签页

选项卡组件&#xff1a;分隔内容上有关联但属于不同类别的数据集合。 常见于网站内容信息分类或app内容信息tab分类 1.如何使用&#xff1f; Tabs 组件提供了选项卡功能&#xff0c;默认选中第一个标签页&#xff0c;你也可以通过 value 属性来指定当前选中的标签页。 <temp…...

将 ChatGPT 用于数据科学项目的指南

推荐&#xff1a;使用 NSDT场景编辑器 快速搭建3D应用场景 我们都知道 ChatGPT 的受欢迎程度以及人们如何使用它来提高生产力。但是&#xff0c;如果您是新手&#xff0c;则值得注册ChatGPT免费演示并尝试它所能做的一切。您还应该参加我们的 ChatGPT 简介课程&#xff0c;学习…...

06-JVM对象内存回收机制深度剖析

上一篇&#xff1a;05-JVM内存分配机制深度剖析 堆中几乎放着所有的对象实例&#xff0c;对堆垃圾回收前的第一步就是要判断哪些对象已经死亡&#xff08;即不能再被任何途径使用的对象&#xff09;。 1.引用计数法 给对象中添加一个引用计数器&#xff0c;每当有一个地方引…...

[VSCode] 替换掉/去掉空行

VSCode中使用快捷键CtrlH&#xff0c;出现替换功能&#xff0c;在上面的“查找”框中输入正则表达式&#xff1a; ^\s*(?\r?$)\n然后选择右侧的“使用正则表达式”&#xff1b;“替换”框内为空&#xff0c;点击右侧的“全部替换”&#xff0c;即可去除所有空行。 参考 [VS…...

时序分解 | MATLAB实现ICEEMDAN+SE改进的自适应经验模态分解+样本熵重构分量

时序分解 | MATLAB实现ICEEMDANSE改进的自适应经验模态分解样本熵重构分量 目录 时序分解 | MATLAB实现ICEEMDANSE改进的自适应经验模态分解样本熵重构分量效果一览基本介绍程序设计参考资料 效果一览 基本介绍 ICEEMDANSE改进的自适应经验模态分解样本熵重构分量 包括频谱图 避…...

python内网环境安装第三方包【内网搭建开发环境】

文章目录 一、问题二、解决方法三、代码实现一、问题 内网安装第三方包的应用场景,一般是一些需要在没网的环境下进行开发的情况。这些环境一般仅支持本地局域网访问,所以只能在不下载任何第三方包的情况下艰难开发。 二、解决方法 将当前应用依赖的第三方包提前下载到本地…...

7.13 在SpringBoot中 正确使用Validation实现参数效验

文章目录 前言引入Maven依赖一、POST/PUT RequestBody参数校验1.1 Valid或Validated注解配合constraints注解1.2 测试运行 二、GET/DELETE RequestParam参数校验2.1 Validated注解配合constraints注解2.2 测试运行 三、GET 无注解参数校验3.1 Valid或Validated注解配合constrai…...

Matlab图像处理之Lee滤波器

目录 一、前言:二、LEE滤波器2.1 LEE滤波器原理2.2 LEE滤波器实现步骤三、MATLAB代码示例一、前言: LEE滤波器是一种常用于合成孔径雷达(SAR)图像去噪的滤波器。它能增强图像的局部对比度。今天我们将通过MATLAB来实现这种滤波器。 二、LEE滤波器 2.1 LEE滤波器原理 LEE滤…...

C++系列-const修饰的常函数

const修饰的常函数 常函数常对象 常函数 成员函数后加const&#xff0c;称为常函数。常函数内部不可以修改成员变量。常函数内可以改变加了mutable修饰的成员变量。 code:#include <iostream>using namespace std;class Horse{public:int age 3;mutable string color …...

fail-safe 机制与 fail-fast 机制

Fail-fast 表示快速失败&#xff0c;在集合遍历过程中&#xff0c;一旦发现容器中的数据被修改了&#xff0c;会立刻抛出 ConcurrentModificationException 异常&#xff0c;从而导致遍历失败&#xff0c;像这种情况 定义一个 Map 集合&#xff0c;使用 Iterator 迭代器进行数据…...

LLM 位置编码及外推

RoPE https://zhuanlan.zhihu.com/p/629681325 PI 位置插值&#xff08;POSITION INTERPOLATION&#xff09;显著改善RoPE的外推能力。你只需要对PT&#xff08;pretraining)模型fine-turing最多1000步就能实现。PI是通过将线性的缩小了输入位置的索引使其匹配原始上下文窗口…...

第3章_瑞萨MCU零基础入门系列教程之开发环境搭建与体验

本教程基于韦东山百问网出的 DShanMCU-RA6M5开发板 进行编写&#xff0c;需要的同学可以在这里获取&#xff1a; https://item.taobao.com/item.htm?id728461040949 配套资料获取&#xff1a;https://renesas-docs.100ask.net 瑞萨MCU零基础入门系列教程汇总&#xff1a; ht…...

AI在医疗保健领域:突破界限,救治生命

文章目录 AI在医学影像分析中的应用AI在疾病预测和早期诊断中的作用个性化治疗和药物研发医疗数据管理和隐私保护未来展望 &#x1f389;欢迎来到AIGC人工智能专栏~AI在医疗保健领域&#xff1a;突破界限&#xff0c;救治生命 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;✨博…...

centos7安装kubernets集群

一、准备工作 准备三台虚拟机&#xff0c;centos7系统 二、系统配置 1. 修改主机名 # 三台机器都需要执行 hostnamectl set-hostname k8s-master hostnamectl set-hostname k8s-node1 hostnamectl set-hostname k8s-node22. 修改hosts文件 # 三台机器都需要执行 [rootk8s-…...

【多线程】线程安全与线程同步

线程安全与线程同步 1.什么是线程安全问题&#xff1f; 多个线程同时操作同一个共享资源的时候&#xff0c;可能会出现业务安全问题 取钱的线程安全问题场景&#xff1a; 两个人他们有一个共同的账户&#xff0c;余额是10万元&#xff0c;如果两个人同时来取钱&#xff0c;…...

指针权限,new与delete,类与对象,函数模板,类模板的用法

指针权限 用法 void Print(const char* SecretPointer) {cout << "绝密指令为&#xff1a;";cout << SecretPointer << endl; }void Change(int& number, int* const FixedPointer) {cout << "更换站台数字为&#xff1a;";c…...

Unity——脚本与序列化

在介绍序列化之前&#xff0c;我们先来了解一下为什么要对数据进行序列化 数据序列化有以下几个主要的应用场景和目的&#xff1a; 1. 持久化存储&#xff1a;序列化可以将对象或数据结构转换为字节序列&#xff0c;使得其可以被存储在磁盘上或数据库中。通过序列化&#xff…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

Element Plus 表单(el-form)中关于正整数输入的校验规则

目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入&#xff08;联动&#xff09;2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

Linux nano命令的基本使用

参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时&#xff0c;显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...

深入浅出Diffusion模型:从原理到实践的全方位教程

I. 引言&#xff1a;生成式AI的黎明 – Diffusion模型是什么&#xff1f; 近年来&#xff0c;生成式人工智能&#xff08;Generative AI&#xff09;领域取得了爆炸性的进展&#xff0c;模型能够根据简单的文本提示创作出逼真的图像、连贯的文本&#xff0c;乃至更多令人惊叹的…...