Kafka集群搭建与SpringBoot项目集成
本篇文章的目的是帮助Kafka初学者快速搭建一个Kafka集群,以及怎么在SpringBoot项目中使用Kafka。
kafka集群环境包地址:百度网盘 请输入提取码 提取码:x9yn

一、Kafka集群搭建
1、准备环境
(1)准备三台LINUX服务器:
xxx.xxx.xxx.1
xxx.xxx.xxx.2
xxx.xxx.xxx.3
(2)jdk版本大于1.8即可,我是1.8.0_181
(3)在三台服务器上创建用户admin,将环境放到admin用户下,嫌麻烦的同学也可以直接使用root用户安装(真实生产上不建议这么做)
tips:LINUX怎么给普通用户赋文件夹操作权限?
- 切换到root用户
- 使用chown -R admin:admin /home/admin命令
- 执行su - admin命令就可以切换用户并定位到/home/admin下
(4)一定要关闭三台服务器的防火墙,不然安装肯定会出问题,切记!这个真的很重要!
2、搭建Zookeeper集群
(1)解压zookeeper-3.4.12.tar.gz,进入zookeeper文件夹

(2)进入conf文件夹
1)复制zoo.cfg文件 cp zoo.cfg zoo_sample.cfg2)修改zoo.cfg文件 vim zoo.cfg

这里的3个IP的作用如下:
2181:对cline端提供服务
3888:选举leader使用
2888:集群内机器通讯使用(Leader监听此端口)
(3)进入data文件夹,若没有自己创建一个
在data文件夹下创建myid文件,三台机器分别填入server对应的ID,这里我是1、2、3

(4)启动zookeeper集群
- 1. 启动ZK服务: sh bin/zkServer.sh start
- 2. 查看ZK服务状态: sh bin/zkServer.sh status
- 3. 停止ZK服务: sh bin/zkServer.sh stop
- 4. 重启ZK服务: sh bin/zkServer.sh restart
(5)三台机器都需要重复上述操作,注意myid中的ID要对应
3、搭建Kafka集群
(1)解压kafka_2.12-2.5.0.tgz,进入kafka文件夹

(2)进入config文件夹,修改 server.properties内容
# Kafka使用唯一的一个整数来标识每个broker,该参数默认是-1。如果不指定,kafka会自动生成一个唯一值
broker.id=1
# broker监听器的CSV列表,格式是[协议]://[主机名]:[端口]。
listeners=PLAINTEXT://xxx.xxx.xxx.1:9092
# 非常重要的参数!该参数指定了kafka持久化消息的目录。该参数可以设置多个目录,以逗号分隔,比如/home/kafka1,/home/kafka2,多目录的做法是推荐的
log.dirs=/tmp/kafka-logs
# 同样是很重要的参数!这个参数完全没有默认值,是必须要自己设置的
zookeeper.connect=xxx.xxx.xxx.1:2181,xxx.xxx.xxx.2:2181,xxx.xxx.xxx.3:2181
# 是否开启unclean leader选举。由于开始可能不能保证数据一致性,所以设置为false
unclean.leader.election.enable=false
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=16
# 删除 topic 功能使能 ( 允许删除数据 ) ( 手动指定 )
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=10485760
(3)配置环境变量
vim ~/.bash_profile
# KAFKA_HOME
export KAFKA_HOME=/export/servers/kafka_2.11-0.11.0.0
export PATH=$PATH:$KAFKA_HOME/bin
(4)启动kafka集群
启动 :bin/kafka-server-start.sh config/server.properties &
关闭 :bin/kafka-server-stop.sh stop
二、使用kafkatool工具操作Kafka
这里提供一篇详细操作:https://www.cnblogs.com/frankdeng/p/9452982.html

三、Kafka与SpringBoot集成
1、pom.xml导入
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.sunyard.bigdata</groupId><artifactId>springbootkafka</artifactId><version>0.0.1-SNAPSHOT</version><name>springbootkafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2、application.properties配置
server.port=9001
spring.application.name=kafka#### kafka配置生产者 begin ####
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=xxx.xxx.xxx.1:9092,xxx.xxx.xxx.2:9092,xxx.xxx.xxx.3:9092
# 写入失败时,重试次数。当leader失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#### kafka配置生产者 end ######## kafka配置消费者 start ####
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test1
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=1000
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#### kafka配置消费者 end ####
3、启动类代码
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@SpringBootApplication
@EnableKafka
public class SpringbootkafkaApplication {public static void main(String[] args) {SpringApplication.run(SpringbootkafkaApplication.class, args);}
}
4、生产者代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/api/kafka/")
public class KafkaController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("send")@ResponseBodypublic boolean send(@RequestParam String message) {try {kafkaTemplate.send("test-topic", message);kafkaTemplate.send("test-topic2", message);System.out.println("消息发送成功...");} catch (Exception e) {e.printStackTrace();}return true;}@GetMapping("test")@ResponseBodypublic String test() {System.out.println("hello world!");return "ok";}
}
5、消费者代码
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListener {@KafkaListener(topics = "test-topic")public void onMessage1(String message) {System.out.println("我是第一个消费者:" + message);}@KafkaListener(topics = "test-topic2")public void onMessage2(String message) {System.out.println("我是第二个消费者:" + message);}
}
相关文章:
Kafka集群搭建与SpringBoot项目集成
本篇文章的目的是帮助Kafka初学者快速搭建一个Kafka集群,以及怎么在SpringBoot项目中使用Kafka。 kafka集群环境包地址:百度网盘 请输入提取码 提取码:x9yn 一、Kafka集群搭建 1、准备环境 (1)准备三台…...
一个简单的注册的页面,如有错误请指正;(3.JavaScript)
这段代码是一个JavaScript函数,实现了用户登录和上传图片的功能,并包含了一些辅助函数。让我一一解释: 1. login():这个函数用于登录操作。首先,通过$(#name).val()来获取ID为name的元素的值,同理获取其他…...
selenium (自动化概念 测试环境配置)
什么是自动化测试 自动化测试介绍 自动化测试指软件测试的自动化,在预设状态下运行应用程序或者系统. 预设条件包括正常和异常,最后评估运行结果。 自动化测试,就是将人为驱动的测试行为转化为机器执行的过程。 【机器 代替 人工】 自动化…...
Mybatis-Plus(企业实际开发应用)
一、Mybatis-Plus简介 MyBatis-Plus是MyBatis框架的一个增强工具,可以简化持久层代码开发MyBatis-Plus(简称 MP)是一个 MyBatis 的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。 官网&a…...
Spring Web MVC入门
一:了解Spring Web MVC (1)关于Java开发 🌟Java开发大多数场景是业务开发 比如说京东的业务就是电商卖货、今日头条的业务就推送新闻;快手的业务就是短视频推荐 (2)Spring Web MVC的简单理解 💗Spring Web MVC:如何使…...
【C++】mapset的底层结构 -- AVL树(高度平衡二叉搜索树)
前面我们对 map / multimap / set / multiset 进行了简单的介绍,可以发现,这几个容器有个共同点是:其底层都是按照二叉搜索树来实现的。 但是二叉搜索树有其自身的缺陷,假如往树中插入的元素有序或者接近有序,二叉搜索…...
吴恩达《机器学习》1-4:无监督学习
一、无监督学习 无监督学习就像你拿到一堆未分类的东西,没有标签告诉你它们是什么,然后你的任务是自己找出它们之间的关系或者分成不同的组,而不依赖于任何人给你关于这些东西的指导。 以聚类为例,无监督学习算法可以将数据点分成…...
一个简单的注册页面,如有错误请指正(2.css)
这段CSS代码定义了页面的样式,让我逐个解释其功能: 1. * {}:通配符选择器,用于将页面中的所有元素设置统一的样式。这里将margins和paddings设置为0,以去除默认的边距。 2. div img {}:选择页面中所有div…...
【Unity精华一记】特殊文件夹
👨💻个人主页:元宇宙-秩沅 👨💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨💻 本文由 秩沅 原创 👨💻 收录于专栏:uni…...
Node.js中的单线程服务器
为了解决多线程服务器在高并发的I/O密集型应用中的不足,同时避免早期简单单线程服务器的性能障碍,Node.js采用了基于"事件循环"的非阻塞式单线程模型,实现了如下两个目标: (1)保证每个请求都可以…...
如何删除数组中的某个元素?
如何删除数组中的某个元素? 例:给你一个数组 nums 和一个值 val,你需要移除所有数值等于 val 的元素,并返回移除后数组的新长度。 三种方法 1.元素前移(时间复杂度:O(N^2),空间复杂度&#x…...
Apache ActiveMQ RCE漏洞复现(CNVD-2023-69477)
0x01 产品简介 ActiveMQ是一个开源的消息代理和集成模式服务器,它支持Java消息服务(JMS) API。它是Apache Software Foundation下的一个项目,用于实现消息中间件,帮助不同的应用程序或系统之间进行通信。 0x02 漏洞概述 Apache ActiveMQ 中存…...
【BUG】Nginx转发失败解决方案
最近在做项目的时候出现了一个问题,琢磨了好久,来浅浅记录一下。 这个项目后端使用的是gateway网关和nacos实现动态的路由,前端使用nginx来管理前端资源,大体流程:浏览器发起请求,经过nginx代理,…...
综合OA管理系统源码 OA系统源码
综合OA管理系统源码 OA系统源码 功能介绍: 编号:LQ10 一:系统管理 系统配置,功能模块,功能节点,权限角色,操作日志,备份数据,还原数据 二:基础数据 审批…...
9-MySQL提高数据管理效率(分库分表实践)
MySQL提高数据管理效率(分库分表实践) 在当今的互联网时代,随着业务规模的不断扩大,数据量也呈现出爆炸性的增长。如何有效地管理和存储这些数据,以及提高数据库的性能和可扩展性,成为了一个迫切需要解决的…...
经典卷积神经网络 - NIN
网络中的网络,NIN。 AlexNet和VGG都是先由卷积层构成的模块充分抽取空间特征,再由全连接层构成的模块来输出分类结果。但是其中的全连接层的参数量过于巨大,因此NiN提出用1*1卷积代替全连接层,串联多个由卷积层和“全连接”层构成…...
leetcode_2558 从数量最多的堆取走礼物
1. 题意 给定一个数组,每次从中取走最大的数,返回开根号向下取整送入堆中,最后计算总和。 从数量最多的堆取走礼物 2. 题解 直接用堆模拟即可 2.1 我的代码 用了额外的空间O( n ) priority_queue会自动调用make_heap() 、pop_heap() c…...
01. 嵌入式与人工智能是如何结合的?
CPU是Arm A57的 GPU是128cuda核 一.小车跟踪的需求和设计方法 比如有一个小车跟踪的项目。 需求是:小车识别出罪犯,然后去跟踪他。方法:摄像头采集到人之后传入到开发板,内部做一下识别,然后控制小车去跟随。在人工智…...
vue3.0运行npm run dev 报错Cannot find module node:url
vue3.0运行npm run dev 报错Cannot find module 问题背景 近期用vue3.0写项目,npm init vuelatest —> npm install 都正常,npm run dev的时候报错如下: failed to load config from F:\code\testVue\vue-demo\vite.config.js error when starting…...
26. 删除排序数组中的重复项、Leetcode的Python实现
博客主页:🏆看看是李XX还是李歘歘 🏆 🌺每天分享一些包括但不限于计算机基础、算法等相关的知识点🌺 💗点关注不迷路,总有一些📖知识点📖是你想要的💗 ⛽️今…...
C++实现分布式网络通信框架RPC(3)--rpc调用端
目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中,我们已经大致实现了rpc服务端的各项功能代…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
