(七)消息队列-Kafka 序列化avro(传递)
(七)消息队列-Kafka 序列化avro(传递)
客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》

本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)
前言
多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。
问题背景
在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:
- 数据冗余:字段名重复存储,占用带宽;
- 兼容性差:新增或删除字段时容易导致上下游解析失败;
- 类型安全缺失:动态解析易引发运行时错误。
而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储、动态兼容性和强类型校验,成为Kafka生态中推荐的序列化方案27。
核心原理
-
Schema驱动
Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:{"type": "record","name": "User","namespace": "com.example.avro","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"}] }然后使用
avro-maven-plugin生成 Java 类:<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.0</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals></execution></executions> </plugin>执行
mvn clean compile后,com.example.avro.User类会被自动生成。生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。
-
二进制编码
Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。 -
Schema Registry
为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:- Schema版本控制与兼容性检查;
- 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。
实现步骤
以下以Java代码为例,展示Kafka集成Avro的配置方法:
1. 添加依赖
<dependencies><!-- Spring Kafka 依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Avro 依赖 --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></dependency><!-- Schema Registry 依赖 --><dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.2.1</version></dependency>
</dependencies>
运行 HTML
2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址Producer<String, GenericRecord> producer = new KafkaProducer<>(props);// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");producer.send(new ProducerRecord<>("user-topic", user));------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer@Service
public class UserProducer {private final KafkaTemplate<String, User> kafkaTemplate;@Value("${kafka.topic.user}")private String topic;public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder().setId(1).setName("Alice").build();
userProducer.sendUser(user);控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Received: " + record.value().get("name"));}
}------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后编写 Kafka 消费者代码:@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {@KafkaHandlerpublic void consume(User user) {System.out.println("Received user: " + user.getName());}
}
常见问题与解决方案
- Schema兼容性错误
- 现象:生产者更新Schema后消费者无法解析旧数据。
- 解决:在Schema Registry中配置兼容性策略(如
BACKWARD),允许新增字段并设置默认值7。
- ClassNotFoundException
- 现象:反序列化时提示Avro生成的类不存在。
- 解决:通过Maven插件
avro-maven-plugin自动生成Java类,并确保生成路径在编译范围内2。
- 性能瓶颈
- 现象:高吞吐场景下序列化延迟较高。
- 优化:复用
DatumWriter和DatumReader实例,避免重复初始化开销7。
总结
Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。
引用说明
- 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
- Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。
后续
下期预告,敬请关注:
(八)消息队列-Kafka 生产者
相关文章:
(七)消息队列-Kafka 序列化avro(传递)
(七)消息队列-Kafka 序列化avro(传递) 客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。 ——佚名《饮马长城窟行》 本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印&…...
springboot使用redis
springboot使用redis redis-service.exe : 服务端,启动后不要关闭 redis-cli.exe : 客户端,访问redis中的数据 redisclient-win32.x86_64.2.0.jar : redis的图形界面客户端,执行方式是在这个文件的目录执行 java -jar redisclient-win32.x86_64.2.0.jar或者在这个jar包的目录…...
【原创】Open WebUI 本地部署
使用官网的默认部署,遇到不少的问题。比如白屏问题,其实需要修改几个参数即可。 其实在部署的时候有不少参数 WEBUI_AUTH False ENABLE_OPENAI_API 0 PATH /usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin LANG C.UTF-8…...
【实战篇】【深度解析DeepSeek:从机器学习到深度学习的全场景落地指南】
一、机器学习模型:DeepSeek的降维打击 1.1 监督学习与无监督学习的"左右互搏" 监督学习就像学霸刷题——给标注数据(参考答案)训练模型。DeepSeek在信贷风控场景中,用逻辑回归模型分析百万级用户数据,通过特征工程挖掘出"凌晨3点频繁申请贷款"这类魔…...
SpringBoot高校运动会管理系统 附带详细运行指导视频
文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.报名赛事代码2.用户登录代码3.保存成绩代码 一、项目演示 项目演示地址: 视频地址 二、项目介绍 项目描述:这是一个基于SpringBoot框架开发的高校运动会管理系统项目。首先,这…...
【Linux网络-HTTP协议】HTTP基础概念+构建HTTP
代码定位:南毅c/Linux - Gitee.com HTTP协议 介绍 虽然我们说,应用层协议是我们程序猿自己定的.但实际上,已经有大佬们定义了一些现成的,又非常好用的应用层协议,供我们直接参考使用。HTTP(超文本传输协议)就是其中之一。 在互联网世界中,…...
web3.0简介
Web3.0(或简称 Web3)是近年来广泛讨论的一个新型互联网概念,其核心思想在于利用区块链及相关分布式技术,打造一个更加开放、去中心化、透明且以用户为主导的网络生态系统。这意味着在 Web3.0 时代,用户不再只是信息的消…...
高频 SQL 50 题(基础版)_626. 换座位
高频 SQL 50 题(基础版)_626. 换座位 select(case when mod(id,2)!0 AND counts ! id then id1when mod(id,2)!0 AND counts id then idelse id -1end) as id,student fromseat,(selectcount(*) as countsfrom seat) as seat_counts order by id asc;...
hive 面试题
Hive基础概念 1.1 Hive是什么? 基于Hadoop的数据仓库工具,支持类SQL(HiveQL)查询,底层转换为MapReduce/Tez/Spark任务。 核心功能:数据ETL、查询、分析;定位:OLAP(分析…...
【Jenkins】个人向-Jenkinsfile如何写
官方参考:https://www.jenkins.io/doc/book/pipeline/syntax/ Pipeline Utility Steps 插件:https://birdbook.com.cn/ops/ci/jenkins/plugins/pipeline%20utility%20steps.html 常用环境变量 含义表达式备注params,传入参数传入参数params…...
python第十一课:并发编程 | 多任务交响乐团
🎯 本节目标 理解多线程/多进程/协程的应用场景掌握threading与multiprocessing核心用法学会使用asyncio进行异步编程开发实战项目:高并发爬虫引擎破解GIL锁的性能迷思 1️⃣ 并发编程三剑客 🎻 生活化比喻: 多线程 → 餐厅多个…...
Android SystemUI深度定制实战:下拉状态栏集成响铃功能开关全解析
一、功能实现全景视图 目标场景:在Android 14系统级ROM定制中,为SystemUI下拉状态栏的QuickQSPanel区域新增响铃模式切换开关,实现静音/响铃快速切换功能。该功能需通过三层关键改造实现: 二、核心实现三部曲 1. 配置注入&…...
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程的演示都将在 Flink CDC CLI 中进行,无需一行 Java/Scala 代码,也无需安装 IDE。 这篇教程将展示如何基于 Flink CDC YAML 快速构建 MySQL 到 Kafka 的 Streaming ELT 作业,包含整库同步、表结构变更同步演示和关键参数介绍。 准备阶段…...
ubuntu下r8125网卡重启丢失修复案例一则
刚装的一台服务器,ubuntu24.04,主板网卡是r8125,安装服务后会莫名其妙丢失驱动 按照官网的方法下载最新8125驱动包: Realtek 然后卸载驱动 rmmod r8125 然后在驱动包里安装(幸好我之前装了build-essential&#x…...
解决 ERROR 1130 (HY000): Host is not allowed to connect to this MySQL server
当使用 MySQL 时,您可能会遇到错误信息“ERROR 1130 (HY000): Host ‘hostname’is not allowed to connect to this MySQL server”这是 MySQL 用于防止未经授权的访问的标准安全特性。实际上,服务器还没有配置为接受来自相关主机的连接。 Common Caus…...
科普|无人机专业术语
文章目录 前言一、飞控二、电调三、通道四、2S、3S、4S电池五、电池后面C是什么意思?六、电机的型号七、什么是电机的KV值?八、螺旋桨的型号九、电机与螺旋桨的搭配 前言 无人机飞控系统控制飞行姿态,电调控制电机转速,遥控器通道控制飞行动作。电池C…...
Qt:窗口
目录 菜单栏 QMenuBar 菜单添加快捷键 添加子菜单 添加分割线和添加图标 QMenuBar创建方式 工具栏 QToolBar 和菜单栏搭配 创建多个工具栏 状态栏 QStatusBar 状态栏中添加其他控件 浮动窗口 QDockWidget 对话框 对话框的内存释放问题 自定义对话框界面 模态对话…...
深入浅出 Go 语言:协程(Goroutine)详解
深入浅出 Go 语言:协程(Goroutine)详解 引言 Go 语言的协程(goroutine)是其并发模型的核心特性之一。协程允许你轻松地编写并发代码,而不需要复杂的线程管理和锁机制。通过协程,你可以同时执行多个任务,并…...
Python从0到100(八十九):Resnet、LSTM、Shufflenet、CNN四种网络分析及对比
前言: 零基础学Python:Python从0到100最新最全教程。 想做这件事情很久了,这次我更新了自己所写过的所有博客,汇集成了Python从0到100,共一百节课,帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…...
实验:k8s+keepalived+nginx+iptables
1、创建两个nginx的pod,app都是nginx nginx1 nginx2 2、创建两个的pod的service 3、配置两台keepalived的调度器和nginx七层反向代理,VIP设置192.168.254.110 keepalived调度器master keepalived调度器backup 两台调度器都配置nginx七层反向代理&#…...
elpis全栈课程学习之elpis-core学习总结
elpis全栈课程学习之elpis-core学习总结 核心原理 elpis-core是全栈框架elpis的服务端内核,主要应用于服务端接口的开发以及页面的SSR渲染,elpis-core基于约定优于配置的原理,通过一系列的loader来加载对应的文件,大大节约用户的…...
LlamaFactory-webui:训练大语言模型的入门级教程
LlamaFactory是一个开源框架,支持多种流行的语言模型,及多种微调技术,同时,以友好的交互式界面,简化了大语言模型的学习。 本章内容,从如何拉取,我已经搭建好的Llamafactory镜像开始࿰…...
手机打电话时如何识别对方按下的DTMF按键的字符-安卓AI电话机器人
手机打电话时如何识别对方按下的DTMF按键的字符 --安卓AI电话机器人 一、前言 前面的篇章中,使用蓝牙电话拦截手机通话的声音,并对数据加工,这个功能出来也有一段时间了。前段时间有试用的用户咨询说:有没有办法在手机上ÿ…...
Spring Cloud LoadBalancer详解
一、介绍 Spring Cloud LoadBalancer是Spring Cloud官方自己提供的客户端负载均衡器,抽象和实现,用来替代Ribbon(已经停更), 二、Ribbon和Loadbalance 对比 组件组件提供的负载策略支持负载的客户端Ribbon随机 RandomRule轮询 RoundRobinRule 重试 RetryRule最低并发 Bes…...
使用Spring Boot与达梦数据库(DM)进行多数据源配置及MyBatis Plus集成
使用Spring Boot与达梦数据库(DM)进行多数据源配置及MyBatis Plus集成 在现代企业级应用开发中,处理多个数据源是一个常见的需求。本文将详细介绍如何使用Spring Boot结合达梦数据库(DM),并通过MyBatis Plus来简化数据库操作&…...
基于SpringBoot和PostGIS的省域“地理难抵点(最纵深处)”检索及可视化实践
目录 前言 1、研究背景 2、研究意义 一、研究目标 1、“地理难抵点”的概念 二、“难抵点”空间检索实现 1、数据获取与处理 2、计算流程 3、难抵点计算 4、WebGIS可视化 三、成果展示 1、华东地区 2、华南地区 3、华中地区 4、华北地区 5、西北地区 6、西南地…...
【Qt】详细介绍如何在Visual Studio Code中编译、运行Qt项目
Visual Studio Code一只用的顺手,写Qt的时候也能用VS Code开发就方便多了。 理论上也不算困难,毕竟Qt项目其实就是CMake(QMake的情况这里就暂不考虑了)项目,VS Code在编译、运行CMake项目还是比较成熟的。 这里笔者打…...
【Linux】修改 core 文件大小和路径
在 Linux 系统中,默认情况下,核心转储文件(core dump)会生成在当前工作目录下。为了将核心转储文件生成在指定路径下,可以通过以下方法进行配置。 1. 设置核心转储文件路径 Linux 系统提供了两种方式来指定核心转储文…...
本地部署大语言模型-DeepSeek
DeepSeek 是国内顶尖 AI 团队「深度求索」开发的多模态大模型,具备数学推理、代码生成等深度能力,堪称"AI界的六边形战士"。 Hostease AMD 9950X/96G/3.84T NVMe/1G/5IP/RTX4090 GPU服务器提供多种计费模式。 DeepSeek-R1-32B配置 配置项 规…...
【03】STM32F407 HAL 库框架设计学习
【03】STM32F407 HAL 库框架设计学习 摘要 本文旨在为初学者提供一个关于STM32F407微控制器HAL(Hardware Abstraction Layer)库框架设计的详细学习教程。通过本文,读者将从零开始,逐步掌握STM32F407的基本知识、HAL库的配置步骤…...
