Kafka-Controller选举
一、上下文
《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧
二、设置ZooKeeper
ZooKeeper是一个开源的分布式协调服务,主要用于分布式系统中各节点的协调和管理。Kafka的Controller选举也一样用到了它。
override def startup(): Unit = {//....initZkClient(time)configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))//....}private def initZkClient(time: Time): Unit = {//config.zkConnect//zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafkainfo(s"Connecting to zookeeper on ${config.zkConnect}")_zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)//如果需要,在ZK中预先创建顶级路径。_zkClient.createTopLevelPaths()}def createTopLevelPaths(): Unit = {//创建 Persistent 持久化的 zk 路径ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)}//确保ZK中存在持久路径def makeSurePersistentPathExists(path: String): Unit = {createRecursive(path, data = null, throwIfPathExists = false)}
1、KafkaZkClient
KafkaZkClient是在Kafka.zookeeper.ZooKeeperClient之上提供更高级别的Kafka特定操作。
实现说明:此类包括各种组件(Controller, Configs, Old Consumer等)的方法,在某些情况下会从调用包中返回类的实例。
def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {//...KafkaZkClient(...)}
2、AdminZkClient
它提供与ZooKeeper交互的管理员相关方法。
class AdminZkClient(...){//创建topicdef createTopic(...){...}//获取broker元数据def getBrokerMetadatas(...){...}//创建主题并可选地验证其参数。请注意,TopicCommand也使用此方法。def createTopicWithAssignment(...){...}//验证主题创建参数def validateTopicCreate(...){...}//删除topic //为给定主题创建删除路径def deleteTopic(...){...}//使用可选的副本分配向现有主题添加分区。请注意,TopicCommand使用此方法。def addPartitions(...){...}//将broker从实体名称解析为整数iddef parseBroker(...){...}//.....
}
3、ZkConfigRepository
zookeeper的配置仓库,也就是kafka在zookeeper中配置信息。
class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {override def config(configResource: ConfigResource): Properties = {//....//从zookeeper的目录下读取数据,并封装成实体(topic、broker、client-id、user、user/clients/client-id、ip)adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)}
}
4、ZkData
object ZkData {//这些是kafka broker 启动时应该存在的持久ZK路径。val PersistentZkPaths: Seq[String] = Seq(ConsumerPathZNode.path, // old consumer pathBrokerIdsZNode.path,TopicsZNode.path,ConfigEntityChangeNotificationZNode.path,DeleteTopicsZNode.path,BrokerSequenceIdZNode.path,IsrChangeNotificationZNode.path,ProducerIdBlockZNode.path,LogDirEventNotificationZNode.path) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
}//旧的consumer在zk上的路径
object ConsumerPathZNode {def path = "/consumers"
}object BrokerIdsZNode {def path = s"${BrokersZNode.path}/ids"def encode: Array[Byte] = null
}object TopicsZNode {def path = s"${BrokersZNode.path}/topics"
}object ConfigEntityChangeNotificationZNode {def path = s"${ConfigZNode.path}/changes"
}object DeleteTopicsZNode {def path = s"${AdminZNode.path}/delete_topics"
}object BrokerSequenceIdZNode {def path = s"${BrokersZNode.path}/seqid"
}object IsrChangeNotificationZNode {def path = "/isr_change_notification"
}object ProducerIdBlockZNode {val CurrentVersion: Long = 1Ldef path = "/latest_producer_id_block"def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {Json.encodeAsBytes(Map("version" -> CurrentVersion,"broker" -> producerIdBlock.assignedBrokerId,"block_start" -> producerIdBlock.firstProducerId.toString,"block_end" -> producerIdBlock.lastProducerId.toString).asJava)}object LogDirEventNotificationZNode {def path = "/log_dir_event_notification"
}
三、验证元数据属性集成是否有效
1、meta.properties文件是否始终设置了cluster.id
2、meta.properties文件是否始终设置了node.id或者 broker.id
initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
四、动态broker初始化
动态broker配置存储在ZooKeeper中,可以在两个级别定义:
1、每个代理的配置持久化在/configs/brokers/{brokerId} 这些可以使用AdminClient使用资源名称brokerId进行描述/更改
2、整个集群的默认值持续存在于/configs/brokers/<default> 这些可以使用AdminClient使用空资源名称进行描述/更改。
broker配置的优先级顺序为:
1、DYNAMIC_BROKER_CONFIG:存储在ZK中的/configs/brokers/{brokerId}
2、DYNAMIC_DEFAULT_BROKER_CONFIG: 存储在ZK中的//configs/brokers/<default>
3、STATIC_BROKER_CONFIG:启动代理时使用的属性,通常来自server.properties文件
4、DEFAULT_CONFIG:KafkaConfig中定义的默认配置
config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)
五、启动KafkaController
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
1、KafkaController结构
class KafkaController(...){//事件管理private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,controllerContext.stats.rateAndTimeMetrics)//如果brokerid = 当前的controllerid 那么就返回truedef isActive: Boolean = activeControllerId == config.brokerId@volatile private var brokerInfo = initialBrokerInfo@volatile private var _brokerEpoch = initialBrokerEpoch//启动def startup(): Unit = {zkClient.registerStateChangeHandler(new StateChangeHandler {//ControllerHandler = "controller-state-change-handler"override val name: String = StateChangeHandlers.ControllerHandleroverride def afterInitializingSession(): Unit = {eventManager.put(RegisterBrokerAndReelect)}override def beforeInitializingSession(): Unit = {val queuedEvent = eventManager.clearAndPut(Expire)//阻止新会话的初始化,直到处理过期事件,这确保在创建新会话之前已处理所有挂起的事件queuedEvent.awaitProcessing()}})eventManager.put(Startup)eventManager.start()}override def process(event: ControllerEvent): Unit = {event match {//.....case RegisterBrokerAndReelect =>processRegisterBrokerAndReelect()case Startup =>processStartup()//.....}}
}
1、ControllerEventManager结构
class ControllerEventManager(...){//用串行化队列代替锁private val queue = new LinkedBlockingQueue[QueuedEvent]//ControllerEventThreadName = "controller-event-thread"private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)def start(): Unit = thread.start()def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {val queuedEvent = new QueuedEvent(event, time.milliseconds())queue.put(queuedEvent)queuedEvent}class ControllerEventThread(name: String) extends ShutdownableThread(...){override def doWork(): Unit = {//从队列获取事件,主要是controller相关的事件val dequeued = pollFromEventQueue()dequeued.event match {case controllerEvent =>def process(): Unit = dequeued.process(processor)}}}}
}
ControllerEventManager中的ControllerEventThread的父类是ShutdownableThread,它里面有真正的run()且调起了doWork(),doWork()又调起了process(),因此真正执行的是process()
public abstract class ShutdownableThread extends Thread {public abstract void doWork();public void run() {while (isRunning())doWork();}
}
这是一个死循环,也就是后面只要往队列中添加事件,会自动执行对应方法。从KafkaController的startup()中我们知道放了两个事件:RegisterBrokerAndReelect和Startup,下面我们来看看它们里面做了什么
2、RegisterBrokerAndReelect事件处理
private def processRegisterBrokerAndReelect(): Unit = {_brokerEpoch = zkClient.registerBroker(brokerInfo)processReelect()}
1、向zookeeper注册broker
class KafkaZkClient private[zk] (...{def registerBroker(brokerInfo: BrokerInfo): Long = {//brokers/ids/brokeridval path = brokerInfo.path//创建 对应的 brokerid 的 临时znode节点,说明:当该brokers挂掉后会随之消失val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")//返回czxid (broker epoch)stat.getCzxid}}
2、开始选举
class KafkaController(...){private def processReelect(): Unit = {maybeResign()elect()}private def maybeResign(): Unit = {val wasActiveBeforeChange = isActive//在zk上注册节点改变事件,当controller改变时触发,为下面的选举做铺垫zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)activeControllerId = zkClient.getControllerId.getOrElse(-1)if (wasActiveBeforeChange && !isActive) {//当当前broker辞去controller职务时触发onControllerResignation()}}private def elect(): Unit = {//获取 活动状态 contoller ,如果 集群已经启动了很长时间,新增了一台broker,那么此时会获得 当下的controller ,//如果此时集群刚刚启动,那么此时没有 活动状态的 controller ,返回的结果就是 -1activeControllerId = zkClient.getControllerId.getOrElse(-1)/** 我们可以在初始启动和handleDeleted ZK回调期间到达这里。由于潜在的竞争条件,当我们到达这里时,控制器可能已经被选中了。如果此代理已经是控制器,则此检查将防止以下createEphemeralPath方法进入无限循环。*/if (activeControllerId != -1) {//如果当下已经有 activeControllerId 那么就停止选举 ,否则继续往下走//Broker $activeControllerId 已被选为控制器,因此停止选举过程debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")return}//try中会发生如下情况//1、正常运行:当选controller//2、异常:// 1、ControllerMovedException // 1、其他broker成功当选controller// 2、controller已经当选,但刚刚离职,需要重新选举// 2、Throwable 该节点当选controller,但是就职时出错了。删除该controller,重新选举// try {val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)controllerContext.epoch = epochcontrollerContext.epochZkVersion = epochZkVersionactiveControllerId = config.brokerId//${config.brokerId}已成功当选为控制器。Epoch增加到${controllerContext.eepoch},Epoch zk版本现在是${controller Context.eepoch ZkVersion}”info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +s"and epoch zk version is now ${controllerContext.epochZkVersion}")//成功当选controller,并开始履行作为该角色的责任onControllerFailover()} catch {case e: ControllerMovedException =>//重新开始监听目录变化maybeResign()if (activeControllerId != -1)debug(s"代理$activeControllerId被选为控制器,而不是代理${config.brokerId}", e)elsewarn("管制员已经当选,但刚刚辞职,这将导致另一轮选举", e)case t: Throwable =>error(s"在代理${config.brokerId}上选择或成为控制器时出错。立即触发控制器移动", t)triggerControllerMove()}}}
在选举前zkClient注册的 controllerChangeHandler 事件其实就是观察 controller目录的变化
class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {//controller目录override val path: String = ControllerZNode.pathoverride def handleCreation(): Unit = eventManager.put(ControllerChange)override def handleDeletion(): Unit = eventManager.put(Reelect)override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}
六、总结
1、设置zookeeper,如:zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka并创建持久化目录:consumers、brokers/ids、brokers/topics、config/changes、admin/delete_topics、brokers/seqid、isr_change_notification、latest_producer_id_block、log_dir_event_notification
2、验证元数据属性集成是否有效,主要时看每个broker是否有了唯一的id
3、将每个broker的id注册到zookeeper
4、启动KafkaController
5、启动ControllerEventThread线程并不断消费LinkedBlockingQueue中事件
6、向队列注册RegisterBrokerAndReelect事件、Startup事件
7、首先处理RegisterBrokerAndReelect事件
8、向zookeeper注册broker,并建立临时znode
9、注册controllerChangeHandler 事件其实就是观察 controller目录的变化
10、每个broker开始向zookeeper将自己注册为controller
11、正常情况下只有一个broker成功注册成功,其他broker抛出ControllerMovedException继续监控controller目录的变化
12、如果选举controller成功,但是在就职时失败会里面进行卸任工作,并进行新一轮选举
相关文章:
Kafka-Controller选举
一、上下文 《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧 二、设置ZooKeeper ZooKeeper是一个开源的分布式协调服务,主要用于分…...
必知的 Vue3 组件传值技巧:解锁组件交互新姿势
父传子defineProps 基本概念 在 Vue 3 中,父传子是一种组件间通信的方式,用于将父组件的数据传递给子组件。这种通信方式可以让组件之间更好地协作,实现功能的复用和模块的划分。 实现步骤 在父组件中传递数据 App.vue <template>…...
【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型
【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型 文章目录 【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型一、介绍二、联系工作三、方法四、实验 Medical SAM Adapter: Adapting Segment Anything Model for Medical Image Segm…...
创新体验触手可及 紫光展锐携手影目科技推出AI眼镜开放平台
近日,紫光展锐携手影目科技共同发布了搭载展锐W517芯片的影目X系列AI眼镜开放平台。这一产品的推出标志着双方在智能穿戴领域的深度协作,将紫光展锐的领先芯片技术与影目的产品创新相融合,合力打造全球智能眼镜市场的标杆产品。这一战略布局不…...
115页PDF | 埃森哲_XX集团信息化能力成熟度评估及能力提升方案(限免下载)
一、前言 这份报告是埃森哲_XX集团信息化能力成熟度评估及能力提升方案,报告首先分析了集团的战略规划,包括调整优化期、转型升级期和跨越发展期的目标,然后识别了集团面临的内部挑战和外部压力,如管控体系不完善、业务板块多样化…...
NumPy,科学计算领域中的Python明星库!
NumPy,科学计算领域中的Python明星库! 嘿,大家好呀,今天我们要来聊聊在科学计算领域里大放异彩的 NumPy 库。NumPy 是 Python 中的一个开源库,它提供了大量的数学函数,能够高效地处理大型数组与矩阵运算。…...
Hadoop生态圈框架部署(六)- HBase完全分布式部署
文章目录 前言一、Hbase完全分布式部署(手动部署)1. 下载Hbase2. 上传安装包3. 解压HBase安装包4. 配置HBase配置文件4.1 修改hbase-env.sh配置文件4.2 修改hbase-site.xml配置文件4.3 修改regionservers配置文件4.4 删除hbase中slf4j-reload4j-1.7.33.j…...
python怎么解决中文注释
最近开发学习Python,当加入中文注释时,运行程序报错: File "red.py", line 10 SyntaxError: Non-ASCII character \xe5 in file red.py on line 10, but no encoding declared; see http://www.python.org/peps/pep-0263.html fo…...
【Unity】Game Framework框架学习使用
前言 之前用过一段时间的Game Framework框架,后来有那么一段时间都做定制小软件,框架就没再怎么使用了。 现在要做大型项目了,感觉还是用框架好一些。于是又把Game Framework拾起来了。 这篇文章主要是讲Game Framework这个框架是怎么用的…...
Linux(CentOS 7) yum一键安装mysql8
1、通过yum安装 (1)下载mysql 在Linux找个地方输入以下命令 wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm (2)安装mysql yum 仓库配置文件 [rootVM-8-15-centos ~]# sudo rpm -Uvh mysql80-c…...
Kafka 快速入门(一)
1.1安装部署 1.1.1 集群规划 bigdata01bigdata02bigdata03zookeeperzookeeperzookeeperkafkakafkakafka 1.1.2 集群部署 官方下载地址:http://kafka.apache.org/downloads.html 检查三台虚拟机的zk是否启动:zkServer.sh start 默认启动方式 1)解压…...
丹摩征文活动 | SD3+ComfyUI的图像部署实践
一、前言 作为Stability AI 推出的一款革命性的文本转图像开源模型,Stable Diffusion 3(简称SD3)在图像质量、文本内容生成、理解复杂指令以及资源利用效率方面,都有着不俗的表现。 SD3的Medium版本,拥有20亿参数&am…...
H.265流媒体播放器EasyPlayer.js网页web无插件播放器:如何优化加载速度
在当今的网络环境中,用户对于视频播放体验的要求越来越高,尤其是对于视频加载速度的期待。EasyPlayer.js网页web无插件播放器作为一款专为现代Web环境设计的流媒体播放器,它在优化加载速度方面采取了多种措施,以确保用户能够享受到…...
【Linux】进程状态的优先级
大家好呀,我是残念,希望在你看完之后,能对你有所帮助,有什么不足请指正!共同学习交流哦 本文由:残念ing原创CSDN首发,如需要转载请通知 个人主页:残念ing-CSDN博客,欢迎各…...
react中的组件传参
在React中,组件之间的数据传递是构建用户界面的关键部分。根据不同的需求和场景,有多种方式可以在React中传递参数,以下是对这些方式的详细说明: 一、通过props传递参数 这是React中最基本和最常用的数据传递方式。父组件通过属…...
HTML5:网页开发的新纪元
文章目录 前言一、HTML5技术概述二、主要特点及优势1. 多媒体支持2. 图形绘制3. 离线存储4. 表单控件增强5. 响应式设计 三、应用场景1. 游戏开发2. 在线教育3. 电子商务 四、面临的挑战结语 前言 在互联网技术快速发展的今天,H5(HTML5的简称࿰…...
CKA认证 | Day2 K8s内部监控与日志
第三章 Kubernetes监控与日志 1、查看集群资源状态 在 Kubernetes 集群中,查看集群资源状态和组件状态是非常重要的操作。以下是一些常用的命令和解释,帮助你更好地管理和监控 Kubernetes 集群。 1.1 查看master组件状态 Kubernetes 的 Master 组件包…...
电信网关配置管理系统 upload_channels.php 文件上传致RCE漏洞复现
0x01 产品简介 中国电信集团有限公司(英文名称“China Telecom”、简称“中国电信”)成立于2000年9月,是中国特大型国有通信企业、上海世博会全球合作伙伴。电信网关配置管理系统是一个用于管理和配置电信网络中网关设备的软件系统。它可以帮助网络管理员实现对网关设备的远…...
ubuntu更改max_map_count
在Ubuntu系统中,max_map_count是一个内核参数,用于限制每个进程可以拥有的内存段的数量。对于Elasticsearch等需要大量内存映射的应用,可能需要增加这个值。 执行以下步骤来更改max_map_count的值: 打开终端。 输入以下命令以编…...
《NPU、CPU、GPU 算力定义和计算方式》
一、引言 在人工智能时代,算力成为了推动技术发展的关键因素之一。不同类型的处理器,如中央处理器(CPU)、图形处理器(GPU)和神经网络处理器(NPU),在算力方面有着各自的特…...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...
Robots.txt 文件
什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...
12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
