Kafka-TopicPartition
Kafka主题与分区
主题与分区
topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水平扩展性、容错性等优势。 分区可以有一个至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等
主题的管理
主题的管理
-
创建主题
-
查看主题信息
-
修改主题
-
删除主题
上述操作可以采用Kafka提供的kafka-topics.sh脚本来完成,也可以采用Kafka提供的AdminClient来完成。 该脚本位于¥KAFKA_HOME/bin目录下 
创建主题
创建主题的命令格式如下:
kafka-topics.sh --bootstrap-server <server:port> \--create --topic <topic> \--partitions <numPartitions> \--replication-factor <replicationFactor>
创建一个分区数为4、副本因子为2的主题
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2
创建一个分区数为4、副本因子为2的主题,并且指定主题的配置信息
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2 \--config max.message.bytes=128000
通过describe指令来查看分区副本的分配细节
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create
使用replica-assignment参数手动指定分区副本的分配方案
使用这种方式根据分区号的数值大小按照从小到大的顺序进行排列
例如:0:1:2,0:1:2,0:1:2,0:1:2
-
分区与分区之间用逗号分隔
-
分区与副本之间用冒号分隔
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create-same \--replica-assignment 0:1:2,0:1:2,0:1:2,0:1:2
注意:
-
同一个分区内的副本不能有重复,比如0:0,1:1这样,就会报出AdminCommandFailedException异常
-
分区之间所指定的副本数不同,比如0:0,1:1这样,就会报出AdminOperationException异常
主题命名规范
-
主题名称只能包含ASCII字母、数字、点、减号和下划线
-
主题名称长度不能超过249个字符
-
主题名称不能以点开头
-
不能以__开头,这是Kafka内部使用的主题前缀
-
不能包含空格、单引号、双引号、逗号、分号、冒号和NULL字符
-
主题名称应该全部小写,因为Kafka在区分主题名称时是不区分大小写的
-
主题名称不能与Kafka保留的名称冲突,比如__consumer_offsets
-
主题名称不能与已经存在的消费者组名称冲突
-
主题名称不能与已经存在的主题名称冲突
查看主题信息
通过list指令来查看当前Kafka集群中所有可用的主题
kafka-topics.sh --bootstrap-server localhost:9092 --list

通过describe指令来查看主题的详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

修改主题
当主题被创建之后,依然允许我们对其做一定的修改,比如修改分区数、修改副本因子、修改配置等。 通过alter指令来修改主题的配置信息
# 修改主题的最大消息字节数,配置值从10000修改为20000kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--config max.message.bytes=20000
通过alter指令来修改主题的分区数
kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-create \--partitions 6
删除主题
通过delete指令来删除主题
kafka-topics.sh --bootstrap-server localhost:9092 \--delete --topic topic-delete
通过delete-config参数来删除之前设置的配置信息
kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--delete-config max.message.bytes
手动删除主题
-
主题中的元数据存储在Zookeeper中的/brokers/topics和/config/topics路径下
-
主题中的消息数据存储在log.dir或log.dirs配置的路径下,只需要手动删除这些地方的数据即可。
配置管理
kafka-configs.sh脚本用于管理Kafka的配置信息,该脚本位于$KAFKA_HOME/bin目录下 主要包含变更配置alter和查看配置describe两个指令
# 变更主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes=128000# 添加主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes=128000# 查看主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--describe --entity-type topics --entity-name topic-config
KafkaAdminClient
KafkaAdminClient是Kafka提供的一个管理客户端,用于管理Kafka集群中的资源,比如主题、分区、消费者组等。
TopicCommand基本使用
使用KafkaAdminClient来完成TopicCommand的基本操作
查看主题信息
public class demo{public static void describeTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--describe","--topic", "topic-create"};kafka.admin.TopicCommand.main(options);}
}
创建主题
public class demo{public static void createTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--create","--replication-factor", "1","--partitions", "1","--topic", "topic-create-api"};kafka.admin.TopicCommand.main(options);}
}
查看所有可用主题
public class demo{public static void listTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--list"};kafka.admin.TopicCommand.main(options);}
}
KafkaAdminClient基本使用
KafkaAdminClient可以用来管理broker、配置和ACL(Access Control List),以及管理主题、分区和消费者组等。 KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient,提供了一系列的API来管理Kafka集群中的资源。
AdminClient常见的方法
-
createTopics:创建主题
- CreateTopicsResult createTopics(Collection newTopics)
-
deleteTopics:删除主题
- DeleteTopicsResult deleteTopics(Collection topics)
-
listTopics:列出所有可用的主题
- ListTopicsResult listTopics()
-
describeTopics:查看主题的详细信息
- DescribeTopicsResult describeTopics(Collection topicNames)
-
describeCluster:查看集群的详细信息
- DescribeClusterResult describeCluster()
-
describeConfigs:查看配置的详细信息
- DescribeConfigsResult describeConfigs(Collection resources)
-
alterConfigs:修改配置信息
- AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)
-
describeConsumerGroups:查看消费者组的详细信息
- DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds)
-
listConsumerGroups:列出所有可用的消费者组
- ListConsumerGroupsResult listConsumerGroups()
-
createPartitions:创建分区
- CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)
使用KafkaAdminClient创建主题
public class KafkaAdminClientCreateTopic {/*** 使用AdminClient创建Topic** 创建完成之后使用如下脚本进行检查* 进入KAFKA_HOME/bin* 执行 ./kafka-topics.sh --bootstrap-server localhost:9092 --list*/public static void createTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);NewTopic newTopic = new NewTopic("topic-create-api", 1, (short) 1);// 创建主题的方法内部是通过发送CreateTopicRequest请求来完成的CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {createTopic();}
}
使用KafkaAdminClient查看主题信息
public class KafkaAdminClientDescribeTopic {/*** 使用AdminClient查看Topic信息*/public static void describeTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("topic-create-api"));try {Map<String, TopicDescription> map = result.all().get();for (Map.Entry<String, TopicDescription> entry : map.entrySet()) {System.out.println(entry.getKey() + " : " + entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {describeTopic();}
}
使用KafkaAdminClient查看所有可用的主题
public class KafkaAdminClientListTopic {/*** 使用AdminClient查看所有可用的Topic*/public static void listTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ListTopicsResult result = adminClient.listTopics();try {Set<String> set = result.names().get();for (String s : set) {System.out.println(s);}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {listTopic();}
}
使用KafkaAdminClient创建分区
public class KafkaAdminClientCreatePartition {/*** 使用AdminClient创建分区*/public static void createPartition(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);Map<String, NewPartitions> map = new HashMap<>();NewPartitions newPartitions = NewPartitions.increaseTo(2);map.put("topic-create-api", newPartitions);CreatePartitionsResult result = adminClient.createPartitions(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {createPartition();}
}
使用KafkaAdminClient删除主题
public class KafkaAdminClientDeleteTopic {/*** 使用AdminClient删除Topic*/public static void deleteTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("topic-create-api"));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {deleteTopic();}
}
使用KafkaAdminClient修改主题配置
public class KafkaAdminClientAlterTopic {/*** 使用AdminClient修改Topic配置*/public static void alterTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ConfigEntry configEntry = new ConfigEntry("max.message.bytes", "128000");Config config = new Config(Arrays.asList(configEntry));Map<ConfigResource, Config> map = new HashMap<>();ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");map.put(configResource, config);AlterConfigsResult result = adminClient.alterConfigs(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {alterTopic();}
}
使用KafkaAdminClient查看主题配置
public class KafkaAdminClientDescribeTopicConfig {/*** 使用AdminClient查看Topic配置*/public static void describeTopicConfig(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(configResource));try {Map<ConfigResource, Config> map = result.all().get();for (Map.Entry<ConfigResource, Config> entry : map.entrySet()) {System.out.println(entry.getKey() + " : " + entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {describeTopicConfig();}
}
相关文章:
Kafka-TopicPartition
Kafka主题与分区 主题与分区 topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水…...
英特尔工作站:助力专业用户实现高效创作
原创 | 文 BFT机器人 英特尔工作站是由全球知名的英特尔公司设计和开发的一款计算平台。英特尔在工作站处理器领域将其产品分为性能型和移动型两类,它的诞生旨在满足专业用户在科学、工程、设计等领域对高性能计算的需求。英特尔工作站配备了最新的英特尔处理器、大…...
软件工程期末复习(选择+填空+判断)
文章目录 软件工程期末复习一、 选择题 软件工程期末复习 一、 选择题 1.“软件危机”的表现不包括:(c) A、软件产品不能按期交付 B、用户对“已完成的”软件产品时常不满意 C、程序员越来越供不应求 D、软件项目难以管理,维护困…...
群晖NAS基础设置
群晖NAS基础设置 最近一直在玩群晖NAS系统,有一些基础的配置跟大家分享一下 开启ssh登录 1.开启方法 控制面板—>终端和SNMP—>终端机 2.使用ssh软件登录 这里我用SecureCRT登录 进入ssh 3.进入root用户 starstar-nas:~$ sudo su -l root Password: ro…...
2023亚太杯数学建模A题B题C题选题建议,思路分析,模型代码
目录 ABC题思路模型代码:获取见文末名片,第一时间更新 视频连接讲解如上 A题思路:采果机器人的图像识别技术思路模型代码 B题思路:玻璃温室中的微气候法规 C题思路:我国新能源电动汽车的发展趋势 ABC题思路模型代…...
OpenGL的学习之路 -5
1.视景体 正交投影 人眼看世界,有一个可见范围。范围内可见,范围外不可见。视景体就是这么一个概念。 (上图仅学习记录用) 在OGL中,有两种投影方式,对应两种视景体。第一种,正交投影…...
【linux】服务器CPU占用50%,top/htop/ps却看不到异常进程?使用unhide可以查看!
问题描述 htop发现前32个核全被占满了,但是却找不到对应进程号 查杀 治标:杀死隐藏进程 1、unhide 安装unhide apt-get install unhideunhide使用 unhide proc果然发现了隐藏进程 kill -9 kill -9 [pid]这么多pid号,我这边杀了其中…...
JSP EL表达式获取list/Map集合与java Bean对象
上文 JSP EL表达式基本使用 中 我们对EL表达式做了一个基本的了解 也做了基础的字符串数据使用 那么 我们可以来看一下我们的集合 首先 list 这个比较简单 我们直接这样写代码 <% page import"java.util.ArrayList" %> <% page import"java.util.Lis…...
汇编程序:查找数组中最大最小值
实验内容 1. 从数据段DS中包含9个字节的数组数据VALUE中分别找出最大值(存到max中)、最小值(存到min中)。 2. 能够单步执行程序,认真观察、判断每条指令执行的结果是否正确,对错误结果,能够做出…...
ElasticSearch之禁用交换分区
操作系统将进程加载至内存中执行时,对于当前未使用到的内存页,可能会将相关内存页交换至硬盘上,即swap。 对于性能敏感、时延敏感的应用程序比如ElasticSearch,swap特性会明显影响性能和稳定性,因此最好禁用swap特性。…...
【Linux】第二十一站:文件(一)
文章目录 一、共识原理二、C系列文件接口三、从C过渡到系统:文件系统调用四、访问文件的本质 一、共识原理 文件 内容 属性 文件分为打开的文件 和 没打开的文件 打开的文件:是谁打开的?是进程!----所以研究打开的文件本质是研…...
centos7 docker开启认证的远程端口2376配置
docker开启2375会存在安全漏洞 暴露了2375端口的Docker主机。因为没有任何加密和认证过程,知道了主机IP以后,,任何人都可以管理这台主机上的容器和镜像,以前贪图方便,只开启了没有认证的docker2375端口,后…...
Java王者荣耀小游戏
Background类 package LX;import java.awt.*; //背景类 public class Background extends GameObject{public Background(GameFrame gameFrame) {super(gameFrame);}Image bg Toolkit.getDefaultToolkit().getImage("C:\\Users\\ASUS\\Desktop\\王者荣耀图片\\Map.jpg&…...
谈谈Redis的几种经典集群模式
目录 前言 主从复制 哨兵模式 分片集群 前言 Redis集群是一种通过将多个Redis节点连接在一起以实现高可用性、数据分片和负载均衡的技术。它允许Redis在不同节点上同时提供服务,提高整体性能和可靠性。在Redis中提供集群方案总共有三种:主从复制、…...
【腾讯云 HAI域探秘】基于高性能应用服务器HAI部署的 ChatGLM2-6B模型,我开发了AI办公助手,公司行政小姐姐用了都说好!
目录 前言 一、腾讯云HAI介绍: 1、即插即用 轻松上手 2、横向对比 青出于蓝 3、多种高性能应用部署场景 二、腾讯云HAI一键部署并使用ChatGLM2-6B快速实现开发者所需的相关API服务 1、登录 高性能应用服务 HAI 控制台 2、点击 新建 选择 AI模型,…...
服务器tar压缩解压文件
文章目录 一、前言二、命令2.1、解压2.2、压缩 三、最后 一、前言 前端上传dist代码到服务器上后,是在linux上操作,所以和window有所不同。一般是打好dist,然后压缩成gz传输到服务器,此时在服务器上可能涉及到解压和压缩的操作&a…...
博物馆线上导览系统的设计与实现-计算机毕业设计源码64574
摘 要 21世纪的今天,随着社会的不断发展与进步,人们对于信息科学化的认识,已由低层次向高层次发展,由原来的感性认识向理性认识提高,管理工作的重要性已逐渐被人们所认识,科学化的管理,使信息存…...
vue升级题
不熟悉的: 2, 3.你用过befcoreDetory 吗?清除定时器,第一个和第二个再看一下 实例加载完成是在哪个生命周期--beforecreate 7.父子组件生命周期执行顺序?为什么这么渲染?场景 8.简单描述每个周期具体适…...
Edit And Resend测试接口工具(浏览器上的Postman)
优点 可以不用设置Cookie或者Token,只设置参数进行重发接口测试API 使用Microsoft Rdge浏览器 F12——然后点击网络——在页面点击发起请求——然后选择要重发的请求右键选择Edit And Resend——在网络控制台设置自己要设置的参数去测试自己写的功能...
maven常用打包命令,值传递和引用传递,Java包 ,JDK 中常用的包有哪些,import java和javax有什么区别
文章目录 maven常用打包命令Java程序设计语言对对象采用的不是引用调用,实际上,对象引用是按值传递的。值传递和引用传递有什么区别Java包 ,JDK 中常用的包有哪些import java和javax有什么区别 谈谈java基础的内容,而且很多人都回…...
【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...
关于easyexcel动态下拉选问题处理
前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...
