Kafka-Controller选举
一、上下文
《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧
二、设置ZooKeeper
ZooKeeper是一个开源的分布式协调服务,主要用于分布式系统中各节点的协调和管理。Kafka的Controller选举也一样用到了它。
override def startup(): Unit = {//....initZkClient(time)configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))//....}private def initZkClient(time: Time): Unit = {//config.zkConnect//zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafkainfo(s"Connecting to zookeeper on ${config.zkConnect}")_zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)//如果需要,在ZK中预先创建顶级路径。_zkClient.createTopLevelPaths()}def createTopLevelPaths(): Unit = {//创建 Persistent 持久化的 zk 路径ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)}//确保ZK中存在持久路径def makeSurePersistentPathExists(path: String): Unit = {createRecursive(path, data = null, throwIfPathExists = false)}
1、KafkaZkClient
KafkaZkClient是在Kafka.zookeeper.ZooKeeperClient之上提供更高级别的Kafka特定操作。
实现说明:此类包括各种组件(Controller, Configs, Old Consumer等)的方法,在某些情况下会从调用包中返回类的实例。
def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {//...KafkaZkClient(...)}
2、AdminZkClient
它提供与ZooKeeper交互的管理员相关方法。
class AdminZkClient(...){//创建topicdef createTopic(...){...}//获取broker元数据def getBrokerMetadatas(...){...}//创建主题并可选地验证其参数。请注意,TopicCommand也使用此方法。def createTopicWithAssignment(...){...}//验证主题创建参数def validateTopicCreate(...){...}//删除topic //为给定主题创建删除路径def deleteTopic(...){...}//使用可选的副本分配向现有主题添加分区。请注意,TopicCommand使用此方法。def addPartitions(...){...}//将broker从实体名称解析为整数iddef parseBroker(...){...}//.....
}
3、ZkConfigRepository
zookeeper的配置仓库,也就是kafka在zookeeper中配置信息。
class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {override def config(configResource: ConfigResource): Properties = {//....//从zookeeper的目录下读取数据,并封装成实体(topic、broker、client-id、user、user/clients/client-id、ip)adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)}
}
4、ZkData
object ZkData {//这些是kafka broker 启动时应该存在的持久ZK路径。val PersistentZkPaths: Seq[String] = Seq(ConsumerPathZNode.path, // old consumer pathBrokerIdsZNode.path,TopicsZNode.path,ConfigEntityChangeNotificationZNode.path,DeleteTopicsZNode.path,BrokerSequenceIdZNode.path,IsrChangeNotificationZNode.path,ProducerIdBlockZNode.path,LogDirEventNotificationZNode.path) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
}//旧的consumer在zk上的路径
object ConsumerPathZNode {def path = "/consumers"
}object BrokerIdsZNode {def path = s"${BrokersZNode.path}/ids"def encode: Array[Byte] = null
}object TopicsZNode {def path = s"${BrokersZNode.path}/topics"
}object ConfigEntityChangeNotificationZNode {def path = s"${ConfigZNode.path}/changes"
}object DeleteTopicsZNode {def path = s"${AdminZNode.path}/delete_topics"
}object BrokerSequenceIdZNode {def path = s"${BrokersZNode.path}/seqid"
}object IsrChangeNotificationZNode {def path = "/isr_change_notification"
}object ProducerIdBlockZNode {val CurrentVersion: Long = 1Ldef path = "/latest_producer_id_block"def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {Json.encodeAsBytes(Map("version" -> CurrentVersion,"broker" -> producerIdBlock.assignedBrokerId,"block_start" -> producerIdBlock.firstProducerId.toString,"block_end" -> producerIdBlock.lastProducerId.toString).asJava)}object LogDirEventNotificationZNode {def path = "/log_dir_event_notification"
}
三、验证元数据属性集成是否有效
1、meta.properties文件是否始终设置了cluster.id
2、meta.properties文件是否始终设置了node.id或者 broker.id
initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
四、动态broker初始化
动态broker配置存储在ZooKeeper中,可以在两个级别定义:
1、每个代理的配置持久化在/configs/brokers/{brokerId} 这些可以使用AdminClient使用资源名称brokerId进行描述/更改
2、整个集群的默认值持续存在于/configs/brokers/<default> 这些可以使用AdminClient使用空资源名称进行描述/更改。
broker配置的优先级顺序为:
1、DYNAMIC_BROKER_CONFIG:存储在ZK中的/configs/brokers/{brokerId}
2、DYNAMIC_DEFAULT_BROKER_CONFIG: 存储在ZK中的//configs/brokers/<default>
3、STATIC_BROKER_CONFIG:启动代理时使用的属性,通常来自server.properties文件
4、DEFAULT_CONFIG:KafkaConfig中定义的默认配置
config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)
五、启动KafkaController
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
1、KafkaController结构
class KafkaController(...){//事件管理private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,controllerContext.stats.rateAndTimeMetrics)//如果brokerid = 当前的controllerid 那么就返回truedef isActive: Boolean = activeControllerId == config.brokerId@volatile private var brokerInfo = initialBrokerInfo@volatile private var _brokerEpoch = initialBrokerEpoch//启动def startup(): Unit = {zkClient.registerStateChangeHandler(new StateChangeHandler {//ControllerHandler = "controller-state-change-handler"override val name: String = StateChangeHandlers.ControllerHandleroverride def afterInitializingSession(): Unit = {eventManager.put(RegisterBrokerAndReelect)}override def beforeInitializingSession(): Unit = {val queuedEvent = eventManager.clearAndPut(Expire)//阻止新会话的初始化,直到处理过期事件,这确保在创建新会话之前已处理所有挂起的事件queuedEvent.awaitProcessing()}})eventManager.put(Startup)eventManager.start()}override def process(event: ControllerEvent): Unit = {event match {//.....case RegisterBrokerAndReelect =>processRegisterBrokerAndReelect()case Startup =>processStartup()//.....}}
}
1、ControllerEventManager结构
class ControllerEventManager(...){//用串行化队列代替锁private val queue = new LinkedBlockingQueue[QueuedEvent]//ControllerEventThreadName = "controller-event-thread"private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)def start(): Unit = thread.start()def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {val queuedEvent = new QueuedEvent(event, time.milliseconds())queue.put(queuedEvent)queuedEvent}class ControllerEventThread(name: String) extends ShutdownableThread(...){override def doWork(): Unit = {//从队列获取事件,主要是controller相关的事件val dequeued = pollFromEventQueue()dequeued.event match {case controllerEvent =>def process(): Unit = dequeued.process(processor)}}}}
}
ControllerEventManager中的ControllerEventThread的父类是ShutdownableThread,它里面有真正的run()且调起了doWork(),doWork()又调起了process(),因此真正执行的是process()
public abstract class ShutdownableThread extends Thread {public abstract void doWork();public void run() {while (isRunning())doWork();}
}
这是一个死循环,也就是后面只要往队列中添加事件,会自动执行对应方法。从KafkaController的startup()中我们知道放了两个事件:RegisterBrokerAndReelect和Startup,下面我们来看看它们里面做了什么
2、RegisterBrokerAndReelect事件处理
private def processRegisterBrokerAndReelect(): Unit = {_brokerEpoch = zkClient.registerBroker(brokerInfo)processReelect()}
1、向zookeeper注册broker
class KafkaZkClient private[zk] (...{def registerBroker(brokerInfo: BrokerInfo): Long = {//brokers/ids/brokeridval path = brokerInfo.path//创建 对应的 brokerid 的 临时znode节点,说明:当该brokers挂掉后会随之消失val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")//返回czxid (broker epoch)stat.getCzxid}}
2、开始选举
class KafkaController(...){private def processReelect(): Unit = {maybeResign()elect()}private def maybeResign(): Unit = {val wasActiveBeforeChange = isActive//在zk上注册节点改变事件,当controller改变时触发,为下面的选举做铺垫zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)activeControllerId = zkClient.getControllerId.getOrElse(-1)if (wasActiveBeforeChange && !isActive) {//当当前broker辞去controller职务时触发onControllerResignation()}}private def elect(): Unit = {//获取 活动状态 contoller ,如果 集群已经启动了很长时间,新增了一台broker,那么此时会获得 当下的controller ,//如果此时集群刚刚启动,那么此时没有 活动状态的 controller ,返回的结果就是 -1activeControllerId = zkClient.getControllerId.getOrElse(-1)/** 我们可以在初始启动和handleDeleted ZK回调期间到达这里。由于潜在的竞争条件,当我们到达这里时,控制器可能已经被选中了。如果此代理已经是控制器,则此检查将防止以下createEphemeralPath方法进入无限循环。*/if (activeControllerId != -1) {//如果当下已经有 activeControllerId 那么就停止选举 ,否则继续往下走//Broker $activeControllerId 已被选为控制器,因此停止选举过程debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")return}//try中会发生如下情况//1、正常运行:当选controller//2、异常:// 1、ControllerMovedException // 1、其他broker成功当选controller// 2、controller已经当选,但刚刚离职,需要重新选举// 2、Throwable 该节点当选controller,但是就职时出错了。删除该controller,重新选举// try {val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)controllerContext.epoch = epochcontrollerContext.epochZkVersion = epochZkVersionactiveControllerId = config.brokerId//${config.brokerId}已成功当选为控制器。Epoch增加到${controllerContext.eepoch},Epoch zk版本现在是${controller Context.eepoch ZkVersion}”info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +s"and epoch zk version is now ${controllerContext.epochZkVersion}")//成功当选controller,并开始履行作为该角色的责任onControllerFailover()} catch {case e: ControllerMovedException =>//重新开始监听目录变化maybeResign()if (activeControllerId != -1)debug(s"代理$activeControllerId被选为控制器,而不是代理${config.brokerId}", e)elsewarn("管制员已经当选,但刚刚辞职,这将导致另一轮选举", e)case t: Throwable =>error(s"在代理${config.brokerId}上选择或成为控制器时出错。立即触发控制器移动", t)triggerControllerMove()}}}
在选举前zkClient注册的 controllerChangeHandler 事件其实就是观察 controller目录的变化
class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {//controller目录override val path: String = ControllerZNode.pathoverride def handleCreation(): Unit = eventManager.put(ControllerChange)override def handleDeletion(): Unit = eventManager.put(Reelect)override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}
六、总结
1、设置zookeeper,如:zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka并创建持久化目录:consumers、brokers/ids、brokers/topics、config/changes、admin/delete_topics、brokers/seqid、isr_change_notification、latest_producer_id_block、log_dir_event_notification
2、验证元数据属性集成是否有效,主要时看每个broker是否有了唯一的id
3、将每个broker的id注册到zookeeper
4、启动KafkaController
5、启动ControllerEventThread线程并不断消费LinkedBlockingQueue中事件
6、向队列注册RegisterBrokerAndReelect事件、Startup事件
7、首先处理RegisterBrokerAndReelect事件
8、向zookeeper注册broker,并建立临时znode
9、注册controllerChangeHandler 事件其实就是观察 controller目录的变化
10、每个broker开始向zookeeper将自己注册为controller
11、正常情况下只有一个broker成功注册成功,其他broker抛出ControllerMovedException继续监控controller目录的变化
12、如果选举controller成功,但是在就职时失败会里面进行卸任工作,并进行新一轮选举
相关文章:
Kafka-Controller选举
一、上下文 《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧 二、设置ZooKeeper ZooKeeper是一个开源的分布式协调服务,主要用于分…...

必知的 Vue3 组件传值技巧:解锁组件交互新姿势
父传子defineProps 基本概念 在 Vue 3 中,父传子是一种组件间通信的方式,用于将父组件的数据传递给子组件。这种通信方式可以让组件之间更好地协作,实现功能的复用和模块的划分。 实现步骤 在父组件中传递数据 App.vue <template>…...

【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型
【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型 文章目录 【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型一、介绍二、联系工作三、方法四、实验 Medical SAM Adapter: Adapting Segment Anything Model for Medical Image Segm…...

创新体验触手可及 紫光展锐携手影目科技推出AI眼镜开放平台
近日,紫光展锐携手影目科技共同发布了搭载展锐W517芯片的影目X系列AI眼镜开放平台。这一产品的推出标志着双方在智能穿戴领域的深度协作,将紫光展锐的领先芯片技术与影目的产品创新相融合,合力打造全球智能眼镜市场的标杆产品。这一战略布局不…...

115页PDF | 埃森哲_XX集团信息化能力成熟度评估及能力提升方案(限免下载)
一、前言 这份报告是埃森哲_XX集团信息化能力成熟度评估及能力提升方案,报告首先分析了集团的战略规划,包括调整优化期、转型升级期和跨越发展期的目标,然后识别了集团面临的内部挑战和外部压力,如管控体系不完善、业务板块多样化…...

NumPy,科学计算领域中的Python明星库!
NumPy,科学计算领域中的Python明星库! 嘿,大家好呀,今天我们要来聊聊在科学计算领域里大放异彩的 NumPy 库。NumPy 是 Python 中的一个开源库,它提供了大量的数学函数,能够高效地处理大型数组与矩阵运算。…...

Hadoop生态圈框架部署(六)- HBase完全分布式部署
文章目录 前言一、Hbase完全分布式部署(手动部署)1. 下载Hbase2. 上传安装包3. 解压HBase安装包4. 配置HBase配置文件4.1 修改hbase-env.sh配置文件4.2 修改hbase-site.xml配置文件4.3 修改regionservers配置文件4.4 删除hbase中slf4j-reload4j-1.7.33.j…...

python怎么解决中文注释
最近开发学习Python,当加入中文注释时,运行程序报错: File "red.py", line 10 SyntaxError: Non-ASCII character \xe5 in file red.py on line 10, but no encoding declared; see http://www.python.org/peps/pep-0263.html fo…...

【Unity】Game Framework框架学习使用
前言 之前用过一段时间的Game Framework框架,后来有那么一段时间都做定制小软件,框架就没再怎么使用了。 现在要做大型项目了,感觉还是用框架好一些。于是又把Game Framework拾起来了。 这篇文章主要是讲Game Framework这个框架是怎么用的…...

Linux(CentOS 7) yum一键安装mysql8
1、通过yum安装 (1)下载mysql 在Linux找个地方输入以下命令 wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm (2)安装mysql yum 仓库配置文件 [rootVM-8-15-centos ~]# sudo rpm -Uvh mysql80-c…...

Kafka 快速入门(一)
1.1安装部署 1.1.1 集群规划 bigdata01bigdata02bigdata03zookeeperzookeeperzookeeperkafkakafkakafka 1.1.2 集群部署 官方下载地址:http://kafka.apache.org/downloads.html 检查三台虚拟机的zk是否启动:zkServer.sh start 默认启动方式 1)解压…...

丹摩征文活动 | SD3+ComfyUI的图像部署实践
一、前言 作为Stability AI 推出的一款革命性的文本转图像开源模型,Stable Diffusion 3(简称SD3)在图像质量、文本内容生成、理解复杂指令以及资源利用效率方面,都有着不俗的表现。 SD3的Medium版本,拥有20亿参数&am…...

H.265流媒体播放器EasyPlayer.js网页web无插件播放器:如何优化加载速度
在当今的网络环境中,用户对于视频播放体验的要求越来越高,尤其是对于视频加载速度的期待。EasyPlayer.js网页web无插件播放器作为一款专为现代Web环境设计的流媒体播放器,它在优化加载速度方面采取了多种措施,以确保用户能够享受到…...

【Linux】进程状态的优先级
大家好呀,我是残念,希望在你看完之后,能对你有所帮助,有什么不足请指正!共同学习交流哦 本文由:残念ing原创CSDN首发,如需要转载请通知 个人主页:残念ing-CSDN博客,欢迎各…...
react中的组件传参
在React中,组件之间的数据传递是构建用户界面的关键部分。根据不同的需求和场景,有多种方式可以在React中传递参数,以下是对这些方式的详细说明: 一、通过props传递参数 这是React中最基本和最常用的数据传递方式。父组件通过属…...
HTML5:网页开发的新纪元
文章目录 前言一、HTML5技术概述二、主要特点及优势1. 多媒体支持2. 图形绘制3. 离线存储4. 表单控件增强5. 响应式设计 三、应用场景1. 游戏开发2. 在线教育3. 电子商务 四、面临的挑战结语 前言 在互联网技术快速发展的今天,H5(HTML5的简称࿰…...

CKA认证 | Day2 K8s内部监控与日志
第三章 Kubernetes监控与日志 1、查看集群资源状态 在 Kubernetes 集群中,查看集群资源状态和组件状态是非常重要的操作。以下是一些常用的命令和解释,帮助你更好地管理和监控 Kubernetes 集群。 1.1 查看master组件状态 Kubernetes 的 Master 组件包…...

电信网关配置管理系统 upload_channels.php 文件上传致RCE漏洞复现
0x01 产品简介 中国电信集团有限公司(英文名称“China Telecom”、简称“中国电信”)成立于2000年9月,是中国特大型国有通信企业、上海世博会全球合作伙伴。电信网关配置管理系统是一个用于管理和配置电信网络中网关设备的软件系统。它可以帮助网络管理员实现对网关设备的远…...

ubuntu更改max_map_count
在Ubuntu系统中,max_map_count是一个内核参数,用于限制每个进程可以拥有的内存段的数量。对于Elasticsearch等需要大量内存映射的应用,可能需要增加这个值。 执行以下步骤来更改max_map_count的值: 打开终端。 输入以下命令以编…...
《NPU、CPU、GPU 算力定义和计算方式》
一、引言 在人工智能时代,算力成为了推动技术发展的关键因素之一。不同类型的处理器,如中央处理器(CPU)、图形处理器(GPU)和神经网络处理器(NPU),在算力方面有着各自的特…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...

企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...
管理学院权限管理系统开发总结
文章目录 🎓 管理学院权限管理系统开发总结 - 现代化Web应用实践之路📝 项目概述🏗️ 技术架构设计后端技术栈前端技术栈 💡 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 🗄️ 数据库设…...

android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
libfmt: 现代C++的格式化工具库介绍与酷炫功能
libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库,提供了高效、安全的文本格式化功能,是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全:…...
【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权
摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题:安全。文章将详细阐述认证(Authentication) 与授权(Authorization的核心概念,对比传统 Session-Cookie 与现代 JWT(JS…...