Kafka 的消息格式:了解消息结构与序列化
Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。
1. Kafka 消息结构
Kafka 的消息结构由消息头、消息键、消息值和时间戳等组成。下面是一个典型的 Kafka 消息结构:
----------------------------------------------------------------------------------------------
| Message Header | Key | Value | Timestamp | Optional Headers |
----------------------------------------------------------------------------------------------
1.1 消息头
消息头包含一些元数据信息,例如消息的大小、压缩信息等。消息头的结构可能会根据 Kafka 版本和配置而有所不同。
1.2 消息键与消息值
-
消息键(Key): 用于标识消息的唯一性,通常用于分区和查找消息。
-
消息值(Value): 包含实际的消息内容。
1.3 时间戳
时间戳表示消息的产生时间,有两种类型:
-
创建时间戳: 表示消息被创建的时间。
-
LogAppendTime 时间戳: 表示消息被追加到日志的时间。
2. 消息的序列化与反序列化
Kafka 中的消息在生产者发送和消费者接收时需要进行序列化和反序列化。这是因为 Kafka 是以字节流的形式存储和传输消息的,而实际的消息内容可能是各种不同的数据类型。以下是一些常用的序列化器和反序列化器:
2.1 字符串序列化器
// 生产者端
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "Hello, Kafka!");// 消费者端
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
});
2.2 Avro 序列化器
Avro 是一种高性能且紧凑的二进制序列化格式,适用于复杂数据结构的消息。
// 生产者端
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("field1", "value1");
avroRecord.put("field2", 42);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("my-topic", "key", avroRecord);// 消费者端
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {GenericRecord value = record.value();System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});
2.3 JSON 序列化器
// 生产者端
JsonNode jsonNode = objectMapper.createObjectNode();
((ObjectNode) jsonNode).put("field1", "value1");
((ObjectNode) jsonNode).put("field2", 42);
ProducerRecord<String, JsonNode> record = new ProducerRecord<>("my-topic", "key", jsonNode);// 消费者端
ConsumerRecords<String, JsonNode> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {JsonNode value = record.value();System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});
3. 自定义消息格式
在某些情况下,你可能需要定义自己的消息格式。Kafka 提供了 ByteArraySerializer
和 ByteArrayDeserializer
,允许你将消息以字节数组的形式发送和接收,从而实现自定义的序列化和反序列化逻辑。
// 生产者端
byte[] customMessageBytes = serializeCustomMessage(customMessage);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("my-topic", "key", customMessageBytes);// 消费者端
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {byte[] value = record.value();CustomMessage customMessage = deserializeCustomMessage(value);System.out.printf("Consumed record with key %s and value %s%n", record.key(), customMessage);
});
4. 消息的压缩与解压
Kafka 支持消息的压缩,以减小网络传输的开销。以下是一些常用的压缩选项:
// 生产者端
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);// 消费者端
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
5. 消息的版本控制与兼容性
在实际应用中,系统的演进和变化是不可避免的。因此,考虑到消息的版本控制和兼容性是非常重要的。以下是一些相关的注意事项和最佳实践:
5.1 消息的演进
-
向后兼容性: 新版本的消费者能够处理旧版本的消息。
-
向前兼容性: 旧版本的消费者能够处理新版本的消息。
5.2 Schema Registry
Schema Registry 是一个用于存储和管理 Avro、JSON 等消息格式的架构的中心化服务。通过使用 Schema Registry,可以更好地管理消息的演进,并确保向前和向后的兼容性。
// 配置 Schema Registry 地址
props.put("schema.registry.url", "http://schema-registry:8081");
6. 消息的认证与加密
Kafka 提供了安全性特性,包括消息的认证和加密。以下是一些相关的配置选项:
6.1 SSL 加密通信
// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
6.2 认证配置
// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
7. 消息的追踪与监控
追踪和监控是保障系统稳定性和性能的重要手段。以下是一些常用的追踪和监控工具:
7.1 JMX 监控
Kafka 提供了 JMX 接口,可以通过 JConsole 或其他 JMX 客户端进行监控。
7.2 Kafka Manager
Kafka Manager 是一款开源的 Kafka 集群管理和监控工具,提供了直观的 Web 界面。
7.3 Prometheus 和 Grafana
使用 Prometheus 进行指标采集,结合 Grafana 进行可视化展示,可以更全面地监控 Kafka 集群的性能和健康状况。
总结
在深入探讨Kafka消息格式、版本控制、安全性和监控等关键主题后,对构建高效、灵活的消息系统有了更为全面的认识。了解消息结构、序列化与反序列化、自定义消息格式,以及消息的压缩与解压,是确保消息传递的基础。随后,版本控制与兼容性的重要性得到了强调,Schema Registry成为管理Avro、JSON等消息格式的利器。在保障消息传递安全方面,SSL加密通信和认证配置提供了可靠的手段。最后,通过JMX监控、Kafka Manager、以及Prometheus和Grafana的运用,能够实时追踪和监控Kafka集群的健康状态。
这篇文章旨在为大家提供全方位的Kafka消息系统知识,使其能够在实际应用中根据业务需求构建稳健、高效的消息处理系统。深入理解这些关键概念,将有助于确保消息系统的可维护性、稳定性和安全性,为实际业务场景中的挑战提供可行的解决方案。继续关注更多Kafka相关的技术内容,将使大家能够不断深化对消息系统的认识,应对日益复杂的数据处理需求。
相关文章:

Kafka 的消息格式:了解消息结构与序列化
Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析&…...
装箱 Box 数据类型
装箱是最简单直接的一种智能指针,它的类型是Box<T>。装箱使我们可以把数据存储到堆上,并在栈上保留一个指向堆数据的指针。装箱操作常常被用于下面的场景: 当你拥有一个无法在编译时确定大小的类型,但又想使用这个类型的值…...

多传感器融合SLAM在自动驾驶方向的初步探索的记录
1. VIO的不可观问题 现有的VIO都是解决的六自由度的问题, 但是对于行驶在路面上的车来说, 通常情况下不会有roll与z方向的自由度, 而且车体模型限制了不可能有纯yaw的变换. 同时由于IMU在Z轴上与roll, pitch上激励不足, 会导致IMU在初始化过程中尺度不准以及重力方向估计错误,…...
ffmpeg与opencv-python处理视频
安装 opencv pip install opencv-pythonFFmpeg 1.下载 FFmpeg 访问FFmpeg官方网站。选择 “Windows builds from gyan.dev” 链接,这会带您到一个包含最新版本 FFmpeg Windows 构建的页面。选择一个适合您系统的版本(例如,32位或64位&…...
java 操作git
实现功能:借助jgit实现拉取文件,并返回文件路径清单 <!-- 依赖库 版本号有自行选择,只是需要注意支持的jdk版本即可,我使用的是jdk1.8--> <dependency><groupId>org.eclipse.jgit</groupId><artif…...
Linux 导入、导出 MySQL 数据库命令
一、导出数据库 1、导出完整数据:表结构数据 mysqldump -u用户名 -p 数据库名 > 数据库名.sql 举例:以下命令可以导出 abc 数据库的数据和表结构 /usr/local/mysql/bin/mysqldump -uroot -p abc > abc.sql2、只导出表结构 mysqldump -u用户名 -p…...

华为数通---BFD多跳检测示例
定义 双向转发检测BFD(Bidirectional Forwarding Detection)是一种全网统一的检测机制,用于快速检测、监控网络中链路或者IP路由的转发连通状况。 目的 为了减小设备故障对业务的影响,提高网络的可靠性,网络设备需要…...

AWS 日志分析工具
当您的网络资源托管在 AWS 中时,需要定期监控您的 AWS CloudTrail 日志、Amazon S3 服务器日志和 AWS ELB 日志等云日志,以降低任何潜在的安全风险、识别严重错误并确保满足所有合规性法规。 什么是 Amazon S3 Amazon Simple Storage Serviceÿ…...

gitLab 和Idea分支合并
以下二选1即可完成分支合并建议第一种简单有效 Idea合并方式 切换到被合并的分支,如我想把0701的内容合并到dev,切换到dev分支,然后再点击merge然后选择要合并的分支,即可,此时git上的代码没有更新只是把代码合到本地需要pull才…...

关于 mapboxgl 的常用方法及效果
给地图标记点 实现效果 /*** 在地图上添加标记点* point: [lng, lat]* color: #83f7a0*/addMarkerOnMap(point, color #83f7a0) {const marker new mapboxgl.Marker({draggable: false,color: color,}).setLngLat(point).addTo(this.map);this.markersList.push(marker);},…...
C语言——二级指针
指针变量也是变量,是变量就有地址,那么指针变量的地址存放在哪里?——这就是二期指针 int a 10;int *pa &a;int **ppa &pa;//a的地址存放在pa中,pa的地址存放在ppa中。 //pa是一级指针,ppa是二级指针。 对…...

股市复苏中的明懿金汇:抓住新机遇
2023年对于明懿金汇来说是充满挑战与机遇的一年。面对复杂多变的市场环境,明懿金汇展现了其对市场趋势的敏锐洞察和卓越的策略适应能力。以下是该公司在2023年的主要投资策略和市场适应方式的详细分析。 随着2023年中国股市迎来反弹,明懿金汇迅速调整了…...
Spacemesh、Kaspa和Chia的全面对比!
当今区块链领域,PoST(Proof of Space and Time)共识算法引领着一股新的技术浪潮。在这个热潮下,Chia项目作为PoST共识机制的经典项目,和目前算力赛道备受瞩目的Kaspa项目,都是不可忽视的存在。虽然这两个项…...

【HTML语法】
HTML语法 1. HTML语法1.1 HTML编辑器1.2 HTML模板1.3 标签示例1.4 常见的HTML标签1.51.61.71.81.91.101.11 学习网站:https://www.runoob.com/html/html-tutorial.html 1. HTML语法 HTML(全称 Hypertext Markup Language,超文本标记语言&…...

ROS报错:RLException:Invalid roslaunch XML Syntax: mismatched tag:
运行roslaunch文件提示: RLException:Invalid roslaunch XML Syntax: mismatched tag: line 45, column 2 The traceback for the exception was written to the log file. j 解决办法: line45 行多了标签:</node> 另外…...

C语言实现快速排序
完整代码: #include<stdio.h>//用第一个元素将待排序序列划分成左右两个部分,返回排序后low的位置,即枢轴的位置 int partition(int arr[],int low,int high){//让待排序序列中的第一个元素成为基准int pivotarr[low];//lowhigh代表一…...
ChatGPT对于当今的社会或科技发展有何重要性?
ChatGPT对于当今社会和科技发展的重要性在于: 促进社交交流:ChatGPT可以为人们提供全天候的在线聊天服务,连接人与人之间的沟通交流,改善社交沟通方式。 提高有效性和效率:人们可以通过ChatGPT获得快速和精确的信息&a…...
宝塔是可以切换mongodb版本的
在软件商店,搜索monggodb,点击设置。点击第三个标签版本切换即可。但是前提要删除所有非系统数据库。 删除数据库方法: 要在 MongoDB 中删除一个数据库,可以使用 dropDatabase() 命令。请注意,在执行此操作之前&#x…...

16、XSS——会话管理
文章目录 一、web会话管理概述1.1 会话管理1.2 为什么需要会话管理?1.3 常见的web应用会话管理的方式 二、会话管理方式2.1 基于server端的session的管理方式2.2 cookie-based的管理方式2.3 token-based的管理方式 三、安全问题 一、web会话管理概述 1.1 会话管理 …...
稀疏矩阵的操作(数据结构实训)
题目: 标准输入输出 题目描述: 稀疏矩阵可以采用三元组存储。 输入: 输入包含若干个测试用例,每个测试用例的第一行为两个正整数m,n(1<m,n<100),表示矩阵的行数和列数,接下来m行,每行n个整数,表示稀疏…...

19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...

shell脚本--常见案例
1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
Java线上CPU飙高问题排查全指南
一、引言 在Java应用的线上运行环境中,CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时,通常会导致应用响应缓慢,甚至服务不可用,严重影响用户体验和业务运行。因此,掌握一套科学有效的CPU飙高问题排查方法&…...
Python网页自动化Selenium中文文档
1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API,让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API,你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…...