kafka生产端之架构及工作原理
文章目录
- 整体架构
- 元数据更新
整体架构
消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢?下面我们来看一下生产者客户端的整体架构,如图所示。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send0方法调用要么被阻塞,要么抛出异常,这个取决于参数max.b1ock.ms的配置,此参数的默认值为60000,即60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。通俗地说,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧漆。与此同时,将较小的ProducerRecord拼漆成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。
ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque>的保存形式转变成<Node,ListProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这单需要做一个应用逻辑层面到网络IO层面的转换。
在转换成<Node,ListProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
元数据更新
上面提及的InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于图中的InFlightRequests来说,图中展示了三个节点Node0、Node1和Node2,很明显Node1的负载最小。也就是说,Node1为当前的leastLoadedNodec选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker节点的地址、端口等信息才能建立连接,最终才能将消息发送到Kafka,在这一过程中所需要的信息都属于元数据信息。
在上面的讲解中我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。
元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。
相关文章:

kafka生产端之架构及工作原理
文章目录 整体架构元数据更新 整体架构 消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会…...

38、【OS】【Nuttx】OSTest分析(3):参数传递
背景 接之前 blog 36、【OS】【Nuttx】OSTest分析(2):环境变量测试 37、【OS】【Nuttx】OSTest分析(2):任务创建 分析完环境变量测试,和任务创建的一些关键要素,OSTest 进入下一阶段…...

存储异常导致的Oracle重大生产故障
📢📢📢📣📣📣 作者:IT邦德 中国DBA联盟(ACDU)成员,10余年DBA工作经验 Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主,全网粉丝10万 擅长主流Oracle、MySQL、PG、高斯…...
C语言时间相关宏定义
在C语言中,预处理器提供了一些与时间相关的宏定义,用于在编译时获取日期、时间等信息。除了 __TIMESTAMP__ 和 __DATE__,还有以下相关的宏定义: __DATE__ 当前编译日期的字符串,格式为 "Mmm dd yyyy"&#x…...
Android Studio:Application 和 Activity的区别
Application 和 Activity 是 Android 中非常重要的两个组件,它们分别负责不同的生命周期管理和应用的不同层次的操作。 Application 是应用级别的生命周期管理,它在整个应用运行时只有一个实例,负责应用的全局初始化和资源管理。Activity 是…...
如何优化爬虫以提高搜索效率
在数据采集和网络爬虫领域,优化爬虫性能是提升数据采集效率的关键。随着网页结构的日益复杂和数据量的不断增长,高效的爬虫能够显著降低运行时间和资源成本。本文将详细介绍如何优化爬虫以提高搜索效率,包括选择合适的工具、优化代码逻辑、使…...
git撤销上一次的提交
1、撤销提交 如果需要撤销上一次的提交,只是提交到了本地,可以通过命令: // 撤销最近的提交(保留修改) git reset --soft HEAD~1 这个操作可以保留之前的提交和当前的修改。最近一次的提交到本地的修改的提交会回到…...

LLM学习笔记1——本地部署Meta-Llama-3.2-1B大模型
系列文章目录 参考博客 参考博客 文章目录 系列文章目录前言与调用一、部署要求二、实现步骤0.深度学习环境错误1,验证pytorch版本时提示以下问题:错误2,验证pytorch版本时提示以下问题:错误3,有时候还会提示你有一些…...
Nginx反代Ollama接口跨域、无法逐字输出问题
场景 本地部署deepseek模型,用的Ollama管理,内网穿透到公网,在通过nginx反代ollama接口。 问题描述 跨域问题 nginx转发时请求头中需要加入origin,并且origin还要和ollama接口同源(协议、ip、端口一致)。…...

大学资产管理系统中的下载功能设计与实现
大学资产管理系统是高校信息化建设的重要组成部分,它负责记录和管理学校内所有固定资产的信息。随着信息技术的发展,下载功能成为提高资产管理效率的关键环节之一。 系统架构的设计是实现下载功能的基础。一个良好的系统架构能够确保数据的高效传输和存储…...

股指入门:股指期货是什么意思?在哪里可以做股指期货交易?
股指期货是一种以股票指数为标的物的期货合约,也可以称为股票指数期货或期指。 股指期货是什么意思? 股指期货是一种金融衍生品,其标的资产是股票市场上的股指,例如标普500指数、道琼斯工业平均指数、上证50指数等。 股指期货允…...

< OS 有关 > 利用 google-drive-ocamlfuse 工具,在 Ubuntu 24 系统上 加载 Google DRIVE 网盘
Created by Dave On 8Feb.2025 起因: 想下载 StableDiffusion,清理系统文件时把 i/o 搞到 100%,已经删除到 apt 缓存,还差 89MB,只能另想办法。 在网上找能不能挂在 Google 网盘,百度网盘,或 …...
Golang的引用类型和指针
在Golang中,引用类型和指针是两个容易混淆的概念,但它们有本质的区别。理解它们的区别对于编写高效、正确的Go代码至关重要。 1. 引用类型 引用类型是Go语言中某些内置类型的统称,它们的值在传递时共享底层数据,而不是复制数据。…...

51单片机之冯·诺依曼结构
一、概述 8051系列单片机将作为控制应用最基本的内容集成在一个硅片上,其内部结构如图4-1所示。作为单一芯片的计算机,它的内部结构与一台计算机的主机非常相似。其中微处理器相当于计算机中的CPU,由运算器和控制器两个部分构成;…...
32. C 语言 安全函数( _s 尾缀)
本章目录 前言什么是安全函数?安全函数的特点主要的安全函数1. 字符串操作安全函数2. 格式化输出安全函数3. 内存操作安全函数4. 其他常用安全函数 安全函数实例示例 1:strcpy_s 和 strcat_s示例 2:memcpy_s示例 3:strtok_s 总结 …...

Android T(13) 源码分析 — BufferQueue 的分析
Android T(13) 源码分析 — BufferQueue 的分析 文章目录 Android T(13) 源码分析 — BufferQueue 的分析前言摘要一、Java 层的 BufferQueue 分析二、原生层的 BufferQueue 分析1、BLASTBufferQueue 的创建2、BLASTBufferQueue 的更新3、Surface 的创建 总结 前言 该系列文章…...
Vite+TS项目中配置路径别名
在使用 Vite 和 TypeScript 的项目中配置路径别名,可以简化模块导入路径,提高代码的可读性和维护性。以下是详细的步骤和示例代码: 1. 配置 Vite 别名 前置条件 下载types/node 下面引入的path会用到 npm install types/node --save-dev原…...
看盘细节系列 篇二:集合竞价的9点18分大单打到3%以下或以上,9点19分撤单
文章目录 系列文章现象原因分析时间点含义正常情况测试市场反应诱导跟风操纵股价意图系列文章 看盘细节系列 篇一:集合竞价尾盘突变 现象 集合竞价中 9 点 18 分通过一笔大单或连续几笔大单将股价打到 3% 以下或以上,9 点 19 分又迅速撤单。从而在分时图上留下一根长长的上…...

Java继承简介
继承的本质:是代码的复用,重复使用已经定义好的方法和域(即全局变量) 要掌握继承首先要了解Java方法的重载和重写 方法的重载和重写 方法的重载 当前方法名相同,但是参数类型不同,发生重载 类比数学函…...

redis之哨兵集群搭建
一:哨兵集群工作概览图 1.监控:sentinel通过心跳监控redis的master和slave实例是否正常工作 2.故障转移:假如master出现故障,sentinel会选举一个slave作为新的master,当故障实例恢复后身份会变成slave,会以…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
力扣-35.搜索插入位置
题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

初学 pytest 记录
安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...