当前位置: 首页 > news >正文

(七)消息队列-Kafka 序列化avro(传递)

(七)消息队列-Kafka 序列化avro(传递)

客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》

在这里插入图片描述

本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)

前言

多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。

问题背景

在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:

  1. 数据冗余:字段名重复存储,占用带宽;
  2. 兼容性差:新增或删除字段时容易导致上下游解析失败;
  3. 类型安全缺失:动态解析易引发运行时错误。

而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储动态兼容性强类型校验,成为Kafka生态中推荐的序列化方案27。


核心原理
  1. Schema驱动
    Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:

    {"type": "record","name": "User","namespace": "com.example.avro","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"}]
    }
    

    然后使用 avro-maven-plugin 生成 Java 类:

    <plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.0</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals></execution></executions>
    </plugin>
    

    执行 mvn clean compile 后,com.example.avro.User 类会被自动生成。

    生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。

  2. 二进制编码
    Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。

  3. Schema Registry
    为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:

    • Schema版本控制与兼容性检查;
    • 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。

实现步骤

以下以Java代码为例,展示Kafka集成Avro的配置方法:

1. 添加依赖
<dependencies><!-- Spring Kafka 依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Avro 依赖 --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></dependency><!-- Schema Registry 依赖 --><dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.2.1</version></dependency>
</dependencies>

运行 HTML

2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址Producer<String, GenericRecord> producer = new KafkaProducer<>(props);// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");producer.send(new ProducerRecord<>("user-topic", user));------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer@Service
public class UserProducer {private final KafkaTemplate<String, User> kafkaTemplate;@Value("${kafka.topic.user}")private String topic;public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder().setId(1).setName("Alice").build();
userProducer.sendUser(user);控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Received: " + record.value().get("name"));}
}------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后编写 Kafka 消费者代码:@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {@KafkaHandlerpublic void consume(User user) {System.out.println("Received user: " + user.getName());}
}

常见问题与解决方案
  1. Schema兼容性错误
    • 现象:生产者更新Schema后消费者无法解析旧数据。
    • 解决:在Schema Registry中配置兼容性策略(如BACKWARD),允许新增字段并设置默认值7。
  2. ClassNotFoundException
    • 现象:反序列化时提示Avro生成的类不存在。
    • 解决:通过Maven插件avro-maven-plugin自动生成Java类,并确保生成路径在编译范围内2。
  3. 性能瓶颈
    • 现象:高吞吐场景下序列化延迟较高。
    • 优化:复用DatumWriterDatumReader实例,避免重复初始化开销7。

总结

Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。


引用说明

  • 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
  • Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。

后续

下期预告,敬请关注:
(八)消息队列-Kafka 生产者

相关文章:

(七)消息队列-Kafka 序列化avro(传递)

&#xff08;七&#xff09;消息队列-Kafka 序列化avro&#xff08;传递&#xff09; 客从远方来&#xff0c;遗我双鲤鱼。呼儿烹鲤鱼&#xff0c;中有尺素书。 ——佚名《饮马长城窟行》 本文已同步CSDN、掘金平台、知乎等多个平台&#xff0c;图片依然保持最初发布的水印&…...

js基础二

JavaScript基础下 1 事件处理 JS 事件&#xff08;event&#xff09;是当用户与网页进行交互时发生的事情&#xff0c;例如单机某个链接或按钮、在文本框中输入文本、按下键盘上的某个按键、移动鼠标等等。当事件发生时&#xff0c;您可以使用 JavaScript 中的事件处理程序&a…...

WSBDF レクチア 定义2 引理3 wsbdf的乘子

定义2 引理3 wsbdf的乘子 ここまで 寝みます❓...

Qt之QStateMachine等待

在项目中经常需要等待&#xff0c;我们模拟0-30的数&#xff0c;假如我们其中5&#xff0c; 25的数需要进行等待&#xff0c;等待用户处理完自己事情后&#xff0c;按下按钮继续&#xff0c;找Qt的项目中有一个 QStateMachineqstatemmachine类提供了一个分层有限状态机。 QSta…...

Wireshark 插件开发实战指南

Wireshark 插件开发实战指南 环境搭建流程图 #mermaid-svg-XpNibno7BIyfzNn5 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-XpNibno7BIyfzNn5 .error-icon{fill:#552222;}#mermaid-svg-XpNibno7BIyfzNn5 .error-t…...

基于SpringBoot的“青少年心理健康教育网站”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“青少年心理健康教育网站”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 实体属性图 系统首页界…...

23-整数转罗马数字

代码 测试用例 测试结果 测试结果 12. 整数转罗马数字 中等 相关标签 相关企业 七个不同的符号代表罗马数字&#xff0c;其值如下&#xff1a; 符号值I1V5X10L50C100D500M1000 罗马数字是通过添加从最高到最低的小数位值的转换而形成的。将小数位值转换为罗马数字有以…...

SpringBoot+Redis+Mybatis-plus黑马点评

短信登录 基于Session实现登录 流程&#xff1a; 发送短信验证码-->短信验证码注册登录-->校验登录状态&#xff08;保存用户到ThreadLocal&#xff0c;方便后续使用&#xff09; 不能每次请求服务都要进行登录状态校验&#xff0c;解决办法&#xff1a;拦截器 在Sp…...

深入剖析 OpenCV:全面掌握基础操作、图像处理算法与特征匹配

深入剖析 OpenCV&#xff1a;全面掌握基础操作、图像处理算法与特征匹配 一、引言二、OpenCV 的安装&#xff08;一&#xff09;使用 pip 安装&#xff08;二&#xff09;使用 Anaconda 安装 三、OpenCV 基础操作&#xff08;一&#xff09;图像的读取、显示与保存&#xff08;…...

【C语言显示Linux系统参数】

在C语言中&#xff0c;可以通过调用Linux系统提供的API来获取和显示系统参数。以下是一些常见的系统参数及其获取方法&#xff1a; 1. 获取系统名称和版本 可以使用uname函数来获取系统名称、版本等信息。 #include <stdio.h> #include <sys/utsname.h>int main…...

突破Ajax跨域困境,解锁前端通信新姿势

一、引言 在当今的 Web 开发领域&#xff0c;前后端分离的架构模式已经成为主流&#xff0c;它极大地提升了开发效率和项目的可维护性。在这种开发模式下&#xff0c;前端通过 Ajax 技术与后端进行数据交互&#xff0c;然而&#xff0c;跨域问题却如影随形&#xff0c;成为了开…...

Kotlin协变与逆变区别

在Kotlin中&#xff0c;协变和逆变是泛型编程中的两个重要概念&#xff0c;它们允许我们在类型系统中更加灵活地处理类型关系。 1.协变&#xff1a;协变允许我们使用比原始类型更具体的类型。在kotlin中&#xff0c;通过在类型参数上加out关键字来表示协变,生产者&#xff0c;例…...

driver中为什么要使用非阻塞赋值

1. 模拟硬件时序行为 实际硬件行为&#xff1a;DUT的输入信号通常在时钟边沿被采样。Driver需要确保信号的更新与时钟同步&#xff0c;而非阻塞赋值的延迟更新特性&#xff08;在时间步结束时统一生效&#xff09;能够准确模拟寄存器的行为。 示例&#xff1a; always (posedg…...

模板字符串【ES6】

“路漫漫其修远兮&#xff0c;吾将上下而求索。”—— 屈原《离骚》 目录 什么是模板字符串&#xff1f;模板字符串特性及代码举例&#xff1a;详细举例用法&#xff1a; 什么是模板字符串&#xff1f; 模板字符串&#xff08;Template Literals&#xff09;是JavaScript中引入…...

通往 AI 之路:Python 机器学习入门-数据结构

Python 数据结构 Python 提供了多种数据结构来存储和操作数据&#xff0c;其中列表&#xff08;list&#xff09;、字典&#xff08;dict&#xff09;、元组&#xff08;tuple&#xff09;和集合&#xff08;set&#xff09;是最常用的几种。本章将详细介绍这些数据结构的基本…...

我们应该如何优化UI(基于UGUI)

这是一道面试题&#xff0c;下面&#xff0c;我们来详细分析这个问题。 目录 1. 减少 Draw Call 合理设置图集 避免材质和 Shader 的频繁切换 减少 UI 元素的重叠 2. 优化UI布局 3. 优化UI元素的渲染 4.优化UI动画 5. 优化 UI 事件处理 6. 运行时优化 1. 减少 Draw C…...

CSS3 圆角:实现与优化指南

CSS3 圆角&#xff1a;实现与优化指南 随着网页设计的发展&#xff0c;CSS3 圆角已经成为了现代网页设计中不可或缺的元素之一。本文将详细讲解 CSS3 圆角的基本用法、实现方式以及优化技巧&#xff0c;帮助您在网页设计中更好地运用这一功能。 一、CSS3 圆角基本用法 1.1 基…...

【网络安全 | 扫描子域+发现真实IP】CloakQuest3r安装使用详细教程

原创文章,禁止转载。 本文仅作学习交流使用,不得用于非法渗透,笔者不承担任何责任。 文章目录 简介功能介绍执行流程限制安装步骤可选功能:SecurityTrails API使用示例简介 CloakQuest3r 是一款强大的 Python 工具,专为揭示受 Cloudflare 及类似服务保护的网站真实 IP 地…...

Mellanox OFED驱动如何给全局编译添加gcc的编译选项?(subdir-ccflags-y += -Wall)

背景 有些时候编译驱动需要给全局加一个编译选项&#xff0c;假设configure已经完成。可以直接在Makefile中修改 添加方式 修改OFED驱动目录下的&#xff1a; ./Makefile subdir-ccflags-y -Wall修改效果&#xff1a; 然后执行make&#xff0c;就能让添加的编译选项生效…...

【愚公系列】《Python网络爬虫从入门到精通》037-文件的存取

标题详情作者简介愚公搬代码头衔华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,亚马逊技领云博主,51CTO博客专家等。近期荣誉2022年度…...

Cogito-v1-preview-llama-3B高性能:vLLM Serving + OpenAI兼容API部署教程

Cogito-v1-preview-llama-3B高性能&#xff1a;vLLM Serving OpenAI兼容API部署教程 1. 引言&#xff1a;为什么选择Cogito模型&#xff1f; 如果你正在寻找一个既强大又实用的语言模型&#xff0c;Cogito-v1-preview-llama-3B绝对值得关注。这个模型在同等规模的开源模型中…...

测试计划详细说明

一份高质量的测试计划本质上是质量风险的防御蓝图&#xff0c;它要在有限资源和无限质量诉求之间找到平衡点。我将从结构、内容、决策逻辑三个维度展开&#xff0c;并提供一个可直接落地的框架。一、测试计划的核心定位测试计划的本质回答三个问题&#xff1a;测什么&#xff1…...

Windows系统盘空间告急?Driver Store Explorer帮你轻松清理冗余驱动,快速释放10GB+

Windows系统盘空间告急&#xff1f;Driver Store Explorer帮你轻松清理冗余驱动&#xff0c;快速释放10GB 【免费下载链接】DriverStoreExplorer Driver Store Explorer 项目地址: https://gitcode.com/gh_mirrors/dr/DriverStoreExplorer 你是否曾困惑于Windows系统盘空…...

BetterGI:原神智能辅助系统 重新定义游戏体验

BetterGI&#xff1a;原神智能辅助系统 重新定义游戏体验 【免费下载链接】better-genshin-impact &#x1f4e6;BetterGI 更好的原神 - 自动拾取 | 自动剧情 | 全自动钓鱼(AI) | 全自动七圣召唤 | 自动伐木 | 自动刷本 | 自动采集/挖矿/锄地 | 一条龙 | 全连音游 - UI Automa…...

3步永久解锁加密PDF:ScienceDecrypting终极使用指南

3步永久解锁加密PDF&#xff1a;ScienceDecrypting终极使用指南 【免费下载链接】ScienceDecrypting 破解CAJViewer带有效期的文档&#xff0c;支持破解科学文库、标准全文数据库下载的文档。无损破解&#xff0c;保留文字和目录&#xff0c;解除有效期限制。 项目地址: http…...

ProperTree:跨平台Plist编辑器零基础上手指南

ProperTree&#xff1a;跨平台Plist编辑器零基础上手指南 【免费下载链接】ProperTree Cross platform GUI plist editor written in python. 项目地址: https://gitcode.com/gh_mirrors/pr/ProperTree 在macOS与iOS开发中&#xff0c;Plist文件如同系统的"配置密码…...

Git-Credential-Manager-for-Windows安全存储机制深度解析:如何保护你的Git凭证安全 [特殊字符]

Git-Credential-Manager-for-Windows安全存储机制深度解析&#xff1a;如何保护你的Git凭证安全 &#x1f510; 【免费下载链接】Git-Credential-Manager-for-Windows Secure Git credential storage for Windows with support for Visual Studio Team Services, GitHub, and B…...

游戏存档定制与个性化体验:CyberpunkSaveEditor完全指南

游戏存档定制与个性化体验&#xff1a;CyberpunkSaveEditor完全指南 【免费下载链接】CyberpunkSaveEditor A tool to edit Cyberpunk 2077 sav.dat files 项目地址: https://gitcode.com/gh_mirrors/cy/CyberpunkSaveEditor 为什么需要专业的存档编辑工具&#xff1f;解…...

YOLOv8n-face:工业级人脸检测技术的精度与效率平衡之道

YOLOv8n-face&#xff1a;工业级人脸检测技术的精度与效率平衡之道 【免费下载链接】yolov8-face yolov8 face detection with landmark 项目地址: https://gitcode.com/gh_mirrors/yo/yolov8-face 一、行业痛点诊断&#xff1a;企业级人脸检测的现实挑战 1.1 复杂场景…...

老虎证券季报图解:营收1.76亿美元同比增41% 净利4566万美元

雷递网 雷建平 4月2日老虎证券&#xff08;NASDAQ: TIGR&#xff09;日前发布截至2025年12月31日的财报。财报显示&#xff0c;老虎证券2025年营收为6.12亿美元&#xff0c;较上年同期的3.92亿美元增长56.1%。其中&#xff0c;老虎证券2025年来自佣金收入为2.67亿美元&#xff…...