springboot整合kafka
springboot整合kafka
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.lhz</groupId><artifactId>kafka</artifactId><version>0.0.1</version><name>kafka</name><description>kafka</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>21</java.version><jdk.version>21</jdk.version><maven.compiler.source>${jdk.version}</maven.compiler.source><maven.compiler.target>${jdk.version}</maven.compiler.target><maven.compiler.compilerVersion>${jdk.version}</maven.compiler.compilerVersion><maven.compiler.encoding>utf-8</maven.compiler.encoding><project.build.sourceEncoding>utf-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><maven.test.failure.ignore>true</maven.test.failure.ignore><maven.test.skip>true</maven.test.skip></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency>--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId><version>4.5.0</version></dependency></dependencies><build><finalName>${project.name}</finalName><plugins><!-- 编译级别 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.13.0</version><configuration><!-- 设置编译字符编码 --><encoding>UTF-8</encoding><!-- 设置编译jdk版本 --><source>${jdk.version}</source><target>${jdk.version}</target></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build><repositories><repository><id>public</id><name>aliyun nexus</name><url>https://maven.aliyun.com/repository/public</url><releases><enabled>true</enabled></releases></repository></repositories><pluginRepositories><pluginRepository><id>public</id><name>aliyun nexus</name><url>https://maven.aliyun.com/repository/public</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories>
</project>
application.yml配置kafka
本案例使用最少的配置,在
application.yml
中添加Kafka的连接配置:
spring:application:name: springboot-kafkakafka:bootstrap-servers: lihaozhe01:9092,lihaozhe02:9092,lihaozhe03:9092consumer:group-id: lihaozheauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
#是否激活 swagger true or false
springdoc:swagger-ui:path: /swagger-uitags-sorter: alphaoperations-sorter: alphaapi-docs:path: /v3/api-docsgroup-configs:- group: 'default'paths-to-match: '/**'packages-to-scan: cn.lhz.controller
# knife4j的增强配置,不需要增强可以不配
knife4j:enable: truesetting:language: zh_cnbasic:# 启用基本认证enable: true# 设置用户名username: admin# 设置密码password: lihaozhe
Kafka配置类
创建
KafkaConfig.java
配置类,用于创建Kafka主题:
package cn.lhz.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @author 李昊哲* @version 1.0.0*/
@Configuration
@EnableKafka
public class KafkaConfig {
// @Bean
// public ProducerFactory<String, String> producerFactory() {
// Map<String, Object> configProps = new HashMap<>();
// // 配置属性
// return new DefaultKafkaProducerFactory<>(configProps);
// }
//
// @Bean
// public KafkaTemplate<String, String> kafkaTemplate() {
// return new KafkaTemplate<>(producerFactory());
// }// @Bean
// public NewTopic myTopic() {
// return new NewTopic("lihaozhe", 1, (short) 1);
// }
}
Kafka消费者
创建
KafkaConsumer.java
消费者类,用于从Kafka主题接收消息:以下两段代码二选一
- 第一个案例代码只获取
value
值- 第二个案例代码获取了
ConsumerRecord
完整信息
获取value
package cn.lhz.service;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
public class KafkaConsumer {/*** 从 topic 中获取 value** @param value topic 中获取 value*/@KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")public void consume(String value) {System.out.printf("Consumed: %s\n", value);}
}
获取ConsumerRecord
package cn.lhz.service;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
public class KafkaConsumer {/*** 从 topic 中获取 ConsumerRecord** @param record topic 中获取 ConsumerRecord*/@KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")public void consume(ConsumerRecord<String, String> record) {System.out.printf("Consumed: %s-%d-%s:%s\n", record.topic(), record.partition(), record.key(), record.value());}
}
Kafka生产者
package cn.lhz.service;import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
@RequiredArgsConstructor
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;/*** 或者 topic 中的 value** @param topic kafka topic* @param value kafka topic 中的 value*/public void sendMessage(String topic, String value) {kafkaTemplate.send(topic, value);}/*** 或者 topic 中的 value** @param topic kafka topic* @param key kafka topic 中的 key* @param value kafka topic 中的 value*/public void sendMessage(String topic, String key, String value) {kafkaTemplate.send(topic, key, value);}
}
创建控制器发送消息
package cn.lhz.controller;import cn.lhz.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;/*** @author 李昊哲* @version 1.0.0*/
@RestController
@RequiredArgsConstructor
public class KafkaController {private final KafkaProducer kafkaProducer;@GetMapping("/send/{message}")public String sendMessage(@PathVariable(value = "message") String message) {// 只发送 valuekafkaProducer.sendMessage("lihaozhe", message);return "消息:" + message;}@GetMapping("/send/{key}/{message}")public String sendMessage(@PathVariable(value = "key") String key,@PathVariable(value = "message") String message) {// 只发送 key 和 valuekafkaProducer.sendMessage("lihaozhe", key, message);return "消息:" + message;}
}
配置OpenApi
package cn.lhz.config;import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.net.Inet4Address;
import java.net.UnknownHostException;/*** @author 李昊哲* @version 1.0.0*/
@Configuration
@Slf4j
public class OpenApiConfig implements ApplicationListener<WebServerInitializedEvent> {@Beanpublic OpenAPI springOpenAPI() {Contact contact = new Contact();contact.setName("李昊哲");contact.setUrl("https://space.bilibili.com/480308139");contact.setEmail("646269678@qq.com");// 访问路径:http://localhost:8080/doc.html// 访问路径:http://localhost:8080/swagger-ui/index.htmlreturn new OpenAPI().info(new Info().title("SpringBoot Kafka API").description("SpringBoot Kafka Simple Application").contact(contact).version("1.0.0"));}@Overridepublic void onApplicationEvent(WebServerInitializedEvent event) {try {//获取IPString hostAddress = Inet4Address.getLocalHost().getHostAddress();//获取端口号int port = event.getWebServer().getPort();//获取应用名String applicationName = event.getApplicationContext().getApplicationName();// TODO:这个localhost改成host地址log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/doc.html", hostAddress, port, applicationName);log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/swagger-ui/index.html", hostAddress, port, applicationName);} catch (UnknownHostException e) {e.printStackTrace();}}
}
启动项目测试
控制台输出
Consumed: hello world
控制台输出
Consumed: lihaozhe-0-hello:world
相关文章:

springboot整合kafka
springboot整合kafka pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven…...

Python深度学习框架:PyTorch、Keras、Scikit-learn、TensorFlow如何使用?学会轻松玩转AI!
前言 我们先简单了解一下PyTorch、Keras、Scikit-learn和TensorFlow都是什么。 想象一下你要盖一座大房子。你需要砖头、水泥、工具等等,对吧?机器学习也是一样,需要一些工具来帮忙。PyTorch、Keras、Scikit-learn和TensorFlow就是四种不同的…...

【Linux】安装cuda
一、安装nvidia驱动 # 添加nvidia驱动ppa库 sudo add-apt-repository ppa:graphics-drivers/ppa sudo apt update# 查找推荐版本 sudo ubuntu-drivers devices# 安装推荐版本 sudo apt install nvidia-driver-560# 检验nvidia驱动是否安装 nvidia-smi 二、安装cudatoolkit&…...
为什么DDoS防御很贵?
分布式拒绝服务攻击(DDoS攻击)是一种常见的网络安全威胁,通过大量恶意流量使目标服务器无法提供正常服务。DDoS防御是一项复杂且昂贵的服务,本文将详细探讨为什么DDoS防御如此昂贵,并提供一些实用的代码示例和解决方案…...

将WPS的PPT 无损的用微软的PowerPoint打开
用WPS做了PPT,但是用用PowerPoint打开的时候,老是会有几张图错位。 解决方案:将wps做的PPT另存为PowerPoint的格式 参考博客:解决office的PPT和WPS的PPT不兼容的问题_office ppt和wps中代码不通用-CSDN博客 另存为的时候&#…...
【汇编】uniapp开发
UniApp是一款基于Vue.js构建的跨平台开发框架,可以用于快速开发同时运行在多个平台(包括iOS、Android、H5和小程序)的应用程序。UniApp的目标是提供一套代码即可在不同平台上运行的开发模式,从而节省开发者的时间和精力。本文将介…...
详解Oracle表的类型(二)
1.引言: Oracle数据库提供了多种表类型,以满足不同的数据存储和管理需求。本博文将对Oracle分区表及使用场景进行详细介绍。 2. 分区表 分区表是Oracle数据库中一种重要的表类型,它通过将表数据分割成多个逻辑部分来提高查询性能、管理灵活…...

Docker--通过Docker容器创建一个Web服务器
Web服务器 Web服务器,一般指网站服务器,是驻留于因特网上某种类型计算机的程序。 Web服务器可以向浏览器等Web客户端提供文档,也可以放置网站文件以供全世界浏览,或放置数据文件以供全世界下载。 Web服务器的主要功能是提供网上…...

Next.js-样式处理
#题引:我认为跟着官方文档学习不会走歪路 Next.js 支持多种为应用程序添加样式的方法,包括: CSS Modules:创建局部作用域的 CSS 类,避免命名冲突并提高可维护性。全局 CSS:使用简单,对于有传统…...
整合Springboot shiro jpa mysql 实现权限管理系统(附源码地址)
一、在开发企业级应用时,权限管理是一个至关重要的功能。本文将围绕 Spring Boot、JPA、MySQL 和 Apache Shiro,构建一个基础的权限管理系统,涵盖用户认证与授权等核心功能。 一、技术选型及框架介绍 Spring Boot:简化 Spring 应用的配置和开发。JPA:实现数据持久化,提供…...

极智嘉嵌入式面试题及参考答案
对于交叉编译器的理解 交叉编译器是一种在一个计算机平台上为另一个不同架构的计算机平台生成可执行代码的编译器。它在嵌入式系统开发中起着关键作用。 从其必要性来看,嵌入式系统通常使用的处理器架构与我们日常使用的 PC 等通用计算机不同,如 ARM、MI…...
【MySQL】数据库核心技术与应用指南
数据库的各种概念 1. 指一门学科《数据库原理与应用》。(研究如何设计实现一个数据库) 2. 指一类用来管理数据的软件。 3. 指某一个具体的数据库软件。 4. 指部署了某个数据库软件的电脑。 数据库软件 关系型数据库 1. 使用 “表” 的结构来组织数据。…...

23省赛区块链应用与维护(房屋租凭)
23省赛区块链应用与维护(房屋租凭) 背景描述 随着异地务工人员的增多,房屋租赁成为一个广阔市场。目前,现有技术中的房屋租赁是由房主发布租赁信息,租赁信息发布在房屋中介或租赁软件,租客获取租赁信息后,现场看房,并签订纸质的房屋租赁合同,房屋租赁费用通过中介或…...
深度学习4
一、手动构建模型 epoch 一次完整数据的训练过程(可细分多次训练),称为 一代训练 Batch 小部分样本对权重的更新,称为 一批数据 iteration 使用一个 Batch 的过程,称为 一次训练 步骤: 1、生成 x,y 的…...
跳绳视觉计数方案
产品概述 提供基于摄像头视觉技术的跳绳计数解决方案,可精准完成跳绳动作的实时计数,效果完全满足考试水平的要求。方案采用先进的计算机视觉算法,结合高效的模型架构,确保计数的准确性和稳定性。适用场景 学校体育考试ÿ…...

TEA加密逆向
IDA伪代码 do{if ( v15 )v17 v38; // x120x0->0x79168ba790, 输入字符串经过check1处理后字符串elsev17 v40;v18 (unsigned int *)&v17[v16]; // 0x78cbbd47fc add x12, x12, x8 ; x120x79168ba790->…...
LeetCode 404.左叶子之和
题目:给定二叉树的根节点 root ,返回所有左叶子之和。 思路:一个节点为「左叶子」节点,当且仅当它是某个节点的左子节点,并且它是一个叶子结点。因此我们可以考虑对整 node 时,如果它的左子节点是一个叶子…...

01-go入门
文章目录 Go语言学习1. 简介安装windows安装linux安装编译工具安装-goland 2. 入门2.1 Helloworld注释 2.2 变量初始化打印内存地址变量交换匿名变量作用域局部变量全局变量 2.3 常量iota 2.4 数据类型布尔整数浮点类型复数字符串定义字符串字符串拼接符定义多行字符串 map数据…...

【经典】抽奖系统(HTML,CSS、JS)
目录 1、添加参与者 2、多次添加 3、点击抽奖 功能介绍: 使用方法: 完整代码: 一个简单但功能强大的抽奖系统的示例,用于在网页上实现抽奖。 1、添加参与者 2、多次添加 3、点击抽奖 功能介绍: 参与者添加&…...

GoF设计模式——结构型设计模式分析与应用
文章目录 UML图的结构主要表现为:继承(抽象)、关联 、组合或聚合 的三种关系。1. 继承(抽象,泛化关系)2. 关联3. 组合/聚合各种可能的配合:1. 关联后抽象2. 关联的集合3. 组合接口4. 递归聚合接…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...

边缘计算医疗风险自查APP开发方案
核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...
Cesium1.95中高性能加载1500个点
一、基本方式: 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...

Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...