当前位置: 首页 > news >正文

Kafka 的消息格式:了解消息结构与序列化

Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。

1. Kafka 消息结构

Kafka 的消息结构由消息头、消息键、消息值和时间戳等组成。下面是一个典型的 Kafka 消息结构:

----------------------------------------------------------------------------------------------
| Message Header | Key | Value | Timestamp | Optional Headers |
----------------------------------------------------------------------------------------------

1.1 消息头

消息头包含一些元数据信息,例如消息的大小、压缩信息等。消息头的结构可能会根据 Kafka 版本和配置而有所不同。

1.2 消息键与消息值

  • 消息键(Key): 用于标识消息的唯一性,通常用于分区和查找消息。

  • 消息值(Value): 包含实际的消息内容。

1.3 时间戳

时间戳表示消息的产生时间,有两种类型:

  • 创建时间戳: 表示消息被创建的时间。

  • LogAppendTime 时间戳: 表示消息被追加到日志的时间。

2. 消息的序列化与反序列化

Kafka 中的消息在生产者发送和消费者接收时需要进行序列化和反序列化。这是因为 Kafka 是以字节流的形式存储和传输消息的,而实际的消息内容可能是各种不同的数据类型。以下是一些常用的序列化器和反序列化器:

2.1 字符串序列化器

// 生产者端
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "Hello, Kafka!");// 消费者端
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
});

2.2 Avro 序列化器

Avro 是一种高性能且紧凑的二进制序列化格式,适用于复杂数据结构的消息。

// 生产者端
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("field1", "value1");
avroRecord.put("field2", 42);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("my-topic", "key", avroRecord);// 消费者端
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {GenericRecord value = record.value();System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});

2.3 JSON 序列化器

// 生产者端
JsonNode jsonNode = objectMapper.createObjectNode();
((ObjectNode) jsonNode).put("field1", "value1");
((ObjectNode) jsonNode).put("field2", 42);
ProducerRecord<String, JsonNode> record = new ProducerRecord<>("my-topic", "key", jsonNode);// 消费者端
ConsumerRecords<String, JsonNode> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {JsonNode value = record.value();System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});

3. 自定义消息格式

在某些情况下,你可能需要定义自己的消息格式。Kafka 提供了 ByteArraySerializerByteArrayDeserializer,允许你将消息以字节数组的形式发送和接收,从而实现自定义的序列化和反序列化逻辑。

// 生产者端
byte[] customMessageBytes = serializeCustomMessage(customMessage);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("my-topic", "key", customMessageBytes);// 消费者端
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {byte[] value = record.value();CustomMessage customMessage = deserializeCustomMessage(value);System.out.printf("Consumed record with key %s and value %s%n", record.key(), customMessage);
});

4. 消息的压缩与解压

Kafka 支持消息的压缩,以减小网络传输的开销。以下是一些常用的压缩选项:

// 生产者端
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);// 消费者端
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
Consumer<String, String> consumer = new KafkaConsumer<>(props);

5. 消息的版本控制与兼容性

在实际应用中,系统的演进和变化是不可避免的。因此,考虑到消息的版本控制和兼容性是非常重要的。以下是一些相关的注意事项和最佳实践:

5.1 消息的演进

  • 向后兼容性: 新版本的消费者能够处理旧版本的消息。

  • 向前兼容性: 旧版本的消费者能够处理新版本的消息。

5.2 Schema Registry

Schema Registry 是一个用于存储和管理 Avro、JSON 等消息格式的架构的中心化服务。通过使用 Schema Registry,可以更好地管理消息的演进,并确保向前和向后的兼容性。

// 配置 Schema Registry 地址
props.put("schema.registry.url", "http://schema-registry:8081");

6. 消息的认证与加密

Kafka 提供了安全性特性,包括消息的认证和加密。以下是一些相关的配置选项:

6.1 SSL 加密通信

// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

6.2 认证配置

// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

7. 消息的追踪与监控

追踪和监控是保障系统稳定性和性能的重要手段。以下是一些常用的追踪和监控工具:

7.1 JMX 监控

Kafka 提供了 JMX 接口,可以通过 JConsole 或其他 JMX 客户端进行监控。

7.2 Kafka Manager

Kafka Manager 是一款开源的 Kafka 集群管理和监控工具,提供了直观的 Web 界面。

7.3 Prometheus 和 Grafana

使用 Prometheus 进行指标采集,结合 Grafana 进行可视化展示,可以更全面地监控 Kafka 集群的性能和健康状况。

总结

在深入探讨Kafka消息格式、版本控制、安全性和监控等关键主题后,对构建高效、灵活的消息系统有了更为全面的认识。了解消息结构、序列化与反序列化、自定义消息格式,以及消息的压缩与解压,是确保消息传递的基础。随后,版本控制与兼容性的重要性得到了强调,Schema Registry成为管理Avro、JSON等消息格式的利器。在保障消息传递安全方面,SSL加密通信和认证配置提供了可靠的手段。最后,通过JMX监控、Kafka Manager、以及Prometheus和Grafana的运用,能够实时追踪和监控Kafka集群的健康状态。

这篇文章旨在为大家提供全方位的Kafka消息系统知识,使其能够在实际应用中根据业务需求构建稳健、高效的消息处理系统。深入理解这些关键概念,将有助于确保消息系统的可维护性、稳定性和安全性,为实际业务场景中的挑战提供可行的解决方案。继续关注更多Kafka相关的技术内容,将使大家能够不断深化对消息系统的认识,应对日益复杂的数据处理需求。

相关文章:

Kafka 的消息格式:了解消息结构与序列化

Kafka 作为一款高性能的消息中间件系统&#xff0c;其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式&#xff0c;包括消息的结构、序列化与反序列化&#xff0c;以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析&…...

装箱 Box 数据类型

装箱是最简单直接的一种智能指针&#xff0c;它的类型是Box<T>。装箱使我们可以把数据存储到堆上&#xff0c;并在栈上保留一个指向堆数据的指针。装箱操作常常被用于下面的场景&#xff1a; 当你拥有一个无法在编译时确定大小的类型&#xff0c;但又想使用这个类型的值…...

多传感器融合SLAM在自动驾驶方向的初步探索的记录

1. VIO的不可观问题 现有的VIO都是解决的六自由度的问题, 但是对于行驶在路面上的车来说, 通常情况下不会有roll与z方向的自由度, 而且车体模型限制了不可能有纯yaw的变换. 同时由于IMU在Z轴上与roll, pitch上激励不足, 会导致IMU在初始化过程中尺度不准以及重力方向估计错误,…...

ffmpeg与opencv-python处理视频

安装 opencv pip install opencv-pythonFFmpeg 1.下载 FFmpeg 访问FFmpeg官方网站。选择 “Windows builds from gyan.dev” 链接&#xff0c;这会带您到一个包含最新版本 FFmpeg Windows 构建的页面。选择一个适合您系统的版本&#xff08;例如&#xff0c;32位或64位&…...

java 操作git

​ 实现功能&#xff1a;借助jgit实现拉取文件&#xff0c;并返回文件路径清单 <!-- 依赖库 版本号有自行选择&#xff0c;只是需要注意支持的jdk版本即可&#xff0c;我使用的是jdk1.8--> <dependency><groupId>org.eclipse.jgit</groupId><artif…...

Linux 导入、导出 MySQL 数据库命令

一、导出数据库 1、导出完整数据&#xff1a;表结构数据 mysqldump -u用户名 -p 数据库名 > 数据库名.sql 举例&#xff1a;以下命令可以导出 abc 数据库的数据和表结构 /usr/local/mysql/bin/mysqldump -uroot -p abc > abc.sql2、只导出表结构 mysqldump -u用户名 -p…...

华为数通---BFD多跳检测示例

定义 双向转发检测BFD&#xff08;Bidirectional Forwarding Detection&#xff09;是一种全网统一的检测机制&#xff0c;用于快速检测、监控网络中链路或者IP路由的转发连通状况。 目的 为了减小设备故障对业务的影响&#xff0c;提高网络的可靠性&#xff0c;网络设备需要…...

AWS 日志分析工具

当您的网络资源托管在 AWS 中时&#xff0c;需要定期监控您的 AWS CloudTrail 日志、Amazon S3 服务器日志和 AWS ELB 日志等云日志&#xff0c;以降低任何潜在的安全风险、识别严重错误并确保满足所有合规性法规。 什么是 Amazon S3 Amazon Simple Storage Service&#xff…...

gitLab 和Idea分支合并

以下二选1即可完成分支合并建议第一种简单有效 Idea合并方式 切换到被合并的分支&#xff0c;如我想把0701的内容合并到dev&#xff0c;切换到dev分支&#xff0c;然后再点击merge然后选择要合并的分支&#xff0c;即可,此时git上的代码没有更新只是把代码合到本地需要pull才…...

关于 mapboxgl 的常用方法及效果

给地图标记点 实现效果 /*** 在地图上添加标记点* point: [lng, lat]* color: #83f7a0*/addMarkerOnMap(point, color #83f7a0) {const marker new mapboxgl.Marker({draggable: false,color: color,}).setLngLat(point).addTo(this.map);this.markersList.push(marker);},…...

C语言——二级指针

指针变量也是变量&#xff0c;是变量就有地址&#xff0c;那么指针变量的地址存放在哪里&#xff1f;——这就是二期指针 int a 10;int *pa &a;int **ppa &pa;//a的地址存放在pa中&#xff0c;pa的地址存放在ppa中。 //pa是一级指针&#xff0c;ppa是二级指针。 对…...

股市复苏中的明懿金汇:抓住新机遇

2023年对于明懿金汇来说是充满挑战与机遇的一年。面对复杂多变的市场环境&#xff0c;明懿金汇展现了其对市场趋势的敏锐洞察和卓越的策略适应能力。以下是该公司在2023年的主要投资策略和市场适应方式的详细分析。 随着2023年中国股市迎来反弹&#xff0c;明懿金汇迅速调整了…...

Spacemesh、Kaspa和Chia的全面对比!

当今区块链领域&#xff0c;PoST&#xff08;Proof of Space and Time&#xff09;共识算法引领着一股新的技术浪潮。在这个热潮下&#xff0c;Chia项目作为PoST共识机制的经典项目&#xff0c;和目前算力赛道备受瞩目的Kaspa项目&#xff0c;都是不可忽视的存在。虽然这两个项…...

【HTML语法】

HTML语法 1. HTML语法1.1 HTML编辑器1.2 HTML模板1.3 标签示例1.4 常见的HTML标签1.51.61.71.81.91.101.11 学习网站&#xff1a;https://www.runoob.com/html/html-tutorial.html 1. HTML语法 HTML&#xff08;全称 Hypertext Markup Language&#xff0c;超文本标记语言&…...

ROS报错:RLException:Invalid roslaunch XML Syntax: mismatched tag:

运行roslaunch文件提示&#xff1a; RLException:Invalid roslaunch XML Syntax: mismatched tag: line 45&#xff0c; column 2 The traceback for the exception was written to the log file. j 解决办法&#xff1a; line45 行多了标签&#xff1a;</node> 另外…...

C语言实现快速排序

完整代码&#xff1a; #include<stdio.h>//用第一个元素将待排序序列划分成左右两个部分&#xff0c;返回排序后low的位置&#xff0c;即枢轴的位置 int partition(int arr[],int low,int high){//让待排序序列中的第一个元素成为基准int pivotarr[low];//lowhigh代表一…...

ChatGPT对于当今的社会或科技发展有何重要性?

ChatGPT对于当今社会和科技发展的重要性在于&#xff1a; 促进社交交流&#xff1a;ChatGPT可以为人们提供全天候的在线聊天服务&#xff0c;连接人与人之间的沟通交流&#xff0c;改善社交沟通方式。 提高有效性和效率&#xff1a;人们可以通过ChatGPT获得快速和精确的信息&a…...

宝塔是可以切换mongodb版本的

在软件商店&#xff0c;搜索monggodb&#xff0c;点击设置。点击第三个标签版本切换即可。但是前提要删除所有非系统数据库。 删除数据库方法&#xff1a; 要在 MongoDB 中删除一个数据库&#xff0c;可以使用 dropDatabase() 命令。请注意&#xff0c;在执行此操作之前&#x…...

16、XSS——会话管理

文章目录 一、web会话管理概述1.1 会话管理1.2 为什么需要会话管理&#xff1f;1.3 常见的web应用会话管理的方式 二、会话管理方式2.1 基于server端的session的管理方式2.2 cookie-based的管理方式2.3 token-based的管理方式 三、安全问题 一、web会话管理概述 1.1 会话管理 …...

稀疏矩阵的操作(数据结构实训)

题目&#xff1a; 标准输入输出 题目描述&#xff1a; 稀疏矩阵可以采用三元组存储。 输入&#xff1a; 输入包含若干个测试用例&#xff0c;每个测试用例的第一行为两个正整数m,n(1<m,n<100),表示矩阵的行数和列数,接下来m行&#xff0c;每行n个整数&#xff0c;表示稀疏…...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查

在对接支付宝API的时候&#xff0c;遇到了一些问题&#xff0c;记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

在rocky linux 9.5上在线安装 docker

前面是指南&#xff0c;后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

Python网页自动化Selenium中文文档

1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API&#xff0c;让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API&#xff0c;你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…...