当前位置: 首页 > news >正文

Kafka 深入服务端 — 时间轮

Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。

1 时间轮

Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。

1.1 Timer 和 DelayQueue

它们都使用了一个优先级队列(通常基于堆实现)来管理任务。

1.1.1 Timer

用于计划在特定时间后执行的任务,这些任务可以只执行一次或定期重复执行。其有以下特点:

  1. 运行在单线程中,无法满足多个任务同时执行的需求。如果前置任务耗时长,可能会阻塞后置任务。
  2. 如果任务执行过程中抛出异常,Timer会被异常中断停止。
public class TimerTest {public static void main(String[] args) {Timer timer = new Timer();Date startTime = new Date();TimerTask task1 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task1执行,距离开始时间:" + dis + "s");}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task2执行,距离开始时间:" + dis + "s,休眠5s。");try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("task2休眠结束");}};TimerTask task3 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task3执行,距离开始时间:" + dis + "s");}};timer.schedule(task1,1000); // 1s后执行timer.schedule(task2,2000); // 2s后执行timer.schedule(task3,3000); // 3s后执行}
//    执行结果:
//    task1执行,距离开始时间:1s
//    task2执行,距离开始时间:2s,休眠5s。
//    task2休眠结束
//    task3执行,距离开始时间:7s
}

1.1.2 DelayQueue

是一个无界阻塞队列,用于存储实现了Delayed接口的元素,这些元素只有在它们的延迟期满时才会被取出。其有以下特点:

  1. 线程安全,可以在多线程环境中使用。
  2. 无界队列,可以存储任意数量的元素,直到系统内存耗尽。
  3. 延迟精度依赖与系统时钟。
public class DelayQueueTest {private static class DelayQueueTask implements Delayed {private final String taskName;private final long delayTime;private DelayQueueTask(String taskName, long delayTime) {this.taskName = taskName;this.delayTime = delayTime + System.currentTimeMillis();}@Overridepublic long getDelay(TimeUnit unit) { // 返回剩余延迟long diff = delayTime - System.currentTimeMillis();return unit.convert(diff,TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {if (this.delayTime < ((DelayQueueTask) o).delayTime) {return -1;}if (this.delayTime > ((DelayQueueTask) o).delayTime) {return 1;}return 0;}@Overridepublic String toString() {return "DelayQueueTask{" +"taskName='" + taskName + '\'' +", delayTime=" + delayTime +'}';}}public static void main(String[] args) throws InterruptedException {DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();delayQueue.offer(new DelayQueueTask("task1",5000));delayQueue.offer(new DelayQueueTask("task2",2000));delayQueue.offer(new DelayQueueTask("task3",4000));System.out.println("开始执行delayQueue任务");while (!delayQueue.isEmpty()) {DelayQueueTask task = delayQueue.take();System.out.println("任务:" + task);}System.out.println("delayQueue任务 任务执行完毕");}
}

1.2 时间轮结构

Kafka 在任务的插入与删除采用了时间轮结构,其时间复杂度为O(1),而在时间推进上,还是依赖JDK提供的DelayQueue。

图 时间轮(TimingWheel)结构

Kafka的时间轮是一个存储定时任务的环形队列,每个元素(时间格)相当于一个桶(bucket),来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项TimerTaskEntry,其中封装了真正的定时任务TimerTask。

Kafka将TimerTaskList插入到DelayQueue(队列)中,使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。

1.2.1 时间格与时间跨度

时间轮由多个时间格组成(上面示意图中,每一层有10个时间格),每个时间格代表时间轮的基本时间跨度(tick),时间格数(wheelSize)是固定的,那么时间轮的总体跨度interval = tick * wheelSize。

时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间。currentTime是tick的整数倍,currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分,此时需要处理此时间格所对应的TimerTaskList中的所有任务。

上面示意图中,tick = 1s,此时currentTime指向第2个时间格,需要处理这个时间格存储的所有任务。假设,此时插入了一个3s后的任务,则把该任务插入第5个时间格中的bucket。

1.2.2 时间轮层级

当currentTime 指向第2个时间格时,需要插入一个33s后的任务,此时时间超过了第一层的跨度(1s * 10 = 10s)。Kafka引入层级时间轮的概念,当到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层的时间轮中。

33s 的任务会被插入到第二层的第(33 / 10 = 3) 3 个时间格中。

第一层的开始时间(第0格)startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。

每一层时间轮都会有指向更高一层的引用。

1.2.3 任务处理及时间轮降级

图 时钟

时间轮类似于时钟,当每一层走完一圈时,上一层就会走一格。例如当第1层的currentTime 指向第2格时,此时需要插入两个任务,分别是33s及39s后。它们都会被插入到第2层的第3格中的bucket(TimerTaskList)。

假设经过33s(第1层指向第5格,第2层指向第3格)后,此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出,将33s的任务执行并从TimerTaskList中删除。此时,39s的任务还剩6s,Kafka会把这个任务“降级”,插入到第1层第1((5+6)% 10)格中。

1.2.4 时间推进与DelayQueue

如果按照时间格一格格推进时间,这样消耗会比较大,而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。

将时间格bucket的TimerTaskList封装成Delayed,其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序,最快到达的排在队列头部。当到达时刻时,将表头的TimerTaskEntry取出,对它的TaskEntry执行任务执行或降级等操作。

相关文章:

Kafka 深入服务端 — 时间轮

Kafka中存在大量的延迟操作&#xff0c;比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器&#xff0c;来完成这些延迟操作。 1 时间轮 Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能&#xff0c;因为它们的插入和…...

网络爬虫学习:应用selenium获取Edge浏览器版本号,自动下载对应版本msedgedriver,确保Edge浏览器顺利打开。

一、前言 我从24年11月份开始学习网络爬虫应用开发&#xff0c;经过2个来月的努力&#xff0c;于1月下旬完成了开发一款网络爬虫软件的学习目标。这里对本次学习及应用开发进行一下回顾总结。 前几天我已经发了一篇日志&#xff08;网络爬虫学习&#xff1a;应用selenium从搜…...

【go语言】结构体

一、type 关键字的用法 在 go 语言中&#xff0c;type 关键字用于定义新的类型&#xff0c;他可以用来定义基础类型、结构体类型、接口类型、函数类型等。通过 type 关键字&#xff0c;我们可以为现有类型创建新的类型别名或者自定义新的类型。 1.1 类型别名 使用 type 可以为…...

Spring Boot是什么及其优点

简介 Spring Boot是基于Spring框架开发的全新框架&#xff0c;其设计目的是简化Spring应用的初始化搭建和开发过程。 Spring Boot整合了许多框架和第三方库配置&#xff0c;几乎可以达到“开箱即用”。 优点 可快速构建独立的Spring应用。 直接嵌入Tomcat、Jetty和Underto…...

谷氨酸:大脑功能的多面手

标题&#xff1a;谷氨酸&#xff1a;大脑功能的多面手 文章信息摘要&#xff1a; 谷氨酸是大脑中最主要的兴奋性神经递质&#xff0c;参与了90%以上的神经元激活&#xff0c;在蛋白质合成、味觉&#xff08;鲜味&#xff09;以及神经可塑性中发挥重要作用。它与GABA、多巴胺等…...

SpringCloudGateWay和Sentinel结合做黑白名单来源控制

假设我们的分布式项目&#xff0c;admin是8087&#xff0c;gateway是8088&#xff0c;consumer是8086 我们一般的思路是我们的请求必须经过我们的网关8088然后网关转发到我们的分布式项目&#xff0c;那我要是没有处理我们绕过网关直接访问项目8087和8086不也是可以&#xff1…...

HTML新春烟花

系列文章 序号目录1HTML满屏跳动的爱心&#xff08;可写字&#xff09;2HTML五彩缤纷的爱心3HTML满屏漂浮爱心4HTML情人节快乐5HTML蓝色爱心射线6HTML跳动的爱心&#xff08;简易版&#xff09;7HTML粒子爱心8HTML蓝色动态爱心9HTML跳动的爱心&#xff08;双心版&#xff09;10…...

【Elasticsearch】中数据流需要配置索引模板吗?

是的&#xff0c;数据流需要配置索引模板。在Elasticsearch中&#xff0c;数据流&#xff08;Data Streams&#xff09;是一种用于处理时间序列数据的高级结构&#xff0c;它背后由多个隐藏的索引组成&#xff0c;这些索引被称为后备索引&#xff08;Backing Indices&#xff0…...

Git进阶之旅:Git 配置信息 Config

Git 配置级别&#xff1a; 仓库级别&#xff1a;local [ 优先级最高 ]用户级别&#xff1a;global [ 优先级次之 ]系统级别&#xff1a;system [ 优先级最低 ] 配置文件位置&#xff1a; git 仓库级别对应的配置文件是当前仓库下的 .git/configgit 用户级别对应的配置文件时用…...

buu-pwn1_sctf_2016-好久不见29

这个也是栈溢出&#xff0c;不一样的点是&#xff0c;有replace替换&#xff0c;要输入0x3c字符&#xff08;60&#xff09;&#xff0c;Iyou 所以&#xff0c;20个I就行&#xff0c;找后面函数 输出提示信息&#xff0c;要求用户输入关于自己的信息。 使用fgets函数从标准输入…...

ES2021+新特性、常用函数

一、ES2021新特性 ES2021 数字分隔符 let num 1234567 let num2 1_234_567 Promise.any 与 Promise.all 类似&#xff0c;Promise.any 也接受一个 Promise 的数组。当其中任何一个 Promise 完成&#xff08;fullfill&#xff09;时&#xff0c;就返回那个已经有完成值的 …...

STM32——LCD

一、引脚配置 查看引脚 将上述引脚都设置为GPIO_Output 二、导入驱动文件 将 LCD 驱动的 Inc 以及 Src 中的 fonts.h,lcd.h 和 lcd.c 导入到自己工程的驱动文件中。 当然&#xff0c;后面 lcd 的驱动学习可以和 IMX6U 一块学。 三、LCD函数 void LCD_Clear(u16 Color); 功能…...

【redis进阶】分布式锁

目录 一、什么是分布式锁 二、分布式锁的基础实现 三、引入过期时间 四、引入校验 id 五、引入lua 六、引入 watch dog (看门狗) 七、引入 Redlock 算法 八、其他功能 redis学习&#x1f973; 一、什么是分布式锁 在一个分布式的系统中&#xff0c;也会涉及到多个节点访问同一…...

园区管理系统如何提升企业核心竞争力与资产管理智能化水平

内容概要 在当今快节奏的商业环境中&#xff0c;园区管理系统正成为企业的重要合作伙伴&#xff0c;尤其在工业园、产业园、物流园、写字楼和公寓等多种类型的物业管理中。这个系统不仅仅是一个管理工具&#xff0c;它还是提升企业运营效率和核心竞争力的关键因素。通过智能化…...

AI大模型开发原理篇-3:词向量和词嵌入

简介 词向量是用于表示单词意义的向量&#xff0c; 并且还可以被认为是单词的特征向量或表示。 将单词映射到实向量的技术称为词嵌入。在实际应用中&#xff0c;词向量和词嵌入这两个重要的NLP术语通常可以互换使用。它们都表示将词汇表中的单词映射到固定大小的连续向量空间中…...

高精度算法:高精度减法

P2142 高精度减法 - 洛谷 | 计算机科学教育新生态 我们两个整数一定要是大数减去小数&#xff0c;所以这个点我们需要特判一下&#xff0c;那我们两个字符串表示的整型怎么判断大小呢&#xff0c;我们字典序比较大小和真实的数字比较大小是一样的&#xff0c;比如我们的‘21’…...

Java创建项目准备工作

新建项目 新建空项目 每一个空项目创建好后都要检查jdk版本 检查SDK和语言级别——Apply——OK 检查当前项目的Maven路径&#xff0c;如果已经配置好全局&#xff0c;就是正确路径不用管 修改项目字符集编码&#xff0c;将所有编码都调整为UTF-8 创建Spingboot工程 创建Spring…...

基于STM32的智能宠物喂食器设计

目录 引言系统设计 硬件设计软件设计 系统功能模块 定时喂食模块远程控制与视频监控模块食物存量检测与报警模块语音互动与用户交互模块数据记录与智能分析模块 控制算法 定时与手动投喂算法食物存量检测与低存量提醒算法数据记录与远程反馈算法 代码实现 喂食控制代码存量检测…...

在线课堂小程序设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…...

为AI聊天工具添加一个知识系统 之77 详细设计之18 正则表达式 之5

本文要点 昨天讨论了 本项目&#xff08;AI聊天工具添加一个知识系统&#xff09;中正则表达式模板的设计中可能要考虑到的一些问题&#xff08;讨论到的内容比较随意&#xff0c;暂时无法确定 那些考虑 是否 应该是正则表达式模板设计要考虑的以及 是否完整&#xff09;。今天…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引&#xff0c;可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度&#xff08;创建索引的主要原因&#xff09;。3. 可以加速表和表之间的连接&#xff0c;实现数据的参考完整性。4. 可以在查询过程中&#xff0c;…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

如何在网页里填写 PDF 表格?

有时候&#xff0c;你可能希望用户能在你的网站上填写 PDF 表单。然而&#xff0c;这件事并不简单&#xff0c;因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件&#xff0c;但原生并不支持编辑或填写它们。更糟的是&#xff0c;如果你想收集表单数据&#xff…...

九天毕昇深度学习平台 | 如何安装库?

pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子&#xff1a; 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...