003 SpringBoot集成Kafka操作
4.SpringBoot集成Kafka
文章目录
- 4.SpringBoot集成Kafka
- 1.入门示例
- 2.yml完整配置
- 3.关键配置注释说明
- 1. 生产者优化参数
- 2. 消费者可靠性配置
- 3. 监听器高级特性
- 4. 安全认证配置
- 4.配置验证方法
- 5.不同场景配置模板
- 场景1:高吞吐日志收集
- 场景2:金融级事务消息
- 场景3:跨数据中心同步
- 5.高级配置
- 1.事务支持
- 2.消息重试与死信队列
来源参考的deepseek,如有侵权联系立删
1.入门示例
1.pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2.KafkaProducer消息生产者配置
@Component
@Slf4j
public class KafkaProducer {private HashMap map=new HashMap<>();@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(String topic,String msg){log.info("开始发送消息,topic:{};message:{}",topic,msg);ListenableFuture<SendResult<Integer,String>> send=kafkaTemplate.send(topic, msg);//消息确认机制send.addCallback(new ListenableFutureCallback<SendResult<Integer,String>>(){@Overridepublic void onSuccess(SendResult<Integer, String> result) {log.info("消息发送成功,topic:{};message:{}",topic,msg);}@Overridepublic void onFailure(Throwable ex) {//落库操作map.put(topic,msg);}});}
}
springboot3.x写法
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;@Service
@RequiredArgsConstructor
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;// 同步发送(阻塞等待确认)public void sendMessageSync(String topic, String key, String value) {kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {if (ex == null) {System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());} else {System.err.println("发送失败:" + ex.getMessage());}});}// 异步发送(默认方式)public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message);}
}
- Spring Boot 2.x:
send()
返回ListenableFuture<SendResult>
,支持addCallback()
回调。 - Spring Boot 3.x:
send()
返回CompletableFuture<SendResult>
,弃用ListenableFuture
,因此需要使用CompletableFuture
的 API(如whenComplete
)。
3.KafkaConsumer消息消费
@Component
@Slf4j
public class KafkaConsumer {private List<String> exist=new ArrayList<>();@KafkaListener(topics = {"lx"},groupId = "lx")public void consumer(ConsumerRecord<Integer,String> record){if (exist.contains(record.value())){log.error("不满足幂等校验!!!");}log.info("消息消费成功,topic:{},message:{}", record.topic(), record.value());exist.add(record.value());}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerService {// 单个消息消费(手动提交偏移量)@KafkaListener(topics = "test-topic", groupId = "spring-group")public void listenMessage(String message, Acknowledgment ack) {System.out.println("收到消息:" + message);ack.acknowledge(); // 手动提交}// 批量消费(需配置 listener.type=batch)@KafkaListener(topics = "batch-topic", groupId = "spring-group")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));ack.acknowledge();}
}
4.yml配置文件
生产者配置
#kafka配置
spring:kafka:#kafka集群地址# bootstraps-server: 192.168.25.100:9092bootstrap-servers: 47.122.26.22:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1# 禁止生产者触发 Topic 创建请求allow.auto.create.topics: false#代表集群中从节点都持久化后才认为发送成功acks: -1
消费者配置
spring:kafka:#kafka集群地址bootstraps-server: 192.168.25.100:9092consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000# 禁用生产者触发 Topic 元数据请求时自动创建allow.auto.create.topics: falsegroup-id: testauto-offset-reset: earliestlistener:ack-mode: manual_immediate # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch
5.实体类
@Data
public class KafkaRequest {/*** 主题*/private String topic;/*** 消息*/private String message;
}
6.消息发送
@RestController
@Slf4j
public class KafkaController {private final String topic="lx";private int temp=1;@Autowiredprivate KafkaProducer producer;/*** 下单** @param kafkaRequest* @return null*/@RequestMapping("/test01")public void test01(KafkaRequest kafkaRequest){log.info("test01测试成功!topic:{};message:{}",kafkaRequest.getTopic(), kafkaRequest.getMessage());producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());}@RequestMapping("/test02")public void test02(KafkaRequest kafkaRequest){log.info("test02测试成功!topic:{};message:{}",topic, temp);producer.send(topic, String.valueOf(temp));temp++;}
}
kafka启动方式
./kafka-server-start.sh ../config/server.properties
2.yml完整配置
spring:kafka:# 基础配置(必填项)bootstrap-servers: localhost:9092 # Kafka集群地址,多节点用逗号分隔 client-id: spring-boot-app # 客户端标识(日志追踪用)# 生产者配置 producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值序列化器acks: all # 消息确认机制:all表示所有副本确认(最高可靠性)retries: 5 # 发送失败重试次数(需配合幂等性使用)batch-size: 16384 # 批量发送缓冲区大小(单位:字节)linger-ms: 50 # 发送延迟等待时间(毫秒,提高吞吐量)buffer-memory: 33554432 # 生产者内存缓冲区大小(默认32MB)compression-type: snappy # 消息压缩算法(可选gzip/lz4/zstd)transaction-id-prefix: tx- # 开启事务时需配置前缀(需配合@Transactional)# 消费者配置 consumer:group-id: app-consumer-group # 消费者组ID(同一组共享分区)auto-offset-reset: earliest # 无Offset时策略:earliest(从头)/latest(最新)enable-auto-commit: false # 关闭自动提交Offset(推荐手动提交)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500 # 单次poll最大消息数(避免OOM)fetch-max-wait-ms: 500 # 消费者等待broker返回数据的最长时间isolation-level: read_committed # 事务消息隔离级别(read_committed/read_uncommitted)# 监听器配置(高级优化)listener:type: single # 监听器类型:single(单条)/batch(批量)ack-mode: manual # Offset提交模式:manual(手动)/batch(批量提交)concurrency: 3 # 消费者线程数(建议等于分区数)poll-timeout: 3000 # poll方法超时时间(毫秒)# 消息重试与死信队列(容错机制)retry:topic:attempts: 3 # 最大重试次数initial-interval: 1000 # 初始重试间隔(毫秒)multiplier: 2.0 # 重试间隔倍数(指数退避)dead-letter-topic: dlq-${topic} # 死信队列命名规则(自动创建)# 安全协议(企业级场景)properties:security.protocol: SASL_PLAINTEXT # 安全协议(如PLAINTEXT/SASL_SSL)sasl.mechanism: PLAIN # SASL认证机制ssl.truststore.location: /path/to/truststore.jks# 自定义业务配置(非Kafka标准参数)app:kafka:topics:input-topic: user-events # 业务输入Topicoutput-topic: processed-events # 业务输出Topic
3.关键配置注释说明
1. 生产者优化参数
参数 | 说明 | 推荐值 |
---|---|---|
acks=all | 确保所有ISR副本写入成功,防止数据丢失 | 高可靠性场景必选 |
compression-type=snappy | 减少网络带宽占用,提升吞吐量 | 消息体>1KB时启用 |
transaction-id-prefix | 支持跨分区原子性写入(需配合@Transactional注解) | 金融交易类业务必配 |
2. 消费者可靠性配置
参数 | 说明 | 注意事项 |
---|---|---|
enable-auto-commit=false | 避免消息处理失败但Offset已提交导致数据丢失 | 需手动调用ack.acknowledge() |
isolation-level=read_committed | 只消费已提交的事务消息 | 需与生产者事务配置联动 |
3. 监听器高级特性
参数 | 使用场景 | 示例 |
---|---|---|
type=batch | 批量消费(提升吞吐量) | 适用于日志处理等实时性要求低的场景 |
concurrency=3 | 并发消费者数 | 需与Topic分区数一致,避免资源浪费 |
4. 安全认证配置
spring:kafka:properties:security.protocol: SASL_SSLsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
- 企业级必配:生产环境需启用SSL加密+SASL认证
4.配置验证方法
- 启动检查:添加
@ConfigurationProperties(prefix = "spring.kafka")
绑定配置到Bean,通过单元测试验证注入值 - 日志监控:开启DEBUG日志观察生产者/消费者连接状态
logging:level:org.springframework.kafka: DEBUG
- AdminClient 工具:通过编程方式检查Topic元数据
@Autowired
private KafkaAdminClient adminClient;public void checkTopic() {Map<String, TopicDescription> topics = adminClient.describeTopics("user-events");topics.values().forEach(topic -> System.out.println(topic));
}
5.不同场景配置模板
场景1:高吞吐日志收集
producer:compression-type: lz4batch-size: 65536linger-ms: 100
consumer:auto-offset-reset: latestenable-auto-commit: true # 允许少量数据丢失以换取性能
场景2:金融级事务消息
producer:acks: allretries: 10transaction-id-prefix: fin-tx-
consumer:isolation-level: read_committedenable-auto-commit: false
场景3:跨数据中心同步
spring:kafka:bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092properties:client.dns.lookup: use_all_dns_ips # 支持多IP解析reconnect.backoff.ms: 1000 # 断线重连策略
5.高级配置
1.事务支持
// 配置事务管理器
@Bean
public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);
}// 使用事务发送
@Transactional
public void sendWithTransaction() {kafkaTemplate.send("topic1", "msg1");kafkaTemplate.send("topic2", "msg2");
}
2.消息重试与死信队列
spring:kafka:listener:retry:max-attempts: 3backoff:initial-interval: 1000multiplier: 2.0dead-letter-topic: my-dlt-topic # 死信队列
相关文章:
003 SpringBoot集成Kafka操作
4.SpringBoot集成Kafka 文章目录 4.SpringBoot集成Kafka1.入门示例2.yml完整配置3.关键配置注释说明1. 生产者优化参数2. 消费者可靠性配置3. 监听器高级特性4. 安全认证配置 4.配置验证方法5.不同场景配置模板场景1:高吞吐日志收集场景2:金融级事务消息…...

Android SystemUI开发(一)
frameworks/base/packages/SystemUI/src/com/android/systemui/SystemUI.java frameworks/base/packages/SystemUI/src/com/android/systemui/SystemUIService.java 关键文件 SystemUI 关键服务 简介 Dependency.class:处理系统依赖关系,提供资源或服…...

C#贪心算法
贪心算法:生活与代码中的 “最优选择大师” 在生活里,我们常常面临各种选择,都希望能做出最有利的决策。比如在超市大促销时,面对琳琅满目的商品,你总想用有限的预算买到价值最高的东西。贪心算法,就像是一…...

Vue程序下载
Vue是一个基于JavaScript(JS)实现的框架,想要使用它,就得先拿到Vue的js文件 Vue官网 Vue2:Vue.js Vue3:Vue.js - 渐进式 JavaScript 框架 | Vue.js 下载并安装vue.js 第一步:打开Vue2官网&a…...

【UCB CS 61B SP24】Lecture 17 - Data Structures 3: B-Trees学习笔记
本文以 2-3-4 树详细讲解了 B 树的概念,逐步分析其操作,并用 Java 实现了标准的 B 树。 1. 2-3 & 2-3-4 Trees 上一节课中讲到的二叉搜索树当数据是随机顺序插入的时候能够使得树变得比较茂密,如下图右侧所示,时间复杂度也就…...

机器学习决策树
一、香农公式 熵: 信息增益: 信息增益信息熵-条件熵 前者是初始信息熵大小,后者是因为条件加入后带来的确定性增加 信息增益表示得知特征X的信息而使得类Y的信息的不确定性减少的程度 信息增益越大说明影响越大 二、代码 ""&…...
Spring Boot + MyBatis 实现 RESTful API 的完整流程
后端开发:Spring Boot 快速开发实战 引言 在现代后端开发中,Spring Boot 因其轻量级、快速开发的特性而备受开发者青睐。本文将带你从零开始,使用 Spring Boot MyBatis 实现一个完整的 RESTful API,并深入探讨如何优雅地处理异…...
通过 ANSYS Discovery 进行 CFD 分析,增强工程设计
概括 工程师使用计算流体动力学 (CFD) 分析来研究和优化各种应用中的流体流动和传热分析。ANSYS Discovery 是一个用户友好的软件平台,使工程师能够轻松设置和解决 CFD 模型,并能够通知设计修改 在这篇博文中,我们将重点介绍在 Ansys Disc…...

家用可燃气体探测器——家庭燃气安全的坚实防线
随着社会的发展和变迁,天然气为我们的生活带来了诸多便利,无论是烹饪美食,还是温暖取暖,都离不开它的支持。然而,燃气安全隐患如影随形,一旦发生泄漏,可能引发爆炸、火灾等严重事故,…...

ListControl双击实现可编辑
为Edit Control控件添加丢失输入焦点事件,可见设为false 为List Control控件添加双击事件 控件和成员变量之间交换数据 CListCtrl ListPrint1; //列表输出 CEdit...

ave-form.vue 组件中 如何将产品名称发送给后端 ?
如何将产品名称发送给后端。 在这段代码中,产品名称(productName)的处理和发送主要发生在 save() 方法中。让我逐步分析: 产品ID的选择: <w-form-selectv-model"form.productId"label"涉及产品&q…...

DeepSeek行业应用实践报告-智灵动力【112页PPT全】
DeepSeek(深度搜索)近期引发广泛关注并成为众多企业/开发者争相接入的现象,主要源于其在技术突破、市场需求适配性及生态建设等方面的综合优势。以下是关键原因分析: 一、技术核心优势 开源与低成本 DeepSeek基于开源架构…...

【Markdown 语法简洁讲解】
Markdown 语法简洁语法讲解 什么是 Markdown1. 标题2. 列表3.文本样式4. 链接与图片5. 代码6. 表格7. 分割线8. 流程图9. 数学公式10. 快捷键11. 字体、字号与颜色 什么是 Markdown Markdown 是一种轻量级标记语言,通过简单的符号实现排版格式化,专注于…...

250301-OpenWebUI配置DeepSeek-火山方舟+硅基流动+联网搜索+推理显示
A. 最终效果 B. 火山方舟配置(一定要点击添加) C. 硅基流动配置(最好要点击添加,否则会自动弹出所有模型) D. 联网搜索配置 E. 推理过程显示 默认是没有下面的推理过程的显示的 设置步骤: 在Functions函…...
【3天快速入门WPF】12-MVVM
目录 1. 什么是MVVM2. 实现简单MVVM2.1. Part 12.2. Part 21. 什么是MVVM MVVM 是 Model-View-ViewModel 的缩写,是一种用于构建用户界面的设计模式,是一种简化用户界面的事件驱动编程方式。 MVVM 的目标是实现用户界面和业务逻辑之间的彻底分离,以便更好地管理和维护应用…...

查找Excel包含关键字的行(の几种简单快速方法)
需求:数据在后缀为xlsx的Excel的sheet1中且量比较大,比如几十万行几百列;想查找一个关键字所在的行,比如"全网首发"; 情况①知道关键字在哪一列 情况②不确定在哪一列,很多列相似又不同,本文演…...

性能测试分析和调优
步骤 性能调优的步骤 性能调优的步骤: 1.确定问题:根据性能测试的结果来分析确定bug。–测试人员职责 2.分析原因:分析问题产生的原因。----开发人员职责 3.给出解决方案:可以是修改软件配置、增加硬件资源配置、修改代码等----…...

(视频教程)Compass代谢分析详细流程及python版-R语言版下游分析和可视化
不想做太多的前情解说了,有点累了,做了很久的内容,包括整个分析,从软件安装和报错解决到后期下游python版-R语言版下游分析和可视化!单细胞代谢分析我们写过很多了,唯独少了最“高级”的compass,…...

【SQL】MySQL中的字符串处理函数:concat 函数拼接字符串,COALESCE函数处理NULL字符串
MySQL中的字符串处理函数:concat 函数 一、concat ()函数 1.1、基本语法1.2、示例1.3、特殊用途 二、COALESCE()函数 2.1、基本语法2.2、示例2.3、用途 三、进阶练习 3.1 条件和 SQL 语句3.2、解释 一、concat &…...
c++中深拷贝和浅拷贝的联系和区别
在 C 编程里,深拷贝和浅拷贝是两种不同的对象复制方式,它们在实现方式、资源管理和适用场景等方面存在显著差异。下面为你详细介绍它们的区别。 1. 基本概念 浅拷贝:浅拷贝仅仅复制对象的成员变量值。对于基本数据类型(如 int、d…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...

三分算法与DeepSeek辅助证明是单峰函数
前置 单峰函数有唯一的最大值,最大值左侧的数值严格单调递增,最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值,最小值左侧的数值严格单调递减,最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...
Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成
一个面向 Java 开发者的 Sring-Ai 示例工程项目,该项目是一个 Spring AI 快速入门的样例工程项目,旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计,每个模块都专注于特定的功能领域,便于学习和…...

倒装芯片凸点成型工艺
UBM(Under Bump Metallization)与Bump(焊球)形成工艺流程。我们可以将整张流程图分为三大阶段来理解: 🔧 一、UBM(Under Bump Metallization)工艺流程(黄色区域ÿ…...

Linux基础开发工具——vim工具
文章目录 vim工具什么是vimvim的多模式和使用vim的基础模式vim的三种基础模式三种模式的初步了解 常用模式的详细讲解插入模式命令模式模式转化光标的移动文本的编辑 底行模式替换模式视图模式总结 使用vim的小技巧vim的配置(了解) vim工具 本文章仍然是继续讲解Linux系统下的…...
2025.6.9总结(利与弊)
凡事都有两面性。在大厂上班也不例外。今天找开发定位问题,从一个接口人不断溯源到另一个 接口人。有时候,不知道是谁的责任填。将工作内容分的很细,每个人负责其中的一小块。我清楚的意识到,自己就是个可以随时替换的螺丝钉&…...

未授权访问事件频发,我们应当如何应对?
在当下,数据已成为企业和组织的核心资产,是推动业务发展、决策制定以及创新的关键驱动力。然而,未授权访问这一隐匿的安全威胁,正如同高悬的达摩克利斯之剑,时刻威胁着数据的安全,一旦触发,便可…...
无需布线的革命:电力载波技术赋能楼宇自控系统-亚川科技
无需布线的革命:电力载波技术赋能楼宇自控系统 在楼宇自动化领域,传统控制系统依赖复杂的专用通信线路,不仅施工成本高昂,后期维护和扩展也极为不便。电力载波技术(PLC)的突破性应用,彻底改变了…...