Kafka-Controller选举
一、上下文
《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧
二、设置ZooKeeper
ZooKeeper是一个开源的分布式协调服务,主要用于分布式系统中各节点的协调和管理。Kafka的Controller选举也一样用到了它。
override def startup(): Unit = {//....initZkClient(time)configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))//....}private def initZkClient(time: Time): Unit = {//config.zkConnect//zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafkainfo(s"Connecting to zookeeper on ${config.zkConnect}")_zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)//如果需要,在ZK中预先创建顶级路径。_zkClient.createTopLevelPaths()}def createTopLevelPaths(): Unit = {//创建 Persistent 持久化的 zk 路径ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)}//确保ZK中存在持久路径def makeSurePersistentPathExists(path: String): Unit = {createRecursive(path, data = null, throwIfPathExists = false)}
1、KafkaZkClient
KafkaZkClient是在Kafka.zookeeper.ZooKeeperClient之上提供更高级别的Kafka特定操作。
实现说明:此类包括各种组件(Controller, Configs, Old Consumer等)的方法,在某些情况下会从调用包中返回类的实例。
def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {//...KafkaZkClient(...)}
2、AdminZkClient
它提供与ZooKeeper交互的管理员相关方法。
class AdminZkClient(...){//创建topicdef createTopic(...){...}//获取broker元数据def getBrokerMetadatas(...){...}//创建主题并可选地验证其参数。请注意,TopicCommand也使用此方法。def createTopicWithAssignment(...){...}//验证主题创建参数def validateTopicCreate(...){...}//删除topic //为给定主题创建删除路径def deleteTopic(...){...}//使用可选的副本分配向现有主题添加分区。请注意,TopicCommand使用此方法。def addPartitions(...){...}//将broker从实体名称解析为整数iddef parseBroker(...){...}//.....
}
3、ZkConfigRepository
zookeeper的配置仓库,也就是kafka在zookeeper中配置信息。
class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {override def config(configResource: ConfigResource): Properties = {//....//从zookeeper的目录下读取数据,并封装成实体(topic、broker、client-id、user、user/clients/client-id、ip)adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)}
}
4、ZkData
object ZkData {//这些是kafka broker 启动时应该存在的持久ZK路径。val PersistentZkPaths: Seq[String] = Seq(ConsumerPathZNode.path, // old consumer pathBrokerIdsZNode.path,TopicsZNode.path,ConfigEntityChangeNotificationZNode.path,DeleteTopicsZNode.path,BrokerSequenceIdZNode.path,IsrChangeNotificationZNode.path,ProducerIdBlockZNode.path,LogDirEventNotificationZNode.path) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
}//旧的consumer在zk上的路径
object ConsumerPathZNode {def path = "/consumers"
}object BrokerIdsZNode {def path = s"${BrokersZNode.path}/ids"def encode: Array[Byte] = null
}object TopicsZNode {def path = s"${BrokersZNode.path}/topics"
}object ConfigEntityChangeNotificationZNode {def path = s"${ConfigZNode.path}/changes"
}object DeleteTopicsZNode {def path = s"${AdminZNode.path}/delete_topics"
}object BrokerSequenceIdZNode {def path = s"${BrokersZNode.path}/seqid"
}object IsrChangeNotificationZNode {def path = "/isr_change_notification"
}object ProducerIdBlockZNode {val CurrentVersion: Long = 1Ldef path = "/latest_producer_id_block"def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {Json.encodeAsBytes(Map("version" -> CurrentVersion,"broker" -> producerIdBlock.assignedBrokerId,"block_start" -> producerIdBlock.firstProducerId.toString,"block_end" -> producerIdBlock.lastProducerId.toString).asJava)}object LogDirEventNotificationZNode {def path = "/log_dir_event_notification"
}
三、验证元数据属性集成是否有效
1、meta.properties文件是否始终设置了cluster.id
2、meta.properties文件是否始终设置了node.id或者 broker.id
initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
四、动态broker初始化
动态broker配置存储在ZooKeeper中,可以在两个级别定义:
1、每个代理的配置持久化在/configs/brokers/{brokerId} 这些可以使用AdminClient使用资源名称brokerId进行描述/更改
2、整个集群的默认值持续存在于/configs/brokers/<default> 这些可以使用AdminClient使用空资源名称进行描述/更改。
broker配置的优先级顺序为:
1、DYNAMIC_BROKER_CONFIG:存储在ZK中的/configs/brokers/{brokerId}
2、DYNAMIC_DEFAULT_BROKER_CONFIG: 存储在ZK中的//configs/brokers/<default>
3、STATIC_BROKER_CONFIG:启动代理时使用的属性,通常来自server.properties文件
4、DEFAULT_CONFIG:KafkaConfig中定义的默认配置
config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)
五、启动KafkaController
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
1、KafkaController结构
class KafkaController(...){//事件管理private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,controllerContext.stats.rateAndTimeMetrics)//如果brokerid = 当前的controllerid 那么就返回truedef isActive: Boolean = activeControllerId == config.brokerId@volatile private var brokerInfo = initialBrokerInfo@volatile private var _brokerEpoch = initialBrokerEpoch//启动def startup(): Unit = {zkClient.registerStateChangeHandler(new StateChangeHandler {//ControllerHandler = "controller-state-change-handler"override val name: String = StateChangeHandlers.ControllerHandleroverride def afterInitializingSession(): Unit = {eventManager.put(RegisterBrokerAndReelect)}override def beforeInitializingSession(): Unit = {val queuedEvent = eventManager.clearAndPut(Expire)//阻止新会话的初始化,直到处理过期事件,这确保在创建新会话之前已处理所有挂起的事件queuedEvent.awaitProcessing()}})eventManager.put(Startup)eventManager.start()}override def process(event: ControllerEvent): Unit = {event match {//.....case RegisterBrokerAndReelect =>processRegisterBrokerAndReelect()case Startup =>processStartup()//.....}}
}
1、ControllerEventManager结构
class ControllerEventManager(...){//用串行化队列代替锁private val queue = new LinkedBlockingQueue[QueuedEvent]//ControllerEventThreadName = "controller-event-thread"private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)def start(): Unit = thread.start()def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {val queuedEvent = new QueuedEvent(event, time.milliseconds())queue.put(queuedEvent)queuedEvent}class ControllerEventThread(name: String) extends ShutdownableThread(...){override def doWork(): Unit = {//从队列获取事件,主要是controller相关的事件val dequeued = pollFromEventQueue()dequeued.event match {case controllerEvent =>def process(): Unit = dequeued.process(processor)}}}}
}
ControllerEventManager中的ControllerEventThread的父类是ShutdownableThread,它里面有真正的run()且调起了doWork(),doWork()又调起了process(),因此真正执行的是process()
public abstract class ShutdownableThread extends Thread {public abstract void doWork();public void run() {while (isRunning())doWork();}
}
这是一个死循环,也就是后面只要往队列中添加事件,会自动执行对应方法。从KafkaController的startup()中我们知道放了两个事件:RegisterBrokerAndReelect和Startup,下面我们来看看它们里面做了什么
2、RegisterBrokerAndReelect事件处理
private def processRegisterBrokerAndReelect(): Unit = {_brokerEpoch = zkClient.registerBroker(brokerInfo)processReelect()}
1、向zookeeper注册broker
class KafkaZkClient private[zk] (...{def registerBroker(brokerInfo: BrokerInfo): Long = {//brokers/ids/brokeridval path = brokerInfo.path//创建 对应的 brokerid 的 临时znode节点,说明:当该brokers挂掉后会随之消失val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")//返回czxid (broker epoch)stat.getCzxid}}
2、开始选举
class KafkaController(...){private def processReelect(): Unit = {maybeResign()elect()}private def maybeResign(): Unit = {val wasActiveBeforeChange = isActive//在zk上注册节点改变事件,当controller改变时触发,为下面的选举做铺垫zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)activeControllerId = zkClient.getControllerId.getOrElse(-1)if (wasActiveBeforeChange && !isActive) {//当当前broker辞去controller职务时触发onControllerResignation()}}private def elect(): Unit = {//获取 活动状态 contoller ,如果 集群已经启动了很长时间,新增了一台broker,那么此时会获得 当下的controller ,//如果此时集群刚刚启动,那么此时没有 活动状态的 controller ,返回的结果就是 -1activeControllerId = zkClient.getControllerId.getOrElse(-1)/** 我们可以在初始启动和handleDeleted ZK回调期间到达这里。由于潜在的竞争条件,当我们到达这里时,控制器可能已经被选中了。如果此代理已经是控制器,则此检查将防止以下createEphemeralPath方法进入无限循环。*/if (activeControllerId != -1) {//如果当下已经有 activeControllerId 那么就停止选举 ,否则继续往下走//Broker $activeControllerId 已被选为控制器,因此停止选举过程debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")return}//try中会发生如下情况//1、正常运行:当选controller//2、异常:// 1、ControllerMovedException // 1、其他broker成功当选controller// 2、controller已经当选,但刚刚离职,需要重新选举// 2、Throwable 该节点当选controller,但是就职时出错了。删除该controller,重新选举// try {val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)controllerContext.epoch = epochcontrollerContext.epochZkVersion = epochZkVersionactiveControllerId = config.brokerId//${config.brokerId}已成功当选为控制器。Epoch增加到${controllerContext.eepoch},Epoch zk版本现在是${controller Context.eepoch ZkVersion}”info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +s"and epoch zk version is now ${controllerContext.epochZkVersion}")//成功当选controller,并开始履行作为该角色的责任onControllerFailover()} catch {case e: ControllerMovedException =>//重新开始监听目录变化maybeResign()if (activeControllerId != -1)debug(s"代理$activeControllerId被选为控制器,而不是代理${config.brokerId}", e)elsewarn("管制员已经当选,但刚刚辞职,这将导致另一轮选举", e)case t: Throwable =>error(s"在代理${config.brokerId}上选择或成为控制器时出错。立即触发控制器移动", t)triggerControllerMove()}}}
在选举前zkClient注册的 controllerChangeHandler 事件其实就是观察 controller目录的变化
class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {//controller目录override val path: String = ControllerZNode.pathoverride def handleCreation(): Unit = eventManager.put(ControllerChange)override def handleDeletion(): Unit = eventManager.put(Reelect)override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}
六、总结
1、设置zookeeper,如:zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka并创建持久化目录:consumers、brokers/ids、brokers/topics、config/changes、admin/delete_topics、brokers/seqid、isr_change_notification、latest_producer_id_block、log_dir_event_notification
2、验证元数据属性集成是否有效,主要时看每个broker是否有了唯一的id
3、将每个broker的id注册到zookeeper
4、启动KafkaController
5、启动ControllerEventThread线程并不断消费LinkedBlockingQueue中事件
6、向队列注册RegisterBrokerAndReelect事件、Startup事件
7、首先处理RegisterBrokerAndReelect事件
8、向zookeeper注册broker,并建立临时znode
9、注册controllerChangeHandler 事件其实就是观察 controller目录的变化
10、每个broker开始向zookeeper将自己注册为controller
11、正常情况下只有一个broker成功注册成功,其他broker抛出ControllerMovedException继续监控controller目录的变化
12、如果选举controller成功,但是在就职时失败会里面进行卸任工作,并进行新一轮选举
相关文章:
Kafka-Controller选举
一、上下文 《Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧 二、设置ZooKeeper ZooKeeper是一个开源的分布式协调服务,主要用于分…...
必知的 Vue3 组件传值技巧:解锁组件交互新姿势
父传子defineProps 基本概念 在 Vue 3 中,父传子是一种组件间通信的方式,用于将父组件的数据传递给子组件。这种通信方式可以让组件之间更好地协作,实现功能的复用和模块的划分。 实现步骤 在父组件中传递数据 App.vue <template>…...
【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型
【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型 文章目录 【论文阅读】医学SAM适配器:适应医学图像分割的任意分割模型一、介绍二、联系工作三、方法四、实验 Medical SAM Adapter: Adapting Segment Anything Model for Medical Image Segm…...
创新体验触手可及 紫光展锐携手影目科技推出AI眼镜开放平台
近日,紫光展锐携手影目科技共同发布了搭载展锐W517芯片的影目X系列AI眼镜开放平台。这一产品的推出标志着双方在智能穿戴领域的深度协作,将紫光展锐的领先芯片技术与影目的产品创新相融合,合力打造全球智能眼镜市场的标杆产品。这一战略布局不…...
115页PDF | 埃森哲_XX集团信息化能力成熟度评估及能力提升方案(限免下载)
一、前言 这份报告是埃森哲_XX集团信息化能力成熟度评估及能力提升方案,报告首先分析了集团的战略规划,包括调整优化期、转型升级期和跨越发展期的目标,然后识别了集团面临的内部挑战和外部压力,如管控体系不完善、业务板块多样化…...
NumPy,科学计算领域中的Python明星库!
NumPy,科学计算领域中的Python明星库! 嘿,大家好呀,今天我们要来聊聊在科学计算领域里大放异彩的 NumPy 库。NumPy 是 Python 中的一个开源库,它提供了大量的数学函数,能够高效地处理大型数组与矩阵运算。…...
Hadoop生态圈框架部署(六)- HBase完全分布式部署
文章目录 前言一、Hbase完全分布式部署(手动部署)1. 下载Hbase2. 上传安装包3. 解压HBase安装包4. 配置HBase配置文件4.1 修改hbase-env.sh配置文件4.2 修改hbase-site.xml配置文件4.3 修改regionservers配置文件4.4 删除hbase中slf4j-reload4j-1.7.33.j…...
python怎么解决中文注释
最近开发学习Python,当加入中文注释时,运行程序报错: File "red.py", line 10 SyntaxError: Non-ASCII character \xe5 in file red.py on line 10, but no encoding declared; see http://www.python.org/peps/pep-0263.html fo…...
【Unity】Game Framework框架学习使用
前言 之前用过一段时间的Game Framework框架,后来有那么一段时间都做定制小软件,框架就没再怎么使用了。 现在要做大型项目了,感觉还是用框架好一些。于是又把Game Framework拾起来了。 这篇文章主要是讲Game Framework这个框架是怎么用的…...
Linux(CentOS 7) yum一键安装mysql8
1、通过yum安装 (1)下载mysql 在Linux找个地方输入以下命令 wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm (2)安装mysql yum 仓库配置文件 [rootVM-8-15-centos ~]# sudo rpm -Uvh mysql80-c…...
Kafka 快速入门(一)
1.1安装部署 1.1.1 集群规划 bigdata01bigdata02bigdata03zookeeperzookeeperzookeeperkafkakafkakafka 1.1.2 集群部署 官方下载地址:http://kafka.apache.org/downloads.html 检查三台虚拟机的zk是否启动:zkServer.sh start 默认启动方式 1)解压…...
丹摩征文活动 | SD3+ComfyUI的图像部署实践
一、前言 作为Stability AI 推出的一款革命性的文本转图像开源模型,Stable Diffusion 3(简称SD3)在图像质量、文本内容生成、理解复杂指令以及资源利用效率方面,都有着不俗的表现。 SD3的Medium版本,拥有20亿参数&am…...
H.265流媒体播放器EasyPlayer.js网页web无插件播放器:如何优化加载速度
在当今的网络环境中,用户对于视频播放体验的要求越来越高,尤其是对于视频加载速度的期待。EasyPlayer.js网页web无插件播放器作为一款专为现代Web环境设计的流媒体播放器,它在优化加载速度方面采取了多种措施,以确保用户能够享受到…...
【Linux】进程状态的优先级
大家好呀,我是残念,希望在你看完之后,能对你有所帮助,有什么不足请指正!共同学习交流哦 本文由:残念ing原创CSDN首发,如需要转载请通知 个人主页:残念ing-CSDN博客,欢迎各…...
react中的组件传参
在React中,组件之间的数据传递是构建用户界面的关键部分。根据不同的需求和场景,有多种方式可以在React中传递参数,以下是对这些方式的详细说明: 一、通过props传递参数 这是React中最基本和最常用的数据传递方式。父组件通过属…...
HTML5:网页开发的新纪元
文章目录 前言一、HTML5技术概述二、主要特点及优势1. 多媒体支持2. 图形绘制3. 离线存储4. 表单控件增强5. 响应式设计 三、应用场景1. 游戏开发2. 在线教育3. 电子商务 四、面临的挑战结语 前言 在互联网技术快速发展的今天,H5(HTML5的简称࿰…...
CKA认证 | Day2 K8s内部监控与日志
第三章 Kubernetes监控与日志 1、查看集群资源状态 在 Kubernetes 集群中,查看集群资源状态和组件状态是非常重要的操作。以下是一些常用的命令和解释,帮助你更好地管理和监控 Kubernetes 集群。 1.1 查看master组件状态 Kubernetes 的 Master 组件包…...
电信网关配置管理系统 upload_channels.php 文件上传致RCE漏洞复现
0x01 产品简介 中国电信集团有限公司(英文名称“China Telecom”、简称“中国电信”)成立于2000年9月,是中国特大型国有通信企业、上海世博会全球合作伙伴。电信网关配置管理系统是一个用于管理和配置电信网络中网关设备的软件系统。它可以帮助网络管理员实现对网关设备的远…...
ubuntu更改max_map_count
在Ubuntu系统中,max_map_count是一个内核参数,用于限制每个进程可以拥有的内存段的数量。对于Elasticsearch等需要大量内存映射的应用,可能需要增加这个值。 执行以下步骤来更改max_map_count的值: 打开终端。 输入以下命令以编…...
《NPU、CPU、GPU 算力定义和计算方式》
一、引言 在人工智能时代,算力成为了推动技术发展的关键因素之一。不同类型的处理器,如中央处理器(CPU)、图形处理器(GPU)和神经网络处理器(NPU),在算力方面有着各自的特…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
C++:多态机制详解
目录 一. 多态的概念 1.静态多态(编译时多态) 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1).协变 2).析构函数的重写 5.override 和 final关键字 1&#…...
宇树科技,改名了!
提到国内具身智能和机器人领域的代表企业,那宇树科技(Unitree)必须名列其榜。 最近,宇树科技的一项新变动消息在业界引发了不少关注和讨论,即: 宇树向其合作伙伴发布了一封公司名称变更函称,因…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验
Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...
