Kafka-创建topic源码
一、命令创建topic
kafka-topics --create --topic quickstart-events --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
二、kafka-topics脚本
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TopicCommand "$@"
脚本中指定了处理它的主类:TopicCommand
三、TopicCommand
public abstract class TopicCommand {public static void main(String... args) {Exit.exit(mainNoExit(args));}private static int mainNoExit(String... args) {try {execute(args);return 0;} catch (Throwable e) {return 1;}}static void execute(String... args) throws Exception {//解析命令行参数TopicCommandOptions opts = new TopicCommandOptions(args);//创建TopicServiceTopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());try {if (opts.hasCreateOption()) {//这是处理topic创建的,我们主要分析它topicService.createTopic(opts);} else if (opts.hasAlterOption()) {//更高topic逻辑topicService.alterTopic(opts);} else if (opts.hasListOption()) {//获取topictopicService.listTopics(opts);} else if (opts.hasDescribeOption()) {//topi相关描述信息topicService.describeTopic(opts);} else if (opts.hasDeleteOption()) {//删除topictopicService.deleteTopic(opts);}}catch(...){...}finally {topicService.close();}}public static class TopicService implements AutoCloseable {public void createTopic(TopicCommandOptions opts) throws Exception {CommandTopicPartition topic = new CommandTopicPartition(opts);if (Topic.hasCollisionChars(topic.name)) {//由于度量名称的限制,带有句点(“.”)或下划线(“_”)的主题可能会发生冲突。为了避免问题,最好使用其中之一,但不要两者都使用System.out.println(".........");}createTopic(topic);}public void createTopic(CommandTopicPartition topic) throws Exception {if (topic.replicationFactor.filter(rf -> rf > Short.MAX_VALUE || rf < 1).isPresent()) {//复制因子必须介于1和“+Short.MAX_VALUE+”之间throw new IllegalArgumentException("...");}if (topic.partitions.filter(p -> p < 1).isPresent()) {//分区必须大于0throw new IllegalArgumentException("...");}try {NewTopic newTopic;//取决于创建 topic 时 是否指定了 replica-assignmentif (topic.hasReplicaAssignment()) {newTopic = new NewTopic(topic.name, topic.replicaAssignment);} else {newTopic = new NewTopic(topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue));}//给topic设置参数Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream().collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name)));newTopic.configs(configsMap);//批量创建topicCreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),new CreateTopicsOptions().retryOnQuotaViolation(false));//等待所有topic都创建成功createResult.all().get();System.out.println("Created topic " + topic.name + ".");} catch (ExecutionException e) {//......}}}}
TopicCommandOptions中有对创建topic所有参数的解读,我们下面来详细看下这些参数
四、创建Topic参数
bootstrap-server
必选项:连接Kafka server用
command-config
包含要传递给Admin Client的配置的属性文件。这仅与--bootstrap-server选项一起使用,用于描述和更改broker配置
list
列出所有可用的topic
create
创建一个新的topic
delete
删除一个topic
alter
更改分区数量和副本分配,通过--alter更新现有主题的配置
describe
列出给定topic的详细信息
topic
要创建、更改、描述或删除的主题。它还接受正则表达式,但--create选项除外。将主题名称放在双引号中,并使用“\\”前缀转义正则表达式符号;例如 \"test\\.topic\"
topic-id
仅与用于描述主题的--bootstrap-server选项一起使用
config
正在创建的主题的主题配置覆盖。
delete-config
要删除现有主题的主题配置覆盖
partitions
正在创建或更改的主题的分区数量(警告:如果为具有键的主题增加分区,则分区逻辑或消息顺序将受到影响)。如果未提供用于,则为集群默认值
replication-factor
正在创建的主题中每个分区的复制因子。如果未提供,则为群集默认值
replica-assignment
正在创建或更改的topic的手动分区到broker分配列表
under-replicated-partitions
如果在描述主题时设置,则仅在复制分区下显示
unavailable-partitions
如果在描述主题时设置,则仅显示其leader不可用的分区
under-min-isr-partitions
如果在描述主题时设置,则仅显示isr计数 < 配置的最小值的分区。
at-min-isr-partitions
如果在描述主题时设置,则仅显示isr计数 = 配置的最小值的分区
topics-with-overrides
如果在描述主题时设置,则仅显示已覆盖配置的topic
if-exists
如果在更改、删除或描述主题时设置,则仅当主题存在时才会执行该操作
if-not-exists
如果在创建主题时设置,则仅当主题不存在时才会执行该操作。
exclude-internal
运行list或describe命令时排除内部topic。默认情况下,内部topic将被列出
partition-size-limit-per-response
一个DescribeTopicPartitions响应中包含的最大分区大小
五、AdminClient
从第二步的源码中看到最终将topic的创建交给了AdminClient来完成,下面我们继续往下分析
1、创建
在TopicService的构造方法中创建的AdminClient
它是Kafka的管理客户端,支持管理和检查topic、broker、配置和ACL。
AdminClient的创建用到了bootstrap.servers,它里面有连接KafkaServer的host:port列表。
bootstrap.servers配置仅用于发现群集中的broker,然后AdminClient将根据需要连接到这些broker。因此,只包括两个或三个经纪人地址就足以应对broker不可用的风险。
TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());
public static class TopicService implements AutoCloseable {private final Admin adminClient;public TopicService(Properties commandConfig, Optional<String> bootstrapServer) {this.adminClient = createAdminClient(commandConfig, bootstrapServer);}private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) {if (bootstrapServer.isPresent()) {commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer.get());}return Admin.create(commandConfig);}}
2、交由子类KafkaAdminClient处理
public class KafkaAdminClient extends AdminClient {private final AdminClientRunnable runnable;//创建一批topicpublic CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,final CreateTopicsOptions options) {final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());final CreatableTopicCollection topics = new CreatableTopicCollection();for (NewTopic newTopic : newTopics) {//判断名字是否符合规范if (topicNameIsUnrepresentable(newTopic.name())) {KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();future.completeExceptionally(new InvalidTopicException("The given topic name '" +newTopic.name() + "' cannot be represented in a request."));topicFutures.put(newTopic.name(), future);} else if (!topicFutures.containsKey(newTopic.name())) {//topicFutures 装的是还没有创建的 topicnametopicFutures.put(newTopic.name(), new KafkaFutureImpl<>());topics.add(newTopic.convertToCreatableTopic());}}if (!topics.isEmpty()) {final long now = time.milliseconds();final long deadline = calcDeadlineMs(now, options.timeoutMs());//里面封装了 ApiKeys.CREATE_TOPICS 请求final Call call = getCreateTopicsCall(options, topicFutures, topics,Collections.emptyMap(), now, deadline);//实现了Runnable接口runnable.call(call, now);}return new CreateTopicsResult(new HashMap<>(topicFutures));}
}
从这里我们看到,这里会用一个线程向broker发送ApiKeys.CREATE_TOPICS 请求。下面我们来看broker端怎么处理topics的创建请求的。按照我们之前的经验,要去看KafkaApis中对应ApiKeys.CREATE_TOPICS的处理逻辑
class KafkaApis(...){request.header.apiKey match {//....case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)//.....}
}
六、CREATE_TOPICS的处理逻辑
从KafkaApi中我们看到很多请求都调用了maybeForwardToController()方法来处理,但是传入的参数不同,从名称上我们可以猜测这些请求可能交由Controller来处理,回想下《Kafka-Controller角色需要做什么?》中当一个broker当选为Controller时第一件事就是注册监听器,去监听broker改变、topic改变、topic删除、isr改变等,并分别准备好了响应的处理逻辑。因此这里只要让topic发生改变就可以自动触发让Controller处理了。下面看下handleCreateTopicsRequest()中都做了什么?
1、获取ZooKeeper
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
2、判断集群当下是否有Controller
如果集群当下没有Controller,直接向客户端返回Errors.NOT_CONTROLLER错误。我们按照集群当下有Controller继续分析。
if (!zkSupport.controller.isActive) {//如果没有contorller,直接向客户端发送响应信息(集群当下没有controller),且这个时候时创建不了topic的,createTopicsRequest.data.topics.forEach { topic =>results.add(new CreatableTopicResult().setName(topic.name).setErrorCode(Errors.NOT_CONTROLLER.code))}sendResponseCallback(results)} else {//正常逻辑}
3、检查topic名称
集群元数据topic是一个具有不同实现的内部topic。不应允许用户创建同名的topic。
if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {//拒绝创建内部主题 __cluster_metadatainfo(s"Rejecting creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME}")}topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
4、调用ZkAdminManager创建topic
zkSupport.adminManager.createTopics(createTopicsRequest.data.timeoutMs,createTopicsRequest.data.validateOnly,toCreate,authorizedForDescribeConfigs,controllerMutationQuota,handleCreateTopicsResults)}
1、循环校验每个topic是否符合规则
1、topic是否已经存在
2、topic是否为null
3、numPartitions或replicationFactor和replicasAssignments都已设置。两者不能同时使用
2、确定分区分配列表
如果用户指定了列表,那么就直接用用户的,否则使用Kafka自己的分配策略(下篇博客分析)
val assignments = if (topic.assignments.isEmpty) {CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokers.asJavaCollection, resolvedNumPartitions, resolvedReplicationFactor))} else {val assignments = new mutable.HashMap[Int, Seq[Int]]//注意:我们不会检查replicaAssignment是否包含未知的代理——与添加分区的情况不同,这遵循TopicCommand中的现有逻辑topic.assignments.forEach { assignment =>assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int)}assignments}
3、topics目录下创建指定的topic
//ConfigType.TOPIC : topics 目录
//topic :要创建的topic名称
zkClient.setOrCreateEntityConfigs(ConfigType.TOPIC, topic, config)
4、topic目录下创建分区目录和对应信息
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },isUpdate = false, usesTopicId)
5、创建对应的元数据
CreatePartitionsMetadata(topic.name, assignments.keySet)
七、Controller端处理逻辑
我们找到TopicChange对应的处理逻辑
override def process(event: ControllerEvent): Unit = {try {event match {case TopicChange =>processTopicChange()//......}}}private def processTopicChange(): Unit = {if (!isActive) return//从 brokers/topics/目录下获取所有的topicval topics = zkClient.getAllTopicsInCluster(true)//从controllerContext 获取当下缓存中所有的 topic//两者相减获取 新增加的 topicval newTopics = topics -- controllerContext.allTopics// 获取删除的topic (既topics目录没有,但是缓存中有)val deletedTopics = controllerContext.allTopics.diff(topics)//设置新的topic到缓存controllerContext.setAllTopics(topics)//检测zk中 每个topic 目录的变化registerPartitionModificationsHandlers(newTopics.toSeq)//现在要添加分区和副本了,也就是从topic下获取 topic_id、adding_replicas、removing_replicas、partitions 信息val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentAndTopicIdForTopics(newTopics)deletedTopics.foreach(controllerContext.removeTopic)processTopicIds(addedPartitionReplicaAssignment)addedPartitionReplicaAssignment.foreach { case TopicIdReplicaAssignment(_, _, newAssignments) =>newAssignments.foreach { case (topicAndPartition, newReplicaAssignment) =>//controllerContext 的缓存中 更新分区、副本、leder信息controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)}}info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +s"[$addedPartitionReplicaAssignment]")if (addedPartitionReplicaAssignment.nonEmpty) {val partitionAssignments = addedPartitionReplicaAssignment.map { case TopicIdReplicaAssignment(_, _, partitionsReplicas) => partitionsReplicas.keySet }.reduce((s1, s2) => s1.union(s2))//更高topic下的分区、副本为可用状态 OnlineReplica//此时 往topic 生产数据就ok了 onNewPartitionCreation(partitionAssignments)}}
从源码中我们可以看到,Controller这端会不断的将新的topic以及其下的topic_id、adding_replicas、removing_replicas、partitions 信息加载到缓存,并使用它们的状态机将它们更新至可用状态。并剔除掉删除的topic。始终保持,当向topic生产数据时,它这里都时最新的状态。
相关文章:
Kafka-创建topic源码
一、命令创建topic kafka-topics --create --topic quickstart-events --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2 二、kafka-topics脚本 exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TopicCommand "$" 脚本中指定了…...

【网络安全】(一) 0成本添加访问级监控
互联网的安全感这个概念源于阿里。顾名思义,让互联网的用户对于web产品能够产生足够的信任和依赖。特别是涉及到用户资金交易的站点,一次严重的用户资料泄露就可以彻底毁掉你的品牌。 然而当前阶段除了bat大部分互联网行业的企业对于网络安全给的重视都…...

【Three.js基础学习】26. Animated galaxy
前言 shaders实现星系 课程回顾 使用顶点着色器为每个粒子设置动画 a属性 , u制服 ,v变化 像素比:window.devicePixelRatio 自动从渲染器检索像素比 renderer.getPixelRatio() 如何尺寸衰减, 放大缩小视角时,粒子都是同…...
vscode使用ssh配置docker容器环境
1 创建容器,并映射主机和容器的指定ssh服务端口 2 进入容器 docker exec -it <容器ID> /bin/bash 3在容器中安装ssh服务 apt-get update apt-get install openssh-server 接着修改ssh文件信息,将容器的10008端口暴露出来允许root用户使用ssh登录 vim /…...

NLP论文速读(EMNLP 2024)|动态奖励与提示优化来帮助语言模型的进行自我对齐
论文速读|Dynamic Rewarding with Prompt Optimization Enables Tuning-free Self-Alignment of Language Models 论文信息: 简介: 本文讨论的背景是大型语言模型(LLMs)的自我对齐问题。传统的LLMs对齐方法依赖于昂贵的训练和人类偏好注释&am…...
【LeetCode】167. 两数之和 II - 输入有序数组
描述 给定一个下标从 1 开始的整数数组numbers,该数组已按非递减顺序排列,请从数组中找出满足相加之和等于目标数target的两个数。如果这两个数分别是numbers[index1]和numbers[index2],返回整数数组[index1, index2]。 只存在唯一答案&#…...
Getx:GetxController依赖管理02,Binding绑定全局控制器(懒加载Controller)
在使用GetX 状态管理器的时候,如果每个页面都手动实例化一个控制器就太麻烦了, Binding 的作用就是所有需要进行状态管理的控制器进行统一初始化 创建全局控制器Binding import package:get/get.dart; import ../controllers/counter.dart; // 同上一篇内…...
leetcode 找不同
389. 找不同 已解答 简单 相关标签 相关企业 给定两个字符串 s 和 t ,它们只包含小写字母。 字符串 t 由字符串 s 随机重排,然后在随机位置添加一个字母。 请找出在 t 中被添加的字母。 示例 1: 输入:s "abcd"…...

2025 - 生信信息学 - GEO数据分析 - RF分析(随机森林)
GEO数据分析 - RF分析(随机森林) 01 准备数据文件 #install.packages("randomForest")#引用包 library(randomForest) set.seed(123456)inputFile"diffGeneExp.txt" #输入文件 setwd("/Users/wangyang/Desktop/BCBM/02ra…...

Matlab深度学习(四)——AlexNet卷积神经网络
网络搭建参考:手撕 CNN 经典网络之 AlexNet(理论篇)-CSDN博客 在实际工程应用中,构建并训练一个大规模的卷积神经网络是比较复杂的,需要大量的数据以及高性能的硬件。如果通过训练好的典型网络稍加改进…...
etcd defrag
场景 prometheus监控告警,告警信息如下 etcd cluster "kube-etcd": database size in use on instance xx is 33.45% of the actual allocated disk space, please run defragmentation (e.g. etcdctl defrag) to retrieve the unused fragmented disk space.处理…...

golang语言整合jwt+gin框架实现token
1.下载jwt go get -u github.com/dgrijalva/jwt-go2.新建生成token和解析token文件 2.1 新建common文件夹和jwtConfig文件夹 新建jwtconfig.go文件 2.2 jwtconfig.go文件代码 /* Time : 2021/8/2 下午3:03 Author : mrxuexi File : main Software: GoLand */ package jwtC…...
数据治理、数据素养和数据质量管理:文献综述
注意:这并不是正式发表的论文,只是一篇用来交作业的文章 摘要 随着数据时代的到来,数据治理、数据素养和数据质量管理成为组织数据管理中的三大核心概念。本文基于相关研究与实践,对这三个领域进行全面综述,探讨它…...
【Linux】用户和用户组管理
管理用户 1.添加用户账号——useradd命令 【实例2-1-1】 按系统默认配置添加指定用户账号st和stu。 # 添加用户账号st [rootlocalhost ~]# useradd st # 添加用户账号stu [rootlocalhost ~]# useradd stu【实例2-1-2】添加用户账号stu01,UID为1004&am…...

游戏引擎学习第16天
视频参考:https://www.bilibili.com/video/BV1mEUCY8EiC/ 这些字幕讨论了编译器警告的概念以及如何在编译过程中启用和处理警告。以下是字幕的内容摘要: 警告的定义:警告是编译器用来告诉你某些地方可能存在问题,尽管编译器不强制要求你修复…...
如何通过对敏捷实践的调整,帮助远程团队提升研发效能?
首先明确一点,最敏捷的做法就是不要远程团队或分布式团队,远程一定比不上面对面同一地点的模式,毕竟环境不同,就不要期望远程团队和本地团队具备相同的效能,甚至期望更高。 那么,无论何种原因,…...

Ubuntu Linux使用前准备动作 配置SSH
在 Ubuntu 系统中配置 SSH 服务可以通过以下步骤进行: 1、安装ssh服务 1)打开终端(可以使用快捷键 Ctrl Alt T)。 2)运行以下命令安装 OpenSSH 服务器: sudo apt-get update:这一步是更新…...
疫情下的图书馆管理系统:Spring Boot技术
摘要 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了疫情下图书馆管理系统的开发全过程。通过分析疫情下图书馆管理系统管理的不足,创建了一个计算机管理疫情下图书馆管理系统的方案。文章介绍了疫情下图…...

vue3完整安装并创建项目
1、下载:https://npmmirror.com/mirrors/node/v18.19.0/node-v18.19.0-x64.msi 2、验证Nodejs是否安装成功(管理员身份运行cmd) node -v #查看nodejs的版本 v18.19.0npm -v #查看npm的版本 10.2.3 3、在D:\Program Files\nodejs路径下创建两…...

【Linux】Linux入门实操——进程管理(重点)
1. 概述 在 LINUX 中,每个执行的程序都称为一个进程。每一个进程都分配一个ID号(pid,进程号)。>windows > linux每个进程都可能以两种方式存在的。前台与后台,所谓前台进程就是用户目前的屏幕上可以进行操作的。后台进程则是实际在操作࿰…...

C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...

HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...
在Ubuntu24上采用Wine打开SourceInsight
1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...
《C++ 模板》
目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板,就像一个模具,里面可以将不同类型的材料做成一个形状,其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式:templa…...

无人机侦测与反制技术的进展与应用
国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机(无人驾驶飞行器,UAV)技术的快速发展,其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统,无人机的“黑飞”&…...

深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...