当前位置: 首页 > news >正文

(七)消息队列-Kafka 序列化avro(传递)

(七)消息队列-Kafka 序列化avro(传递)

客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》

在这里插入图片描述

本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)

前言

多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。

问题背景

在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:

  1. 数据冗余:字段名重复存储,占用带宽;
  2. 兼容性差:新增或删除字段时容易导致上下游解析失败;
  3. 类型安全缺失:动态解析易引发运行时错误。

而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储动态兼容性强类型校验,成为Kafka生态中推荐的序列化方案27。


核心原理
  1. Schema驱动
    Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:

    {"type": "record","name": "User","namespace": "com.example.avro","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"}]
    }
    

    然后使用 avro-maven-plugin 生成 Java 类:

    <plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.0</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals></execution></executions>
    </plugin>
    

    执行 mvn clean compile 后,com.example.avro.User 类会被自动生成。

    生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。

  2. 二进制编码
    Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。

  3. Schema Registry
    为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:

    • Schema版本控制与兼容性检查;
    • 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。

实现步骤

以下以Java代码为例,展示Kafka集成Avro的配置方法:

1. 添加依赖
<dependencies><!-- Spring Kafka 依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Avro 依赖 --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></dependency><!-- Schema Registry 依赖 --><dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.2.1</version></dependency>
</dependencies>

运行 HTML

2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址Producer<String, GenericRecord> producer = new KafkaProducer<>(props);// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");producer.send(new ProducerRecord<>("user-topic", user));------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer@Service
public class UserProducer {private final KafkaTemplate<String, User> kafkaTemplate;@Value("${kafka.topic.user}")private String topic;public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder().setId(1).setName("Alice").build();
userProducer.sendUser(user);控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Received: " + record.value().get("name"));}
}------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后编写 Kafka 消费者代码:@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {@KafkaHandlerpublic void consume(User user) {System.out.println("Received user: " + user.getName());}
}

常见问题与解决方案
  1. Schema兼容性错误
    • 现象:生产者更新Schema后消费者无法解析旧数据。
    • 解决:在Schema Registry中配置兼容性策略(如BACKWARD),允许新增字段并设置默认值7。
  2. ClassNotFoundException
    • 现象:反序列化时提示Avro生成的类不存在。
    • 解决:通过Maven插件avro-maven-plugin自动生成Java类,并确保生成路径在编译范围内2。
  3. 性能瓶颈
    • 现象:高吞吐场景下序列化延迟较高。
    • 优化:复用DatumWriterDatumReader实例,避免重复初始化开销7。

总结

Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。


引用说明

  • 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
  • Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。

后续

下期预告,敬请关注:
(八)消息队列-Kafka 生产者

相关文章:

(七)消息队列-Kafka 序列化avro(传递)

&#xff08;七&#xff09;消息队列-Kafka 序列化avro&#xff08;传递&#xff09; 客从远方来&#xff0c;遗我双鲤鱼。呼儿烹鲤鱼&#xff0c;中有尺素书。 ——佚名《饮马长城窟行》 本文已同步CSDN、掘金平台、知乎等多个平台&#xff0c;图片依然保持最初发布的水印&…...

js基础二

JavaScript基础下 1 事件处理 JS 事件&#xff08;event&#xff09;是当用户与网页进行交互时发生的事情&#xff0c;例如单机某个链接或按钮、在文本框中输入文本、按下键盘上的某个按键、移动鼠标等等。当事件发生时&#xff0c;您可以使用 JavaScript 中的事件处理程序&a…...

WSBDF レクチア 定义2 引理3 wsbdf的乘子

定义2 引理3 wsbdf的乘子 ここまで 寝みます❓...

Qt之QStateMachine等待

在项目中经常需要等待&#xff0c;我们模拟0-30的数&#xff0c;假如我们其中5&#xff0c; 25的数需要进行等待&#xff0c;等待用户处理完自己事情后&#xff0c;按下按钮继续&#xff0c;找Qt的项目中有一个 QStateMachineqstatemmachine类提供了一个分层有限状态机。 QSta…...

Wireshark 插件开发实战指南

Wireshark 插件开发实战指南 环境搭建流程图 #mermaid-svg-XpNibno7BIyfzNn5 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-XpNibno7BIyfzNn5 .error-icon{fill:#552222;}#mermaid-svg-XpNibno7BIyfzNn5 .error-t…...

基于SpringBoot的“青少年心理健康教育网站”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“青少年心理健康教育网站”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 实体属性图 系统首页界…...

23-整数转罗马数字

代码 测试用例 测试结果 测试结果 12. 整数转罗马数字 中等 相关标签 相关企业 七个不同的符号代表罗马数字&#xff0c;其值如下&#xff1a; 符号值I1V5X10L50C100D500M1000 罗马数字是通过添加从最高到最低的小数位值的转换而形成的。将小数位值转换为罗马数字有以…...

SpringBoot+Redis+Mybatis-plus黑马点评

短信登录 基于Session实现登录 流程&#xff1a; 发送短信验证码-->短信验证码注册登录-->校验登录状态&#xff08;保存用户到ThreadLocal&#xff0c;方便后续使用&#xff09; 不能每次请求服务都要进行登录状态校验&#xff0c;解决办法&#xff1a;拦截器 在Sp…...

深入剖析 OpenCV:全面掌握基础操作、图像处理算法与特征匹配

深入剖析 OpenCV&#xff1a;全面掌握基础操作、图像处理算法与特征匹配 一、引言二、OpenCV 的安装&#xff08;一&#xff09;使用 pip 安装&#xff08;二&#xff09;使用 Anaconda 安装 三、OpenCV 基础操作&#xff08;一&#xff09;图像的读取、显示与保存&#xff08;…...

【C语言显示Linux系统参数】

在C语言中&#xff0c;可以通过调用Linux系统提供的API来获取和显示系统参数。以下是一些常见的系统参数及其获取方法&#xff1a; 1. 获取系统名称和版本 可以使用uname函数来获取系统名称、版本等信息。 #include <stdio.h> #include <sys/utsname.h>int main…...

突破Ajax跨域困境,解锁前端通信新姿势

一、引言 在当今的 Web 开发领域&#xff0c;前后端分离的架构模式已经成为主流&#xff0c;它极大地提升了开发效率和项目的可维护性。在这种开发模式下&#xff0c;前端通过 Ajax 技术与后端进行数据交互&#xff0c;然而&#xff0c;跨域问题却如影随形&#xff0c;成为了开…...

Kotlin协变与逆变区别

在Kotlin中&#xff0c;协变和逆变是泛型编程中的两个重要概念&#xff0c;它们允许我们在类型系统中更加灵活地处理类型关系。 1.协变&#xff1a;协变允许我们使用比原始类型更具体的类型。在kotlin中&#xff0c;通过在类型参数上加out关键字来表示协变,生产者&#xff0c;例…...

driver中为什么要使用非阻塞赋值

1. 模拟硬件时序行为 实际硬件行为&#xff1a;DUT的输入信号通常在时钟边沿被采样。Driver需要确保信号的更新与时钟同步&#xff0c;而非阻塞赋值的延迟更新特性&#xff08;在时间步结束时统一生效&#xff09;能够准确模拟寄存器的行为。 示例&#xff1a; always (posedg…...

模板字符串【ES6】

“路漫漫其修远兮&#xff0c;吾将上下而求索。”—— 屈原《离骚》 目录 什么是模板字符串&#xff1f;模板字符串特性及代码举例&#xff1a;详细举例用法&#xff1a; 什么是模板字符串&#xff1f; 模板字符串&#xff08;Template Literals&#xff09;是JavaScript中引入…...

通往 AI 之路:Python 机器学习入门-数据结构

Python 数据结构 Python 提供了多种数据结构来存储和操作数据&#xff0c;其中列表&#xff08;list&#xff09;、字典&#xff08;dict&#xff09;、元组&#xff08;tuple&#xff09;和集合&#xff08;set&#xff09;是最常用的几种。本章将详细介绍这些数据结构的基本…...

我们应该如何优化UI(基于UGUI)

这是一道面试题&#xff0c;下面&#xff0c;我们来详细分析这个问题。 目录 1. 减少 Draw Call 合理设置图集 避免材质和 Shader 的频繁切换 减少 UI 元素的重叠 2. 优化UI布局 3. 优化UI元素的渲染 4.优化UI动画 5. 优化 UI 事件处理 6. 运行时优化 1. 减少 Draw C…...

CSS3 圆角:实现与优化指南

CSS3 圆角&#xff1a;实现与优化指南 随着网页设计的发展&#xff0c;CSS3 圆角已经成为了现代网页设计中不可或缺的元素之一。本文将详细讲解 CSS3 圆角的基本用法、实现方式以及优化技巧&#xff0c;帮助您在网页设计中更好地运用这一功能。 一、CSS3 圆角基本用法 1.1 基…...

【网络安全 | 扫描子域+发现真实IP】CloakQuest3r安装使用详细教程

原创文章,禁止转载。 本文仅作学习交流使用,不得用于非法渗透,笔者不承担任何责任。 文章目录 简介功能介绍执行流程限制安装步骤可选功能:SecurityTrails API使用示例简介 CloakQuest3r 是一款强大的 Python 工具,专为揭示受 Cloudflare 及类似服务保护的网站真实 IP 地…...

Mellanox OFED驱动如何给全局编译添加gcc的编译选项?(subdir-ccflags-y += -Wall)

背景 有些时候编译驱动需要给全局加一个编译选项&#xff0c;假设configure已经完成。可以直接在Makefile中修改 添加方式 修改OFED驱动目录下的&#xff1a; ./Makefile subdir-ccflags-y -Wall修改效果&#xff1a; 然后执行make&#xff0c;就能让添加的编译选项生效…...

【愚公系列】《Python网络爬虫从入门到精通》037-文件的存取

标题详情作者简介愚公搬代码头衔华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,亚马逊技领云博主,51CTO博客专家等。近期荣誉2022年度…...

思源黑体TTF构建指南:免费商用多语言字体的终极解决方案

思源黑体TTF构建指南&#xff1a;免费商用多语言字体的终极解决方案 【免费下载链接】source-han-sans-ttf A (hinted!) version of Source Han Sans 项目地址: https://gitcode.com/gh_mirrors/so/source-han-sans-ttf 你是否曾为多语言项目中的字体问题而烦恼&#xf…...

免费中医AI终极指南:仲景大模型如何让普通人也能享受专业中医咨询

免费中医AI终极指南&#xff1a;仲景大模型如何让普通人也能享受专业中医咨询 【免费下载链接】CMLM-ZhongJing 首个中医大语言模型——“仲景”。受古代中医学巨匠张仲景深邃智慧启迪&#xff0c;专为传统中医领域打造的预训练大语言模型。 The first-ever Traditional Chines…...

Windows右键菜单终极优化指南:如何用ContextMenuManager让右键菜单秒开如飞

Windows右键菜单终极优化指南&#xff1a;如何用ContextMenuManager让右键菜单秒开如飞 【免费下载链接】ContextMenuManager &#x1f5b1;️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 你是否曾经对着电脑屏幕等…...

【linux学习】linux工具篇(下)

Linux调试器-gdb使用&#xff0c;Linux项目自动化构建工具-make/Makefile我是程序员小青蛙&#xff0c;下面分享linux的工具利用前言程序的发布方式有两种&#xff0c;debug模式和release模式 Linux gcc/g出来的二进制程序&#xff0c;默认是release模式 要使用gdb调试&#xf…...

Flink 2.2集成Flink CDC 3.6

1 、部署Flink CDC tar -zxf flink-cdc-3.6.0-2.2-bin.tar.gz -C /usr/bigtop/3.3.0/usr/libln -s /usr/bigtop/3.3.0/usr/lib/flink-cdc-3.6...

网盘直链解析工具:多平台文件下载的实用解决方案

网盘直链解析工具&#xff1a;多平台文件下载的实用解决方案 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 &#xff0c;支持 百度网盘 / 阿里云盘 / 中国移动云盘 / 天翼云盘 …...

书匠策AI:那个让你论文查重从“红色地狱“直接变“绿色天堂“的神器

各位正在跟论文死磕的同学们&#xff0c;先别划走。 今天咱们不聊怎么写开题报告&#xff0c;不聊怎么搭框架&#xff0c;咱们聊一个所有人写完初稿后都会遭遇的终极BOSS——查重。 你有没有经历过这种崩溃&#xff1a;熬夜写了一万字&#xff0c;信心满满提交查重&#xff0…...

后端开发必知的数据库优化技巧:这5个方法让你的系统性能提升10倍

对于软件测试从业者来说&#xff0c;理解数据库优化逻辑不仅能帮我们更快定位性能瓶颈&#xff0c;还能让我们在测试阶段就提前发现潜在的数据库设计问题&#xff0c;避免上线后出现大规模性能故障。很多测试同学往往把注意力放在接口逻辑、功能正确性上&#xff0c;却忽略了数…...

Playwright Python3.7+安装失败根因与一次成功配置指南

1. 为什么Playwright在Python3.7环境下总“装不上”&#xff1f;——这不是你的pip问题&#xff0c;是环境认知偏差 你刚在新配的Mac M2上敲下 pip install playwright &#xff0c;终端卡在 Building wheel for playwright... 十分钟不动&#xff1b;或者Windows上反复提示…...

通过用量看板与成本管理功能实现团队API支出精细化管控

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 通过用量看板与成本管理功能实现团队API支出精细化管控 对于依赖大模型API进行开发的团队而言&#xff0c;成本控制与资源分配的透…...