Kafka 命令详解及使用示例
文章目录
- Kafka 命令详解及使用示例
- Kafka 命令详解
- `kafka-topics.sh`:主题管理
- 创建主题
- 创建带副本的主题
- 修改主题分区数
- 了解分区分布
- 列出主题
- 查看主题详情
- 删除主题
- `kafka-console-producer.sh`:消息生产者
- 发送消息到主题
- 带键值对的消息
- 消息生产性能优化
- 带分区键的消息发送
- `kafka-console-consumer.sh`:消息消费者
- 消费主题中的消息
- 只读取键值对消息
- 实时消费消息
- 只消费特定分区的消息
- 以 JSON 格式输出消息
- `kafka-consumer-groups.sh`:消费者组管理
- 查看消费者组信息
- 查看消费者组的偏移量信息
- 重置消费者组的偏移量
- `kafka-configs.sh`:配置管理
- 查看主题配置
- 修改主题配置
- `kafka-acls.sh`:访问控制列表管理
- 为用户创建权限
- 删除用户权限
- 示例总结
Kafka 命令详解及使用示例
Kafka 是一个分布式流处理平台,提供了高吞吐量、低延迟的消息系统。Kafka 主要用于消息发布-订阅模式中的消息传输,广泛应用于数据管道、日志系统、事件追踪等场景。本文将介绍 Kafka 中常用的命令行工具及其具体使用方式,帮助开发者更好地管理和使用 Kafka。
Kafka 命令详解
kafka-topics.sh
:主题管理
主题(Topic)是 Kafka 中消息的逻辑分类,所有消息都发送到指定的主题中。kafka-topics.sh
用于管理主题,包括创建、删除、列出主题等操作。
创建主题
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
--topic
:主题名称。--partitions
:分区数,消息将分布在多个分区中。--replication-factor
:副本因子,用于消息的高可用性。
创建带副本的主题
在分布式环境中,副本对于 Kafka 来说至关重要,它能确保在 Broker 故障时,消息不会丢失。创建主题时设置合适的副本数和分区数非常关键。
bin/kafka-topics.sh --create --topic important-topic --partitions 5 --replication-factor 3 --bootstrap-server localhost:9092
--partitions
设置为 5,意味着主题的数据会被分散到 5 个分区中,提升并发处理能力。--replication-factor
设置为 3,确保每个分区有 3 个副本(在不同的 Broker 上),提高容错性。
注意:副本数不能超过集群中的 Broker 数量,生产环境中一般设置副本数为 3,保证高可用性。
修改主题分区数
Kafka 支持在线扩展主题的分区数。可以在不停止服务的情况下动态增加分区数,但要注意增加分区会影响数据的顺序性,因为 Kafka 不会自动对已存在的数据进行重分配。
bin/kafka-topics.sh --alter --topic important-topic --partitions 10 --bootstrap-server localhost:9092
此命令将 important-topic
的分区数从 5 扩展到 10 个。
了解分区分布
通过 --describe
命令,可以查看每个分区在哪些 Broker 上存储,并了解它们的副本状态。
bin/kafka-topics.sh --describe --topic important-topic --bootstrap-server localhost:9092
输出的结果会显示每个分区的副本和首领(Leader)在哪个 Broker 上。Leader 是处理读写请求的副本,其他副本是跟随者,用于容错。
列出主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
查看主题详情
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
删除主题
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
kafka-console-producer.sh
:消息生产者
Kafka 提供了一个控制台生产者工具,允许我们从命令行发送消息到指定主题。
发送消息到主题
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
输入消息后按 Enter
发送到 Kafka 主题。
带键值对的消息
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
在这里,消息的键和值通过冒号分隔,例如:
key1:value1
key2:value2
消息生产性能优化
在高吞吐量场景下,可以通过调整生产者配置来提高性能。例如,批量发送消息和异步生产可以显著提高效率。
bin/kafka-console-producer.sh --topic fast-topic --bootstrap-server localhost:9092 --producer-property batch.size=16384 --producer-property linger.ms=5
batch.size
:控制批量消息的大小(以字节为单位),Kafka 会尝试将消息累积到这个大小后一起发送。linger.ms
:在批量消息发送前的等待时间,可以通过稍微延迟发送消息来增加批量的大小。
此外,生产者可以配置为异步发送,这样可以减少网络等待时间:
--producer-property acks=1
acks=1
表示只等待 Leader 确认即可继续发送消息,这种方式可以提高性能,但有可能在 Leader 故障时丢失部分消息。
带分区键的消息发送
指定消息发送到特定的分区时,可以使用 key
参数,这在有状态的消息处理(如事务处理)场景中非常重要。
bin/kafka-console-producer.sh --topic partitioned-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
这样每个消息都会根据键(key)被分配到相同的分区。例如,key1:message1
和 key1:message2
会发送到相同的分区。
kafka-console-consumer.sh
:消息消费者
消费者用于从 Kafka 主题中读取消息。kafka-console-consumer.sh
是 Kafka 提供的命令行消费者工具。
消费主题中的消息
bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
--from-beginning
:从主题的起始位置读取所有消息。
只读取键值对消息
bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=,
这样读取的消息会显示为键和值的格式,例如:
key1,value1
实时消费消息
使用 kafka-console-consumer.sh
来实时消费主题中的消息:
bin/kafka-console-consumer.sh --topic fast-topic --bootstrap-server localhost:9092
如果要从主题的起始位置读取消息,可以添加 --from-beginning
参数。
只消费特定分区的消息
Kafka 支持直接从某个分区中读取消息。在某些场景下(如故障恢复或日志分析),我们可能只需要处理某个分区的数据:
bin/kafka-console-consumer.sh --topic important-topic --bootstrap-server localhost:9092 --partition 0 --offset 10
此命令从分区 0 开始读取第 10 条消息。
以 JSON 格式输出消息
Kafka 消费者可以输出 JSON 格式的消息,方便后续处理和分析:
bin/kafka-console-consumer.sh --topic json-topic --bootstrap-server localhost:9092 --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.separator=, --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka-consumer-groups.sh
:消费者组管理
Kafka 的消费者组允许多个消费者一起协同消费消息,每个分区的消息只能被一个组内的消费者消费。kafka-consumer-groups.sh
可以用于管理消费者组。
查看消费者组信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
查看消费者组的偏移量信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
输出信息包括每个分区的已消费消息偏移量以及消费者的状态。
重置消费者组的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic
--to-earliest
:将偏移量重置为最早的消息。
kafka-configs.sh
:配置管理
Kafka 主题和代理的配置可以通过 kafka-configs.sh
进行管理。
查看主题配置
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
修改主题配置
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=172800000
此命令将主题 my-topic
的消息保留时间修改为 2 天(单位为毫秒)。
kafka-acls.sh
:访问控制列表管理
Kafka 提供了基于 ACL(访问控制列表)的权限管理。kafka-acls.sh
用于管理权限。
为用户创建权限
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation All --topic my-topic
此命令允许用户 Alice
对主题 my-topic
执行所有操作。
删除用户权限
bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Alice --operation All --topic my-topic
示例总结
我们通过几个简单的示例介绍了 Kafka 的基本操作:
- 创建主题
my-topic
,并通过控制台生产者发送消息。 - 使用控制台消费者从该主题中读取消息。
- 管理消费者组的偏移量,重置到最早的消息。
- 修改主题的保留时间,以及管理用户的权限。
Kafka 提供了丰富的命令行工具,用于主题、消费者组、配置、权限等的管理。灵活使用这些命令,可以帮助我们高效地维护 Kafka 集群。
相关文章:
Kafka 命令详解及使用示例
文章目录 Kafka 命令详解及使用示例Kafka 命令详解kafka-topics.sh:主题管理创建主题创建带副本的主题修改主题分区数了解分区分布列出主题查看主题详情删除主题 kafka-console-producer.sh:消息生产者发送消息到主题带键值对的消息消息生产性能优化带分…...

重生归来之挖掘stm32底层知识(1)——寄存器
概念理解 要使用stm32首先要知道什么是引脚和寄存器。 如下图所示,芯片通过这些金属丝与电路板连接,这些金属丝叫做引脚。一般做软件开发是不需要了解芯片是怎么焊的,只要会使用就行。我们平常通过编程来控制这些引脚的输入和输出,…...

Qt构建JSON及解析JSON
目录 一.JSON简介 JSON对象 JSON数组 二.Qt中JSON介绍 QJsonvalue Qt中JSON对象 Qt中JSON数组 QJsonDocument 三.Qt构建JSON数组 四.解析JSON数组 一.JSON简介 一般来讲C类和对象在java中是无法直接直接使用的,因为压根就不是一个规则。但是他们在内存中…...

合宙Air201模组LuatOS扩展功能:温湿度传感器篇!
通过前面几期的学习,同学们的学习热情越来越高。 合宙Air201模组除了支持3种定位方式外,还具有丰富的扩展功能,比如:通过外扩BTB链接方案,最多可支持21个IO接口:SPI、I2C、UART等多种接口全部支持。 本期…...
主流敏捷工具scrum工具
在当今的快速变化和高需求的业务环境中,敏捷开发已经成为许多企业实现快速迭代和响应市场需求的重要方法。而在众多敏捷工具中,选择适合自己团队的工具尤为重要。 今天,我们将对比几款主流的敏捷工具,供参考 1. Leangoo领歌&…...
探索微服务架构:从理论到实践,深度剖析其优缺点
微服务架构(Microservice Architecture)是一种软件开发架构形式,它的核心 思想是将大型应用程序拆分成一组小的服务,每个服务都运行在其独立的进程中,并且 服务与服务之间通过轻量级的通信机制(如HTTP REST…...

2024 年最佳 Chrome 验证码扩展,解决 reCAPTCHA 问题
验证码,特别是 reCAPTCHA,已成为在线安全的不可或缺的一部分。虽然它们在区分人类和机器人方面起着至关重要的作用,但它们也可能成为合法用户和从事网络自动化的企业的主要障碍。无论您是试图简化在线体验的个人,还是依赖自动化工…...
Go语言现代web开发defer 延迟执行
The defer statement will delay the execution of a function until the surrounding function is completed. Although execution is postponed, funciton arguments will be evaluated immediately. defer语句将延迟函数的执行,直到周围的函数完成。虽然执行被延…...

Vue路由二(嵌套多级路由、路由query传参、路由命名、路由params传参、props配置、<router-link>的replace属性)
目录 1. 嵌套(多级)路由2. 路由query传参3. 路由命名4. 路由params传参5. props配置6. <router-link>的replace属性 1. 嵌套(多级)路由 pages/Car.vue <template><ul><li>car1</li><li>car2</li><li>car3</li></ul…...

【RabbitMQ】可靠性传输
概述 作为消息中间件来说,最重要的任务就是收发消息。因此我们在收发消息的过程中,就要考虑消息是否会丢失的问题。结果是必然的,假设我们没有采取任何措施,那么消息一定会丢失。对于一些不那么重要的业务来说,消息丢失…...

【论文阅读】PERCEIVER-ACTOR: A Multi-Task Transformer for Robotic Manipulation
Abstract transformers凭借其对大型数据集的扩展能力,彻底改变了视觉和自然语言处理。但在机器人操作中,数据既有限又昂贵。通过正确的问题表述,操纵仍然可以从变形金刚中受益吗?我们使用peract来研究这个问题,peract…...
Linux 常用指令
Linux 常用指令 这是本人在备战 CSP 初赛做 Linux 指令题时,心血来潮整理的,希望对大家有帮助。如有错误或有补充,麻烦私信或评论指出。 表格按字母顺序排列 命令作用alias对命令重命名cal显示日历的指令cat查看文本文件的内容cd改变当前工…...

使用 PHPstudy 建立ThinkPHP8 本地集成环境
安装Composer 下载地址:https://getcomposer.org/Composer-Setup.exehttps://getcomposer.org/Composer-Setup.exe 打开PHPstudy创建网站: cmd终端进入PHPstudy www根目录下: 执行代码:cd phpstudy www 根目录地址 cd C:\phpst…...
【系统架构设计】软件的知识产权保护+标准化概论+应用数学+云计算
【系统架构设计】软件的知识产权保护标准化概论应用数学云计算 软件的知识产权保护标准化概论应用数学云计算 软件的知识产权保护 在该部分内容中,以下几点需要注意: 如果作品是委托创作的,著作权的归属应通过委托人和受托人之间的合同来确…...

解决使用阿里云DataV Geo在线地图路径访问403问题
文章目录 1. DataV Geo在线地图路径访问403问题2. 解决方法3. 重启生效 1. DataV Geo在线地图路径访问403问题 最近在写一个省市下钻的demo,用到的是 阿里云DataV Geo在线地图 去动态获取GeoJSON 省市的数据,如下代码 axios.get("https://geo.dat…...
linux 使用SSH密钥配置免密登录
需求:多台主机SSH免密登录,需要使用同一个密钥对 操作: 在Linux中,使用SSH密钥对来在多台主机之间配置免密登录。以下是配置步骤: 在你的本地机器上生成一个SSH密钥对。如果你已经有一个,你可以跳过这一…...
python教程(二):python数据结构大全(附代码)
Python 中数据结构的重要性不言而喻,它们是构建高效、可维护代码的基础。数据结构决定了如何存储、组织和操作数据。理解和使用合适的数据结构能够极大地提升程序的性能、简洁性以及代码的可读性。 Python 的基础数据结构有 4 种,分别是 列表 (list)、元…...

MySQL基于GTID同步模式搭建主从复制
系列文章目录 rpmbuild构建mysql5.7.42版本的rpm包 文章目录 系列文章目录一、mysql-5.7.42RPM包构建二、同步模式分类介绍1.异步同步模式2.半同步模式2.1.实现半同步操作流程2.2.半同步问题总结2.3.半同步一致性2.4.异步与半同步对比 3.GTID同步 三、GTID同步介绍1.gtid介绍2…...
RecyclerView的子项长按选择功能
在Android开发中,实现RecyclerView的子项长按选择功能通常涉及到几个关键步骤:设置RecyclerView的ItemTouchListener来监听长按事件,管理选中状态,以及更新UI以反映选中状态。以下是一个基本的实现步骤和示例代码。 1. 定义数据模…...

mongoDB-1
文章目录 一、疑似坑1.11.2 mongo ops manager1.3 mongo features视图固定大小集合(有点类似ringbuffer数据结构,capped collections)(聚簇集合)clustered collection(类比到Mysql的聚簇索引)聚合管道 aggregation pipelineWiredTiger (默认存…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...

CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

初学 pytest 记录
安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

算法笔记2
1.字符串拼接最好用StringBuilder,不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...

C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...

android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...