Spring Boot 中整合 Kafka
在 Spring Boot 中整合 Kafka 非常简单,Spring Kafka 提供了丰富的支持,使得我们可以轻松地实现 Kafka 的生产者和消费者。下面是一个简单的 Spring Boot 整合 Kafka 的示例。
1. 添加依赖
首先,在
pom.xml
中添加 Spring Kafka 的依赖:xml
复制
<dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Kafka Test --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency> </dependencies>运行 HTML
2. 配置 Kafka
在
application.properties
或application.yml
中配置 Kafka 的连接信息:properties
复制
# application.properties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
3. 创建 Kafka 生产者
创建一个 Kafka 生产者,用于发送消息到 Kafka 主题:
java
复制
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service;@Service public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);} }
4. 创建 Kafka 消费者
创建一个 Kafka 消费者,用于从 Kafka 主题中接收消息:
java
复制
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service;@Service public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);} }
5. 创建控制器
创建一个控制器,用于发送消息到 Kafka 主题:
java
复制
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;@RestController public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/send")public String sendMessage(@RequestParam("message") String message) {kafkaProducer.sendMessage("my-topic", message);return "Message sent: " + message;} }
6. 启动应用程序
创建一个 Spring Boot 应用程序启动类:
java
复制
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);} }
7. 运行应用程序
启动 Spring Boot 应用程序,然后访问
http://localhost:8080/send?message=Hello
,你将看到控制台输出Received message: Hello
。总结
通过以上步骤,我们成功地在 Spring Boot 中整合了 Kafka,并实现了消息的生产和消费。Spring Kafka 提供了简单易用的 API,使得我们可以轻松地与 Kafka 进行交互。你可以根据实际需求进一步扩展和优化这个示例。
在 Kafka 中,消费者消费消息失败后是否重新入队(重回队列)取决于消费者的配置和处理逻辑。Kafka 本身并没有直接提供“消费失败重回队列”的功能,但可以通过一些策略来实现类似的效果。
Kafka 消费者的基本行为
自动提交偏移量:默认情况下,Kafka 消费者会自动提交偏移量。如果消息处理失败,偏移量仍然会被提交,导致消息丢失。
手动提交偏移量:通过配置
enable.auto.commit=false
,消费者可以手动提交偏移量。这样可以在消息处理成功后再提交偏移量,确保消息不会丢失。实现消费失败重回队列的策略
1. 手动提交偏移量
通过手动提交偏移量,可以在消息处理成功后再提交偏移量,从而避免消息丢失。如果消息处理失败,可以选择不提交偏移量,让消费者在下一次拉取消息时重新处理该消息。
java
复制
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service;@Service public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {try {// 处理消息processMessage(record.value());// 处理成功后提交偏移量acknowledgment.acknowledge();} catch (Exception e) {// 处理失败,不提交偏移量,让消费者在下一次拉取时重新处理该消息System.err.println("Message processing failed: " + e.getMessage());}}private void processMessage(String message) {// 模拟消息处理逻辑if (message.contains("error")) {throw new RuntimeException("Simulated error");}System.out.println("Processed message: " + message);} }
相关文章:
Spring Boot 中整合 Kafka
在 Spring Boot 中整合 Kafka 非常简单,Spring Kafka 提供了丰富的支持,使得我们可以轻松地实现 Kafka 的生产者和消费者。下面是一个简单的 Spring Boot 整合 Kafka 的示例。 1. 添加依赖 首先,在 pom.xml 中添加 Spring Kafka 的依赖&#…...

什么是开放式耳机?具有什么特色?非常值得入手的蓝牙耳机推荐
开放式耳机是当下较为热门的一种耳机类型。它具有以下特点: 设计结构: 呈现开放式的构造,不会完全堵住耳道。如此一来,外界声音能够较容易地被使用者听到,在使用耳机时可以保持对周围环境的察觉。比如在户外…...
编译 FFmpeg 以支持 AV1 编解码器以及其他硬件加速选项(如 NVENC、VAAPI 等)
步骤 1: 安装必要的依赖 sudo apt update sudo apt install -y \autoconf automake build-essential cmake git libass-dev libfreetype6-dev \libsdl2-dev libtool libva-dev libvdpau-dev libxcb1-dev libxcb-shm0-dev \libxcb-xfixes0-dev pkg-config texinfo wget zlib1g-…...
解释一下Java中的多线程。如何创建一个新的线程?
在Java中,多线程是一种机制,允许一个程序同时执行多个任务或处理。每个任务被称为一个线程。 这种并行执行可以极大地提高应用程序的效率和响应速度。 例如,在开发一个桌面应用程序时,你可以使用一个线程来更新用户界面…...

Java语言程序设计基础篇_编程练习题**18.30 (找出单词)
题目:**18.30 (找出单词) 编写一个程序,递归地找出某个目录下的所有文件中某个单词出现的次数。从命令行如下传递参数: java Exercise18_30 dirName word 习题思路 (读取路径方法)和18.28题差不多,把找…...
MyBatis中 #{} 和 ${} 的区别
1. #{id}(参数占位符) 作用: 使用 #{id} 时,MyBatis 会将 id 参数绑定为 JDBC 的参数。这种方式能够有效防止 SQL 注入攻击,因为它会进行参数的预处理,将参数值作为数据类型的绑定,而不是直接插入到 SQL 语…...

Android Perfetto 学习
1、如何抓取性能日志 方式1、通过手机里的System Tracing抓取 1、点击Settings->System->Developer options->System Tracing->Record trace 打开 2、操作完成后,点击Settings->System->Developer options->System Tracing->Record trace…...
ES数据的删除与备份
背景 需要删除索引下满足指定条件的文档数据,并将删除的数据进行备份。 操作步骤 新建索引 该索引结构与映射关系与原索引一致 查看原索引设置 GET /tb/_settings结果: {"tb" : {"settings" : {"index" : {"ro…...
论文解读《Object-Centric Learning with Slot Attention》
系列文章目录 文章目录 系列文章目录论文细节理解 1. 研究背景2. 论文贡献3. 方法框架3.1 Slot Attention模块3.2 无监督对象发现架构 4. 研究思路5. 实验6. 限制 论文细节理解 supervised property prediction tasks是什么? Supervised property prediction tasks…...

YOLOv8+注意力机制+PyQt5玉米病害检测系统完整资源集合
资源包含可视化的玉米病害检测系统,基于最新的YOLOv8注意力机制训练的玉米病害检测模型,和基于PyQt5制作的可视玉米病害系统,包含登陆页面和检测页面,该系统可自动检测和识别图片或视频当中出现的七类玉米病害:矮花叶病…...

tcp、udp通信调试工具Socket Tool
tcp、udp通信调试工具Socket Tool ]...

MedPrompt:基于提示工程的医学诊断准确率优化方法
Medprompt:基于提示工程的医学诊断准确率优化方法 秒懂大纲解法拆解MedPrompt 提示词全流程分析总结创意视角 论文:Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine 秒懂大纲 ├── 1 研究背景【描述背…...
关于ollama 在mac的部署问题
安装 官网链接 直接按需求下载即可 默认模型下载地址 macOS: ~/.ollama/models Linux: /usr/share/ollama/.ollama/models Windows: C:\Users<username>.ollama\models 根据需要的平台设置相应的环境变量: OLLAMA_MODELS 如Mac 设置 ~/.zshrc …...
职业技能大赛-单元测试笔记(assertThat)分享
前言 assertThat 是一种用于编写测试断言的方法,广泛应用于 Java 及其他编程语言的测试框架中,如 JUnit 和 AssertJ。它特别强调可读性和流畅性,使得测试代码更加直观易懂,从而提高了开发者在编写和维护测试时的效率。传统的断言方法通常以较为简洁但不够清晰的形式出现,例…...

AI大模型:OpenAI o1或能成为引领AI Phenomenal Ride的LLM新范式
OpenAI 发布 o1 系列大模型,AI 大模型进入新纪元**。**9 月 12 日,OpenAI 宣布开发了一系列全新AI 模型,其被命名为 OpenAI o1-preview,旨在在回应前投入更多时间思考。与之前的模型相比,这些模型能够更好地进行推理&a…...

天命人,如何轻松利用仿真技术打造出属于你的“金箍棒”?
近期,一款以西游记为背景的国产游戏,重塑了悟空这一经典角色,将其置于一个黑暗、魔幻的世界中。同时也是国内第一款 3A 游戏大作,而所谓 3A 游戏,简单来说就是高质量,高体量,高成本的单机…...

【Qt | QAction】Qt 的 QAction 类介绍
😁博客主页😁:🚀https://blog.csdn.net/wkd_007🚀 🤑博客内容🤑:🍭嵌入式开发、Linux、C语言、C、数据结构、音视频🍭 🤣本文内容🤣&a…...
写论文一定要知道的三大AI工具!5分钟完成论文初稿
在当今的学术研究和写作领域,AI工具已经成为不可或缺的助手。它们不仅能够提高写作效率,还能帮助研究者生成高质量的论文。以下是三大值得推荐的AI工具,它们可以帮助你在5分钟内完成论文初稿,并且特别推荐千笔-AIPasspaper。 千笔…...

时装购物|时装购物系统|基于springboot的时装购物系统设计与实现(源码+数据库+文档)
时装购物系统目录 目录 基于springboot的时装购物系统设计与实现 一、前言 二、系统功能设计 三、系统实现 5.1管理员功能模块 四、数据库设计 1、实体ER图 2、具体的表设计如下所示: 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取…...
Android——内部/外部存储
Android 内部存储 与宿主 App 的生命周期相同,应用卸载时,会被系统自动删除。宿主 App 可以直接访问,无需权限。其他应用无权访问。用户访问需 Root 权限。适合存储与应用直接相关,隐私性或敏感性高的数据。 主要API getDataDi…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...

CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
API网关Kong的鉴权与限流:高并发场景下的核心实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中,API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关,Kong凭借其插件化架构…...
[USACO23FEB] Bakery S
题目描述 Bessie 开了一家面包店! 在她的面包店里,Bessie 有一个烤箱,可以在 t C t_C tC 的时间内生产一块饼干或在 t M t_M tM 单位时间内生产一块松糕。 ( 1 ≤ t C , t M ≤ 10 9 ) (1 \le t_C,t_M \le 10^9) (1≤tC,tM≤109)。由于空间…...
鸿蒙HarmonyOS 5军旗小游戏实现指南
1. 项目概述 本军旗小游戏基于鸿蒙HarmonyOS 5开发,采用DevEco Studio实现,包含完整的游戏逻辑和UI界面。 2. 项目结构 /src/main/java/com/example/militarychess/├── MainAbilitySlice.java // 主界面├── GameView.java // 游戏核…...