Kafka 深入服务端 — 时间轮
Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。
1 时间轮
Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。
1.1 Timer 和 DelayQueue
它们都使用了一个优先级队列(通常基于堆实现)来管理任务。
1.1.1 Timer
用于计划在特定时间后执行的任务,这些任务可以只执行一次或定期重复执行。其有以下特点:
- 运行在单线程中,无法满足多个任务同时执行的需求。如果前置任务耗时长,可能会阻塞后置任务。
- 如果任务执行过程中抛出异常,Timer会被异常中断停止。
public class TimerTest {public static void main(String[] args) {Timer timer = new Timer();Date startTime = new Date();TimerTask task1 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task1执行,距离开始时间:" + dis + "s");}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task2执行,距离开始时间:" + dis + "s,休眠5s。");try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("task2休眠结束");}};TimerTask task3 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task3执行,距离开始时间:" + dis + "s");}};timer.schedule(task1,1000); // 1s后执行timer.schedule(task2,2000); // 2s后执行timer.schedule(task3,3000); // 3s后执行}
// 执行结果:
// task1执行,距离开始时间:1s
// task2执行,距离开始时间:2s,休眠5s。
// task2休眠结束
// task3执行,距离开始时间:7s
}
1.1.2 DelayQueue
是一个无界阻塞队列,用于存储实现了Delayed接口的元素,这些元素只有在它们的延迟期满时才会被取出。其有以下特点:
- 线程安全,可以在多线程环境中使用。
- 无界队列,可以存储任意数量的元素,直到系统内存耗尽。
- 延迟精度依赖与系统时钟。
public class DelayQueueTest {private static class DelayQueueTask implements Delayed {private final String taskName;private final long delayTime;private DelayQueueTask(String taskName, long delayTime) {this.taskName = taskName;this.delayTime = delayTime + System.currentTimeMillis();}@Overridepublic long getDelay(TimeUnit unit) { // 返回剩余延迟long diff = delayTime - System.currentTimeMillis();return unit.convert(diff,TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {if (this.delayTime < ((DelayQueueTask) o).delayTime) {return -1;}if (this.delayTime > ((DelayQueueTask) o).delayTime) {return 1;}return 0;}@Overridepublic String toString() {return "DelayQueueTask{" +"taskName='" + taskName + '\'' +", delayTime=" + delayTime +'}';}}public static void main(String[] args) throws InterruptedException {DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();delayQueue.offer(new DelayQueueTask("task1",5000));delayQueue.offer(new DelayQueueTask("task2",2000));delayQueue.offer(new DelayQueueTask("task3",4000));System.out.println("开始执行delayQueue任务");while (!delayQueue.isEmpty()) {DelayQueueTask task = delayQueue.take();System.out.println("任务:" + task);}System.out.println("delayQueue任务 任务执行完毕");}
}
1.2 时间轮结构
Kafka 在任务的插入与删除采用了时间轮结构,其时间复杂度为O(1),而在时间推进上,还是依赖JDK提供的DelayQueue。

图 时间轮(TimingWheel)结构
Kafka的时间轮是一个存储定时任务的环形队列,每个元素(时间格)相当于一个桶(bucket),来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项TimerTaskEntry,其中封装了真正的定时任务TimerTask。
Kafka将TimerTaskList插入到DelayQueue(队列)中,使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。
1.2.1 时间格与时间跨度
时间轮由多个时间格组成(上面示意图中,每一层有10个时间格),每个时间格代表时间轮的基本时间跨度(tick),时间格数(wheelSize)是固定的,那么时间轮的总体跨度interval = tick * wheelSize。
时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间。currentTime是tick的整数倍,currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分,此时需要处理此时间格所对应的TimerTaskList中的所有任务。
上面示意图中,tick = 1s,此时currentTime指向第2个时间格,需要处理这个时间格存储的所有任务。假设,此时插入了一个3s后的任务,则把该任务插入第5个时间格中的bucket。
1.2.2 时间轮层级
当currentTime 指向第2个时间格时,需要插入一个33s后的任务,此时时间超过了第一层的跨度(1s * 10 = 10s)。Kafka引入层级时间轮的概念,当到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层的时间轮中。
33s 的任务会被插入到第二层的第(33 / 10 = 3) 3 个时间格中。
第一层的开始时间(第0格)startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。
每一层时间轮都会有指向更高一层的引用。
1.2.3 任务处理及时间轮降级

图 时钟
时间轮类似于时钟,当每一层走完一圈时,上一层就会走一格。例如当第1层的currentTime 指向第2格时,此时需要插入两个任务,分别是33s及39s后。它们都会被插入到第2层的第3格中的bucket(TimerTaskList)。
假设经过33s(第1层指向第5格,第2层指向第3格)后,此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出,将33s的任务执行并从TimerTaskList中删除。此时,39s的任务还剩6s,Kafka会把这个任务“降级”,插入到第1层第1((5+6)% 10)格中。
1.2.4 时间推进与DelayQueue
如果按照时间格一格格推进时间,这样消耗会比较大,而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。
将时间格bucket的TimerTaskList封装成Delayed,其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序,最快到达的排在队列头部。当到达时刻时,将表头的TimerTaskEntry取出,对它的TaskEntry执行任务执行或降级等操作。
相关文章:
Kafka 深入服务端 — 时间轮
Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。 1 时间轮 Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和…...
网络爬虫学习:应用selenium获取Edge浏览器版本号,自动下载对应版本msedgedriver,确保Edge浏览器顺利打开。
一、前言 我从24年11月份开始学习网络爬虫应用开发,经过2个来月的努力,于1月下旬完成了开发一款网络爬虫软件的学习目标。这里对本次学习及应用开发进行一下回顾总结。 前几天我已经发了一篇日志(网络爬虫学习:应用selenium从搜…...
【go语言】结构体
一、type 关键字的用法 在 go 语言中,type 关键字用于定义新的类型,他可以用来定义基础类型、结构体类型、接口类型、函数类型等。通过 type 关键字,我们可以为现有类型创建新的类型别名或者自定义新的类型。 1.1 类型别名 使用 type 可以为…...
Spring Boot是什么及其优点
简介 Spring Boot是基于Spring框架开发的全新框架,其设计目的是简化Spring应用的初始化搭建和开发过程。 Spring Boot整合了许多框架和第三方库配置,几乎可以达到“开箱即用”。 优点 可快速构建独立的Spring应用。 直接嵌入Tomcat、Jetty和Underto…...
谷氨酸:大脑功能的多面手
标题:谷氨酸:大脑功能的多面手 文章信息摘要: 谷氨酸是大脑中最主要的兴奋性神经递质,参与了90%以上的神经元激活,在蛋白质合成、味觉(鲜味)以及神经可塑性中发挥重要作用。它与GABA、多巴胺等…...
SpringCloudGateWay和Sentinel结合做黑白名单来源控制
假设我们的分布式项目,admin是8087,gateway是8088,consumer是8086 我们一般的思路是我们的请求必须经过我们的网关8088然后网关转发到我们的分布式项目,那我要是没有处理我们绕过网关直接访问项目8087和8086不也是可以࿱…...
HTML新春烟花
系列文章 序号目录1HTML满屏跳动的爱心(可写字)2HTML五彩缤纷的爱心3HTML满屏漂浮爱心4HTML情人节快乐5HTML蓝色爱心射线6HTML跳动的爱心(简易版)7HTML粒子爱心8HTML蓝色动态爱心9HTML跳动的爱心(双心版)10…...
【Elasticsearch】中数据流需要配置索引模板吗?
是的,数据流需要配置索引模板。在Elasticsearch中,数据流(Data Streams)是一种用于处理时间序列数据的高级结构,它背后由多个隐藏的索引组成,这些索引被称为后备索引(Backing Indices࿰…...
Git进阶之旅:Git 配置信息 Config
Git 配置级别: 仓库级别:local [ 优先级最高 ]用户级别:global [ 优先级次之 ]系统级别:system [ 优先级最低 ] 配置文件位置: git 仓库级别对应的配置文件是当前仓库下的 .git/configgit 用户级别对应的配置文件时用…...
buu-pwn1_sctf_2016-好久不见29
这个也是栈溢出,不一样的点是,有replace替换,要输入0x3c字符(60),Iyou 所以,20个I就行,找后面函数 输出提示信息,要求用户输入关于自己的信息。 使用fgets函数从标准输入…...
ES2021+新特性、常用函数
一、ES2021新特性 ES2021 数字分隔符 let num 1234567 let num2 1_234_567 Promise.any 与 Promise.all 类似,Promise.any 也接受一个 Promise 的数组。当其中任何一个 Promise 完成(fullfill)时,就返回那个已经有完成值的 …...
STM32——LCD
一、引脚配置 查看引脚 将上述引脚都设置为GPIO_Output 二、导入驱动文件 将 LCD 驱动的 Inc 以及 Src 中的 fonts.h,lcd.h 和 lcd.c 导入到自己工程的驱动文件中。 当然,后面 lcd 的驱动学习可以和 IMX6U 一块学。 三、LCD函数 void LCD_Clear(u16 Color); 功能…...
【redis进阶】分布式锁
目录 一、什么是分布式锁 二、分布式锁的基础实现 三、引入过期时间 四、引入校验 id 五、引入lua 六、引入 watch dog (看门狗) 七、引入 Redlock 算法 八、其他功能 redis学习🥳 一、什么是分布式锁 在一个分布式的系统中,也会涉及到多个节点访问同一…...
园区管理系统如何提升企业核心竞争力与资产管理智能化水平
内容概要 在当今快节奏的商业环境中,园区管理系统正成为企业的重要合作伙伴,尤其在工业园、产业园、物流园、写字楼和公寓等多种类型的物业管理中。这个系统不仅仅是一个管理工具,它还是提升企业运营效率和核心竞争力的关键因素。通过智能化…...
AI大模型开发原理篇-3:词向量和词嵌入
简介 词向量是用于表示单词意义的向量, 并且还可以被认为是单词的特征向量或表示。 将单词映射到实向量的技术称为词嵌入。在实际应用中,词向量和词嵌入这两个重要的NLP术语通常可以互换使用。它们都表示将词汇表中的单词映射到固定大小的连续向量空间中…...
高精度算法:高精度减法
P2142 高精度减法 - 洛谷 | 计算机科学教育新生态 我们两个整数一定要是大数减去小数,所以这个点我们需要特判一下,那我们两个字符串表示的整型怎么判断大小呢,我们字典序比较大小和真实的数字比较大小是一样的,比如我们的‘21’…...
Java创建项目准备工作
新建项目 新建空项目 每一个空项目创建好后都要检查jdk版本 检查SDK和语言级别——Apply——OK 检查当前项目的Maven路径,如果已经配置好全局,就是正确路径不用管 修改项目字符集编码,将所有编码都调整为UTF-8 创建Spingboot工程 创建Spring…...
基于STM32的智能宠物喂食器设计
目录 引言系统设计 硬件设计软件设计 系统功能模块 定时喂食模块远程控制与视频监控模块食物存量检测与报警模块语音互动与用户交互模块数据记录与智能分析模块 控制算法 定时与手动投喂算法食物存量检测与低存量提醒算法数据记录与远程反馈算法 代码实现 喂食控制代码存量检测…...
在线课堂小程序设计与实现(LW+源码+讲解)
专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…...
为AI聊天工具添加一个知识系统 之77 详细设计之18 正则表达式 之5
本文要点 昨天讨论了 本项目(AI聊天工具添加一个知识系统)中正则表达式模板的设计中可能要考虑到的一些问题(讨论到的内容比较随意,暂时无法确定 那些考虑 是否 应该是正则表达式模板设计要考虑的以及 是否完整)。今天…...
Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
【SpringBoot自动化部署】
SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一,能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时,需要添加Git仓库地址和凭证,设置构建触发器(如GitHub…...
ThreadLocal 源码
ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物,因为每个访问一个线程局部变量的线程(通过其 get 或 set 方法)都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段,这些类希望将…...
6.9本日总结
一、英语 复习默写list11list18,订正07年第3篇阅读 二、数学 学习线代第一讲,写15讲课后题 三、408 学习计组第二章,写计组习题 四、总结 明天结束线代第一章和计组第二章 五、明日计划 英语:复习l默写sit12list17&#…...
性能优化中,多面体模型基本原理
1)多面体编译技术是一种基于多面体模型的程序分析和优化技术,它将程序 中的语句实例、访问关系、依赖关系和调度等信息映射到多维空间中的几何对 象,通过对这些几何对象进行几何操作和线性代数计算来进行程序的分析和优 化。 其中࿰…...
Excel 怎么让透视表以正常Excel表格形式显示
目录 1、创建数据透视表 2、设计 》报表布局 》以表格形式显示 3、设计 》分类汇总 》不显示分类汇总 1、创建数据透视表 2、设计 》报表布局 》以表格形式显示 3、设计 》分类汇总 》不显示分类汇总...
篇章一 论坛系统——前置知识
目录 1.软件开发 1.1 软件的生命周期 1.2 面向对象 1.3 CS、BS架构 1.CS架构编辑 2.BS架构 1.4 软件需求 1.需求分类 2.需求获取 1.5 需求分析 1. 工作内容 1.6 面向对象分析 1.OOA的任务 2.统一建模语言UML 3. 用例模型 3.1 用例图的元素 3.2 建立用例模型 …...
PostgreSQL 对 IPv6 的支持情况
PostgreSQL 对 IPv6 的支持情况 PostgreSQL 全面支持 IPv6 网络协议,包括连接、存储和操作 IPv6 地址。以下是详细说明: 一、网络连接支持 1. 监听 IPv6 连接 在 postgresql.conf 中配置: listen_addresses 0.0.0.0,:: # 监听所有IPv4…...
docker容器互联
1.docker可以通过网路访问 2.docker允许映射容器内应用的服务端口到本地宿主主机 3.互联机制实现多个容器间通过容器名来快速访问 一 、端口映射实现容器访问 1.从外部访问容器应用 我们先把之前的删掉吧(如果不删的话,容器就提不起来,因…...
