当前位置: 首页 > news >正文

SpringBoot整合Kafka的快速使用教程

       

目录

一、引入Kafka的依赖

二、配置Kafka

三、创建主题

1、自动创建(不推荐)

2、手动动创建

四、生产者代码

五、消费者代码  

六、常用的KafKa的命令


        Kafka是一个高性能、分布式的消息发布-订阅系统,被广泛应用于大数据处理、实时日志分析等场景。Spring Boot作为目前最流行的Java开发框架之一,其简洁的配置和丰富的工具使得与Kafka的集成变得更加容易。本文将介绍如何使用Spring Boot整合Kafka,实现高效的数据处理和消息传递。

一、引入Kafka的依赖

       <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

二、配置Kafka

spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,这里有三个地址,用逗号分隔。listener:ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate,表示消费者在接收到消息后立即手动确认。concurrency: 3  #设置消费者的并发数为3missing-topics-fatal: false  #设置为false,表示如果消费者订阅的主题不存在,不会抛出异常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 设置消息键的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器acks: 1  #一般就是选择1,兼顾可靠性和吞吐量 ,如果想要更高的吞吐量设置为0,如果要求更高的可靠性就设置为-1consumer:auto-offset-reset: earliest #设置为"earliest"表示将从最早的可用消息开始消费,即从分区的起始位置开始读取消息。enable-auto-commit: false #禁用了自动提交偏移量的功能,为了避免出现重复数据和数据丢失,一般都是手动提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置消息键的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器

注:kafka的acks有三个值,可以根据实际情况和需求平衡消息系统的吞吐量和数据安全性,来选择对应的值。

  • acks=0:这是最不可靠的模式。当设置为acks=0时,生产者在发送消息后不会等待任何服务器端的确认响应。这种模式下,生产者可以迅速继续发送下一批消息,效率最高,但风险也最大。如果在此模式下发生网络问题或broker故障,发送的消息可能会永久丢失,生产者无法得知消息是否成功到达Kafka broker。因此,这种配置适合于能够容忍少量数据丢失的场景,例如实时数据分析或生成非关键的实时报表。
  • acks=1:这是默认的配置模式,也是一种折衷方案。在这种模式下,生产者会等待分区的领导者节点(leader)确认消息已经成功写入磁盘,才会发送确认信息给生产者。这提高了数据的安全性,因为只要领导者节点保存了消息,即使跟随者(replicas)没有及时同步,消息也不会丢失。然而,如果领导者在同步给所有追随者之前崩溃,那么尚未同步的副本将无法获取该消息,仍然存在消息丢失的风险。
  • acks=all或-1:这是最可靠的模式。在这个模式下,生产者不仅需要领导者节点确认,还会等待所有同步副本(In-sync replicas, ISR)都确认写入消息后才会收到确认。这极大地增强了数据的持久性保证,确保了即使在多个节点故障的情况下,消息也不会丢失。此模式适用于数据可靠性要求非常高的场景,如金融交易系统或重要的日志记录

三、创建主题

    1、自动创建(不推荐)

         不存在的主题,会自动创建,分区数和副本数均为默认值。而默认值可能会不符合某些场景的要求。

在kafka的安装目录conf目录下找到该配置文件server.properties,添加如下配置:
num.partitions=3 #默认3个分区
auto.create.topics.enable=true #开启自动创建主题
default.replication.factor=3 #默认3个副本

    2、手动动创建

         在kafka的安装目录bin目录下,执行如下命令: 

//创建一个有三个分区和三个副本,名为zhuoye的主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye 

四、生产者代码

@Slf4j
@Component
public class ALiYunServiceImpl implents IALiYunService {@Autowiredprivate KafkaTemplate kafkaTemplate;@Autowiredprivate ExecutorService executorService;String topicName = "zhuoye";@Overridepublic void queryECSMetricInfo() {//发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的List<Message> messages = Collections.synchronizedList(new ArrayList<>());boolean flag = true;//获取上次查询时间Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;Long endTime = System.currentTimeMillis();try {//查询出所有的运行中的实例List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");if (CollectionUtils.isEmpty(cloudInstances)) {return;}//定义计数器CountDownLatch latch = new CountDownLatch(cloudInstances.size());//遍历查询for (CloudInstanceAssetDto instance : cloudInstances) {executorService.submit(() -> {try {//获取内网流出带宽,并将结果封装到消息集合中dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {log.error("获取ECS的指标数据-多线程处理任务异常!", e);} finally {latch.countDown();}});}//等待任务执行完毕latch.await();//将最终的消息集合发送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i = 0; i < messages.size(); i++) {if (StringUtils.isNotBlank(messages.get(i).getValue())&& "noSuchInstance".equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName,  messages.get(i));}}} catch (Exception e) {flag = false;log.error("获取ECS的指标数据失败", e);}//更新记录上次查询时间if (flag) {QueryTimeRecord queryTimeRecord = new QueryTimeRecord();queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟queryTimeRecordMapper.updateByBelongId(queryTimeRecord);}}

这个时候,如果你想看有没有把消息发送到kafka的指定主题可以使用如下命令:

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye 

五、消费者代码  

@Slf4j
@Component
public class KafkaConsumer {// 消费监听@KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){try {String value = record.value();//处理数据,存入openTsDb.................................ack.acknowledge();//手动提交}catch (Exception e){log.error("kafa-topic【zhuoye】消费阿里云指标源消息【失败】");log.error(e.getMessage());}}
}

六、常用的KafKa的命令

//创建主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye
//查看kafka是否接收对应的消息
 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye
// 修改kafka-topic分区数
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye
// 查看topic分区数
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye
// 查看用户组消费情况
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

相关文章:

SpringBoot整合Kafka的快速使用教程

目录 一、引入Kafka的依赖 二、配置Kafka 三、创建主题 1、自动创建(不推荐) 2、手动动创建 四、生产者代码 五、消费者代码 六、常用的KafKa的命令 Kafka是一个高性能、分布式的消息发布-订阅系统&#xff0c;被广泛应用于大数据处理、实时日志分析等场景。Spring B…...

低边驱动与高边驱动

一.高边驱动和低边驱动 低边驱动(LSD): 在电路的接地端加了一个可控开关&#xff0c;低边驱动就是通过闭合地线来控制这个开关的开关。容易实现&#xff08;电路也比较简单&#xff0c;一般由MOS管加几个电阻、电容&#xff09;、适用电路简化和成本控制的情况。 高边驱动&am…...

【C++】入门(二):引用、内联、auto

书接上回&#xff1a;【C】入门&#xff08;一&#xff09;&#xff1a;命名空间、缺省参数、函数重载 文章目录 六、引用引用的概念引用的使用场景1. 引用做参数作用1&#xff1a;输出型参数作用2&#xff1a;对象比较大&#xff0c;减少拷贝&#xff0c;提高效率 2. 引用作为…...

编程学习 (C规划) 6 {24_4_18} 七 ( 简单扫雷游戏)

首先我们要清楚扫雷大概是如何实现的&#xff1a; 1.布置雷 2.扫雷&#xff08;排查雷&#xff09; &#xff08;1&#xff09;如果这个位置是雷就炸了&#xff0c;游戏结束 &#xff08;2&#xff09;如果不是雷&#xff0c;就告诉周围有几个雷 3.把所有不是雷的位置都找…...

【AI】llama-fs的 安装与运行

pip install -r .\requirements.txt Windows PowerShell Copyright (C) Microsoft Corporation. All rights reserved.Install the latest PowerShell for new features and improvements! https://aka.ms/PSWindows(venv) PS D:\XTRANS\pythonProject>...

Android NDK系列(五)内存监控

在日常的开发中&#xff0c;内存泄漏是一种比较比较棘手的问题&#xff0c;这是由于其具有隐蔽性&#xff0c;即使发生了泄漏&#xff0c;很难检测到并且不好定位到哪里导致的泄漏。如果程序在运行的过程中不断出现内存泄漏&#xff0c;那么越来越多的内存得不到释放&#xff0…...

软件设计师,下午题 ——试题六

模型图 简单工厂模式 工厂方法模式抽象工厂模式生成器模式原型模式适配器模式桥接模式组合模式装饰&#xff08;器&#xff09;模式亨元模式命令模式观察者模式状态模式策略模式访问者模式中介者模式 简单工厂模式 工厂方法模式 抽象工厂模式 生成器模式 原型模式 适配器模式 桥…...

《Kubernetes部署篇:基于麒麟V10+ARM64架构部署harbor v2.4.0镜像仓库》

总结&#xff1a;整理不易&#xff0c;如果对你有帮助&#xff0c;可否点赞关注一下&#xff1f; 更多详细内容请参考&#xff1a;企业级K8s集群运维实战 一、环境信息 K8S版本 操作系统 CPU架构 服务版本 1.26.15 Kylin Linux Advanced Server V10 ARM64 harbor v2.4.0 二、部…...

远程工作/线上兼职网站整理(数字游民友好)

文章目录 国外线上兼职网站fiverrupwork 国内线上兼职网站甜薪工场猪八戒网云队友 国外线上兼职网站 fiverr https://www.fiverr.com/start_selling?sourcetop_nav upwork https://www.upwork.com/ 国内线上兼职网站 甜薪工场 https://www.txgc.com/ 猪八戒网 云队友 …...

elasticsearch7.15实现用户输入自动补全

Elasticsearch Completion Suggester&#xff08;补全建议&#xff09; Elasticsearch7.15安装 官方文档 补全建议器提供了根据输入自动补全/搜索的功能。这是一个导航功能&#xff0c;引导用户在输入时找到相关结果&#xff0c;提高搜索精度。 理想情况下&#xff0c;自动补…...

掌握正则表达式的力量:全方位解析PCRE的基础与进阶技能

Perl 兼容正则表达式&#xff08;PCRE&#xff09;是 Perl scripting language 中所使用的正则表达式语法标准。这些正则表达式在 Linux 命令行工具&#xff08;如 grep -P&#xff09;及其他编程语言和工具中也有广泛应用。以下是一些基础和进阶特性&#xff0c;帮你掌握和使用…...

FastFM库,一款强大神奇的Python系统分析预测的工具

FastFM库概述 在机器学习领域,Factorization Machines&#xff08;FM&#xff09;是处理稀疏数据集中特征间交互的重要工具.Python的fastFM库提供了高效的实现,特别适合用于推荐系统、评分预测等任务.本文将全面介绍fastFM的安装、特性、基本和高级功能,并结合实际应用场景展示…...

R语言绘图 --- 饼状图(Biorplot 开发日志 --- 2)

「写在前面」 在科研数据分析中我们会重复地绘制一些图形&#xff0c;如果代码管理不当经常就会忘记之前绘图的代码。于是我计划开发一个 R 包&#xff08;Biorplot&#xff09;&#xff0c;用来管理自己 R 语言绘图的代码。本系列文章用于记录 Biorplot 包开发日志。 相关链接…...

用于日常任务的实用 Python 脚本

Python 是一种多功能编程语言&#xff0c;以其简单易读而闻名。它广泛应用于从 Web 开发到数据分析等各个领域。Python 脚本&#xff0c;它们可以通过自动执行常见任务来使您的生活更轻松。 用于日常任务的实用 Python 脚本 1. 使用 Pandas 进行数据分析2. 使用 BeautifulSoup …...

7-Zip是什么呢

1. 简介 7-Zip 是一个功能强大、免费开源的文件压缩和解压缩工具&#xff0c;适用于个人用户和企业用户&#xff0c;可以在多种操作系统上进行使用&#xff0c;并且支持广泛的压缩格式和高级功能。 2. 特点与优势 开源免费&#xff1a;7-Zip 是免费的开源软件&#xff0c;可…...

Satellite Stereo Pipeline学习

1.在Anaconda某个环境中安装s2p pip install s2p 2.在Ubuntu系统中安装s2p源代码 git clone https://github.com/centreborelli/s2p.git --recursive cd s2p pip install -e ".[test]" 3.在s2p中进行make all处理 中间会有很多情况&#xff0c;基本上哪个包出问题…...

linux-gpio

在Linux shell中测试GPIO通信&#xff0c;通常需要使用GPIO的设备文件&#xff0c;这些文件通常位于/sys/class/gpio目录下。要使用特定的GPIO引脚&#xff0c;比如GPIO92&#xff0c;你需要执行以下步骤&#xff1a; 导出GPIO引脚&#xff1a;首先&#xff0c;需要确保GPIO92已…...

C# 代码配置的艺术

文章目录 1、代码配置的定义及其在软件工程中的作用2、C# 代码配置的基本概念和工具3、代码配置的实践步骤4、实现代码配置使用属性&#xff08;Properties&#xff09;使用配置文件&#xff08;Config Files&#xff09;使用依赖注入&#xff08;Dependency Injection&#xf…...

268 基于matlab的模拟双滑块连杆机构运动

基于matlab的模拟双滑块连杆机构运动&#xff0c;并绘制运动动画&#xff0c;连杆轨迹可视化输出&#xff0c;并输出杆件质心轨迹、角速度、速度变化曲线。可定义杆长、滑块速度&#xff0c;滑块初始位置等参数。程序已调通&#xff0c;可直接运行。 268 双滑块连杆机构运动 连…...

进口铝合金电动隔膜泵

进口铝合金电动隔膜泵是一种高效、可靠的工业泵&#xff0c;其特点、性能与应用广泛&#xff0c;以下是对其的详细分析&#xff1a; 特点 材质与结构&#xff1a; 采用铝合金材料制造&#xff0c;具有良好的耐腐蚀性和轻量化特点。铝合金材质使得泵体结构紧凑、轻便&#xff…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互

引擎版本&#xff1a; 3.8.1 语言&#xff1a; JavaScript/TypeScript、C、Java 环境&#xff1a;Window 参考&#xff1a;Java原生反射机制 您好&#xff0c;我是鹤九日&#xff01; 回顾 在上篇文章中&#xff1a;CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...