自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例
Kafka:分布式消息系统的核心原理与安装部署-CSDN博客
自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客
Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客
Kafka 生产者优化与数据处理经验-CSDN博客
Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客
Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客
Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客
Kafka 数据倾斜:原因、影响与解决方案-CSDN博客
Kafka 核心要点解析_kafka mirrok-CSDN博客
Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客
目录
一、脚本功能概述
二、生产者性能测试
三、消费者性能测试
四、查看可用主题列表
五、脚本使用示例
六、脚本源码
七、总结
在大数据处理的领域中,Kafka 扮演着极为重要的角色,它作为一个分布式流处理平台,能够高效地处理大规模的实时数据。而 Kafka 提供了一系列的脚本工具,帮助我们更好地管理和测试 Kafka 集群。今天,我们就来深入探讨一个自定义的 Kafka 脚本 kf-use.sh
,了解其功能与使用场景。
一、脚本功能概述
kf-use.sh
脚本主要提供了两个核心功能:生产者性能测试和消费者性能测试,同时还具备查看可用主题列表以及退出脚本等功能。通过这些功能,我们可以对 Kafka 集群的生产和消费能力进行评估,以便优化集群配置和数据处理流程。
二、生产者性能测试
- 参数输入
- 当选择生产者性能测试功能时,脚本首先会提示用户输入主题名称。如果输入的主题不存在,脚本会要求用户重新输入,确保测试的主题是有效的。
- 接着,用户需要输入要生产的记录数量,并且脚本会验证输入是否为整数,如果不是则提示重新输入。
- 最后,用户要输入每条记录的大小(单位为字节),同样会进行整数验证。
- 性能测试执行
- 一旦用户输入了正确的参数,脚本会调用
kafka-producer-perf-test.sh
工具,并传入相应的参数,如主题名称、记录数量、记录大小等,同时设置吞吐量为 -1(表示不限制吞吐量),并指定 Kafka 集群的地址bigdata01:9092
。这样就可以开始对生产者的性能进行测试,测试结果将反映出在给定条件下生产者向指定主题发送数据的效率。
- 一旦用户输入了正确的参数,脚本会调用
三、消费者性能测试
- 主题选择
- 在消费者性能测试功能中,首先会调用
kafka-topics.sh
工具列出当前可用的主题列表,然后提示用户输入要测试的主题名称。如果输入的主题不存在,脚本会要求重新输入。
- 在消费者性能测试功能中,首先会调用
- 性能测试执行
- 当用户选择了存在的主题后,脚本会调用
kafka-consumer-perf-test.sh
工具,传入 Kafka 集群地址bigdata01:9092
、主题名称以及要消费的消息数量(这里固定为 100000 条),从而对消费者从指定主题消费数据的性能进行测试,测试结果可以帮助我们了解消费者处理数据的速度和效率。
- 当用户选择了存在的主题后,脚本会调用
四、查看可用主题列表
无论是在生产者还是消费者性能测试功能中,都提供了查看可用主题列表的选项。通过调用 kafka-topics.sh
工具并传入集群地址 bigdata01:9092
,可以获取当前 Kafka 集群中所有的主题名称,方便用户了解集群中的数据主题情况,以便做出正确的测试选择。
五、脚本使用示例
假设我们要对一个名为 test_topic
的主题进行生产者性能测试,我们可以按照以下步骤操作:
- 运行
kf-use.sh
脚本。 - 选择生产者性能测试功能(可能是对应的数字选项,如 1)。
- 输入主题名称
test_topic
。 - 输入要生产的记录数量,例如 10000。
- 输入每条记录的大小,比如 100 字节。
- 脚本会自动执行生产者性能测试,并输出测试结果,包括发送的总字节数、每秒发送的记录数、每秒发送的字节数等信息。
同样,如果要进行消费者性能测试,例如对 test_topic
主题:
- 运行
kf-use.sh
脚本并选择消费者性能测试功能(可能是数字 7 后再选择 1)。 - 查看可用主题列表,确认
test_topic
存在后输入该主题名称。 - 脚本会执行消费者性能测试,并给出消费 100000 条消息的相关性能数据,如每秒消费的消息数等。
六、脚本源码
#!/bin/bashwhile true; do# 命令大全系统界面echo "Kafka 命令大全系统:"echo "1. 主题操作(topics)"echo "2. 生产者操作(producer)"echo "3. 消费者操作(consumer)"echo "4. 配置操作(configs)"echo "5. 消费者组操作(consumer groups)"echo "6. 生产者性能测试(producer perf test)"echo "7. 消费者性能测试(consumer perf test)"echo "0. 退出"read -p "请输入功能选项数字:" choicecase $choice in1)# 主题操作菜单while true; doecho "主题操作功能:"echo "1. 查看所有主题"echo "2. 创建主题"echo "3. 查看某主题详细信息"echo "4. 修改某主题分区数"echo "5. 删除主题"echo "0.返回命令大全系统界面"read -p "请输入主题操作选项数字:" topic_choicecase $topic_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;2)# 创建主题read -p "请输入要创建的主题名称:" topic_namewhile true; doread -p "请输入分区数(整数):" partitionsif [[ $partitions =~ ^[0-9]+$ ]]; thenbreakelseecho "分区数必须是整数,请重新输入。"fidonewhile true; doread -p "请输入副本数(整数,且不超过可用 broker 数量):" replication_factorif [[ $replication_factor =~ ^[0-9]+$ ]]; then# 这里可添加检查副本数不超过可用 broker 数量的逻辑,暂时省略breakelseecho "副本数必须是整数,请重新输入。"fidonekafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions $partitions --replication-factor $replication_factor --topic $topic_name;;3)# 查看某主题详细信息kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要查看详细信息的主题名称:" topic_to_describekafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic $topic_to_describe;;4)# 修改某主题分区数kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要修改分区数的主题名称:" topic_to_alterwhile true; doread -p "请输入新的分区数(整数且大于当前分区数):" new_partitionsif [[ $new_partitions =~ ^[0-9]+$ ]]; then# 这里可添加检查新分区数是否大于当前分区数的逻辑,暂时省略breakelseecho "新分区数必须是整数,请重新输入。"fidonekafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic $topic_to_alter --partitions $new_partitions;;5)# 删除主题kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要删除的主题名称:" topic_to_deletekafka-topics.sh --bootstrap-server bigdata01:9092 --delete --topic $topic_to_delete;;0)break;;*)echo "无效的主题操作选择。";;esacdone;;2)# 生产者操作菜单while true; doecho "生产者操作功能:"echo "1. 发送消息到指定主题"echo "0.返回命令大全系统界面"read -p "请输入生产者操作选项数字:" producer_choicecase $producer_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要发送消息的主题名称:" topic_nameecho "开始发送消息到主题 $topic_name。输入'EXIT'退出发送。"while true; doread -p "请输入消息内容:" messageif [ "$message" = "EXIT" ]; thenbreakfikafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic $topic_name <<< "$message"done;;0)break;;*)echo "无效的生产者操作选择。";;esacdone;;3)# 消费者操作菜单while true; doecho "消费者操作功能:"echo "1. 消费指定主题的消息"echo "2. 从主题开头消费所有消息"echo "0.返回命令大全系统界面"read -p "请输入消费者操作选项数字:" consumer_choicecase $consumer_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要消费消息的主题名称:" topic_namekafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic $topic_name;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要从开头消费消息的主题名称:" topic_namekafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic $topic_name;;0)break;;*)echo "无效的消费者操作选择。";;esacdone;;4)# 配置操作菜单while true; doecho "配置操作功能:"echo "1. 查看主题配置"echo "2. 修改主题配置"echo "0.返回命令大全系统界面"read -p "请输入配置操作选项数字:" config_choicecase $config_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要查看配置的主题名称:" topic_namekafka-configs.sh --bootstrap-server bigdata01:9092 --describe --entity-type topics --entity-name $topic_name;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要修改配置的主题名称:" topic_nameread -p "请输入配置项名称:" config_nameread -p "请输入配置项值:" config_valuekafka-configs.sh --bootstrap-server bigdata01:9092 --alter --entity-type topics --entity-name $topic_name --add-config $config_name=$config_value;;0)break;;*)echo "无效的配置操作选择。";;esacdone;;5)# 消费者组操作菜单while true; doecho "消费者组操作功能:"echo "1. 查看消费者组列表"echo "2. 查看消费者组详情"echo "3. 重置消费者组偏移量"echo "0.返回命令大全系统界面"read -p "请输入消费者组操作选项数字:" consumer_group_choicecase $consumer_group_choice in1)kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --list;;2)read -p "请输入要查看详情的消费者组名称:" group_namekafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --describe --group $group_name;;3)read -p "请输入要重置偏移量的消费者组名称:" group_nameread -p "请输入要重置偏移量的主题名称:" topic_namekafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --reset-offsets --group $group_name --topic $topic_name --to-earliest;;0)break;;*)echo "无效的消费者组操作选择。";;esacdone;;6)# 生产者性能测试菜单while true; doecho "生产者性能测试功能:"echo "1. 执行生产者性能测试"echo "2. 查看可用主题列表"echo "0.返回命令大全系统界面"read -p "请输入生产者性能测试选项数字:" producer_perf_choicecase $producer_perf_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要测试的主题名称:" topic_nameexisting_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)if echo "$existing_topics" | grep -q "$topic_name"; thenwhile true; doread -p "请输入要发送的记录数量(整数):" num_recordsif [[ $num_records =~ ^[0-9]+$ ]]; thenbreakelseecho "记录数量必须是整数,请重新输入。"fidonewhile true; doread -p "请输入每条记录的大小(整数,单位字节):" record_sizeif [[ $record_size =~ ^[0-9]+$ ]]; thenbreakelseecho "记录大小必须是整数,请重新输入。"fidonekafka-producer-perf-test.sh --topic $topic_name --num-records $num_records --record-size $record_size --throughput -1 --producer-props bootstrap.servers=bigdata01:9092elseecho "主题 $topic_name 不存在,请重新输入。"fi;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;0)break;;*)echo "无效的生产者性能测试选择。";;esacdone;;7)# 消费者性能测试菜单while true; doecho "消费者性能测试功能:"echo "1. 执行消费者性能测试"echo "2. 查看可用主题列表"echo "0.返回命令大全系统界面"read -p "请输入消费者性能测试选项数字:" consumer_perf_choicecase $consumer_perf_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要测试的主题名称:" topic_nameexisting_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)if echo "$existing_topics" | grep -q "$topic_name"; thenkafka-consumer-perf-test.sh --broker-list bigdata01:9092 --topic $topic_name --messages 100000elseecho "主题 $topic_name 不存在,请重新输入。"fi;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;0)break;;*)echo "无效的消费者性能测试选择。";;esacdone;;0)echo "退出脚本。"break;;*)echo "无效的选择。";;esac
done
七、总结
kf-use.sh
脚本为我们提供了一个便捷的方式来测试 Kafka 集群的生产者和消费者性能,同时方便地查看可用主题列表。通过合理使用这个脚本,我们可以更好地了解 Kafka 集群在数据生产和消费方面的能力,及时发现潜在的性能瓶颈并进行优化,从而提高整个大数据处理流程的效率。无论是对于 Kafka 初学者还是有一定经验的开发者,这个脚本都是一个非常实用的工具,可以帮助我们更好地管理和优化 Kafka 集群的运行。
在实际应用中,我们可以根据不同的业务需求和数据处理场景,灵活调整测试参数,深入分析测试结果,以确保 Kafka 集群能够稳定、高效地运行,满足日益增长的大数据处理需求。
相关文章:

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例
Kafka:分布式消息系统的核心原理与安装部署-CSDN博客 自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客 Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客 Kafka 生产者优化与数据处理经验-CSDN博客 Kafka 工作流程解析:…...

【SQL】【数据库】语句翻译例题
SQL自然语言到SQL翻译知识点 以下是将自然语言转化为SQL语句的所有相关知识点,分门别类详细列出,并结合技巧说明。 1. 数据库操作 创建数据库 自然语言:创建一个名为“TestDB”的数据库。 CREATE DATABASE TestDB;技巧:识别**“创…...

linux基本命令2
7. 文件查找和搜索 (继续) find — 查找文件 find /path/to/search -name "file_name" # 根据名称查找文件 find /path/to/search -type f # 查找所有普通文件 find /path/to/search -type d # 查找所有目录 find /path/to/search -name "*.txt" # 查找…...

Spring Boot项目集成Redisson 原始依赖与 Spring Boot Starter 的流程
Redisson 是一个高性能的 Java Redis 客户端,提供了丰富的分布式工具集,如分布式锁、Map、Queue 等,帮助开发者简化 Redis 的操作。在集成 Redisson 到项目时,开发者通常有两种选择: 使用 Redisson 原始依赖。使用 Re…...

Git命令使用与原理详解
1.仓库 # 在当前目录新建一个Git代码库 $ git init # 新建一个目录,将其初始化为Git代码库 $ git init [project-name] # 下载一个项目和它的整个代码历史 $ git clone [url]2.配置 # 显示当前的Git配置 $ git config --list # 编辑Git配置文件 $ git co…...

Linux:自定义Shell
本文旨在通过自己完成一个简单的Shell来帮助理解命令行Shell这个程序。 目录 一、输出“提示” 二、获取输入 三、切割字符串 四、执行指令 1.子进程替换 2.内建指令 一、输出“提示” 这个项目基于虚拟机Ubuntu22.04.5实现。 打开终端界面如图所示。 其中。 之前&#x…...

vue项目中中怎么获取环境变量
在 Vue 项目中,有几种获取环境变量的方法。最常用的是通过 import.meta.env 来访问。 1.首先在项目根目录创建环境变量文件: .env # 所有环境都会加载 .env.development # 开发环境 .env.production # 生产环境2.在环境变量文件…...

C#里怎么样使用正则表达式?
C#里怎么样使用正则表达式? 正则表达式是由普通字符(如英文字母)以及特殊字符(也称为元字符)组成的一种文字模式 这种文字模式可用于检查字符串的值是否满足一定的规则,例如: 验证输入的邮箱是否合法 输入的身份证号码是否合法 输入的用户名是否满足条件等 也可以…...

《生成式 AI》课程 第5講:訓練不了人工智慧?你可以訓練你自己 (下)
资料来自李宏毅老师《生成式 AI》课程,如有侵权请通知下线 Introduction to Generative AI 2024 Springhttps://speech.ee.ntu.edu.tw/~hylee/genai/2024-spring.php 摘要 这一系列的作业是为 2024 年春季的《生成式 AI》课程设计的,共包含十个作业。…...

Vue 动态给 data 添加新属性深度解析:问题、原理与解决方案
在 Vue 中,动态地向 data 中添加新的属性是一个常见的需求,但它也可能引发一些问题,尤其是关于 响应式更新 和 数据绑定 的问题。Vue 的响应式系统通过 getter 和 setter 来追踪和更新数据,但 动态添加新属性 时,Vue 并不会自动为这些新属性创建响应式链接。 1. 直接向 V…...

【Pytest+Yaml+Allure】实现接口自动化测试框架
一、框架思想 requestsyamlpytestallure实现接口自动化框架。结合数据驱动和分层思想,将代码与数据分离,易维护,易上手。使用yaml编写编写测试用例,利用requests库发送请求,使用pytest管理用例,allure生成…...

el-input绑定点击回车事件意外触发页面刷新
小伙伴们在项目中应该还是比较常用键盘指定按键事件的,尤其是一些筛选条件的通过点击键盘回车按键去触发搜索 例如: <el-form><el-form-item label条件title><el-input v-modelformData.searchKey keydown.entersearch></el-input…...

Golang的语言特性与鸭子类型
Golang的语言特性与鸭子类型 前言 什么是鸭子类型? Suppose you see a bird walking around in a farm yard. This bird has no label that says ‘duck’. But the bird certainly looks like a duck. Also, he goes to the pond and you notice that he swims l…...

如何在Linux系统中排查GPU上运行的程序
如何在Linux系统中排查GPU上运行的程序 在Linux系统中,随着深度学习和高性能计算的普及,GPU资源的管理和监控变得越来越重要。当您遇到GPU资源不足或性能下降的问题时,需要能够快速定位并解决这些问题。本文将介绍几种常用的方法来帮助您排查…...

VSCode 新建 Python 包/模块 Pylance 无法解析
问题描述: 利用 VSCode 写代码,在项目里新建一个 Python 包或者模块,然后在其他文件里正常导入这个包或者模块时出现: Import “xxxx” could not be resolved Pylance (reportMissingImports) 也就是说 Pylance 此时无法解析我们…...

Unet++改进44:添加MogaBlock(2024最新改进模块)|在纯基于卷积神经网络的模型中进行判别视觉表示学习,具有良好的复杂性和性能权衡。
本文内容:添加MogaBlock 目录 论文简介 1.步骤一 2.步骤二 3.步骤三 4.步骤四 论文简介 通过将内核尽可能全局化,现代卷积神经网络在计算机视觉任务中显示出巨大的潜力。然而,最近在深度神经网络(dnn)内的多阶博弈论相互作用方面的进展揭示了现代卷积神经网络的表示瓶…...

计算机网络(14)ip地址超详解
先看图: 注意看第三列蓝色标注的点不会改变,A类地址第一个比特只会是0,B类是10,C类是110,D类是1110,E类是1111. IPv4地址根据其用途和网络规模的不同,分为五个主要类别(A、B、C、D、…...

【C语言】野指针问题详解及防范方法
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C语言 文章目录 💯前言💯什么是野指针?💯未初始化的指针代码示例问题分析解决方法 💯指针越界访问代码示例问题分析解决方法 💯指向已释放内存的…...

【SVN和GIT】版本控制系统详细下载使用教程
文章目录 ** 参考文章一、什么是SVN和GIT二、软件使用介绍1 SVN安装1.1 服务端SVN下载地址1.2 客户端SVN下载地址2 SVN使用2.1 服务端SVN基础使用2.1.1 创建存储库和用户成员2.1.2 为存储库添加访问人员2.2 客户端SVN基础使用2.2.1 在本地下载库中的内容2.2.2 版本文件操作--更…...

【Vue】Vue3.0(二十六)Vue3.0中的作用域插槽
上篇文章 【Vue】Vue3.0(二十五)Vue3.0中的具名插槽 的概念和使用场景 🏡作者主页:点击! 🤖Vue专栏:点击! ⏰️创作时间:2024年11月20日17点30分 文章目录 概念使用场景示…...
神经网络(系统性学习二):单层神经网络(感知机)
此前篇章: 神经网络中常用的激活函数 神经网络(系统性学习一):入门篇 单层神经网络(又叫感知机) 单层网络是最简单的全连接神经网络,它仅有输入层和输出层,没有隐藏层。即&#x…...

CTF之密码学(BF与Ook)
BrainFuck(通常也被称为Brainfuck或BF)和Ook是两种非常特殊且有趣的编程语言。以下是对这两种语言的详细介绍: 一、BrainFuck 简介: BrainFuck是一种极小化的计算机语言,由Urban Mller在1993年创建。由于“fuck”在英…...

【TEST】Apache JMeter + Influxdb + Grafana
介绍 使用Jmeter发起测试,测试结果存入Influxdb,Grafana展示你的测试结果。 环境 windows 10docker desktopJDK17 安装 Apache JMeter 访问官网(Apache JMeter - Apache JMeter™)下载JMeter(目前最新版本5.6.3&a…...

SpringBoot集成多个rabbitmq
1、pom文件 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><versio…...

从零开始学习数据库 day0(基础)
在当今的信息时代,数据已经成为了企业和组织最重要的资产之一。无论是电子商务平台,社交媒体,还是科研机构,几乎每个地方都离不开数据库。今天,我们将一起走进数据库的世界,学习它的基础知识,帮…...

MongoDB相关问题
视频教程 【GeekHour】20分钟掌握MongoDB Complete MongoDB Tutorial by Net Ninja MongoDB开机后调用缓慢的原因及解决方法 问题分析: MongoDB开机后调用缓慢,通常是由于以下原因导致: 索引重建: MongoDB在启动时会重建索引…...

linux基本命令(1)
1. 文件和目录操作 ls — 列出目录内容 ls # 显示当前目录的文件和目录 ls -l # 显示详细的文件信息(权限、大小、修改时间等) ls -a # 显示所有文件(包括隐藏文件) ls -lh # 显示详细信息并以易读的方式显示文件大小 cd — 改…...

【机器学习】超简明Python基础教程
Python是一种简单易学、功能强大的编程语言,适用于数据分析、人工智能、Web开发、自动化脚本等多个领域。本教程面向零基础学习者,逐步讲解Python的基本概念、语法和操作。 1. 安装与运行 安装Python 从官网 Welcome to Python.org 下载适合自己系统的…...

基于信创环境的信息化系统运行监控及运维需求及策略
随着信息技术的快速发展和国家对信息安全的日益重视,信创环境(信息技术应用创新环境)的建设已成为行业发展的重要趋势。本指南旨在为运维团队在基于信创环境的系统建设及运维过程中提供参考,确保项目顺利实施并满足各项技术指标和…...

【Mysql】视图--介绍和作用 视图的创建
1、介绍 (1)视图(view)是一个虚拟表,非真实存在,其本质是根据SQL语句获取动态的数据集,并为其命名,用户使用时只需使用视图名称既可获取结果集,并可以将其当作表来使用。…...