(七)消息队列-Kafka 序列化avro(传递)
(七)消息队列-Kafka 序列化avro(传递)
客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》

本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)
前言
多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。
问题背景
在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:
- 数据冗余:字段名重复存储,占用带宽;
- 兼容性差:新增或删除字段时容易导致上下游解析失败;
- 类型安全缺失:动态解析易引发运行时错误。
而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储、动态兼容性和强类型校验,成为Kafka生态中推荐的序列化方案27。
核心原理
-
Schema驱动
Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:{"type": "record","name": "User","namespace": "com.example.avro","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"}] }然后使用
avro-maven-plugin生成 Java 类:<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.0</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals></execution></executions> </plugin>执行
mvn clean compile后,com.example.avro.User类会被自动生成。生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。
-
二进制编码
Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。 -
Schema Registry
为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:- Schema版本控制与兼容性检查;
- 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。
实现步骤
以下以Java代码为例,展示Kafka集成Avro的配置方法:
1. 添加依赖
<dependencies><!-- Spring Kafka 依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Avro 依赖 --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></dependency><!-- Schema Registry 依赖 --><dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.2.1</version></dependency>
</dependencies>
运行 HTML
2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址Producer<String, GenericRecord> producer = new KafkaProducer<>(props);// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");producer.send(new ProducerRecord<>("user-topic", user));------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer@Service
public class UserProducer {private final KafkaTemplate<String, User> kafkaTemplate;@Value("${kafka.topic.user}")private String topic;public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder().setId(1).setName("Alice").build();
userProducer.sendUser(user);控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Received: " + record.value().get("name"));}
}------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后编写 Kafka 消费者代码:@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {@KafkaHandlerpublic void consume(User user) {System.out.println("Received user: " + user.getName());}
}
常见问题与解决方案
- Schema兼容性错误
- 现象:生产者更新Schema后消费者无法解析旧数据。
- 解决:在Schema Registry中配置兼容性策略(如
BACKWARD),允许新增字段并设置默认值7。
- ClassNotFoundException
- 现象:反序列化时提示Avro生成的类不存在。
- 解决:通过Maven插件
avro-maven-plugin自动生成Java类,并确保生成路径在编译范围内2。
- 性能瓶颈
- 现象:高吞吐场景下序列化延迟较高。
- 优化:复用
DatumWriter和DatumReader实例,避免重复初始化开销7。
总结
Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。
引用说明
- 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
- Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。
后续
下期预告,敬请关注:
(八)消息队列-Kafka 生产者
相关文章:
(七)消息队列-Kafka 序列化avro(传递)
(七)消息队列-Kafka 序列化avro(传递) 客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。 ——佚名《饮马长城窟行》 本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印&…...
js基础二
JavaScript基础下 1 事件处理 JS 事件(event)是当用户与网页进行交互时发生的事情,例如单机某个链接或按钮、在文本框中输入文本、按下键盘上的某个按键、移动鼠标等等。当事件发生时,您可以使用 JavaScript 中的事件处理程序&a…...
WSBDF レクチア 定义2 引理3 wsbdf的乘子
定义2 引理3 wsbdf的乘子 ここまで 寝みます❓...
Qt之QStateMachine等待
在项目中经常需要等待,我们模拟0-30的数,假如我们其中5, 25的数需要进行等待,等待用户处理完自己事情后,按下按钮继续,找Qt的项目中有一个 QStateMachineqstatemmachine类提供了一个分层有限状态机。 QSta…...
Wireshark 插件开发实战指南
Wireshark 插件开发实战指南 环境搭建流程图 #mermaid-svg-XpNibno7BIyfzNn5 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-XpNibno7BIyfzNn5 .error-icon{fill:#552222;}#mermaid-svg-XpNibno7BIyfzNn5 .error-t…...
基于SpringBoot的“青少年心理健康教育网站”的设计与实现(源码+数据库+文档+PPT)
基于SpringBoot的“青少年心理健康教育网站”的设计与实现(源码数据库文档PPT) 开发语言:Java 数据库:MySQL 技术:SpringBoot 工具:IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 实体属性图 系统首页界…...
23-整数转罗马数字
代码 测试用例 测试结果 测试结果 12. 整数转罗马数字 中等 相关标签 相关企业 七个不同的符号代表罗马数字,其值如下: 符号值I1V5X10L50C100D500M1000 罗马数字是通过添加从最高到最低的小数位值的转换而形成的。将小数位值转换为罗马数字有以…...
SpringBoot+Redis+Mybatis-plus黑马点评
短信登录 基于Session实现登录 流程: 发送短信验证码-->短信验证码注册登录-->校验登录状态(保存用户到ThreadLocal,方便后续使用) 不能每次请求服务都要进行登录状态校验,解决办法:拦截器 在Sp…...
深入剖析 OpenCV:全面掌握基础操作、图像处理算法与特征匹配
深入剖析 OpenCV:全面掌握基础操作、图像处理算法与特征匹配 一、引言二、OpenCV 的安装(一)使用 pip 安装(二)使用 Anaconda 安装 三、OpenCV 基础操作(一)图像的读取、显示与保存(…...
【C语言显示Linux系统参数】
在C语言中,可以通过调用Linux系统提供的API来获取和显示系统参数。以下是一些常见的系统参数及其获取方法: 1. 获取系统名称和版本 可以使用uname函数来获取系统名称、版本等信息。 #include <stdio.h> #include <sys/utsname.h>int main…...
突破Ajax跨域困境,解锁前端通信新姿势
一、引言 在当今的 Web 开发领域,前后端分离的架构模式已经成为主流,它极大地提升了开发效率和项目的可维护性。在这种开发模式下,前端通过 Ajax 技术与后端进行数据交互,然而,跨域问题却如影随形,成为了开…...
Kotlin协变与逆变区别
在Kotlin中,协变和逆变是泛型编程中的两个重要概念,它们允许我们在类型系统中更加灵活地处理类型关系。 1.协变:协变允许我们使用比原始类型更具体的类型。在kotlin中,通过在类型参数上加out关键字来表示协变,生产者,例…...
driver中为什么要使用非阻塞赋值
1. 模拟硬件时序行为 实际硬件行为:DUT的输入信号通常在时钟边沿被采样。Driver需要确保信号的更新与时钟同步,而非阻塞赋值的延迟更新特性(在时间步结束时统一生效)能够准确模拟寄存器的行为。 示例: always (posedg…...
模板字符串【ES6】
“路漫漫其修远兮,吾将上下而求索。”—— 屈原《离骚》 目录 什么是模板字符串?模板字符串特性及代码举例:详细举例用法: 什么是模板字符串? 模板字符串(Template Literals)是JavaScript中引入…...
通往 AI 之路:Python 机器学习入门-数据结构
Python 数据结构 Python 提供了多种数据结构来存储和操作数据,其中列表(list)、字典(dict)、元组(tuple)和集合(set)是最常用的几种。本章将详细介绍这些数据结构的基本…...
我们应该如何优化UI(基于UGUI)
这是一道面试题,下面,我们来详细分析这个问题。 目录 1. 减少 Draw Call 合理设置图集 避免材质和 Shader 的频繁切换 减少 UI 元素的重叠 2. 优化UI布局 3. 优化UI元素的渲染 4.优化UI动画 5. 优化 UI 事件处理 6. 运行时优化 1. 减少 Draw C…...
CSS3 圆角:实现与优化指南
CSS3 圆角:实现与优化指南 随着网页设计的发展,CSS3 圆角已经成为了现代网页设计中不可或缺的元素之一。本文将详细讲解 CSS3 圆角的基本用法、实现方式以及优化技巧,帮助您在网页设计中更好地运用这一功能。 一、CSS3 圆角基本用法 1.1 基…...
【网络安全 | 扫描子域+发现真实IP】CloakQuest3r安装使用详细教程
原创文章,禁止转载。 本文仅作学习交流使用,不得用于非法渗透,笔者不承担任何责任。 文章目录 简介功能介绍执行流程限制安装步骤可选功能:SecurityTrails API使用示例简介 CloakQuest3r 是一款强大的 Python 工具,专为揭示受 Cloudflare 及类似服务保护的网站真实 IP 地…...
Mellanox OFED驱动如何给全局编译添加gcc的编译选项?(subdir-ccflags-y += -Wall)
背景 有些时候编译驱动需要给全局加一个编译选项,假设configure已经完成。可以直接在Makefile中修改 添加方式 修改OFED驱动目录下的: ./Makefile subdir-ccflags-y -Wall修改效果: 然后执行make,就能让添加的编译选项生效…...
【愚公系列】《Python网络爬虫从入门到精通》037-文件的存取
标题详情作者简介愚公搬代码头衔华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,亚马逊技领云博主,51CTO博客专家等。近期荣誉2022年度…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
重启Eureka集群中的节点,对已经注册的服务有什么影响
先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...
基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf
FTP 客服管理系统 实现kefu123登录,不允许匿名访问,kefu只能访问/data/kefu目录,不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...
uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)
UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...
