当前位置: 首页 > news >正文

Kafka 命令详解及使用示例

文章目录

  • Kafka 命令详解及使用示例
  • Kafka 命令详解
    • `kafka-topics.sh`:主题管理
      • 创建主题
      • 创建带副本的主题
      • 修改主题分区数
      • 了解分区分布
      • 列出主题
      • 查看主题详情
      • 删除主题
    • `kafka-console-producer.sh`:消息生产者
      • 发送消息到主题
      • 带键值对的消息
      • 消息生产性能优化
      • 带分区键的消息发送
    • `kafka-console-consumer.sh`:消息消费者
      • 消费主题中的消息
      • 只读取键值对消息
      • 实时消费消息
      • 只消费特定分区的消息
      • 以 JSON 格式输出消息
    • `kafka-consumer-groups.sh`:消费者组管理
      • 查看消费者组信息
      • 查看消费者组的偏移量信息
      • 重置消费者组的偏移量
    • `kafka-configs.sh`:配置管理
      • 查看主题配置
      • 修改主题配置
    • `kafka-acls.sh`:访问控制列表管理
      • 为用户创建权限
      • 删除用户权限
  • 示例总结


Kafka 命令详解及使用示例

Kafka 是一个分布式流处理平台,提供了高吞吐量、低延迟的消息系统。Kafka 主要用于消息发布-订阅模式中的消息传输,广泛应用于数据管道、日志系统、事件追踪等场景。本文将介绍 Kafka 中常用的命令行工具及其具体使用方式,帮助开发者更好地管理和使用 Kafka。


Kafka 命令详解

kafka-topics.sh:主题管理

主题(Topic)是 Kafka 中消息的逻辑分类,所有消息都发送到指定的主题中。kafka-topics.sh 用于管理主题,包括创建、删除、列出主题等操作。

创建主题

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  • --topic:主题名称。
  • --partitions:分区数,消息将分布在多个分区中。
  • --replication-factor:副本因子,用于消息的高可用性。

创建带副本的主题

在分布式环境中,副本对于 Kafka 来说至关重要,它能确保在 Broker 故障时,消息不会丢失。创建主题时设置合适的副本数和分区数非常关键。

bin/kafka-topics.sh --create --topic important-topic --partitions 5 --replication-factor 3 --bootstrap-server localhost:9092
  • --partitions 设置为 5,意味着主题的数据会被分散到 5 个分区中,提升并发处理能力。
  • --replication-factor 设置为 3,确保每个分区有 3 个副本(在不同的 Broker 上),提高容错性。

注意:副本数不能超过集群中的 Broker 数量,生产环境中一般设置副本数为 3,保证高可用性。

修改主题分区数

Kafka 支持在线扩展主题的分区数。可以在不停止服务的情况下动态增加分区数,但要注意增加分区会影响数据的顺序性,因为 Kafka 不会自动对已存在的数据进行重分配。

bin/kafka-topics.sh --alter --topic important-topic --partitions 10 --bootstrap-server localhost:9092

此命令将 important-topic 的分区数从 5 扩展到 10 个。

了解分区分布

通过 --describe 命令,可以查看每个分区在哪些 Broker 上存储,并了解它们的副本状态。

bin/kafka-topics.sh --describe --topic important-topic --bootstrap-server localhost:9092

输出的结果会显示每个分区的副本和首领(Leader)在哪个 Broker 上。Leader 是处理读写请求的副本,其他副本是跟随者,用于容错。

列出主题

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

查看主题详情

bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

删除主题

bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

kafka-console-producer.sh:消息生产者

Kafka 提供了一个控制台生产者工具,允许我们从命令行发送消息到指定主题。

发送消息到主题

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092

输入消息后按 Enter 发送到 Kafka 主题。

带键值对的消息

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

在这里,消息的键和值通过冒号分隔,例如:

key1:value1
key2:value2

消息生产性能优化

在高吞吐量场景下,可以通过调整生产者配置来提高性能。例如,批量发送消息和异步生产可以显著提高效率。

bin/kafka-console-producer.sh --topic fast-topic --bootstrap-server localhost:9092 --producer-property batch.size=16384 --producer-property linger.ms=5
  • batch.size:控制批量消息的大小(以字节为单位),Kafka 会尝试将消息累积到这个大小后一起发送。
  • linger.ms:在批量消息发送前的等待时间,可以通过稍微延迟发送消息来增加批量的大小。

此外,生产者可以配置为异步发送,这样可以减少网络等待时间:

--producer-property acks=1
  • acks=1 表示只等待 Leader 确认即可继续发送消息,这种方式可以提高性能,但有可能在 Leader 故障时丢失部分消息。

带分区键的消息发送

指定消息发送到特定的分区时,可以使用 key 参数,这在有状态的消息处理(如事务处理)场景中非常重要。

bin/kafka-console-producer.sh --topic partitioned-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

这样每个消息都会根据键(key)被分配到相同的分区。例如,key1:message1key1:message2 会发送到相同的分区。


kafka-console-consumer.sh:消息消费者

消费者用于从 Kafka 主题中读取消息。kafka-console-consumer.sh 是 Kafka 提供的命令行消费者工具。

消费主题中的消息

bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
  • --from-beginning:从主题的起始位置读取所有消息。

只读取键值对消息

bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=,

这样读取的消息会显示为键和值的格式,例如:

key1,value1

实时消费消息

使用 kafka-console-consumer.sh 来实时消费主题中的消息:

bin/kafka-console-consumer.sh --topic fast-topic --bootstrap-server localhost:9092

如果要从主题的起始位置读取消息,可以添加 --from-beginning 参数。

只消费特定分区的消息

Kafka 支持直接从某个分区中读取消息。在某些场景下(如故障恢复或日志分析),我们可能只需要处理某个分区的数据:

bin/kafka-console-consumer.sh --topic important-topic --bootstrap-server localhost:9092 --partition 0 --offset 10

此命令从分区 0 开始读取第 10 条消息。

以 JSON 格式输出消息

Kafka 消费者可以输出 JSON 格式的消息,方便后续处理和分析:

bin/kafka-console-consumer.sh --topic json-topic --bootstrap-server localhost:9092 --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.separator=, --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka-consumer-groups.sh:消费者组管理

Kafka 的消费者组允许多个消费者一起协同消费消息,每个分区的消息只能被一个组内的消费者消费。kafka-consumer-groups.sh 可以用于管理消费者组。

查看消费者组信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看消费者组的偏移量信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

输出信息包括每个分区的已消费消息偏移量以及消费者的状态。

重置消费者组的偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic
  • --to-earliest:将偏移量重置为最早的消息。

kafka-configs.sh:配置管理

Kafka 主题和代理的配置可以通过 kafka-configs.sh 进行管理。

查看主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

修改主题配置

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=172800000

此命令将主题 my-topic 的消息保留时间修改为 2 天(单位为毫秒)。


kafka-acls.sh:访问控制列表管理

Kafka 提供了基于 ACL(访问控制列表)的权限管理。kafka-acls.sh 用于管理权限。

为用户创建权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation All --topic my-topic

此命令允许用户 Alice 对主题 my-topic 执行所有操作。

删除用户权限

bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Alice --operation All --topic my-topic

示例总结

我们通过几个简单的示例介绍了 Kafka 的基本操作:

  1. 创建主题 my-topic,并通过控制台生产者发送消息。
  2. 使用控制台消费者从该主题中读取消息。
  3. 管理消费者组的偏移量,重置到最早的消息。
  4. 修改主题的保留时间,以及管理用户的权限。

Kafka 提供了丰富的命令行工具,用于主题、消费者组、配置、权限等的管理。灵活使用这些命令,可以帮助我们高效地维护 Kafka 集群。

相关文章:

Kafka 命令详解及使用示例

文章目录 Kafka 命令详解及使用示例Kafka 命令详解kafka-topics.sh:主题管理创建主题创建带副本的主题修改主题分区数了解分区分布列出主题查看主题详情删除主题 kafka-console-producer.sh:消息生产者发送消息到主题带键值对的消息消息生产性能优化带分…...

重生归来之挖掘stm32底层知识(1)——寄存器

概念理解 要使用stm32首先要知道什么是引脚和寄存器。 如下图所示,芯片通过这些金属丝与电路板连接,这些金属丝叫做引脚。一般做软件开发是不需要了解芯片是怎么焊的,只要会使用就行。我们平常通过编程来控制这些引脚的输入和输出&#xff0c…...

Qt构建JSON及解析JSON

目录 一.JSON简介 JSON对象 JSON数组 二.Qt中JSON介绍 QJsonvalue Qt中JSON对象 Qt中JSON数组 QJsonDocument 三.Qt构建JSON数组 四.解析JSON数组 一.JSON简介 一般来讲C类和对象在java中是无法直接直接使用的,因为压根就不是一个规则。但是他们在内存中…...

合宙Air201模组LuatOS扩展功能:温湿度传感器篇!

通过前面几期的学习,同学们的学习热情越来越高。 合宙Air201模组除了支持3种定位方式外,还具有丰富的扩展功能,比如:通过外扩BTB链接方案,最多可支持21个IO接口:SPI、I2C、UART等多种接口全部支持。 本期…...

主流敏捷工具scrum工具

在当今的快速变化和高需求的业务环境中,敏捷开发已经成为许多企业实现快速迭代和响应市场需求的重要方法。而在众多敏捷工具中,选择适合自己团队的工具尤为重要。 今天,我们将对比几款主流的敏捷工具,供参考 1. Leangoo领歌&…...

探索微服务架构:从理论到实践,深度剖析其优缺点

微服务架构(Microservice Architecture)是一种软件开发架构形式,它的核心 思想是将大型应用程序拆分成一组小的服务,每个服务都运行在其独立的进程中,并且 服务与服务之间通过轻量级的通信机制(如HTTP REST…...

2024 年最佳 Chrome 验证码扩展,解决 reCAPTCHA 问题

验证码,特别是 reCAPTCHA,已成为在线安全的不可或缺的一部分。虽然它们在区分人类和机器人方面起着至关重要的作用,但它们也可能成为合法用户和从事网络自动化的企业的主要障碍。无论您是试图简化在线体验的个人,还是依赖自动化工…...

Go语言现代web开发defer 延迟执行

The defer statement will delay the execution of a function until the surrounding function is completed. Although execution is postponed, funciton arguments will be evaluated immediately. defer语句将延迟函数的执行,直到周围的函数完成。虽然执行被延…...

Vue路由二(嵌套多级路由、路由query传参、路由命名、路由params传参、props配置、<router-link>的replace属性)

目录 1. 嵌套(多级)路由2. 路由query传参3. 路由命名4. 路由params传参5. props配置6. <router-link>的replace属性 1. 嵌套(多级)路由 pages/Car.vue <template><ul><li>car1</li><li>car2</li><li>car3</li></ul…...

【RabbitMQ】可靠性传输

概述 作为消息中间件来说&#xff0c;最重要的任务就是收发消息。因此我们在收发消息的过程中&#xff0c;就要考虑消息是否会丢失的问题。结果是必然的&#xff0c;假设我们没有采取任何措施&#xff0c;那么消息一定会丢失。对于一些不那么重要的业务来说&#xff0c;消息丢失…...

【论文阅读】PERCEIVER-ACTOR: A Multi-Task Transformer for Robotic Manipulation

Abstract transformers凭借其对大型数据集的扩展能力&#xff0c;彻底改变了视觉和自然语言处理。但在机器人操作中&#xff0c;数据既有限又昂贵。通过正确的问题表述&#xff0c;操纵仍然可以从变形金刚中受益吗&#xff1f;我们使用peract来研究这个问题&#xff0c;peract…...

Linux 常用指令

Linux 常用指令 这是本人在备战 CSP 初赛做 Linux 指令题时&#xff0c;心血来潮整理的&#xff0c;希望对大家有帮助。如有错误或有补充&#xff0c;麻烦私信或评论指出。 表格按字母顺序排列 命令作用alias对命令重命名cal显示日历的指令cat查看文本文件的内容cd改变当前工…...

使用 PHPstudy 建立ThinkPHP8 本地集成环境

安装Composer 下载地址&#xff1a;https://getcomposer.org/Composer-Setup.exehttps://getcomposer.org/Composer-Setup.exe 打开PHPstudy创建网站&#xff1a; cmd终端进入PHPstudy www根目录下&#xff1a; 执行代码&#xff1a;cd phpstudy www 根目录地址 cd C:\phpst…...

【系统架构设计】软件的知识产权保护+标准化概论+应用数学+云计算

【系统架构设计】软件的知识产权保护标准化概论应用数学云计算 软件的知识产权保护标准化概论应用数学云计算 软件的知识产权保护 在该部分内容中&#xff0c;以下几点需要注意&#xff1a; 如果作品是委托创作的&#xff0c;著作权的归属应通过委托人和受托人之间的合同来确…...

解决使用阿里云DataV Geo在线地图路径访问403问题

文章目录 1. DataV Geo在线地图路径访问403问题2. 解决方法3. 重启生效 1. DataV Geo在线地图路径访问403问题 最近在写一个省市下钻的demo&#xff0c;用到的是 阿里云DataV Geo在线地图 去动态获取GeoJSON 省市的数据&#xff0c;如下代码 axios.get("https://geo.dat…...

linux 使用SSH密钥配置免密登录

需求&#xff1a;多台主机SSH免密登录&#xff0c;需要使用同一个密钥对 操作&#xff1a; 在Linux中&#xff0c;使用SSH密钥对来在多台主机之间配置免密登录。以下是配置步骤&#xff1a; 在你的本地机器上生成一个SSH密钥对。如果你已经有一个&#xff0c;你可以跳过这一…...

python教程(二):python数据结构大全(附代码)

Python 中数据结构的重要性不言而喻&#xff0c;它们是构建高效、可维护代码的基础。数据结构决定了如何存储、组织和操作数据。理解和使用合适的数据结构能够极大地提升程序的性能、简洁性以及代码的可读性。 Python 的基础数据结构有 4 种&#xff0c;分别是 列表 (list)、元…...

MySQL基于GTID同步模式搭建主从复制

系列文章目录 rpmbuild构建mysql5.7.42版本的rpm包 文章目录 系列文章目录一、mysql-5.7.42RPM包构建二、同步模式分类介绍1.异步同步模式2.半同步模式2.1.实现半同步操作流程2.2.半同步问题总结2.3.半同步一致性2.4.异步与半同步对比 3.GTID同步 三、GTID同步介绍1.gtid介绍2…...

RecyclerView的子项长按选择功能

在Android开发中&#xff0c;实现RecyclerView的子项长按选择功能通常涉及到几个关键步骤&#xff1a;设置RecyclerView的ItemTouchListener来监听长按事件&#xff0c;管理选中状态&#xff0c;以及更新UI以反映选中状态。以下是一个基本的实现步骤和示例代码。 1. 定义数据模…...

mongoDB-1

文章目录 一、疑似坑1.11.2 mongo ops manager1.3 mongo features视图固定大小集合&#xff08;有点类似ringbuffer数据结构&#xff0c;capped collections&#xff09;(聚簇集合)clustered collection(类比到Mysql的聚簇索引)聚合管道 aggregation pipelineWiredTiger (默认存…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版​分享

平时用 iPhone 的时候&#xff0c;难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵&#xff0c;或者买了二手 iPhone 却被原来的 iCloud 账号锁住&#xff0c;这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验

系列回顾&#xff1a; 在上一篇中&#xff0c;我们成功地为应用集成了数据库&#xff0c;并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了&#xff01;但是&#xff0c;如果你仔细审视那些 API&#xff0c;会发现它们还很“粗糙”&#xff1a;有…...

css的定位(position)详解:相对定位 绝对定位 固定定位

在 CSS 中&#xff0c;元素的定位通过 position 属性控制&#xff0c;共有 5 种定位模式&#xff1a;static&#xff08;静态定位&#xff09;、relative&#xff08;相对定位&#xff09;、absolute&#xff08;绝对定位&#xff09;、fixed&#xff08;固定定位&#xff09;和…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

HashMap中的put方法执行流程(流程图)

1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中&#xff0c;其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下&#xff1a; 初始判断与哈希计算&#xff1a; 首先&#xff0c;putVal 方法会检查当前的 table&#xff08;也就…...