003 SpringBoot集成Kafka操作
4.SpringBoot集成Kafka
文章目录
- 4.SpringBoot集成Kafka
- 1.入门示例
- 2.yml完整配置
- 3.关键配置注释说明
- 1. 生产者优化参数
- 2. 消费者可靠性配置
- 3. 监听器高级特性
- 4. 安全认证配置
- 4.配置验证方法
- 5.不同场景配置模板
- 场景1:高吞吐日志收集
- 场景2:金融级事务消息
- 场景3:跨数据中心同步
- 5.高级配置
- 1.事务支持
- 2.消息重试与死信队列
来源参考的deepseek,如有侵权联系立删
1.入门示例
1.pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2.KafkaProducer消息生产者配置
@Component
@Slf4j
public class KafkaProducer {private HashMap map=new HashMap<>();@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(String topic,String msg){log.info("开始发送消息,topic:{};message:{}",topic,msg);ListenableFuture<SendResult<Integer,String>> send=kafkaTemplate.send(topic, msg);//消息确认机制send.addCallback(new ListenableFutureCallback<SendResult<Integer,String>>(){@Overridepublic void onSuccess(SendResult<Integer, String> result) {log.info("消息发送成功,topic:{};message:{}",topic,msg);}@Overridepublic void onFailure(Throwable ex) {//落库操作map.put(topic,msg);}});}
}
springboot3.x写法
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;@Service
@RequiredArgsConstructor
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;// 同步发送(阻塞等待确认)public void sendMessageSync(String topic, String key, String value) {kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {if (ex == null) {System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());} else {System.err.println("发送失败:" + ex.getMessage());}});}// 异步发送(默认方式)public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message);}
}
- Spring Boot 2.x:
send()返回ListenableFuture<SendResult>,支持addCallback()回调。 - Spring Boot 3.x:
send()返回CompletableFuture<SendResult>,弃用ListenableFuture,因此需要使用CompletableFuture的 API(如whenComplete)。
3.KafkaConsumer消息消费
@Component
@Slf4j
public class KafkaConsumer {private List<String> exist=new ArrayList<>();@KafkaListener(topics = {"lx"},groupId = "lx")public void consumer(ConsumerRecord<Integer,String> record){if (exist.contains(record.value())){log.error("不满足幂等校验!!!");}log.info("消息消费成功,topic:{},message:{}", record.topic(), record.value());exist.add(record.value());}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerService {// 单个消息消费(手动提交偏移量)@KafkaListener(topics = "test-topic", groupId = "spring-group")public void listenMessage(String message, Acknowledgment ack) {System.out.println("收到消息:" + message);ack.acknowledge(); // 手动提交}// 批量消费(需配置 listener.type=batch)@KafkaListener(topics = "batch-topic", groupId = "spring-group")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));ack.acknowledge();}
}
4.yml配置文件
生产者配置
#kafka配置
spring:kafka:#kafka集群地址# bootstraps-server: 192.168.25.100:9092bootstrap-servers: 47.122.26.22:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1# 禁止生产者触发 Topic 创建请求allow.auto.create.topics: false#代表集群中从节点都持久化后才认为发送成功acks: -1
消费者配置
spring:kafka:#kafka集群地址bootstraps-server: 192.168.25.100:9092consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000# 禁用生产者触发 Topic 元数据请求时自动创建allow.auto.create.topics: falsegroup-id: testauto-offset-reset: earliestlistener:ack-mode: manual_immediate # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch
5.实体类
@Data
public class KafkaRequest {/*** 主题*/private String topic;/*** 消息*/private String message;
}
6.消息发送
@RestController
@Slf4j
public class KafkaController {private final String topic="lx";private int temp=1;@Autowiredprivate KafkaProducer producer;/*** 下单** @param kafkaRequest* @return null*/@RequestMapping("/test01")public void test01(KafkaRequest kafkaRequest){log.info("test01测试成功!topic:{};message:{}",kafkaRequest.getTopic(), kafkaRequest.getMessage());producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());}@RequestMapping("/test02")public void test02(KafkaRequest kafkaRequest){log.info("test02测试成功!topic:{};message:{}",topic, temp);producer.send(topic, String.valueOf(temp));temp++;}
}
kafka启动方式
./kafka-server-start.sh ../config/server.properties
2.yml完整配置
spring:kafka:# 基础配置(必填项)bootstrap-servers: localhost:9092 # Kafka集群地址,多节点用逗号分隔 client-id: spring-boot-app # 客户端标识(日志追踪用)# 生产者配置 producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值序列化器acks: all # 消息确认机制:all表示所有副本确认(最高可靠性)retries: 5 # 发送失败重试次数(需配合幂等性使用)batch-size: 16384 # 批量发送缓冲区大小(单位:字节)linger-ms: 50 # 发送延迟等待时间(毫秒,提高吞吐量)buffer-memory: 33554432 # 生产者内存缓冲区大小(默认32MB)compression-type: snappy # 消息压缩算法(可选gzip/lz4/zstd)transaction-id-prefix: tx- # 开启事务时需配置前缀(需配合@Transactional)# 消费者配置 consumer:group-id: app-consumer-group # 消费者组ID(同一组共享分区)auto-offset-reset: earliest # 无Offset时策略:earliest(从头)/latest(最新)enable-auto-commit: false # 关闭自动提交Offset(推荐手动提交)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500 # 单次poll最大消息数(避免OOM)fetch-max-wait-ms: 500 # 消费者等待broker返回数据的最长时间isolation-level: read_committed # 事务消息隔离级别(read_committed/read_uncommitted)# 监听器配置(高级优化)listener:type: single # 监听器类型:single(单条)/batch(批量)ack-mode: manual # Offset提交模式:manual(手动)/batch(批量提交)concurrency: 3 # 消费者线程数(建议等于分区数)poll-timeout: 3000 # poll方法超时时间(毫秒)# 消息重试与死信队列(容错机制)retry:topic:attempts: 3 # 最大重试次数initial-interval: 1000 # 初始重试间隔(毫秒)multiplier: 2.0 # 重试间隔倍数(指数退避)dead-letter-topic: dlq-${topic} # 死信队列命名规则(自动创建)# 安全协议(企业级场景)properties:security.protocol: SASL_PLAINTEXT # 安全协议(如PLAINTEXT/SASL_SSL)sasl.mechanism: PLAIN # SASL认证机制ssl.truststore.location: /path/to/truststore.jks# 自定义业务配置(非Kafka标准参数)app:kafka:topics:input-topic: user-events # 业务输入Topicoutput-topic: processed-events # 业务输出Topic
3.关键配置注释说明
1. 生产者优化参数
| 参数 | 说明 | 推荐值 |
|---|---|---|
acks=all | 确保所有ISR副本写入成功,防止数据丢失 | 高可靠性场景必选 |
compression-type=snappy | 减少网络带宽占用,提升吞吐量 | 消息体>1KB时启用 |
transaction-id-prefix | 支持跨分区原子性写入(需配合@Transactional注解) | 金融交易类业务必配 |
2. 消费者可靠性配置
| 参数 | 说明 | 注意事项 |
|---|---|---|
enable-auto-commit=false | 避免消息处理失败但Offset已提交导致数据丢失 | 需手动调用ack.acknowledge() |
isolation-level=read_committed | 只消费已提交的事务消息 | 需与生产者事务配置联动 |
3. 监听器高级特性
| 参数 | 使用场景 | 示例 |
|---|---|---|
type=batch | 批量消费(提升吞吐量) | 适用于日志处理等实时性要求低的场景 |
concurrency=3 | 并发消费者数 | 需与Topic分区数一致,避免资源浪费 |
4. 安全认证配置
spring:kafka:properties:security.protocol: SASL_SSLsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
- 企业级必配:生产环境需启用SSL加密+SASL认证
4.配置验证方法
- 启动检查:添加
@ConfigurationProperties(prefix = "spring.kafka")绑定配置到Bean,通过单元测试验证注入值 - 日志监控:开启DEBUG日志观察生产者/消费者连接状态
logging:level:org.springframework.kafka: DEBUG
- AdminClient 工具:通过编程方式检查Topic元数据
@Autowired
private KafkaAdminClient adminClient;public void checkTopic() {Map<String, TopicDescription> topics = adminClient.describeTopics("user-events");topics.values().forEach(topic -> System.out.println(topic));
}
5.不同场景配置模板
场景1:高吞吐日志收集
producer:compression-type: lz4batch-size: 65536linger-ms: 100
consumer:auto-offset-reset: latestenable-auto-commit: true # 允许少量数据丢失以换取性能
场景2:金融级事务消息
producer:acks: allretries: 10transaction-id-prefix: fin-tx-
consumer:isolation-level: read_committedenable-auto-commit: false
场景3:跨数据中心同步
spring:kafka:bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092properties:client.dns.lookup: use_all_dns_ips # 支持多IP解析reconnect.backoff.ms: 1000 # 断线重连策略
5.高级配置
1.事务支持
// 配置事务管理器
@Bean
public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);
}// 使用事务发送
@Transactional
public void sendWithTransaction() {kafkaTemplate.send("topic1", "msg1");kafkaTemplate.send("topic2", "msg2");
}
2.消息重试与死信队列
spring:kafka:listener:retry:max-attempts: 3backoff:initial-interval: 1000multiplier: 2.0dead-letter-topic: my-dlt-topic # 死信队列
相关文章:
003 SpringBoot集成Kafka操作
4.SpringBoot集成Kafka 文章目录 4.SpringBoot集成Kafka1.入门示例2.yml完整配置3.关键配置注释说明1. 生产者优化参数2. 消费者可靠性配置3. 监听器高级特性4. 安全认证配置 4.配置验证方法5.不同场景配置模板场景1:高吞吐日志收集场景2:金融级事务消息…...
Android SystemUI开发(一)
frameworks/base/packages/SystemUI/src/com/android/systemui/SystemUI.java frameworks/base/packages/SystemUI/src/com/android/systemui/SystemUIService.java 关键文件 SystemUI 关键服务 简介 Dependency.class:处理系统依赖关系,提供资源或服…...
C#贪心算法
贪心算法:生活与代码中的 “最优选择大师” 在生活里,我们常常面临各种选择,都希望能做出最有利的决策。比如在超市大促销时,面对琳琅满目的商品,你总想用有限的预算买到价值最高的东西。贪心算法,就像是一…...
Vue程序下载
Vue是一个基于JavaScript(JS)实现的框架,想要使用它,就得先拿到Vue的js文件 Vue官网 Vue2:Vue.js Vue3:Vue.js - 渐进式 JavaScript 框架 | Vue.js 下载并安装vue.js 第一步:打开Vue2官网&a…...
【UCB CS 61B SP24】Lecture 17 - Data Structures 3: B-Trees学习笔记
本文以 2-3-4 树详细讲解了 B 树的概念,逐步分析其操作,并用 Java 实现了标准的 B 树。 1. 2-3 & 2-3-4 Trees 上一节课中讲到的二叉搜索树当数据是随机顺序插入的时候能够使得树变得比较茂密,如下图右侧所示,时间复杂度也就…...
机器学习决策树
一、香农公式 熵: 信息增益: 信息增益信息熵-条件熵 前者是初始信息熵大小,后者是因为条件加入后带来的确定性增加 信息增益表示得知特征X的信息而使得类Y的信息的不确定性减少的程度 信息增益越大说明影响越大 二、代码 ""&…...
Spring Boot + MyBatis 实现 RESTful API 的完整流程
后端开发:Spring Boot 快速开发实战 引言 在现代后端开发中,Spring Boot 因其轻量级、快速开发的特性而备受开发者青睐。本文将带你从零开始,使用 Spring Boot MyBatis 实现一个完整的 RESTful API,并深入探讨如何优雅地处理异…...
通过 ANSYS Discovery 进行 CFD 分析,增强工程设计
概括 工程师使用计算流体动力学 (CFD) 分析来研究和优化各种应用中的流体流动和传热分析。ANSYS Discovery 是一个用户友好的软件平台,使工程师能够轻松设置和解决 CFD 模型,并能够通知设计修改 在这篇博文中,我们将重点介绍在 Ansys Disc…...
家用可燃气体探测器——家庭燃气安全的坚实防线
随着社会的发展和变迁,天然气为我们的生活带来了诸多便利,无论是烹饪美食,还是温暖取暖,都离不开它的支持。然而,燃气安全隐患如影随形,一旦发生泄漏,可能引发爆炸、火灾等严重事故,…...
ListControl双击实现可编辑
为Edit Control控件添加丢失输入焦点事件,可见设为false 为List Control控件添加双击事件 控件和成员变量之间交换数据 CListCtrl ListPrint1; //列表输出 CEdit...
ave-form.vue 组件中 如何将产品名称发送给后端 ?
如何将产品名称发送给后端。 在这段代码中,产品名称(productName)的处理和发送主要发生在 save() 方法中。让我逐步分析: 产品ID的选择: <w-form-selectv-model"form.productId"label"涉及产品&q…...
DeepSeek行业应用实践报告-智灵动力【112页PPT全】
DeepSeek(深度搜索)近期引发广泛关注并成为众多企业/开发者争相接入的现象,主要源于其在技术突破、市场需求适配性及生态建设等方面的综合优势。以下是关键原因分析: 一、技术核心优势 开源与低成本 DeepSeek基于开源架构…...
【Markdown 语法简洁讲解】
Markdown 语法简洁语法讲解 什么是 Markdown1. 标题2. 列表3.文本样式4. 链接与图片5. 代码6. 表格7. 分割线8. 流程图9. 数学公式10. 快捷键11. 字体、字号与颜色 什么是 Markdown Markdown 是一种轻量级标记语言,通过简单的符号实现排版格式化,专注于…...
250301-OpenWebUI配置DeepSeek-火山方舟+硅基流动+联网搜索+推理显示
A. 最终效果 B. 火山方舟配置(一定要点击添加) C. 硅基流动配置(最好要点击添加,否则会自动弹出所有模型) D. 联网搜索配置 E. 推理过程显示 默认是没有下面的推理过程的显示的 设置步骤: 在Functions函…...
【3天快速入门WPF】12-MVVM
目录 1. 什么是MVVM2. 实现简单MVVM2.1. Part 12.2. Part 21. 什么是MVVM MVVM 是 Model-View-ViewModel 的缩写,是一种用于构建用户界面的设计模式,是一种简化用户界面的事件驱动编程方式。 MVVM 的目标是实现用户界面和业务逻辑之间的彻底分离,以便更好地管理和维护应用…...
查找Excel包含关键字的行(の几种简单快速方法)
需求:数据在后缀为xlsx的Excel的sheet1中且量比较大,比如几十万行几百列;想查找一个关键字所在的行,比如"全网首发"; 情况①知道关键字在哪一列 情况②不确定在哪一列,很多列相似又不同,本文演…...
性能测试分析和调优
步骤 性能调优的步骤 性能调优的步骤: 1.确定问题:根据性能测试的结果来分析确定bug。–测试人员职责 2.分析原因:分析问题产生的原因。----开发人员职责 3.给出解决方案:可以是修改软件配置、增加硬件资源配置、修改代码等----…...
(视频教程)Compass代谢分析详细流程及python版-R语言版下游分析和可视化
不想做太多的前情解说了,有点累了,做了很久的内容,包括整个分析,从软件安装和报错解决到后期下游python版-R语言版下游分析和可视化!单细胞代谢分析我们写过很多了,唯独少了最“高级”的compass,…...
【SQL】MySQL中的字符串处理函数:concat 函数拼接字符串,COALESCE函数处理NULL字符串
MySQL中的字符串处理函数:concat 函数 一、concat ()函数 1.1、基本语法1.2、示例1.3、特殊用途 二、COALESCE()函数 2.1、基本语法2.2、示例2.3、用途 三、进阶练习 3.1 条件和 SQL 语句3.2、解释 一、concat &…...
c++中深拷贝和浅拷贝的联系和区别
在 C 编程里,深拷贝和浅拷贝是两种不同的对象复制方式,它们在实现方式、资源管理和适用场景等方面存在显著差异。下面为你详细介绍它们的区别。 1. 基本概念 浅拷贝:浅拷贝仅仅复制对象的成员变量值。对于基本数据类型(如 int、d…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...
idea大量爆红问题解决
问题描述 在学习和工作中,idea是程序员不可缺少的一个工具,但是突然在有些时候就会出现大量爆红的问题,发现无法跳转,无论是关机重启或者是替换root都无法解决 就是如上所展示的问题,但是程序依然可以启动。 问题解决…...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
【从零学习JVM|第三篇】类的生命周期(高频面试题)
前言: 在Java编程中,类的生命周期是指类从被加载到内存中开始,到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期,让读者对此有深刻印象。 目录 …...
【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分: 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...
MySQL:分区的基本使用
目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区(Partitioning)是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分(分区)可以独立存储、管理和优化,…...
