当前位置: 首页 > news >正文

Springboot项目如何消费Kafka数据

目录

  • 一、引入依赖
  • 二、添加Kafka配置
  • 三、创建 Kafka 消费者
    • (一)Kafka生产的消息是JSON 字符串
      • 1、方式一
      • 2、方式二:需要直接访问消息元数据
    • (二)Kafka生产的消息是对象Order
  • 四、创建 启动类
  • 五、配置 Kafka 生产者(可选)
    • (一)消息类型为json串
    • (二)消息类型为对象Order
  • 六、启动 Kafka 服务
  • 七、测试 Kafka 消费者
  • 九、测试和调试
  • 十、 结语

一、引入依赖

你需要在 pom.xml 中添加 spring-kafka 相关依赖:

<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter for Logging (optional but useful for debugging) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><!-- Spring Boot Starter for Testing --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

二、添加Kafka配置

在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:

  • application.yml 示例:
spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址consumer:group-id: my-consumer-group   # 消费者组IDauto-offset-reset: earliest   # 消费者从头开始读取(如果没有已提交的偏移量)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串listener:missing-topics-fatal: false    # 如果主题不存在,不抛出致命错误
  • application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串
  • 注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
    以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 对 Key 使用 StringSerializervalue-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer  # 对 Value 使用 ErrorHandlingSerializerproperties:spring.json.value.default.type: com.example.Order  # 默认的 JSON 反序列化目标类型为 Order

三、创建 Kafka 消费者

创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息

(一)Kafka生产的消息是JSON 字符串

1、方式一

  • 如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
    Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka  // 启用 Kafka 消费者
public class KafkaConsumer {private final ObjectMapper objectMapper = new ObjectMapper();// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(String message) {try {// 将 JSON 字符串反序列化为 Order 对象Order order = objectMapper.readValue(message, Order.class);System.out.println("Received order: " + order);} catch (Exception e) {e.printStackTrace();}}}

说明:

  • @KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
    topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。
  • listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。

2、方式二:需要直接访问消息元数据

  • 可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
    topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, String> record) {// 获取消息的详细信息String key = record.key();           // 获取消息的 keyString value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}

(二)Kafka生产的消息是对象Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, Order> record) {// 获取消息的详细信息String key = record.key();           // 获取消息的 keyOrder value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}

四、创建 启动类

确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(KafkaConsumerApplication.class, args);}}

五、配置 Kafka 生产者(可选)

(一)消息类型为json串

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;  // 发送的是 String 类型消息private ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper 用于序列化// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {try {// 将 Order 对象转换为 JSON 字符串String orderJson = objectMapper.writeValueAsString(order);// 发送 JSON 字符串到 KafkakafkaTemplate.send(topic, orderJson);  // 发送字符串消息System.out.println("Order JSON sent to Kafka: " + orderJson);} catch (Exception e) {e.printStackTrace();}}
}

(二)消息类型为对象Order

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Order> kafkaTemplate;// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {kafkaTemplate.send(topic, order);  // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON}
}

六、启动 Kafka 服务

启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

七、测试 Kafka 消费者

你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/sendOrder")public String sendOrder() {Order order = new Order();order.setOrderId(1L);order.setUserId(123L);order.setProduct("Laptop");order.setQuantity(2);order.setStatus("Created");kafkaProducer.sendOrder("order-topic", order);return "Order sent!";}
}

当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。

九、测试和调试

你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。

十、 结语

至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。

相关文章:

Springboot项目如何消费Kafka数据

目录 一、引入依赖二、添加Kafka配置三、创建 Kafka 消费者&#xff08;一&#xff09;Kafka生产的消息是JSON 字符串1、方式一2、方式二&#xff1a;需要直接访问消息元数据 &#xff08;二&#xff09;Kafka生产的消息是对象Order 四、创建 启动类五、配置 Kafka 生产者&…...

LeetCode 热题 100 | 子串

子串基础 前缀和&#xff1a;前面的数加在一起等于多少&#xff0c;放进map里&#xff0c;key为和&#xff0c;value为这个和出现的次数。单调队列&#xff1a;单调递增/递减队列&#xff0c;每次加入新元素&#xff0c;比新元素大/小的元素全部弹出。滑动窗口&#xff1a;两层…...

深度学习笔记11-优化器对比实验(Tensorflow)

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 目录 一、导入数据并检查 二、配置数据集 三、数据可视化 四、构建模型 五、训练模型 六、模型对比评估 七、总结 一、导入数据并检查 import pathlib,…...

【掌握 JavaScript 数组迭代:map 和 includes 的使用技巧】

map map()方法是数组原型的一个函数&#xff0c;用于对数组的每个元素执行一个函数&#xff0c;并返回一个新的数组&#xff0c;其中包含么哦一个元素执行的结果。 语法 const newArray array.map(callback(currentValue, index, arr), thisValue)参数 callback&#xff1…...

深入浅出 Android AES 加密解密:从理论到实战

深入浅出 Android AES 加密解密&#xff1a;从理论到实战 在现代移动应用中&#xff0c;数据安全是不可忽视的一环。无论是用户隐私保护&#xff0c;还是敏感信息的存储与传输&#xff0c;加密技术都扮演着重要角色。本文将以 AES&#xff08;Advanced Encryption Standard&am…...

Clickhouse基础(一)

数据存储的目录&#xff0c;在存储数据时是先经过压缩后再存储的&#xff0c;压缩效率很高 操作命令&#xff1a; sudo clickhouse start sudo clickhouse restart sudo clickhouse status进入clickhouse clickhouse-client -mCREATE TABLE db_13.t_assist (modelId UInt64,…...

深度学习|表示学习|一个神经元可以干什么|02

如是我闻&#xff1a; 如果我们只有一个神经元&#xff08;即一个单一的线性或非线性函数&#xff09;&#xff0c;仍然可以完成一些简单的任务。以下是一个神经元可以实现的功能和应用&#xff1a; 1. 实现简单的线性分类 输入&#xff1a;一组特征向量 x x x 输出&#xff…...

ubuntu22.04降级安装CUDA11.3

环境&#xff1a;主机x64的ubuntu22.04&#xff0c;原有CUDA12.1&#xff0c;但是现在需要CUDA11.3&#xff0c;本篇文章介绍步骤。 一、下载CUDA11.3的run文件 下载网址&#xff1a;https://developer.nvidia.com/cuda-11-3-1-download-archive?target_osLinux&target_…...

为AI聊天工具添加一个知识系统 之32 三“中”全“会”:推理式的ISA(父类)和IOS(母本)以及生成式CMN (双亲委派)之1

本文要点和问题 要点 三“中”全“会”&#xff1a;推理式的ISA的&#xff08;父类-父类源码&#xff09;和IOS的&#xff08;母本-母类脚本&#xff09;以及生成式 CMN &#xff08;双亲委派-子类实例&#xff09;。 数据中台三端架构的中间端(信息系统架构ISA &#xff1a…...

Python----Python高级(函数基础,形参和实参,参数传递,全局变量和局部变量,匿名函数,递归函数,eval()函数,LEGB规则)

一、函数基础 1.1、函数的用法和底层分析 函数是可重用的程序代码块。 函数的作用&#xff0c;不仅可以实现代码的复用&#xff0c;更能实现代码的一致性。一致性指的是&#xff0c;只要修改函数的代码&#xff0c;则所有调用该函数的地方都能得到体现。 在编写函数时&#xf…...

spring解决循环依赖的通俗理解

目录标题 1、什么是循环依赖2、解决循环依赖的原理3、Spring通过三级缓存解决循环依赖4、为什么要使用三级缓存而不是二级缓存&#xff1f;5、三级缓存中存放的是lambda表达式而不是一个半成品对象 1、什么是循环依赖 众所周知&#xff0c;Spring的容器中管理整个体系的bean对…...

用 Python 从零开始创建神经网络(十九):真实数据集

真实数据集 引言数据准备数据加载数据预处理数据洗牌批次&#xff08;Batches&#xff09;训练&#xff08;Training&#xff09;到目前为止的全部代码&#xff1a; 引言 在实践中&#xff0c;深度学习通常涉及庞大的数据集&#xff08;通常以TB甚至更多为单位&#xff09;&am…...

介绍PyTorch张量

介绍PyTorch张量 介绍PyTorch张量 PyTorch张量是我们在PyTorch中编程神经网络时将使用的数据结构。 在编程神经网络时&#xff0c;数据预处理通常是整个过程的第一步&#xff0c;数据预处理的一个目标是将原始输入数据转换为张量形式。 torch.Tensor​类的实例 PyTorch张量…...

Vision Transformer (ViT)原理

Vision Transformer (ViT)原理 flyfish Transformer缺乏卷积神经网络&#xff08;CNNs&#xff09;的归纳偏差&#xff08;inductive biases&#xff09;&#xff0c;比如平移不变性和局部受限的感受野。不变性意味着即使实体entity&#xff08;即对象&#xff09;的外观或位…...

移动云自研云原生数据库入围国采!

近日&#xff0c;中央国家机关2024年度事务型数据库软件框架协议联合征集采购项目产品名单正式公布&#xff0c;移动云自主研发的云原生数据库产品顺利入围。这一成就不仅彰显了移动云在数据库领域深耕多年造就的领先技术优势&#xff0c;更标志着国家权威评审机构对移动云在数…...

Unity中对象池的使用(用一个简单粗暴的例子)

问题描述&#xff1a;Unity在创建和销毁对象的时候是很消耗性能的&#xff0c;所以我们在销毁一个对象的时候&#xff0c;可以不用Destroy&#xff0c;而是将这个物体隐藏后放到回收池里面&#xff0c;当再次需要的时候如果回收池里面有之前回收的对象&#xff0c;就直接拿来用…...

linux命令行连接Postgresql常用命令

1.linux系统命令行连接数据库命令 psql -h hostname -p port -U username -d databasename -h 主机名或IP地址 -p 端口 -U 用户名 -d 连接的数据库 2.查询数据库表命令 select version() #查看版本号 \dg #查看用户 \l #查询数据库 \c mydb #切换…...

每日一题-单链表排序

为了对给定的单链表按升序排序&#xff0c;我们可以考虑以下解决方法&#xff1a; 思路 归并排序&#xff08;Merge Sort&#xff09;&#xff1a;由于归并排序的时间复杂度为 O ( n log ⁡ n ) O(n \log n) O(nlogn)&#xff0c;并且归并排序不需要额外的空间&#xff08;空…...

webpack04服务器配置

webpack配置 entryoutput filenamepathpublicPath 。。 打包引入的基本路径&#xff0c;&#xff0c;&#xff0c;比如引入一个bundle.js,。引用之后的路径就是 publicPathfilename -devServer:static : 静态文件的位置。。。hostportopencompress : 静态资源是否用gzip压缩hi…...

JDK下载安装配置

一.JDK安装配置。 1.安装注意路径,其他直接下一步。 2.配置。 下接第4步. 或者 代码复制: JAVA_HOME D:\Program Files\Java\jdk1.8.0_91 %JAVA_HOME%\bin 或者直接配置 D:\Program Files\Java\jdk1.8.0_91\bin 3.验证(CMD)。 java javac java -version javac -version 二.下…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具

作者&#xff1a;来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗&#xff1f;了解下一期 Elasticsearch Engineer 培训的时间吧&#xff01; Elasticsearch 拥有众多新功能&#xff0c;助你为自己…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

mac:大模型系列测试

0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何&#xff0c;是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试&#xff0c;是可以跑通文章里面的代码。训练速度也是很快的。 注意…...

Axure 下拉框联动

实现选省、选完省之后选对应省份下的市区...