当前位置: 首页 > news >正文

SpringBoot 整合 Avro 与 Kafka 详解

SpringBoot 整合 Avro 与 Kafka 详解

在大数据处理和实时数据流场景中,Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台,能够高效地处理大量数据,而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数据格式。将这两者结合,并通过 Spring Boot 进行整合,可以构建出高效、可扩展的实时数据处理系统。

一、环境准备

在开始整合之前,需要准备好以下环境:

  • Java:确保已经安装了 JDK,推荐使用 JDK 8 或更高版本。
  • Maven:用于管理项目的依赖和构建过程。
  • Spring Boot:作为项目的框架,推荐使用较新的版本,如 Spring Boot 2.x。
  • Kafka:确保 Kafka 已经安装并运行,可以使用 Docker 部署 Kafka 集群。
  • Avro:Avro 依赖 JSON 定义的架构来序列化数据。
二、项目结构

一个典型的 Spring Boot 项目结构可能如下:

spring-boot-kafka-avro
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           ├── SpringBootKafkaAvroApplication.java
│   │   │           ├── config
│   │   │           │   └── KafkaConfig.java
│   │   │           ├── producer
│   │   │           │   └── KafkaProducer.java
│   │   │           ├── consumer
│   │   │           │   └── KafkaConsumer.java
│   │   │           └── model
│   │   │               └── ElectronicsPackage.java (由 Avro 自动生成)
│   │   ├── resources
│   │   │   ├── application.properties
│   │   │   └── avro
│   │   │       └── electronicsPackage.avsc (Avro 架构文件)
│   └── test
│       └── java
│           └── com
│               └── example
│                   └── SpringBootKafkaAvroApplicationTests.java
└── pom.xml
三、添加依赖

pom.xml 文件中添加必要的依赖:

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.13</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.11.0</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro Maven Plugin --><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>${avro.version}</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory><outputDirectory>${project.build.directory}/generated/avro</outputDirectory></configuration></execution></executions></plugin>
</dependencies>
四、定义 Avro 架构

src/main/resources/avro/ 目录下创建一个 Avro 架构文件 electronicsPackage.avsc

{"namespace": "com.example.model","type": "record","name": "ElectronicsPackage","fields": [{"name": "package_number", "type": ["string", "null"], "default": null},{"name": "frs_site_code", "type": ["string", "null"], "default": null},{"name": "frs_site_code_type", "type": ["string", "null"], "default": null}]
}

这个架构文件定义了 ElectronicsPackage 类,包括三个字段:package_numberfrs_site_codefrs_site_code_type

五、生成 Avro 类

运行 Maven 构建过程,Avro Maven 插件会根据 electronicsPackage.avsc 文件生成相应的 Java 类 ElectronicsPackage.java

六、配置 Kafka

application.properties 文件中配置 Kafka 的相关属性:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.config.AvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.config.AvroDeserializer

注意,这里指定了自定义的 AvroSerializerAvroDeserializer 类。

七、实现 Avro 序列化器和反序列化器

创建 AvroSerializerAvroDeserializer 类,用于 Avro 数据的序列化和反序列化。

// AvroSerializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.IOException;public class AvroSerializer<T extends SpecificRecord> implements Serializer<T> {private final DatumWriter<T> writer;public AvroSerializer(Class<T> type) {this.writer = new SpecificDatumWriter<>(type);}@Overridepublic byte[] serialize(String topic, T data) {if (data == null) {return null;}ByteArrayOutputStream out = new ByteArrayOutputStream();Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);try {writer.write(data, encoder);encoder.flush();out.close();} catch (IOException e) {throw new RuntimeException(e);}return out.toByteArray();}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op}@Overridepublic void close() {// No-op}
}// AvroDeserializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;public class AvroDeserializer<T extends SpecificRecord> implements Deserializer<T> {private final Class<T> type;private final DatumReader<T> reader;public AvroDeserializer(Class<T> type) {this.type = type;this.reader = new SpecificDatumReader<>(type);}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteArrayInputStream in = new ByteArrayInputStream(data);Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);try {return reader.read(null, decoder);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op

相关文章:

SpringBoot 整合 Avro 与 Kafka 详解

SpringBoot 整合 Avro 与 Kafka 详解 在大数据处理和实时数据流场景中&#xff0c;Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台&#xff0c;能够高效地处理大量数据&#xff0c;而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数…...

若依 ruoyi VUE el-select 直接获取 选择option 的 label和value

1、最新在研究若依这个项目&#xff0c;我使用的是前后端分离的方案&#xff0c;RuoYi-Vue-fast(后端) RuoYi-Vue-->ruoyi-ui(前端)。RuoYi-Vue-fast是单应用版本没有区分那么多的modules 自己开发起来很方便&#xff0c;这个项目运行起来很方便&#xff0c;但是需要自定义的…...

大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…...

修改MySQL存储路径

1.查看原路径 show variables like ‘%datadir%’; 2.停止MYSQL 以管理员身份运行命令提示符 net stop MySQL84 在服务中直接停止MySQL 3.编辑配置文件 可能会遇到无权限修改&#xff0c;可以先修改my.ini的权限。可以通过&#xff1a;右键my.ini → 属性 → 安全→ 编辑 …...

Git常用的命令【提交与回退】

git分布式版本控制系统 &#xff08;SVN集中式版本控制系统&#xff09;之间的对比 git有本地仓库和远程仓库&#xff0c;不同的开发人员可以分别提交自己的本地仓库并维护代码的版本控制。 然后多个人员在本地仓库协作的代码&#xff0c;可以提交到远程仓库中做整合。 git本…...

详解:HTTP/HTTPS协议

HTTP协议 一.HTTP是什么 HTTP&#xff0c;全称超文本传输协议&#xff0c;是一种用于分布式、协作式、超媒体信息系统的应用层协议。HTTP往往是基于传输层TCP协议实现的&#xff0c;采用的一问一答的模式&#xff0c;即发一个请求&#xff0c;返回一个响应。 Q&#xff1a;什…...

0.96寸OLED---STM32

一、简介 OLED&#xff1a;有机发光二极管 OLED显示屏&#xff1a;性能优异的新型显示屏&#xff0c;具有功耗低&#xff08;相比LCD不需要背光源&#xff0c;每一个节点当度发光&#xff09;、响应速度快、宽视角&#xff08;自发光&#xff0c;从任何视角看都比较清晰&…...

保姆级教学 uniapp绘制二维码海报并保存至相册,真机正常展示图片二维码

一、获取二维码 uni.request({url: https://api.weixin.qq.com/wxa/getwxacode?access_token${getStorage("token")},responseType: "arraybuffer",method: "POST",data: {path: "/pages/index/index"},success(res) {// 转换为 Uint…...

常用Vim操作

vimrc配置 ctags -R * 生成tags文件 set number set ts4 set sw4 set autoindent set cindent set tag~/tmp/log/help/tags 自动补全&#xff1a; ctrln&#xff1a;自动补全 输入&#xff1a; a&#xff1a;从当前文字后插入i&#xff1a;从当前文字前插入s: 删除当前字…...

【C#】NET 9中LINQ的新特性-CountBy

前言 在 .NET 中,使用 LINQ 对元素进行分组并计算它们的出现次数时,需要通过两个步步骤。首先,使用 GroupBy方法根据特定键对元素进行分类。然后,再计算每个组元素包含个数。而随着 .NET 9 版本发布,引入了一些新特性。其中 LINQ 引入了一种新的方法 CountBy,本文一起来了…...

Trimble X9三维激光扫描仪高效应对化工厂复杂管道扫描测绘挑战【沪敖3D】

化工安全关系到国计民生&#xff0c;近年来随着化工厂数字化改革不断推进&#xff0c;数字工厂逐步成为工厂安全管理的重要手段。而化工管道作为工厂设施的重要组成部分&#xff0c;由于其数量多、种类繁杂&#xff0c;一直是企业管理的重点和难点。 传统的化工管廊往往缺乏详…...

【数据结构】文件和外部排序

外部排序 外存信息的存取 计算基本存储方式 内部存储&#xff08;主存&#xff09;&#xff1a;断电后数据会丢失&#xff0c;访问速度快&#xff0c;成本高容量通常较小外部存储&#xff08;辅存&#xff09;&#xff1a;断电后数据不会丢失&#xff0c;访问速度较慢&#x…...

新手学习:网页前端、后端、服务器Tomcat和数据库的基本介绍

首先一点&#xff0c;不管是那个框架开发的网页前端&#xff0c;最后都需要Build,构建完毕以后都是原始的HTML CSS JS 三样文件&#xff01; 网页前端 目录结构 在开始开发网站之前&#xff0c;首先需要了解如何组织文件。一个简单的网页项目通常会有以下几个文件夹和文件&…...

机器学习贝叶斯模型原理

一、引言 在机器学习与数据分析的广袤天地中&#xff0c;贝叶斯模型犹如一颗璀璨的明星&#xff0c;闪耀着独特的光芒&#xff0c;为众多领域的分类、预测等任务提供了强大的理论支撑与实用解法。然而&#xff0c;对于许多初涉此领域的小伙伴而言&#xff0c;贝叶斯模型背后的…...

【C++】实现100以内素数的求解

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;代码概览&#x1f4af;代码结构与逻辑分析1. 包含的头文件和命名空间2. 素数判断函数 isPrime功能输入与输出核心逻辑数学背景 3. 主函数 main功能核心逻辑输出示例 &#…...

Python 浏览器自动化新利器:DrissionPage,让网页操作更简单!

Python 浏览器自动化新利器&#xff1a;DrissionPage&#xff0c;让网页操作更简单&#xff01; 文章目录 Python 浏览器自动化新利器&#xff1a;DrissionPage&#xff0c;让网页操作更简单&#xff01;&#x1f680; 引言&#x1f31f; DrissionPage简介&#x1f6e0;️ 三大…...

Rust学习笔记_13——枚举

Rust学习笔记_10——守卫 Rust学习笔记_11——函数 Rust学习笔记_12——闭包 枚举 文章目录 枚举1. 定义1.1 无值变体1.2 有值变体1.3 枚举与泛型的结合 2. 使用2.1 和匹配模式一起使用2.2 枚举作为类型别名 3. 常用枚举类型 在Rust编程语言中&#xff0c;枚举&#xff08;enum…...

Postgresql 格式转换笔记整理

1、数据类型有哪些 1.1 数值类型 DECIMAL/NUMERIC 使用方法 DECIMAL是PostgreSQL中的一种数值数据类型&#xff0c;用于存储固定精度和小数位数的数值。DECIMAL的精度是由用户指定的&#xff0c;可以存储任何位数的数值&#xff0c;而小数位数则由用户自行定义。DECIMAL类型的…...

AI开发:卷积神经网络CNN原理初识,简易例程 - 机器学习

一 、卷积神经网络是什么 &#xff08;1&#xff09;印象 今天说的CNN&#xff0c;并不是我们熟知的美国有线电视新闻网。 那什么是CNN呢&#xff1f; Convolutional Neural Networks, CNN&#xff09;简单来说&#xff0c;就是用一个筛子来筛面粉的。 筛子就是卷积核&…...

详细介绍vue的递归组件(重要)

递归组件在 Vue 中是一个非常强大的概念&#xff0c;尤其在渲染层级结构&#xff08;如树形结构、嵌套列表、评论系统等&#xff09;时&#xff0c;能够极大地简化代码。 什么是递归组件&#xff1f; 递归组件就是一个组件在其模板中引用自身。这种做法通常用于渲染树形结构或…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

全球首个30米分辨率湿地数据集(2000—2022)

数据简介 今天我们分享的数据是全球30米分辨率湿地数据集&#xff0c;包含8种湿地亚类&#xff0c;该数据以0.5X0.5的瓦片存储&#xff0c;我们整理了所有属于中国的瓦片名称与其对应省份&#xff0c;方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题

【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要&#xff1a; 近期&#xff0c;在使用较新版本的OpenSSH客户端连接老旧SSH服务器时&#xff0c;会遇到 "no matching key exchange method found"​, "n…...

Razor编程中@Html的方法使用大全

文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...

JS红宝书笔记 - 3.3 变量

要定义变量&#xff0c;可以使用var操作符&#xff0c;后跟变量名 ES实现变量初始化&#xff0c;因此可以同时定义变量并设置它的值 使用var操作符定义的变量会成为包含它的函数的局部变量。 在函数内定义变量时省略var操作符&#xff0c;可以创建一个全局变量 如果需要定义…...

Java数组Arrays操作全攻略

Arrays类的概述 Java中的Arrays类位于java.util包中&#xff0c;提供了一系列静态方法用于操作数组&#xff08;如排序、搜索、填充、比较等&#xff09;。这些方法适用于基本类型数组和对象数组。 常用成员方法及代码示例 排序&#xff08;sort&#xff09; 对数组进行升序…...

信息系统分析与设计复习

2024试卷 单选题&#xff08;20&#xff09; 1、在一个聊天系统(类似ChatGPT)中&#xff0c;属于控制类的是&#xff08;&#xff09;。 A. 话语者类 B.聊天文字输入界面类 C. 聊天主题辨别类 D. 聊天历史类 ​解析 B-C-E备选架构中分析类分为边界类、控制类和实体类。 边界…...

Redis专题-实战篇一-基于Session和Redis实现登录业务

GitHub项目地址&#xff1a;https://github.com/whltaoin/redisLearningProject_hm-dianping 基于Session实现登录业务功能提交版本码&#xff1a;e34399f 基于Redis实现登录业务提交版本码&#xff1a;60bf740 一、导入黑马点评后端项目 项目架构图 1. 前期阶段2. 后续阶段导…...