当前位置: 首页 > news >正文

定时/延时任务-Kafka时间轮源码分析

文章目录

  • 1. 概要
  • 2. TimingWheel
    • 2.1 核心参数
    • 2.2 添加任务
    • 2.3 推进时间
  • 3. TimerTaskList
    • 3.1 添加节点
    • 3.2 删除节点
    • 3.3 刷新链表
    • 3.4 队列相关
  • 4. 时间轮链表节点-TimerTaskEntry
  • 5. TimerTask
  • 6. Timer 和 SystemTimer - 设计降级逻辑
  • 7. 上层调用
  • 8. 小结

1. 概要

时间轮的文章:

  • 定时/延时任务-Netty时间轮的使用
  • 定时/延时任务-时间轮
  • 定时/延时任务-实现一个简单时间轮
  • 定时/延时任务-实现一个分层时间轮
  • 定时/延时任务-Netty时间轮源码分析

上一篇文章中介绍了 Netty 时间轮的源码分析,这篇文章就接着来看下 Kafka 的源码分析,由于 Kafka 是使用的 Scala 语言,所以可能会有点难分析,不过还是会尽量说清楚的。

2. TimingWheel

2.1 核心参数

Kafka的时间轮实现比较简单,主要核心参数就在 TimingWheel 里面,那么下面就先看下核心参数:

@nonthreadsafe
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {...
}

首先这里就是 TimingWheel 的类定义,同时构造函数参数列表:

  • tickMs: Long:表示时间轮的每个时间刻度(tick)的毫秒数
  • wheelSize: Int:表示时间轮的大小(即有多少个时间刻度)
  • startMs: Long:表示时间轮的起始时间(以毫秒为单位)
  • taskCounter: AtomicInteger:用来计数的原子整数,表示任务的数量
  • queue: DelayQueue[TimerTaskList]:延迟队列,用于存储时间轮的任务列表

上面几个就是时间轮构造参数,下面就是时间轮的几个核心参数的构造:

private[this] val interval = tickMs * wheelSize
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
private[this] var currentTime = startMs - (startMs % tickMs)
private[this] var overflowWheel: TimingWheel = null
  1. 首先是 interval,这个就是我们之前说的一层时间轮的时间间隔,在分层时间轮下,当前层的时间轮间隔就是当前时间轮的格子数(wheelSize) * 每一格的时间间隔(tickMs )
    在这里插入图片描述
  2. buckets 就是当前时间轮中的时间格子数组,从代码中也能看到,其实这里做的就是创建一个 wheelSize 长度的数组,然后分别初始化
  3. currentTime 就是当前时间,startMs 就是时间轮的启动时间,假设启动时间是 43ms,一个 tick 的时间是 20ms,那么当前时间就是 startMs - (startMs % tickMs),结果就是 43 - 43 % 20 = 40,currentTime 就是控制指针跳动的时间
  4. overflowWheel 就是上层时间轮,上层时间轮的时间间隔 tickMs 就是本层时间轮的 interval,看上面的图就可以看懂。

2.2 添加任务

def add(timerTaskEntry: TimerTaskEntry): Boolean = {// 任务的过期时间val expiration = timerTaskEntry.expirationMs // 任务取消了if (timerTaskEntry.cancelled) {false} else if (expiration < currentTime + tickMs) {// 如果任务过期时间已经小于当前时间格子的时间,就说明要执行了false} else if (expiration < currentTime + interval) {// 如果过期时间 < currentTime + 本层时间轮的时间间隔,就说明// 任务可以添加到本层时间轮中val virtualId = expiration / tickMs// 获取对应下标的时间格子val bucket = buckets((virtualId % wheelSize.toLong).toInt)// 添加任务bucket.add(timerTaskEntry)// 然后设置这个格子的过期时间添加到任务队列里面if (bucket.setExpiration(virtualId * tickMs)) {queue.offer(bucket)}true} else {// 都不满足,那就说明过期时间已经超过本层时间轮的管理范围了,需要// 到上层时间轮去加入任务if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}
}

其实里面的逻辑很简单

  1. 首先判断下如果任务已经取消了,就直接返回 false,添加失败
  2. 判断下如果添加的任务的过期时间已经小于当前时间 + 一格时间时长,那么表示这个任务已经过期了,需要执行
  3. 如果添加的任务小于 当前时间 + 本层时间轮的总时间,那么这个任务还没有执行,并且这个任务可以添加到本层时间轮中
  4. 否则就是说本层时间轮没办法管理这个任务,需要把这个任务添加到上层时间轮中

如果上层时间轮为空,那么同时也会创建出上层时间轮

private[this] def addOverflowWheel(): Unit = {// 加锁synchronized {if (overflowWheel == null) {// 创建上层时间轮overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue)}}}

对当前线程加锁,然后创建上层时间轮,注意上层时间轮的启动时间是当前时间 currentTime,同时上层时间轮的 tickMs 时间间隔是本层时间轮的时间跨度 interval。注意所有时间轮使用一个延时队列。


2.3 推进时间

def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {// 设置当前时间currentTime = timeMs - (timeMs % tickMs)// 同时也推进上层时间轮if (overflowWheel != null) overflowWheel.advanceClock(currentTime)}
}

设置当前时间,其实所谓的推进时间就是推进当前时间,在上面也说过了,当前时间的计算就是:timeMs - (timeMs % tickMs),同时除了推进当前时间轮,还需要推进上层时间轮的时间指针。


3. TimerTaskList

下面就来看下时间轮上面的链表定义,还是一样,我们先看里面的参数定义,因为链表其实参数并不多,所以不需要一个一个拿出来介绍

private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = rootprivate[this] val expiration = new AtomicLong(-1L)def setExpiration(expirationMs: Long): Boolean = {expiration.getAndSet(expirationMs) != expirationMs}

上面几个就是参数了,链表肯定要有头尾节点了,不过 kafka 这里是用了一个 root 节点同时作为头尾节点,只有一个节点的时候就指向自己。
同时因为时间轮存放到任务队列里面是以 TimerTaskList 为单位去存储的,为什么会这样呢?前面的文章里面说过,一个链表上面的任务延时等级是一样的,所以没必要以任务节点为单位去存储,这样如果一个链表上面有 100000 个任务,延时队列里面就得放 100000 个节点,我们直到延时队列时间复杂度是 O(logn),节点一旦比较多,消耗的时间就多了。而且这 10000 个节点的过期时间是一样的,所以用一个 TimerTaskList 来代替就行了。所以延时队列的节点就是一个个的 TimerTaskList。然后再看下下面的方法。

3.1 添加节点

其实添加节点就是双向链表的添加逻辑。

def add(timerTaskEntry: TimerTaskEntry): Unit = {var done = falsewhile (!done) {// 先删除这个任务timerTaskEntry.remove()synchronized {// 加锁timerTaskEntry.synchronized {if (timerTaskEntry.list == null) {// 链表结构 tail -> timerTaskEntry -> root// tail = rootval tail = root.prevtimerTaskEntry.next = roottimerTaskEntry.prev = tailtimerTaskEntry.list = thistail.next = timerTaskEntryroot.prev = timerTaskEntrytaskCounter.incrementAndGet()done = true}}}}
}

在添加任务到链表的时候首先会去删除一下这个任务,确保这个任务没有在先前被添加到时间轮中,然后加锁去添加。添加的时候其实就是形成 tail -> timerTaskEntry -> root 的结构(双向的),因为 tail = root,所以就是一个环形的双向链表。

3.2 删除节点

def remove(timerTaskEntry: TimerTaskEntry): Unit = {synchronized {timerTaskEntry.synchronized {if (timerTaskEntry.list eq this) {timerTaskEntry.next.prev = timerTaskEntry.prevtimerTaskEntry.prev.next = timerTaskEntry.nexttimerTaskEntry.next = nulltimerTaskEntry.prev = nulltimerTaskEntry.list = nulltaskCounter.decrementAndGet()}}}
}

删除节点的逻辑也很简单,就是两步:

  1. timerTaskEntry.next.prev = timerTaskEntry.prev
  2. timerTaskEntry.prev.next = timerTaskEntry.next

最后再把当前 timerTaskEntry 的属性都置空,然后让任务数量 - 1 就可以了。

3.3 刷新链表

所谓刷新链表,就是把这个链表上面的所有任务都删掉,然后执行传入的函数,这个方法是当链表过期的时候,就把上面的所有任务都删掉,然后一个一个任务执行具体逻辑。

// f 类似 Java8 里面的 function,其实这里就是传入一个 f 函数去处理节点
def flush(f: (TimerTaskEntry)=>Unit): Unit = {synchronized {// 从头结点开始遍历var head = root.nextwhile (head ne root) {// 调用上面的删除节点方法把节点从链表中移除掉remove(head)// 调用函数把任务添加到线程池中等待调度f(head)// 继续下一个节点head = root.next}// 链表都没有任务了,当然过期时间就设置成 -1 了expiration.set(-1L)}
}

其实里面的 f 函数的逻辑就是把这个任务节点丢到线程池中等待线程去调度,也就是具体执行任务。

3.4 队列相关

那既然 TimerTaskList 是要加入延时队列的,肯定要有一个获取延时和比较的方法了

def getDelay(unit: TimeUnit): Long = {unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
}def compareTo(d: Delayed): Int = {val other = d.asInstanceOf[TimerTaskList]java.lang.Long.compare(getExpiration, other.getExpiration)
}
  • getDelay 就是获取延时,这里其实就是用任务的 过期时间 - 当前时间,如果小于 0,最后就会返回 0,表示可以马上开始执行了
  • compareTo 就是任务队列里面比较两个任务的延时时间

4. 时间轮链表节点-TimerTaskEntry

private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {...
}

这里就是链表节点了,两个参数分别是任务和延时时间,因为内容确实不多,所以下面直接给出所有的逻辑。

private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {// 所属的链表@volatilevar list: TimerTaskList = null// 链表前后节点var next: TimerTaskEntry = nullvar prev: TimerTaskEntry = null// 设置定时任务if (timerTask != null) timerTask.setTimerTaskEntry(this)// 任务是否取消了,如果任务取消了就会从当前节点中删掉,所以下面就是// 判断下这个任务的所属的链表节点还是不是自己def cancelled: Boolean = {timerTask.getTimerTaskEntry != this}// 把当前节点从链表中删除掉def remove(): Unit = {var currentList = listwhile (currentList != null) {currentList.remove(this)currentList = list}}// 比较两个任务的延时时间override def compare(that: TimerTaskEntry): Int = {java.lang.Long.compare(expirationMs, that.expirationMs)}
}

说实话上面的逻辑确实难看懂,没学过 scala 语言的话,我也只是大概大概翻译下。核心逻辑能看懂就行了。

5. TimerTask

trait TimerTask extends Runnable {...
}

在 Scala 语言中,trait TimerTask extends Runnable 是一个特质(trait)声明,表示该特质继承自 Runnable 接口。

  • trait 是 Scala 中的一个特性,类似于 Java 中的接口(interface)。
  • 特质可以包含抽象方法和具体方法,也可以有字段和实现。
  • 与 Java 接口不同,Scala 特质可以混合(mixin)到类中,实现多重继承的效果。

来看下里面的一些参数:

val delayMs: Long // timestamp in millisecond
private[this] var timerTaskEntry: TimerTaskEntry = null

首先就是任务的延时时间 delayMs,然后就是这个任务属于哪一个链表节点。再来看下面的几个方法:

// 任务取消
def cancel(): Unit = {synchronized {// 就是把任务从链表节点中移除掉if (timerTaskEntry != null) timerTaskEntry.remove()timerTaskEntry = null}
}// 设置任务到链表节点上
private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {synchronized {// 如果任务所属链表节点不等于要设置的节点,说明这个任务有可能原来在另一条// 链表上,现在要加入当前的链表,所以需要首先把任务节点从原来的链表上移除// 因为任务都不在原来的链表上了,节点肯定也带删掉if (timerTaskEntry != null && timerTaskEntry != entry)// 就把当前节点从链表上移除掉timerTaskEntry.remove()// 然后设置所属节点为传入的节点timerTaskEntry = entry}
}// 获取当前任务所属的链表节点
private[timer] def getTimerTaskEntry: TimerTaskEntry = timerTaskEntry

其实里面的方法并不多,下面简单来说下:

  1. 任务取消就是把链表节点从链表中移除掉,同时把当前任务所属的链表节点置空,逻辑不复杂,因为任务都要删除了,链表节点肯定不能继续待在链表中的
  2. 设置任务到新的链表节点,这里面如果发现这个任务原来已经设置过了,现在要设置到一个新的链表上,就需要先把当前节点从链表上移除掉,然后再重新设置新的节点

6. Timer 和 SystemTimer - 设计降级逻辑

Timer 是 Scala 定义的一个接口,包括几种时间轮的方法,下面就来简单看下:

trait Timer {/*** Add a new task to this executor. It will be executed after the task's delay* (beginning from the time of submission)* @param timerTask the task to add*/def add(timerTask: TimerTask): Unit/*** Advance the internal clock, executing any tasks whose expiration has been* reached within the duration of the passed timeout.* @param timeoutMs* @return whether or not any tasks were executed*/def advanceClock(timeoutMs: Long): Boolean/*** Get the number of tasks pending execution* @return the number of tasks*/def size: Int/*** Shutdown the timer service, leaving pending tasks unexecuted*/def shutdown(): Unit
}

可以看到 Timer 接口里面定义的四个方法分别就是:添加、推进时间轮、时间轮任务数、关闭时间轮,那下面就来看下 Timer 的实现类 SystemTimer,SystemTimer 也是时间轮的顶层管理类

@threadsafe
class SystemTimer(executorName: String,tickMs: Long = 1,wheelSize: Int = 20,startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {...
}

可以看到,SystemTimer 接收几个参数

  1. executorName:执行任务的线程名称
  2. tickMs:默认的最底层时间轮的时间间隔
  3. wheelSize:每一层时间轮的大小
  4. startMs:启动时间,就是当前时间

下面来看下几个变量:

// 执行任务的线程池
private[this] val taskExecutor = Executors.newFixedThreadPool(1,(runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))// 延时队列
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
// 任务数量
private[this] val taskCounter = new AtomicInteger(0)
// 时间轮
private[this] val timingWheel = new TimingWheel(tickMs = tickMs,wheelSize = wheelSize,startMs = startMs,taskCounter = taskCounter,delayQueue
)// 读写锁
private[this] val readWriteLock = new ReentrantReadWriteLock()
private[this] val readLock = readWriteLock.readLock()
private[this] val writeLock = readWriteLock.writeLock()

下面看下几个方法,首先就是添加任务的方法

def add(timerTask: TimerTask): Unit = {// 加锁readLock.lock()try {// 添加任务,创建一个链表节点,把任务放到链表节点中// 再调用 addTimerTaskEntry 把链表节点添加到链表上addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))} finally {// 解锁readLock.unlock()}}private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {// 调用时间轮TimingWheel 的add方法添加if (!timingWheel.add(timerTaskEntry)) {// 添加失败的情况下要么就是过期了,要么就是取消了if (!timerTaskEntry.cancelled)// 如果不是取消了,那么就要执行这个过期任务taskExecutor.submit(timerTaskEntry.timerTask)}}

在添加任务任务的时候,如果任务已经过期了或者任务被取消了,那么就会判断,如果不是任务取消,就会把任务丢到线程池里面去执行。上面就是添加的方法,下面再看下推进时间轮的方法。

def advanceClock(timeoutMs: Long): Boolean = {
// 从延时队列里面获取过期链表,超时时间 timeoutMs
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)// 如果不为空if (bucket != null) {writeLock.lock()try {while (bucket != null) {// 推进时间轮timingWheel.advanceClock(bucket.getExpiration)// 然后执行过期链表下面的所有任务bucket.flush(reinsert)// 继续阻塞bucket = delayQueue.poll()}} finally {// 解锁writeLock.unlock()}true} else {false}
}// 把链表节点重新添加回时间轮上
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => 
addTimerTaskEntry(timerTaskEntry)private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {// 重新添加回时间轮,如果添加失败再执行任务if (!timingWheel.add(timerTaskEntry)) {if (!timerTaskEntry.cancelled)taskExecutor.submit(timerTaskEntry.timerTask)}}

首先推进时间轮的时候会从延时队列里面获取过期的链表,第一次获取超时时间是 timeoutMs,这个是上层调用者设置的。这里如果没有获取到任务,就不会往下走推进时间轮,其实这里就是解决了 Netty 时间轮的空轮转问题
Netty 的时间轮是不断执行的,不管有没有任务过期都会去遍历当前 tick 的链表下面的所有任务,同时推进时间轮,看看有没有任务需要执行,所以如果 Netty 时间轮中有一个很长时间都不会执行的任务,在遍历的时候就做了很多 “无用功”
Kafka 则是通过延时队列的方式,没有任务就不会去遍历推进时间轮,有了任务才会去处理。所以这也算是一种精确唤醒执行了。
推进时间轮的方法已经说了,当推进时间轮之后,就回去调用 bucket.flush(reinsert) 方法,前面我们说过 flush 会传入一个 f 函数用来执行过期链表上面的任务, 这个函数就是 reinsert
那么问题来了,不是说执行过期任务吗,为什么是重新把任务添加回时间轮上?

  • 这就不得不说下分层时间轮的降级逻辑了,分层时间轮中上层时间轮的任务只有降级到最底层时间轮才能被执行。
  • 比如现在时间轮的层级是 1 -> 2 -> 3,那么 3 号时间轮上面的任务要降级到 1 才能被执行。
  • 那么如何才能降级呢?我们直到时间轮是不断被推进的,也就是 currentTime 是不断增大的,所以当链表节点重新添加回时间轮的时候,原本应该添加到 3 号时间轮的节点会添加到 2 号,同理 2 号的节点会添加到 1 号,还是不清楚的可以去看下概要里面的时间轮介绍。

最后刷新链表完成之后,继续阻塞在任务队列里面,不过这里阻塞就没有超时时间了,因为可以避免无意义的唤醒,防止空轮转,直到有任务才醒来。如果说时间轮添加了一个更快执行的任务,那么在添加方法里面就会往 delay 队列添加一个更早过期的节点,这里 SystemTimer 也会被更快唤醒。

7. 上层调用

上面就是时间轮的核心实现了,那么你可能会好奇,时间轮在哪被调用了,其实就是在 DelayedOperation.scala 里面执行。我们看下这个方法里面的 advanceClock

def advanceClock(timeoutMs: Long): Unit = {timeoutTimer.advanceClock(timeoutMs)if (estimatedTotalOperations.get - numDelayed > purgeInterval) {estimatedTotalOperations.getAndSet(numDelayed)debug("Begin purging watch lists")val purged = watcherLists.foldLeft(0) {case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum}debug("Purged %d elements from watch lists.".format(purged))}
}

timeoutTimer.advanceClock(timeoutMs) 推进时间轮, 下面的逻辑先就不用细看了。那么这个 advanceClock 方法又是在哪被调用了呢?

private class ExpiredOperationReaper extends ShutdownableThread("ExpirationReaper-%d-%s".format(brokerId, purgatoryName),false) {override def doWork(): Unit = {advanceClock(200L)}}

这个方法会每隔 200ms 推动一次时间轮,从而推动延时任务的执行。

8. 小结

好了,到这里分层时间轮 Kafka 的源码就写好了,下面还会介绍下 RocketMQ 的延时任务源码的逻辑,不过在这之前我会简单说下 SpringBoot 的定时延时任务,毕竟 Java 的框架里面肯定少不了 SpringBoot 的身影。至于 Dubbo,看了下里面的时间轮源码,跟 Netty 的基本一模一样, Netty 在上一篇文章里面也介绍过了,所以后面就不再介绍。





如有错误,欢迎提出!!!

相关文章:

定时/延时任务-Kafka时间轮源码分析

文章目录 1. 概要2. TimingWheel2.1 核心参数2.2 添加任务2.3 推进时间 3. TimerTaskList3.1 添加节点3.2 删除节点3.3 刷新链表3.4 队列相关 4. 时间轮链表节点-TimerTaskEntry5. TimerTask6. Timer 和 SystemTimer - 设计降级逻辑7. 上层调用8. 小结 1. 概要 时间轮的文章&a…...

如何用状态图进行设计05

到目前为止&#xff0c;我们已经讨论了状态图的原理。这些原理对状态图和扩展状态图都适用。第二章后面的部分主要讲述了扩展状态图的扩展功能。我们将围绕这些增强的功能&#xff0c;使你对BetterState Pro的设计能力有很好的了解。 关于这些内容和其他有关扩展状态图特性的完…...

【计算机视觉】边缘检测

图像的边缘简单来说就是图像中灰度不连续的地方。 1.图像梯度 图像梯度是指图像像素灰度值在某个方向上的变化&#xff1b;图像梯度是图像的一阶导数&#xff0c;实际计算时可以使用差分来近似。 1.1 什么是图像梯度&#xff1f; 图像梯度是一种数学工具&#xff0c;用于描…...

林曦词典|无聊

“林曦词典”是在水墨画家林曦的课堂与访谈里&#xff0c;频频邂逅的话语&#xff0c;总能生发出无尽的思考。那些悠然轻快的、微妙纷繁的&#xff0c;亦或耳熟能详的词&#xff0c;经由林曦老师的独到解析&#xff0c;意蕴无穷&#xff0c;让人受益。于是&#xff0c;我们将诸…...

LabVIEW光栅衍射虚拟仿真系统

随着现代教育技术的快速发展&#xff0c;虚拟仿真实验平台逐渐成为物理实验教学的重要辅助工具。基于LabVIEW的平面透射光栅虚拟仿真系统帮助学生更好地理解和分析光栅衍射现象&#xff0c;提高教学质量和学生的学习兴趣。 项目背景 在波动光学的教学中&#xff0c;光栅衍射实…...

【NumPy进阶】:内存视图、性能优化与高级线性代数

目录 1. 深入理解 NumPy 的内存视图与拷贝1.1 内存视图&#xff08;View&#xff09;1.1.1 创建视图1.1.2 视图的特点 1.2 数组拷贝&#xff08;Copy&#xff09;1.2.1 创建拷贝1.2.2 拷贝的特点 1.3 视图与拷贝的选择 2. NumPy 的优化与性能提升技巧2.1 向量化操作示例&#x…...

从YOLOv5到训练实战:易用性和扩展性的加强

文章目录 前言一、模型介绍二、YOLOv5网络结构1.Input&#xff08;输入端&#xff09;&#xff1a;智能预处理与优化策略2.Backbone&#xff08;骨干网络&#xff09;&#xff1a;高效特征提取3.NECK&#xff08;颈部&#xff09;&#xff1a;特征增强与多尺度融合4.Prediction…...

Prim 算法在不同权重范围内的性能分析及其实现

Prim 算法在不同权重范围内的性能分析及其实现 1. 边权重取值在 1 到 |V| 范围内伪代码C 代码实现2. 边权重取值在 1 到常数 W 之间结论Prim 算法是一种用于求解加权无向图的最小生成树(MST)的经典算法。它通过贪心策略逐步扩展生成树,确保每次选择的边都是当前生成树到未加…...

canal安装使用

简介 canal [kənl]&#xff0c;译意为水道/管道/沟渠&#xff0c;主要用途是基于 MySQL 数据库增量日志解析&#xff0c;提供增量数据订阅和消费 工作原理 canal 模拟 MySQL slave 的交互协议&#xff0c;伪装自己为 MySQL slave &#xff0c;向 MySQL master 发送 dump 协议…...

python爬虫常用数据保存模板(Excel、CSV、mysql)——scrapy中常用数据提取方法(CSS、XPATH、正则)(23)

文章目录 1、常用数据保存模板2.1 保存为Excel格式2.2 保存为CSV格式2.3 保存至mysql数据库2、scrapy中常用数据提取方法2.1 XPath选择器2.2 CSS选择器2.3 正则表达式1、常用数据保存模板 2.1 保存为Excel格式 # 1、导入模块 from openpyxl import workbook# 2、创建一个exce…...

You need to call SQLitePCL.raw.SetProvider()

在.NET环境中使用Entity Framework Core&#xff08;EF Core&#xff09;连接SQLite数据库时&#xff0c;报错。 使用框架 .NET8 错误信息&#xff1a; Exception: You need to call SQLitePCL.raw.SetProvider(). If you are using a bundle package, this is done by calling…...

IoTDB AINode 报错,call inference 301: Error ocurred while executing inference

问题及现象 使用时序数据库 IoTDB 的 AINode 的 call inference 语句后报错&#xff1a; Msg: org.apache.iotdb.jdbc.IoTDBSOLException&#xff1a;301: Error ocurred while executing inference:[tuple object has no attribute inference]解决方法 可以替换 venv 里面的…...

LLM之RAG实战(五十)| FastAPI:构建基于LLM的WEB接口界面

FastAPI是WEB UI接口&#xff0c;随着LLM的蓬勃发展&#xff0c;FastAPI的生态也迎来了新的机遇。本文将围绕FastAPI、OpenAI的API以及FastCRUD&#xff0c;来创建一个个性化的电子邮件写作助手&#xff0c;以展示如何结合这些技术来构建强大的应用程序。 下面我们开始分步骤操…...

项目-移动端适配的几种方案

目录 一、rem方案二、vw适配方案 一、rem方案 以vue2项目为例 下载安装包&#xff1a;npm install amfe-flexible --save在main.js中引入&#xff1a;import ‘amfe-flexible’下载安装包&#xff1a;npm install postcss-pxtorem --save项目下新建postcss.config.js文件&…...

HCIA-Access V2.5_2_2网络通信基础_TCP/IP协议栈报文封装

TCP/IP协议栈的封装过程 用户从应用层发出数据先会交给传输层&#xff0c;传输层会添加TCP或者UDP头部&#xff0c;然后交给网络层&#xff0c;网络层会添加IP头部&#xff0c;然后交给数据链路层&#xff0c;数据链路层会添加以太网头部和以太网尾部&#xff0c;最后变成01这样…...

LSTM详解

1. LSTM设计 LSTM(长短期记忆网络)详解 长短期记忆网络(LSTM, Long Short-Term Memory) 是一种特殊的循环神经网络(RNN),特别适合处理和预测序列数据中的长时间依赖关系。LSTM 通过引入“门机制”(如输入门、遗忘门、输出门)来解决标准 RNN 在长时间序列任务中梯度消…...

从零开始搭建Android开发环境:简单易懂的完整教程

前言&#xff1a; 作为安卓开发的入门&#xff0c;搭建开发环境是每个开发者都必须迈出的第一步。虽然这一步看似简单&#xff0c;但如果没有正确的配置&#xff0c;可能会遇到各种问题。本篇文章将为大家详细介绍如何从零开始搭建Android开发环境&#xff0c;确保你能够顺利开…...

大模型运用-Prompt Engineering(提示工程)

什么是提示工程 提示工程 提示工程也叫指令工程&#xff0c;涉及到如何设计、优化和管理这些Prompt&#xff0c;以确保AI模型能够准确、高效地执行用户的指令&#xff0c;如&#xff1a;讲个笑话、java写个排序算法等 使用目的 1.获得具体问题的具体结果。&#xff08;如&…...

CMake简单使用(二)

目录 五、scope 作用域5.1 作用域的类型5.1.1 全局作用域5.1.2 目录作用域5.1.3 函数作用域 六、宏6.1 基本语法6.2 演示代码 七、CMake构建项目7.1 全局变量7.2 写入源码路径7.3 调用子目录cmake脚本7.4 CMakeLists 嵌套(最常用) 八、CMake 与库8.1 CMake生成动静态库8.1.1 动…...

攻防世界安卓刷题笔记(新手模式)1-4

1.基础android 进入后是这样的页面。查看源代码看看。首先要注意这个软件并没有加壳&#xff0c;所以我们可以直接着手分析。搜索错误提示“Failed”定位到关键代码&#xff0c;看样子就是检验输入的内容 注意到这里有一行关键代码&#xff0c;cond_39对应的正是failed那个地方…...

发现一个对话框中的按钮,全部失效,点击都没有任何反应,已经解决

前端问题&#xff0c;技术vue2&#xff0c;ts。 发现一个对话框中的按钮&#xff0c;全部失效&#xff0c;点击都没有任何反应。 因为我只在template标签中加入下面这个代码&#xff0c;并没有注册。 只要有一个子组件没有注册&#xff0c;就会影响所有的按钮&#xff0c;使当前…...

MyBatisPlus实现多表查询

在MyBatisPlus中实现多表查询&#xff0c;主要有以下几种方法&#xff1a; 使用注解进行多表查询&#xff1a; 你可以在Mapper接口中使用Select注解来编写SQL查询语句&#xff0c;实现多表查询。例如&#xff0c;如果你想根据用户ID查询用户信息和对应的区域名称&#xff0c;可…...

机器学习详解(5):MLP代码详解之MNIST手写数字识别

文章目录 1 MNIST数据集2 代码详解2.1 导入库和GPU2.2 MNIST数据集处理2.2.1 下载和导入2.2.2 张量(Tensors)2.2.3 准备训练数据 2.3 创建模型2.3.1 图像展开2.3.2 输入层2.3.3 隐藏层2.3.4 输出层2.3.5 模型编译 2.4 训练模型2.4.1 损失函数与优化器2.4.2 计算准确率2.4.3 训练…...

如何在vue中实现父子通信

1.需要用到的组件 父组件 <template><div id"app"><BaseCount :count"count" changeCount"cahngeCount"></BaseCount></div> </template><script> import BaseCount from ./components/BaseCount.v…...

PHP实现华为OBS存储

一&#xff1a;华为OBS存储文档地址 官方文档&#xff1a;https://support.huaweicloud.com/obs/index.html github地址&#xff1a;https://github.com/huaweicloud/huaweicloud-sdk-php-obs 二&#xff1a;安装华为OBS拓展 composer require obs/esdk-obs-php 三&#x…...

嵌入式 linux Git常用命令 抽补丁 打补丁

Git常用命令 为什么要学习git呢&#xff1f;我相信刚入门的小伙伴敲打肯定碰到过这种玄学问题&#xff0c;我明明刚刚还能用的代码&#xff0c;后面不知道咋的就不能用了&#xff0c;所以每次你调出一个功能点以后都会手动复制一份代码防止出问题&#xff0c;时间一长发现整个…...

Alan Chhabra:MongoDB AI应用程序计划(MAAP) 为客户提供价值

MongoDB全球合作伙伴执行副总裁 Alan Chhabra 每当有人向我问询MongoDB&#xff0c;我都会说他们很可能在不觉之间已经与MongoDB有过交集。事实上&#xff0c;包括70%财富百强在内的许多世界领先企业公司都在使用MongoDB。我们在MongoDB所做的一切都是为了服务客户&#xff0c…...

【学习笔记】目前市面中手持激光雷达设备及参数汇总

手持激光雷达设备介绍 手持激光雷达设备是一种利用激光时间飞行原理来测量物体距离并构建三维模型的便携式高科技产品。它通过发射激光束并分析反射回来的激光信号&#xff0c;能够精确地获取物体的三维结构信息。这种设备以其高精度、适应各种光照环境的能力和便携性&#xf…...

Burp与小程序梦中情缘

前言 在日常渗透工作中&#xff0c;有时需要对微信小程序进行抓包渗透&#xff0c;通过抓包&#xff0c;我们可以捕获小程序与服务器之间的通信数据&#xff0c;分析这些数据可以帮助我们发现潜在的安全漏洞&#xff0c;本文通过讲述三个方法在PC端来对小程序抓包渗透 文章目…...

数据结构:Win32 API详解

目录 一.Win32 API的介绍 二.控制台程序(Console)与COORD 1..控制台程序(Console): 2.控制台窗口坐标COORD&#xff1a; 3.GetStdHandle函数&#xff1a; &#xff08;1&#xff09;语法&#xff1a; &#xff08;2&#xff09;参数&#xff1a; 4.GetConsoleCursorInf…...