Kafka源码简要分析
目录
一、生产者的初始化流程
二、生产者到缓冲队列的流程
三、Sender拉取数据到Kafka流程
四、消费者初始化
五、主题订阅原理
六、消费者抓取数据原理
七、消费者组初始化
八、消费者组消费流程
九、提交offset原理
一、生产者的初始化流程
- 首先获取事务id和客户端id(用到事物必须要事物id不然报错,每个生产者都需要唯一标识客户端id)
- 监控kafka相关情况的JmxReporter配置
- 然后获取分区器,如果用户有自定义的就读取配置的,如果没有配置就用默认分区器
- 然后key和value进行序列化
- 然后就读取自定义拦截器,可以定义多个拦截器,组成拦截器链
- 然后初始化控制单条日志的大小,默认是1m;缓冲区大小,默认32m;
- 创建内存池,缓存队列,初始化批次大小默认16k,压缩相关处理,默认是none,重试间隔时间默认100ms
- 连接kafka集群,获取元数据,才能知道要发送到哪个分区
- 创建sender线程,会有个创建sender的方法,sender线程负责拉取缓冲队列消息到Kafka,在方法里面会定义缓存请求的个数默认5个,然后请求超时的时间,然后创建一个网络请求客户端对象,会传入刚刚的参数还有客户端id,重试时间,发送缓冲区的大小128和接受缓冲区的大小32,还有acks等配置。sender继承了Runnbale接口,然后会new个sender线程出来用上面这些参数,然后返回。
- sender放到后台,启动sender线程
二、生产者到缓冲队列的流程
- 在执行到拦截器的时候就要调用一个onSend方法,如果有多个拦截器,每个拦截器都会走一次这个方法,这个方法就是拦截器对数据加工的
- 然后获取元数据,要根据主题的分区放到对应的缓存队列
- 序列化相关操作key和value的序列化和压缩
- 分区操作,如果指定了分区,直接分配到指定分区;没有指定就会根据分区器进行分配,没有指定key就会粘性分区处理(如果批次大小和活着时间到了不然就一直是那个,满足才能创建新队列用),如果指定key就根据key到hashcode进分区数取模,
- 保证(序列化和压缩后)数据大小能够传输,他去读取配置的消息最大值和缓冲区大小,如果有超过的抛异常
- 向缓存队列里面追加数据,获取或者创建一个队列按照分区,然后尝试添加数据(一般不成功,因为还没申请内存),然后根据16k和现在压缩后的总大小取最大值,申请内存就申请这个大小,内存池分配内存,然后sender线程拿走就了会释放内存。
- 如果批次大小满了或者有了新的批次需要创建,就唤醒sender线程把缓冲队列的数据拉取过去。
三、Sender拉取数据到Kafka流程
- 事务相关操作
- 获取元数据信息,为了知道发到哪个分区
- 判断32m缓存是否准备好,先获取队列的信息,先判断内存队列有没有数据
- 判断leader是不是空如果没有目标那还是会抛出异常,如果批次大小或时间满足一个条件,就会发送。
- 把所有请求按照节点为单位来发送请求,这样一台机器只需要建立一次连接
- 封装了个request然后通过网络客户端把数据发送过去
- 然后服务端还是通过网络客户端获取结果
四、消费者初始化
- 消费者组平衡
- 获取消费者组id和客户端id
- 设置请求服务端等待时间,默认30秒;重试时间,默认100毫秒
- 拦截器链相关处理
- key和value的反序列化
- 判断offset从什么位置开始消费
- 获取消费者元数据(重试时间、是否允许访问系统主题默认false,是否允许自动创建topic主题默认true)
- 连接Kafka集群
- 创建网络客户端对象(连接重试时间默认50ms,最大重试时间1s,发送缓冲区128kb和接受缓冲区64kb大小)
- 指定消费者分区分配策略
- 创建coordinator对象
- 设置自动提交offset时间,默认5s,配置抓取数据的参数(最少抓取多少最大一次抓取多少等)
五、主题订阅原理
- 传入要订阅的主题,如果为null直接抛出异常
- 注册负载均衡监听器,如果消费者组中有节点挂了,要通知其他消费者
- 按照主题自动订阅进行分配
六、消费者抓取数据原理
- 他首先先初始化消费者组和队列
- 然后回调消息会到缓冲队列,然后去队列抓取数据,最多一次500条
- 然后抓取后拦截器开始处理数据
七、消费者组初始化
- 先判断coordinator不为null那就说明为消费者组
- 如果没有指定分区分配策略会抛出异常
- 判断coordinator是否准备好,他会循环创建查找coordinator的请求并发送,并获取服务器返回到结果
他这整个消费者组初始化就是判断coordinator有没有准备好
八、消费者组消费流程
- 他会用判断coordinator是不是空,是的话就等待
- 他上来先去队列拉取数据,一般是拉取不到的
- 他先构造请求的入参(最少一次抓多少,最多抓多少,超时时间等待)然后调用send
- 他送后返回future,通过回调获取数据的
- 他会循环遍历数据获取分区,获取分区的数据,如果有数据就放到消息队列里面
- 然后就调用从队列拉取数据的方法拉取,然后他有大小限制最大500,他会循环一波一波拉取过去
- 然后放到拦截器走加工操作
九、提交offset原理
- 同步提交:找到coordinator然后调用commitOffset进行发送,然后不停dowhile循环,调用发送提交请求,然后等待回调获取结果,一直循环到成功为止。
- 异步提交:他还是用coordinator去提交但是他不等待结果,他new了个监听等待结果。
相关文章:
Kafka源码简要分析
目录 一、生产者的初始化流程 二、生产者到缓冲队列的流程 三、Sender拉取数据到Kafka流程 四、消费者初始化 五、主题订阅原理 六、消费者抓取数据原理 七、消费者组初始化 八、消费者组消费流程 九、提交offset原理 一、生产者的初始化流程 首先获取事务id和客户端…...
react 按住ctrl键,点击时会出现菜单的问题修复
问题描述:我需要按住crtl键,然后鼠标点击后做一些逻辑操作,但是出现如下问题 问题一:按住ctrl键后,点击时不触发click事件,只触发 mousedown和mouseup事件。 问题二:按住ctrl键点击时出现菜单…...

【虚拟机栈】
文章目录 1. 虚拟机栈概述2. 局部变量表(Local Variables)3. 操作数栈4. 动态链接4.1 方法的调用:解析与分配 5. 方法返回地址6. 栈的相关面试题 1. 虚拟机栈概述 每个线程在创建时都会创建一个虚拟机栈,其内部保存一个个的栈帧(Stack Frame…...

Linux系列讲解 —— 【fsck】检查并修复Linux文件系统
当文件系统出现损坏时,例如文件无法查看,删除等,可以使用 fsck(File System Consistency Check)进行修复。但是需要注意fsck在修复时,如果检查出某个文件有问题,可能会向用户请求删除。所以&…...
gitlab突然提示我要输入密码了。
用了很长时间的一个gitlab库,今天提交代码的时候突然提示我输入密码了,并且用户还是gitxx.xx.xx.xx的,瞬间懵逼。 想想原因,可能是因为我不久前设置了本地对另外一个git库的远程访问,用的是ssh,操作过程中可…...
业务测试常见问题(一)
如何多维度的分析一个需求? 功能维度:需求中所描述的功能是否实现,与用户的需求是否一致,是否完整符合用户的需求等。 安全性维度:是否有安全漏洞,是否存在未授权访问漏洞等,以保证系统的安全性…...

IntelliJ IDEA失焦自动重启服务的解决方法
IDEA 热部署特性 热部署,即应用正属于运行状态时,我们对应用源码进行了修改更新,在不重新启动应用的情况下,可以能够自动的把更新的内容重新进行编译并部署到服务器上,使修改立即生效。 现象 在使用 IntelliJ IDEA运…...
终端准入控制系统,保障企业内网安全的关键防线
随着网络技术的不断发展,企业面临的安全威胁也越来越多。终端作为承载企业业务的媒介,对内网资产安全有着重要影响。确保内网终端(如PC、BYOD、IoT等)能够得到统一管理,对保护内网安全很有必要。终端准入控制作为一种有…...

mysql-执行计划
1. 执行计划表概述 id相同表示加载表的顺序是从上到下。 id不同id值越大,优先级越高,越先被执行。id有相同,也有不同,同时存在。 id相同的可以认为是一组,从上往下顺序执行;在所有的组中,id的值…...

金蝶云星空和旺店通·企业奇门接口打通对接实战
金蝶云星空和旺店通企业奇门接口打通对接实战 接入系统:金蝶云星空 金蝶K/3Cloud(金蝶云星空)是移动互联网时代的新型ERP,是基于WEB2.0与云技术的新时代企业管理服务平台。金蝶K/3Cloud围绕着“生态、人人、体验”,旨在…...
在服务器上使用nginx改变前端项目请求的url
location /app-dev {rewrite ^/app-dev/(.*) /$1 break;proxy_pass http://152.136.36.251:9999;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr; } location /请求后缀 { rewrite ^/app-dev/(.*) /$1 break; proxy_pass 想要的请求后端的url; …...
【学习笔记】莫比乌斯反演
退役OIer回来受虐啦 一些定义 μ ( x ) { 1 x > 1 ( − 1 ) n x ∏ i 1 n P i 0 o t h e r w i s e \mu(x) \begin{cases} 1 & x > 1 \\ (-1)^n & x \prod _ {i1} ^ {n} P_{i}\\ 0 & otherwise \end{cases} μ(x)⎩ ⎨ ⎧1(−1)n0x>1x∏i1nPi…...

一款构建Python命令行应用的开源库
1 简介 当我们编写 Python 程序时,我们经常需要与用户进行交互,接收输入并输出结果。Python 提供了许多方法来实现这一点,其中一个非常方便的方法是使用 typer 库。typer 是一个用于构建命令行应用程序的 Python 库,它使得创建命令…...

10-Node.js模块化
01.模块化简介 目标 了解模块化概念和好处,以及 CommonJS 标准语法导出和导入 讲解 在 Node.js 中每个文件都被当做是一个独立的模块,模块内定义的变量和函数都是独立作用域的,因为 Node.js 在执行模块代码时,将使用如下所示的…...

数字IC前端学习笔记:数字乘法器的优化设计(Dadda Tree乘法器)
相关阅读 数字IC前端https://blog.csdn.net/weixin_45791458/category_12173698.html?spm1001.2014.3001.5482 华莱士树仍然是一种比较规则的结构(这使得可以方便地生成树的结构),这导致了它所使用的全加器和半加器个数不是最少的ÿ…...

计算机专业毕业设计项目推荐14-文档编辑平台(SpringBoot+Vue+Mysql)
文档编辑平台(SpringBootVueMysql) **介绍****各部分模块实现** 介绍 本系列(后期可能博主会统一为专栏)博文献给即将毕业的计算机专业同学们,因为博主自身本科和硕士也是科班出生,所以也比较了解计算机专业的毕业设计流程以及模式,在编写的…...
【读书后台管理系统】—后端框架搭建(二)
【读书后台管理系统】—后端框架搭建(二) 一、 Node 简介 Node 是一个基于 V8 引擎的 Javascript 运行环境,它使得 Javascript 可以运行在服务端,直接与操作系统进行交互,与文件控制、网络交互、进程控制等 Chrome …...

【DLoopDetector(C++)】DBow2词袋模型loop close学习
0.前言 最近读了两篇论文,论文作者开源了一种基于词袋模型DBoW2库的DLoopDetector算法,自己运行demo测试一下 对应论文介绍:Bags of Binary Words for Fast Place Recognition in Image Sequences 开源项目Github地址:https://gi…...

什么是CAS机制?
CAS和Synchronized的区别是什么?适合什么样的场景?有什么样的优点和缺点? 示例程序:启动两个线程,每个线程中让静态变量count循环累加100次。 public class ThreadTest {private static int count 0;public static …...
Java多态详解
下面讲解一下Java中的多态机制,力求用最通俗易懂的语言,最精炼的话语,最生动的例子,深入浅出Java多态,帮助读者轻松掌握这个知识点。 什么是多态? 多态是指同一种行为具有多个不同表现形式的能力。 多态…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...

【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...

Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...

中医有效性探讨
文章目录 西医是如何发展到以生物化学为药理基础的现代医学?传统医学奠基期(远古 - 17 世纪)近代医学转型期(17 世纪 - 19 世纪末)现代医学成熟期(20世纪至今) 中医的源远流长和一脉相承远古至…...