Kafka-服务端-网络层-源码流程
整体架构如下所示:

responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue
- 客户端发送的请求被Acceptor转发给Processor处理
- 处理器将请求放到RequestChannel的requestQueue中
- KafkaRequestHandler取出requestQueue中的请求
- 调用KafkaApis进行业务逻辑处理
- KafkaApis将响应结果放到对应的Processor的responseQueue中
- processor从responseQueue中取出响应结果
- processor将响应结果返回给客户端
KafkaServer是Kafka服务端的主类,KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议,具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。
class KafkaServer(val config: KafkaConfig) {def startup() {socketServer = new SocketServer(config, metrics, time, credentialProvider)socketServer.startup(startupProcessors = false)/* start processing requests */apis = new KafkaApis(socketServer.requestChannel, ...)requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, ...)}}
SocketServer
def startup(startupProcessors: Boolean = true) {this.synchronized {...createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}}private def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {...endpoints.foreach { endpoint =>...val acceptor = new Acceptor(endpoint, ...)addProcessors(acceptor, endpoint, processorsPerListener)KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()acceptor.awaitStartup()acceptors.put(endpoint, acceptor)}}
可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor,每个acceptor关联数个processor。这是一种典型的Reactor模式,acceptor负责与客户端建立连接,并将连接分发给processor,processor负责所分连接后续的所有读写交互。
Acceptor
def run() {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor = 0while (isRunning) {try {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && isRunning) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {val processor = synchronized {currentProcessor = currentProcessor % processors.sizeprocessors(currentProcessor)}accept(key, processor)} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")// round robin to the next processor thread, mod(numProcessors) will be done latercurrentProcessor = currentProcessor + 1} catch {case e: Throwable => error("Error while accepting connection", e)}}}}catch {// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due// to a select operation on a specific channel or a bad request. We don't want// the broker to stop responding to requests from other clients in these scenarios.case e: ControlThrowable => throw ecase e: Throwable => error("Error occurred", e)}}} finally {debug("Closing server socket and selector.")CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)shutdownComplete()}}
上面是Acceptor的run()方法,可以看出,Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,客户端监听OP_CONNECT事件,负责发起连接,服务端监听OP_CONNECT事件,负责建立连接),负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。
Processor
每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示:
override def run() {startupComplete()try {while (isRunning) {try {// 在新分到的客户端连接上注册OP_READ事件configureNewConnections()// 从responseQueue中取响应,赋值给KafkaChannel的send,等待poll时发送processNewResponses()// selector轮询各种事件,读取请求或者发送响应poll()// 封装selector.completedReceives中的请求,放入requestQueueprocessCompletedReceives()// 处理selector.completedSends响应(移除inflightResponses中的记录;执行响应的回调函数)processCompletedSends()processDisconnected()} catch {...}}} finally {...}}
Processor线程的名字中有kafka-network字样,可以通过jstack -l pid | grep kafka-network进行筛选。
KafkaRequestHandlerPool
KafkaServer会创建请求处理线程池(KafkaRequestHandlerPool),在请求处理线程池中会创建并启动多个请求处理线程(KafkaRequestHandler)。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理,在内部实际处理会交给KafkaApis完成。
class KafkaRequestHandlerPool(val brokerId: Int, ...) {...for (i <- 0 until numThreads) {createHandler(i)}def createHandler(id: Int): Unit = synchronized {runnables += new KafkaRequestHandler(..., requestChannel, apis, time)KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()}
}
KafkaRequestHandler的run()方法如下:
class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {...def run() {while (!stopped) {val req = requestChannel.receiveRequest(300)req match {case RequestChannel.ShutdownRequest =>shutdownComplete.countDown()returncase request: RequestChannel.Request =>try {request.requestDequeueTimeNanos = endTimeapis.handle(request)} catch {case e: FatalExitError =>shutdownComplete.countDown()Exit.exit(e.statusCode)case e: Throwable => error("Exception when handling request", e)} finally {request.releaseBuffer()}case null => // continue}}shutdownComplete.countDown()}}
相关文章:
Kafka-服务端-网络层-源码流程
整体架构如下所示: responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue 客户端发送的请求被Acceptor转发给Processor处理处理器将请求放到RequestChannel的requestQueue中KafkaRequestHandler取出reque…...
百日筑基第十一天-看看SpringBoot
百日筑基第十一天-看看SpringBoot 创建项目 Spring 官方提供了 Spring Initializr 的方式来创建 Spring Boot 项目。网址如下: https://start.spring.io/ 打开后的界面如下: 可以将 Spring Initializr 看作是 Spring Boot 项目的初始化向导ÿ…...
Generative Modeling by Estimating Gradients of the Data Distribution
Generative Modeling by Estimating Gradients of the Data Distribution 本文介绍宋飏提出的带噪声扰动的基于得分的生成模型。首先介绍基本的基于得分的生成模型的训练方法(得分匹配)和采样方法(朗之万动力学)。然后基于流形假…...
vector与list的简单介绍
1. 标准库中的vector类的介绍: vector是表示大小可以变化的数组的序列容器。 就像数组一样,vector对其元素使用连续的存储位置,这意味着也可以使用指向其元素的常规指针上的偏移量来访问其元素,并且与数组中的元素一样高效。但与数…...
四种线程池的使用,优缺点分析
池化思想:线程池、字符串常量池、数据库连接池 提高资源的利用率 下面是手动创建线程和执行任务过程,可见挺麻烦的,而且线程利用率不高。 手动创建线程对象执行任务执行完毕,释放线程对象 线程池的优点: 提高线程的…...
什么是 BEM 规范
BEM(Block, Element, Modifier)是一种 CSS 命名规范,旨在提高代码的可读性和可维护性。BEM 规范通过明确的命名规则来定义组件和组件的各个部分,使开发者能够更容易地理解和维护代码。 BEM 命名规范的基本概念 Block(…...
【Node.JS】入门
文章目录 Node.js的入门涉及对其基本概念、特点、安装、以及基本使用方法的了解。以下是对Node.js入门的详细介绍: 一、Node.js基本概念和特点 定义:Node.js是一个基于Chrome V8引擎的JavaScript运行环境,它使得JavaScript能够运行在服务器…...
Amazon SageMaker 机器学习之旅的助推器
一、前言 在当今的数字化时代,人工智能和机器学习已经成为推动社会进步的重要引擎。亚马逊云科技在 2023 re:Invent 全球大会上,宣布推出五项 Amazon SageMaker 新功能: Amazon SageMaker HyperPod 通过为大规模分布式训练提供专用的基础架构…...
TransMIL:基于Transformer的多实例学习
MIL是弱监督分类问题的有力工具。然而,目前的MIL方法通常基于iid假设,忽略了不同实例之间的相关性。为了解决这个问题,作者提出了一个新的框架,称为相关性MIL,并提供了收敛性的证明。基于此框架,还设计了一…...
3.用户程序与驱动交互
驱动程序请使用第二章https://blog.csdn.net/chenhequanlalala/article/details/140034424 用户app与驱动交互最常见的做法是insmod驱动后,生成一个设备节点,app通过open,read等系统调用去操作这个设备节点,这里先用mknode命令调…...
尽量不写一行if...elseif...写出高质量可持续迭代的项目代码
背景 无论是前端代码还是后端代码,都存在着定位困难,不好抽离,改造困难的问题,造成代码开发越来越慢,此外因为代码耦合较高,总是出现改了一处地方,然后影响其他地方,要么就是要修改…...
xcrun: error: unable to find utility “simctl“, not a developer tool or in PATH
目录 前言 一、问题详情 二、解决方案 1.确认Xcode已安装 2.安装Xcode命令行工具 3.指定正确的开发者目录 4. 确认命令行工具路径 5. 更新PATH环境变量 前言 今天使用cocoapods更新私有库的时候,遇到了"xcrun: error: unable to find utility &…...
【linux高级IO(一)】理解五种IO模型
💓博主CSDN主页:杭电码农-NEO💓 ⏩专栏分类:Linux从入门到精通⏪ 🚚代码仓库:NEO的学习日记🚚 🌹关注我🫵带你学更多操作系统知识 🔝🔝 Linux高级IO 1. 前言2. 重谈对…...
前端引用vue/element/echarts资源等引用方法Blob下载HTML
前端引用下载vue/element/echarts资源等引用方法 功能需求 需求是在HTML页面中集成Vue.js、Element Plus(Element UI的Vue 3版本)、ECharts等前端资源,使用Blob下载HTML。 解决方案概述 直接访问线上CDN地址:简单直接,…...
昇思MindSpore学习笔记2-01 LLM原理和实践 --基于 MindSpore 实现 BERT 对话情绪识别
摘要: 通过识别BERT对话情绪状态的实例,展现在昇思MindSpore AI框架中大语言模型的原理和实际使用方法、步骤。 一、环境配置 %%capture captured_output # 实验环境已经预装了mindspore2.2.14,如需更换mindspore版本,可更改下…...
uniapp实现图片懒加载 封装组件
想要的效果就是窗口滑动到哪里,哪里的图片进行展示 主要原理使用IntersectionObserver <template><view><image error"HandlerError" :style"imgStyle" :src"imageSrc" :id"randomId" :mode"mode&quo…...
持续交付:自动化测试与发布流程的变革
目录 前言1. 持续交付的概念1.1 持续交付的定义1.2 持续交付的核心原则 2. 持续交付的优势2.1 提高交付速度2.2 提高软件质量2.3 降低发布风险2.4 提高团队协作 3. 实施持续交付的步骤3.1 构建自动化测试体系3.1.1 单元测试3.1.2 集成测试3.1.3 功能测试3.1.4 性能测试 3.2 构建…...
VBA常用的字符串内置函数
前言 在VBA程序中,常用的内置函数可以按照功能分为字符串函数、数字函数、转换函数等等,本节主要会介绍常用的字符串的内置函数,包括Len()、Left()、Mid()、Right()、Split()、String()、StrConV()等。 本节的练习数据表以下表为例ÿ…...
大数据面试题之Spark(7)
目录 Spark实现wordcount Spark Streaming怎么实现数据持久化保存? Spark SQL读取文件,内存不够使用,如何处理? Spark的lazy体现在哪里? Spark中的并行度等于什么 Spark运行时并行度的设署 Spark SQL的数据倾斜 Spark的exactly-once Spark的…...
AI绘画 Stable Diffusion图像的脸部细节控制——采样器全解析
大家好,我是画画的小强 我们在运用AI绘画 Stable Diffusion 这一功能强大的AI绘图工具时,我们往往会发现自己对提示词的使用还不够充分。在这种情形下,我们应当如何调整自己的策略,以便更加精确、全面地塑造出理想的人物形象呢&a…...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...
SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...
RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...
