(七)消息队列-Kafka 序列化avro(传递)
(七)消息队列-Kafka 序列化avro(传递)
客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》
本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)
前言
多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。
问题背景
在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:
- 数据冗余:字段名重复存储,占用带宽;
- 兼容性差:新增或删除字段时容易导致上下游解析失败;
- 类型安全缺失:动态解析易引发运行时错误。
而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储、动态兼容性和强类型校验,成为Kafka生态中推荐的序列化方案27。
核心原理
-
Schema驱动
Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:{"type": "record","name": "User","namespace": "com.example.avro","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"}] }
然后使用
avro-maven-plugin
生成 Java 类:<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.0</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals></execution></executions> </plugin>
执行
mvn clean compile
后,com.example.avro.User
类会被自动生成。生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。
-
二进制编码
Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。 -
Schema Registry
为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:- Schema版本控制与兼容性检查;
- 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。
实现步骤
以下以Java代码为例,展示Kafka集成Avro的配置方法:
1. 添加依赖
<dependencies><!-- Spring Kafka 依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Avro 依赖 --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></dependency><!-- Schema Registry 依赖 --><dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.2.1</version></dependency>
</dependencies>
运行 HTML
2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址Producer<String, GenericRecord> producer = new KafkaProducer<>(props);// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");producer.send(new ProducerRecord<>("user-topic", user));------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer@Service
public class UserProducer {private final KafkaTemplate<String, User> kafkaTemplate;@Value("${kafka.topic.user}")private String topic;public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder().setId(1).setName("Alice").build();
userProducer.sendUser(user);控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Received: " + record.value().get("name"));}
}------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后编写 Kafka 消费者代码:@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {@KafkaHandlerpublic void consume(User user) {System.out.println("Received user: " + user.getName());}
}
常见问题与解决方案
- Schema兼容性错误
- 现象:生产者更新Schema后消费者无法解析旧数据。
- 解决:在Schema Registry中配置兼容性策略(如
BACKWARD
),允许新增字段并设置默认值7。
- ClassNotFoundException
- 现象:反序列化时提示Avro生成的类不存在。
- 解决:通过Maven插件
avro-maven-plugin
自动生成Java类,并确保生成路径在编译范围内2。
- 性能瓶颈
- 现象:高吞吐场景下序列化延迟较高。
- 优化:复用
DatumWriter
和DatumReader
实例,避免重复初始化开销7。
总结
Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。
引用说明
- 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
- Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。
后续
下期预告,敬请关注:
(八)消息队列-Kafka 生产者
相关文章:

(七)消息队列-Kafka 序列化avro(传递)
(七)消息队列-Kafka 序列化avro(传递) 客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。 ——佚名《饮马长城窟行》 本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印&…...

js基础二
JavaScript基础下 1 事件处理 JS 事件(event)是当用户与网页进行交互时发生的事情,例如单机某个链接或按钮、在文本框中输入文本、按下键盘上的某个按键、移动鼠标等等。当事件发生时,您可以使用 JavaScript 中的事件处理程序&a…...

WSBDF レクチア 定义2 引理3 wsbdf的乘子
定义2 引理3 wsbdf的乘子 ここまで 寝みます❓...
Qt之QStateMachine等待
在项目中经常需要等待,我们模拟0-30的数,假如我们其中5, 25的数需要进行等待,等待用户处理完自己事情后,按下按钮继续,找Qt的项目中有一个 QStateMachineqstatemmachine类提供了一个分层有限状态机。 QSta…...
Wireshark 插件开发实战指南
Wireshark 插件开发实战指南 环境搭建流程图 #mermaid-svg-XpNibno7BIyfzNn5 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-XpNibno7BIyfzNn5 .error-icon{fill:#552222;}#mermaid-svg-XpNibno7BIyfzNn5 .error-t…...

基于SpringBoot的“青少年心理健康教育网站”的设计与实现(源码+数据库+文档+PPT)
基于SpringBoot的“青少年心理健康教育网站”的设计与实现(源码数据库文档PPT) 开发语言:Java 数据库:MySQL 技术:SpringBoot 工具:IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 实体属性图 系统首页界…...
23-整数转罗马数字
代码 测试用例 测试结果 测试结果 12. 整数转罗马数字 中等 相关标签 相关企业 七个不同的符号代表罗马数字,其值如下: 符号值I1V5X10L50C100D500M1000 罗马数字是通过添加从最高到最低的小数位值的转换而形成的。将小数位值转换为罗马数字有以…...

SpringBoot+Redis+Mybatis-plus黑马点评
短信登录 基于Session实现登录 流程: 发送短信验证码-->短信验证码注册登录-->校验登录状态(保存用户到ThreadLocal,方便后续使用) 不能每次请求服务都要进行登录状态校验,解决办法:拦截器 在Sp…...
深入剖析 OpenCV:全面掌握基础操作、图像处理算法与特征匹配
深入剖析 OpenCV:全面掌握基础操作、图像处理算法与特征匹配 一、引言二、OpenCV 的安装(一)使用 pip 安装(二)使用 Anaconda 安装 三、OpenCV 基础操作(一)图像的读取、显示与保存(…...
【C语言显示Linux系统参数】
在C语言中,可以通过调用Linux系统提供的API来获取和显示系统参数。以下是一些常见的系统参数及其获取方法: 1. 获取系统名称和版本 可以使用uname函数来获取系统名称、版本等信息。 #include <stdio.h> #include <sys/utsname.h>int main…...
突破Ajax跨域困境,解锁前端通信新姿势
一、引言 在当今的 Web 开发领域,前后端分离的架构模式已经成为主流,它极大地提升了开发效率和项目的可维护性。在这种开发模式下,前端通过 Ajax 技术与后端进行数据交互,然而,跨域问题却如影随形,成为了开…...
Kotlin协变与逆变区别
在Kotlin中,协变和逆变是泛型编程中的两个重要概念,它们允许我们在类型系统中更加灵活地处理类型关系。 1.协变:协变允许我们使用比原始类型更具体的类型。在kotlin中,通过在类型参数上加out关键字来表示协变,生产者,例…...
driver中为什么要使用非阻塞赋值
1. 模拟硬件时序行为 实际硬件行为:DUT的输入信号通常在时钟边沿被采样。Driver需要确保信号的更新与时钟同步,而非阻塞赋值的延迟更新特性(在时间步结束时统一生效)能够准确模拟寄存器的行为。 示例: always (posedg…...

模板字符串【ES6】
“路漫漫其修远兮,吾将上下而求索。”—— 屈原《离骚》 目录 什么是模板字符串?模板字符串特性及代码举例:详细举例用法: 什么是模板字符串? 模板字符串(Template Literals)是JavaScript中引入…...
通往 AI 之路:Python 机器学习入门-数据结构
Python 数据结构 Python 提供了多种数据结构来存储和操作数据,其中列表(list)、字典(dict)、元组(tuple)和集合(set)是最常用的几种。本章将详细介绍这些数据结构的基本…...
我们应该如何优化UI(基于UGUI)
这是一道面试题,下面,我们来详细分析这个问题。 目录 1. 减少 Draw Call 合理设置图集 避免材质和 Shader 的频繁切换 减少 UI 元素的重叠 2. 优化UI布局 3. 优化UI元素的渲染 4.优化UI动画 5. 优化 UI 事件处理 6. 运行时优化 1. 减少 Draw C…...
CSS3 圆角:实现与优化指南
CSS3 圆角:实现与优化指南 随着网页设计的发展,CSS3 圆角已经成为了现代网页设计中不可或缺的元素之一。本文将详细讲解 CSS3 圆角的基本用法、实现方式以及优化技巧,帮助您在网页设计中更好地运用这一功能。 一、CSS3 圆角基本用法 1.1 基…...
【网络安全 | 扫描子域+发现真实IP】CloakQuest3r安装使用详细教程
原创文章,禁止转载。 本文仅作学习交流使用,不得用于非法渗透,笔者不承担任何责任。 文章目录 简介功能介绍执行流程限制安装步骤可选功能:SecurityTrails API使用示例简介 CloakQuest3r 是一款强大的 Python 工具,专为揭示受 Cloudflare 及类似服务保护的网站真实 IP 地…...

Mellanox OFED驱动如何给全局编译添加gcc的编译选项?(subdir-ccflags-y += -Wall)
背景 有些时候编译驱动需要给全局加一个编译选项,假设configure已经完成。可以直接在Makefile中修改 添加方式 修改OFED驱动目录下的: ./Makefile subdir-ccflags-y -Wall修改效果: 然后执行make,就能让添加的编译选项生效…...

【愚公系列】《Python网络爬虫从入门到精通》037-文件的存取
标题详情作者简介愚公搬代码头衔华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,亚马逊技领云博主,51CTO博客专家等。近期荣誉2022年度…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...

黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】
大家好,我是java1234_小锋老师,看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】,分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...

Python 实现 Web 静态服务器(HTTP 协议)
目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1)下载安装包2)配置环境变量3)安装镜像4)node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1)使用 http-server2)详解 …...
适应性Java用于现代 API:REST、GraphQL 和事件驱动
在快速发展的软件开发领域,REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名,不断适应这些现代范式的需求。随着不断发展的生态系统,Java 在现代 API 方…...

《Docker》架构
文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器,docker,镜像,k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...

从物理机到云原生:全面解析计算虚拟化技术的演进与应用
前言:我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM(Java Virtual Machine)让"一次编写,到处运行"成为可能。这个软件层面的虚拟化让我着迷,但直到后来接触VMware和Doc…...