Kafka-Controller选举
一、上下文
《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧
二、设置ZooKeeper
ZooKeeper是一个开源的分布式协调服务,主要用于分布式系统中各节点的协调和管理。Kafka的Controller选举也一样用到了它。
override def startup(): Unit = {//....initZkClient(time)configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))//....}private def initZkClient(time: Time): Unit = {//config.zkConnect//zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafkainfo(s"Connecting to zookeeper on ${config.zkConnect}")_zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)//如果需要,在ZK中预先创建顶级路径。_zkClient.createTopLevelPaths()}def createTopLevelPaths(): Unit = {//创建 Persistent 持久化的 zk 路径ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)}//确保ZK中存在持久路径def makeSurePersistentPathExists(path: String): Unit = {createRecursive(path, data = null, throwIfPathExists = false)}
1、KafkaZkClient
KafkaZkClient是在Kafka.zookeeper.ZooKeeperClient之上提供更高级别的Kafka特定操作。
实现说明:此类包括各种组件(Controller, Configs, Old Consumer等)的方法,在某些情况下会从调用包中返回类的实例。
def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {//...KafkaZkClient(...)}
2、AdminZkClient
它提供与ZooKeeper交互的管理员相关方法。
class AdminZkClient(...){//创建topicdef createTopic(...){...}//获取broker元数据def getBrokerMetadatas(...){...}//创建主题并可选地验证其参数。请注意,TopicCommand也使用此方法。def createTopicWithAssignment(...){...}//验证主题创建参数def validateTopicCreate(...){...}//删除topic //为给定主题创建删除路径def deleteTopic(...){...}//使用可选的副本分配向现有主题添加分区。请注意,TopicCommand使用此方法。def addPartitions(...){...}//将broker从实体名称解析为整数iddef parseBroker(...){...}//.....
}
3、ZkConfigRepository
zookeeper的配置仓库,也就是kafka在zookeeper中配置信息。
class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {override def config(configResource: ConfigResource): Properties = {//....//从zookeeper的目录下读取数据,并封装成实体(topic、broker、client-id、user、user/clients/client-id、ip)adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)}
}
4、ZkData
object ZkData {//这些是kafka broker 启动时应该存在的持久ZK路径。val PersistentZkPaths: Seq[String] = Seq(ConsumerPathZNode.path, // old consumer pathBrokerIdsZNode.path,TopicsZNode.path,ConfigEntityChangeNotificationZNode.path,DeleteTopicsZNode.path,BrokerSequenceIdZNode.path,IsrChangeNotificationZNode.path,ProducerIdBlockZNode.path,LogDirEventNotificationZNode.path) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
}//旧的consumer在zk上的路径
object ConsumerPathZNode {def path = "/consumers"
}object BrokerIdsZNode {def path = s"${BrokersZNode.path}/ids"def encode: Array[Byte] = null
}object TopicsZNode {def path = s"${BrokersZNode.path}/topics"
}object ConfigEntityChangeNotificationZNode {def path = s"${ConfigZNode.path}/changes"
}object DeleteTopicsZNode {def path = s"${AdminZNode.path}/delete_topics"
}object BrokerSequenceIdZNode {def path = s"${BrokersZNode.path}/seqid"
}object IsrChangeNotificationZNode {def path = "/isr_change_notification"
}object ProducerIdBlockZNode {val CurrentVersion: Long = 1Ldef path = "/latest_producer_id_block"def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {Json.encodeAsBytes(Map("version" -> CurrentVersion,"broker" -> producerIdBlock.assignedBrokerId,"block_start" -> producerIdBlock.firstProducerId.toString,"block_end" -> producerIdBlock.lastProducerId.toString).asJava)}object LogDirEventNotificationZNode {def path = "/log_dir_event_notification"
}
三、验证元数据属性集成是否有效
1、meta.properties文件是否始终设置了cluster.id
2、meta.properties文件是否始终设置了node.id或者 broker.id
initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
四、动态broker初始化
动态broker配置存储在ZooKeeper中,可以在两个级别定义:
1、每个代理的配置持久化在/configs/brokers/{brokerId} 这些可以使用AdminClient使用资源名称brokerId进行描述/更改
2、整个集群的默认值持续存在于/configs/brokers/<default> 这些可以使用AdminClient使用空资源名称进行描述/更改。
broker配置的优先级顺序为:
1、DYNAMIC_BROKER_CONFIG:存储在ZK中的/configs/brokers/{brokerId}
2、DYNAMIC_DEFAULT_BROKER_CONFIG: 存储在ZK中的//configs/brokers/<default>
3、STATIC_BROKER_CONFIG:启动代理时使用的属性,通常来自server.properties文件
4、DEFAULT_CONFIG:KafkaConfig中定义的默认配置
config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)
五、启动KafkaController
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
1、KafkaController结构
class KafkaController(...){//事件管理private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,controllerContext.stats.rateAndTimeMetrics)//如果brokerid = 当前的controllerid 那么就返回truedef isActive: Boolean = activeControllerId == config.brokerId@volatile private var brokerInfo = initialBrokerInfo@volatile private var _brokerEpoch = initialBrokerEpoch//启动def startup(): Unit = {zkClient.registerStateChangeHandler(new StateChangeHandler {//ControllerHandler = "controller-state-change-handler"override val name: String = StateChangeHandlers.ControllerHandleroverride def afterInitializingSession(): Unit = {eventManager.put(RegisterBrokerAndReelect)}override def beforeInitializingSession(): Unit = {val queuedEvent = eventManager.clearAndPut(Expire)//阻止新会话的初始化,直到处理过期事件,这确保在创建新会话之前已处理所有挂起的事件queuedEvent.awaitProcessing()}})eventManager.put(Startup)eventManager.start()}override def process(event: ControllerEvent): Unit = {event match {//.....case RegisterBrokerAndReelect =>processRegisterBrokerAndReelect()case Startup =>processStartup()//.....}}
}
1、ControllerEventManager结构
class ControllerEventManager(...){//用串行化队列代替锁private val queue = new LinkedBlockingQueue[QueuedEvent]//ControllerEventThreadName = "controller-event-thread"private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)def start(): Unit = thread.start()def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {val queuedEvent = new QueuedEvent(event, time.milliseconds())queue.put(queuedEvent)queuedEvent}class ControllerEventThread(name: String) extends ShutdownableThread(...){override def doWork(): Unit = {//从队列获取事件,主要是controller相关的事件val dequeued = pollFromEventQueue()dequeued.event match {case controllerEvent =>def process(): Unit = dequeued.process(processor)}}}}
}
ControllerEventManager中的ControllerEventThread的父类是ShutdownableThread,它里面有真正的run()且调起了doWork(),doWork()又调起了process(),因此真正执行的是process()
public abstract class ShutdownableThread extends Thread {public abstract void doWork();public void run() {while (isRunning())doWork();}
}
这是一个死循环,也就是后面只要往队列中添加事件,会自动执行对应方法。从KafkaController的startup()中我们知道放了两个事件:RegisterBrokerAndReelect和Startup,下面我们来看看它们里面做了什么
2、RegisterBrokerAndReelect事件处理
private def processRegisterBrokerAndReelect(): Unit = {_brokerEpoch = zkClient.registerBroker(brokerInfo)processReelect()}
1、向zookeeper注册broker
class KafkaZkClient private[zk] (...{def registerBroker(brokerInfo: BrokerInfo): Long = {//brokers/ids/brokeridval path = brokerInfo.path//创建 对应的 brokerid 的 临时znode节点,说明:当该brokers挂掉后会随之消失val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")//返回czxid (broker epoch)stat.getCzxid}}
2、开始选举
class KafkaController(...){private def processReelect(): Unit = {maybeResign()elect()}private def maybeResign(): Unit = {val wasActiveBeforeChange = isActive//在zk上注册节点改变事件,当controller改变时触发,为下面的选举做铺垫zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)activeControllerId = zkClient.getControllerId.getOrElse(-1)if (wasActiveBeforeChange && !isActive) {//当当前broker辞去controller职务时触发onControllerResignation()}}private def elect(): Unit = {//获取 活动状态 contoller ,如果 集群已经启动了很长时间,新增了一台broker,那么此时会获得 当下的controller ,//如果此时集群刚刚启动,那么此时没有 活动状态的 controller ,返回的结果就是 -1activeControllerId = zkClient.getControllerId.getOrElse(-1)/** 我们可以在初始启动和handleDeleted ZK回调期间到达这里。由于潜在的竞争条件,当我们到达这里时,控制器可能已经被选中了。如果此代理已经是控制器,则此检查将防止以下createEphemeralPath方法进入无限循环。*/if (activeControllerId != -1) {//如果当下已经有 activeControllerId 那么就停止选举 ,否则继续往下走//Broker $activeControllerId 已被选为控制器,因此停止选举过程debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")return}//try中会发生如下情况//1、正常运行:当选controller//2、异常:// 1、ControllerMovedException // 1、其他broker成功当选controller// 2、controller已经当选,但刚刚离职,需要重新选举// 2、Throwable 该节点当选controller,但是就职时出错了。删除该controller,重新选举// try {val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)controllerContext.epoch = epochcontrollerContext.epochZkVersion = epochZkVersionactiveControllerId = config.brokerId//${config.brokerId}已成功当选为控制器。Epoch增加到${controllerContext.eepoch},Epoch zk版本现在是${controller Context.eepoch ZkVersion}”info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +s"and epoch zk version is now ${controllerContext.epochZkVersion}")//成功当选controller,并开始履行作为该角色的责任onControllerFailover()} catch {case e: ControllerMovedException =>//重新开始监听目录变化maybeResign()if (activeControllerId != -1)debug(s"代理$activeControllerId被选为控制器,而不是代理${config.brokerId}", e)elsewarn("管制员已经当选,但刚刚辞职,这将导致另一轮选举", e)case t: Throwable =>error(s"在代理${config.brokerId}上选择或成为控制器时出错。立即触发控制器移动", t)triggerControllerMove()}}}
在选举前zkClient注册的 controllerChangeHandler 事件其实就是观察 controller目录的变化
class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {//controller目录override val path: String = ControllerZNode.pathoverride def handleCreation(): Unit = eventManager.put(ControllerChange)override def handleDeletion(): Unit = eventManager.put(Reelect)override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}
六、总结
1、设置zookeeper,如:zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka并创建持久化目录:consumers、brokers/ids、brokers/topics、config/changes、admin/delete_topics、brokers/seqid、isr_change_notification、latest_producer_id_block、log_dir_event_notification
2、验证元数据属性集成是否有效,主要时看每个broker是否有了唯一的id
3、将每个broker的id注册到zookeeper
4、启动KafkaController
5、启动ControllerEventThread线程并不断消费LinkedBlockingQueue中事件
6、向队列注册RegisterBrokerAndReelect事件、Startup事件
7、首先处理RegisterBrokerAndReelect事件
8、向zookeeper注册broker,并建立临时znode
9、注册controllerChangeHandler 事件其实就是观察 controller目录的变化
10、每个broker开始向zookeeper将自己注册为controller
11、正常情况下只有一个broker成功注册成功,其他broker抛出ControllerMovedException继续监控controller目录的变化
12、如果选举controller成功,但是在就职时失败会里面进行卸任工作,并进行新一轮选举
相关文章:
Kafka-Controller选举
一、上下文 《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧 二、设置ZooKeeper ZooKeeper是一个开源的分布式协调服务,主要用于分…...
必知的 Vue3 组件传值技巧:解锁组件交互新姿势
父传子defineProps 基本概念 在 Vue 3 中,父传子是一种组件间通信的方式,用于将父组件的数据传递给子组件。这种通信方式可以让组件之间更好地协作,实现功能的复用和模块的划分。 实现步骤 在父组件中传递数据 App.vue <template>…...
【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型
【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型 文章目录 【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型一、介绍二、联系工作三、方法四、实验 Medical SAM Adapter: Adapting Segment Anything Model for Medical Image Segm…...
创新体验触手可及 紫光展锐携手影目科技推出AI眼镜开放平台
近日,紫光展锐携手影目科技共同发布了搭载展锐W517芯片的影目X系列AI眼镜开放平台。这一产品的推出标志着双方在智能穿戴领域的深度协作,将紫光展锐的领先芯片技术与影目的产品创新相融合,合力打造全球智能眼镜市场的标杆产品。这一战略布局不…...
115页PDF | 埃森哲_XX集团信息化能力成熟度评估及能力提升方案(限免下载)
一、前言 这份报告是埃森哲_XX集团信息化能力成熟度评估及能力提升方案,报告首先分析了集团的战略规划,包括调整优化期、转型升级期和跨越发展期的目标,然后识别了集团面临的内部挑战和外部压力,如管控体系不完善、业务板块多样化…...
NumPy,科学计算领域中的Python明星库!
NumPy,科学计算领域中的Python明星库! 嘿,大家好呀,今天我们要来聊聊在科学计算领域里大放异彩的 NumPy 库。NumPy 是 Python 中的一个开源库,它提供了大量的数学函数,能够高效地处理大型数组与矩阵运算。…...
Hadoop生态圈框架部署(六)- HBase完全分布式部署
文章目录 前言一、Hbase完全分布式部署(手动部署)1. 下载Hbase2. 上传安装包3. 解压HBase安装包4. 配置HBase配置文件4.1 修改hbase-env.sh配置文件4.2 修改hbase-site.xml配置文件4.3 修改regionservers配置文件4.4 删除hbase中slf4j-reload4j-1.7.33.j…...
python怎么解决中文注释
最近开发学习Python,当加入中文注释时,运行程序报错: File "red.py", line 10 SyntaxError: Non-ASCII character \xe5 in file red.py on line 10, but no encoding declared; see http://www.python.org/peps/pep-0263.html fo…...
【Unity】Game Framework框架学习使用
前言 之前用过一段时间的Game Framework框架,后来有那么一段时间都做定制小软件,框架就没再怎么使用了。 现在要做大型项目了,感觉还是用框架好一些。于是又把Game Framework拾起来了。 这篇文章主要是讲Game Framework这个框架是怎么用的…...
Linux(CentOS 7) yum一键安装mysql8
1、通过yum安装 (1)下载mysql 在Linux找个地方输入以下命令 wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm (2)安装mysql yum 仓库配置文件 [rootVM-8-15-centos ~]# sudo rpm -Uvh mysql80-c…...
Kafka 快速入门(一)
1.1安装部署 1.1.1 集群规划 bigdata01bigdata02bigdata03zookeeperzookeeperzookeeperkafkakafkakafka 1.1.2 集群部署 官方下载地址:http://kafka.apache.org/downloads.html 检查三台虚拟机的zk是否启动:zkServer.sh start 默认启动方式 1)解压…...
丹摩征文活动 | SD3+ComfyUI的图像部署实践
一、前言 作为Stability AI 推出的一款革命性的文本转图像开源模型,Stable Diffusion 3(简称SD3)在图像质量、文本内容生成、理解复杂指令以及资源利用效率方面,都有着不俗的表现。 SD3的Medium版本,拥有20亿参数&am…...
H.265流媒体播放器EasyPlayer.js网页web无插件播放器:如何优化加载速度
在当今的网络环境中,用户对于视频播放体验的要求越来越高,尤其是对于视频加载速度的期待。EasyPlayer.js网页web无插件播放器作为一款专为现代Web环境设计的流媒体播放器,它在优化加载速度方面采取了多种措施,以确保用户能够享受到…...
【Linux】进程状态的优先级
大家好呀,我是残念,希望在你看完之后,能对你有所帮助,有什么不足请指正!共同学习交流哦 本文由:残念ing原创CSDN首发,如需要转载请通知 个人主页:残念ing-CSDN博客,欢迎各…...
react中的组件传参
在React中,组件之间的数据传递是构建用户界面的关键部分。根据不同的需求和场景,有多种方式可以在React中传递参数,以下是对这些方式的详细说明: 一、通过props传递参数 这是React中最基本和最常用的数据传递方式。父组件通过属…...
HTML5:网页开发的新纪元
文章目录 前言一、HTML5技术概述二、主要特点及优势1. 多媒体支持2. 图形绘制3. 离线存储4. 表单控件增强5. 响应式设计 三、应用场景1. 游戏开发2. 在线教育3. 电子商务 四、面临的挑战结语 前言 在互联网技术快速发展的今天,H5(HTML5的简称࿰…...
CKA认证 | Day2 K8s内部监控与日志
第三章 Kubernetes监控与日志 1、查看集群资源状态 在 Kubernetes 集群中,查看集群资源状态和组件状态是非常重要的操作。以下是一些常用的命令和解释,帮助你更好地管理和监控 Kubernetes 集群。 1.1 查看master组件状态 Kubernetes 的 Master 组件包…...
电信网关配置管理系统 upload_channels.php 文件上传致RCE漏洞复现
0x01 产品简介 中国电信集团有限公司(英文名称“China Telecom”、简称“中国电信”)成立于2000年9月,是中国特大型国有通信企业、上海世博会全球合作伙伴。电信网关配置管理系统是一个用于管理和配置电信网络中网关设备的软件系统。它可以帮助网络管理员实现对网关设备的远…...
ubuntu更改max_map_count
在Ubuntu系统中,max_map_count是一个内核参数,用于限制每个进程可以拥有的内存段的数量。对于Elasticsearch等需要大量内存映射的应用,可能需要增加这个值。 执行以下步骤来更改max_map_count的值: 打开终端。 输入以下命令以编…...
《NPU、CPU、GPU 算力定义和计算方式》
一、引言 在人工智能时代,算力成为了推动技术发展的关键因素之一。不同类型的处理器,如中央处理器(CPU)、图形处理器(GPU)和神经网络处理器(NPU),在算力方面有着各自的特…...
Facebook Instant Game变现全攻略:如何通过广告和内购让你的HTML5游戏赚钱
Facebook Instant Game变现全攻略:如何通过广告和内购让你的HTML5游戏赚钱 在HTML5游戏开发领域,Facebook Instant Game已经成为不可忽视的平台。这个无需下载、即点即玩的游戏生态系统,为开发者提供了独特的变现机会。不同于传统应用商店30%…...
CentOS7 无法输入中文 CentOS7 中文输入法设置
一、问题描述 安装完 CentOS7 后,不能输入中文,按 WIN空格 也无法切换到中文输入法 二、解决方案 右键桌面 -> 打开终端(E) -> 执行命令 ibus-setup -> 输入法 -> 添加(A) -> 汉语 -> Intelligent Pinyin -> 添加(A) ibus-setup&am…...
ai辅助开发:向快马描述你的微服务项目,智能生成全套java环境配置与编排文件
最近在搭建一个分布式微服务项目时,遇到了环境配置这个老大难问题。不同模块需要不同中间件,团队成员电脑环境各异,每次新人加入都要折腾半天环境。好在发现了InsCode(快马)平台的AI辅助开发功能,用自然语言描述需求就能自动生成全…...
PyCharm配置PySide6工具链避坑指南:解决虚拟环境路径、命令报错那些事儿
PyCharm配置PySide6工具链避坑指南:解决虚拟环境路径、命令报错那些事儿 刚接触PySide6开发的朋友,十有八九会在PyCharm配置Designer、UIC和RCC工具时踩坑。明明照着教程一步步操作,却总是遇到"程序不存在"、"命令执行错误&qu…...
WarcraftHelper技术适配方案:让经典RTS游戏重获现代硬件支持
WarcraftHelper技术适配方案:让经典RTS游戏重获现代硬件支持 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 痛点解析:魔兽争霸…...
利用快马平台快速构建鸿蒙pc镜像下载验证工具原型
最近在研究鸿蒙系统的PC版本适配工作,发现获取官方镜像是个不小的门槛。官方渠道的下载链接分散在不同页面,版本信息也不够直观,每次下载完还得手动校验文件完整性,整个过程相当繁琐。于是想做个工具来简化这个流程,正…...
JS 入门通关手册(35):执行上下文、调用栈与作用域链深度解析
一、什么是执行上下文?执行上下文(Execution Context)是 JS 代码运行时的环境,JS 引擎会为每一段可执行代码创建一个上下文,用来管理变量、作用域、this 指向等。简单理解:一段代码在哪里跑、能访问什么、t…...
用STM32F103C8T6和NRF24L01自制遥控器,从硬件选型到代码调试的完整避坑指南
STM32F103C8T6与NRF24L01遥控器开发实战:从硬件设计到软件调试的全流程解析 在创客和嵌入式开发领域,无线遥控系统一直是热门话题。无论是机器人控制、无人机飞行还是智能家居应用,稳定可靠的遥控器都是不可或缺的核心组件。本文将详细介绍如…...
WMIC命令行高效卸载Windows软件:从入门到精通
1. 为什么选择WMIC卸载软件? 每次电脑卡顿的时候,打开C盘一看,总会被各种不明所以的软件占满空间。传统的卸载方式要经过"控制面板-程序和功能-找到目标-点击卸载"的繁琐流程,而WMIC只需要几行命令就能搞定。我在帮同事…...
手把手实战:微信小程序+SpringBoot+Vue3全栈开发指南(二)
1. 从Vue2升级到Vue3的核心变化 很多开发者还在使用Vue2进行微信小程序开发,但Vue3已经带来了许多革命性的改进。我在最近的一个电商小程序项目中完成了技术栈升级,实测下来性能提升非常明显。Vue3最大的变化是引入了Composition API,这让我们…...
