Kafka-TopicPartition
Kafka主题与分区
主题与分区
topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水平扩展性、容错性等优势。 分区可以有一个至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等
主题的管理
主题的管理
-
创建主题
-
查看主题信息
-
修改主题
-
删除主题
上述操作可以采用Kafka提供的kafka-topics.sh脚本来完成,也可以采用Kafka提供的AdminClient来完成。 该脚本位于¥KAFKA_HOME/bin目录下 
创建主题
创建主题的命令格式如下:
kafka-topics.sh --bootstrap-server <server:port> \--create --topic <topic> \--partitions <numPartitions> \--replication-factor <replicationFactor>
创建一个分区数为4、副本因子为2的主题
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2
创建一个分区数为4、副本因子为2的主题,并且指定主题的配置信息
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2 \--config max.message.bytes=128000
通过describe指令来查看分区副本的分配细节
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create
使用replica-assignment参数手动指定分区副本的分配方案
使用这种方式根据分区号的数值大小按照从小到大的顺序进行排列
例如:0:1:2,0:1:2,0:1:2,0:1:2
-
分区与分区之间用逗号分隔
-
分区与副本之间用冒号分隔
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create-same \--replica-assignment 0:1:2,0:1:2,0:1:2,0:1:2
注意:
-
同一个分区内的副本不能有重复,比如0:0,1:1这样,就会报出AdminCommandFailedException异常
-
分区之间所指定的副本数不同,比如0:0,1:1这样,就会报出AdminOperationException异常
主题命名规范
-
主题名称只能包含ASCII字母、数字、点、减号和下划线
-
主题名称长度不能超过249个字符
-
主题名称不能以点开头
-
不能以__开头,这是Kafka内部使用的主题前缀
-
不能包含空格、单引号、双引号、逗号、分号、冒号和NULL字符
-
主题名称应该全部小写,因为Kafka在区分主题名称时是不区分大小写的
-
主题名称不能与Kafka保留的名称冲突,比如__consumer_offsets
-
主题名称不能与已经存在的消费者组名称冲突
-
主题名称不能与已经存在的主题名称冲突
查看主题信息
通过list指令来查看当前Kafka集群中所有可用的主题
kafka-topics.sh --bootstrap-server localhost:9092 --list

通过describe指令来查看主题的详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

修改主题
当主题被创建之后,依然允许我们对其做一定的修改,比如修改分区数、修改副本因子、修改配置等。 通过alter指令来修改主题的配置信息
# 修改主题的最大消息字节数,配置值从10000修改为20000kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--config max.message.bytes=20000
通过alter指令来修改主题的分区数
kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-create \--partitions 6
删除主题
通过delete指令来删除主题
kafka-topics.sh --bootstrap-server localhost:9092 \--delete --topic topic-delete
通过delete-config参数来删除之前设置的配置信息
kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--delete-config max.message.bytes
手动删除主题
-
主题中的元数据存储在Zookeeper中的/brokers/topics和/config/topics路径下
-
主题中的消息数据存储在log.dir或log.dirs配置的路径下,只需要手动删除这些地方的数据即可。
配置管理
kafka-configs.sh脚本用于管理Kafka的配置信息,该脚本位于$KAFKA_HOME/bin目录下 主要包含变更配置alter和查看配置describe两个指令
# 变更主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes=128000# 添加主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes=128000# 查看主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--describe --entity-type topics --entity-name topic-config
KafkaAdminClient
KafkaAdminClient是Kafka提供的一个管理客户端,用于管理Kafka集群中的资源,比如主题、分区、消费者组等。
TopicCommand基本使用
使用KafkaAdminClient来完成TopicCommand的基本操作
查看主题信息
public class demo{public static void describeTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--describe","--topic", "topic-create"};kafka.admin.TopicCommand.main(options);}
}
创建主题
public class demo{public static void createTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--create","--replication-factor", "1","--partitions", "1","--topic", "topic-create-api"};kafka.admin.TopicCommand.main(options);}
}
查看所有可用主题
public class demo{public static void listTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--list"};kafka.admin.TopicCommand.main(options);}
}
KafkaAdminClient基本使用
KafkaAdminClient可以用来管理broker、配置和ACL(Access Control List),以及管理主题、分区和消费者组等。 KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient,提供了一系列的API来管理Kafka集群中的资源。
AdminClient常见的方法
-
createTopics:创建主题
- CreateTopicsResult createTopics(Collection newTopics)
-
deleteTopics:删除主题
- DeleteTopicsResult deleteTopics(Collection topics)
-
listTopics:列出所有可用的主题
- ListTopicsResult listTopics()
-
describeTopics:查看主题的详细信息
- DescribeTopicsResult describeTopics(Collection topicNames)
-
describeCluster:查看集群的详细信息
- DescribeClusterResult describeCluster()
-
describeConfigs:查看配置的详细信息
- DescribeConfigsResult describeConfigs(Collection resources)
-
alterConfigs:修改配置信息
- AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)
-
describeConsumerGroups:查看消费者组的详细信息
- DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds)
-
listConsumerGroups:列出所有可用的消费者组
- ListConsumerGroupsResult listConsumerGroups()
-
createPartitions:创建分区
- CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)
使用KafkaAdminClient创建主题
public class KafkaAdminClientCreateTopic {/*** 使用AdminClient创建Topic** 创建完成之后使用如下脚本进行检查* 进入KAFKA_HOME/bin* 执行 ./kafka-topics.sh --bootstrap-server localhost:9092 --list*/public static void createTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);NewTopic newTopic = new NewTopic("topic-create-api", 1, (short) 1);// 创建主题的方法内部是通过发送CreateTopicRequest请求来完成的CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {createTopic();}
}
使用KafkaAdminClient查看主题信息
public class KafkaAdminClientDescribeTopic {/*** 使用AdminClient查看Topic信息*/public static void describeTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("topic-create-api"));try {Map<String, TopicDescription> map = result.all().get();for (Map.Entry<String, TopicDescription> entry : map.entrySet()) {System.out.println(entry.getKey() + " : " + entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {describeTopic();}
}
使用KafkaAdminClient查看所有可用的主题
public class KafkaAdminClientListTopic {/*** 使用AdminClient查看所有可用的Topic*/public static void listTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ListTopicsResult result = adminClient.listTopics();try {Set<String> set = result.names().get();for (String s : set) {System.out.println(s);}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {listTopic();}
}
使用KafkaAdminClient创建分区
public class KafkaAdminClientCreatePartition {/*** 使用AdminClient创建分区*/public static void createPartition(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);Map<String, NewPartitions> map = new HashMap<>();NewPartitions newPartitions = NewPartitions.increaseTo(2);map.put("topic-create-api", newPartitions);CreatePartitionsResult result = adminClient.createPartitions(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {createPartition();}
}
使用KafkaAdminClient删除主题
public class KafkaAdminClientDeleteTopic {/*** 使用AdminClient删除Topic*/public static void deleteTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("topic-create-api"));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {deleteTopic();}
}
使用KafkaAdminClient修改主题配置
public class KafkaAdminClientAlterTopic {/*** 使用AdminClient修改Topic配置*/public static void alterTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ConfigEntry configEntry = new ConfigEntry("max.message.bytes", "128000");Config config = new Config(Arrays.asList(configEntry));Map<ConfigResource, Config> map = new HashMap<>();ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");map.put(configResource, config);AlterConfigsResult result = adminClient.alterConfigs(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {alterTopic();}
}
使用KafkaAdminClient查看主题配置
public class KafkaAdminClientDescribeTopicConfig {/*** 使用AdminClient查看Topic配置*/public static void describeTopicConfig(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(configResource));try {Map<ConfigResource, Config> map = result.all().get();for (Map.Entry<ConfigResource, Config> entry : map.entrySet()) {System.out.println(entry.getKey() + " : " + entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {describeTopicConfig();}
}
相关文章:
Kafka-TopicPartition
Kafka主题与分区 主题与分区 topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水…...
英特尔工作站:助力专业用户实现高效创作
原创 | 文 BFT机器人 英特尔工作站是由全球知名的英特尔公司设计和开发的一款计算平台。英特尔在工作站处理器领域将其产品分为性能型和移动型两类,它的诞生旨在满足专业用户在科学、工程、设计等领域对高性能计算的需求。英特尔工作站配备了最新的英特尔处理器、大…...
软件工程期末复习(选择+填空+判断)
文章目录 软件工程期末复习一、 选择题 软件工程期末复习 一、 选择题 1.“软件危机”的表现不包括:(c) A、软件产品不能按期交付 B、用户对“已完成的”软件产品时常不满意 C、程序员越来越供不应求 D、软件项目难以管理,维护困…...
群晖NAS基础设置
群晖NAS基础设置 最近一直在玩群晖NAS系统,有一些基础的配置跟大家分享一下 开启ssh登录 1.开启方法 控制面板—>终端和SNMP—>终端机 2.使用ssh软件登录 这里我用SecureCRT登录 进入ssh 3.进入root用户 starstar-nas:~$ sudo su -l root Password: ro…...
2023亚太杯数学建模A题B题C题选题建议,思路分析,模型代码
目录 ABC题思路模型代码:获取见文末名片,第一时间更新 视频连接讲解如上 A题思路:采果机器人的图像识别技术思路模型代码 B题思路:玻璃温室中的微气候法规 C题思路:我国新能源电动汽车的发展趋势 ABC题思路模型代…...
OpenGL的学习之路 -5
1.视景体 正交投影 人眼看世界,有一个可见范围。范围内可见,范围外不可见。视景体就是这么一个概念。 (上图仅学习记录用) 在OGL中,有两种投影方式,对应两种视景体。第一种,正交投影…...
【linux】服务器CPU占用50%,top/htop/ps却看不到异常进程?使用unhide可以查看!
问题描述 htop发现前32个核全被占满了,但是却找不到对应进程号 查杀 治标:杀死隐藏进程 1、unhide 安装unhide apt-get install unhideunhide使用 unhide proc果然发现了隐藏进程 kill -9 kill -9 [pid]这么多pid号,我这边杀了其中…...
JSP EL表达式获取list/Map集合与java Bean对象
上文 JSP EL表达式基本使用 中 我们对EL表达式做了一个基本的了解 也做了基础的字符串数据使用 那么 我们可以来看一下我们的集合 首先 list 这个比较简单 我们直接这样写代码 <% page import"java.util.ArrayList" %> <% page import"java.util.Lis…...
汇编程序:查找数组中最大最小值
实验内容 1. 从数据段DS中包含9个字节的数组数据VALUE中分别找出最大值(存到max中)、最小值(存到min中)。 2. 能够单步执行程序,认真观察、判断每条指令执行的结果是否正确,对错误结果,能够做出…...
ElasticSearch之禁用交换分区
操作系统将进程加载至内存中执行时,对于当前未使用到的内存页,可能会将相关内存页交换至硬盘上,即swap。 对于性能敏感、时延敏感的应用程序比如ElasticSearch,swap特性会明显影响性能和稳定性,因此最好禁用swap特性。…...
【Linux】第二十一站:文件(一)
文章目录 一、共识原理二、C系列文件接口三、从C过渡到系统:文件系统调用四、访问文件的本质 一、共识原理 文件 内容 属性 文件分为打开的文件 和 没打开的文件 打开的文件:是谁打开的?是进程!----所以研究打开的文件本质是研…...
centos7 docker开启认证的远程端口2376配置
docker开启2375会存在安全漏洞 暴露了2375端口的Docker主机。因为没有任何加密和认证过程,知道了主机IP以后,,任何人都可以管理这台主机上的容器和镜像,以前贪图方便,只开启了没有认证的docker2375端口,后…...
Java王者荣耀小游戏
Background类 package LX;import java.awt.*; //背景类 public class Background extends GameObject{public Background(GameFrame gameFrame) {super(gameFrame);}Image bg Toolkit.getDefaultToolkit().getImage("C:\\Users\\ASUS\\Desktop\\王者荣耀图片\\Map.jpg&…...
谈谈Redis的几种经典集群模式
目录 前言 主从复制 哨兵模式 分片集群 前言 Redis集群是一种通过将多个Redis节点连接在一起以实现高可用性、数据分片和负载均衡的技术。它允许Redis在不同节点上同时提供服务,提高整体性能和可靠性。在Redis中提供集群方案总共有三种:主从复制、…...
【腾讯云 HAI域探秘】基于高性能应用服务器HAI部署的 ChatGLM2-6B模型,我开发了AI办公助手,公司行政小姐姐用了都说好!
目录 前言 一、腾讯云HAI介绍: 1、即插即用 轻松上手 2、横向对比 青出于蓝 3、多种高性能应用部署场景 二、腾讯云HAI一键部署并使用ChatGLM2-6B快速实现开发者所需的相关API服务 1、登录 高性能应用服务 HAI 控制台 2、点击 新建 选择 AI模型,…...
服务器tar压缩解压文件
文章目录 一、前言二、命令2.1、解压2.2、压缩 三、最后 一、前言 前端上传dist代码到服务器上后,是在linux上操作,所以和window有所不同。一般是打好dist,然后压缩成gz传输到服务器,此时在服务器上可能涉及到解压和压缩的操作&a…...
博物馆线上导览系统的设计与实现-计算机毕业设计源码64574
摘 要 21世纪的今天,随着社会的不断发展与进步,人们对于信息科学化的认识,已由低层次向高层次发展,由原来的感性认识向理性认识提高,管理工作的重要性已逐渐被人们所认识,科学化的管理,使信息存…...
vue升级题
不熟悉的: 2, 3.你用过befcoreDetory 吗?清除定时器,第一个和第二个再看一下 实例加载完成是在哪个生命周期--beforecreate 7.父子组件生命周期执行顺序?为什么这么渲染?场景 8.简单描述每个周期具体适…...
Edit And Resend测试接口工具(浏览器上的Postman)
优点 可以不用设置Cookie或者Token,只设置参数进行重发接口测试API 使用Microsoft Rdge浏览器 F12——然后点击网络——在页面点击发起请求——然后选择要重发的请求右键选择Edit And Resend——在网络控制台设置自己要设置的参数去测试自己写的功能...
maven常用打包命令,值传递和引用传递,Java包 ,JDK 中常用的包有哪些,import java和javax有什么区别
文章目录 maven常用打包命令Java程序设计语言对对象采用的不是引用调用,实际上,对象引用是按值传递的。值传递和引用传递有什么区别Java包 ,JDK 中常用的包有哪些import java和javax有什么区别 谈谈java基础的内容,而且很多人都回…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...
管理学院权限管理系统开发总结
文章目录 🎓 管理学院权限管理系统开发总结 - 现代化Web应用实践之路📝 项目概述🏗️ 技术架构设计后端技术栈前端技术栈 💡 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 🗄️ 数据库设…...
