Kafka源码简要分析
目录
一、生产者的初始化流程
二、生产者到缓冲队列的流程
三、Sender拉取数据到Kafka流程
四、消费者初始化
五、主题订阅原理
六、消费者抓取数据原理
七、消费者组初始化
八、消费者组消费流程
九、提交offset原理
一、生产者的初始化流程
- 首先获取事务id和客户端id(用到事物必须要事物id不然报错,每个生产者都需要唯一标识客户端id)
- 监控kafka相关情况的JmxReporter配置
- 然后获取分区器,如果用户有自定义的就读取配置的,如果没有配置就用默认分区器
- 然后key和value进行序列化
- 然后就读取自定义拦截器,可以定义多个拦截器,组成拦截器链
- 然后初始化控制单条日志的大小,默认是1m;缓冲区大小,默认32m;
- 创建内存池,缓存队列,初始化批次大小默认16k,压缩相关处理,默认是none,重试间隔时间默认100ms
- 连接kafka集群,获取元数据,才能知道要发送到哪个分区
- 创建sender线程,会有个创建sender的方法,sender线程负责拉取缓冲队列消息到Kafka,在方法里面会定义缓存请求的个数默认5个,然后请求超时的时间,然后创建一个网络请求客户端对象,会传入刚刚的参数还有客户端id,重试时间,发送缓冲区的大小128和接受缓冲区的大小32,还有acks等配置。sender继承了Runnbale接口,然后会new个sender线程出来用上面这些参数,然后返回。
- sender放到后台,启动sender线程
二、生产者到缓冲队列的流程
- 在执行到拦截器的时候就要调用一个onSend方法,如果有多个拦截器,每个拦截器都会走一次这个方法,这个方法就是拦截器对数据加工的
- 然后获取元数据,要根据主题的分区放到对应的缓存队列
- 序列化相关操作key和value的序列化和压缩
- 分区操作,如果指定了分区,直接分配到指定分区;没有指定就会根据分区器进行分配,没有指定key就会粘性分区处理(如果批次大小和活着时间到了不然就一直是那个,满足才能创建新队列用),如果指定key就根据key到hashcode进分区数取模,
- 保证(序列化和压缩后)数据大小能够传输,他去读取配置的消息最大值和缓冲区大小,如果有超过的抛异常
- 向缓存队列里面追加数据,获取或者创建一个队列按照分区,然后尝试添加数据(一般不成功,因为还没申请内存),然后根据16k和现在压缩后的总大小取最大值,申请内存就申请这个大小,内存池分配内存,然后sender线程拿走就了会释放内存。
- 如果批次大小满了或者有了新的批次需要创建,就唤醒sender线程把缓冲队列的数据拉取过去。
三、Sender拉取数据到Kafka流程
- 事务相关操作
- 获取元数据信息,为了知道发到哪个分区
- 判断32m缓存是否准备好,先获取队列的信息,先判断内存队列有没有数据
- 判断leader是不是空如果没有目标那还是会抛出异常,如果批次大小或时间满足一个条件,就会发送。
- 把所有请求按照节点为单位来发送请求,这样一台机器只需要建立一次连接
- 封装了个request然后通过网络客户端把数据发送过去
- 然后服务端还是通过网络客户端获取结果
四、消费者初始化
- 消费者组平衡
- 获取消费者组id和客户端id
- 设置请求服务端等待时间,默认30秒;重试时间,默认100毫秒
- 拦截器链相关处理
- key和value的反序列化
- 判断offset从什么位置开始消费
- 获取消费者元数据(重试时间、是否允许访问系统主题默认false,是否允许自动创建topic主题默认true)
- 连接Kafka集群
- 创建网络客户端对象(连接重试时间默认50ms,最大重试时间1s,发送缓冲区128kb和接受缓冲区64kb大小)
- 指定消费者分区分配策略
- 创建coordinator对象
- 设置自动提交offset时间,默认5s,配置抓取数据的参数(最少抓取多少最大一次抓取多少等)
五、主题订阅原理
- 传入要订阅的主题,如果为null直接抛出异常
- 注册负载均衡监听器,如果消费者组中有节点挂了,要通知其他消费者
- 按照主题自动订阅进行分配
六、消费者抓取数据原理
- 他首先先初始化消费者组和队列
- 然后回调消息会到缓冲队列,然后去队列抓取数据,最多一次500条
- 然后抓取后拦截器开始处理数据
七、消费者组初始化
- 先判断coordinator不为null那就说明为消费者组
- 如果没有指定分区分配策略会抛出异常
- 判断coordinator是否准备好,他会循环创建查找coordinator的请求并发送,并获取服务器返回到结果
他这整个消费者组初始化就是判断coordinator有没有准备好
八、消费者组消费流程
- 他会用判断coordinator是不是空,是的话就等待
- 他上来先去队列拉取数据,一般是拉取不到的
- 他先构造请求的入参(最少一次抓多少,最多抓多少,超时时间等待)然后调用send
- 他送后返回future,通过回调获取数据的
- 他会循环遍历数据获取分区,获取分区的数据,如果有数据就放到消息队列里面
- 然后就调用从队列拉取数据的方法拉取,然后他有大小限制最大500,他会循环一波一波拉取过去
- 然后放到拦截器走加工操作
九、提交offset原理
- 同步提交:找到coordinator然后调用commitOffset进行发送,然后不停dowhile循环,调用发送提交请求,然后等待回调获取结果,一直循环到成功为止。
- 异步提交:他还是用coordinator去提交但是他不等待结果,他new了个监听等待结果。
相关文章:
Kafka源码简要分析
目录 一、生产者的初始化流程 二、生产者到缓冲队列的流程 三、Sender拉取数据到Kafka流程 四、消费者初始化 五、主题订阅原理 六、消费者抓取数据原理 七、消费者组初始化 八、消费者组消费流程 九、提交offset原理 一、生产者的初始化流程 首先获取事务id和客户端…...
react 按住ctrl键,点击时会出现菜单的问题修复
问题描述:我需要按住crtl键,然后鼠标点击后做一些逻辑操作,但是出现如下问题 问题一:按住ctrl键后,点击时不触发click事件,只触发 mousedown和mouseup事件。 问题二:按住ctrl键点击时出现菜单…...
【虚拟机栈】
文章目录 1. 虚拟机栈概述2. 局部变量表(Local Variables)3. 操作数栈4. 动态链接4.1 方法的调用:解析与分配 5. 方法返回地址6. 栈的相关面试题 1. 虚拟机栈概述 每个线程在创建时都会创建一个虚拟机栈,其内部保存一个个的栈帧(Stack Frame…...
Linux系列讲解 —— 【fsck】检查并修复Linux文件系统
当文件系统出现损坏时,例如文件无法查看,删除等,可以使用 fsck(File System Consistency Check)进行修复。但是需要注意fsck在修复时,如果检查出某个文件有问题,可能会向用户请求删除。所以&…...
gitlab突然提示我要输入密码了。
用了很长时间的一个gitlab库,今天提交代码的时候突然提示我输入密码了,并且用户还是gitxx.xx.xx.xx的,瞬间懵逼。 想想原因,可能是因为我不久前设置了本地对另外一个git库的远程访问,用的是ssh,操作过程中可…...
业务测试常见问题(一)
如何多维度的分析一个需求? 功能维度:需求中所描述的功能是否实现,与用户的需求是否一致,是否完整符合用户的需求等。 安全性维度:是否有安全漏洞,是否存在未授权访问漏洞等,以保证系统的安全性…...
IntelliJ IDEA失焦自动重启服务的解决方法
IDEA 热部署特性 热部署,即应用正属于运行状态时,我们对应用源码进行了修改更新,在不重新启动应用的情况下,可以能够自动的把更新的内容重新进行编译并部署到服务器上,使修改立即生效。 现象 在使用 IntelliJ IDEA运…...
终端准入控制系统,保障企业内网安全的关键防线
随着网络技术的不断发展,企业面临的安全威胁也越来越多。终端作为承载企业业务的媒介,对内网资产安全有着重要影响。确保内网终端(如PC、BYOD、IoT等)能够得到统一管理,对保护内网安全很有必要。终端准入控制作为一种有…...
mysql-执行计划
1. 执行计划表概述 id相同表示加载表的顺序是从上到下。 id不同id值越大,优先级越高,越先被执行。id有相同,也有不同,同时存在。 id相同的可以认为是一组,从上往下顺序执行;在所有的组中,id的值…...
金蝶云星空和旺店通·企业奇门接口打通对接实战
金蝶云星空和旺店通企业奇门接口打通对接实战 接入系统:金蝶云星空 金蝶K/3Cloud(金蝶云星空)是移动互联网时代的新型ERP,是基于WEB2.0与云技术的新时代企业管理服务平台。金蝶K/3Cloud围绕着“生态、人人、体验”,旨在…...
在服务器上使用nginx改变前端项目请求的url
location /app-dev {rewrite ^/app-dev/(.*) /$1 break;proxy_pass http://152.136.36.251:9999;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr; } location /请求后缀 { rewrite ^/app-dev/(.*) /$1 break; proxy_pass 想要的请求后端的url; …...
【学习笔记】莫比乌斯反演
退役OIer回来受虐啦 一些定义 μ ( x ) { 1 x > 1 ( − 1 ) n x ∏ i 1 n P i 0 o t h e r w i s e \mu(x) \begin{cases} 1 & x > 1 \\ (-1)^n & x \prod _ {i1} ^ {n} P_{i}\\ 0 & otherwise \end{cases} μ(x)⎩ ⎨ ⎧1(−1)n0x>1x∏i1nPi…...
一款构建Python命令行应用的开源库
1 简介 当我们编写 Python 程序时,我们经常需要与用户进行交互,接收输入并输出结果。Python 提供了许多方法来实现这一点,其中一个非常方便的方法是使用 typer 库。typer 是一个用于构建命令行应用程序的 Python 库,它使得创建命令…...
10-Node.js模块化
01.模块化简介 目标 了解模块化概念和好处,以及 CommonJS 标准语法导出和导入 讲解 在 Node.js 中每个文件都被当做是一个独立的模块,模块内定义的变量和函数都是独立作用域的,因为 Node.js 在执行模块代码时,将使用如下所示的…...
数字IC前端学习笔记:数字乘法器的优化设计(Dadda Tree乘法器)
相关阅读 数字IC前端https://blog.csdn.net/weixin_45791458/category_12173698.html?spm1001.2014.3001.5482 华莱士树仍然是一种比较规则的结构(这使得可以方便地生成树的结构),这导致了它所使用的全加器和半加器个数不是最少的ÿ…...
计算机专业毕业设计项目推荐14-文档编辑平台(SpringBoot+Vue+Mysql)
文档编辑平台(SpringBootVueMysql) **介绍****各部分模块实现** 介绍 本系列(后期可能博主会统一为专栏)博文献给即将毕业的计算机专业同学们,因为博主自身本科和硕士也是科班出生,所以也比较了解计算机专业的毕业设计流程以及模式,在编写的…...
【读书后台管理系统】—后端框架搭建(二)
【读书后台管理系统】—后端框架搭建(二) 一、 Node 简介 Node 是一个基于 V8 引擎的 Javascript 运行环境,它使得 Javascript 可以运行在服务端,直接与操作系统进行交互,与文件控制、网络交互、进程控制等 Chrome …...
【DLoopDetector(C++)】DBow2词袋模型loop close学习
0.前言 最近读了两篇论文,论文作者开源了一种基于词袋模型DBoW2库的DLoopDetector算法,自己运行demo测试一下 对应论文介绍:Bags of Binary Words for Fast Place Recognition in Image Sequences 开源项目Github地址:https://gi…...
什么是CAS机制?
CAS和Synchronized的区别是什么?适合什么样的场景?有什么样的优点和缺点? 示例程序:启动两个线程,每个线程中让静态变量count循环累加100次。 public class ThreadTest {private static int count 0;public static …...
Java多态详解
下面讲解一下Java中的多态机制,力求用最通俗易懂的语言,最精炼的话语,最生动的例子,深入浅出Java多态,帮助读者轻松掌握这个知识点。 什么是多态? 多态是指同一种行为具有多个不同表现形式的能力。 多态…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...
376. Wiggle Subsequence
376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...
VTK如何让部分单位不可见
最近遇到一个需求,需要让一个vtkDataSet中的部分单元不可见,查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行,是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示,主要是最后一个参数,透明度…...
DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...
浪潮交换机配置track检测实现高速公路收费网络主备切换NQA
浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求,本次涉及的主要是收费汇聚交换机的配置,浪潮网络设备在高速项目很少,通…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
