Springboot项目如何消费Kafka数据
目录
- 一、引入依赖
- 二、添加Kafka配置
- 三、创建 Kafka 消费者
- (一)Kafka生产的消息是JSON 字符串
- 1、方式一
- 2、方式二:需要直接访问消息元数据
- (二)Kafka生产的消息是对象Order
- 四、创建 启动类
- 五、配置 Kafka 生产者(可选)
- (一)消息类型为json串
- (二)消息类型为对象Order
- 六、启动 Kafka 服务
- 七、测试 Kafka 消费者
- 九、测试和调试
- 十、 结语
一、引入依赖
你需要在 pom.xml 中添加 spring-kafka 相关依赖:
<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter for Logging (optional but useful for debugging) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><!-- Spring Boot Starter for Testing --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
二、添加Kafka配置
在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:
- application.yml 示例:
spring:kafka:bootstrap-servers: localhost:9092 # Kafka服务器地址consumer:group-id: my-consumer-group # 消费者组IDauto-offset-reset: earliest # 消费者从头开始读取(如果没有已提交的偏移量)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串listener:missing-topics-fatal: false # 如果主题不存在,不抛出致命错误
- application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
- 注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 对 Key 使用 StringSerializervalue-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer # 对 Value 使用 ErrorHandlingSerializerproperties:spring.json.value.default.type: com.example.Order # 默认的 JSON 反序列化目标类型为 Order
三、创建 Kafka 消费者
创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息
(一)Kafka生产的消息是JSON 字符串
1、方式一
- 如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka // 启用 Kafka 消费者
public class KafkaConsumer {private final ObjectMapper objectMapper = new ObjectMapper();// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(String message) {try {// 将 JSON 字符串反序列化为 Order 对象Order order = objectMapper.readValue(message, Order.class);System.out.println("Received order: " + order);} catch (Exception e) {e.printStackTrace();}}}
说明:
- @KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。 - listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。
2、方式二:需要直接访问消息元数据
- 可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, String> record) {// 获取消息的详细信息String key = record.key(); // 获取消息的 keyString value = record.value(); // 获取消息的 valueString topic = record.topic(); // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset(); // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}
(二)Kafka生产的消息是对象Order
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, Order> record) {// 获取消息的详细信息String key = record.key(); // 获取消息的 keyOrder value = record.value(); // 获取消息的 valueString topic = record.topic(); // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset(); // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}
四、创建 启动类
确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(KafkaConsumerApplication.class, args);}}
五、配置 Kafka 生产者(可选)
(一)消息类型为json串
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate; // 发送的是 String 类型消息private ObjectMapper objectMapper = new ObjectMapper(); // Jackson ObjectMapper 用于序列化// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {try {// 将 Order 对象转换为 JSON 字符串String orderJson = objectMapper.writeValueAsString(order);// 发送 JSON 字符串到 KafkakafkaTemplate.send(topic, orderJson); // 发送字符串消息System.out.println("Order JSON sent to Kafka: " + orderJson);} catch (Exception e) {e.printStackTrace();}}
}
(二)消息类型为对象Order
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Order> kafkaTemplate;// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {kafkaTemplate.send(topic, order); // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON}
}
六、启动 Kafka 服务
启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
七、测试 Kafka 消费者
你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/sendOrder")public String sendOrder() {Order order = new Order();order.setOrderId(1L);order.setUserId(123L);order.setProduct("Laptop");order.setQuantity(2);order.setStatus("Created");kafkaProducer.sendOrder("order-topic", order);return "Order sent!";}
}
当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。
九、测试和调试
你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。
十、 结语
至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。
相关文章:
Springboot项目如何消费Kafka数据
目录 一、引入依赖二、添加Kafka配置三、创建 Kafka 消费者(一)Kafka生产的消息是JSON 字符串1、方式一2、方式二:需要直接访问消息元数据 (二)Kafka生产的消息是对象Order 四、创建 启动类五、配置 Kafka 生产者&…...
LeetCode 热题 100 | 子串
子串基础 前缀和:前面的数加在一起等于多少,放进map里,key为和,value为这个和出现的次数。单调队列:单调递增/递减队列,每次加入新元素,比新元素大/小的元素全部弹出。滑动窗口:两层…...

深度学习笔记11-优化器对比实验(Tensorflow)
🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊 目录 一、导入数据并检查 二、配置数据集 三、数据可视化 四、构建模型 五、训练模型 六、模型对比评估 七、总结 一、导入数据并检查 import pathlib,…...
【掌握 JavaScript 数组迭代:map 和 includes 的使用技巧】
map map()方法是数组原型的一个函数,用于对数组的每个元素执行一个函数,并返回一个新的数组,其中包含么哦一个元素执行的结果。 语法 const newArray array.map(callback(currentValue, index, arr), thisValue)参数 callback࿱…...

深入浅出 Android AES 加密解密:从理论到实战
深入浅出 Android AES 加密解密:从理论到实战 在现代移动应用中,数据安全是不可忽视的一环。无论是用户隐私保护,还是敏感信息的存储与传输,加密技术都扮演着重要角色。本文将以 AES(Advanced Encryption Standard&am…...

Clickhouse基础(一)
数据存储的目录,在存储数据时是先经过压缩后再存储的,压缩效率很高 操作命令: sudo clickhouse start sudo clickhouse restart sudo clickhouse status进入clickhouse clickhouse-client -mCREATE TABLE db_13.t_assist (modelId UInt64,…...

深度学习|表示学习|一个神经元可以干什么|02
如是我闻: 如果我们只有一个神经元(即一个单一的线性或非线性函数),仍然可以完成一些简单的任务。以下是一个神经元可以实现的功能和应用: 1. 实现简单的线性分类 输入:一组特征向量 x x x 输出ÿ…...

ubuntu22.04降级安装CUDA11.3
环境:主机x64的ubuntu22.04,原有CUDA12.1,但是现在需要CUDA11.3,本篇文章介绍步骤。 一、下载CUDA11.3的run文件 下载网址:https://developer.nvidia.com/cuda-11-3-1-download-archive?target_osLinux&target_…...
为AI聊天工具添加一个知识系统 之32 三“中”全“会”:推理式的ISA(父类)和IOS(母本)以及生成式CMN (双亲委派)之1
本文要点和问题 要点 三“中”全“会”:推理式的ISA的(父类-父类源码)和IOS的(母本-母类脚本)以及生成式 CMN (双亲委派-子类实例)。 数据中台三端架构的中间端(信息系统架构ISA :…...

Python----Python高级(函数基础,形参和实参,参数传递,全局变量和局部变量,匿名函数,递归函数,eval()函数,LEGB规则)
一、函数基础 1.1、函数的用法和底层分析 函数是可重用的程序代码块。 函数的作用,不仅可以实现代码的复用,更能实现代码的一致性。一致性指的是,只要修改函数的代码,则所有调用该函数的地方都能得到体现。 在编写函数时…...
spring解决循环依赖的通俗理解
目录标题 1、什么是循环依赖2、解决循环依赖的原理3、Spring通过三级缓存解决循环依赖4、为什么要使用三级缓存而不是二级缓存?5、三级缓存中存放的是lambda表达式而不是一个半成品对象 1、什么是循环依赖 众所周知,Spring的容器中管理整个体系的bean对…...

用 Python 从零开始创建神经网络(十九):真实数据集
真实数据集 引言数据准备数据加载数据预处理数据洗牌批次(Batches)训练(Training)到目前为止的全部代码: 引言 在实践中,深度学习通常涉及庞大的数据集(通常以TB甚至更多为单位)&am…...
介绍PyTorch张量
介绍PyTorch张量 介绍PyTorch张量 PyTorch张量是我们在PyTorch中编程神经网络时将使用的数据结构。 在编程神经网络时,数据预处理通常是整个过程的第一步,数据预处理的一个目标是将原始输入数据转换为张量形式。 torch.Tensor类的实例 PyTorch张量…...

Vision Transformer (ViT)原理
Vision Transformer (ViT)原理 flyfish Transformer缺乏卷积神经网络(CNNs)的归纳偏差(inductive biases),比如平移不变性和局部受限的感受野。不变性意味着即使实体entity(即对象)的外观或位…...

移动云自研云原生数据库入围国采!
近日,中央国家机关2024年度事务型数据库软件框架协议联合征集采购项目产品名单正式公布,移动云自主研发的云原生数据库产品顺利入围。这一成就不仅彰显了移动云在数据库领域深耕多年造就的领先技术优势,更标志着国家权威评审机构对移动云在数…...

Unity中对象池的使用(用一个简单粗暴的例子)
问题描述:Unity在创建和销毁对象的时候是很消耗性能的,所以我们在销毁一个对象的时候,可以不用Destroy,而是将这个物体隐藏后放到回收池里面,当再次需要的时候如果回收池里面有之前回收的对象,就直接拿来用…...
linux命令行连接Postgresql常用命令
1.linux系统命令行连接数据库命令 psql -h hostname -p port -U username -d databasename -h 主机名或IP地址 -p 端口 -U 用户名 -d 连接的数据库 2.查询数据库表命令 select version() #查看版本号 \dg #查看用户 \l #查询数据库 \c mydb #切换…...
每日一题-单链表排序
为了对给定的单链表按升序排序,我们可以考虑以下解决方法: 思路 归并排序(Merge Sort):由于归并排序的时间复杂度为 O ( n log n ) O(n \log n) O(nlogn),并且归并排序不需要额外的空间(空…...
webpack04服务器配置
webpack配置 entryoutput filenamepathpublicPath 。。 打包引入的基本路径,,,比如引入一个bundle.js,。引用之后的路径就是 publicPathfilename -devServer:static : 静态文件的位置。。。hostportopencompress : 静态资源是否用gzip压缩hi…...

JDK下载安装配置
一.JDK安装配置。 1.安装注意路径,其他直接下一步。 2.配置。 下接第4步. 或者 代码复制: JAVA_HOME D:\Program Files\Java\jdk1.8.0_91 %JAVA_HOME%\bin 或者直接配置 D:\Program Files\Java\jdk1.8.0_91\bin 3.验证(CMD)。 java javac java -version javac -version 二.下…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
Qt 事件处理中 return 的深入解析
Qt 事件处理中 return 的深入解析 在 Qt 事件处理中,return 语句的使用是另一个关键概念,它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别:不同层级的事件处理 方…...

算术操作符与类型转换:从基础到精通
目录 前言:从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符:、-、*、/、% 赋值操作符:和复合赋值 单⽬操作符:、--、、- 前言:从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...
从实验室到产业:IndexTTS 在六大核心场景的落地实践
一、内容创作:重构数字内容生产范式 在短视频创作领域,IndexTTS 的语音克隆技术彻底改变了配音流程。B 站 UP 主通过 5 秒参考音频即可克隆出郭老师音色,生成的 “各位吴彦祖们大家好” 语音相似度达 97%,单条视频播放量突破百万…...
深入理解 React 样式方案
React 的样式方案较多,在应用开发初期,开发者需要根据项目业务具体情况选择对应样式方案。React 样式方案主要有: 1. 内联样式 2. module css 3. css in js 4. tailwind css 这些方案中,均有各自的优势和缺点。 1. 方案优劣势 1. 内联样式: 简单直观,适合动态样式和…...