当前位置: 首页 > news >正文

Springboot项目如何消费Kafka数据

目录

  • 一、引入依赖
  • 二、添加Kafka配置
  • 三、创建 Kafka 消费者
    • (一)Kafka生产的消息是JSON 字符串
      • 1、方式一
      • 2、方式二:需要直接访问消息元数据
    • (二)Kafka生产的消息是对象Order
  • 四、创建 启动类
  • 五、配置 Kafka 生产者(可选)
    • (一)消息类型为json串
    • (二)消息类型为对象Order
  • 六、启动 Kafka 服务
  • 七、测试 Kafka 消费者
  • 九、测试和调试
  • 十、 结语

一、引入依赖

你需要在 pom.xml 中添加 spring-kafka 相关依赖:

<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter for Logging (optional but useful for debugging) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><!-- Spring Boot Starter for Testing --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

二、添加Kafka配置

在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:

  • application.yml 示例:
spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址consumer:group-id: my-consumer-group   # 消费者组IDauto-offset-reset: earliest   # 消费者从头开始读取(如果没有已提交的偏移量)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串listener:missing-topics-fatal: false    # 如果主题不存在,不抛出致命错误
  • application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串
  • 注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
    以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 对 Key 使用 StringSerializervalue-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer  # 对 Value 使用 ErrorHandlingSerializerproperties:spring.json.value.default.type: com.example.Order  # 默认的 JSON 反序列化目标类型为 Order

三、创建 Kafka 消费者

创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息

(一)Kafka生产的消息是JSON 字符串

1、方式一

  • 如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
    Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka  // 启用 Kafka 消费者
public class KafkaConsumer {private final ObjectMapper objectMapper = new ObjectMapper();// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(String message) {try {// 将 JSON 字符串反序列化为 Order 对象Order order = objectMapper.readValue(message, Order.class);System.out.println("Received order: " + order);} catch (Exception e) {e.printStackTrace();}}}

说明:

  • @KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
    topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。
  • listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。

2、方式二:需要直接访问消息元数据

  • 可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
    topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, String> record) {// 获取消息的详细信息String key = record.key();           // 获取消息的 keyString value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}

(二)Kafka生产的消息是对象Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, Order> record) {// 获取消息的详细信息String key = record.key();           // 获取消息的 keyOrder value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}

四、创建 启动类

确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(KafkaConsumerApplication.class, args);}}

五、配置 Kafka 生产者(可选)

(一)消息类型为json串

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;  // 发送的是 String 类型消息private ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper 用于序列化// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {try {// 将 Order 对象转换为 JSON 字符串String orderJson = objectMapper.writeValueAsString(order);// 发送 JSON 字符串到 KafkakafkaTemplate.send(topic, orderJson);  // 发送字符串消息System.out.println("Order JSON sent to Kafka: " + orderJson);} catch (Exception e) {e.printStackTrace();}}
}

(二)消息类型为对象Order

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Order> kafkaTemplate;// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {kafkaTemplate.send(topic, order);  // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON}
}

六、启动 Kafka 服务

启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

七、测试 Kafka 消费者

你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/sendOrder")public String sendOrder() {Order order = new Order();order.setOrderId(1L);order.setUserId(123L);order.setProduct("Laptop");order.setQuantity(2);order.setStatus("Created");kafkaProducer.sendOrder("order-topic", order);return "Order sent!";}
}

当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。

九、测试和调试

你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。

十、 结语

至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。

相关文章:

Springboot项目如何消费Kafka数据

目录 一、引入依赖二、添加Kafka配置三、创建 Kafka 消费者&#xff08;一&#xff09;Kafka生产的消息是JSON 字符串1、方式一2、方式二&#xff1a;需要直接访问消息元数据 &#xff08;二&#xff09;Kafka生产的消息是对象Order 四、创建 启动类五、配置 Kafka 生产者&…...

LeetCode 热题 100 | 子串

子串基础 前缀和&#xff1a;前面的数加在一起等于多少&#xff0c;放进map里&#xff0c;key为和&#xff0c;value为这个和出现的次数。单调队列&#xff1a;单调递增/递减队列&#xff0c;每次加入新元素&#xff0c;比新元素大/小的元素全部弹出。滑动窗口&#xff1a;两层…...

深度学习笔记11-优化器对比实验(Tensorflow)

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 目录 一、导入数据并检查 二、配置数据集 三、数据可视化 四、构建模型 五、训练模型 六、模型对比评估 七、总结 一、导入数据并检查 import pathlib,…...

【掌握 JavaScript 数组迭代:map 和 includes 的使用技巧】

map map()方法是数组原型的一个函数&#xff0c;用于对数组的每个元素执行一个函数&#xff0c;并返回一个新的数组&#xff0c;其中包含么哦一个元素执行的结果。 语法 const newArray array.map(callback(currentValue, index, arr), thisValue)参数 callback&#xff1…...

深入浅出 Android AES 加密解密:从理论到实战

深入浅出 Android AES 加密解密&#xff1a;从理论到实战 在现代移动应用中&#xff0c;数据安全是不可忽视的一环。无论是用户隐私保护&#xff0c;还是敏感信息的存储与传输&#xff0c;加密技术都扮演着重要角色。本文将以 AES&#xff08;Advanced Encryption Standard&am…...

Clickhouse基础(一)

数据存储的目录&#xff0c;在存储数据时是先经过压缩后再存储的&#xff0c;压缩效率很高 操作命令&#xff1a; sudo clickhouse start sudo clickhouse restart sudo clickhouse status进入clickhouse clickhouse-client -mCREATE TABLE db_13.t_assist (modelId UInt64,…...

深度学习|表示学习|一个神经元可以干什么|02

如是我闻&#xff1a; 如果我们只有一个神经元&#xff08;即一个单一的线性或非线性函数&#xff09;&#xff0c;仍然可以完成一些简单的任务。以下是一个神经元可以实现的功能和应用&#xff1a; 1. 实现简单的线性分类 输入&#xff1a;一组特征向量 x x x 输出&#xff…...

ubuntu22.04降级安装CUDA11.3

环境&#xff1a;主机x64的ubuntu22.04&#xff0c;原有CUDA12.1&#xff0c;但是现在需要CUDA11.3&#xff0c;本篇文章介绍步骤。 一、下载CUDA11.3的run文件 下载网址&#xff1a;https://developer.nvidia.com/cuda-11-3-1-download-archive?target_osLinux&target_…...

为AI聊天工具添加一个知识系统 之32 三“中”全“会”:推理式的ISA(父类)和IOS(母本)以及生成式CMN (双亲委派)之1

本文要点和问题 要点 三“中”全“会”&#xff1a;推理式的ISA的&#xff08;父类-父类源码&#xff09;和IOS的&#xff08;母本-母类脚本&#xff09;以及生成式 CMN &#xff08;双亲委派-子类实例&#xff09;。 数据中台三端架构的中间端(信息系统架构ISA &#xff1a…...

Python----Python高级(函数基础,形参和实参,参数传递,全局变量和局部变量,匿名函数,递归函数,eval()函数,LEGB规则)

一、函数基础 1.1、函数的用法和底层分析 函数是可重用的程序代码块。 函数的作用&#xff0c;不仅可以实现代码的复用&#xff0c;更能实现代码的一致性。一致性指的是&#xff0c;只要修改函数的代码&#xff0c;则所有调用该函数的地方都能得到体现。 在编写函数时&#xf…...

spring解决循环依赖的通俗理解

目录标题 1、什么是循环依赖2、解决循环依赖的原理3、Spring通过三级缓存解决循环依赖4、为什么要使用三级缓存而不是二级缓存&#xff1f;5、三级缓存中存放的是lambda表达式而不是一个半成品对象 1、什么是循环依赖 众所周知&#xff0c;Spring的容器中管理整个体系的bean对…...

用 Python 从零开始创建神经网络(十九):真实数据集

真实数据集 引言数据准备数据加载数据预处理数据洗牌批次&#xff08;Batches&#xff09;训练&#xff08;Training&#xff09;到目前为止的全部代码&#xff1a; 引言 在实践中&#xff0c;深度学习通常涉及庞大的数据集&#xff08;通常以TB甚至更多为单位&#xff09;&am…...

介绍PyTorch张量

介绍PyTorch张量 介绍PyTorch张量 PyTorch张量是我们在PyTorch中编程神经网络时将使用的数据结构。 在编程神经网络时&#xff0c;数据预处理通常是整个过程的第一步&#xff0c;数据预处理的一个目标是将原始输入数据转换为张量形式。 torch.Tensor​类的实例 PyTorch张量…...

Vision Transformer (ViT)原理

Vision Transformer (ViT)原理 flyfish Transformer缺乏卷积神经网络&#xff08;CNNs&#xff09;的归纳偏差&#xff08;inductive biases&#xff09;&#xff0c;比如平移不变性和局部受限的感受野。不变性意味着即使实体entity&#xff08;即对象&#xff09;的外观或位…...

移动云自研云原生数据库入围国采!

近日&#xff0c;中央国家机关2024年度事务型数据库软件框架协议联合征集采购项目产品名单正式公布&#xff0c;移动云自主研发的云原生数据库产品顺利入围。这一成就不仅彰显了移动云在数据库领域深耕多年造就的领先技术优势&#xff0c;更标志着国家权威评审机构对移动云在数…...

Unity中对象池的使用(用一个简单粗暴的例子)

问题描述&#xff1a;Unity在创建和销毁对象的时候是很消耗性能的&#xff0c;所以我们在销毁一个对象的时候&#xff0c;可以不用Destroy&#xff0c;而是将这个物体隐藏后放到回收池里面&#xff0c;当再次需要的时候如果回收池里面有之前回收的对象&#xff0c;就直接拿来用…...

linux命令行连接Postgresql常用命令

1.linux系统命令行连接数据库命令 psql -h hostname -p port -U username -d databasename -h 主机名或IP地址 -p 端口 -U 用户名 -d 连接的数据库 2.查询数据库表命令 select version() #查看版本号 \dg #查看用户 \l #查询数据库 \c mydb #切换…...

每日一题-单链表排序

为了对给定的单链表按升序排序&#xff0c;我们可以考虑以下解决方法&#xff1a; 思路 归并排序&#xff08;Merge Sort&#xff09;&#xff1a;由于归并排序的时间复杂度为 O ( n log ⁡ n ) O(n \log n) O(nlogn)&#xff0c;并且归并排序不需要额外的空间&#xff08;空…...

webpack04服务器配置

webpack配置 entryoutput filenamepathpublicPath 。。 打包引入的基本路径&#xff0c;&#xff0c;&#xff0c;比如引入一个bundle.js,。引用之后的路径就是 publicPathfilename -devServer:static : 静态文件的位置。。。hostportopencompress : 静态资源是否用gzip压缩hi…...

JDK下载安装配置

一.JDK安装配置。 1.安装注意路径,其他直接下一步。 2.配置。 下接第4步. 或者 代码复制: JAVA_HOME D:\Program Files\Java\jdk1.8.0_91 %JAVA_HOME%\bin 或者直接配置 D:\Program Files\Java\jdk1.8.0_91\bin 3.验证(CMD)。 java javac java -version javac -version 二.下…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查

在对接支付宝API的时候&#xff0c;遇到了一些问题&#xff0c;记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建

制造业采购供应链管理是企业运营的核心环节&#xff0c;供应链协同管理在供应链上下游企业之间建立紧密的合作关系&#xff0c;通过信息共享、资源整合、业务协同等方式&#xff0c;实现供应链的全面管理和优化&#xff0c;提高供应链的效率和透明度&#xff0c;降低供应链的成…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

HTML前端开发:JavaScript 常用事件详解

作为前端开发的核心&#xff0c;JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例&#xff1a; 1. onclick - 点击事件 当元素被单击时触发&#xff08;左键点击&#xff09; button.onclick function() {alert("按钮被点击了&#xff01;&…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...