(七)消息队列-Kafka 序列化avro(传递)
(七)消息队列-Kafka 序列化avro(传递)
客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》

本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)
前言
多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。
问题背景
在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:
- 数据冗余:字段名重复存储,占用带宽;
- 兼容性差:新增或删除字段时容易导致上下游解析失败;
- 类型安全缺失:动态解析易引发运行时错误。
而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储、动态兼容性和强类型校验,成为Kafka生态中推荐的序列化方案27。
核心原理
-
Schema驱动
Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:{"type": "record","name": "User","namespace": "com.example.avro","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"}] }然后使用
avro-maven-plugin生成 Java 类:<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.0</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals></execution></executions> </plugin>执行
mvn clean compile后,com.example.avro.User类会被自动生成。生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。
-
二进制编码
Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。 -
Schema Registry
为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:- Schema版本控制与兼容性检查;
- 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。
实现步骤
以下以Java代码为例,展示Kafka集成Avro的配置方法:
1. 添加依赖
<dependencies><!-- Spring Kafka 依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Avro 依赖 --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></dependency><!-- Schema Registry 依赖 --><dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.2.1</version></dependency>
</dependencies>
运行 HTML
2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址Producer<String, GenericRecord> producer = new KafkaProducer<>(props);// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");producer.send(new ProducerRecord<>("user-topic", user));------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer@Service
public class UserProducer {private final KafkaTemplate<String, User> kafkaTemplate;@Value("${kafka.topic.user}")private String topic;public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder().setId(1).setName("Alice").build();
userProducer.sendUser(user);控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Received: " + record.value().get("name"));}
}------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后编写 Kafka 消费者代码:@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {@KafkaHandlerpublic void consume(User user) {System.out.println("Received user: " + user.getName());}
}
常见问题与解决方案
- Schema兼容性错误
- 现象:生产者更新Schema后消费者无法解析旧数据。
- 解决:在Schema Registry中配置兼容性策略(如
BACKWARD),允许新增字段并设置默认值7。
- ClassNotFoundException
- 现象:反序列化时提示Avro生成的类不存在。
- 解决:通过Maven插件
avro-maven-plugin自动生成Java类,并确保生成路径在编译范围内2。
- 性能瓶颈
- 现象:高吞吐场景下序列化延迟较高。
- 优化:复用
DatumWriter和DatumReader实例,避免重复初始化开销7。
总结
Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。
引用说明
- 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
- Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。
后续
下期预告,敬请关注:
(八)消息队列-Kafka 生产者
相关文章:
(七)消息队列-Kafka 序列化avro(传递)
(七)消息队列-Kafka 序列化avro(传递) 客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。 ——佚名《饮马长城窟行》 本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印&…...
js基础二
JavaScript基础下 1 事件处理 JS 事件(event)是当用户与网页进行交互时发生的事情,例如单机某个链接或按钮、在文本框中输入文本、按下键盘上的某个按键、移动鼠标等等。当事件发生时,您可以使用 JavaScript 中的事件处理程序&a…...
WSBDF レクチア 定义2 引理3 wsbdf的乘子
定义2 引理3 wsbdf的乘子 ここまで 寝みます❓...
Qt之QStateMachine等待
在项目中经常需要等待,我们模拟0-30的数,假如我们其中5, 25的数需要进行等待,等待用户处理完自己事情后,按下按钮继续,找Qt的项目中有一个 QStateMachineqstatemmachine类提供了一个分层有限状态机。 QSta…...
Wireshark 插件开发实战指南
Wireshark 插件开发实战指南 环境搭建流程图 #mermaid-svg-XpNibno7BIyfzNn5 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-XpNibno7BIyfzNn5 .error-icon{fill:#552222;}#mermaid-svg-XpNibno7BIyfzNn5 .error-t…...
基于SpringBoot的“青少年心理健康教育网站”的设计与实现(源码+数据库+文档+PPT)
基于SpringBoot的“青少年心理健康教育网站”的设计与实现(源码数据库文档PPT) 开发语言:Java 数据库:MySQL 技术:SpringBoot 工具:IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 实体属性图 系统首页界…...
23-整数转罗马数字
代码 测试用例 测试结果 测试结果 12. 整数转罗马数字 中等 相关标签 相关企业 七个不同的符号代表罗马数字,其值如下: 符号值I1V5X10L50C100D500M1000 罗马数字是通过添加从最高到最低的小数位值的转换而形成的。将小数位值转换为罗马数字有以…...
SpringBoot+Redis+Mybatis-plus黑马点评
短信登录 基于Session实现登录 流程: 发送短信验证码-->短信验证码注册登录-->校验登录状态(保存用户到ThreadLocal,方便后续使用) 不能每次请求服务都要进行登录状态校验,解决办法:拦截器 在Sp…...
深入剖析 OpenCV:全面掌握基础操作、图像处理算法与特征匹配
深入剖析 OpenCV:全面掌握基础操作、图像处理算法与特征匹配 一、引言二、OpenCV 的安装(一)使用 pip 安装(二)使用 Anaconda 安装 三、OpenCV 基础操作(一)图像的读取、显示与保存(…...
【C语言显示Linux系统参数】
在C语言中,可以通过调用Linux系统提供的API来获取和显示系统参数。以下是一些常见的系统参数及其获取方法: 1. 获取系统名称和版本 可以使用uname函数来获取系统名称、版本等信息。 #include <stdio.h> #include <sys/utsname.h>int main…...
突破Ajax跨域困境,解锁前端通信新姿势
一、引言 在当今的 Web 开发领域,前后端分离的架构模式已经成为主流,它极大地提升了开发效率和项目的可维护性。在这种开发模式下,前端通过 Ajax 技术与后端进行数据交互,然而,跨域问题却如影随形,成为了开…...
Kotlin协变与逆变区别
在Kotlin中,协变和逆变是泛型编程中的两个重要概念,它们允许我们在类型系统中更加灵活地处理类型关系。 1.协变:协变允许我们使用比原始类型更具体的类型。在kotlin中,通过在类型参数上加out关键字来表示协变,生产者,例…...
driver中为什么要使用非阻塞赋值
1. 模拟硬件时序行为 实际硬件行为:DUT的输入信号通常在时钟边沿被采样。Driver需要确保信号的更新与时钟同步,而非阻塞赋值的延迟更新特性(在时间步结束时统一生效)能够准确模拟寄存器的行为。 示例: always (posedg…...
模板字符串【ES6】
“路漫漫其修远兮,吾将上下而求索。”—— 屈原《离骚》 目录 什么是模板字符串?模板字符串特性及代码举例:详细举例用法: 什么是模板字符串? 模板字符串(Template Literals)是JavaScript中引入…...
通往 AI 之路:Python 机器学习入门-数据结构
Python 数据结构 Python 提供了多种数据结构来存储和操作数据,其中列表(list)、字典(dict)、元组(tuple)和集合(set)是最常用的几种。本章将详细介绍这些数据结构的基本…...
我们应该如何优化UI(基于UGUI)
这是一道面试题,下面,我们来详细分析这个问题。 目录 1. 减少 Draw Call 合理设置图集 避免材质和 Shader 的频繁切换 减少 UI 元素的重叠 2. 优化UI布局 3. 优化UI元素的渲染 4.优化UI动画 5. 优化 UI 事件处理 6. 运行时优化 1. 减少 Draw C…...
CSS3 圆角:实现与优化指南
CSS3 圆角:实现与优化指南 随着网页设计的发展,CSS3 圆角已经成为了现代网页设计中不可或缺的元素之一。本文将详细讲解 CSS3 圆角的基本用法、实现方式以及优化技巧,帮助您在网页设计中更好地运用这一功能。 一、CSS3 圆角基本用法 1.1 基…...
【网络安全 | 扫描子域+发现真实IP】CloakQuest3r安装使用详细教程
原创文章,禁止转载。 本文仅作学习交流使用,不得用于非法渗透,笔者不承担任何责任。 文章目录 简介功能介绍执行流程限制安装步骤可选功能:SecurityTrails API使用示例简介 CloakQuest3r 是一款强大的 Python 工具,专为揭示受 Cloudflare 及类似服务保护的网站真实 IP 地…...
Mellanox OFED驱动如何给全局编译添加gcc的编译选项?(subdir-ccflags-y += -Wall)
背景 有些时候编译驱动需要给全局加一个编译选项,假设configure已经完成。可以直接在Makefile中修改 添加方式 修改OFED驱动目录下的: ./Makefile subdir-ccflags-y -Wall修改效果: 然后执行make,就能让添加的编译选项生效…...
【愚公系列】《Python网络爬虫从入门到精通》037-文件的存取
标题详情作者简介愚公搬代码头衔华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,亚马逊技领云博主,51CTO博客专家等。近期荣誉2022年度…...
思源黑体TTF构建指南:免费商用多语言字体的终极解决方案
思源黑体TTF构建指南:免费商用多语言字体的终极解决方案 【免费下载链接】source-han-sans-ttf A (hinted!) version of Source Han Sans 项目地址: https://gitcode.com/gh_mirrors/so/source-han-sans-ttf 你是否曾为多语言项目中的字体问题而烦恼…...
免费中医AI终极指南:仲景大模型如何让普通人也能享受专业中医咨询
免费中医AI终极指南:仲景大模型如何让普通人也能享受专业中医咨询 【免费下载链接】CMLM-ZhongJing 首个中医大语言模型——“仲景”。受古代中医学巨匠张仲景深邃智慧启迪,专为传统中医领域打造的预训练大语言模型。 The first-ever Traditional Chines…...
Windows右键菜单终极优化指南:如何用ContextMenuManager让右键菜单秒开如飞
Windows右键菜单终极优化指南:如何用ContextMenuManager让右键菜单秒开如飞 【免费下载链接】ContextMenuManager 🖱️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 你是否曾经对着电脑屏幕等…...
【linux学习】linux工具篇(下)
Linux调试器-gdb使用,Linux项目自动化构建工具-make/Makefile我是程序员小青蛙,下面分享linux的工具利用前言程序的发布方式有两种,debug模式和release模式 Linux gcc/g出来的二进制程序,默认是release模式 要使用gdb调试…...
Flink 2.2集成Flink CDC 3.6
1 、部署Flink CDC tar -zxf flink-cdc-3.6.0-2.2-bin.tar.gz -C /usr/bigtop/3.3.0/usr/libln -s /usr/bigtop/3.3.0/usr/lib/flink-cdc-3.6...
网盘直链解析工具:多平台文件下载的实用解决方案
网盘直链解析工具:多平台文件下载的实用解决方案 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国移动云盘 / 天翼云盘 …...
书匠策AI:那个让你论文查重从“红色地狱“直接变“绿色天堂“的神器
各位正在跟论文死磕的同学们,先别划走。 今天咱们不聊怎么写开题报告,不聊怎么搭框架,咱们聊一个所有人写完初稿后都会遭遇的终极BOSS——查重。 你有没有经历过这种崩溃:熬夜写了一万字,信心满满提交查重࿰…...
后端开发必知的数据库优化技巧:这5个方法让你的系统性能提升10倍
对于软件测试从业者来说,理解数据库优化逻辑不仅能帮我们更快定位性能瓶颈,还能让我们在测试阶段就提前发现潜在的数据库设计问题,避免上线后出现大规模性能故障。很多测试同学往往把注意力放在接口逻辑、功能正确性上,却忽略了数…...
Playwright Python3.7+安装失败根因与一次成功配置指南
1. 为什么Playwright在Python3.7环境下总“装不上”?——这不是你的pip问题,是环境认知偏差 你刚在新配的Mac M2上敲下 pip install playwright ,终端卡在 Building wheel for playwright... 十分钟不动;或者Windows上反复提示…...
通过用量看板与成本管理功能实现团队API支出精细化管控
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 通过用量看板与成本管理功能实现团队API支出精细化管控 对于依赖大模型API进行开发的团队而言,成本控制与资源分配的透…...
