003 SpringBoot集成Kafka操作
4.SpringBoot集成Kafka
文章目录
- 4.SpringBoot集成Kafka
- 1.入门示例
- 2.yml完整配置
- 3.关键配置注释说明
- 1. 生产者优化参数
- 2. 消费者可靠性配置
- 3. 监听器高级特性
- 4. 安全认证配置
- 4.配置验证方法
- 5.不同场景配置模板
- 场景1:高吞吐日志收集
- 场景2:金融级事务消息
- 场景3:跨数据中心同步
- 5.高级配置
- 1.事务支持
- 2.消息重试与死信队列
来源参考的deepseek,如有侵权联系立删
1.入门示例
1.pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2.KafkaProducer消息生产者配置
@Component
@Slf4j
public class KafkaProducer {private HashMap map=new HashMap<>();@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(String topic,String msg){log.info("开始发送消息,topic:{};message:{}",topic,msg);ListenableFuture<SendResult<Integer,String>> send=kafkaTemplate.send(topic, msg);//消息确认机制send.addCallback(new ListenableFutureCallback<SendResult<Integer,String>>(){@Overridepublic void onSuccess(SendResult<Integer, String> result) {log.info("消息发送成功,topic:{};message:{}",topic,msg);}@Overridepublic void onFailure(Throwable ex) {//落库操作map.put(topic,msg);}});}
}
springboot3.x写法
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;@Service
@RequiredArgsConstructor
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;// 同步发送(阻塞等待确认)public void sendMessageSync(String topic, String key, String value) {kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {if (ex == null) {System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());} else {System.err.println("发送失败:" + ex.getMessage());}});}// 异步发送(默认方式)public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message);}
}
- Spring Boot 2.x:
send()
返回ListenableFuture<SendResult>
,支持addCallback()
回调。 - Spring Boot 3.x:
send()
返回CompletableFuture<SendResult>
,弃用ListenableFuture
,因此需要使用CompletableFuture
的 API(如whenComplete
)。
3.KafkaConsumer消息消费
@Component
@Slf4j
public class KafkaConsumer {private List<String> exist=new ArrayList<>();@KafkaListener(topics = {"lx"},groupId = "lx")public void consumer(ConsumerRecord<Integer,String> record){if (exist.contains(record.value())){log.error("不满足幂等校验!!!");}log.info("消息消费成功,topic:{},message:{}", record.topic(), record.value());exist.add(record.value());}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerService {// 单个消息消费(手动提交偏移量)@KafkaListener(topics = "test-topic", groupId = "spring-group")public void listenMessage(String message, Acknowledgment ack) {System.out.println("收到消息:" + message);ack.acknowledge(); // 手动提交}// 批量消费(需配置 listener.type=batch)@KafkaListener(topics = "batch-topic", groupId = "spring-group")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));ack.acknowledge();}
}
4.yml配置文件
生产者配置
#kafka配置
spring:kafka:#kafka集群地址# bootstraps-server: 192.168.25.100:9092bootstrap-servers: 47.122.26.22:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1# 禁止生产者触发 Topic 创建请求allow.auto.create.topics: false#代表集群中从节点都持久化后才认为发送成功acks: -1
消费者配置
spring:kafka:#kafka集群地址bootstraps-server: 192.168.25.100:9092consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000# 禁用生产者触发 Topic 元数据请求时自动创建allow.auto.create.topics: falsegroup-id: testauto-offset-reset: earliestlistener:ack-mode: manual_immediate # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch
5.实体类
@Data
public class KafkaRequest {/*** 主题*/private String topic;/*** 消息*/private String message;
}
6.消息发送
@RestController
@Slf4j
public class KafkaController {private final String topic="lx";private int temp=1;@Autowiredprivate KafkaProducer producer;/*** 下单** @param kafkaRequest* @return null*/@RequestMapping("/test01")public void test01(KafkaRequest kafkaRequest){log.info("test01测试成功!topic:{};message:{}",kafkaRequest.getTopic(), kafkaRequest.getMessage());producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());}@RequestMapping("/test02")public void test02(KafkaRequest kafkaRequest){log.info("test02测试成功!topic:{};message:{}",topic, temp);producer.send(topic, String.valueOf(temp));temp++;}
}
kafka启动方式
./kafka-server-start.sh ../config/server.properties
2.yml完整配置
spring:kafka:# 基础配置(必填项)bootstrap-servers: localhost:9092 # Kafka集群地址,多节点用逗号分隔 client-id: spring-boot-app # 客户端标识(日志追踪用)# 生产者配置 producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值序列化器acks: all # 消息确认机制:all表示所有副本确认(最高可靠性)retries: 5 # 发送失败重试次数(需配合幂等性使用)batch-size: 16384 # 批量发送缓冲区大小(单位:字节)linger-ms: 50 # 发送延迟等待时间(毫秒,提高吞吐量)buffer-memory: 33554432 # 生产者内存缓冲区大小(默认32MB)compression-type: snappy # 消息压缩算法(可选gzip/lz4/zstd)transaction-id-prefix: tx- # 开启事务时需配置前缀(需配合@Transactional)# 消费者配置 consumer:group-id: app-consumer-group # 消费者组ID(同一组共享分区)auto-offset-reset: earliest # 无Offset时策略:earliest(从头)/latest(最新)enable-auto-commit: false # 关闭自动提交Offset(推荐手动提交)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500 # 单次poll最大消息数(避免OOM)fetch-max-wait-ms: 500 # 消费者等待broker返回数据的最长时间isolation-level: read_committed # 事务消息隔离级别(read_committed/read_uncommitted)# 监听器配置(高级优化)listener:type: single # 监听器类型:single(单条)/batch(批量)ack-mode: manual # Offset提交模式:manual(手动)/batch(批量提交)concurrency: 3 # 消费者线程数(建议等于分区数)poll-timeout: 3000 # poll方法超时时间(毫秒)# 消息重试与死信队列(容错机制)retry:topic:attempts: 3 # 最大重试次数initial-interval: 1000 # 初始重试间隔(毫秒)multiplier: 2.0 # 重试间隔倍数(指数退避)dead-letter-topic: dlq-${topic} # 死信队列命名规则(自动创建)# 安全协议(企业级场景)properties:security.protocol: SASL_PLAINTEXT # 安全协议(如PLAINTEXT/SASL_SSL)sasl.mechanism: PLAIN # SASL认证机制ssl.truststore.location: /path/to/truststore.jks# 自定义业务配置(非Kafka标准参数)app:kafka:topics:input-topic: user-events # 业务输入Topicoutput-topic: processed-events # 业务输出Topic
3.关键配置注释说明
1. 生产者优化参数
参数 | 说明 | 推荐值 |
---|---|---|
acks=all | 确保所有ISR副本写入成功,防止数据丢失 | 高可靠性场景必选 |
compression-type=snappy | 减少网络带宽占用,提升吞吐量 | 消息体>1KB时启用 |
transaction-id-prefix | 支持跨分区原子性写入(需配合@Transactional注解) | 金融交易类业务必配 |
2. 消费者可靠性配置
参数 | 说明 | 注意事项 |
---|---|---|
enable-auto-commit=false | 避免消息处理失败但Offset已提交导致数据丢失 | 需手动调用ack.acknowledge() |
isolation-level=read_committed | 只消费已提交的事务消息 | 需与生产者事务配置联动 |
3. 监听器高级特性
参数 | 使用场景 | 示例 |
---|---|---|
type=batch | 批量消费(提升吞吐量) | 适用于日志处理等实时性要求低的场景 |
concurrency=3 | 并发消费者数 | 需与Topic分区数一致,避免资源浪费 |
4. 安全认证配置
spring:kafka:properties:security.protocol: SASL_SSLsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
- 企业级必配:生产环境需启用SSL加密+SASL认证
4.配置验证方法
- 启动检查:添加
@ConfigurationProperties(prefix = "spring.kafka")
绑定配置到Bean,通过单元测试验证注入值 - 日志监控:开启DEBUG日志观察生产者/消费者连接状态
logging:level:org.springframework.kafka: DEBUG
- AdminClient 工具:通过编程方式检查Topic元数据
@Autowired
private KafkaAdminClient adminClient;public void checkTopic() {Map<String, TopicDescription> topics = adminClient.describeTopics("user-events");topics.values().forEach(topic -> System.out.println(topic));
}
5.不同场景配置模板
场景1:高吞吐日志收集
producer:compression-type: lz4batch-size: 65536linger-ms: 100
consumer:auto-offset-reset: latestenable-auto-commit: true # 允许少量数据丢失以换取性能
场景2:金融级事务消息
producer:acks: allretries: 10transaction-id-prefix: fin-tx-
consumer:isolation-level: read_committedenable-auto-commit: false
场景3:跨数据中心同步
spring:kafka:bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092properties:client.dns.lookup: use_all_dns_ips # 支持多IP解析reconnect.backoff.ms: 1000 # 断线重连策略
5.高级配置
1.事务支持
// 配置事务管理器
@Bean
public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);
}// 使用事务发送
@Transactional
public void sendWithTransaction() {kafkaTemplate.send("topic1", "msg1");kafkaTemplate.send("topic2", "msg2");
}
2.消息重试与死信队列
spring:kafka:listener:retry:max-attempts: 3backoff:initial-interval: 1000multiplier: 2.0dead-letter-topic: my-dlt-topic # 死信队列
相关文章:
003 SpringBoot集成Kafka操作
4.SpringBoot集成Kafka 文章目录 4.SpringBoot集成Kafka1.入门示例2.yml完整配置3.关键配置注释说明1. 生产者优化参数2. 消费者可靠性配置3. 监听器高级特性4. 安全认证配置 4.配置验证方法5.不同场景配置模板场景1:高吞吐日志收集场景2:金融级事务消息…...

Android SystemUI开发(一)
frameworks/base/packages/SystemUI/src/com/android/systemui/SystemUI.java frameworks/base/packages/SystemUI/src/com/android/systemui/SystemUIService.java 关键文件 SystemUI 关键服务 简介 Dependency.class:处理系统依赖关系,提供资源或服…...

C#贪心算法
贪心算法:生活与代码中的 “最优选择大师” 在生活里,我们常常面临各种选择,都希望能做出最有利的决策。比如在超市大促销时,面对琳琅满目的商品,你总想用有限的预算买到价值最高的东西。贪心算法,就像是一…...

Vue程序下载
Vue是一个基于JavaScript(JS)实现的框架,想要使用它,就得先拿到Vue的js文件 Vue官网 Vue2:Vue.js Vue3:Vue.js - 渐进式 JavaScript 框架 | Vue.js 下载并安装vue.js 第一步:打开Vue2官网&a…...

【UCB CS 61B SP24】Lecture 17 - Data Structures 3: B-Trees学习笔记
本文以 2-3-4 树详细讲解了 B 树的概念,逐步分析其操作,并用 Java 实现了标准的 B 树。 1. 2-3 & 2-3-4 Trees 上一节课中讲到的二叉搜索树当数据是随机顺序插入的时候能够使得树变得比较茂密,如下图右侧所示,时间复杂度也就…...

机器学习决策树
一、香农公式 熵: 信息增益: 信息增益信息熵-条件熵 前者是初始信息熵大小,后者是因为条件加入后带来的确定性增加 信息增益表示得知特征X的信息而使得类Y的信息的不确定性减少的程度 信息增益越大说明影响越大 二、代码 ""&…...
Spring Boot + MyBatis 实现 RESTful API 的完整流程
后端开发:Spring Boot 快速开发实战 引言 在现代后端开发中,Spring Boot 因其轻量级、快速开发的特性而备受开发者青睐。本文将带你从零开始,使用 Spring Boot MyBatis 实现一个完整的 RESTful API,并深入探讨如何优雅地处理异…...
通过 ANSYS Discovery 进行 CFD 分析,增强工程设计
概括 工程师使用计算流体动力学 (CFD) 分析来研究和优化各种应用中的流体流动和传热分析。ANSYS Discovery 是一个用户友好的软件平台,使工程师能够轻松设置和解决 CFD 模型,并能够通知设计修改 在这篇博文中,我们将重点介绍在 Ansys Disc…...

家用可燃气体探测器——家庭燃气安全的坚实防线
随着社会的发展和变迁,天然气为我们的生活带来了诸多便利,无论是烹饪美食,还是温暖取暖,都离不开它的支持。然而,燃气安全隐患如影随形,一旦发生泄漏,可能引发爆炸、火灾等严重事故,…...

ListControl双击实现可编辑
为Edit Control控件添加丢失输入焦点事件,可见设为false 为List Control控件添加双击事件 控件和成员变量之间交换数据 CListCtrl ListPrint1; //列表输出 CEdit...

ave-form.vue 组件中 如何将产品名称发送给后端 ?
如何将产品名称发送给后端。 在这段代码中,产品名称(productName)的处理和发送主要发生在 save() 方法中。让我逐步分析: 产品ID的选择: <w-form-selectv-model"form.productId"label"涉及产品&q…...

DeepSeek行业应用实践报告-智灵动力【112页PPT全】
DeepSeek(深度搜索)近期引发广泛关注并成为众多企业/开发者争相接入的现象,主要源于其在技术突破、市场需求适配性及生态建设等方面的综合优势。以下是关键原因分析: 一、技术核心优势 开源与低成本 DeepSeek基于开源架构…...

【Markdown 语法简洁讲解】
Markdown 语法简洁语法讲解 什么是 Markdown1. 标题2. 列表3.文本样式4. 链接与图片5. 代码6. 表格7. 分割线8. 流程图9. 数学公式10. 快捷键11. 字体、字号与颜色 什么是 Markdown Markdown 是一种轻量级标记语言,通过简单的符号实现排版格式化,专注于…...

250301-OpenWebUI配置DeepSeek-火山方舟+硅基流动+联网搜索+推理显示
A. 最终效果 B. 火山方舟配置(一定要点击添加) C. 硅基流动配置(最好要点击添加,否则会自动弹出所有模型) D. 联网搜索配置 E. 推理过程显示 默认是没有下面的推理过程的显示的 设置步骤: 在Functions函…...
【3天快速入门WPF】12-MVVM
目录 1. 什么是MVVM2. 实现简单MVVM2.1. Part 12.2. Part 21. 什么是MVVM MVVM 是 Model-View-ViewModel 的缩写,是一种用于构建用户界面的设计模式,是一种简化用户界面的事件驱动编程方式。 MVVM 的目标是实现用户界面和业务逻辑之间的彻底分离,以便更好地管理和维护应用…...

查找Excel包含关键字的行(の几种简单快速方法)
需求:数据在后缀为xlsx的Excel的sheet1中且量比较大,比如几十万行几百列;想查找一个关键字所在的行,比如"全网首发"; 情况①知道关键字在哪一列 情况②不确定在哪一列,很多列相似又不同,本文演…...

性能测试分析和调优
步骤 性能调优的步骤 性能调优的步骤: 1.确定问题:根据性能测试的结果来分析确定bug。–测试人员职责 2.分析原因:分析问题产生的原因。----开发人员职责 3.给出解决方案:可以是修改软件配置、增加硬件资源配置、修改代码等----…...

(视频教程)Compass代谢分析详细流程及python版-R语言版下游分析和可视化
不想做太多的前情解说了,有点累了,做了很久的内容,包括整个分析,从软件安装和报错解决到后期下游python版-R语言版下游分析和可视化!单细胞代谢分析我们写过很多了,唯独少了最“高级”的compass,…...

【SQL】MySQL中的字符串处理函数:concat 函数拼接字符串,COALESCE函数处理NULL字符串
MySQL中的字符串处理函数:concat 函数 一、concat ()函数 1.1、基本语法1.2、示例1.3、特殊用途 二、COALESCE()函数 2.1、基本语法2.2、示例2.3、用途 三、进阶练习 3.1 条件和 SQL 语句3.2、解释 一、concat &…...
c++中深拷贝和浅拷贝的联系和区别
在 C 编程里,深拷贝和浅拷贝是两种不同的对象复制方式,它们在实现方式、资源管理和适用场景等方面存在显著差异。下面为你详细介绍它们的区别。 1. 基本概念 浅拷贝:浅拷贝仅仅复制对象的成员变量值。对于基本数据类型(如 int、d…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...

代码规范和架构【立芯理论一】(2025.06.08)
1、代码规范的目标 代码简洁精炼、美观,可持续性好高效率高复用,可移植性好高内聚,低耦合没有冗余规范性,代码有规可循,可以看出自己当时的思考过程特殊排版,特殊语法,特殊指令,必须…...

协议转换利器,profinet转ethercat网关的两大派系,各有千秋
随着工业以太网的发展,其高效、便捷、协议开放、易于冗余等诸多优点,被越来越多的工业现场所采用。西门子SIMATIC S7-1200/1500系列PLC集成有Profinet接口,具有实时性、开放性,使用TCP/IP和IT标准,符合基于工业以太网的…...

jdbc查询mysql数据库时,出现id顺序错误的情况
我在repository中的查询语句如下所示,即传入一个List<intager>的数据,返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致,会导致返回的id是从小到大排列的,但我不希望这样。 Query("SELECT NEW com…...

GAN模式奔溃的探讨论文综述(一)
简介 简介:今天带来一篇关于GAN的,对于模式奔溃的一个探讨的一个问题,帮助大家更好的解决训练中遇到的一个难题。 论文题目:An in-depth review and analysis of mode collapse in GAN 期刊:Machine Learning 链接:...