SpringBoot3集成Kafka
标签:Kafka3.Kafka-eagle3;
一、简介
Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案;
二、环境搭建
1、Kafka部署
1、下载安装包:kafka_2.13-3.5.0.tgz2、配置环境变量open -e ~/.bash_profileexport KAFKA_HOME=/本地路径/kafka3.5
export PATH=$PATH:$KAFKA_HOME/binsource ~/.bash_profile3、该目录【kafka3.5/bin】启动zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties4、该目录【kafka3.5/bin】启动kafka
kafka-server-start.sh ../config/server.properties
2、Kafka测试
1、生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message2、消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message
3、可视化工具
配置和部署
1、下载安装包:kafka-eagle-bin-3.0.2.tar.gz2、配置环境变量open -e ~/.bash_profileexport KE_HOME=/本地路径/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/binsource ~/.bash_profile3、修改配置文件:system-config.propertiesefak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle4、本地新建数据库:kafka-eagle,注意用户名和密码是否一致5、启动命令
efak-web-3.0.2/bin/ke.sh start
命令语法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}6、本地访问【localhost:8048】 username:admin password:123456

KSQL语句测试
select * from `test-topic` where `partition` in (0) order by `date` desc limit 5

select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3

三、工程搭建
1、工程结构

2、依赖管理
这里关于依赖的管理就比较复杂了,首先spring-kafka组件选择与boot框架中spring相同的依赖,即6.0.10版本,在spring-kafka最近的版本中3.0.8符合;
但是该版本使用的是kafka-clients组件的3.3.2版本,在Spring文档的kafka模块中,明确说明spring-boot:3.1要使用kafka-clients:3.4,所以从spring-kafka组件中排除掉,重新依赖kafka-clients组件;
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka-clients.version}</version>
</dependency>
3、配置文件
配置kafka连接地址,监听器的消息应答机制,消费者的基础模式;
spring:# kafka配置kafka:bootstrap-servers: localhost:9092listener:missing-topics-fatal: falseack-mode: manual_immediateconsumer:group-id: boot-kafka-groupenable-auto-commit: falsemax-poll-records: 10properties:max.poll.interval.ms: 3600000
四、基础用法
1、消息生产
模板类KafkaTemplate用于执行高级的操作,封装各种消息发送的方法,在该方法中,通过topic和key以及消息主体,实现消息的生产;
@RestController
public class ProducerWeb {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send/msg")public String sendMsg (){try {// 构建消息主体JsonMapper jsonMapper = new JsonMapper();String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));// 发送消息kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);} catch (JsonProcessingException e) {e.printStackTrace();}return "OK" ;}
}
2、消息消费
编写消息监听类,通过KafkaListener注解控制监听的具体信息,在实现消息生产和消费的方法测试后,使用可视化工具kafka-eagle查看topic和消息列表;
@Component
public class ConsumerListener {private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);@KafkaListener(topics = "boot-kafka-topic")public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {try {String key = String.valueOf(record.key());String body = record.value();log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);} catch (Exception e){e.printStackTrace();} finally {acknowledgment.acknowledge();}}
}

五、参考源码
文档仓库:
https://gitee.com/cicadasmile/butte-java-note源码仓库:
https://gitee.com/cicadasmile/butte-spring-parent
相关文章:
SpringBoot3集成Kafka
标签:Kafka3.Kafka-eagle3; 一、简介 Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内…...
css学习1
1、样式定义如何显示元素。 2、样式通常保存至外部的css文件中。 3、样式可以使内容与表现分离。 4、css主要有两部分组成:选择器与一条或多条声明。 选择器通常为要改变的html元素,每条声明由一个属性和一个值组成。每个属性有一个值,属性…...
rust踩雷笔记(1)——切片传参和解引用赋值
最近学习rust,网上资料还是很有限,做题遇到的问题,有时需要自己试验。把自己做题过程遇到的问题,和试验的结论,做一些简单记录。 阅读下列文字和代码 用切片(的引用)做参数要非常小心ÿ…...
安全 1自测
常见对称加密算法: DES(Data Encryption Standard):数据加密标准,速度较快,适用于加密大量数据的场合; 3DES(Triple DES):是基于DES,对一块数据用…...
寻路算法小游戏
寻路算法小demo 寻路算法有两种,一种是dfs 深度优先算法,一种是 dfs 深度优先算法 深度优先搜索的步骤分为 1.递归下去 2.回溯上来。顾名思义,深度优先,则是以深度为准则,先一条路走到底,直到达到目标。这…...
CSS基础 知识点总结
一.CSS简介 1.1 CSS简介 ① CSS指的是层叠样式表,用来控制网页外观的一门技术 ② CSS发展至今,经历过CSS1.0 CSS2.0 CSS2.1 CSS3.0这几个版本,CSS3.0是CSS最新版本 1.2 CSS引入方式 ① 在一个页面引入CSS,共有三种方式 外部…...
自动执行探索性数据分析 (EDA),更快、更轻松地理解数据
一、说明 EDA是 exploratory data analysis (探索性数据分析 )的缩写。所谓EDA就是在数据分析之前需要对数据进行以此系统性研判,在这个研判后,得到基本的数据先验知识,在这个基础上进行数据分析。本文将在R语言和python语言的探索性处理。 摄…...
【自定义系统服务】【android13】添加自定义java系统服务
背景 在平时的业务开发中,我们往往需要开发自定义的系统服务来处理自己特殊的需求,这里介绍的是添加自定义的Java系统服务,可以在系统App中直接调用 定义aidl Binder默认可以传输基本类型的数据,如果要传递类对象,则这个类需要实现序列化。我们先定义一个序列化的自定义…...
【Sklearn】基于随机梯度下降算法的数据分类预测(Excel可直接替换数据)
【Sklearn】基于随机梯度下降算法的数据分类预测(Excel可直接替换数据) 1.模型原理2.模型参数3.文件结构4.Excel数据5.下载地址6.完整代码7.运行结果1.模型原理 随机梯度下降(Stochastic Gradient Descent,SGD)是一种优化算法,用于训练模型的参数以最小化损失函数。在分…...
44、TCP报文(二)
接上节内容,本节我们继续TCP报文首部字段含义的学习。上节为止我们学习到“数据偏移”和“保留”字段。接下来我们学习后面的一些字段(暂不包含“检验和”的计算方法和选项字段)。 TCP首部结构(续) “数据偏移”和“保…...
目标检测(Object Detection)
文章目录 1. 目标检测1.1 目标检测简要概述及名词解释1.2 IOU1.3 TP TN FP FN1.4 precision(精确度)和recall(召回率) 2. 边框回归Bounding-Box regression3. Faster R-CNN3.1 Faster-RCNN:conv layer3.2 Faster-RCNN&…...
vue中实现文字检索时候将搜索内容标红
实现结果 html: <div class"searchBox"><span class"bt">标  题</span><div class"search"><div class"shuru"><!-- <span class"title">生产经营<…...
PCL protocol composition logic
PCL 协议组合逻辑 一 主体(principal)和线程(thread)的区分 1.主体:指 **协议的参与者,用X^来表示。**每个主体可以扮演一个或多个角色,如 InitCR和RespCR ; 2.线程:主…...
聊聊看React和Vue的区别
Vue 更适合小项目,React 更适合大公司大项目; Vue 的学习成本较低,很容易上手,但项目质量不能保证...... 真的是这样吗?借助本篇文章,我们来从一些方面的比较来客观的去看这个问题。 论文档的丰富性 从两个…...
OSPF在广播类型的网络拓扑中DR和BDR的选举
指定路由器(DR): 一个网段上的其他路由器都和指定路由器(DR)构成邻接关系,而不是它们互相之间构成邻接关系。 备份指定路由器(BDR): 当DR出现问题,由BDR接…...
系统学习Linux-Mariadb高可用MHA
概念 MHA(MasterHigh Availability)是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中,MHA能做到0-30秒内自动完成故障切换操作。 MHA能在故障切换的过程中最大程度上…...
慢SQL的原因
如何排查慢SQL问题 识别慢SQL:使用数据库性能监控工具,如慢SQL日志,识别耗时较长的查询。执行计划分析:使用数据库提供的分析工具,例如EXPLAIN来查看查询的执行计划,判断是否存在全表扫描,索引…...
php正则替换文章的图片
要使用正则表达式替换文章中的图片链接,可以按照以下步骤进行操作: 1. 获取文章内容:首先,你需要获取包含图片链接的文章内容。你可以从文件中读取文章,或者从数据库中检索文章内容。 2. 使用正则表达式匹配图片链接…...
57 | TAPTAP客户端分析
TAPTAP客户端分析 一、用户群分析 首先,TapTap用户群可分为三大类: 游戏爱好者游戏发烧者游戏开发者(次要用户,有开发者后台,可以显示数据,不重点分析)注:爱好者与发烧者区别在于,前者是用空余时间来玩游戏,时间不如后者充足,且后者更执着于游戏,游戏种类更多。 …...
开源了一套基于springboot+vue+uniapp的商城,包含分类、sku、商户管理、分销、会员、适合企业或个人二次开发
RuoYi-Mall-JAVA商城-电商系统简介 开源了一套基于若依框架,SringBoot2MybatisPlusSpringSecurityjwtredisVueUniapp的前后端分离的商城系统, 包含分类、sku、商户管理、分销、会员、适合企业或个人二次开发。 前端采用Vue、Element UI(ant…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
9-Oracle 23 ai Vector Search 特性 知识准备
很多小伙伴是不是参加了 免费认证课程(限时至2025/5/15) Oracle AI Vector Search 1Z0-184-25考试,都顺利拿到certified了没。 各行各业的AI 大模型的到来,传统的数据库中的SQL还能不能打,结构化和非结构的话数据如何和…...
【Post-process】【VBA】ETABS VBA FrameObj.GetNameList and write to EXCEL
ETABS API实战:导出框架元素数据到Excel 在结构工程师的日常工作中,经常需要从ETABS模型中提取框架元素信息进行后续分析。手动复制粘贴不仅耗时,还容易出错。今天我们来用简单的VBA代码实现自动化导出。 🎯 我们要实现什么? 一键点击,就能将ETABS中所有框架元素的基…...
WEB3全栈开发——面试专业技能点P7前端与链上集成
一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染(SSR)与静态网站生成(SSG) 框架,由 Vercel 开发。它简化了构建生产级 React 应用的过程,并内置了很多特性: ✅ 文件系…...
2.3 物理层设备
在这个视频中,我们要学习工作在物理层的两种网络设备,分别是中继器和集线器。首先来看中继器。在计算机网络中两个节点之间,需要通过物理传输媒体或者说物理传输介质进行连接。像同轴电缆、双绞线就是典型的传输介质,假设A节点要给…...
算法刷题-回溯
今天给大家分享的还是一道关于dfs回溯的问题,对于这类问题大家还是要多刷和总结,总体难度还是偏大。 对于回溯问题有几个关键点: 1.首先对于这类回溯可以节点可以随机选择的问题,要做mian函数中循环调用dfs(i&#x…...
