当前位置: 首页 > article >正文

Kafka主题运维全指南:从基础配置到故障处理

#作者:张桐瑞

文章目录

  • 主题日常管理
  • 1. 修改主题分区。
  • 2. 修改主题级别参数。
  • 3. 变更副本数。
  • 4. 修改主题限速。
  • 5.主题分区迁移。
  • 6. 常见主题错误处理
    • 常见错误1:主题删除失败。
    • 常见错误2:__consumer_offsets占用太多的磁盘。

主题日常管理

所谓的日常管理,无非就是主题的增删改查。你可能会觉得,这有什么好讨论的,官网上不都有命令吗?这部分内容的确比较简单,但它是我们讨论后面内容的基础。而且,在讨论的过程中,我还会向你分享一些小技巧。另外,我们今天讨论的管理手段都是借助于Kafka自带的命令。事实上,在专栏后面,我们还会专门讨论如何使用Java API的方式来运维Kafka集群。

我们先来学习一下如何使用命令创建Kafka主题。Kafka提供了自带的kafka-topics脚本,用于帮助用户创建主题。该脚本文件位于Kafka安装目录的bin子目录下。如果你是在Windows上使用Kafka,那么该脚本位于bin路径的windows子目录下。一个典型的创建命令如下:
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1

create表明我们要创建主题,而partitions和replication factor分别设置了主题的分区数以及每个分区下的副本数。如果你之前使用过这个命令,你可能会感到奇怪:难道不是指定 –zookeeper参数吗?为什么现在变成 –bootstrap-server了呢?我来给出答案:从Kafka 2.2版本开始,社区推荐用 –bootstrap-server参数替换 –zookeeper参数,并且显式地将后者标记为“已过期”,因此,如果你已经在使用2.2版本了,那么创建主题请指定 –bootstrap-server参数。

社区推荐使用 –bootstrap-server而非 –zookeeper的原因主要有两个。

  1. 使用 –zookeeper会绕过Kafka的安全体系。这就是说,即使你为Kafka集群设置了安全认证,限制了主题的创建,如果你使用 –zookeeper的命令,依然能成功创建任意主题,不受认证体系的约束。这显然是Kafka集群的运维人员不希望看到的。
  2. 使用 –bootstrap-server与集群进行交互,越来越成为使用Kafka的标准姿势。换句话说,以后会有越来越少的命令和API需要与ZooKeeper进行连接。这样,我们只需要一套连接信息,就能与Kafka进行全方位的交互,不用像以前一样,必须同时维护ZooKeeper和Broker的连接信息。
    创建好主题之后,Kafka允许我们使用相同的脚本查询主题。你可以使用下面的命令,查询所有主题的列表。
    bin/kafka-topics.sh --bootstrap-server broker_host:port --list
    如果要查询单个主题的详细数据,你可以使用下面的命令。
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

如果describe命令不指定具体的主题名称,那么Kafka默认会返回所有“可见”主题的详细数据给你。

这里的“可见”,是指发起这个命令的用户能够看到的Kafka主题。这和前面说到主题创建时,使用 –zookeeper和 –bootstrap-server的区别是一样的。如果指定了 –bootstrap-server,那么这条命令就会受到安全认证体系的约束,即对命令发起者进行权限验证,然后返回它能看到的主题。否则,如果指定 –zookeeper参数,那么默认会返回集群中所有的主题详细数据。基于这些原因,我建议你最好统一使用 –bootstrap-server连接参数。

说完了主题的“增”和“查”,我们说说如何“改”。Kafka中涉及到主题变更的地方有5处。

1. 修改主题分区。

其实就是增加分区,目前Kafka不允许减少某个主题的分区数。你可以使用kafka-topics脚本,结合 –alter参数来增加某个主题的分区数,命令如下:
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
这里要注意的是,你指定的分区数一定要比原有分区数大,否则Kafka会抛出InvalidPartitionsException异常。

2. 修改主题级别参数。

在主题创建之后,我们可以使用kafka-configs脚本修改对应的参数。
这个用法我们在专栏第8讲中讨论过,现在先来复习一下。假设我们要设置主题级别参数max.message.bytes,那么命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
也许你会觉得奇怪,为什么这个脚本就要指定 –zookeeper,而不是 –bootstrap-server呢?其实,这个脚本也能指定 –bootstrap-server参数,只是它是用来设置动态参数的。在专栏后面,我会详细介绍什么是动态参数,以及动态参数都有哪些。现在,你只需要了解设置常规的主题级别参数,还是使用 –zookeeper。

3. 变更副本数。

使用自带的kafka-reassign-partitions脚本,帮助我们增加主题的副本数。这里先留个悬念,稍后我会拿Kafka内部主题__consumer_offsets来演示如何增加主题副本数。

4. 修改主题限速。

这里主要是指设置Leader副本和Follower副本使用的带宽。有时候,我们想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。Kafka提供了这样的功能。我来举个例子。假设我有个主题,名为test,我想让该主题各个分区的Leader副本和Follower副本在处理副本同步时,不得占用超过100MBps的带宽。注意是大写B,即每秒不超过100MB。那么,我们应该怎么设置呢?

要达到这个目的,我们必须先设置Broker端参数leader.replication.throttled.rate和follower.replication.throttled.rate,命令如下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0

这条命令结尾处的 –entity-name就是Broker ID。倘若该主题的副本分别在0、1、2、3多个Broker上,那么你还要依次为Broker 1、2、3执行这条命令。

设置好这个参数之后,我们还需要为该主题设置要限速的副本。在这个例子中,我们想要为所有副本都设置限速,因此统一使用通配符*来表示,命令如下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

5.主题分区迁移。

同样是使用kafka-reassign-partitions脚本,对主题各个分区的副本进行“手术”般的调整,比如把某些分区批量迁移到其他Broker上。这种变更比较复杂,我会在专栏后面专门和你分享如何做主题的分区迁移。
最后,我们来聊聊如何删除主题。命令很简单,我直接分享给你。
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
删除主题的命令并不复杂,关键是删除操作是异步的,执行完这条命令不代表主题立即就被删除了。它仅仅是被标记成“已删除”状态而已。Kafka会在后台默默地开启主题删除操作。因此,通常情况下,你都需要耐心地等待一段时间。
特殊主题的管理与运维
说完了日常的主题管理操作,我们来聊聊Kafka内部主题consumer_offsets和transaction_state。前者你可能已经很熟悉了,后者是Kafka支持事务新引入的。如果在你的生产环境中,你看到很多带有consumer_offsets和transaction_state前缀的子目录,不用惊慌,这是正常的。这两个内部主题默认都有50个分区,因此,分区子目录会非常得多。
关于这两个内部主题,我的建议是不要手动创建或修改它们,还是让Kafka自动帮我们创建好了。不过这里有个比较隐晦的问题,那就是__consumer_offsets的副本数问题。

在Kafka 0.11之前,当Kafka自动创建该主题时,它会综合考虑当前运行的Broker台数和Broker端参数offsets.topic.replication.factor值,然后取两者的较小值作为该主题的副本数,但这就违背了用户设置offsets.topic.replication.factor的初衷。这正是很多用户感到困扰的地方:我的集群中有100台Broker,offsets.topic.replication.factor也设成了3,为什么我的__consumer_offsets主题只有1个副本?其实,这就是因为这个主题是在只有一台Broker启动时被创建的。

在0.11版本之后,社区修正了这个问题。也就是说,0.11之后,Kafka会严格遵守offsets.topic.replication.factor值。如果当前运行的Broker数量小于offsets.topic.replication.factor值,Kafka会创建主题失败,并显式抛出异常。

那么,如果该主题的副本值已经是1了,我们能否把它增加到3呢?当然可以。我们来看一下具体的方法。

第1步是创建一个json文件,显式提供50个分区对应的副本数。注意,replicas中的3台Broker排列顺序不同,目的是将Leader副本均匀地分散在Broker上。该文件具体格式如下:

{"version":1, "partitions":[{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},...{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`

第2步是执行kafka-reassign-partitions脚本,命令如下:

bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

除了修改内部主题,我们可能还想查看这些内部主题的消息内容。特别是对于__consumer_offsets而言,由于它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。下面的命令可以帮助我们直接查看消费者组提交的位移数据。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

除了查看位移提交数据,我们还可以直接读取该主题消息,查看消费者组的状态信息。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

对于内部主题__transaction_state而言,方法是相同的。你只需要指定kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter即可。

6. 常见主题错误处理

最后,我们来说说与主题相关的常见错误,以及相应的处理方法。

常见错误1:主题删除失败。

当运行完上面的删除命令后,很多人发现已删除主题的分区数据依然“躺在”硬盘上,没有被清除。这时该怎么办呢?

实际上,造成主题删除失败的原因有很多,最常见的原因有两个:副本所在的Broker宕机了;待删除主题的部分分区依然在执行迁移过程。

如果是因为前者,通常你重启对应的Broker之后,删除操作就能自动恢复;如果是因为后者,那就麻烦了,很可能两个操作会相互干扰。

不管什么原因,一旦你碰到主题无法删除的问题,可以采用这样的方法:
第1步,手动删除ZooKeeper节点/admin/delete_topics下以待删除主题为名的znode。

第2步,手动删除该主题在磁盘上的分区目录。

第3步,在ZooKeeper中执行rmr /controller,触发Controller重选举,刷新Controller缓存。
在执行最后一步时,你一定要谨慎,因为它可能造成大面积的分区Leader重选举。事实上,仅仅执行前两步也是可以的,只是Controller缓存中没有清空待删除主题罢了,也不影响使用。

常见错误2:__consumer_offsets占用太多的磁盘。

一旦你发现这个主题消耗了过多的磁盘空间,那么,你一定要显式地用jstack命令查看一下kafka-log-cleaner-thread前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。倘若真是这个原因导致的,那我们就只能重启相应的Broker了。另外,请你注意保留出错日志,因为这通常都是Bug导致的,最好提交到社区看一下。

相关文章:

Kafka主题运维全指南:从基础配置到故障处理

#作者&#xff1a;张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1&#xff1a;主题删除失败。常见错误2&#xff1a;__consumer_offsets占用太多的磁盘。 主题日常管理 …...

AI语音助手的Python实现

引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...

tomcat指定使用的jdk版本

说明 有时候需要对tomcat配置指定的jdk版本号&#xff0c;此时&#xff0c;我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!

本文介绍了一种名为AnomalyAny的创新框架&#xff0c;该方法利用Stable Diffusion的强大生成能力&#xff0c;仅需单个正常样本和文本描述&#xff0c;即可生成逼真且多样化的异常样本&#xff0c;有效解决了视觉异常检测中异常样本稀缺的难题&#xff0c;为工业质检、医疗影像…...

提升移动端网页调试效率:WebDebugX 与常见工具组合实践

在日常移动端开发中&#xff0c;网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时&#xff0c;开发者迫切需要一套高效、可靠且跨平台的调试方案。过去&#xff0c;我们或多或少使用过 Chrome DevTools、Remote Debug…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器

一、原理介绍 传统滑模观测器采用如下结构&#xff1a; 传统SMO中LPF会带来相位延迟和幅值衰减&#xff0c;并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF)&#xff0c;可以去除高次谐波&#xff0c;并且不用相位补偿就可以获得一个误差较小的转子位…...

前端中slice和splic的区别

1. slice slice 用于从数组中提取一部分元素&#xff0c;返回一个新的数组。 特点&#xff1a; 不修改原数组&#xff1a;slice 不会改变原数组&#xff0c;而是返回一个新的数组。提取数组的部分&#xff1a;slice 会根据指定的开始索引和结束索引提取数组的一部分。不包含…...

什么是VR全景技术

VR全景技术&#xff0c;全称为虚拟现实全景技术&#xff0c;是通过计算机图像模拟生成三维空间中的虚拟世界&#xff0c;使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验&#xff0c;结合图文、3D、音视频等多媒体元素…...

在树莓派上添加音频输入设备的几种方法

在树莓派上添加音频输入设备可以通过以下步骤完成&#xff0c;具体方法取决于设备类型&#xff08;如USB麦克风、3.5mm接口麦克风或HDMI音频输入&#xff09;。以下是详细指南&#xff1a; 1. 连接音频输入设备 USB麦克风/声卡&#xff1a;直接插入树莓派的USB接口。3.5mm麦克…...

Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?

Pod IP 的本质与特性 Pod IP 的定位 纯端点地址&#xff1a;Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址&#xff08;如 10.244.1.2&#xff09;无特殊名称&#xff1a;在 Kubernetes 中&#xff0c;它通常被称为 “Pod IP” 或 “容器 IP”生命周期&#xff1a;与 Pod …...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程

STM32F1 本教程使用零知标准板&#xff08;STM32F103RBT6&#xff09;通过I2C驱动ICM20948九轴传感器&#xff0c;实现姿态解算&#xff0c;并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化&#xff0c;适合嵌入式及物联网开发者。在基础驱动上新增…...

WPF八大法则:告别模态窗口卡顿

⚙️ 核心问题&#xff1a;阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程&#xff0c;导致后续逻辑无法执行&#xff1a; var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题&#xff1a…...

MySQL 主从同步异常处理

阅读原文&#xff1a;https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主&#xff0c;遇到的这个错误&#xff1a; Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一&#xff0c;通常表示&#xff…...

【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验

Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...

【LeetCode】算法详解#6 ---除自身以外数组的乘积

1.题目介绍 给定一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O…...

抽象类和接口(全)

一、抽象类 1.概念&#xff1a;如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象&#xff0c;这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法&#xff0c;包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中&#xff0c;⼀个类如果被 abs…...

实战三:开发网页端界面完成黑白视频转为彩色视频

​一、需求描述 设计一个简单的视频上色应用&#xff0c;用户可以通过网页界面上传黑白视频&#xff0c;系统会自动将其转换为彩色视频。整个过程对用户来说非常简单直观&#xff0c;不需要了解技术细节。 效果图 ​二、实现思路 总体思路&#xff1a; 用户通过Gradio界面上…...

Oracle11g安装包

Oracle 11g安装包 适用于windows系统&#xff0c;64位 下载路径 oracle 11g 安装包...

深入浅出Diffusion模型:从原理到实践的全方位教程

I. 引言&#xff1a;生成式AI的黎明 – Diffusion模型是什么&#xff1f; 近年来&#xff0c;生成式人工智能&#xff08;Generative AI&#xff09;领域取得了爆炸性的进展&#xff0c;模型能够根据简单的文本提示创作出逼真的图像、连贯的文本&#xff0c;乃至更多令人惊叹的…...

深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏

一、引言 在深度学习中&#xff0c;我们训练出的神经网络往往非常庞大&#xff08;比如像 ResNet、YOLOv8、Vision Transformer&#xff09;&#xff0c;虽然精度很高&#xff0c;但“太重”了&#xff0c;运行起来很慢&#xff0c;占用内存大&#xff0c;不适合部署到手机、摄…...

系统掌握PyTorch:图解张量、Autograd、DataLoader、nn.Module与实战模型

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文通过代码驱动的方式&#xff0c;系统讲解PyTorch核心概念和实战技巧&#xff0c;涵盖张量操作、自动微分、数据加载、模型构建和训练全流程&#…...

【SpringBoot自动化部署】

SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一&#xff0c;能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时&#xff0c;需要添加Git仓库地址和凭证&#xff0c;设置构建触发器&#xff08;如GitHub…...

go 里面的指针

指针 在 Go 中&#xff0c;指针&#xff08;pointer&#xff09;是一个变量的内存地址&#xff0c;就像 C 语言那样&#xff1a; a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10&#xff0c;通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...

nnUNet V2修改网络——暴力替换网络为UNet++

更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 U-Net存在两个局限,一是网络的最佳深度因应用场景而异,这取决于任务的难度和可用于训练的标注数…...

论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving

地址&#xff1a;LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂&#xff0c;正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...

数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !

我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...

tauri项目,如何在rust端读取电脑环境变量

如果想在前端通过调用来获取环境变量的值&#xff0c;可以通过标准的依赖&#xff1a; std::env::var(name).ok() 想在前端通过调用来获取&#xff0c;可以写一个command函数&#xff1a; #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...

MyBatis中关于缓存的理解

MyBatis缓存 MyBatis系统当中默认定义两级缓存&#xff1a;一级缓存、二级缓存 默认情况下&#xff0c;只有一级缓存开启&#xff08;sqlSession级别的缓存&#xff09;二级缓存需要手动开启配置&#xff0c;需要局域namespace级别的缓存 一级缓存&#xff08;本地缓存&#…...

学习一下用鸿蒙​​DevEco Studio HarmonyOS5实现百度地图

在鸿蒙&#xff08;HarmonyOS5&#xff09;中集成百度地图&#xff0c;可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API&#xff0c;可以构建跨设备的定位、导航和地图展示功能。 ​​1. 鸿蒙环境准备​​ ​​开发工具​​&#xff1a;下载安装 ​​De…...