【大数据学习 | kafka】kafka的数据存储结构

以上是kafka的数据的存储方式。
这些数据可以在服务器集群上对应的文件夹中查看到。
[hexuan@hadoop106 __consumer_offsets-0]$ ll
总用量 8
-rw-rw-r--. 1 hexuan hexuan 10485760 10月 28 22:21 00000000000000000000.index
-rw-rw-r--. 1 hexuan hexuan 0 10月 28 22:21 00000000000000000000.log
-rw-rw-r--. 1 hexuan hexuan 10485756 10月 28 22:21 00000000000000000000.timeindex
-rw-rw-r--. 1 hexuan hexuan 8 10月 28 22:21 leader-epoch-checkpoint
-rw-rw-r--. 1 hexuan hexuan 43 10月 28 22:21 partition.metadata
每个文件夹以topic+partition进行命名,更加便于管理和查询检索,因为kafka的数据都是按照条进行处理和流动的一般都是给流式应用做数据供给和缓冲,所以检索速度必须要快,分块管理是最好的方式。
消费者在检索相应数据的时候会非常的简单。
consumer检索数据的过程。
首先文件的存储是分段的,那么文件的名称代表的就是这个文件中存储的数据范围和条数。
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
代表存储的数据是从0条开始的00000000000000100000.index
00000000000000100000.log
00000000000000100000.timeindex
代表存储的数据是从100000条开始的
所以首先检索数据的时候就可以跳过1G为大小的块,比如检索888这条数据的,就可以直接去00000000000000000000.log中查询数据
那么查询数据还是需要在1G大小的内容中找寻是比较麻烦的,这个时候可以从index索引出发去检索,首先我们可以通过kafka提供的工具类去查看log和index中的内容
# 首先创建一个topic_bkafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_b --partitions 5 --replication-factor 2
# 然后通过代码随机向不同的分区中分发不同的数据1W条
package com.hainiu.kafka.consumer;/*** ClassName : test1* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/3 22:45* Version 1.0*/
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test1 {public static void main(String[] args) throws InterruptedException {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);pro.put(ProducerConfig.RETRIES_CONFIG, 3);pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);for (int i = 0; i < 10000; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("topic_b", ""+i,"this is hainiu");producer.send(record);}producer.close();}
}

然后去查看log和index中的内容
# kafka查看日志和索引的命令
kafka-run-class.sh kafka.tools.DumpLogSegments --files xxx
查看日志.log
[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
baseOffset: 606 lastOffset: 1205 count: 600 baseSequence: 606 lastSequence: 1205 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 5149 CreateTime: 1730645208577 size: 4929 magic: 2 compresscodec: snappy crc: 1974998903 isvalid: true
baseOffset: 1206 lastOffset: 1439 count: 234 baseSequence: 1206 lastSequence: 1439 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 10078 CreateTime: 1730645208584 size: 2085 magic: 2 compresscodec: snappy crc: 1665550202 isvalid: true
查看索引.index
内容即:
index索引
| offset 第几条 | position 物理偏移量位置,也就是第几个字 |
|---|---|
| 1187 | 5275 |
| 1767 | 10140 |
| 2022 | 15097 |
log日志
# 打印日志内容的命令 --print-data-log 打印数据
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
| offset: 0 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 0 headerKeys: [] key: 14 payload: this is hainiu
| offset: 1 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 1 headerKeys: [] key: 19 payload: this is hainiu
| offset: 2 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 2 headerKeys: [] key: 24 payload: this is hainiu
| offset: 3 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 3 headerKeys: [] key: 26 payload: this is hainiu
可以看到刷写的日志
baseOffset: 0 lastOffset: 605 count: 606
从0 到605 条一次性刷写606条
lastSequence: 605 producerId
刷写事务日志编号,生产者的编号

通过名称跳过1G的端,然后找到相应的index的偏移量,然后根据偏移量定位log位置,不断向下找寻数据。
大家可以看到index中的索引数据是轻量稀疏的,这个数据是按照4KB为大小生成的,一旦刷写4KB大小的数据就会写出相应的文件索引。

官网给出的默认值4KB

一个数据段大小是1G
timeIndex
我们看到在数据中还包含一个timeindex的时间索引
# 查询时间索引
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex
[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex
Dumping 00000000000000000000.timeindex
timestamp: 1730645208577 offset: 1205
timestamp: 1730645208584 offset: 1439
可以看到和index索引一样,这个也是4Kb写出一部分数据,但是写出的是时间,我们可以根据时间进行断点找寻数据,指定时间重复计算
也就是说,写到磁盘的数据是按照1G分为一个整体部分的,但是这个整体部分需要4KB写一次,并且一次会生成一个索引问题信息,在检索的时候可以通过稀疏索引进行数据的检索,效率更快。
相关文章:
【大数据学习 | kafka】kafka的数据存储结构
以上是kafka的数据的存储方式。 这些数据可以在服务器集群上对应的文件夹中查看到。 [hexuanhadoop106 __consumer_offsets-0]$ ll 总用量 8 -rw-rw-r--. 1 hexuan hexuan 10485760 10月 28 22:21 00000000000000000000.index -rw-rw-r--. 1 hexuan hexuan 0 10月 28 …...
知识竞赛答题系统,线上答题小程序链接怎么做?
随着智能手机的普及,越来越多的单位开始在线上开展知识竞赛。这种形式的知识竞赛不仅易于操作,而且参与度更高。那么线上知识竞赛答题系统怎么做呢?自己可以做吗?答案是可以的!借助微信答题系统制作平台风传吧…...
基于SSM的社区物业管理系统+LW参考示例
1.项目介绍 系统角色:管理员、业主(普通用户)功能模块:管理员(用户管理、二手置换管理、报修管理、缴费管理、公告管理)、普通用户(登录注册、二手置换、生活缴费、信息采集、报事报修…...
android——jetpack startup初始化框架
一、jetpack startup Android Jetpack Startup是一个库,它简化了Android应用启动过程,尤其是对于那些需要处理复杂数据绑定和初始化逻辑的应用。它的核心在于提供了一个StartupComponent,用于声明应用的初始化逻辑,这个逻辑会在首…...
英伟达HOVER——用于人形机器人的多功能全身控制器:整合不同的控制模式且实现彼此之间的无缝切换
前言 前几天,一在长沙的朋友李总发我一个英伟达HOVER的视频(自从我今年年初以来持续不断的解读各大顶级实验室的最前沿paper、以及分享我司七月在具身领域的探索与落地后,影响力便越来越大了,不断加油 ),该视频说的有点玄乎&…...
GEE代码学习 day17
13.2 地球上到处都有许多图像吗? 我们可以使用下面的代码将这个 reducer count 应用于我们过滤后的 ImageCollection。我们将返回相同的数据集并筛选 2020 年,但没有地理限制。这将收集来自世界各地的图像,然后计算每个像素中的图像数量。以…...
论文阅读笔记-Covariate Shift: A Review and Analysis on Classifiers
前言 标题:Covariate Shift: A Review and Analysis on Classifiers 原文链接:Link\ 我们都知道在机器学习模型中,训练数据和测试数据是不同的阶段,并且,通常是是假定训练数据和测试数据点遵循相同的分布。但是实际上&…...
基于SSM+VUE守护萌宠宠物网站JAVA|VUE|Springboot计算机毕业设计源代码+数据库+LW文档+开题报告+答辩稿+部署教+代码讲解
源代码数据库LW文档(1万字以上)开题报告答辩稿 部署教程代码讲解代码时间修改教程 一、开发工具、运行环境、开发技术 开发工具 1、操作系统:Window操作系统 2、开发工具:IntelliJ IDEA或者Eclipse 3、数据库存储:…...
【在Linux世界中追寻伟大的One Piece】Socket编程TCP
目录 1 -> TCP socket API 2 -> V1 -Echo Server 2.1 -> 测试多个连接的情况 1 -> TCP socket API socket(): socket()打开一个网络通讯端口,如果成功的话,就像open()一样返回一个文件描述符。应用程序可以像读写文件一样用r…...
进入半导体行业需要具备哪些能力?
要进入半导体公司,尤其是从事工艺流程设计和制程优化的岗位,需要具备一定的跨学科背景。 以某公司招聘要求为例: **公司 招聘岗位:工艺工程师 该公司是一家从事半导体设备、工艺与材料研发、生产和销售的公司,面向…...
Nature重磅:AI化学家再升级!大幅提升实验效率,推动化学合成进入“智能化”新阶段
人工智能(AI)驱动的机器人,正在我们的生活中扮演着越来越重要的角色,而在化学合成实验室内,它们也在悄然改变着传统实验方式。 如今,科学家们在智能化学领域取得了新突破—— 来自英国利物浦大学的研究团…...
源代码泄漏怎么办?SDC沙盒成为破局利器
在数字化时代,源代码安全已成为企业关注的焦点。源代码的泄露不仅可能导致知识产权的损失,还可能被竞争对手利用,给企业带来巨大的经济损失和法律风险。因此,采取有效的源代码防泄漏措施至关重要。深信达的SDC沙盒防泄密软件&…...
【论文复现】基于图卷积网络的轻量化推荐模型
本文所涉及所有资源均在这里可获取。 📕作者简介:热爱跑步的恒川,致力于C/C、Java、Python等多编程语言,热爱跑步,喜爱音乐、摄影的一位博主。 📗本文收录于论文复现系列,大家有兴趣的可以看一看…...
使用ssh-key免密登录服务器或免密连接git代码仓库网站
ssh登录服务器场景 假设有两台机器,分别是: 源机器:主机A(hostA),ip:198.168.0.1 目标机器:主机B(hostB),ip:192.168.0.2 ssh-key免…...
自由学习记录(19)
unity核心也算是看完了吧,但觉得的确是少了点东西,之后再看mvc框架,和网络开发,,感觉有必要想想主次顺序了,毕竟在明年的3月之前尽量让自己更有贴合需求的能力 先了解一些相关概念,不用看懂&am…...
Elasticsearch中时间字段格式用法详解
Elasticsearch中时间字段格式用法详解 攻城狮Jozz关注IP属地: 北京 2024.03.18 16:27:51字数 758阅读 2,571 Elasticsearch(简称ES)是一个基于Lucene构建的开源、分布式、RESTful搜索引擎。它提供了全文搜索、结构化搜索以及分析等功能,广泛…...
蓝桥杯-网络安全比赛题目-遗漏的压缩包
小蓝同学给你发来了他自己开发的网站链接, 他说他故意留下了一个压缩包文件,里面有网站的源代码, 他想考验一下你的网络安全技能。 (点击“下发赛题”后,你将得到一个http链接。如果该链接自动跳转到https,…...
ES海量数据插入如何优化性能?
2024年10月NJSD技术盛典暨第十届NJSD软件开发者大会、第八届IAS互联网架构大会在南京召开。百度文心快码总经理臧志分享了《AI原生研发新范式的实践与思考》,探讨了大模型赋能下的研发变革及如何在公司和行业中落地,AI原生研发新范式的内涵和推动经验。 …...
遥控救生圈,水上应急救援的新革命_鼎跃安全
水上事故发生后,时间就是生命。每一秒钟的延误,都可能增加溺水者失去生命的风险。传统的救援方式往往依赖人工迅速反应,但在大规模的紧急事件中,人工救援速度难以满足需求。而遥控救生圈的出现改变了这一切,它的作用在…...
【flask开启进程,前端内容图片化并转pdf-会议签到补充】
flask开启进程,前端内容图片化并转pdf-会议签到补充 flask及flask-socketio开启threading页面内容转图片转pdf流程前端主js代码内容转图片-browser端browser端的同步编程flask的主要功能route,def 总结 用到了pdf,来回数据转发和合成,担心flask卡顿,响应差,于是刚好看到threadi…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...
ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!
简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求,并检查收到的响应。它以以下模式之一…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
