SpringBoot3集成Kafka
标签:Kafka3.Kafka-eagle3;
一、简介
Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案;
二、环境搭建
1、Kafka部署
1、下载安装包:kafka_2.13-3.5.0.tgz2、配置环境变量open -e ~/.bash_profileexport KAFKA_HOME=/本地路径/kafka3.5
export PATH=$PATH:$KAFKA_HOME/binsource ~/.bash_profile3、该目录【kafka3.5/bin】启动zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties4、该目录【kafka3.5/bin】启动kafka
kafka-server-start.sh ../config/server.properties
2、Kafka测试
1、生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message2、消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message
3、可视化工具
配置和部署
1、下载安装包:kafka-eagle-bin-3.0.2.tar.gz2、配置环境变量open -e ~/.bash_profileexport KE_HOME=/本地路径/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/binsource ~/.bash_profile3、修改配置文件:system-config.propertiesefak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle4、本地新建数据库:kafka-eagle,注意用户名和密码是否一致5、启动命令
efak-web-3.0.2/bin/ke.sh start
命令语法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}6、本地访问【localhost:8048】 username:admin password:123456

KSQL语句测试
select * from `test-topic` where `partition` in (0) order by `date` desc limit 5

select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3

三、工程搭建
1、工程结构

2、依赖管理
这里关于依赖的管理就比较复杂了,首先spring-kafka组件选择与boot框架中spring相同的依赖,即6.0.10版本,在spring-kafka最近的版本中3.0.8符合;
但是该版本使用的是kafka-clients组件的3.3.2版本,在Spring文档的kafka模块中,明确说明spring-boot:3.1要使用kafka-clients:3.4,所以从spring-kafka组件中排除掉,重新依赖kafka-clients组件;
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka-clients.version}</version>
</dependency>
3、配置文件
配置kafka连接地址,监听器的消息应答机制,消费者的基础模式;
spring:# kafka配置kafka:bootstrap-servers: localhost:9092listener:missing-topics-fatal: falseack-mode: manual_immediateconsumer:group-id: boot-kafka-groupenable-auto-commit: falsemax-poll-records: 10properties:max.poll.interval.ms: 3600000
四、基础用法
1、消息生产
模板类KafkaTemplate用于执行高级的操作,封装各种消息发送的方法,在该方法中,通过topic和key以及消息主体,实现消息的生产;
@RestController
public class ProducerWeb {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send/msg")public String sendMsg (){try {// 构建消息主体JsonMapper jsonMapper = new JsonMapper();String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));// 发送消息kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);} catch (JsonProcessingException e) {e.printStackTrace();}return "OK" ;}
}
2、消息消费
编写消息监听类,通过KafkaListener注解控制监听的具体信息,在实现消息生产和消费的方法测试后,使用可视化工具kafka-eagle查看topic和消息列表;
@Component
public class ConsumerListener {private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);@KafkaListener(topics = "boot-kafka-topic")public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {try {String key = String.valueOf(record.key());String body = record.value();log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);} catch (Exception e){e.printStackTrace();} finally {acknowledgment.acknowledge();}}
}

五、参考源码
文档仓库:
https://gitee.com/cicadasmile/butte-java-note源码仓库:
https://gitee.com/cicadasmile/butte-spring-parent
相关文章:
SpringBoot3集成Kafka
标签:Kafka3.Kafka-eagle3; 一、简介 Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内…...
css学习1
1、样式定义如何显示元素。 2、样式通常保存至外部的css文件中。 3、样式可以使内容与表现分离。 4、css主要有两部分组成:选择器与一条或多条声明。 选择器通常为要改变的html元素,每条声明由一个属性和一个值组成。每个属性有一个值,属性…...
rust踩雷笔记(1)——切片传参和解引用赋值
最近学习rust,网上资料还是很有限,做题遇到的问题,有时需要自己试验。把自己做题过程遇到的问题,和试验的结论,做一些简单记录。 阅读下列文字和代码 用切片(的引用)做参数要非常小心ÿ…...
安全 1自测
常见对称加密算法: DES(Data Encryption Standard):数据加密标准,速度较快,适用于加密大量数据的场合; 3DES(Triple DES):是基于DES,对一块数据用…...
寻路算法小游戏
寻路算法小demo 寻路算法有两种,一种是dfs 深度优先算法,一种是 dfs 深度优先算法 深度优先搜索的步骤分为 1.递归下去 2.回溯上来。顾名思义,深度优先,则是以深度为准则,先一条路走到底,直到达到目标。这…...
CSS基础 知识点总结
一.CSS简介 1.1 CSS简介 ① CSS指的是层叠样式表,用来控制网页外观的一门技术 ② CSS发展至今,经历过CSS1.0 CSS2.0 CSS2.1 CSS3.0这几个版本,CSS3.0是CSS最新版本 1.2 CSS引入方式 ① 在一个页面引入CSS,共有三种方式 外部…...
自动执行探索性数据分析 (EDA),更快、更轻松地理解数据
一、说明 EDA是 exploratory data analysis (探索性数据分析 )的缩写。所谓EDA就是在数据分析之前需要对数据进行以此系统性研判,在这个研判后,得到基本的数据先验知识,在这个基础上进行数据分析。本文将在R语言和python语言的探索性处理。 摄…...
【自定义系统服务】【android13】添加自定义java系统服务
背景 在平时的业务开发中,我们往往需要开发自定义的系统服务来处理自己特殊的需求,这里介绍的是添加自定义的Java系统服务,可以在系统App中直接调用 定义aidl Binder默认可以传输基本类型的数据,如果要传递类对象,则这个类需要实现序列化。我们先定义一个序列化的自定义…...
【Sklearn】基于随机梯度下降算法的数据分类预测(Excel可直接替换数据)
【Sklearn】基于随机梯度下降算法的数据分类预测(Excel可直接替换数据) 1.模型原理2.模型参数3.文件结构4.Excel数据5.下载地址6.完整代码7.运行结果1.模型原理 随机梯度下降(Stochastic Gradient Descent,SGD)是一种优化算法,用于训练模型的参数以最小化损失函数。在分…...
44、TCP报文(二)
接上节内容,本节我们继续TCP报文首部字段含义的学习。上节为止我们学习到“数据偏移”和“保留”字段。接下来我们学习后面的一些字段(暂不包含“检验和”的计算方法和选项字段)。 TCP首部结构(续) “数据偏移”和“保…...
目标检测(Object Detection)
文章目录 1. 目标检测1.1 目标检测简要概述及名词解释1.2 IOU1.3 TP TN FP FN1.4 precision(精确度)和recall(召回率) 2. 边框回归Bounding-Box regression3. Faster R-CNN3.1 Faster-RCNN:conv layer3.2 Faster-RCNN&…...
vue中实现文字检索时候将搜索内容标红
实现结果 html: <div class"searchBox"><span class"bt">标  题</span><div class"search"><div class"shuru"><!-- <span class"title">生产经营<…...
PCL protocol composition logic
PCL 协议组合逻辑 一 主体(principal)和线程(thread)的区分 1.主体:指 **协议的参与者,用X^来表示。**每个主体可以扮演一个或多个角色,如 InitCR和RespCR ; 2.线程:主…...
聊聊看React和Vue的区别
Vue 更适合小项目,React 更适合大公司大项目; Vue 的学习成本较低,很容易上手,但项目质量不能保证...... 真的是这样吗?借助本篇文章,我们来从一些方面的比较来客观的去看这个问题。 论文档的丰富性 从两个…...
OSPF在广播类型的网络拓扑中DR和BDR的选举
指定路由器(DR): 一个网段上的其他路由器都和指定路由器(DR)构成邻接关系,而不是它们互相之间构成邻接关系。 备份指定路由器(BDR): 当DR出现问题,由BDR接…...
系统学习Linux-Mariadb高可用MHA
概念 MHA(MasterHigh Availability)是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中,MHA能做到0-30秒内自动完成故障切换操作。 MHA能在故障切换的过程中最大程度上…...
慢SQL的原因
如何排查慢SQL问题 识别慢SQL:使用数据库性能监控工具,如慢SQL日志,识别耗时较长的查询。执行计划分析:使用数据库提供的分析工具,例如EXPLAIN来查看查询的执行计划,判断是否存在全表扫描,索引…...
php正则替换文章的图片
要使用正则表达式替换文章中的图片链接,可以按照以下步骤进行操作: 1. 获取文章内容:首先,你需要获取包含图片链接的文章内容。你可以从文件中读取文章,或者从数据库中检索文章内容。 2. 使用正则表达式匹配图片链接…...
57 | TAPTAP客户端分析
TAPTAP客户端分析 一、用户群分析 首先,TapTap用户群可分为三大类: 游戏爱好者游戏发烧者游戏开发者(次要用户,有开发者后台,可以显示数据,不重点分析)注:爱好者与发烧者区别在于,前者是用空余时间来玩游戏,时间不如后者充足,且后者更执着于游戏,游戏种类更多。 …...
开源了一套基于springboot+vue+uniapp的商城,包含分类、sku、商户管理、分销、会员、适合企业或个人二次开发
RuoYi-Mall-JAVA商城-电商系统简介 开源了一套基于若依框架,SringBoot2MybatisPlusSpringSecurityjwtredisVueUniapp的前后端分离的商城系统, 包含分类、sku、商户管理、分销、会员、适合企业或个人二次开发。 前端采用Vue、Element UI(ant…...
Qt Network 模块中的 TCP/IP 网络编程详解
Qt 是一个功能强大的跨平台 C 框架,其 Qt Network 模块为应用程序提供了丰富的网络通信能力,极大地简化了网络编程的复杂性。在众多网络协议中,TCP/IP 协议栈是互联网通信的基础,Qt Network 提供了 QTcpSocket 和 QTcpServer 等类…...
基于宝塔面板与Docker Compose快速部署Dify最新版实战指南
1. 为什么选择宝塔Docker Compose部署Dify? 最近在帮几个创业团队搭建AI开发环境时,发现很多小伙伴都被复杂的部署流程劝退。传统的手动部署方式需要逐个安装Python、Redis、PostgreSQL等依赖,光是版本兼容问题就能折腾大半天。直到上个月我…...
OpenCode应用案例:搭建企业内部代码审查助手,提升开发效率
OpenCode应用案例:搭建企业内部代码审查助手,提升开发效率 1. 项目背景与痛点分析 在软件开发团队中,代码审查是保证代码质量的关键环节。然而传统人工审查方式面临诸多挑战: 时间成本高:资深工程师需要花费大量时间…...
Unity资源提取技术解密:AssetRipper效能革命与实战指南
Unity资源提取技术解密:AssetRipper效能革命与实战指南 【免费下载链接】AssetRipper GUI Application to work with engine assets, asset bundles, and serialized files 项目地址: https://gitcode.com/GitHub_Trending/as/AssetRipper 在游戏开发迭代加速…...
Qwen3-TTS快速部署教程:一键启动Web服务,3分钟开始声音克隆
Qwen3-TTS快速部署教程:一键启动Web服务,3分钟开始声音克隆 1. 为什么选择Qwen3-TTS进行语音克隆 想象一下这样的场景:你需要为海外客户录制多语言产品介绍,但雇佣专业配音演员成本高昂;或者想为自己的视频内容添加个…...
解锁JSON Viewer 3大效率黑科技:从数据解析到开发提效的全流程解决方案
解锁JSON Viewer 3大效率黑科技:从数据解析到开发提效的全流程解决方案 【免费下载链接】json-viewer It is a Chrome extension for printing JSON and JSONP. 项目地址: https://gitcode.com/gh_mirrors/js/json-viewer JSON Viewer是一款专为开发者打造的…...
【ArkTS】基础语法
一、ArkTS 语言简介 ArkTS 是一种设计用于构建高性能应用的编程语言。它在继承 TypeScript 语法的基础上进行了优化,以提供更高的性能和开发效率。 许多编程语言在设计之初未考虑移动设备,导致应用运行缓慢、低效且功耗大。随着移动设备在日常生活中越来越普遍,针对移动环境…...
【Leetcode LCR 112】【记忆化搜索】矩阵中的最长递增路径
题目跳转 这一道题十分有意思(bushi),我们来一起看一下 1.题目考点与理解 主要考点: 记忆化搜索DFS 的递归思想与状态定义方向遍历与边界合法性判断 主要理解: 重要理解1 : 不一定要从最小的111开始,每一个都需要遍历(贪心思想错误) 重要理解2&#…...
如何打破微信单设备限制:WeChatPad终极指南
如何打破微信单设备限制:WeChatPad终极指南 【免费下载链接】WeChatPad 强制使用微信平板模式 项目地址: https://gitcode.com/gh_mirrors/we/WeChatPad 你是不是也遇到过这样的尴尬时刻?在电脑上登录微信工作,手机上的微信就被迫下线…...
告别双流!用Vision Transformer (ViT) 搭建单流目标跟踪器OSTrack,实测速度提升40%
单流目标跟踪新范式:ViT驱动的OSTrack实战解析 在计算机视觉领域,目标跟踪技术正经历着从传统双流架构向单流范式的革命性转变。当我们面对复杂场景中的实时跟踪需求时,传统方法的性能瓶颈日益凸显——特征提取与关系建模的割裂处理导致计算冗…...
