Kafka源码简要分析
目录
一、生产者的初始化流程
二、生产者到缓冲队列的流程
三、Sender拉取数据到Kafka流程
四、消费者初始化
五、主题订阅原理
六、消费者抓取数据原理
七、消费者组初始化
八、消费者组消费流程
九、提交offset原理
一、生产者的初始化流程
- 首先获取事务id和客户端id(用到事物必须要事物id不然报错,每个生产者都需要唯一标识客户端id)
- 监控kafka相关情况的JmxReporter配置
- 然后获取分区器,如果用户有自定义的就读取配置的,如果没有配置就用默认分区器
- 然后key和value进行序列化
- 然后就读取自定义拦截器,可以定义多个拦截器,组成拦截器链
- 然后初始化控制单条日志的大小,默认是1m;缓冲区大小,默认32m;
- 创建内存池,缓存队列,初始化批次大小默认16k,压缩相关处理,默认是none,重试间隔时间默认100ms
- 连接kafka集群,获取元数据,才能知道要发送到哪个分区
- 创建sender线程,会有个创建sender的方法,sender线程负责拉取缓冲队列消息到Kafka,在方法里面会定义缓存请求的个数默认5个,然后请求超时的时间,然后创建一个网络请求客户端对象,会传入刚刚的参数还有客户端id,重试时间,发送缓冲区的大小128和接受缓冲区的大小32,还有acks等配置。sender继承了Runnbale接口,然后会new个sender线程出来用上面这些参数,然后返回。
- sender放到后台,启动sender线程
二、生产者到缓冲队列的流程
- 在执行到拦截器的时候就要调用一个onSend方法,如果有多个拦截器,每个拦截器都会走一次这个方法,这个方法就是拦截器对数据加工的
- 然后获取元数据,要根据主题的分区放到对应的缓存队列
- 序列化相关操作key和value的序列化和压缩
- 分区操作,如果指定了分区,直接分配到指定分区;没有指定就会根据分区器进行分配,没有指定key就会粘性分区处理(如果批次大小和活着时间到了不然就一直是那个,满足才能创建新队列用),如果指定key就根据key到hashcode进分区数取模,
- 保证(序列化和压缩后)数据大小能够传输,他去读取配置的消息最大值和缓冲区大小,如果有超过的抛异常
- 向缓存队列里面追加数据,获取或者创建一个队列按照分区,然后尝试添加数据(一般不成功,因为还没申请内存),然后根据16k和现在压缩后的总大小取最大值,申请内存就申请这个大小,内存池分配内存,然后sender线程拿走就了会释放内存。
- 如果批次大小满了或者有了新的批次需要创建,就唤醒sender线程把缓冲队列的数据拉取过去。
三、Sender拉取数据到Kafka流程
- 事务相关操作
- 获取元数据信息,为了知道发到哪个分区
- 判断32m缓存是否准备好,先获取队列的信息,先判断内存队列有没有数据
- 判断leader是不是空如果没有目标那还是会抛出异常,如果批次大小或时间满足一个条件,就会发送。
- 把所有请求按照节点为单位来发送请求,这样一台机器只需要建立一次连接
- 封装了个request然后通过网络客户端把数据发送过去
- 然后服务端还是通过网络客户端获取结果
四、消费者初始化
- 消费者组平衡
- 获取消费者组id和客户端id
- 设置请求服务端等待时间,默认30秒;重试时间,默认100毫秒
- 拦截器链相关处理
- key和value的反序列化
- 判断offset从什么位置开始消费
- 获取消费者元数据(重试时间、是否允许访问系统主题默认false,是否允许自动创建topic主题默认true)
- 连接Kafka集群
- 创建网络客户端对象(连接重试时间默认50ms,最大重试时间1s,发送缓冲区128kb和接受缓冲区64kb大小)
- 指定消费者分区分配策略
- 创建coordinator对象
- 设置自动提交offset时间,默认5s,配置抓取数据的参数(最少抓取多少最大一次抓取多少等)
五、主题订阅原理
- 传入要订阅的主题,如果为null直接抛出异常
- 注册负载均衡监听器,如果消费者组中有节点挂了,要通知其他消费者
- 按照主题自动订阅进行分配
六、消费者抓取数据原理
- 他首先先初始化消费者组和队列
- 然后回调消息会到缓冲队列,然后去队列抓取数据,最多一次500条
- 然后抓取后拦截器开始处理数据
七、消费者组初始化
- 先判断coordinator不为null那就说明为消费者组
- 如果没有指定分区分配策略会抛出异常
- 判断coordinator是否准备好,他会循环创建查找coordinator的请求并发送,并获取服务器返回到结果
他这整个消费者组初始化就是判断coordinator有没有准备好
八、消费者组消费流程
- 他会用判断coordinator是不是空,是的话就等待
- 他上来先去队列拉取数据,一般是拉取不到的
- 他先构造请求的入参(最少一次抓多少,最多抓多少,超时时间等待)然后调用send
- 他送后返回future,通过回调获取数据的
- 他会循环遍历数据获取分区,获取分区的数据,如果有数据就放到消息队列里面
- 然后就调用从队列拉取数据的方法拉取,然后他有大小限制最大500,他会循环一波一波拉取过去
- 然后放到拦截器走加工操作
九、提交offset原理
- 同步提交:找到coordinator然后调用commitOffset进行发送,然后不停dowhile循环,调用发送提交请求,然后等待回调获取结果,一直循环到成功为止。
- 异步提交:他还是用coordinator去提交但是他不等待结果,他new了个监听等待结果。
相关文章:
Kafka源码简要分析
目录 一、生产者的初始化流程 二、生产者到缓冲队列的流程 三、Sender拉取数据到Kafka流程 四、消费者初始化 五、主题订阅原理 六、消费者抓取数据原理 七、消费者组初始化 八、消费者组消费流程 九、提交offset原理 一、生产者的初始化流程 首先获取事务id和客户端…...
react 按住ctrl键,点击时会出现菜单的问题修复
问题描述:我需要按住crtl键,然后鼠标点击后做一些逻辑操作,但是出现如下问题 问题一:按住ctrl键后,点击时不触发click事件,只触发 mousedown和mouseup事件。 问题二:按住ctrl键点击时出现菜单…...
【虚拟机栈】
文章目录 1. 虚拟机栈概述2. 局部变量表(Local Variables)3. 操作数栈4. 动态链接4.1 方法的调用:解析与分配 5. 方法返回地址6. 栈的相关面试题 1. 虚拟机栈概述 每个线程在创建时都会创建一个虚拟机栈,其内部保存一个个的栈帧(Stack Frame…...
Linux系列讲解 —— 【fsck】检查并修复Linux文件系统
当文件系统出现损坏时,例如文件无法查看,删除等,可以使用 fsck(File System Consistency Check)进行修复。但是需要注意fsck在修复时,如果检查出某个文件有问题,可能会向用户请求删除。所以&…...
gitlab突然提示我要输入密码了。
用了很长时间的一个gitlab库,今天提交代码的时候突然提示我输入密码了,并且用户还是gitxx.xx.xx.xx的,瞬间懵逼。 想想原因,可能是因为我不久前设置了本地对另外一个git库的远程访问,用的是ssh,操作过程中可…...
业务测试常见问题(一)
如何多维度的分析一个需求? 功能维度:需求中所描述的功能是否实现,与用户的需求是否一致,是否完整符合用户的需求等。 安全性维度:是否有安全漏洞,是否存在未授权访问漏洞等,以保证系统的安全性…...
IntelliJ IDEA失焦自动重启服务的解决方法
IDEA 热部署特性 热部署,即应用正属于运行状态时,我们对应用源码进行了修改更新,在不重新启动应用的情况下,可以能够自动的把更新的内容重新进行编译并部署到服务器上,使修改立即生效。 现象 在使用 IntelliJ IDEA运…...
终端准入控制系统,保障企业内网安全的关键防线
随着网络技术的不断发展,企业面临的安全威胁也越来越多。终端作为承载企业业务的媒介,对内网资产安全有着重要影响。确保内网终端(如PC、BYOD、IoT等)能够得到统一管理,对保护内网安全很有必要。终端准入控制作为一种有…...
mysql-执行计划
1. 执行计划表概述 id相同表示加载表的顺序是从上到下。 id不同id值越大,优先级越高,越先被执行。id有相同,也有不同,同时存在。 id相同的可以认为是一组,从上往下顺序执行;在所有的组中,id的值…...
金蝶云星空和旺店通·企业奇门接口打通对接实战
金蝶云星空和旺店通企业奇门接口打通对接实战 接入系统:金蝶云星空 金蝶K/3Cloud(金蝶云星空)是移动互联网时代的新型ERP,是基于WEB2.0与云技术的新时代企业管理服务平台。金蝶K/3Cloud围绕着“生态、人人、体验”,旨在…...
在服务器上使用nginx改变前端项目请求的url
location /app-dev {rewrite ^/app-dev/(.*) /$1 break;proxy_pass http://152.136.36.251:9999;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr; } location /请求后缀 { rewrite ^/app-dev/(.*) /$1 break; proxy_pass 想要的请求后端的url; …...
【学习笔记】莫比乌斯反演
退役OIer回来受虐啦 一些定义 μ ( x ) { 1 x > 1 ( − 1 ) n x ∏ i 1 n P i 0 o t h e r w i s e \mu(x) \begin{cases} 1 & x > 1 \\ (-1)^n & x \prod _ {i1} ^ {n} P_{i}\\ 0 & otherwise \end{cases} μ(x)⎩ ⎨ ⎧1(−1)n0x>1x∏i1nPi…...
一款构建Python命令行应用的开源库
1 简介 当我们编写 Python 程序时,我们经常需要与用户进行交互,接收输入并输出结果。Python 提供了许多方法来实现这一点,其中一个非常方便的方法是使用 typer 库。typer 是一个用于构建命令行应用程序的 Python 库,它使得创建命令…...
10-Node.js模块化
01.模块化简介 目标 了解模块化概念和好处,以及 CommonJS 标准语法导出和导入 讲解 在 Node.js 中每个文件都被当做是一个独立的模块,模块内定义的变量和函数都是独立作用域的,因为 Node.js 在执行模块代码时,将使用如下所示的…...
数字IC前端学习笔记:数字乘法器的优化设计(Dadda Tree乘法器)
相关阅读 数字IC前端https://blog.csdn.net/weixin_45791458/category_12173698.html?spm1001.2014.3001.5482 华莱士树仍然是一种比较规则的结构(这使得可以方便地生成树的结构),这导致了它所使用的全加器和半加器个数不是最少的ÿ…...
计算机专业毕业设计项目推荐14-文档编辑平台(SpringBoot+Vue+Mysql)
文档编辑平台(SpringBootVueMysql) **介绍****各部分模块实现** 介绍 本系列(后期可能博主会统一为专栏)博文献给即将毕业的计算机专业同学们,因为博主自身本科和硕士也是科班出生,所以也比较了解计算机专业的毕业设计流程以及模式,在编写的…...
【读书后台管理系统】—后端框架搭建(二)
【读书后台管理系统】—后端框架搭建(二) 一、 Node 简介 Node 是一个基于 V8 引擎的 Javascript 运行环境,它使得 Javascript 可以运行在服务端,直接与操作系统进行交互,与文件控制、网络交互、进程控制等 Chrome …...
【DLoopDetector(C++)】DBow2词袋模型loop close学习
0.前言 最近读了两篇论文,论文作者开源了一种基于词袋模型DBoW2库的DLoopDetector算法,自己运行demo测试一下 对应论文介绍:Bags of Binary Words for Fast Place Recognition in Image Sequences 开源项目Github地址:https://gi…...
什么是CAS机制?
CAS和Synchronized的区别是什么?适合什么样的场景?有什么样的优点和缺点? 示例程序:启动两个线程,每个线程中让静态变量count循环累加100次。 public class ThreadTest {private static int count 0;public static …...
Java多态详解
下面讲解一下Java中的多态机制,力求用最通俗易懂的语言,最精炼的话语,最生动的例子,深入浅出Java多态,帮助读者轻松掌握这个知识点。 什么是多态? 多态是指同一种行为具有多个不同表现形式的能力。 多态…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
