当前位置: 首页 > news >正文

Kafka 命令详解及使用示例

文章目录

  • Kafka 命令详解及使用示例
  • Kafka 命令详解
    • `kafka-topics.sh`:主题管理
      • 创建主题
      • 创建带副本的主题
      • 修改主题分区数
      • 了解分区分布
      • 列出主题
      • 查看主题详情
      • 删除主题
    • `kafka-console-producer.sh`:消息生产者
      • 发送消息到主题
      • 带键值对的消息
      • 消息生产性能优化
      • 带分区键的消息发送
    • `kafka-console-consumer.sh`:消息消费者
      • 消费主题中的消息
      • 只读取键值对消息
      • 实时消费消息
      • 只消费特定分区的消息
      • 以 JSON 格式输出消息
    • `kafka-consumer-groups.sh`:消费者组管理
      • 查看消费者组信息
      • 查看消费者组的偏移量信息
      • 重置消费者组的偏移量
    • `kafka-configs.sh`:配置管理
      • 查看主题配置
      • 修改主题配置
    • `kafka-acls.sh`:访问控制列表管理
      • 为用户创建权限
      • 删除用户权限
  • 示例总结


Kafka 命令详解及使用示例

Kafka 是一个分布式流处理平台,提供了高吞吐量、低延迟的消息系统。Kafka 主要用于消息发布-订阅模式中的消息传输,广泛应用于数据管道、日志系统、事件追踪等场景。本文将介绍 Kafka 中常用的命令行工具及其具体使用方式,帮助开发者更好地管理和使用 Kafka。


Kafka 命令详解

kafka-topics.sh:主题管理

主题(Topic)是 Kafka 中消息的逻辑分类,所有消息都发送到指定的主题中。kafka-topics.sh 用于管理主题,包括创建、删除、列出主题等操作。

创建主题

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  • --topic:主题名称。
  • --partitions:分区数,消息将分布在多个分区中。
  • --replication-factor:副本因子,用于消息的高可用性。

创建带副本的主题

在分布式环境中,副本对于 Kafka 来说至关重要,它能确保在 Broker 故障时,消息不会丢失。创建主题时设置合适的副本数和分区数非常关键。

bin/kafka-topics.sh --create --topic important-topic --partitions 5 --replication-factor 3 --bootstrap-server localhost:9092
  • --partitions 设置为 5,意味着主题的数据会被分散到 5 个分区中,提升并发处理能力。
  • --replication-factor 设置为 3,确保每个分区有 3 个副本(在不同的 Broker 上),提高容错性。

注意:副本数不能超过集群中的 Broker 数量,生产环境中一般设置副本数为 3,保证高可用性。

修改主题分区数

Kafka 支持在线扩展主题的分区数。可以在不停止服务的情况下动态增加分区数,但要注意增加分区会影响数据的顺序性,因为 Kafka 不会自动对已存在的数据进行重分配。

bin/kafka-topics.sh --alter --topic important-topic --partitions 10 --bootstrap-server localhost:9092

此命令将 important-topic 的分区数从 5 扩展到 10 个。

了解分区分布

通过 --describe 命令,可以查看每个分区在哪些 Broker 上存储,并了解它们的副本状态。

bin/kafka-topics.sh --describe --topic important-topic --bootstrap-server localhost:9092

输出的结果会显示每个分区的副本和首领(Leader)在哪个 Broker 上。Leader 是处理读写请求的副本,其他副本是跟随者,用于容错。

列出主题

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

查看主题详情

bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

删除主题

bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

kafka-console-producer.sh:消息生产者

Kafka 提供了一个控制台生产者工具,允许我们从命令行发送消息到指定主题。

发送消息到主题

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092

输入消息后按 Enter 发送到 Kafka 主题。

带键值对的消息

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

在这里,消息的键和值通过冒号分隔,例如:

key1:value1
key2:value2

消息生产性能优化

在高吞吐量场景下,可以通过调整生产者配置来提高性能。例如,批量发送消息和异步生产可以显著提高效率。

bin/kafka-console-producer.sh --topic fast-topic --bootstrap-server localhost:9092 --producer-property batch.size=16384 --producer-property linger.ms=5
  • batch.size:控制批量消息的大小(以字节为单位),Kafka 会尝试将消息累积到这个大小后一起发送。
  • linger.ms:在批量消息发送前的等待时间,可以通过稍微延迟发送消息来增加批量的大小。

此外,生产者可以配置为异步发送,这样可以减少网络等待时间:

--producer-property acks=1
  • acks=1 表示只等待 Leader 确认即可继续发送消息,这种方式可以提高性能,但有可能在 Leader 故障时丢失部分消息。

带分区键的消息发送

指定消息发送到特定的分区时,可以使用 key 参数,这在有状态的消息处理(如事务处理)场景中非常重要。

bin/kafka-console-producer.sh --topic partitioned-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

这样每个消息都会根据键(key)被分配到相同的分区。例如,key1:message1key1:message2 会发送到相同的分区。


kafka-console-consumer.sh:消息消费者

消费者用于从 Kafka 主题中读取消息。kafka-console-consumer.sh 是 Kafka 提供的命令行消费者工具。

消费主题中的消息

bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
  • --from-beginning:从主题的起始位置读取所有消息。

只读取键值对消息

bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=,

这样读取的消息会显示为键和值的格式,例如:

key1,value1

实时消费消息

使用 kafka-console-consumer.sh 来实时消费主题中的消息:

bin/kafka-console-consumer.sh --topic fast-topic --bootstrap-server localhost:9092

如果要从主题的起始位置读取消息,可以添加 --from-beginning 参数。

只消费特定分区的消息

Kafka 支持直接从某个分区中读取消息。在某些场景下(如故障恢复或日志分析),我们可能只需要处理某个分区的数据:

bin/kafka-console-consumer.sh --topic important-topic --bootstrap-server localhost:9092 --partition 0 --offset 10

此命令从分区 0 开始读取第 10 条消息。

以 JSON 格式输出消息

Kafka 消费者可以输出 JSON 格式的消息,方便后续处理和分析:

bin/kafka-console-consumer.sh --topic json-topic --bootstrap-server localhost:9092 --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.separator=, --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka-consumer-groups.sh:消费者组管理

Kafka 的消费者组允许多个消费者一起协同消费消息,每个分区的消息只能被一个组内的消费者消费。kafka-consumer-groups.sh 可以用于管理消费者组。

查看消费者组信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看消费者组的偏移量信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

输出信息包括每个分区的已消费消息偏移量以及消费者的状态。

重置消费者组的偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic
  • --to-earliest:将偏移量重置为最早的消息。

kafka-configs.sh:配置管理

Kafka 主题和代理的配置可以通过 kafka-configs.sh 进行管理。

查看主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

修改主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=172800000

此命令将主题 my-topic 的消息保留时间修改为 2 天(单位为毫秒)。


kafka-acls.sh:访问控制列表管理

Kafka 提供了基于 ACL(访问控制列表)的权限管理。kafka-acls.sh 用于管理权限。

为用户创建权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation All --topic my-topic

此命令允许用户 Alice 对主题 my-topic 执行所有操作。

删除用户权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Alice --operation All --topic my-topic

示例总结

我们通过几个简单的示例介绍了 Kafka 的基本操作:

  1. 创建主题 my-topic,并通过控制台生产者发送消息。
  2. 使用控制台消费者从该主题中读取消息。
  3. 管理消费者组的偏移量,重置到最早的消息。
  4. 修改主题的保留时间,以及管理用户的权限。

Kafka 提供了丰富的命令行工具,用于主题、消费者组、配置、权限等的管理。灵活使用这些命令,可以帮助我们高效地维护 Kafka 集群。

相关文章:

Kafka 命令详解及使用示例

文章目录 Kafka 命令详解及使用示例Kafka 命令详解kafka-topics.sh:主题管理创建主题创建带副本的主题修改主题分区数了解分区分布列出主题查看主题详情删除主题 kafka-console-producer.sh:消息生产者发送消息到主题带键值对的消息消息生产性能优化带分…...

重生归来之挖掘stm32底层知识(1)——寄存器

概念理解 要使用stm32首先要知道什么是引脚和寄存器。 如下图所示,芯片通过这些金属丝与电路板连接,这些金属丝叫做引脚。一般做软件开发是不需要了解芯片是怎么焊的,只要会使用就行。我们平常通过编程来控制这些引脚的输入和输出&#xff0c…...

Qt构建JSON及解析JSON

目录 一.JSON简介 JSON对象 JSON数组 二.Qt中JSON介绍 QJsonvalue Qt中JSON对象 Qt中JSON数组 QJsonDocument 三.Qt构建JSON数组 四.解析JSON数组 一.JSON简介 一般来讲C类和对象在java中是无法直接直接使用的,因为压根就不是一个规则。但是他们在内存中…...

合宙Air201模组LuatOS扩展功能:温湿度传感器篇!

通过前面几期的学习,同学们的学习热情越来越高。 合宙Air201模组除了支持3种定位方式外,还具有丰富的扩展功能,比如:通过外扩BTB链接方案,最多可支持21个IO接口:SPI、I2C、UART等多种接口全部支持。 本期…...

主流敏捷工具scrum工具

在当今的快速变化和高需求的业务环境中,敏捷开发已经成为许多企业实现快速迭代和响应市场需求的重要方法。而在众多敏捷工具中,选择适合自己团队的工具尤为重要。 今天,我们将对比几款主流的敏捷工具,供参考 1. Leangoo领歌&…...

探索微服务架构:从理论到实践,深度剖析其优缺点

微服务架构(Microservice Architecture)是一种软件开发架构形式,它的核心 思想是将大型应用程序拆分成一组小的服务,每个服务都运行在其独立的进程中,并且 服务与服务之间通过轻量级的通信机制(如HTTP REST…...

2024 年最佳 Chrome 验证码扩展,解决 reCAPTCHA 问题

验证码,特别是 reCAPTCHA,已成为在线安全的不可或缺的一部分。虽然它们在区分人类和机器人方面起着至关重要的作用,但它们也可能成为合法用户和从事网络自动化的企业的主要障碍。无论您是试图简化在线体验的个人,还是依赖自动化工…...

Go语言现代web开发defer 延迟执行

The defer statement will delay the execution of a function until the surrounding function is completed. Although execution is postponed, funciton arguments will be evaluated immediately. defer语句将延迟函数的执行,直到周围的函数完成。虽然执行被延…...

Vue路由二(嵌套多级路由、路由query传参、路由命名、路由params传参、props配置、<router-link>的replace属性)

目录 1. 嵌套(多级)路由2. 路由query传参3. 路由命名4. 路由params传参5. props配置6. <router-link>的replace属性 1. 嵌套(多级)路由 pages/Car.vue <template><ul><li>car1</li><li>car2</li><li>car3</li></ul…...

【RabbitMQ】可靠性传输

概述 作为消息中间件来说&#xff0c;最重要的任务就是收发消息。因此我们在收发消息的过程中&#xff0c;就要考虑消息是否会丢失的问题。结果是必然的&#xff0c;假设我们没有采取任何措施&#xff0c;那么消息一定会丢失。对于一些不那么重要的业务来说&#xff0c;消息丢失…...

【论文阅读】PERCEIVER-ACTOR: A Multi-Task Transformer for Robotic Manipulation

Abstract transformers凭借其对大型数据集的扩展能力&#xff0c;彻底改变了视觉和自然语言处理。但在机器人操作中&#xff0c;数据既有限又昂贵。通过正确的问题表述&#xff0c;操纵仍然可以从变形金刚中受益吗&#xff1f;我们使用peract来研究这个问题&#xff0c;peract…...

Linux 常用指令

Linux 常用指令 这是本人在备战 CSP 初赛做 Linux 指令题时&#xff0c;心血来潮整理的&#xff0c;希望对大家有帮助。如有错误或有补充&#xff0c;麻烦私信或评论指出。 表格按字母顺序排列 命令作用alias对命令重命名cal显示日历的指令cat查看文本文件的内容cd改变当前工…...

使用 PHPstudy 建立ThinkPHP8 本地集成环境

安装Composer 下载地址&#xff1a;https://getcomposer.org/Composer-Setup.exehttps://getcomposer.org/Composer-Setup.exe 打开PHPstudy创建网站&#xff1a; cmd终端进入PHPstudy www根目录下&#xff1a; 执行代码&#xff1a;cd phpstudy www 根目录地址 cd C:\phpst…...

【系统架构设计】软件的知识产权保护+标准化概论+应用数学+云计算

【系统架构设计】软件的知识产权保护标准化概论应用数学云计算 软件的知识产权保护标准化概论应用数学云计算 软件的知识产权保护 在该部分内容中&#xff0c;以下几点需要注意&#xff1a; 如果作品是委托创作的&#xff0c;著作权的归属应通过委托人和受托人之间的合同来确…...

解决使用阿里云DataV Geo在线地图路径访问403问题

文章目录 1. DataV Geo在线地图路径访问403问题2. 解决方法3. 重启生效 1. DataV Geo在线地图路径访问403问题 最近在写一个省市下钻的demo&#xff0c;用到的是 阿里云DataV Geo在线地图 去动态获取GeoJSON 省市的数据&#xff0c;如下代码 axios.get("https://geo.dat…...

linux 使用SSH密钥配置免密登录

需求&#xff1a;多台主机SSH免密登录&#xff0c;需要使用同一个密钥对 操作&#xff1a; 在Linux中&#xff0c;使用SSH密钥对来在多台主机之间配置免密登录。以下是配置步骤&#xff1a; 在你的本地机器上生成一个SSH密钥对。如果你已经有一个&#xff0c;你可以跳过这一…...

python教程(二):python数据结构大全(附代码)

Python 中数据结构的重要性不言而喻&#xff0c;它们是构建高效、可维护代码的基础。数据结构决定了如何存储、组织和操作数据。理解和使用合适的数据结构能够极大地提升程序的性能、简洁性以及代码的可读性。 Python 的基础数据结构有 4 种&#xff0c;分别是 列表 (list)、元…...

MySQL基于GTID同步模式搭建主从复制

系列文章目录 rpmbuild构建mysql5.7.42版本的rpm包 文章目录 系列文章目录一、mysql-5.7.42RPM包构建二、同步模式分类介绍1.异步同步模式2.半同步模式2.1.实现半同步操作流程2.2.半同步问题总结2.3.半同步一致性2.4.异步与半同步对比 3.GTID同步 三、GTID同步介绍1.gtid介绍2…...

RecyclerView的子项长按选择功能

在Android开发中&#xff0c;实现RecyclerView的子项长按选择功能通常涉及到几个关键步骤&#xff1a;设置RecyclerView的ItemTouchListener来监听长按事件&#xff0c;管理选中状态&#xff0c;以及更新UI以反映选中状态。以下是一个基本的实现步骤和示例代码。 1. 定义数据模…...

mongoDB-1

文章目录 一、疑似坑1.11.2 mongo ops manager1.3 mongo features视图固定大小集合&#xff08;有点类似ringbuffer数据结构&#xff0c;capped collections&#xff09;(聚簇集合)clustered collection(类比到Mysql的聚簇索引)聚合管道 aggregation pipelineWiredTiger (默认存…...

3个核心优势让研究者实现智能OCR全场景覆盖:Pix2Text开源替代方案详解

3个核心优势让研究者实现智能OCR全场景覆盖&#xff1a;Pix2Text开源替代方案详解 【免费下载链接】Pix2Text Pix In, Latex & Text Out. Recognize Chinese, English Texts, and Math Formulas from Images. 项目地址: https://gitcode.com/gh_mirrors/pi/Pix2Text …...

通义千问3-VL-Reranker-8B保姆级部署教程:5分钟搞定Nginx反向代理与HTTPS配置

通义千问3-VL-Reranker-8B保姆级部署教程&#xff1a;5分钟搞定Nginx反向代理与HTTPS配置 1. 为什么需要反向代理与HTTPS 当你成功在本地运行通义千问3-VL-Reranker-8B服务后&#xff0c;默认只能通过 http://localhost:7860 访问。这种配置存在三个明显问题&#xff1a; 安…...

Cortex-M为何不能运行Linux?解析ARM架构与操作系统的兼容性

1. Cortex-M与Linux的兼容性解析作为一名在嵌入式领域摸爬滚打多年的工程师&#xff0c;我经常被问到这个问题&#xff1a;"为什么我的STM32&#xff08;基于Cortex-M内核&#xff09;不能跑Linux&#xff1f;"要回答这个问题&#xff0c;我们需要从处理器架构和操作…...

软件实施交付转运维学习第三天:Linux系统命令基础(部分)

从实施到运维的蜕变之路&#xff0c;掌握命令就是掌握Linux的灵魂写在前面作为一名从软件实施交付转向运维的工程师&#xff0c;我深刻体会到&#xff1a;Linux命令不仅仅是简单的指令&#xff0c;更是与操作系统对话的语言。当我们站在实施和运维的交界处&#xff0c;掌握Linu…...

中兴光猫配置解密工具:突破运营商限制,掌握家庭网络自主权

中兴光猫配置解密工具&#xff1a;突破运营商限制&#xff0c;掌握家庭网络自主权 【免费下载链接】ZET-Optical-Network-Terminal-Decoder 项目地址: https://gitcode.com/gh_mirrors/ze/ZET-Optical-Network-Terminal-Decoder 在家庭网络管理中&#xff0c;你是否曾因…...

从特效 SDK 到 AI 动效平台:Neon Vibe Motion 的技术演进之路

多媒体中台在 B 站主要负责剪辑、拍摄、直播等业务场景的动效渲染&#xff0c;开发维护的 SDK 在后文统一称为特效 SDK。 传统的视频特效生产一般分三条链路&#xff1a; 三条链路存在一个困境&#xff1a;效果丰富度、实时可交互、生产效率&#xff0c;三者不可兼得。 那么能…...

OpenClaw人人养虾:配置Anthropic (Claude)

Anthropic 是 Claude 系列模型的开发者。Claude 以出色的指令遵循能力、深度推理和长文本处理著称。OpenClaw 支持通过 API Key 或 Claude Code CLI OAuth 接入。 认证方式 方式一&#xff1a;API Key&#xff08;推荐&#xff09; 前往 Anthropic Console 创建 API Key在 O…...

C-index避坑指南:生存分析中90%人会犯的5个评估错误

C-index避坑指南&#xff1a;生存分析中90%人会犯的5个评估错误 在临床研究和生物统计领域&#xff0c;C-index&#xff08;Harrells concordance index&#xff09;作为评估生存分析模型预测性能的核心指标&#xff0c;其正确计算与解读直接影响研究结论的可靠性。然而&#x…...

终极指南:用OpenCore Legacy Patcher让旧Mac焕发新生的5个简单步骤

终极指南&#xff1a;用OpenCore Legacy Patcher让旧Mac焕发新生的5个简单步骤 【免费下载链接】OpenCore-Legacy-Patcher Experience macOS just like before 项目地址: https://gitcode.com/GitHub_Trending/op/OpenCore-Legacy-Patcher 你是否还在为手中的旧款Mac无法…...

AI 开发实战:实验和试点项目怎么记录,才不会做完就散

AI 开发实战&#xff1a;实验和试点项目怎么记录&#xff0c;才不会做完就散 一、这个问题为什么值得专门拿出来做&#xff1f; 在 AI 工程落地里&#xff0c;真正拖慢团队的往往不是模型本身&#xff0c;而是流程和协作方式没有跟上。 围绕“实验和试点项目怎么记录&#xff0…...