使用Spring Boot和Kafka实现消息发送和订阅
文章目录
- 一,新建Spring Boot
- 1,Maven配置
- 2,无法识别为SpringBoot项目
- 3,无效的源发行版
- 4,无法访问SpringApplication
- 5,运行直接Finish
- 6,服务运行成功
- 二,安装启动Kafka
- 1,下载
- 2,配置
- 3,启动
- 4,其他命令
- 三,生产消费消息
- 1,加入依赖
- 2,yam配置文件
- 3,报错enabled mechanisms are []
- 4,生产者生产消息
- 5,订阅和消费消息
- 6,接口
- 7,测试结果
- 四,参考博文
一,新建Spring Boot
最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目

注意Type选Maven,java选8,其他默认

1,Maven配置
点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来


2,无法识别为SpringBoot项目
在maven配置没问题的前提下,IDEA无法识别这是一个Spring Boot项目,倒腾半天,终于发现问题原因所在=======>是Maven版本太高的原因

把.mvn/wrapper目录下的maven-wrapper.properties文件第一行的版本号降低,比如说降为3.5.4,然后重新点下Maven的同步按钮

3,无效的源发行版
接下来运行项目报错:java: 无效的源发行版: 14

修改pom.xml中java.version值为8,原来是17
<properties><java.version>17</java.version></properties>
4,无法访问SpringApplication
继续运行,继续报错
降低spring-boot-starter-parent版本,原来是3.1.3,改为2.7.2
5,运行直接Finish
继续运行,没报错,服务直接Finished

需要添加web依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
6,服务运行成功
终于,一个空的spring boot项目成功跑起来了,喜极而泣

二,安装启动Kafka
1,下载
官网=>https://kafka.apache.org/downloads,下载最新版的kafka,目前是3.5.1

2,配置
解压到D盘Config目录下即完成安装,目录为D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties
broker.id=1
log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka
(2) zookeeper.properties
dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper
3,启动
先启动zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
再启动kafka
bin\windows\kafka-server-start.bat config\server.properties
停止的时候,先停止kafka,再停止zookeeper,直接ctrl+c停止
4,其他命令
1,查看topic列表
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
2,查看topic具体信息
bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test
3,创建topic
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
三,生产消费消息
1,加入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2,yam配置文件
application.yaml
spring:profiles:active: dev
application-dev.yaml
server:port: 8082servlet:context-path: /test-kafkaspring:cache:type: ehcacheconfig: classpath:ehcache.xmljpa:database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialectkafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: kafka-demo-kafka-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 10
3,报错enabled mechanisms are []
Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

这个错误我本地测试下来是因为没把账号密码配置这块注释掉

4,生产者生产消息
@Slf4j
@Component
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public String sendMessage(String content) {String topic = "test_topic";kafkaTemplate.send(topic, content).addCallback(success -> {String topic = success.getRecordMetadata().topic();int partition = success.getRecordMetadata().partition();long offset = success.getRecordMetadata().offset();log.info("发送成功:主题:{},分区:{},偏移量:{}",topic,partition,offset);}, failure -> {log.info("发送失败:{}",failure.getMessage());});return "发送成功";}
}
5,订阅和消费消息
一,订阅主题
1,获取消费者
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.util.Properties;/*** kafka消费者配置* @author liuxunming*/
@Configuration
@Component
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;public KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);return consumer;}}
2,订阅topic
KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();consumer.subscribe(Collections.singleton("traffic"));
3,拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {String key = record.key();String value = record.value();log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
}
4,消费位移,释放资源
// 提交消费位移
consumer.commitSync();
// 关闭消费者以释放资源
consumer.close();
二,点对点模式
@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = {"test_topic"})public void handlerMsg(String content) {log.info("接收到消息:消息值:{} ",content);}
}
6,接口
@Slf4j
@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@PostMapping("/sendMessage")public String sendMessage(@RequestParam String content) {kafkaProducer.sendMessage(content);return "ok";}
}
7,测试结果

接收到消息

四,参考博文
- 解决IDEA无法识别SpringBoot项目
- SpringBoot从入门到精通(十二)SpringBoot集成Kafka
- Kafka的下载安装以及使用
- Kafka消息消费流程详解
- Kafka之Consumer使用与基本原理
相关文章:
使用Spring Boot和Kafka实现消息发送和订阅
文章目录 一,新建Spring Boot1,Maven配置2,无法识别为SpringBoot项目3,无效的源发行版4,无法访问SpringApplication5,运行直接Finish6,服务运行成功 二,安装启动Kafka1,下…...
探讨uniapp的组件使用的问题
1 视图容器 1.1 view Flex是Flexible Box的缩写,意为“弹性布局”,用来为盒状模型提供最大的灵活性。 当设置display: flex后,继续给view等容器组件设置flex-direction:row或column,就可以在该容器内按行或列排布子组件。uni-ap…...
【跟小嘉学 Rust 编程】十七、面向对象语言特性
系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 【跟小嘉学…...
mall :rabbit项目源码解析
文章目录 一、mall开源项目1.1 来源1.2 项目转移1.3 项目克隆 二、RabbitMQ 消息中间件2.1 rabbit简介2.2 分布式后端项目的使用流程2.3 分布式后端项目的使用场景 三、安装RabbitMQ(Win10)3.1安装erLang语言,配置环境变量3.2 安装RabbitMQ服务端3.3 测试安装效果 四…...
JDBC连接数据库
目录 一.什么是JDBC 二.JDBC的实现步骤 三.简单使用JDBC 一.什么是JDBC JDBC是Java数据库连接,是java中提供数据库访问的Java API,它为关系型数据库的提供了统一访问规范。 二.JDBC的实现步骤 1.创建数据库连接 这里有两种方式: DataSource创建,提…...
Linux学习之Ubuntu 20中OpenResty的nginx目录里内容和配置文件
参考的文章是《nginx配置详解》 可以参考我以前的文章安装OpenResty。 cd /usr/local/openresty切换目录,ls -l查看目录里边的内容。 我的系统中,nginx目录是/usr/local/openresty/nginx,在这个目录里边有一些目录,如下ÿ…...
使用axi_quad_spi操作spi_flash
文章目录 基本测试情况IP支持的命令 基本测试情况 有spi_flash需要访问,为简单计,选择使用axi_quad_spi进行操作。开始时,将IP配置成如下参数, 这样配置,是想着能够适应各家的FLASH(实际使用的则是micron…...
Linux:tomcat (源码包安装)(官网下载-安装-启动-配置-等等等-----从入门到入土)
介绍 Apache Tomcat软件是一个开源实现 Jakarta Servlet、Jakarta Server Pages、Jakarta Expression Language、Jakarta WebSocket、Jakarta Annotations 和 Jakarta Authentication 规范。 这些规范是Jakarta EE平台的一部分。 Apache Tomcat软件是在开放和参与式中开发的。 …...
中科驭数以DPU先进计算技术,夯实下一代金融IT基础设施底座
由中国计算机学会主办的第19届CCF全国高性能计算学术年会(CCF HPC China 2023)于8月23日至26日在青岛成功召开。在“高性能金融计算”主题论坛上,中科驭数高级副总裁、CTO卢文岩应邀发表了题为《DPU先进计算技术助力下一代交易底座》的演讲&a…...
Android 手游聚合SDK小知识(二) 聚合分包
更新: 在上一篇文章中,我们介绍了如何聚合SDK的基本原理,介绍了聚合SDK的接口设计,那么当CP接入了我们的聚合SDK,给了我们游戏apk包时,这时我们又当如何分发渠道包呢? 分发渠道包:…...
【RISC-V】RISC-V寄存器
一、通用寄存器 32位RISC-V体系结构提供32个32位的整型通用寄存器寄存器别名全称说明X0zero零寄存器可做源寄存器(rs)或目标寄存器(rd)X1ra链接寄存器保存函数返回地址X2sp栈指针寄存器指向栈的地址X3gp全局寄存器用于链接器松弛优化X4tp线程寄存器常用于在OS中保存指向进程控…...
Python爬虫异常处理实践:处理被封禁和网站升级问题
在这篇文章中,我们将一起探讨Python爬虫异常处理实践,特别关注处理被封禁和网站升级问题。让我们一起来看看如何解决这些问题,提高我们爬虫程序的稳定性和可靠性。 首先,我们要了解为什么会遇到这些问题。网站封禁爬虫的原因主…...
重大工程建造云服务平台源码 SpringCloud+Vue
技术架构: 微服务JavaSpring Cloud VueUniApp MySql 开发语言:Java 开发工具:Idea 前端框架:Vue 后端框架:Spring Cloud 数 据 库:MySql 移 动 端:UniApp 系统端口:PC端&…...
MyBatisPlus简单入门
1、简单介绍MyBatisPlus MyBatisPlus是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,完全去SQL化,封装好了大量的CRUD操作。甚至吧CRUD操作封装到了Service层,可以直接在Controller调用现成的CRUD服务层,…...
神经网络入门
神经网络的基本骨架 1. nn.Module的使用 所有的模型都要继承 Module 类需要重写初始化函数和运算步骤函数 eg: import torch.nn as nn import torch.nn.functional as Fclass Model(nn.Module): # 继承父类Module def __init__(self): # 重写初始化函数super()…...
【面试经典150题】多数元素
🔗题目链接 ✈题目描述: 给定一个大小为 n 的数组 nums ,返回其中的多数元素。多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 ⌊ n/2 ⌋表示n/2结果向下取…...
c#垃圾回收(Garbage Collection)
在C#中,垃圾回收(Garbage Collection)是一种自动管理内存的机制。它负责跟踪和释放不再使用的内存,以便程序可以有效地使用内存资源。 C#中的垃圾回收器是由.NET运行时(CLR)提供和管理的。它使用了一种叫做…...
vue 基于element-plus el-button封装按钮组件
封装组件的原则是:组件只是数据流通的一个管道,不要糅合太多的逻辑在里面,是一个纯组件,还要根据自己项目的业务场景做具体的处理。 // MyButton.vue // 基于element-plus中el-button来封装按钮 <template><el-button c…...
smbus只能再python2.7下运行?不能再python3.8下运行吗?
不是的,SMBus并不只能在Python 2.7下运行,它也可以在Python 3.8及更高版本下运行。SMBus是用于访问系统上的I2C设备(Inter-Integrated Circuit,一种串行通信协议)的Python库,它应该与Python 3.8兼容。 要在…...
python中is和==的区别
is 和 的区别 在Python中,is和是两个用于比较对象的操作符,它们有不同的作用和用法。 is操作符: is用于比较两个对象的身份标识,即判断两个对象是否引用同一个内存地址的对象。当is操作符用于比较两个对象时,它会判断…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...
