当前位置: 首页 > news >正文

Kafka 深入服务端 — 时间轮

Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。

1 时间轮

Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。

1.1 Timer 和 DelayQueue

它们都使用了一个优先级队列(通常基于堆实现)来管理任务。

1.1.1 Timer

用于计划在特定时间后执行的任务,这些任务可以只执行一次或定期重复执行。其有以下特点:

  1. 运行在单线程中,无法满足多个任务同时执行的需求。如果前置任务耗时长,可能会阻塞后置任务。
  2. 如果任务执行过程中抛出异常,Timer会被异常中断停止。
public class TimerTest {public static void main(String[] args) {Timer timer = new Timer();Date startTime = new Date();TimerTask task1 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task1执行,距离开始时间:" + dis + "s");}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task2执行,距离开始时间:" + dis + "s,休眠5s。");try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("task2休眠结束");}};TimerTask task3 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task3执行,距离开始时间:" + dis + "s");}};timer.schedule(task1,1000); // 1s后执行timer.schedule(task2,2000); // 2s后执行timer.schedule(task3,3000); // 3s后执行}
//    执行结果:
//    task1执行,距离开始时间:1s
//    task2执行,距离开始时间:2s,休眠5s。
//    task2休眠结束
//    task3执行,距离开始时间:7s
}

1.1.2 DelayQueue

是一个无界阻塞队列,用于存储实现了Delayed接口的元素,这些元素只有在它们的延迟期满时才会被取出。其有以下特点:

  1. 线程安全,可以在多线程环境中使用。
  2. 无界队列,可以存储任意数量的元素,直到系统内存耗尽。
  3. 延迟精度依赖与系统时钟。
public class DelayQueueTest {private static class DelayQueueTask implements Delayed {private final String taskName;private final long delayTime;private DelayQueueTask(String taskName, long delayTime) {this.taskName = taskName;this.delayTime = delayTime + System.currentTimeMillis();}@Overridepublic long getDelay(TimeUnit unit) { // 返回剩余延迟long diff = delayTime - System.currentTimeMillis();return unit.convert(diff,TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {if (this.delayTime < ((DelayQueueTask) o).delayTime) {return -1;}if (this.delayTime > ((DelayQueueTask) o).delayTime) {return 1;}return 0;}@Overridepublic String toString() {return "DelayQueueTask{" +"taskName='" + taskName + '\'' +", delayTime=" + delayTime +'}';}}public static void main(String[] args) throws InterruptedException {DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();delayQueue.offer(new DelayQueueTask("task1",5000));delayQueue.offer(new DelayQueueTask("task2",2000));delayQueue.offer(new DelayQueueTask("task3",4000));System.out.println("开始执行delayQueue任务");while (!delayQueue.isEmpty()) {DelayQueueTask task = delayQueue.take();System.out.println("任务:" + task);}System.out.println("delayQueue任务 任务执行完毕");}
}

1.2 时间轮结构

Kafka 在任务的插入与删除采用了时间轮结构,其时间复杂度为O(1),而在时间推进上,还是依赖JDK提供的DelayQueue。

图 时间轮(TimingWheel)结构

Kafka的时间轮是一个存储定时任务的环形队列,每个元素(时间格)相当于一个桶(bucket),来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项TimerTaskEntry,其中封装了真正的定时任务TimerTask。

Kafka将TimerTaskList插入到DelayQueue(队列)中,使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。

1.2.1 时间格与时间跨度

时间轮由多个时间格组成(上面示意图中,每一层有10个时间格),每个时间格代表时间轮的基本时间跨度(tick),时间格数(wheelSize)是固定的,那么时间轮的总体跨度interval = tick * wheelSize。

时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间。currentTime是tick的整数倍,currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分,此时需要处理此时间格所对应的TimerTaskList中的所有任务。

上面示意图中,tick = 1s,此时currentTime指向第2个时间格,需要处理这个时间格存储的所有任务。假设,此时插入了一个3s后的任务,则把该任务插入第5个时间格中的bucket。

1.2.2 时间轮层级

当currentTime 指向第2个时间格时,需要插入一个33s后的任务,此时时间超过了第一层的跨度(1s * 10 = 10s)。Kafka引入层级时间轮的概念,当到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层的时间轮中。

33s 的任务会被插入到第二层的第(33 / 10 = 3) 3 个时间格中。

第一层的开始时间(第0格)startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。

每一层时间轮都会有指向更高一层的引用。

1.2.3 任务处理及时间轮降级

图 时钟

时间轮类似于时钟,当每一层走完一圈时,上一层就会走一格。例如当第1层的currentTime 指向第2格时,此时需要插入两个任务,分别是33s及39s后。它们都会被插入到第2层的第3格中的bucket(TimerTaskList)。

假设经过33s(第1层指向第5格,第2层指向第3格)后,此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出,将33s的任务执行并从TimerTaskList中删除。此时,39s的任务还剩6s,Kafka会把这个任务“降级”,插入到第1层第1((5+6)% 10)格中。

1.2.4 时间推进与DelayQueue

如果按照时间格一格格推进时间,这样消耗会比较大,而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。

将时间格bucket的TimerTaskList封装成Delayed,其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序,最快到达的排在队列头部。当到达时刻时,将表头的TimerTaskEntry取出,对它的TaskEntry执行任务执行或降级等操作。

相关文章:

Kafka 深入服务端 — 时间轮

Kafka中存在大量的延迟操作&#xff0c;比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器&#xff0c;来完成这些延迟操作。 1 时间轮 Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能&#xff0c;因为它们的插入和…...

网络爬虫学习:应用selenium获取Edge浏览器版本号,自动下载对应版本msedgedriver,确保Edge浏览器顺利打开。

一、前言 我从24年11月份开始学习网络爬虫应用开发&#xff0c;经过2个来月的努力&#xff0c;于1月下旬完成了开发一款网络爬虫软件的学习目标。这里对本次学习及应用开发进行一下回顾总结。 前几天我已经发了一篇日志&#xff08;网络爬虫学习&#xff1a;应用selenium从搜…...

【go语言】结构体

一、type 关键字的用法 在 go 语言中&#xff0c;type 关键字用于定义新的类型&#xff0c;他可以用来定义基础类型、结构体类型、接口类型、函数类型等。通过 type 关键字&#xff0c;我们可以为现有类型创建新的类型别名或者自定义新的类型。 1.1 类型别名 使用 type 可以为…...

Spring Boot是什么及其优点

简介 Spring Boot是基于Spring框架开发的全新框架&#xff0c;其设计目的是简化Spring应用的初始化搭建和开发过程。 Spring Boot整合了许多框架和第三方库配置&#xff0c;几乎可以达到“开箱即用”。 优点 可快速构建独立的Spring应用。 直接嵌入Tomcat、Jetty和Underto…...

谷氨酸:大脑功能的多面手

标题&#xff1a;谷氨酸&#xff1a;大脑功能的多面手 文章信息摘要&#xff1a; 谷氨酸是大脑中最主要的兴奋性神经递质&#xff0c;参与了90%以上的神经元激活&#xff0c;在蛋白质合成、味觉&#xff08;鲜味&#xff09;以及神经可塑性中发挥重要作用。它与GABA、多巴胺等…...

SpringCloudGateWay和Sentinel结合做黑白名单来源控制

假设我们的分布式项目&#xff0c;admin是8087&#xff0c;gateway是8088&#xff0c;consumer是8086 我们一般的思路是我们的请求必须经过我们的网关8088然后网关转发到我们的分布式项目&#xff0c;那我要是没有处理我们绕过网关直接访问项目8087和8086不也是可以&#xff1…...

HTML新春烟花

系列文章 序号目录1HTML满屏跳动的爱心&#xff08;可写字&#xff09;2HTML五彩缤纷的爱心3HTML满屏漂浮爱心4HTML情人节快乐5HTML蓝色爱心射线6HTML跳动的爱心&#xff08;简易版&#xff09;7HTML粒子爱心8HTML蓝色动态爱心9HTML跳动的爱心&#xff08;双心版&#xff09;10…...

【Elasticsearch】中数据流需要配置索引模板吗?

是的&#xff0c;数据流需要配置索引模板。在Elasticsearch中&#xff0c;数据流&#xff08;Data Streams&#xff09;是一种用于处理时间序列数据的高级结构&#xff0c;它背后由多个隐藏的索引组成&#xff0c;这些索引被称为后备索引&#xff08;Backing Indices&#xff0…...

Git进阶之旅:Git 配置信息 Config

Git 配置级别&#xff1a; 仓库级别&#xff1a;local [ 优先级最高 ]用户级别&#xff1a;global [ 优先级次之 ]系统级别&#xff1a;system [ 优先级最低 ] 配置文件位置&#xff1a; git 仓库级别对应的配置文件是当前仓库下的 .git/configgit 用户级别对应的配置文件时用…...

buu-pwn1_sctf_2016-好久不见29

这个也是栈溢出&#xff0c;不一样的点是&#xff0c;有replace替换&#xff0c;要输入0x3c字符&#xff08;60&#xff09;&#xff0c;Iyou 所以&#xff0c;20个I就行&#xff0c;找后面函数 输出提示信息&#xff0c;要求用户输入关于自己的信息。 使用fgets函数从标准输入…...

ES2021+新特性、常用函数

一、ES2021新特性 ES2021 数字分隔符 let num 1234567 let num2 1_234_567 Promise.any 与 Promise.all 类似&#xff0c;Promise.any 也接受一个 Promise 的数组。当其中任何一个 Promise 完成&#xff08;fullfill&#xff09;时&#xff0c;就返回那个已经有完成值的 …...

STM32——LCD

一、引脚配置 查看引脚 将上述引脚都设置为GPIO_Output 二、导入驱动文件 将 LCD 驱动的 Inc 以及 Src 中的 fonts.h,lcd.h 和 lcd.c 导入到自己工程的驱动文件中。 当然&#xff0c;后面 lcd 的驱动学习可以和 IMX6U 一块学。 三、LCD函数 void LCD_Clear(u16 Color); 功能…...

【redis进阶】分布式锁

目录 一、什么是分布式锁 二、分布式锁的基础实现 三、引入过期时间 四、引入校验 id 五、引入lua 六、引入 watch dog (看门狗) 七、引入 Redlock 算法 八、其他功能 redis学习&#x1f973; 一、什么是分布式锁 在一个分布式的系统中&#xff0c;也会涉及到多个节点访问同一…...

园区管理系统如何提升企业核心竞争力与资产管理智能化水平

内容概要 在当今快节奏的商业环境中&#xff0c;园区管理系统正成为企业的重要合作伙伴&#xff0c;尤其在工业园、产业园、物流园、写字楼和公寓等多种类型的物业管理中。这个系统不仅仅是一个管理工具&#xff0c;它还是提升企业运营效率和核心竞争力的关键因素。通过智能化…...

AI大模型开发原理篇-3:词向量和词嵌入

简介 词向量是用于表示单词意义的向量&#xff0c; 并且还可以被认为是单词的特征向量或表示。 将单词映射到实向量的技术称为词嵌入。在实际应用中&#xff0c;词向量和词嵌入这两个重要的NLP术语通常可以互换使用。它们都表示将词汇表中的单词映射到固定大小的连续向量空间中…...

高精度算法:高精度减法

P2142 高精度减法 - 洛谷 | 计算机科学教育新生态 我们两个整数一定要是大数减去小数&#xff0c;所以这个点我们需要特判一下&#xff0c;那我们两个字符串表示的整型怎么判断大小呢&#xff0c;我们字典序比较大小和真实的数字比较大小是一样的&#xff0c;比如我们的‘21’…...

Java创建项目准备工作

新建项目 新建空项目 每一个空项目创建好后都要检查jdk版本 检查SDK和语言级别——Apply——OK 检查当前项目的Maven路径&#xff0c;如果已经配置好全局&#xff0c;就是正确路径不用管 修改项目字符集编码&#xff0c;将所有编码都调整为UTF-8 创建Spingboot工程 创建Spring…...

基于STM32的智能宠物喂食器设计

目录 引言系统设计 硬件设计软件设计 系统功能模块 定时喂食模块远程控制与视频监控模块食物存量检测与报警模块语音互动与用户交互模块数据记录与智能分析模块 控制算法 定时与手动投喂算法食物存量检测与低存量提醒算法数据记录与远程反馈算法 代码实现 喂食控制代码存量检测…...

在线课堂小程序设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…...

为AI聊天工具添加一个知识系统 之77 详细设计之18 正则表达式 之5

本文要点 昨天讨论了 本项目&#xff08;AI聊天工具添加一个知识系统&#xff09;中正则表达式模板的设计中可能要考虑到的一些问题&#xff08;讨论到的内容比较随意&#xff0c;暂时无法确定 那些考虑 是否 应该是正则表达式模板设计要考虑的以及 是否完整&#xff09;。今天…...

【Elasticsearch】 索引模板 ignore_missing_component_templates

解释 ignore_missing_component_templates 配置 在Elasticsearch中&#xff0c;ignore_missing_component_templates 是一个配置选项&#xff0c;用于处理索引模板中引用的组件模板可能不存在的情况。当您创建一个索引模板时&#xff0c;可以指定一个或多个组件模板&#xff0…...

Github 2025-01-29 C开源项目日报 Top10

根据Github Trendings的统计,今日(2025-01-29统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量C项目10C++项目1Assembly项目1Go项目1我的电视 - 安卓电视直播软件 创建周期:40 天开发语言:CStar数量:649 个Fork数量:124 次关注人数:64…...

文件上传2

BUUCTF 你传你&#x1f40e;呢 先上传.htaccess 修改格式 即可上传成功 返回上传图片格式的木马 用蚁剑连接 5ecf1cca-59a1-408b-b616-090edf124db5.node5.buuoj.cn:81/upload/7d8511a847edeacb5385299396a96d91/rao.jpg 即可得到flag [GXYCTF2019]BabyUpload...

Unity敌人逻辑笔记

写ai逻辑基本上都需要状态机。因为懒得手搓状态机&#xff0c;所以选择直接用动画状态机当逻辑状态机用。 架构设计 因为敌人的根节点已经有一个animator控制动画&#xff0c;只能增加一个子节点AI&#xff0c;给它加一个animator指向逻辑“动画”状态机。还有一个脚本&#…...

高级编码参数

1.跳帧机制 参考资料&#xff1a;frameskipping-hotedgevideo 跳帧机制用于优化视频质量和编码效率。它通过选择性地跳过某些帧并使用参考帧来预测和重建视频内容&#xff0c;从而减少编码所需的比特率&#xff0c;同时保持较高的视频质量。在视频编码过程中&#xff0c;如果…...

DeepSeek-R1:通过强化学习激励大型语言模型(LLMs)的推理能力

摘要 我们推出了第一代推理模型&#xff1a;DeepSeek-R1-Zero和DeepSeek-R1。DeepSeek-R1-Zero是一个未经监督微调&#xff08;SFT&#xff09;作为初步步骤&#xff0c;而是通过大规模强化学习&#xff08;RL&#xff09;训练的模型&#xff0c;展现出卓越的推理能力。通过强…...

leetcode——合并K个有序链表(java)

给你一个链表数组&#xff0c;每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中&#xff0c;返回合并后的链表。 示例 1&#xff1a; 输入&#xff1a;lists [[1,4,5],[1,3,4],[2,6]] 输出&#xff1a;[1,1,2,3,4,4,5,6] 解释&#xff1a;链表数组如下&#…...

【Valgrind】安装报错: 报错有未满足的依赖关系: libc6,libc6-dbg

Valgrind 内存泄漏检测工具安装 安装 sudo apt install valgrind官方上也是如此 但是在我的系统&#xff08;debian12)上却失败了&#xff1a; 报错有未满足的依赖关系&#xff1a; libc6 : 破坏: valgrind (< 1:3.19.0-1~) 但是 1:3.16.1-1 正要被安装 libc6-dbg : 依赖…...

vue3和vue2的区别有哪些差异点

Vue3 vs Vue2 主要差异对比指南 官网 1. 核心架构差异 1.1 响应式系统 Vue2&#xff1a;使用 Object.defineProperty 实现响应式 // Vue2 响应式实现 Object.defineProperty(obj, key, {get() {// 依赖收集return value},set(newValue) {// 触发更新value newValue} })Vue3…...

论文笔记(六十三)Understanding Diffusion Models: A Unified Perspective(六)(完结)

Understanding Diffusion Models: A Unified Perspective&#xff08;六&#xff09;&#xff08;完结&#xff09; 文章概括指导&#xff08;Guidance&#xff09;分类器指导无分类器引导&#xff08;Classifier-Free Guidance&#xff09; 总结 文章概括 引用&#xff1a; …...