当前位置: 首页 > news >正文

Springboot项目如何消费Kafka数据

目录

  • 一、引入依赖
  • 二、添加Kafka配置
  • 三、创建 Kafka 消费者
    • (一)Kafka生产的消息是JSON 字符串
      • 1、方式一
      • 2、方式二:需要直接访问消息元数据
    • (二)Kafka生产的消息是对象Order
  • 四、创建 启动类
  • 五、配置 Kafka 生产者(可选)
    • (一)消息类型为json串
    • (二)消息类型为对象Order
  • 六、启动 Kafka 服务
  • 七、测试 Kafka 消费者
  • 九、测试和调试
  • 十、 结语

一、引入依赖

你需要在 pom.xml 中添加 spring-kafka 相关依赖:

<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter for Logging (optional but useful for debugging) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><!-- Spring Boot Starter for Testing --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

二、添加Kafka配置

在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:

  • application.yml 示例:
spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址consumer:group-id: my-consumer-group   # 消费者组IDauto-offset-reset: earliest   # 消费者从头开始读取(如果没有已提交的偏移量)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串listener:missing-topics-fatal: false    # 如果主题不存在,不抛出致命错误
  • application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串
  • 注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
    以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 对 Key 使用 StringSerializervalue-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer  # 对 Value 使用 ErrorHandlingSerializerproperties:spring.json.value.default.type: com.example.Order  # 默认的 JSON 反序列化目标类型为 Order

三、创建 Kafka 消费者

创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息

(一)Kafka生产的消息是JSON 字符串

1、方式一

  • 如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
    Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka  // 启用 Kafka 消费者
public class KafkaConsumer {private final ObjectMapper objectMapper = new ObjectMapper();// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(String message) {try {// 将 JSON 字符串反序列化为 Order 对象Order order = objectMapper.readValue(message, Order.class);System.out.println("Received order: " + order);} catch (Exception e) {e.printStackTrace();}}}

说明:

  • @KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
    topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。
  • listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。

2、方式二:需要直接访问消息元数据

  • 可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
    topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, String> record) {// 获取消息的详细信息String key = record.key();           // 获取消息的 keyString value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}

(二)Kafka生产的消息是对象Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, Order> record) {// 获取消息的详细信息String key = record.key();           // 获取消息的 keyOrder value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}

四、创建 启动类

确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(KafkaConsumerApplication.class, args);}}

五、配置 Kafka 生产者(可选)

(一)消息类型为json串

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;  // 发送的是 String 类型消息private ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper 用于序列化// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {try {// 将 Order 对象转换为 JSON 字符串String orderJson = objectMapper.writeValueAsString(order);// 发送 JSON 字符串到 KafkakafkaTemplate.send(topic, orderJson);  // 发送字符串消息System.out.println("Order JSON sent to Kafka: " + orderJson);} catch (Exception e) {e.printStackTrace();}}
}

(二)消息类型为对象Order

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Order> kafkaTemplate;// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {kafkaTemplate.send(topic, order);  // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON}
}

六、启动 Kafka 服务

启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

七、测试 Kafka 消费者

你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/sendOrder")public String sendOrder() {Order order = new Order();order.setOrderId(1L);order.setUserId(123L);order.setProduct("Laptop");order.setQuantity(2);order.setStatus("Created");kafkaProducer.sendOrder("order-topic", order);return "Order sent!";}
}

当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。

九、测试和调试

你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。

十、 结语

至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。

相关文章:

Springboot项目如何消费Kafka数据

目录 一、引入依赖二、添加Kafka配置三、创建 Kafka 消费者&#xff08;一&#xff09;Kafka生产的消息是JSON 字符串1、方式一2、方式二&#xff1a;需要直接访问消息元数据 &#xff08;二&#xff09;Kafka生产的消息是对象Order 四、创建 启动类五、配置 Kafka 生产者&…...

LeetCode 热题 100 | 子串

子串基础 前缀和&#xff1a;前面的数加在一起等于多少&#xff0c;放进map里&#xff0c;key为和&#xff0c;value为这个和出现的次数。单调队列&#xff1a;单调递增/递减队列&#xff0c;每次加入新元素&#xff0c;比新元素大/小的元素全部弹出。滑动窗口&#xff1a;两层…...

深度学习笔记11-优化器对比实验(Tensorflow)

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 目录 一、导入数据并检查 二、配置数据集 三、数据可视化 四、构建模型 五、训练模型 六、模型对比评估 七、总结 一、导入数据并检查 import pathlib,…...

【掌握 JavaScript 数组迭代:map 和 includes 的使用技巧】

map map()方法是数组原型的一个函数&#xff0c;用于对数组的每个元素执行一个函数&#xff0c;并返回一个新的数组&#xff0c;其中包含么哦一个元素执行的结果。 语法 const newArray array.map(callback(currentValue, index, arr), thisValue)参数 callback&#xff1…...

深入浅出 Android AES 加密解密:从理论到实战

深入浅出 Android AES 加密解密&#xff1a;从理论到实战 在现代移动应用中&#xff0c;数据安全是不可忽视的一环。无论是用户隐私保护&#xff0c;还是敏感信息的存储与传输&#xff0c;加密技术都扮演着重要角色。本文将以 AES&#xff08;Advanced Encryption Standard&am…...

Clickhouse基础(一)

数据存储的目录&#xff0c;在存储数据时是先经过压缩后再存储的&#xff0c;压缩效率很高 操作命令&#xff1a; sudo clickhouse start sudo clickhouse restart sudo clickhouse status进入clickhouse clickhouse-client -mCREATE TABLE db_13.t_assist (modelId UInt64,…...

深度学习|表示学习|一个神经元可以干什么|02

如是我闻&#xff1a; 如果我们只有一个神经元&#xff08;即一个单一的线性或非线性函数&#xff09;&#xff0c;仍然可以完成一些简单的任务。以下是一个神经元可以实现的功能和应用&#xff1a; 1. 实现简单的线性分类 输入&#xff1a;一组特征向量 x x x 输出&#xff…...

ubuntu22.04降级安装CUDA11.3

环境&#xff1a;主机x64的ubuntu22.04&#xff0c;原有CUDA12.1&#xff0c;但是现在需要CUDA11.3&#xff0c;本篇文章介绍步骤。 一、下载CUDA11.3的run文件 下载网址&#xff1a;https://developer.nvidia.com/cuda-11-3-1-download-archive?target_osLinux&target_…...

为AI聊天工具添加一个知识系统 之32 三“中”全“会”:推理式的ISA(父类)和IOS(母本)以及生成式CMN (双亲委派)之1

本文要点和问题 要点 三“中”全“会”&#xff1a;推理式的ISA的&#xff08;父类-父类源码&#xff09;和IOS的&#xff08;母本-母类脚本&#xff09;以及生成式 CMN &#xff08;双亲委派-子类实例&#xff09;。 数据中台三端架构的中间端(信息系统架构ISA &#xff1a…...

Python----Python高级(函数基础,形参和实参,参数传递,全局变量和局部变量,匿名函数,递归函数,eval()函数,LEGB规则)

一、函数基础 1.1、函数的用法和底层分析 函数是可重用的程序代码块。 函数的作用&#xff0c;不仅可以实现代码的复用&#xff0c;更能实现代码的一致性。一致性指的是&#xff0c;只要修改函数的代码&#xff0c;则所有调用该函数的地方都能得到体现。 在编写函数时&#xf…...

spring解决循环依赖的通俗理解

目录标题 1、什么是循环依赖2、解决循环依赖的原理3、Spring通过三级缓存解决循环依赖4、为什么要使用三级缓存而不是二级缓存&#xff1f;5、三级缓存中存放的是lambda表达式而不是一个半成品对象 1、什么是循环依赖 众所周知&#xff0c;Spring的容器中管理整个体系的bean对…...

用 Python 从零开始创建神经网络(十九):真实数据集

真实数据集 引言数据准备数据加载数据预处理数据洗牌批次&#xff08;Batches&#xff09;训练&#xff08;Training&#xff09;到目前为止的全部代码&#xff1a; 引言 在实践中&#xff0c;深度学习通常涉及庞大的数据集&#xff08;通常以TB甚至更多为单位&#xff09;&am…...

介绍PyTorch张量

介绍PyTorch张量 介绍PyTorch张量 PyTorch张量是我们在PyTorch中编程神经网络时将使用的数据结构。 在编程神经网络时&#xff0c;数据预处理通常是整个过程的第一步&#xff0c;数据预处理的一个目标是将原始输入数据转换为张量形式。 torch.Tensor​类的实例 PyTorch张量…...

Vision Transformer (ViT)原理

Vision Transformer (ViT)原理 flyfish Transformer缺乏卷积神经网络&#xff08;CNNs&#xff09;的归纳偏差&#xff08;inductive biases&#xff09;&#xff0c;比如平移不变性和局部受限的感受野。不变性意味着即使实体entity&#xff08;即对象&#xff09;的外观或位…...

移动云自研云原生数据库入围国采!

近日&#xff0c;中央国家机关2024年度事务型数据库软件框架协议联合征集采购项目产品名单正式公布&#xff0c;移动云自主研发的云原生数据库产品顺利入围。这一成就不仅彰显了移动云在数据库领域深耕多年造就的领先技术优势&#xff0c;更标志着国家权威评审机构对移动云在数…...

Unity中对象池的使用(用一个简单粗暴的例子)

问题描述&#xff1a;Unity在创建和销毁对象的时候是很消耗性能的&#xff0c;所以我们在销毁一个对象的时候&#xff0c;可以不用Destroy&#xff0c;而是将这个物体隐藏后放到回收池里面&#xff0c;当再次需要的时候如果回收池里面有之前回收的对象&#xff0c;就直接拿来用…...

linux命令行连接Postgresql常用命令

1.linux系统命令行连接数据库命令 psql -h hostname -p port -U username -d databasename -h 主机名或IP地址 -p 端口 -U 用户名 -d 连接的数据库 2.查询数据库表命令 select version() #查看版本号 \dg #查看用户 \l #查询数据库 \c mydb #切换…...

每日一题-单链表排序

为了对给定的单链表按升序排序&#xff0c;我们可以考虑以下解决方法&#xff1a; 思路 归并排序&#xff08;Merge Sort&#xff09;&#xff1a;由于归并排序的时间复杂度为 O ( n log ⁡ n ) O(n \log n) O(nlogn)&#xff0c;并且归并排序不需要额外的空间&#xff08;空…...

webpack04服务器配置

webpack配置 entryoutput filenamepathpublicPath 。。 打包引入的基本路径&#xff0c;&#xff0c;&#xff0c;比如引入一个bundle.js,。引用之后的路径就是 publicPathfilename -devServer:static : 静态文件的位置。。。hostportopencompress : 静态资源是否用gzip压缩hi…...

JDK下载安装配置

一.JDK安装配置。 1.安装注意路径,其他直接下一步。 2.配置。 下接第4步. 或者 代码复制: JAVA_HOME D:\Program Files\Java\jdk1.8.0_91 %JAVA_HOME%\bin 或者直接配置 D:\Program Files\Java\jdk1.8.0_91\bin 3.验证(CMD)。 java javac java -version javac -version 二.下…...

看BEYOND REALITY Z-Image如何生成电影级人像:高清作品案例大赏

看BEYOND REALITY Z-Image如何生成电影级人像&#xff1a;高清作品案例大赏 你有没有想过&#xff0c;用AI生成一张人像照片&#xff0c;能逼真到什么程度&#xff1f;是那种一眼就能看出“AI味”的塑料感&#xff0c;还是无限接近真实胶片摄影的细腻质感&#xff1f;今天&…...

别只盯着对接分数!用PyMOL手把手教你目视筛查分子对接结果的3个关键点(氢键、疏水、应变能)

别只盯着对接分数&#xff01;用PyMOL手把手教你目视筛查分子对接结果的3个关键点&#xff08;氢键、疏水、应变能&#xff09; 刚拿到分子对接结果时&#xff0c;很多初学者会陷入一个误区——过度关注对接分数&#xff08;docking score&#xff09;这个单一指标。实际上&…...

腾讯混元HY-MT1.5-1.8B翻译模型:开箱即用的本地化部署方案

腾讯混元HY-MT1.5-1.8B翻译模型&#xff1a;开箱即用的本地化部署方案 1. 引言&#xff1a;为什么选择本地化翻译模型 在当今全球化的商业环境中&#xff0c;跨语言沟通已成为日常工作的重要组成部分。传统云端翻译服务虽然方便&#xff0c;但在数据安全、网络依赖和响应速度…...

Granite-4.0-H-350M在数学建模竞赛中的应用:算法优化

Granite-4.0-H-350M在数学建模竞赛中的应用&#xff1a;算法优化 1. 数学建模竞赛中的真实痛点 数学建模竞赛对参赛者来说从来都不是轻松的任务。从拿到题目到提交最终报告&#xff0c;通常只有短短几天时间&#xff0c;而在这有限的时间里&#xff0c;团队需要完成问题理解、…...

多模态扩展:OpenClaw调用Qwen3-32B实现截图内容分析

多模态扩展&#xff1a;OpenClaw调用Qwen3-32B实现截图内容分析 1. 为什么需要截图内容分析能力 去年我在整理技术文档时&#xff0c;经常遇到这样的场景&#xff1a;某个软件界面的配置项需要记录下来&#xff0c;但手动抄写既费时又容易出错。当时我尝试过各种OCR工具&…...

Stable Diffusion v1.5实时生成系统:5分钟搭建,实时查看图片生成全过程

Stable Diffusion v1.5实时生成系统&#xff1a;5分钟搭建&#xff0c;实时查看图片生成全过程 1. 项目介绍&#xff1a;打破黑盒的生成体验 你是否曾经在使用Stable Diffusion时感到困惑&#xff1f;输入提示词后&#xff0c;只能盯着进度条干等&#xff0c;不知道模型内部发…...

网站社交媒体推广对SEO有什么作用_图片和视频如何优化以提高搜索引擎收录

网站社交媒体推广对SEO有什么作用 在当前数字化时代&#xff0c;网站的SEO&#xff08;搜索引擎优化&#xff09;已经成为任何希望提升在线存在感的企业和个人的首要任务。SEO并不仅仅是关于在网站上优化文本内容。如今&#xff0c;社交媒体推广也在这一过程中发挥着越来越重要…...

搜索关键词SEO优化需要多长时间才能看到效果_搜索关键词SEO优化需要多少预算投入

搜索关键词SEO优化需要多长时间才能看到效果_搜索关键词SEO优化需要多少预算投入 在当今互联网时代&#xff0c;搜索引擎优化&#xff08;SEO&#xff09;是每个网站和在线企业提升流量、吸引潜在客户的重要手段。许多人在进行SEO优化时常常会疑惑&#xff1a;“搜索关键词SEO…...

OpenClaw跨平台控制:千问3.5-35B-A3B-FP8任务手机端触发方案

OpenClaw跨平台控制&#xff1a;千问3.5-35B-A3B-FP8任务手机端触发方案 1. 为什么需要移动端触发自动化任务&#xff1f; 上周三凌晨两点&#xff0c;我被手机闹铃惊醒——服务器监控报警显示生产环境出现异常。当我手忙脚乱打开电脑准备排查时&#xff0c;突然想到&#xf…...

Windows虚拟机中部署黑群晖7.2 NAS:从零搭建到内网穿透全攻略

1. 为什么要在Windows虚拟机跑黑群晖&#xff1f; 很多朋友第一次听说在Windows里装黑群晖都会觉得奇怪——NAS不是应该用实体机吗&#xff1f;我最初也是这么想的&#xff0c;直到去年家里老笔记本闲置下来&#xff0c;实测发现用虚拟机跑群晖不仅省电省钱&#xff0c;还能实现…...