003 SpringBoot集成Kafka操作
4.SpringBoot集成Kafka
文章目录
- 4.SpringBoot集成Kafka
- 1.入门示例
- 2.yml完整配置
- 3.关键配置注释说明
- 1. 生产者优化参数
- 2. 消费者可靠性配置
- 3. 监听器高级特性
- 4. 安全认证配置
- 4.配置验证方法
- 5.不同场景配置模板
- 场景1:高吞吐日志收集
- 场景2:金融级事务消息
- 场景3:跨数据中心同步
- 5.高级配置
- 1.事务支持
- 2.消息重试与死信队列
来源参考的deepseek,如有侵权联系立删
1.入门示例
1.pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2.KafkaProducer消息生产者配置
@Component
@Slf4j
public class KafkaProducer {private HashMap map=new HashMap<>();@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(String topic,String msg){log.info("开始发送消息,topic:{};message:{}",topic,msg);ListenableFuture<SendResult<Integer,String>> send=kafkaTemplate.send(topic, msg);//消息确认机制send.addCallback(new ListenableFutureCallback<SendResult<Integer,String>>(){@Overridepublic void onSuccess(SendResult<Integer, String> result) {log.info("消息发送成功,topic:{};message:{}",topic,msg);}@Overridepublic void onFailure(Throwable ex) {//落库操作map.put(topic,msg);}});}
}
springboot3.x写法
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;@Service
@RequiredArgsConstructor
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;// 同步发送(阻塞等待确认)public void sendMessageSync(String topic, String key, String value) {kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {if (ex == null) {System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());} else {System.err.println("发送失败:" + ex.getMessage());}});}// 异步发送(默认方式)public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message);}
}
- Spring Boot 2.x:
send()返回ListenableFuture<SendResult>,支持addCallback()回调。 - Spring Boot 3.x:
send()返回CompletableFuture<SendResult>,弃用ListenableFuture,因此需要使用CompletableFuture的 API(如whenComplete)。
3.KafkaConsumer消息消费
@Component
@Slf4j
public class KafkaConsumer {private List<String> exist=new ArrayList<>();@KafkaListener(topics = {"lx"},groupId = "lx")public void consumer(ConsumerRecord<Integer,String> record){if (exist.contains(record.value())){log.error("不满足幂等校验!!!");}log.info("消息消费成功,topic:{},message:{}", record.topic(), record.value());exist.add(record.value());}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerService {// 单个消息消费(手动提交偏移量)@KafkaListener(topics = "test-topic", groupId = "spring-group")public void listenMessage(String message, Acknowledgment ack) {System.out.println("收到消息:" + message);ack.acknowledge(); // 手动提交}// 批量消费(需配置 listener.type=batch)@KafkaListener(topics = "batch-topic", groupId = "spring-group")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));ack.acknowledge();}
}
4.yml配置文件
生产者配置
#kafka配置
spring:kafka:#kafka集群地址# bootstraps-server: 192.168.25.100:9092bootstrap-servers: 47.122.26.22:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1# 禁止生产者触发 Topic 创建请求allow.auto.create.topics: false#代表集群中从节点都持久化后才认为发送成功acks: -1
消费者配置
spring:kafka:#kafka集群地址bootstraps-server: 192.168.25.100:9092consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000# 禁用生产者触发 Topic 元数据请求时自动创建allow.auto.create.topics: falsegroup-id: testauto-offset-reset: earliestlistener:ack-mode: manual_immediate # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch
5.实体类
@Data
public class KafkaRequest {/*** 主题*/private String topic;/*** 消息*/private String message;
}
6.消息发送
@RestController
@Slf4j
public class KafkaController {private final String topic="lx";private int temp=1;@Autowiredprivate KafkaProducer producer;/*** 下单** @param kafkaRequest* @return null*/@RequestMapping("/test01")public void test01(KafkaRequest kafkaRequest){log.info("test01测试成功!topic:{};message:{}",kafkaRequest.getTopic(), kafkaRequest.getMessage());producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());}@RequestMapping("/test02")public void test02(KafkaRequest kafkaRequest){log.info("test02测试成功!topic:{};message:{}",topic, temp);producer.send(topic, String.valueOf(temp));temp++;}
}
kafka启动方式
./kafka-server-start.sh ../config/server.properties
2.yml完整配置
spring:kafka:# 基础配置(必填项)bootstrap-servers: localhost:9092 # Kafka集群地址,多节点用逗号分隔 client-id: spring-boot-app # 客户端标识(日志追踪用)# 生产者配置 producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值序列化器acks: all # 消息确认机制:all表示所有副本确认(最高可靠性)retries: 5 # 发送失败重试次数(需配合幂等性使用)batch-size: 16384 # 批量发送缓冲区大小(单位:字节)linger-ms: 50 # 发送延迟等待时间(毫秒,提高吞吐量)buffer-memory: 33554432 # 生产者内存缓冲区大小(默认32MB)compression-type: snappy # 消息压缩算法(可选gzip/lz4/zstd)transaction-id-prefix: tx- # 开启事务时需配置前缀(需配合@Transactional)# 消费者配置 consumer:group-id: app-consumer-group # 消费者组ID(同一组共享分区)auto-offset-reset: earliest # 无Offset时策略:earliest(从头)/latest(最新)enable-auto-commit: false # 关闭自动提交Offset(推荐手动提交)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500 # 单次poll最大消息数(避免OOM)fetch-max-wait-ms: 500 # 消费者等待broker返回数据的最长时间isolation-level: read_committed # 事务消息隔离级别(read_committed/read_uncommitted)# 监听器配置(高级优化)listener:type: single # 监听器类型:single(单条)/batch(批量)ack-mode: manual # Offset提交模式:manual(手动)/batch(批量提交)concurrency: 3 # 消费者线程数(建议等于分区数)poll-timeout: 3000 # poll方法超时时间(毫秒)# 消息重试与死信队列(容错机制)retry:topic:attempts: 3 # 最大重试次数initial-interval: 1000 # 初始重试间隔(毫秒)multiplier: 2.0 # 重试间隔倍数(指数退避)dead-letter-topic: dlq-${topic} # 死信队列命名规则(自动创建)# 安全协议(企业级场景)properties:security.protocol: SASL_PLAINTEXT # 安全协议(如PLAINTEXT/SASL_SSL)sasl.mechanism: PLAIN # SASL认证机制ssl.truststore.location: /path/to/truststore.jks# 自定义业务配置(非Kafka标准参数)app:kafka:topics:input-topic: user-events # 业务输入Topicoutput-topic: processed-events # 业务输出Topic
3.关键配置注释说明
1. 生产者优化参数
| 参数 | 说明 | 推荐值 |
|---|---|---|
acks=all | 确保所有ISR副本写入成功,防止数据丢失 | 高可靠性场景必选 |
compression-type=snappy | 减少网络带宽占用,提升吞吐量 | 消息体>1KB时启用 |
transaction-id-prefix | 支持跨分区原子性写入(需配合@Transactional注解) | 金融交易类业务必配 |
2. 消费者可靠性配置
| 参数 | 说明 | 注意事项 |
|---|---|---|
enable-auto-commit=false | 避免消息处理失败但Offset已提交导致数据丢失 | 需手动调用ack.acknowledge() |
isolation-level=read_committed | 只消费已提交的事务消息 | 需与生产者事务配置联动 |
3. 监听器高级特性
| 参数 | 使用场景 | 示例 |
|---|---|---|
type=batch | 批量消费(提升吞吐量) | 适用于日志处理等实时性要求低的场景 |
concurrency=3 | 并发消费者数 | 需与Topic分区数一致,避免资源浪费 |
4. 安全认证配置
spring:kafka:properties:security.protocol: SASL_SSLsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
- 企业级必配:生产环境需启用SSL加密+SASL认证
4.配置验证方法
- 启动检查:添加
@ConfigurationProperties(prefix = "spring.kafka")绑定配置到Bean,通过单元测试验证注入值 - 日志监控:开启DEBUG日志观察生产者/消费者连接状态
logging:level:org.springframework.kafka: DEBUG
- AdminClient 工具:通过编程方式检查Topic元数据
@Autowired
private KafkaAdminClient adminClient;public void checkTopic() {Map<String, TopicDescription> topics = adminClient.describeTopics("user-events");topics.values().forEach(topic -> System.out.println(topic));
}
5.不同场景配置模板
场景1:高吞吐日志收集
producer:compression-type: lz4batch-size: 65536linger-ms: 100
consumer:auto-offset-reset: latestenable-auto-commit: true # 允许少量数据丢失以换取性能
场景2:金融级事务消息
producer:acks: allretries: 10transaction-id-prefix: fin-tx-
consumer:isolation-level: read_committedenable-auto-commit: false
场景3:跨数据中心同步
spring:kafka:bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092properties:client.dns.lookup: use_all_dns_ips # 支持多IP解析reconnect.backoff.ms: 1000 # 断线重连策略
5.高级配置
1.事务支持
// 配置事务管理器
@Bean
public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);
}// 使用事务发送
@Transactional
public void sendWithTransaction() {kafkaTemplate.send("topic1", "msg1");kafkaTemplate.send("topic2", "msg2");
}
2.消息重试与死信队列
spring:kafka:listener:retry:max-attempts: 3backoff:initial-interval: 1000multiplier: 2.0dead-letter-topic: my-dlt-topic # 死信队列
相关文章:
003 SpringBoot集成Kafka操作
4.SpringBoot集成Kafka 文章目录 4.SpringBoot集成Kafka1.入门示例2.yml完整配置3.关键配置注释说明1. 生产者优化参数2. 消费者可靠性配置3. 监听器高级特性4. 安全认证配置 4.配置验证方法5.不同场景配置模板场景1:高吞吐日志收集场景2:金融级事务消息…...
2.✨java练习1(熟悉“类”)
1. A B - AcWing题库 问题描述 输入两个整数,求这两个整数的和是多少。 输入格式 输入两个整数A,B,用空格隔开 输出格式 输出一个整数,表示这两个数的和 数据范围 0≤A,B≤1e8 C #include <iostream> // 包含标准输入输出库 using n…...
基本网络安全的实现
基本网络安全的实现 一 :AAA AAA 是Authentication,Authorization and Accounting(认证、授权和计费)的简 称,它提供了一个用来对认证、授权和计费这三种安全功能进行配置的一致性框架, 它是对网络安全…...
快手前端通用静态托管服务KFX演进历程:从崎岖土路到平坦高速
快手静态部署托管服务(KFX)历经四年发展,经历了三个阶段,一步步从勉强能行车的“崎岖土路”到现在多车道并行的“平坦高速”,这一转变极大地提升了资源利用率和效率,满足业务的实际需要。本文将带你了解其背…...
登录逻辑结合redis
1. 用户登录 用户访问登录页面,输入用户名和密码,提交表单。 服务端验证用户名和密码: 如果验证成功,生成 ticket,并将 ticket 和用户 ID 存储在缓存中(如 Redis)。 将 ticket 放入 Cookie 中…...
Locale+Jackson导致Controller接口StackOverflowError异常解决
问题 由于参与的项目有出海需求,即需要给外国人使用,即:需要支持i18n(Internationalization的缩写,共20个字母,除去首尾两个字母,中间有18个,故简称i18n)。 本来是好的…...
安卓工控平板电脑在环境监测设备中的运用
安卓工控平板电脑在环境监测设备中的运用主要体现在以下几个方面: 一、耐用性与可靠性 安卓工控平板电脑通常具有坚固耐用的外壳设计,如铝合金面板和镀锌钢板箱体结构,能够抵抗高温、低温、湿度、震动等恶劣的工作环境。这种耐用性和可靠性…...
【洛谷排序算法】P1012拼数-详细讲解
洛谷 P1012 拼数这道题本身并非单纯考察某种经典排序算法(如冒泡排序、选择排序、插入排序、快速排序、归并排序等)的实现,而是在排序的基础上,自定义了排序的比较规则,属于自定义排序类型的题目。不过它借助了标准库中…...
文心一言AI创意画
介绍 文心一言是百度推出的新一代知识增强大语言模型,属于文心大模型家族的新成员。它能够与人对话互动、回答问题、协助创作,高效便捷地帮助人们获取信息、知识和灵感。 特点 文心一言基于数万亿数据和数千亿知识进行融合学习,采用预训…...
java项目之基于ssm的图书馆书库管理系统(源码+文档)
风定落花生,歌声逐流水,大家好我是风歌,混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的图书馆书库管理系统。项目源码以及部署相关请联系风歌,文末附上联系信息 。 项目简介: 该系统可以实现图书信息管理…...
使用OpenCV实现帧间变化检测:基于轮廓的动态区域标注
在计算机视觉中,帧间差异检测(frame differencing)是一种常用的技术,用于检测视频流中的动态变化区域。这种方法尤其适用于监控、运动分析、目标追踪等场景。在这篇博客中,我们将通过分析一个基于OpenCV的简单帧间差异…...
deepseek从入门到精通-第一篇.本地化部署
前言 自从22年年底开始,人工智能开始从实验室一下子走入了普通人的视野中,chatgpt像一颗石子投入水中,溅起了一波又一波的涟漪。我们都通过各种方式试用大预言模型和机器进行对话或者提问。随着大语言模型的出现,各个类型的大模型…...
2025年SCI一区智能优化算法:真菌生长优化算法(Fungal Growth Optimizer,FGO),提供MATLAB代码
一. 真菌生长优化算法(FGO) 真菌生长优化算法(Fungal Growth Optimizer,FGO)是一种新型的自然启发式元启发式算法,其灵感来源于自然界中真菌的生长行为。该算法通过模拟真菌的菌丝尖端生长、分支和孢子萌发…...
一个行为类似标准库find算法的模板
函数需要两个模板类型参数,一个表示函数的迭代器参数,另一个表示值的类型。 代码 #include<iostream> #include<string> #include<vector> #include<list>using namespace std;template <typename IterType,typename T>…...
Ubutu部署WordPress
前言 什么是word press WordPress是一种使用PHP语言开发的建站系统,用户可以在支持PHP和MySQL数据库的服务器上架设WordPress。它是一个开源的内容管理系统(CMS),允许用户构建动态网站和博客。现在的WordPress已经强大到几乎可以…...
网络渗透作业
第一题:使用Xpath对Order by 语句进行布尔盲注 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns"http://www.w3.org/1999/xhtml&quo…...
BIO、NIO、AIO解析
一、基础概念 1、IO的含义 IO,Input/Output,即输入/输出。从计算机结构来看,IO描述了计算机系统和外部设备之间通讯的过程。从应用程序角度来看,一个进程的地址空间划分为 用户空间(User space) 和 内核空…...
【Python网络爬虫笔记】14-使用代理绕过访问限制
【Python网络爬虫笔记】14-网络代理 目录什么是代理?为什么需要使用代理?代理的类型如何在Python中使用代理?使用requests库设置代理使用urllib库设置代理使用scrapy框架设置代理 典型案例:使用代理爬取豆瓣电影Top250步骤1&#…...
⭐算法OJ⭐位操作实战(C++ 实现)190. Reverse Bits | 268. Missing Number | 338. Counting Bits
文章目录 190. Reverse Bits逐位反转思路步骤代码复杂度分析 268. Missing Number338. Counting Bits动态规划 最低有效位思路步骤代码复杂度分析 动态规划 最后设置位思路步骤代码复杂度分析 190. Reverse Bits Reverse bits of a given 32 bits unsigned integer. 逐位反…...
Linux中Shell运行原理和权限(下)(4)
文章目录 前言一、Shell的运行原理二、Linux当中的权限问题Linux权限的概念如何将普通用户添加到信任列表 三、Linux权限管理文件访问者的分类(人)文件类型和访问权限(事物属性)文件权限值的表示方法文件访问权限的相关设置方法如…...
Java中的缓存技术:Guava Cache vs Caffeine vs Redis
在Java中,缓存技术是提升应用性能的重要手段。常见的缓存技术包括Guava Cache、Caffeine和Redis。它们各有优缺点,适用于不同的场景。以下是对它们的详细对比: 1. Guava Cache 类型: 本地缓存 特点: 基于内存的缓存,适用于单机应…...
C# 弃元的使用
总目录 前言 在C# 7.0及更高版本中,弃元(Discard)是一个新的语言特性,允许开发者在特定情况下忽略某些值。弃元用下划线 _ 作为占位符,明确表示忽略某个值,提升代码可读性 一、弃元是什么? 1.…...
OceanBase数据库实战:Windows Docker部署与DBeaver无缝对接
一、前言 OceanBase 是一款高性能、高可扩展的分布式数据库,适用于大规模数据处理和企业级应用。 随着大数据和云计算的普及,OceanBase 在企业数字化转型中扮演着重要角色。学习 OceanBase 可以帮助开发者掌握先进的分布式数据库技术,提升数…...
技术速递|.NET 9 网络优化
作者:Mňa,Natalia,Anton 排版:Alan Wang 秉承我们的传统,我们很高兴与您分享这篇博客文章,以介绍新的 .NET 版本中网络领域相关的最新动态和最有趣的变化。今年,我们带来了 HTTP 领域的改变、新…...
如何让 Git 管理本地项目
如何让 Git 管理本地项目:详细步骤指南 Git 是最流行的分布式版本控制系统,能够高效管理项目的代码变更历史。以下是将本地项目交给 Git 管理的完整流程,适用于首次使用 Git 的开发者。 一、前置条件 安装 Git 二、初始化 Git 仓库 进入项目…...
如何进行OceanBase 运维工具的部署和表性能优化
本文来自OceanBase 用户的实践分享 随着OceanBase数据库应用的日益深入,数据量不断攀升,单个表中存储数百万乃至数千万条数据的情况变得愈发普遍。因此,部署专门的运维工具、实施针对性的表性能优化策略,以及加强指标监测工作&…...
Tag标签的使用
一个非常适合运用在vue项目中的组件:Tag标签。 目录 一、准备工作 1、安装element-plus库 2、配置element-plus库 二、Tag标签入门 1、打开element官网,搜索tag标签 2、体验Tag标签的基础用法 三、Tag标签进阶训练1 1、定义一个数组,…...
DeepSeek系统架构的逐层分类拆解分析,从底层基础设施到用户端分发全链路
一、底层基础设施层 1. 硬件服务器集群 算力单元: GPU集群:基于NVIDIA H800/H100 GPU构建,单集群规模超10,000卡,采用NVLink全互联架构实现低延迟通信。国产化支持:适配海光DCU、寒武纪MLU等国产芯片,通过…...
Linux:(3)
一:Linux和Linux互传(压缩包) scp:Linux scp 命令用于 Linux 之间复制文件和目录。 scp 是 secure copy 的缩写, scp 是 linux 系统下基于 ssh 登陆进行安全的远程文件拷贝命令。 scp 是加密的,rcp 是不加密的,scp 是…...
el-select滚动获取下拉数据;el-select滚动加载
el-select下拉获取数据 1.解决问题2.封装MyScrollSelect组件3.使用MyScrollSelect组件 1.解决问题 场景:下拉数据量过大,后端提供一个分页查询接口;需要每次滚动加载下一页的下拉数据 且单选的状态,需要支持回显,通过n…...
