当前位置: 首页 > news >正文

Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏

Springboot集成kafka

  • 一、前言🔥
  • 二、环境说明🔥
  • 三、概念🔥
  • 四、CentOS7安装kafka🔥
    • 1.下载kafka安装包
    • 2.下载好后,进行解压
  • 六、kafka项目集成🔥
    • 1️⃣pom引入
    • 2️⃣配置kafka
    • 3️⃣一个kafka消息发送端
    • 4️⃣定义一个kafka消息消费端
    • 5️⃣定义一个Controller进行测试
    • 6️⃣测试结果如下

一、前言🔥

上一期,我是带着大家入门了SpringBoot整合WebSocket,今天我再来一期kafka的零基础教学吧。不知道大家对kafka有多少了解,反正我就是从搭建开始,然后再加一个简单演示,这就算是带着大家了个门哈,剩下的我再后边慢慢出教程给大家说。

二、环境说明🔥

演示环境:idea2021 + springboot 2.3.1REALSE + CentOS7 + kafka

三、概念🔥

kafka是linkedin开源的分布式发布-订阅消息系统,目前归属于Apache的顶级项目。主要特点是基于pull模式来处理消息消费,追求高吞吐量,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。

一开始的目的是日志的收集和传输。0.8版本开始支持复制,不支持事务,对消息的丢失,重复,错误没有严格要求 适用于产生大量数据的互联网服务的数据收集业务。在廉价的服务器上都能有很高的性能,这个主要是基于操作系统底层的pagecache,不用内存胜似使用内存。

综上所述,kafka是一款开源的消息引擎系统(消息队列/消息中间件) 分布式流处理平台

四、CentOS7安装kafka🔥

1.下载kafka安装包

下载地址:https://kafka.apache.org/downloads.html
CSDN:kafka_2.12-2.2.1.zip

2.下载好后,进行解压

通过ftp将kafka安装包kafka_2.11-0.9.0.1.tgz上传到服务器 /opt/monitor/kafka目录下
执行命令unzip kafka_2.12-2.2.1.zip 解压上传的kafka安装包

unzip kafka_2.12-2.2.1.zip 

在这里插入图片描述

输入命令ll查询解压情况

在这里插入图片描述
执行命令 cd /opt/monitor/kafka/kafka_2.12-2.2.1 进入kafka目录

 cd /opt/monitor/kafka/kafka_2.12-2.2.1

1 配置并启动zookeeper
执行命令 创建zookeeper日志文件存放路径

mkdir zklogs

在这里插入图片描述
执行命令 修改zookeeper的配置信息

vim config/zookeeper.properties

按一下键盘上的 i 键进入编辑模式,将光标移动到日志文件存放路径配置信息所在行,并修改dataDir=/opt/monitor/kafka/kafka_2.12-2.2.1/zklogs

dataDir=/opt/monitor/kafka/kafka_2.12-2.2.1/zklogs

在这里插入图片描述
修改好后按下键盘上的Esc 键后 输入:wq 并按下Enter键保存修改的信息并退出,注意这里的:也是要输入的

执行sh./zookeeper-server-start.sh ./config/zookeeper.properties & 命令后台启动zookeeper
在这里插入图片描述

注意这里提示报错权限不足,使用命令修改权限(个人建议把bin的权限全部修改成777)
chmod 777 zookeeper-server-start.sh
在这里插入图片描述
显示没有报错启动zookeeper成功

 sh ./zookeeper-server-start.sh  /opt/monitor/kafka/kafka_2.12-2.2.1/config/zookeeper.properties

在这里插入图片描述
执行命令ps -ef | grep zookeeper 查看zookeeper是否启动成功,出现类型如下信息表示成功启动
在这里插入图片描述
2 配置并启动kafka
执行命令 vim config/server.properties 修改kafka的配置信息
在这里插入图片描述
按一下键盘上的 i 键进入编辑模式,修改advertised.listeners=PLAINTEXT://外网IP:9092;
在这里插入图片描述
修改log.dirs=/opt/monitor/kafka/kafka_2.12-2.2.1/logs该参数为kafka日志文件存放路径
在这里插入图片描述
修改每个topic的默认分区参数num.partitions,默认是1,具体合适的取值需要根据服务器配置进程确定
在这里插入图片描述
修改完成后按下键盘上的Esc 键后 输入:wq 并按下Enter键 保存修改的信息并退出,注意这里的:也是要输入的.

cd /opt/monitor/kafka/kafka_2.12-2.2.1/bin #进入kafka启动目录
sh kafka-server-start.sh /opt/monitor/kafka/kafka_2.12-2.2.1/config/server.properties  #启动kafka服务指定配置文件

在这里插入图片描述
执行命令查看kafka是否启动成功

ps -ef | grep kafka  #查看kafka是否启动成功

在这里插入图片描述

六、kafka项目集成🔥

1️⃣pom引入

<!--kafka依赖-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2️⃣配置kafka


spring:kafka:bootstrap-servers: 127.0.0.1:9092producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: falseprofiles:active: dev
server:port: 8070

3️⃣一个kafka消息发送端

package com.suihao.kafka;import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** @author suihao* @Title: KafkaProducer* @Description TODO* @date: 2023/03/03 17:* @version: V1.0*/
@Component
@Slf4j
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;//自定义topicpublic static final String TOPIC_TEST = "topic.test";//public static final String TOPIC_GROUP1 = "topic.group1";//public static final String TOPIC_GROUP2 = "topic.group2";public void send(Object obj) {String obj2String = JSONUtil.toJsonStr(obj);log.info("准备发送消息为:{}", obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {//发送失败的处理log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult) {//成功的处理log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}
}

4️⃣定义一个kafka消息消费端

package com.suihao.kafka;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;/*** @author suihao* @Title: KafkaConsumer* @Description TODO* @date: 2023/03/03 17:* @version: V1.0*/
@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}
}

5️⃣定义一个Controller进行测试

package com.suihao.controller;import com.suihao.kafka.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author suihao* @Title: KafkaController* @Description TODO* @date: 2023/03/03 17:* @version: V1.0*/
@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/send")public void sendMsg(){kafkaProducer.send("------------测试消息-----------");}
}

6️⃣测试结果如下

在这里插入图片描述
彩蛋: https://gitee.com/suihao666/SpringBoot-Kafka

最后送所有正在努力的大家一句话:

你不一定逆风翻盘,但一定要向阳而生。

期待下次发布好的文章:

山水相逢,我们江湖见。

相关文章:

Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏

Springboot集成kafka一、前言&#x1f525;二、环境说明&#x1f525;三、概念&#x1f525;四、CentOS7安装kafka&#x1f525;1.下载kafka安装包2.下载好后&#xff0c;进行解压六、kafka项目集成&#x1f525;1️⃣pom引入2️⃣配置kafka3️⃣一个kafka消息发送端4️⃣定义一…...

Qt 绘制图表 - Qt Charts版

一、前言 自从 Qt 发布以来&#xff0c;给广大跨平台界面研发人员带来了无数的福利。但是Qt自己却一直没有提供自带的图表库&#xff0c;这就使得 QWT、QCustomPlot 等第三方图表库有了巨大的生存空间&#xff0c;为了降低开发成本&#xff0c;大家都涌向了这些第三方库。这种…...

Java学习笔记 --- JavaScript

一、JavaScript介绍 JavaScript语言诞生主要是完成页面的数据验证。因此它运行在客户端&#xff0c;需要运行浏览器来解析执行JavaScript代码。JS是Netcape网景公司的产品&#xff0c;最早取名为LiveScript&#xff1b;为了吸引更多java程序员。更名为 JavaScript JS是弱类型&…...

AP5216 平均电流型LED 降压恒流驱动器

产品描述 AP5216 是一款 PWM工作模式, 高效率、外围简单、内置功率管&#xff0c;适用于5V&#xff5e;100V输入的高精度降压 LED 恒流驱动芯片。输出最大功率可达 9W&#xff0c;最大电流 1.0A。 AP5216 可实现全亮/半亮功能切换&#xff0c;通过MODE 切换&#xff1a;全亮/…...

B站的多个视频教程,怎样生成一个二维码?

商业插画视频教程、电商运营视频教程、在线网课视频、舞蹈视频教程、摄影视频教程、语言学习教程、纪录片视频…所有你发布在哔哩哔哩上的视频&#xff0c;都可以放在一个二维码里面。 任何人只要扫描这个二维码&#xff0c;就能在线观看你的这些视频教程&#xff01;分享起来…...

深入底层源码的Listener内存马(内存马系列篇三)

写在前面 继前面的FilterServlet内存马技术&#xff0c;这是系列文章的第三篇了&#xff0c;这篇将给大家带来的是Listener内存马技术。 前置 什么是Listener&#xff1f; 监听器 Listener 是一个实现特定接口的 Java 程序&#xff0c;这个程序专门用于监听另一个 Java 对象…...

云端需求助力跑赢周期,金山办公有望借助ChatGPT加速腾飞

与微软在办公领域“搏杀”了三十年的金山办公&#xff0c;或许正在迎来自己的“第二春”。2月25日&#xff0c;金山办公&#xff08;688111&#xff09;发布2022年度业绩快报&#xff0c;全年营收38.85亿元人民币&#xff08;单位下同&#xff09;&#xff0c;同比增加18.44%&a…...

Vulnhub靶场----8、DC-8

文章目录一、环境搭建二、渗透流程三、思路总结一、环境搭建 DC-8下载地址&#xff1a;https://download.vulnhub.com/dc/DC-8.zip kali&#xff1a;192.168.144.148 DC-8&#xff1a;192.168.144.156 二、渗透流程 1、信息收集nmap -T5 -A -p- -sV -sT 192.168.144.156思路&am…...

Makefile 和 Shell 脚本的区别与联系

以下内容转载于博客Makefile 和 shell 脚本的区别与联系&#xff0c;有删改与内容添加。 参考内容&#xff1a;初学Makefile指南 一、什么是 Makefile&#xff1f; Makefile 描述了整个工程的编译、链接规则。当源码文件比较多的时候就不适合通过输入 gcc 命令来编译&#xf…...

java25种设计模式之工厂模式

Java设计模式 - 工厂模式 工厂模式是一种创建模式&#xff0c;因为此模式提供了更好的方法来创建对象。 在工厂模式中&#xff0c;我们创建对象而不将创建逻辑暴露给客户端。 例子 在以下部分中&#xff0c;我们将展示如何使用工厂模式创建对象。 由工厂模式创建的对象将是…...

力扣-2020年最后一次登录

大家好&#xff0c;我是空空star&#xff0c;本篇带大家了解一道简单的力扣sql练习题。 文章目录前言一、题目&#xff1a;1890. 2020年最后一次登录二、解题1.正确示范①提交SQL运行结果2.正确示范②提交SQL运行结果3.正确示范③提交SQL运行结果4.正确示范④提交SQL运行结果5.…...

[蓝桥杯] 数学与简单DP问题

文章目录 一、简单数学问题习题练习 1、1 买不到的数目 1、1、1 题目描述 1、1、2 题解关键思路与解答 1、2 饮料换购 1、2、1 题目描述 1、2、2 题解关键思路与解答 二、DP问题习题练习 2、1 背包问题 2、1、1 题目描述 2、1、2 题解关键思路与解答 2、2 摘花生 2、2、1 题目…...

浏览器的渲染过程解析

文章目录浏览器渲染进程有哪些&#xff1f;浏览器的渲染过程浏览器渲染进程有哪些&#xff1f; GUI线程&#xff1a;负责渲染浏览器页面&#xff0c;解析html&#xff0c;css&#xff0c;构建DOM树&#xff0c;CSS规则树&#xff0c;渲染树和绘制页面&#xff0c;当界面需要重…...

【C++容器】std::fstream读写文件错误【2023.03.03】

std::fstream使用细节 1.文件不存不支持时打开文件模式不得有ios::in • 如果文件不存在且打开时包括了ios::in模式则打开文件会失败。 fstream m_f;m_f.open("d://123.csv", ios::in | ios::out | ios::binary);//文件不存在则会打开失败• 我这边尝试行得通的做…...

UVM实战--带有寄存器的加法器

一.整体的设计结构图 这里将DUT换成加法器&#xff0c;可以理解为之前UVM加法器加上寄存器&#xff0c;这里总线的功能不做修改&#xff0c;目的看代码的移植那些部分需要修改。 二.各个组件代码详解 2.1 DUT module dut( input clk, input rst_n, input…...

笔记--学习mini3d代码

主要是记录学习mini3d代码时&#xff0c;查的资料&#xff1b; 从github下载的代码&#xff1a; GitHub - skywind3000/mini3d: 3D Software Renderer in 700 Lines !!3D Software Renderer in 700 Lines !! Contribute to skywind3000/mini3d development by creating an a…...

图片服务器

文章目录一、项目简介二、功能及场景三、业务设计四、数据库设计准备图片表准备实体类五、API设计常用功能封装文件上传文件上传获取图片列表接口获取图片内容删除图片接口六、项目优化七、测试自动化测试测试用例一、项目简介 图片服务器&#xff1a;解决项目中插入图片的问题…...

【JAVA程序设计】【C00110】基于SSM(非maven)的车辆维修管理系统

基于SSM&#xff08;非maven&#xff09;的车辆维修管理系统项目简介项目获取开发环境项目技术运行截图项目简介 基于ssm框架非maven开发的车辆维修管理系统共分为三个角色&#xff1a;管理员、用户 管理员角色包含以下功能&#xff1a; 查看用户、添加用户、查看车辆信息、故…...

微积分小课堂:用动态的眼光去找问题的最优解(最大值/最小值)【中学里的解题技巧】

文章目录 引言I 最优化问题1.1 不同形式的最优化1.2 用动态的眼光去找问题的最优解引言 把比较数大小的问题,变成了寻找函数变化拐点的问题,将这两个问题等同起来,需要发明一种工具,叫做导数。有了导数这个工具,求最大值问题就变成了解方程的问题。 用变化的眼光找到最优…...

网络爬虫和相关工具

在理想的状态下&#xff0c;所有ICP&#xff08;Internet Content Provider&#xff09;都应该为自己的网站提供API接口来共享它们允许其他程序获取的数据&#xff0c;在这种情况下爬虫就不是必需品&#xff0c;国内比较有名的电商平台&#xff08;如淘宝、京东等&#xff09;、…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版​分享

平时用 iPhone 的时候&#xff0c;难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵&#xff0c;或者买了二手 iPhone 却被原来的 iCloud 账号锁住&#xff0c;这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

LeetCode - 394. 字符串解码

题目 394. 字符串解码 - 力扣&#xff08;LeetCode&#xff09; 思路 使用两个栈&#xff1a;一个存储重复次数&#xff0c;一个存储字符串 遍历输入字符串&#xff1a; 数字处理&#xff1a;遇到数字时&#xff0c;累积计算重复次数左括号处理&#xff1a;保存当前状态&a…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

自然语言处理——循环神经网络

自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元&#xff08;GRU&#xff09;长短期记忆神经网络&#xff08;LSTM&#xff09…...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包

文章目录 现象&#xff1a;mysql已经安装&#xff0c;但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时&#xff0c;可能是因为以下几个原因&#xff1a;1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...

OPENCV形态学基础之二腐蚀

一.腐蚀的原理 (图1) 数学表达式&#xff1a;dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一&#xff0c;腐蚀跟膨胀属于反向操作&#xff0c;膨胀是把图像图像变大&#xff0c;而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...