Kafka 命令详解及使用示例
文章目录
- Kafka 命令详解及使用示例
- Kafka 命令详解
- `kafka-topics.sh`:主题管理
- 创建主题
- 创建带副本的主题
- 修改主题分区数
- 了解分区分布
- 列出主题
- 查看主题详情
- 删除主题
- `kafka-console-producer.sh`:消息生产者
- 发送消息到主题
- 带键值对的消息
- 消息生产性能优化
- 带分区键的消息发送
- `kafka-console-consumer.sh`:消息消费者
- 消费主题中的消息
- 只读取键值对消息
- 实时消费消息
- 只消费特定分区的消息
- 以 JSON 格式输出消息
- `kafka-consumer-groups.sh`:消费者组管理
- 查看消费者组信息
- 查看消费者组的偏移量信息
- 重置消费者组的偏移量
- `kafka-configs.sh`:配置管理
- 查看主题配置
- 修改主题配置
- `kafka-acls.sh`:访问控制列表管理
- 为用户创建权限
- 删除用户权限
- 示例总结
Kafka 命令详解及使用示例
Kafka 是一个分布式流处理平台,提供了高吞吐量、低延迟的消息系统。Kafka 主要用于消息发布-订阅模式中的消息传输,广泛应用于数据管道、日志系统、事件追踪等场景。本文将介绍 Kafka 中常用的命令行工具及其具体使用方式,帮助开发者更好地管理和使用 Kafka。
Kafka 命令详解
kafka-topics.sh:主题管理
主题(Topic)是 Kafka 中消息的逻辑分类,所有消息都发送到指定的主题中。kafka-topics.sh 用于管理主题,包括创建、删除、列出主题等操作。
创建主题
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
--topic:主题名称。--partitions:分区数,消息将分布在多个分区中。--replication-factor:副本因子,用于消息的高可用性。
创建带副本的主题
在分布式环境中,副本对于 Kafka 来说至关重要,它能确保在 Broker 故障时,消息不会丢失。创建主题时设置合适的副本数和分区数非常关键。
bin/kafka-topics.sh --create --topic important-topic --partitions 5 --replication-factor 3 --bootstrap-server localhost:9092
--partitions设置为 5,意味着主题的数据会被分散到 5 个分区中,提升并发处理能力。--replication-factor设置为 3,确保每个分区有 3 个副本(在不同的 Broker 上),提高容错性。
注意:副本数不能超过集群中的 Broker 数量,生产环境中一般设置副本数为 3,保证高可用性。
修改主题分区数
Kafka 支持在线扩展主题的分区数。可以在不停止服务的情况下动态增加分区数,但要注意增加分区会影响数据的顺序性,因为 Kafka 不会自动对已存在的数据进行重分配。
bin/kafka-topics.sh --alter --topic important-topic --partitions 10 --bootstrap-server localhost:9092
此命令将 important-topic 的分区数从 5 扩展到 10 个。
了解分区分布
通过 --describe 命令,可以查看每个分区在哪些 Broker 上存储,并了解它们的副本状态。
bin/kafka-topics.sh --describe --topic important-topic --bootstrap-server localhost:9092
输出的结果会显示每个分区的副本和首领(Leader)在哪个 Broker 上。Leader 是处理读写请求的副本,其他副本是跟随者,用于容错。
列出主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
查看主题详情
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
删除主题
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
kafka-console-producer.sh:消息生产者
Kafka 提供了一个控制台生产者工具,允许我们从命令行发送消息到指定主题。
发送消息到主题
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
输入消息后按 Enter 发送到 Kafka 主题。
带键值对的消息
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
在这里,消息的键和值通过冒号分隔,例如:
key1:value1
key2:value2
消息生产性能优化
在高吞吐量场景下,可以通过调整生产者配置来提高性能。例如,批量发送消息和异步生产可以显著提高效率。
bin/kafka-console-producer.sh --topic fast-topic --bootstrap-server localhost:9092 --producer-property batch.size=16384 --producer-property linger.ms=5
batch.size:控制批量消息的大小(以字节为单位),Kafka 会尝试将消息累积到这个大小后一起发送。linger.ms:在批量消息发送前的等待时间,可以通过稍微延迟发送消息来增加批量的大小。
此外,生产者可以配置为异步发送,这样可以减少网络等待时间:
--producer-property acks=1
acks=1表示只等待 Leader 确认即可继续发送消息,这种方式可以提高性能,但有可能在 Leader 故障时丢失部分消息。
带分区键的消息发送
指定消息发送到特定的分区时,可以使用 key 参数,这在有状态的消息处理(如事务处理)场景中非常重要。
bin/kafka-console-producer.sh --topic partitioned-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
这样每个消息都会根据键(key)被分配到相同的分区。例如,key1:message1 和 key1:message2 会发送到相同的分区。
kafka-console-consumer.sh:消息消费者
消费者用于从 Kafka 主题中读取消息。kafka-console-consumer.sh 是 Kafka 提供的命令行消费者工具。
消费主题中的消息
bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
--from-beginning:从主题的起始位置读取所有消息。
只读取键值对消息
bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=,
这样读取的消息会显示为键和值的格式,例如:
key1,value1
实时消费消息
使用 kafka-console-consumer.sh 来实时消费主题中的消息:
bin/kafka-console-consumer.sh --topic fast-topic --bootstrap-server localhost:9092
如果要从主题的起始位置读取消息,可以添加 --from-beginning 参数。
只消费特定分区的消息
Kafka 支持直接从某个分区中读取消息。在某些场景下(如故障恢复或日志分析),我们可能只需要处理某个分区的数据:
bin/kafka-console-consumer.sh --topic important-topic --bootstrap-server localhost:9092 --partition 0 --offset 10
此命令从分区 0 开始读取第 10 条消息。
以 JSON 格式输出消息
Kafka 消费者可以输出 JSON 格式的消息,方便后续处理和分析:
bin/kafka-console-consumer.sh --topic json-topic --bootstrap-server localhost:9092 --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.separator=, --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka-consumer-groups.sh:消费者组管理
Kafka 的消费者组允许多个消费者一起协同消费消息,每个分区的消息只能被一个组内的消费者消费。kafka-consumer-groups.sh 可以用于管理消费者组。
查看消费者组信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
查看消费者组的偏移量信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
输出信息包括每个分区的已消费消息偏移量以及消费者的状态。
重置消费者组的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic
--to-earliest:将偏移量重置为最早的消息。
kafka-configs.sh:配置管理
Kafka 主题和代理的配置可以通过 kafka-configs.sh 进行管理。
查看主题配置
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
修改主题配置
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=172800000
此命令将主题 my-topic 的消息保留时间修改为 2 天(单位为毫秒)。
kafka-acls.sh:访问控制列表管理
Kafka 提供了基于 ACL(访问控制列表)的权限管理。kafka-acls.sh 用于管理权限。
为用户创建权限
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation All --topic my-topic
此命令允许用户 Alice 对主题 my-topic 执行所有操作。
删除用户权限
bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Alice --operation All --topic my-topic
示例总结
我们通过几个简单的示例介绍了 Kafka 的基本操作:
- 创建主题
my-topic,并通过控制台生产者发送消息。 - 使用控制台消费者从该主题中读取消息。
- 管理消费者组的偏移量,重置到最早的消息。
- 修改主题的保留时间,以及管理用户的权限。
Kafka 提供了丰富的命令行工具,用于主题、消费者组、配置、权限等的管理。灵活使用这些命令,可以帮助我们高效地维护 Kafka 集群。
相关文章:
Kafka 命令详解及使用示例
文章目录 Kafka 命令详解及使用示例Kafka 命令详解kafka-topics.sh:主题管理创建主题创建带副本的主题修改主题分区数了解分区分布列出主题查看主题详情删除主题 kafka-console-producer.sh:消息生产者发送消息到主题带键值对的消息消息生产性能优化带分…...
重生归来之挖掘stm32底层知识(1)——寄存器
概念理解 要使用stm32首先要知道什么是引脚和寄存器。 如下图所示,芯片通过这些金属丝与电路板连接,这些金属丝叫做引脚。一般做软件开发是不需要了解芯片是怎么焊的,只要会使用就行。我们平常通过编程来控制这些引脚的输入和输出,…...
Qt构建JSON及解析JSON
目录 一.JSON简介 JSON对象 JSON数组 二.Qt中JSON介绍 QJsonvalue Qt中JSON对象 Qt中JSON数组 QJsonDocument 三.Qt构建JSON数组 四.解析JSON数组 一.JSON简介 一般来讲C类和对象在java中是无法直接直接使用的,因为压根就不是一个规则。但是他们在内存中…...
合宙Air201模组LuatOS扩展功能:温湿度传感器篇!
通过前面几期的学习,同学们的学习热情越来越高。 合宙Air201模组除了支持3种定位方式外,还具有丰富的扩展功能,比如:通过外扩BTB链接方案,最多可支持21个IO接口:SPI、I2C、UART等多种接口全部支持。 本期…...
主流敏捷工具scrum工具
在当今的快速变化和高需求的业务环境中,敏捷开发已经成为许多企业实现快速迭代和响应市场需求的重要方法。而在众多敏捷工具中,选择适合自己团队的工具尤为重要。 今天,我们将对比几款主流的敏捷工具,供参考 1. Leangoo领歌&…...
探索微服务架构:从理论到实践,深度剖析其优缺点
微服务架构(Microservice Architecture)是一种软件开发架构形式,它的核心 思想是将大型应用程序拆分成一组小的服务,每个服务都运行在其独立的进程中,并且 服务与服务之间通过轻量级的通信机制(如HTTP REST…...
2024 年最佳 Chrome 验证码扩展,解决 reCAPTCHA 问题
验证码,特别是 reCAPTCHA,已成为在线安全的不可或缺的一部分。虽然它们在区分人类和机器人方面起着至关重要的作用,但它们也可能成为合法用户和从事网络自动化的企业的主要障碍。无论您是试图简化在线体验的个人,还是依赖自动化工…...
Go语言现代web开发defer 延迟执行
The defer statement will delay the execution of a function until the surrounding function is completed. Although execution is postponed, funciton arguments will be evaluated immediately. defer语句将延迟函数的执行,直到周围的函数完成。虽然执行被延…...
Vue路由二(嵌套多级路由、路由query传参、路由命名、路由params传参、props配置、<router-link>的replace属性)
目录 1. 嵌套(多级)路由2. 路由query传参3. 路由命名4. 路由params传参5. props配置6. <router-link>的replace属性 1. 嵌套(多级)路由 pages/Car.vue <template><ul><li>car1</li><li>car2</li><li>car3</li></ul…...
【RabbitMQ】可靠性传输
概述 作为消息中间件来说,最重要的任务就是收发消息。因此我们在收发消息的过程中,就要考虑消息是否会丢失的问题。结果是必然的,假设我们没有采取任何措施,那么消息一定会丢失。对于一些不那么重要的业务来说,消息丢失…...
【论文阅读】PERCEIVER-ACTOR: A Multi-Task Transformer for Robotic Manipulation
Abstract transformers凭借其对大型数据集的扩展能力,彻底改变了视觉和自然语言处理。但在机器人操作中,数据既有限又昂贵。通过正确的问题表述,操纵仍然可以从变形金刚中受益吗?我们使用peract来研究这个问题,peract…...
Linux 常用指令
Linux 常用指令 这是本人在备战 CSP 初赛做 Linux 指令题时,心血来潮整理的,希望对大家有帮助。如有错误或有补充,麻烦私信或评论指出。 表格按字母顺序排列 命令作用alias对命令重命名cal显示日历的指令cat查看文本文件的内容cd改变当前工…...
使用 PHPstudy 建立ThinkPHP8 本地集成环境
安装Composer 下载地址:https://getcomposer.org/Composer-Setup.exehttps://getcomposer.org/Composer-Setup.exe 打开PHPstudy创建网站: cmd终端进入PHPstudy www根目录下: 执行代码:cd phpstudy www 根目录地址 cd C:\phpst…...
【系统架构设计】软件的知识产权保护+标准化概论+应用数学+云计算
【系统架构设计】软件的知识产权保护标准化概论应用数学云计算 软件的知识产权保护标准化概论应用数学云计算 软件的知识产权保护 在该部分内容中,以下几点需要注意: 如果作品是委托创作的,著作权的归属应通过委托人和受托人之间的合同来确…...
解决使用阿里云DataV Geo在线地图路径访问403问题
文章目录 1. DataV Geo在线地图路径访问403问题2. 解决方法3. 重启生效 1. DataV Geo在线地图路径访问403问题 最近在写一个省市下钻的demo,用到的是 阿里云DataV Geo在线地图 去动态获取GeoJSON 省市的数据,如下代码 axios.get("https://geo.dat…...
linux 使用SSH密钥配置免密登录
需求:多台主机SSH免密登录,需要使用同一个密钥对 操作: 在Linux中,使用SSH密钥对来在多台主机之间配置免密登录。以下是配置步骤: 在你的本地机器上生成一个SSH密钥对。如果你已经有一个,你可以跳过这一…...
python教程(二):python数据结构大全(附代码)
Python 中数据结构的重要性不言而喻,它们是构建高效、可维护代码的基础。数据结构决定了如何存储、组织和操作数据。理解和使用合适的数据结构能够极大地提升程序的性能、简洁性以及代码的可读性。 Python 的基础数据结构有 4 种,分别是 列表 (list)、元…...
MySQL基于GTID同步模式搭建主从复制
系列文章目录 rpmbuild构建mysql5.7.42版本的rpm包 文章目录 系列文章目录一、mysql-5.7.42RPM包构建二、同步模式分类介绍1.异步同步模式2.半同步模式2.1.实现半同步操作流程2.2.半同步问题总结2.3.半同步一致性2.4.异步与半同步对比 3.GTID同步 三、GTID同步介绍1.gtid介绍2…...
RecyclerView的子项长按选择功能
在Android开发中,实现RecyclerView的子项长按选择功能通常涉及到几个关键步骤:设置RecyclerView的ItemTouchListener来监听长按事件,管理选中状态,以及更新UI以反映选中状态。以下是一个基本的实现步骤和示例代码。 1. 定义数据模…...
mongoDB-1
文章目录 一、疑似坑1.11.2 mongo ops manager1.3 mongo features视图固定大小集合(有点类似ringbuffer数据结构,capped collections)(聚簇集合)clustered collection(类比到Mysql的聚簇索引)聚合管道 aggregation pipelineWiredTiger (默认存…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
【Go语言基础【13】】函数、闭包、方法
文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数(函数作为参数、返回值) 三、匿名函数与闭包1. 匿名函数(Lambda函…...
安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲
文章目录 前言第一部分:体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分:体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...
2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)
安全领域各种资源,学习文档,以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具,欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...
