kafka操作命令详解
目录
1、集群运维命令
1.1、集群启停命令
1.3、集群迁移命令
1.4、权限管理命令
1.4.1、权限参数介绍
1.4.2、增加权限命令
1.4.3、移出权限命令
1.4.4、查看所有topic权限命令
1.4.5、查看某个topic权限命令
2、生产者命令
2.1、创建topic命令
2.2、删除topic命令
2.3、修改topic命令
2.3.1、修改配置命令
2.3.2、删除配置命令
2.4、查询topic命令
2.4.1、查询topic列表命令
2.4.2、查看某个topic详情命令
2.4.2、查看某个topic配置命令
2.5、发送消息命令
3、消费者命令
3.1、消费者组命令
3.1.1、查看消费者组列表命令
3.1.2、查看消费者组成员命令
3.1.3、查看某个消费组详情命令
3.1.4、查看某个消费者组状态命令
3.3、消费消息命令
4、性能测试命令
5、类执行命令
1、集群运维命令
1.1、集群启停命令
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-stop.sh
1.3、集群迁移命令
- consumer.config <file>: 指定消费者的配置文件。该文件包含了消费源集群的相关配置,如连接信息、组ID等。
- producer.config <file>: 指定生产者的配置文件。该文件包含了生产者写入目标集群的相关配置,如连接信息、压缩方式等。
- whitelist <topic>: 指定需要镜像的主题列表,可以使用逗号分隔多个主题名。
- blacklist <topic>: 指定需要排除镜像的主题列表,可以使用逗号分隔多个主题名。与--whitelist互斥。
- num.streams <number>: 指定消费者线程数量,默认是1。
- message.handler <class>: 自定义消息处理类,可以在消息写入目标集群前对其进行处理。
- message.handler.args <args>: 传递给自定义消息处理类的参数。
- abort.on.send.failure <true|false>: 是否在发送失败时终止镜像过程,默认为true。
- offset.commit.interval.ms <number>: 定期提交消费者偏移量的间隔时间,默认是60000ms(1分钟)
#将指定topic按照消费者配置调用生产者发送到指定集群
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist "topic1,topic2"
1.4、权限管理命令
1.4.1、权限参数介绍
- principal: 代表用户或服务主体,例如 User:Alice。
- host: 允许或拒绝访问的主机地址,* 表示任意主机。
- operation: 允许或拒绝的操作类型,例如 READ, WRITE, ALL。
- permissionType: 权限类型,可以是 ALLOW 或 DENY。
- resourceType: 资源类型,例如 TOPIC 或 GROUP。
- name: 资源名称,例如 example-topic。
- patternType: 资源模式类型,LITERAL 表示字面模式。
1.4.2、增加权限命令
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \--remove --allow-principal User:user1 --operation Write --topic my-topic
1.4.3、移出权限命令
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \--remove --allow-principal User:user1 --operation Write --topic my-topic
1.4.4、查看所有topic权限命令
bin/kafka-acls.sh --list --bootstrap-server localhost:9092
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=example-topic, patternType=LITERAL)`:(principal=User:Alice, host=*, operation=READ, permissionType=ALLOW)(principal=User:Bob, host=192.168.1.100, operation=WRITE, permissionType=ALLOW)(principal=User:Carol, host=*, operation=READ, permissionType=DENY)Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=another-topic, patternType=LITERAL)`:(principal=User:David, host=192.168.1.101, operation=READ, permissionType=ALLOW)Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=example-group, patternType=LITERAL)`:(principal=User:Admin, host=*, operation=ALL, permissionType=ALLOW)
1.4.5、查看某个topic权限命令
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \--list --topic my-topic
2、生产者命令
2.1、创建topic命令
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
2.2、删除topic命令
bin/kafka-topics.sh --delete --topic my_topic --bootstrap-server localhost:9092
2.3、修改topic命令
2.3.1、修改配置命令
kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic topicName --config flush.messages=1
2.3.2、删除配置命令
kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic topicName --delete-config flush.messages
2.4、查询topic命令
2.4.1、查询topic列表命令
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
2.4.2、查看某个topic详情命令
bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092
2.4.2、查看某个topic配置命令
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=3600000Configs for topic 'my_topic' are:cleanup.policy=deletecompression.type=producerdelete.retention.ms=86400000file.delete.delay.ms=60000flush.messages=9223372036854775807flush.ms=9223372036854775807follower.replication.throttled.replicas=index.interval.bytes=4096leader.replication.throttled.replicas=max.message.bytes=1000012message.downconversion.enable=truemessage.format.version=2.7-IV0message.timestamp.difference.max.ms=9223372036854775807message.timestamp.type=CreateTimemin.cleanable.dirty.ratio=0.5min.compaction.lag.ms=0min.insync.replicas=1preallocate=falseretention.bytes=-1retention.ms=604800000segment.bytes=1073741824segment.index.bytes=10485760segment.jitter.ms=0segment.ms=604800000unclean.leader.election.enable=false
2.5、发送消息命令
#默认发送消息
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092#指定分区发送命令
kafka-console-producer.sh --topic my-topic 2 --broker-list localhost:9092
3、消费者命令
3.1、消费者组命令
3.1.1、查看消费者组列表命令
#查看消费组列表命令
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092group1
group2
group3
my_consumer_group
another_consumer_group
3.1.2、查看消费者组成员命令
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test_group --membersCONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer-1-2d7d59c3-64c9-4526-905b-4e8920db3e84 /192.168.1.36 consumer-1 2
3.1.3、查看某个消费组详情命令
bin/kafka-consumer-groups.sh --describe --group my_group --bootstrap-server localhost:9092GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my_group my_topic 0 12345 12350 5 consumer-1-1 /192.168.1.1 consumer-1
my_group my_topic 1 23456 23460 4 consumer-1-2 /192.168.1.1 consumer-1
my_group my_topic 2 34567 34567 0 consumer-1-3 /192.168.1.1 consumer-1
3.1.4、查看某个消费者组状态命令
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --stateCOORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
192.168.1.36:9092 (1) range Stable 1
3.3、消费消息命令
#从头消费命令
bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning#指定分区消费命令
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --partition 1#指定消费者组id消费命令
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --consumer-property group.id=test
4、性能测试命令
4.1、生产者测试命令
bin/kafka-producer-perf-test.sh --topic perf-test --num-records 1000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=127.0.0.1:9092 compression.type=lz41000 records sent, 3424.657534 records/sec (3.34 MB/sec), 13.61 ms avg latency, 255.00 ms max latency, 13 ms 50th, 20 ms 95th, 255 ms 99th.-h, --help 显示使用帮助并退出
--topic 指定生产的消息发往的 topic
--num-records 指定生产的消息总数
--payload-delimeter 如果通过 --payload-file 指定了从文件中获取消息内容,那么这个参数的意义是指定文件的消息分隔符,默认值为 \n,即文件的每一行视为一条消息;如果未指定 --payload-file 则此参数不生效
--throughput 限制每秒发送的最大的消息数,设为 -1 表示不限制
--producer-props 直接指定 Producer 配置,格式为 NAME=VALUE,例如 bootstrap.server=127.0.0.1:9092,通过此种方式指定的配置优先级高于 --producer.config
--producer-config 指定 Producer 的配置文件,格式参照官方的 config/producer.properties
--print-metrics 在测试结束后打印更详尽的指标,默认为 false
--transactional-id 指定事务 ID,测试并发事务的性能时需要,只有在 --transaction-duration-ms > 0 时生效,默认值为 performance-producer-default-transactional-id
--transactional-duration-ms 指定事务持续的最长时间,超过这段时间后就会调用 commitTransaction 来提交事务,只有指定了 > 0 的值才会开启事务,默认值为 0
--record-size 指定每条消息的大小,单位是字节,和 --payload-file 两个中必须指定一个,但不能同时指定
--payload-file 指定消息的来源文件,只支持 UTF-8 编码的文本文件,文件的消息分隔符通过 --payload-delimeter 指定,和 --record-size 两个中必须指定一个,但不能同时指定
4.2、消费者测试命令
bin/kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092 --topic perf_test --messages 1000000 --threads 8 --reporting-interval 1000 --show-detailedtime, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-03-25 15:57:59:426, 0, 657.2275, 657.2275, 673001, 673001.0000, 1616659078690, -1616659077690, 0.0000, 0.0000
...***************************输入参数解释***************************
--bootstrap-server 指定 broker 地址,必选,除非用 --broker-list 代替(不建议)
--topic 指定消费的 topic,必选
--version 输出 Kafka 版本
--consumer.config 指定 Consumer 配置文件
--date-format 指定用于格式化 *.time 的规则,默认为 yyyy-MM-dd HH:mm:ss:SSS
--fetch-size 指定一次请求消费的大小,默认为 1048576 即 1 MB
--from-latest 如果 Consumer 没有已经建立的 offset,则指定从 log 中最新的位点开始消费,而不是从最早的位点开始消费
--group 指定 ConsumerGroup ID,默认为 perf-consumer-40924
--help 显示使用帮助并退出
--hide-header 指定后不输出 header 信息
--messages 指定消费的消息数量,必选
--num-fetch-threads 指定 fetcher 线程的数量
--print-metrics 指定打印 metrics 信息
--reporting-interval 指定打印进度信息的时间间隔,默认为 5000 即 5 秒
--show-detailed-stats 指定每隔一段时间(由 --reporting-interval 指定)输出显示详细的状态信息
--socket-buffer-size 指定 TCP 的 RECV 大小,默认为 2097152 即 2 MB
--threads 指定消费的线程数,默认为 10
--timeout 指定允许的最大超时时间,即每条消息返回的最大时间间隔,默认为 10000 即 10 秒***************************输出参数解释***************************
time:当前时间,格式由 --date-format 指定
threadId:线程 ID
data.consumed.in.MB:消费到的数据总大小,单位为 MB
MB.sec:消费 TPS,即每秒消费的消息大小
data.consumed.in.nMsg:消费到的总消息数
nMsg.sec:消费 TPS,即每秒消费的消息条数
rebalance.time.ms:消费者组重平衡的耗时,单位为 ms,0 表示没有发生重平衡
fetch.time.ms:fetch 线程的总耗时,单位为 ms
fetch.MB.sec:fetch 线程每秒钟获取到的消息大小
fetch.nMsg.sec:fetch 线程每秒钟获取到的消息数量
#若没有指定 --show-detailed,则输出信息中的前两项会有所不同,如下:
start.time, end.time, data.consumed.in.MB, MB.sec, ...
start.time:消费开始的时间,格式由 --date-format 指定
end.time:消费结束的时间,格式由 --date-format 指定
5、类执行命令
通过kafka-run-class命令执行指定类
#查看消费者消费进度
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topicmy-topic:0:12345
my-topic:1:23456
my-topic:2:34567
相关文章:
kafka操作命令详解
目录 1、集群运维命令 1.1、集群启停命令 1.3、集群迁移命令 1.4、权限管理命令 1.4.1、权限参数介绍 1.4.2、增加权限命令 1.4.3、移出权限命令 1.4.4、查看所有topic权限命令 1.4.5、查看某个topic权限命令 2、生产者命令 2.1、创建topic命令 2.2、删除topic命令 …...
graalvm jdk和openjdk
下载地址:https://github.com/graalvm/graalvm-ce-builds/releases 官网: https://www.graalvm.org...
docker基础使用教程
1.准备工作 例子:工程在docker_test 生成requirements.txt文件命令:(使用参考链接2) pip list --formatfreeze > requirements.txt 参考链接1: 安装pipreqs可能比较困难 python 项目自动生成环境配置文件require…...
计算机网络 交换机的安全配置
一、理论知识 1.交换机端口安全功能介绍 交换机端口安全功能是针对交换机端口进行安全属性的配置,以控制用户的安全接入。主要包括以下两种配置项: ①限制交换机端口的最大连接数:控制交换机端口连接的主机数量;防止用户进行恶…...
深入解析大语言模型系列:Transformer架构的原理与应用
引言 在自然语言处理(NLP)领域,大语言模型(Large Language Models, LLMs)近几年取得了突破性的进展,而 Transformer 作为这些模型的核心架构,功不可没。本文将详细介绍 Transformer 的原理、结…...
uni-app地图组件控制
uni.createMapContext(mapId,this) 创建并返回 map 上下文 mapContext 对象。在自定义组件下,第二个参数传入组件实例this,以操作组件内 <map> 组件。 注意:uni.createMapContext(mapId, this) app-nvue 平台 2.2.5 支持 uni.create…...
前端调用api发请求常用的请求头content- type的类型和常用场景
Content-Type 是一个非常重要的HTTP头,它定义了发送给服务器或客户端的数据的MIME类型。这对于服务器和客户端正确解析和处理数据至关重要。下面是一些常见的 Content-Type 值及其用途和区别。 常见的 Content-Type 值 text/plain • 用途: 纯文本,无格…...
数据仓库之SparkSQL
Apache Spark SQL是Spark中的一个组件,专门用于结构化数据处理。它提供了通过SQL和DataFrame API来执行结构化数据查询的功能。以下是对Spark SQL的详细介绍: 核心概念 DataFrame: 定义: DataFrame是一个分布式数据集合,类似于关系型数据库中…...
如何在 MySQL 中导入和导出数据库以及重置 root 密码
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 如何导入和导出数据库 导出 要导出数据库,打开终端,确保你没有登录到 MySQL 中,然后输入以下命令&…...
基于uni-app和图鸟UI的云课堂小程序开发实践
摘要: 随着移动互联网的快速发展,移动学习已成为教育领域的重要趋势。本文介绍了基于uni-app和图鸟UI框架开发的云课堂小程序,该小程序实现了移动教学、移动学习、移动阅读和移动社交的完美结合,为用户提供了一个便捷、高效的学习…...
解决python从TD数据库取50w以上大量数据慢的问题
1.问题背景描述 python项目中的时序数据都存放在TD数据库中,数据是秒级存入的,当查询一周数据时将超过50w数据量,这是一次性获取全量数据到python程序很慢,全流程10秒以上,希望进行优化加速 2.排查 首先,…...
游戏心理学Day21
玩家情绪与暴力攻击 情绪 情绪的分类 情绪是一种经常波动的东西,我们既体验过骄傲激动和开心,也体验过羞怯内疚和沮丧。我们的感受高度依赖于情境。研究者区分出至少三种途径来考察作为一种相对固定的人格特征的情绪,即为情感性࿰…...
接口测试基础 --- 什么是接口测试及其测试流程?
接口测试是软件测试中的一个重要部分,它主要用于验证和评估不同软件组件之间的通信和交互。接口测试的目标是确保不同的系统、模块或组件能够相互连接并正常工作。 接口测试流程可以分为以下几个步骤: 1.需求分析:首先,需要仔细…...
贪心+动归1
跳跃游戏 给你一个非负整数数组 nums ,你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标,如果可以,返回 true ;否则࿰…...
三星S20以上手机中的动态相片及其分解
三星S20以后的相机,相机拍出来的图片,用三星手机自带的“相册”打开之后,还会有“查看动态照片”的选项,点击之后就能查看拍照片时前后2秒左右的视频! 不知道这个功能是不是三星独有的。 这样得到的图片非常大。因为…...
一文了解HarmonyOSNEXT发布重点内容
华为在2024年6月21日的开发者大会上正式发布了HarmonyOS NEXT版,这是华为在操作系统领域的一次重大飞跃,标志着华为在构建全场景智能生态方面的卓越成就。HarmonyOS NEXT版不仅带来了全新的系统架构和性能提升,还首次将AI能力融入系统&#x…...
矩阵中严格递增的单元格数
题目链接:leetcode:矩阵中严格递增的单元格数 描述 给你一个下标从 1 开始、大小为 m x n 的整数矩阵 mat,你可以选择任一单元格作为 起始单元格 。 从起始单元格出发,你可以移动到 同一行或同一列 中的任何其他单元格,但前提是目…...
超参数调优-通用深度学习篇(上)
文章目录 深度学习超参数调优网格搜索示例一:网格搜索回归模型超参数示例二:Keras网格搜索 随机搜索贝叶斯搜索 超参数调优框架Optuna深度学习超参数优化框架nvidia nemo大模型超参数优化框架 参数调整理论: 黑盒优化:超参数优化…...
小程序中data-xx是用方式
data-sts"3" 是微信小程序中的一种数据绑定语法,用于在 WXML(小程序模板)中将自定义的数据绑定到页面元素上。让我详细解释一下: data-xx 的作用: data-xx 允许你在页面元素上自定义属性,以便在事…...
【2024德国工作】外国人在德国找工作是什么体验?
挺难的,德语应该是所有中国人的难点。大部分中国人进德国公司要么是做中国业务相关,要么是做技术领域的工程师。先讲讲人在中国怎么找德国的工作,顺便延申下,德国工作的真实体验,最后聊聊在今年的德国工作签证申请条件…...
Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
Robots.txt 文件
什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
客户案例 | 短视频点播企业海外视频加速与成本优化:MediaPackage+Cloudfront 技术重构实践
01技术背景与业务挑战 某短视频点播企业深耕国内用户市场,但其后台应用系统部署于东南亚印尼 IDC 机房。 随着业务规模扩大,传统架构已较难满足当前企业发展的需求,企业面临着三重挑战: ① 业务:国内用户访问海外服…...
从零手写Java版本的LSM Tree (一):LSM Tree 概述
🔥 推荐一个高质量的Java LSM Tree开源项目! https://github.com/brianxiadong/java-lsm-tree java-lsm-tree 是一个从零实现的Log-Structured Merge Tree,专为高并发写入场景设计。 核心亮点: ⚡ 极致性能:写入速度超…...
SpringCloud优势
目录 完善的微服务支持 高可用性和容错性 灵活的配置管理 强大的服务网关 分布式追踪能力 丰富的社区生态 易于与其他技术栈集成 完善的微服务支持 Spring Cloud 提供了一整套工具和组件来支持微服务架构的开发,包括服务注册与发现、负载均衡、断路器、配置管理等功能…...
