当前位置: 首页 > news >正文

KafkaQ - 好用的 Kafka Linux 命令行可视化工具

软件效果前瞻 ~

鉴于并没有在网上找到比较好的linux平台的kafka可视化工具,今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ

虽然简陋,主要可以实现下面的这些功能:

1)查看当前topic的分片数量和副本数量

2)查看当前topic下面每个分片的最大offset

3)查看当前topic某个分片下面指定offset范围的数据

4)搜索当前topic指定关键词的message

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

KafkaQ分为普通版本和搜索版本:

* 普通版本支持上述3种查询

* 搜索版本支持上述3种查询之外,增加关键词搜索,即在分片中搜索指定关键词的message

一、普通版 KafkaQ.sh

使用方法:

Usage: KafkaQ.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]--topic 话题名称
--partition 分片索引(可选)
--offset 从第k个offset开始检索(可选)
--limit 从第k个offset开始检索X条结果(可选)

显示的效果如下,十分简洁,分片数据里面左边一列是消息入库的时间,右边是message内容:

KafkaQ 源码如下:

#!/bin/bash# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}# 检查参数
if [ -z "$1" ]; thenecho "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]"exit 1
fiTOPIC="$1"# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; thenecho "Kafka not found at /usr/local/kafka/bin/"exit 1
fi# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC"  |  awk -F: '{ printf "  分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"# 获取分片数据
if [ "$LIMIT" -gt 0 ]; thenecho -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)# 格式化输出消息echo "$MESSAGES" | awk -F'\t' 'BEGIN {print "* 分片数据:"}{if ($3 != "null") {timestamp = substr($1, 12) / 1000 # 从第10个字符开始提取时间戳,并除以1000以转换为秒级时间戳value = $3printf "\033[0;33m%s\033[0m %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value}}'
fi

二、搜索版 KafkaQ-Search.sh

使用方法:

Usage: KafkaQ-Search.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]--topic 话题名称
--partition 分片索引(可选)
--offset 从第k个offset开始检索(可选)
--limit 从第k个offset开始检索X条结果(可选)
--search 搜索字符串

示例(所有参数是必选的哦):

sh KafkaQ-Search.sh --topic log --partition 0 --offset 0 --limit 18480 --search '9fea9c52-c0fe-4429-81e1-d045f35f9be9'

显示效果如下:

KafkaQ-Search.sh 源码如下:

#!/bin/bash# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}
SEARCH=${5:-""}# 检查参数
if [ -z "$1" ]; thenecho "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]"exit 1
fiwhile [[ $# -gt 0 ]]; docase "$1" in--topic)TOPIC="$2"shift 2;;--partition)PARTITION="$2"shift 2;;--offset)OFFSET="$2"shift 2;;--limit)LIMIT="$2"shift 2;;--search)SEARCH="$2"shift 2;;*)echo "Unknown parameter: $1"exit 1;;esac
done# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; thenecho "Kafka not found at /usr/local/kafka/bin/"exit 1
fi# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC"  |  awk -F: '{ printf "  分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"# 获取分片数据
if [ "$LIMIT" -gt 0 ]; thenecho -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)# 搜索关键词并输出结果if [[ ! -z $SEARCH ]]; thenecho -e "\033[0;32m* 搜索条件:$SEARCH\033[0m"echo "  搜索结果:"echo "$MESSAGES" | grep --color=never "$SEARCH" | awk -F'\t' '{timestamp = substr($1, 12) / 1000 # 从第12个字符开始提取时间戳,并除以1000以转换为秒级时间戳value = $3printf "\033[0;33m%s\033[0m %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value}'fi
fi

 * (附注)参考的shell如下

1、获取kafka的topic 分区数量

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topic>

2、获取kafka每个分片最大的offset

/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic>

3、获取kafka分片指定offset范围的具体信息

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic> --partition <partition> --offset <offset> --max-messages <max-message> --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

相关文章:

KafkaQ - 好用的 Kafka Linux 命令行可视化工具

软件效果前瞻 ~ 鉴于并没有在网上找到比较好的linux平台的kafka可视化工具&#xff0c;今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ 虽然简陋&#xff0c;主要可以实现下面的这些功能&#xff1a; 1&#xff09;查看当前topic的分片数量和副本数量 …...

不愧是字节,图像算法面试真细致

这本面试宝典是一份专为大四、研三春招和研二暑假实习生准备的珍贵资料。 涵盖了图像算法领域的核心知识和常见面试题&#xff0c;包括卷积神经网络、实例分割算法、目标检测、图像处理等多个方面。不论你是初学者还是有经验的老手&#xff0c;都能从中找到实用的内容。 通过…...

14、C++中代码重用

1、C模板的主要作用是允许编写通用代码&#xff0c;即能够在不同数据类型或数据结构上工作而无需重复编写代码。通过模板&#xff0c;可以实现代码的复用性和灵活性&#xff0c;从而提高开发效率和程序的可维护性。 typename关键字&#xff1a; 在C中&#xff0c;typename关键…...

剖析框架代码结构的系统方法(下)

当面对Dubbo、Spring Cloud、Mybatis等开源框架时,我们可以采用一定的系统性的方法来快速把握它们的代码结构。这些系统方法包括对架构演进过程、核心执行流程、基础架构组成和可扩展性设计等维度的讨论。 在上一讲中,我们已经讨论了架构演进过程和核心执行流程这两个系统方法…...

C语言学习笔记之结构体(一)

目录 什么是结构体&#xff1f; 结构体的声明 结构体变量的定义和初始化 结构体成员的访问 结构体传参 什么是结构体&#xff1f; 在现实生活中的很多事物无法用单一类型的变量就能描述清楚&#xff0c;如&#xff1a;描述一个学生&#xff0c;需要姓名&#xff0c;年龄&a…...

MATLAB入门知识

目录 原教程链接&#xff1a;数学建模清风老师《MATLAB教程新手入门篇》https://www.bilibili.com/video/BV1dN4y1Q7Kt/ 前言 历史记录 脚本文件&#xff08;.m&#xff09; Matlab帮助系统 注释 ans pi inf无穷大 -inf负无穷大 i j虚数单位 eps浮点相对精度 0/&a…...

计算机网络(5) ARP协议

什么是ARP 地址解析协议&#xff0c;即ARP&#xff08;Address Resolution Protocol&#xff09;&#xff0c;是根据IP地址获取物理地址的一个TCP/IP协议。主机发送信息时将包含目标IP地址的ARP请求广播到局域网络上的所有主机&#xff0c;并接收返回消息&#xff0c;以此确定…...

美团的 AI 面试有点简单

刷到一个美团的 AI 实习生的面试帖子&#xff0c;帖子虽然不长&#xff0c;但是把美团 AI 评测算法实习生面试的问题都po出来了。 单纯的看帖子中面试官提出的问题&#xff0c;并不是很难&#xff0c;大部分集中在考察AI项目和对AI模型的理解上&#xff0c;并没有过多的考察AI算…...

编程软件怎么给机器人编程:深入探索编程与机器人技术的融合

编程软件怎么给机器人编程&#xff1a;深入探索编程与机器人技术的融合 随着科技的飞速发展&#xff0c;机器人技术已经深入到我们生活的方方面面。而要让机器人按照我们的意愿执行任务&#xff0c;就需要借助编程软件对机器人进行编程。那么&#xff0c;编程软件究竟是如何给…...

unity2d Ugui--Image城市道路汽车行驶

目录 1.车辆生成与回收 2.路径点控制 3.车辆控制 1.车辆生成与回收 using System.Collections.Generic; using UnityEngine;public class RoadContr : MonoBehaviour {public WayPoint[] wayPoints; //出生点public Transform pare;[SerializeField]private Car[] fabCar;pu…...

【深度学习】【Prompt】使用GPT的一些提示词

f翻译论文用这个提示词&#xff1a; # 翻译规则## 翻译规则1 请在翻译这篇学术论文时&#xff0c;严格保留所有专业术语的原始英文表述&#xff0c;不要尝试将它们翻译成中文&#xff0c;而不是专业术语的部分&#xff0c;需要翻译为中文。保持所有文章引用格式和内容的完整无…...

如何在centos中和windows server中找到挖矿木马和消灭挖矿木马

在 CentOS 和 Windows Server 中查找和消灭挖矿木马涉及多个步骤&#xff0c;包括检测、清理和预防。以下是具体的步骤和命令。 在 CentOS 中查找和消灭挖矿木马 步骤 1&#xff1a;检测木马 检查异常进程&#xff1a; ps aux | grep -E miner|cryptonight|xmrig查找进程列表…...

Slice用法举例Python

Slice用法举例Python 在Python中&#xff0c;slice&#xff08;切片&#xff09;是一个强大的工具&#xff0c;用于处理序列类型的数据&#xff0c;如列表、元组、字符串等。slice提供了一种简洁而高效的方式来获取序列的子集或修改序列的某些部分。下面&#xff0c;我们将从四…...

响应式网页开发方法与实践

随着移动设备的普及和多样化&#xff0c;响应式网页开发已成为现代网页设计的主流趋势。响应式网页&#xff08;Responsive Web Design, RWD&#xff09;是一种网页设计技术&#xff0c;其核心思想是通过灵活的布局和媒体查询&#xff0c;使网页能够适应不同设备和屏幕尺寸&…...

feedparser - Python 解析Atom和RSSfeed

文章目录 一、关于 feedparser二、安装三、关于文档及构建四、测试五、常见RSS元素访问常见 Channel 元素访问常用项目元素 六、常见Atom元素访问常用feed元素访问公共入口元素 七、获取Atom元素的详细信息Feed元素的详细信息 八、测试元素是否存在九、其他功能 & 文档高级…...

ARM32开发--IIC时钟案例

知不足而奋进 望远山而前行 目录 文章目录 前言 目标 内容 需求 开发流程 移植驱动 修改I2C实现 测试功能 总结 前言 在现代嵌入式系统开发中&#xff0c;移植外设驱动并测试其功能是一项常见的任务。本次学习的目标是掌握移植方法和测试方法&#xff0c;以实现对开…...

[深度学习]基于C++和onnxruntime部署yolov10的onnx模型

基于C和ONNX Runtime部署YOLOv10的ONNX模型&#xff0c;可以遵循以下步骤&#xff1a; 准备环境&#xff1a;首先&#xff0c;确保已经下载后指定版本opencv和onnruntime的C库。 模型转换&#xff1a;按照官方源码&#xff1a;https://github.com/THU-MIG/yolov10 安装好yolov…...

Spring-事件

Java 事件/监听器编程模型 设计模式-观察者模式的拓展 可观察者对象(消息发送者) Java.util.Observalbe观察者 java.util.Observer 标准化接口(标记接口) 事件对象 java.util.EventObject事件监听器 java.util.EventListener public class ObserverDemo {public static vo…...

delmia的工序设置

process的设置需要在workcell sequuencing里面去设置...

【JavaEE精炼宝库】多线程(5)单例模式 | 指令重排序 | 阻塞队列

目录 一、单例模式&#xff1a; 1.1 饿汉模式&#xff1a; 1.2 懒汉模式&#xff1a; 1.2.1 线程安全的懒汉模式&#xff1a; 1.2.2 线程安全的懒汉模式的优化&#xff1a; 二、指令重排序 三、阻塞队列 3.1 阻塞队列的概念&#xff1a; 3.2 生产者消费者模型&#xf…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

android RelativeLayout布局

<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...

全面解析数据库:从基础概念到前沿应用​

在数字化时代&#xff0c;数据已成为企业和社会发展的核心资产&#xff0c;而数据库作为存储、管理和处理数据的关键工具&#xff0c;在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理&#xff0c;到社交网络的用户数据存储&#xff0c;再到金融行业的交易记录处理&a…...

pgsql:还原数据库后出现重复序列导致“more than one owned sequence found“报错问题的解决

问题&#xff1a; pgsql数据库通过备份数据库文件进行还原时&#xff0c;如果表中有自增序列&#xff0c;还原后可能会出现重复的序列&#xff0c;此时若向表中插入新行时会出现“more than one owned sequence found”的报错提示。 点击菜单“其它”-》“序列”&#xff0c;…...