springboot整合kafka
springboot整合kafka
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.lhz</groupId><artifactId>kafka</artifactId><version>0.0.1</version><name>kafka</name><description>kafka</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>21</java.version><jdk.version>21</jdk.version><maven.compiler.source>${jdk.version}</maven.compiler.source><maven.compiler.target>${jdk.version}</maven.compiler.target><maven.compiler.compilerVersion>${jdk.version}</maven.compiler.compilerVersion><maven.compiler.encoding>utf-8</maven.compiler.encoding><project.build.sourceEncoding>utf-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><maven.test.failure.ignore>true</maven.test.failure.ignore><maven.test.skip>true</maven.test.skip></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency>--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId><version>4.5.0</version></dependency></dependencies><build><finalName>${project.name}</finalName><plugins><!-- 编译级别 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.13.0</version><configuration><!-- 设置编译字符编码 --><encoding>UTF-8</encoding><!-- 设置编译jdk版本 --><source>${jdk.version}</source><target>${jdk.version}</target></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build><repositories><repository><id>public</id><name>aliyun nexus</name><url>https://maven.aliyun.com/repository/public</url><releases><enabled>true</enabled></releases></repository></repositories><pluginRepositories><pluginRepository><id>public</id><name>aliyun nexus</name><url>https://maven.aliyun.com/repository/public</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories>
</project>
application.yml配置kafka
本案例使用最少的配置,在
application.yml中添加Kafka的连接配置:
spring:application:name: springboot-kafkakafka:bootstrap-servers: lihaozhe01:9092,lihaozhe02:9092,lihaozhe03:9092consumer:group-id: lihaozheauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
#是否激活 swagger true or false
springdoc:swagger-ui:path: /swagger-uitags-sorter: alphaoperations-sorter: alphaapi-docs:path: /v3/api-docsgroup-configs:- group: 'default'paths-to-match: '/**'packages-to-scan: cn.lhz.controller
# knife4j的增强配置,不需要增强可以不配
knife4j:enable: truesetting:language: zh_cnbasic:# 启用基本认证enable: true# 设置用户名username: admin# 设置密码password: lihaozhe
Kafka配置类
创建
KafkaConfig.java配置类,用于创建Kafka主题:
package cn.lhz.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @author 李昊哲* @version 1.0.0*/
@Configuration
@EnableKafka
public class KafkaConfig {
// @Bean
// public ProducerFactory<String, String> producerFactory() {
// Map<String, Object> configProps = new HashMap<>();
// // 配置属性
// return new DefaultKafkaProducerFactory<>(configProps);
// }
//
// @Bean
// public KafkaTemplate<String, String> kafkaTemplate() {
// return new KafkaTemplate<>(producerFactory());
// }// @Bean
// public NewTopic myTopic() {
// return new NewTopic("lihaozhe", 1, (short) 1);
// }
}
Kafka消费者
创建
KafkaConsumer.java消费者类,用于从Kafka主题接收消息:以下两段代码二选一
- 第一个案例代码只获取
value值- 第二个案例代码获取了
ConsumerRecord完整信息
获取value
package cn.lhz.service;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
public class KafkaConsumer {/*** 从 topic 中获取 value** @param value topic 中获取 value*/@KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")public void consume(String value) {System.out.printf("Consumed: %s\n", value);}
}
获取ConsumerRecord
package cn.lhz.service;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
public class KafkaConsumer {/*** 从 topic 中获取 ConsumerRecord** @param record topic 中获取 ConsumerRecord*/@KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")public void consume(ConsumerRecord<String, String> record) {System.out.printf("Consumed: %s-%d-%s:%s\n", record.topic(), record.partition(), record.key(), record.value());}
}
Kafka生产者
package cn.lhz.service;import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
@RequiredArgsConstructor
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;/*** 或者 topic 中的 value** @param topic kafka topic* @param value kafka topic 中的 value*/public void sendMessage(String topic, String value) {kafkaTemplate.send(topic, value);}/*** 或者 topic 中的 value** @param topic kafka topic* @param key kafka topic 中的 key* @param value kafka topic 中的 value*/public void sendMessage(String topic, String key, String value) {kafkaTemplate.send(topic, key, value);}
}
创建控制器发送消息
package cn.lhz.controller;import cn.lhz.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;/*** @author 李昊哲* @version 1.0.0*/
@RestController
@RequiredArgsConstructor
public class KafkaController {private final KafkaProducer kafkaProducer;@GetMapping("/send/{message}")public String sendMessage(@PathVariable(value = "message") String message) {// 只发送 valuekafkaProducer.sendMessage("lihaozhe", message);return "消息:" + message;}@GetMapping("/send/{key}/{message}")public String sendMessage(@PathVariable(value = "key") String key,@PathVariable(value = "message") String message) {// 只发送 key 和 valuekafkaProducer.sendMessage("lihaozhe", key, message);return "消息:" + message;}
}
配置OpenApi
package cn.lhz.config;import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.net.Inet4Address;
import java.net.UnknownHostException;/*** @author 李昊哲* @version 1.0.0*/
@Configuration
@Slf4j
public class OpenApiConfig implements ApplicationListener<WebServerInitializedEvent> {@Beanpublic OpenAPI springOpenAPI() {Contact contact = new Contact();contact.setName("李昊哲");contact.setUrl("https://space.bilibili.com/480308139");contact.setEmail("646269678@qq.com");// 访问路径:http://localhost:8080/doc.html// 访问路径:http://localhost:8080/swagger-ui/index.htmlreturn new OpenAPI().info(new Info().title("SpringBoot Kafka API").description("SpringBoot Kafka Simple Application").contact(contact).version("1.0.0"));}@Overridepublic void onApplicationEvent(WebServerInitializedEvent event) {try {//获取IPString hostAddress = Inet4Address.getLocalHost().getHostAddress();//获取端口号int port = event.getWebServer().getPort();//获取应用名String applicationName = event.getApplicationContext().getApplicationName();// TODO:这个localhost改成host地址log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/doc.html", hostAddress, port, applicationName);log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/swagger-ui/index.html", hostAddress, port, applicationName);} catch (UnknownHostException e) {e.printStackTrace();}}
}
启动项目测试

控制台输出
Consumed: hello world

控制台输出
Consumed: lihaozhe-0-hello:world
相关文章:
springboot整合kafka
springboot整合kafka pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven…...
Python深度学习框架:PyTorch、Keras、Scikit-learn、TensorFlow如何使用?学会轻松玩转AI!
前言 我们先简单了解一下PyTorch、Keras、Scikit-learn和TensorFlow都是什么。 想象一下你要盖一座大房子。你需要砖头、水泥、工具等等,对吧?机器学习也是一样,需要一些工具来帮忙。PyTorch、Keras、Scikit-learn和TensorFlow就是四种不同的…...
【Linux】安装cuda
一、安装nvidia驱动 # 添加nvidia驱动ppa库 sudo add-apt-repository ppa:graphics-drivers/ppa sudo apt update# 查找推荐版本 sudo ubuntu-drivers devices# 安装推荐版本 sudo apt install nvidia-driver-560# 检验nvidia驱动是否安装 nvidia-smi 二、安装cudatoolkit&…...
为什么DDoS防御很贵?
分布式拒绝服务攻击(DDoS攻击)是一种常见的网络安全威胁,通过大量恶意流量使目标服务器无法提供正常服务。DDoS防御是一项复杂且昂贵的服务,本文将详细探讨为什么DDoS防御如此昂贵,并提供一些实用的代码示例和解决方案…...
将WPS的PPT 无损的用微软的PowerPoint打开
用WPS做了PPT,但是用用PowerPoint打开的时候,老是会有几张图错位。 解决方案:将wps做的PPT另存为PowerPoint的格式 参考博客:解决office的PPT和WPS的PPT不兼容的问题_office ppt和wps中代码不通用-CSDN博客 另存为的时候&#…...
【汇编】uniapp开发
UniApp是一款基于Vue.js构建的跨平台开发框架,可以用于快速开发同时运行在多个平台(包括iOS、Android、H5和小程序)的应用程序。UniApp的目标是提供一套代码即可在不同平台上运行的开发模式,从而节省开发者的时间和精力。本文将介…...
详解Oracle表的类型(二)
1.引言: Oracle数据库提供了多种表类型,以满足不同的数据存储和管理需求。本博文将对Oracle分区表及使用场景进行详细介绍。 2. 分区表 分区表是Oracle数据库中一种重要的表类型,它通过将表数据分割成多个逻辑部分来提高查询性能、管理灵活…...
Docker--通过Docker容器创建一个Web服务器
Web服务器 Web服务器,一般指网站服务器,是驻留于因特网上某种类型计算机的程序。 Web服务器可以向浏览器等Web客户端提供文档,也可以放置网站文件以供全世界浏览,或放置数据文件以供全世界下载。 Web服务器的主要功能是提供网上…...
Next.js-样式处理
#题引:我认为跟着官方文档学习不会走歪路 Next.js 支持多种为应用程序添加样式的方法,包括: CSS Modules:创建局部作用域的 CSS 类,避免命名冲突并提高可维护性。全局 CSS:使用简单,对于有传统…...
整合Springboot shiro jpa mysql 实现权限管理系统(附源码地址)
一、在开发企业级应用时,权限管理是一个至关重要的功能。本文将围绕 Spring Boot、JPA、MySQL 和 Apache Shiro,构建一个基础的权限管理系统,涵盖用户认证与授权等核心功能。 一、技术选型及框架介绍 Spring Boot:简化 Spring 应用的配置和开发。JPA:实现数据持久化,提供…...
极智嘉嵌入式面试题及参考答案
对于交叉编译器的理解 交叉编译器是一种在一个计算机平台上为另一个不同架构的计算机平台生成可执行代码的编译器。它在嵌入式系统开发中起着关键作用。 从其必要性来看,嵌入式系统通常使用的处理器架构与我们日常使用的 PC 等通用计算机不同,如 ARM、MI…...
【MySQL】数据库核心技术与应用指南
数据库的各种概念 1. 指一门学科《数据库原理与应用》。(研究如何设计实现一个数据库) 2. 指一类用来管理数据的软件。 3. 指某一个具体的数据库软件。 4. 指部署了某个数据库软件的电脑。 数据库软件 关系型数据库 1. 使用 “表” 的结构来组织数据。…...
23省赛区块链应用与维护(房屋租凭)
23省赛区块链应用与维护(房屋租凭) 背景描述 随着异地务工人员的增多,房屋租赁成为一个广阔市场。目前,现有技术中的房屋租赁是由房主发布租赁信息,租赁信息发布在房屋中介或租赁软件,租客获取租赁信息后,现场看房,并签订纸质的房屋租赁合同,房屋租赁费用通过中介或…...
深度学习4
一、手动构建模型 epoch 一次完整数据的训练过程(可细分多次训练),称为 一代训练 Batch 小部分样本对权重的更新,称为 一批数据 iteration 使用一个 Batch 的过程,称为 一次训练 步骤: 1、生成 x,y 的…...
跳绳视觉计数方案
产品概述 提供基于摄像头视觉技术的跳绳计数解决方案,可精准完成跳绳动作的实时计数,效果完全满足考试水平的要求。方案采用先进的计算机视觉算法,结合高效的模型架构,确保计数的准确性和稳定性。适用场景 学校体育考试ÿ…...
TEA加密逆向
IDA伪代码 do{if ( v15 )v17 v38; // x120x0->0x79168ba790, 输入字符串经过check1处理后字符串elsev17 v40;v18 (unsigned int *)&v17[v16]; // 0x78cbbd47fc add x12, x12, x8 ; x120x79168ba790->…...
LeetCode 404.左叶子之和
题目:给定二叉树的根节点 root ,返回所有左叶子之和。 思路:一个节点为「左叶子」节点,当且仅当它是某个节点的左子节点,并且它是一个叶子结点。因此我们可以考虑对整 node 时,如果它的左子节点是一个叶子…...
01-go入门
文章目录 Go语言学习1. 简介安装windows安装linux安装编译工具安装-goland 2. 入门2.1 Helloworld注释 2.2 变量初始化打印内存地址变量交换匿名变量作用域局部变量全局变量 2.3 常量iota 2.4 数据类型布尔整数浮点类型复数字符串定义字符串字符串拼接符定义多行字符串 map数据…...
【经典】抽奖系统(HTML,CSS、JS)
目录 1、添加参与者 2、多次添加 3、点击抽奖 功能介绍: 使用方法: 完整代码: 一个简单但功能强大的抽奖系统的示例,用于在网页上实现抽奖。 1、添加参与者 2、多次添加 3、点击抽奖 功能介绍: 参与者添加&…...
GoF设计模式——结构型设计模式分析与应用
文章目录 UML图的结构主要表现为:继承(抽象)、关联 、组合或聚合 的三种关系。1. 继承(抽象,泛化关系)2. 关联3. 组合/聚合各种可能的配合:1. 关联后抽象2. 关联的集合3. 组合接口4. 递归聚合接…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...
使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?
在建筑行业,项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升,传统的管理模式已经难以满足现代工程的需求。过去,许多企业依赖手工记录、口头沟通和分散的信息管理,导致效率低下、成本失控、风险频发。例如&#…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...
Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...
