【shell-10】shell实现的各种kafka脚本
kafka-shell工具
- 背景
- 日志 log
- 一.启动kafka->(start-kafka)
- 二.停止kafka->(stop-kafka)
- 三.创建topic->(create-topic)
- 四.删除topic->(delete-topic)
- 五.获取topic列表->(list-topic)
- 六. 将文件数据 录入到kafka->(file-to-kafka)
- 七.将kafka数据 下载到文件->(kafka-to-file)
- 八. 查看topic的groupID消费情况->(list-group)
背景
注意:我用的kafka版本是 3.2.1 其他版本kafka提供的 命令行可能有细微区别。
因为经常要用kafka环境参与测试,所以写了不少脚本。在很多时候可以大大提高测试的效率。
主要包含如下功能:
topic的管理【创建,删除】
topic信息查看【topic列表,topic groupid 消费情况】
topic数据传输【file数据录入到topic,topic数据下载到本地文件】
脚本中做了各种检查,日志的输出做了颜色区分,用起来没啥问题。
日志 log
此文件是个额外的日志文件主要用于打印日志,该文件会被下面的shell文件引用
#!/bin/bash
#日志级别 debug-1, info-2, warn-3, error-4, always-5
LOG_LEVEL=2#调试日志
function log_debug(){content="$(date '+%Y-%m-%d %H:%M:%S') [DEBUG]: $@"[ $LOG_LEVEL -le 1 ] && echo -e "\033[32m" ${content} "\033[0m"
}
#信息日志
function log_info(){content="$(date '+%Y-%m-%d %H:%M:%S') [INGO]: $@"[ $LOG_LEVEL -le 2 ] && echo -e "\033[32m" ${content} "\033[0m"
}
#警告日志
function log_warn(){content="$(date '+%Y-%m-%d %H:%M:%S') [WARN] $@"[ $LOG_LEVEL -le 3 ] && echo -e "\033[33m" ${content} "\033[0m"
}
#错误日志
function log_err(){content="$(date '+%Y-%m-%d %H:%M:%S') [ERROR]: $@"[ $LOG_LEVEL -le 4 ] && echo -e "\033[31m" ${content} "\033[0m"
}
~
一.启动kafka->(start-kafka)
下面代码中的路径你要替换成自己的路径
#!/bin/bash
source /home/shell/logpid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
log_info "Start checking kafka process"
if [ -z $pid ]; thenlog_info "The kafka process does not exist, startting.........................................................................................."
elselog_warn "The kafka process exists and does not need to be started"exit 1
fi
nohup kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/server.properties >>/home/kafka/kafka.log 2>&1 &
# 日志的路径是安装kafka的时候指定的,也要替换成自己的路径
tail -f 20 /home/kafka/kafka.log
二.停止kafka->(stop-kafka)
下面代码中的路径你要替换成自己的路径
#!/bin/bash
source /home/shell/log
log_info "Start checking kafka process"
pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
if [ -z $pid ]; thenlog_warn "The kafka process does not exist and does not need to be stopped"exit 1
elselog_info "The kafka process alive, stopping.............................................................................................................."
fi
kafka-server-stop.sh
log_info "Stop kafka success"
三.创建topic->(create-topic)
下面代码中的路径你要替换成自己的路径
#!/bin/bash
source /home/shell/log
log_info "脚本功能: 创建topic"
log_info "脚本参数: topic名称(必选)"
if [ $# -ne 1 ]; thenlog_err "错误:请传入topic名称"exit 1
fi
#TOPIC名称
TOPIC_NAME=$1
#KAFKA地址
KAFKA_BROKER=ip:9092
# 检查Kafka主题是否存在, 若已存在则放弃创建
if kafka-topics.sh --bootstrap-server $KAFKA_BROKER --list | grep -q "^$TOPIC_NAME$";thenlog_warn "$TOPIC_NAME 已经存在,放弃创建"
else# 默认1副本,3分区kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor 1 --partitions 3 --topic $TOPIC_NAMElog_info "请执行topic-list检查创建是否成功"
fi
~

四.删除topic->(delete-topic)
下面代码中的路径你要替换成自己的路径
#!/bin/bashsource /home/shell/log
log_info "脚本作用:删除topic"
log_info "脚本参数: 支持多个topic,用空格分开,可以批量删除"
KAFKA_BROKER=ip:9092
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0 # 返回true elselog_warn "$local_topic_name 不存在->false"return 1 # return falsefi
}# 逐个删除topic
for topic in "$@"
doif ! check_kafka_topic $topic; thenlog_info "tpoic->$topic 不存在,跳过删除行为"continueelselog_info "topic->$topic 执行删除"kafka-topics.sh --delete --bootstrap-server $KAFKA_BROKER --topic $topiclog_info "topic->$topic 删除成功"fi
done

五.获取topic列表->(list-topic)
#!/bin/bash
source /home/shell/log
KAFKA_BROKER=ip:9092
log_info "脚本作用: 列出topic信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic信息)"
if [ $# -eq 1 ]; thenlog_info "目标$1 详情如下"kafka-topics.sh --describe --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets" | grep $1
elselog_info "所有topic 列表如下:"kafka-topics.sh --describe --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets"
fi

六. 将文件数据 录入到kafka->(file-to-kafka)
#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将文件中的数据录入指定topic"
log_info "脚本参数: 1.文件路劲(必选) 2.topic(必选)"
log_info "参数校验"
log_info "执行条件检查.........................................................................................................."
if [ $# -ne 2 ]; thenlog_err "必须传入两个参数: 1.文件路劲(必选) 2.topic(必选)"exit 1
fiif ! [ -f $1 ]; thenlog_err "$1不是一个有效的数据文件"exit 1
fiFILE_PATH=$1
TOPIC_NAME=$2
KAFKA_BROKER=ip:9092 #检查topic是否存在
function check_kafka_topic() {local local_KAFKA_BROKER=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER --list | grep -q "^$local_KAFKA_BROKER$";thenreturn 0 # 返回true elsereturn 1 # return falsefi
}#将文件数据推送到kafka
function send_to_kafka(){local local_path=$1local count=0while IFS= read -r line; do kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC_NAME <<< "$line" count=$((count+1))done < "$local_path"echo $count
} if ! check_kafka_topic $TOPIC_NAME;thenlog_err "条件检查不通过, 原因: topic->$TOPIC_NAME不存在, 请先创建topic"exit 1
filog_info "参数检查通过.........................................................................................................."
start_time=`date "+%Y-%m-%d %H:%M:%S"`
start_seconds=$(date -d "$start_time" +%s)log_info "开始录入数............................................................................................................"
count=$(send_to_kafka $FILE_PATH)end_time=`date "+%Y-%m-%d %H:%M:%S"`
end_seconds=$(date -d "$end_time" +%s)
time_diff=$((end_seconds - start_seconds)) log_info "录入条数: $count"
log_info "花费时间:$time_diff 秒"
log_info "录入完成.............................................................................................................."

七.将kafka数据 下载到文件->(kafka-to-file)
#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将kafka指定topic的数据消费到指定文件中"
log_info "脚本参数:1.数据文件路径(必选) 2.topic名称(必选) 3.groupID(可选->不存在则从头消费,存在则从grooupID offset 开始消费)"
log_info "group-list 脚本可以查看当前的"
# Kafka的bin目录
KAFKA_BIN_DIR=/path/to/kafka/bin#kafka 地址
KAFKA_SERVER=ip:9092 # Kafka的配置文件目录
KAFKA_CONFIG_DIR=/home/kafka/kafka_2.12-3.2.1/config# Kafka消费者配置文件
CONSUMER_CONFIG=$KAFKA_CONFIG_DIR/consumer.properties# 指定要消费的主题
TOPIC_NAME=your_topic_name# 指定要写入的文件
FILE_PATH=$1
TOPIC_NAME=$2
GROUP_ID=$3log_info "执行检察............................................................................................................................"function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_SERVER --list | grep -q "^$local_topic_name$";thenreturn 0 # 返回true elsereturn 1 # return falsefi
}if ! check_kafka_topic $TOPIC_NAME;thenlog_err "topic->$TOPIC_NAME 未找到"exit 1
fi
log_info "检查通过............................................................................................................................"log_info "当前topic,所有groupID的消费情况如下>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
while IFS= read -r line; doif [[ $line == *"PARTITION"* ]]; thencontent="$(date '+%Y-%m-%d %H:%M:%S') [INFO] $line"echo -e "\033[45m" ${content} "\033[0m"else log_info "$line"fi
done< <(kafka-consumer-groups.sh --bootstrap-server $KAFKA_SERVER --describe --all-groups | grep -v '__consumer_offsets' | grep "$TOPIC_NAME\|PARTITION")log_info "当前topic,所有groupID的消费情况输出完成>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"log_info "消费进程运行中( CTRL+C 可退出消费 )................................................................................................."
# 运行消费者脚本并将输出重定向到文件
if [ $# -eq 2 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning > $FILE_PATH
fi
if [ $# -eq 3 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning --group $GROUP_ID > $FILE_PATH
fi

八. 查看topic的groupID消费情况->(list-group)
#!/bin/bash
kafka_broker=ip:9092
source /home/shell/log
log_info "脚本功能: 查看topic的groupID信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic的groupID信息)"
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $kafka_broker --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0 # 返回true elselog_warn "$local_topic_name 不存在->false"return 1 # return falsefi
}if [ $# -eq 1 ]; thenif ! check_kafka_topic $1; then#topic 不存在则直接退出程序log_warn "topic=$1, 不存在"exit 1filog_info "topic_name=$1 的gruoupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep $1 | grep -v __consumer_offsets
elselog_info "所有groupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep -v __consumer_offsets
fi

相关文章:
【shell-10】shell实现的各种kafka脚本
kafka-shell工具 背景日志 log一.启动kafka->(start-kafka)二.停止kafka->(stop-kafka)三.创建topic->(create-topic)四.删除topic->(delete-topic)五.获取topic列表->(list-topic)六. 将文件数据 录入到kafka->(file-to-kafka)七.将kafka数据 下载到文件-&g…...
【模型压缩】模型剪枝详解
参考链接:https://zhuanlan.zhihu.com/p/635454943 https 文章目录 1. 前言1.1 为什么要进行模型剪枝1.2 为什么可以进行模型剪枝2. 剪枝方式的几种分类2.1 结构化剪枝 和 非结构化剪枝2.1.1 结构化剪枝2.1.2 非结构化剪枝2.2 静态剪枝与动态剪枝2.2.1 静态剪枝2.2.2 动态剪枝…...
Log4j2-01-log4j2 hello world 入门使用
拓展阅读 Log4j2 系统学习 Logback 系统学习 Slf4j Slf4j-02-slf4j 与 logback 整合 SLF4j MDC-日志添加唯一标识 分布式链路追踪-05-mdc 等信息如何跨线程? Log4j2 与 logback 的实现方式 日志开源组件(一)java 注解结合 spring aop 实现自动输…...
Mysql-日志介绍 日志配置
环境部署 docker run -d -p 3306:3306 --privilegedtrue -v $(pwd)/logs:/var/lib/logs -v $(pwd)/conf:/etc/mysql/conf.d -v $(pwd)/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD654321 --name mysql mysql:5.7运行指令的目录下新建好这些文件: 日志类型 日…...
计算机网络的体系结构的各层在整个过程中起到什么作用?
ps:本文章的图片内容来源都是来自于湖科大教书匠的视频,声明:仅供自己复习,里面加上了自己的理解 这里附上视频链接地址:1.6 计算机网络体系结构(4)—专用术语_哔哩哔哩_bilibili 目录 &#x…...
如何在业务代码中优雅的使用策略模式?
策略模式介绍 假设你正在开发一个电商平台,其中涉及到商品的折扣策略。优惠策略有很多种可能,如领取优惠券抵扣、返现促销、拼团优惠等。最初的实现可能会在购物车类中嵌入各种折扣逻辑,导致代码的可维护性和扩展性下降。 下面代码在业务开…...
“docker-credential-desktop.exe“: executable file not found in $PATH 错误解决
"docker-credential-desktop.exe": executable file not found in $PATH 错误解决 1. 错误信息和解决方法 1. 错误信息和解决方法 错误信息, error getting credentials - err: exec: "docker-credential-desktop.exe": executable file not …...
openssl3.2/test/certs - 055 - all DNS-like CNs allowed by CA1, no DNS SANs
文章目录 openssl3.2/test/certs - 055 - all DNS-like CNs allowed by CA1, no DNS SANs概述笔记END openssl3.2/test/certs - 055 - all DNS-like CNs allowed by CA1, no DNS SANs 概述 openssl3.2 - 官方demo学习 - test - certs 笔记 /*! * \file D:\my_dev\my_local_…...
长虹智能电视6000iD、6080iD、3000iD、U2系列等 ZLM61HiPJ机芯升级刷机方法,附刷机数据
机芯:ZLM61HiPJ 适用机型:UD39B6000iD、UD42B6000iD、UD50B6000iD、UD55B6000iD、UD42C6000iD、UD42C6080iD、UD49C6000iD、UD49C6080iD、UD55C6000iD、UD55C6080iD、UD50C6000iD、UD58C3000iD、42U2 LE42C19S-UD、LE50C29SD-UD、 UD49C6000iD(LJM2W)、…...
六、VTK创建平面vtkPlaneSource
vtkPlaneSource创建位于平面中的四边形数组 先看看效果图: vtkPlaneSource 创建一个 m x n 个四边形数组,这些四边形在平面中排列为规则平铺。通过指定一个原点来定义平面,然后指定另外两个点,这两个点与原点一起定义平面的两个轴。这些轴不必是正交的 - 因此您可以创建平行…...
LiveGBS流媒体平台GB/T28181常见问题-如何配置使用自己已有的redis服务替换redis版本升级redis版本
LiveGBS如何配置使用自己已有的redis服务替换redis版本升级redis版本 1、Redis服务2、如何切换REDIS?2.1、停止启动REDIS2.2、配置信令服务2.3、配置流媒体服务2.4、启动 3、搭建GB28181视频直播平台 1、Redis服务 在LivGBS中Redis作为数据交换、数据订阅、数据发布的高速缓存…...
stm32产品架构
文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据 总结 前言 起因是我在看野火的ucosiii,然后他是基于i.mx芯片。然后我就很疑惑i.mx是什么芯片,看了下好像是ARM-M7(或者叫ARMCM7)架构的芯片。然后我又疑惑ARM-M7又是什么架…...
数据结构——双链表
双链表中节点类型的描述: 双链表的初始化(带头结点) 、 双链表的插入操作 后插操作 InsertNextDNode(p, s): 在p结点后插入s结点 按位序插入操作: 思路:从头结点开始,找到某个位序的前驱结点ÿ…...
Git 对文件名大小写不敏感的问题解决方案
目录 一、Git 对文件名大小写不敏感1.1 问题描述1.2 原因分析1.3 解决方案方式一:使用git命令进行修改方式二:关闭git 忽略大小写配置 (可以当前项目设置,也可以全局设置 --global) 二、新的问题(重复的目录…...
Java复习系列之阶段三:框架原理
1. Spring 1.1 核心功能 1. IOC容器 IOC,全称为控制反转(Inversion of Control),是一种软件设计原则,用于减少计算机代码之间的耦合度。控制反转的核心思想是将传统程序中对象的创建和绑定由程序代码直接控制转移到…...
【Python】01快速上手爬虫案例一:搞定豆瓣读书
文章目录 前言一、VSCodePython环境搭建二、爬虫案例一1、爬取第一页数据2、爬取所有页数据3、格式化html数据4、导出excel文件 前言 实战是最好的老师,直接案例操作,快速上手。 案例一,爬取数据,最终效果图: 一、VS…...
JavaEE 网络编程
JavaEE 网络编程 文章目录 JavaEE 网络编程引子1. 网络编程-相关概念1.1 基本概念1.2 发送端和接收端1.3 请求和响应1.4 客户端和服务端 2. Socket 套接字2.1 数据包套接字通信模型2.2 流套接字通信模型2.3 Socket编程注意事项 3. UDP数据报套接字编程3.1 DatagramSocket3.2 Da…...
5.rk3588用cv读取图片(C++)
rk3588自带了cv,不需要重新安装,执行以下操作即可: 一、读取图片 1.读取某张图片 #define HAVE_OPENCV_VIDEO #define HAVE_OPENCV_VIDEOIO#include <opencv2/opencv.hpp> #include <iostream> #include <opencv2/opencv.h…...
Github 无法正常访问?一招解决
查询IP网址: https://ip.chinaz.com/ 主页如下: 分别查询以下三个网址的IP: github.com github.global.ssl.fastly.net assets-cdn.github.com 修改 hosts 文件: 将 /etc/hosts 复制到 home 下 sudo cp /etc/hosts ./ gedit hosts 在底下…...
架构师的36项修炼-08系统的安全架构设计
本课时讲解系统的安全架构。 本节课主要讲 Web 的攻击与防护、信息的加解密与反垃圾。其中 Web 攻击方式包括 XSS 跨站点脚本攻击、SQL 注入攻击和 CSRF 跨站点请求伪造攻击;防护手段主要有消毒过滤、SQL 参数绑定、验证码和防火墙;加密手段,…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...
LeetCode - 199. 二叉树的右视图
题目 199. 二叉树的右视图 - 力扣(LeetCode) 思路 右视图是指从树的右侧看,对于每一层,只能看到该层最右边的节点。实现思路是: 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...
嵌入式学习笔记DAY33(网络编程——TCP)
一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
基于IDIG-GAN的小样本电机轴承故障诊断
目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) 梯度归一化(Gradient Normalization) (2) 判别器梯度间隙正则化(Discriminator Gradient Gap Regularization) (3) 自注意力机制(Self-Attention) 3. 完整损失函数 二…...
NPOI操作EXCEL文件 ——CAD C# 二次开发
缺点:dll.版本容易加载错误。CAD加载插件时,没有加载所有类库。插件运行过程中用到某个类库,会从CAD的安装目录找,找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库,就用插件程序加载进…...
解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist
现象: android studio报错: [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决: 不要动CMakeLists.…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)
cd /home 进入home盘 安装虚拟环境: 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境: virtualenv myenv 3、激活虚拟环境(激活环境可以在当前环境下安装包) source myenv/bin/activate 此时,终端…...
