Springboot项目如何消费Kafka数据
目录
- 一、引入依赖
- 二、添加Kafka配置
- 三、创建 Kafka 消费者
- (一)Kafka生产的消息是JSON 字符串
- 1、方式一
- 2、方式二:需要直接访问消息元数据
- (二)Kafka生产的消息是对象Order
- 四、创建 启动类
- 五、配置 Kafka 生产者(可选)
- (一)消息类型为json串
- (二)消息类型为对象Order
- 六、启动 Kafka 服务
- 七、测试 Kafka 消费者
- 九、测试和调试
- 十、 结语
一、引入依赖
你需要在 pom.xml 中添加 spring-kafka 相关依赖:
<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter for Logging (optional but useful for debugging) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><!-- Spring Boot Starter for Testing --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
二、添加Kafka配置
在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:
- application.yml 示例:
spring:kafka:bootstrap-servers: localhost:9092 # Kafka服务器地址consumer:group-id: my-consumer-group # 消费者组IDauto-offset-reset: earliest # 消费者从头开始读取(如果没有已提交的偏移量)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串listener:missing-topics-fatal: false # 如果主题不存在,不抛出致命错误
- application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
- 注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 对 Key 使用 StringSerializervalue-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer # 对 Value 使用 ErrorHandlingSerializerproperties:spring.json.value.default.type: com.example.Order # 默认的 JSON 反序列化目标类型为 Order
三、创建 Kafka 消费者
创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息
(一)Kafka生产的消息是JSON 字符串
1、方式一
- 如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka // 启用 Kafka 消费者
public class KafkaConsumer {private final ObjectMapper objectMapper = new ObjectMapper();// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(String message) {try {// 将 JSON 字符串反序列化为 Order 对象Order order = objectMapper.readValue(message, Order.class);System.out.println("Received order: " + order);} catch (Exception e) {e.printStackTrace();}}}
说明:
- @KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。 - listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。
2、方式二:需要直接访问消息元数据
- 可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, String> record) {// 获取消息的详细信息String key = record.key(); // 获取消息的 keyString value = record.value(); // 获取消息的 valueString topic = record.topic(); // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset(); // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}
(二)Kafka生产的消息是对象Order
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, Order> record) {// 获取消息的详细信息String key = record.key(); // 获取消息的 keyOrder value = record.value(); // 获取消息的 valueString topic = record.topic(); // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset(); // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}
四、创建 启动类
确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(KafkaConsumerApplication.class, args);}}
五、配置 Kafka 生产者(可选)
(一)消息类型为json串
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate; // 发送的是 String 类型消息private ObjectMapper objectMapper = new ObjectMapper(); // Jackson ObjectMapper 用于序列化// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {try {// 将 Order 对象转换为 JSON 字符串String orderJson = objectMapper.writeValueAsString(order);// 发送 JSON 字符串到 KafkakafkaTemplate.send(topic, orderJson); // 发送字符串消息System.out.println("Order JSON sent to Kafka: " + orderJson);} catch (Exception e) {e.printStackTrace();}}
}
(二)消息类型为对象Order
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Order> kafkaTemplate;// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {kafkaTemplate.send(topic, order); // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON}
}
六、启动 Kafka 服务
启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
七、测试 Kafka 消费者
你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/sendOrder")public String sendOrder() {Order order = new Order();order.setOrderId(1L);order.setUserId(123L);order.setProduct("Laptop");order.setQuantity(2);order.setStatus("Created");kafkaProducer.sendOrder("order-topic", order);return "Order sent!";}
}
当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。
九、测试和调试
你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。
十、 结语
至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。
相关文章:
Springboot项目如何消费Kafka数据
目录 一、引入依赖二、添加Kafka配置三、创建 Kafka 消费者(一)Kafka生产的消息是JSON 字符串1、方式一2、方式二:需要直接访问消息元数据 (二)Kafka生产的消息是对象Order 四、创建 启动类五、配置 Kafka 生产者&…...
LeetCode 热题 100 | 子串
子串基础 前缀和:前面的数加在一起等于多少,放进map里,key为和,value为这个和出现的次数。单调队列:单调递增/递减队列,每次加入新元素,比新元素大/小的元素全部弹出。滑动窗口:两层…...

深度学习笔记11-优化器对比实验(Tensorflow)
🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊 目录 一、导入数据并检查 二、配置数据集 三、数据可视化 四、构建模型 五、训练模型 六、模型对比评估 七、总结 一、导入数据并检查 import pathlib,…...
【掌握 JavaScript 数组迭代:map 和 includes 的使用技巧】
map map()方法是数组原型的一个函数,用于对数组的每个元素执行一个函数,并返回一个新的数组,其中包含么哦一个元素执行的结果。 语法 const newArray array.map(callback(currentValue, index, arr), thisValue)参数 callback࿱…...

深入浅出 Android AES 加密解密:从理论到实战
深入浅出 Android AES 加密解密:从理论到实战 在现代移动应用中,数据安全是不可忽视的一环。无论是用户隐私保护,还是敏感信息的存储与传输,加密技术都扮演着重要角色。本文将以 AES(Advanced Encryption Standard&am…...

Clickhouse基础(一)
数据存储的目录,在存储数据时是先经过压缩后再存储的,压缩效率很高 操作命令: sudo clickhouse start sudo clickhouse restart sudo clickhouse status进入clickhouse clickhouse-client -mCREATE TABLE db_13.t_assist (modelId UInt64,…...

深度学习|表示学习|一个神经元可以干什么|02
如是我闻: 如果我们只有一个神经元(即一个单一的线性或非线性函数),仍然可以完成一些简单的任务。以下是一个神经元可以实现的功能和应用: 1. 实现简单的线性分类 输入:一组特征向量 x x x 输出ÿ…...

ubuntu22.04降级安装CUDA11.3
环境:主机x64的ubuntu22.04,原有CUDA12.1,但是现在需要CUDA11.3,本篇文章介绍步骤。 一、下载CUDA11.3的run文件 下载网址:https://developer.nvidia.com/cuda-11-3-1-download-archive?target_osLinux&target_…...
为AI聊天工具添加一个知识系统 之32 三“中”全“会”:推理式的ISA(父类)和IOS(母本)以及生成式CMN (双亲委派)之1
本文要点和问题 要点 三“中”全“会”:推理式的ISA的(父类-父类源码)和IOS的(母本-母类脚本)以及生成式 CMN (双亲委派-子类实例)。 数据中台三端架构的中间端(信息系统架构ISA :…...

Python----Python高级(函数基础,形参和实参,参数传递,全局变量和局部变量,匿名函数,递归函数,eval()函数,LEGB规则)
一、函数基础 1.1、函数的用法和底层分析 函数是可重用的程序代码块。 函数的作用,不仅可以实现代码的复用,更能实现代码的一致性。一致性指的是,只要修改函数的代码,则所有调用该函数的地方都能得到体现。 在编写函数时…...
spring解决循环依赖的通俗理解
目录标题 1、什么是循环依赖2、解决循环依赖的原理3、Spring通过三级缓存解决循环依赖4、为什么要使用三级缓存而不是二级缓存?5、三级缓存中存放的是lambda表达式而不是一个半成品对象 1、什么是循环依赖 众所周知,Spring的容器中管理整个体系的bean对…...

用 Python 从零开始创建神经网络(十九):真实数据集
真实数据集 引言数据准备数据加载数据预处理数据洗牌批次(Batches)训练(Training)到目前为止的全部代码: 引言 在实践中,深度学习通常涉及庞大的数据集(通常以TB甚至更多为单位)&am…...
介绍PyTorch张量
介绍PyTorch张量 介绍PyTorch张量 PyTorch张量是我们在PyTorch中编程神经网络时将使用的数据结构。 在编程神经网络时,数据预处理通常是整个过程的第一步,数据预处理的一个目标是将原始输入数据转换为张量形式。 torch.Tensor类的实例 PyTorch张量…...

Vision Transformer (ViT)原理
Vision Transformer (ViT)原理 flyfish Transformer缺乏卷积神经网络(CNNs)的归纳偏差(inductive biases),比如平移不变性和局部受限的感受野。不变性意味着即使实体entity(即对象)的外观或位…...

移动云自研云原生数据库入围国采!
近日,中央国家机关2024年度事务型数据库软件框架协议联合征集采购项目产品名单正式公布,移动云自主研发的云原生数据库产品顺利入围。这一成就不仅彰显了移动云在数据库领域深耕多年造就的领先技术优势,更标志着国家权威评审机构对移动云在数…...

Unity中对象池的使用(用一个简单粗暴的例子)
问题描述:Unity在创建和销毁对象的时候是很消耗性能的,所以我们在销毁一个对象的时候,可以不用Destroy,而是将这个物体隐藏后放到回收池里面,当再次需要的时候如果回收池里面有之前回收的对象,就直接拿来用…...
linux命令行连接Postgresql常用命令
1.linux系统命令行连接数据库命令 psql -h hostname -p port -U username -d databasename -h 主机名或IP地址 -p 端口 -U 用户名 -d 连接的数据库 2.查询数据库表命令 select version() #查看版本号 \dg #查看用户 \l #查询数据库 \c mydb #切换…...
每日一题-单链表排序
为了对给定的单链表按升序排序,我们可以考虑以下解决方法: 思路 归并排序(Merge Sort):由于归并排序的时间复杂度为 O ( n log n ) O(n \log n) O(nlogn),并且归并排序不需要额外的空间(空…...
webpack04服务器配置
webpack配置 entryoutput filenamepathpublicPath 。。 打包引入的基本路径,,,比如引入一个bundle.js,。引用之后的路径就是 publicPathfilename -devServer:static : 静态文件的位置。。。hostportopencompress : 静态资源是否用gzip压缩hi…...

JDK下载安装配置
一.JDK安装配置。 1.安装注意路径,其他直接下一步。 2.配置。 下接第4步. 或者 代码复制: JAVA_HOME D:\Program Files\Java\jdk1.8.0_91 %JAVA_HOME%\bin 或者直接配置 D:\Program Files\Java\jdk1.8.0_91\bin 3.验证(CMD)。 java javac java -version javac -version 二.下…...

XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...