当前位置: 首页 > news >正文

使用Spring Boot和Kafka实现消息发送和订阅

文章目录

  • 一,新建Spring Boot
    • 1,Maven配置
    • 2,无法识别为SpringBoot项目
    • 3,无效的源发行版
    • 4,无法访问SpringApplication
    • 5,运行直接Finish
    • 6,服务运行成功
  • 二,安装启动Kafka
    • 1,下载
    • 2,配置
    • 3,启动
    • 4,其他命令
  • 三,生产消费消息
    • 1,加入依赖
    • 2,yam配置文件
    • 3,报错enabled mechanisms are []
    • 4,生产者生产消息
    • 5,订阅和消费消息
    • 6,接口
    • 7,测试结果
  • 四,参考博文

一,新建Spring Boot

最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目
在这里插入图片描述
注意Type选Maven,java选8,其他默认
在这里插入图片描述

1,Maven配置

点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来
在这里插入图片描述
在这里插入图片描述

2,无法识别为SpringBoot项目

在maven配置没问题的前提下,IDEA无法识别这是一个Spring Boot项目,倒腾半天,终于发现问题原因所在=======>是Maven版本太高的原因
在这里插入图片描述
把.mvn/wrapper目录下的maven-wrapper.properties文件第一行的版本号降低,比如说降为3.5.4,然后重新点下Maven的同步按钮
在这里插入图片描述

3,无效的源发行版

接下来运行项目报错:java: 无效的源发行版: 14
在这里插入图片描述
修改pom.xml中java.version值为8,原来是17

	<properties><java.version>17</java.version></properties>

4,无法访问SpringApplication

继续运行,继续报错在这里插入图片描述
降低spring-boot-starter-parent版本,原来是3.1.3,改为2.7.2

5,运行直接Finish

继续运行,没报错,服务直接Finished
在这里插入图片描述
需要添加web依赖

 		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

6,服务运行成功

终于,一个空的spring boot项目成功跑起来了,喜极而泣
在这里插入图片描述

二,安装启动Kafka

1,下载

官网=>https://kafka.apache.org/downloads,下载最新版的kafka,目前是3.5.1
在这里插入图片描述

2,配置

解压到D盘Config目录下即完成安装,目录为D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties

broker.id=1
log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka

(2) zookeeper.properties

dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper

3,启动

先启动zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties	

再启动kafka

bin\windows\kafka-server-start.bat config\server.properties

停止的时候,先停止kafka,再停止zookeeper,直接ctrl+c停止

4,其他命令

1,查看topic列表

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

2,查看topic具体信息

bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test

3,创建topic

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

三,生产消费消息

1,加入依赖

 		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2,yam配置文件

application.yaml

spring:profiles:active: dev

application-dev.yaml

server:port: 8082servlet:context-path: /test-kafkaspring:cache:type: ehcacheconfig: classpath:ehcache.xmljpa:database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialectkafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: kafka-demo-kafka-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 10

3,报错enabled mechanisms are []

Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

在这里插入图片描述
这个错误我本地测试下来是因为没把账号密码配置这块注释掉
在这里插入图片描述

4,生产者生产消息

@Slf4j
@Component
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public String sendMessage(String content) {String topic = "test_topic";kafkaTemplate.send(topic, content).addCallback(success -> {String topic = success.getRecordMetadata().topic();int partition = success.getRecordMetadata().partition();long offset = success.getRecordMetadata().offset();log.info("发送成功:主题:{},分区:{},偏移量:{}",topic,partition,offset);}, failure -> {log.info("发送失败:{}",failure.getMessage());});return "发送成功";}
}

5,订阅和消费消息

一,订阅主题
1,获取消费者

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.util.Properties;/*** kafka消费者配置* @author liuxunming*/
@Configuration
@Component
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;public KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);return consumer;}}

2,订阅topic

 		KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();consumer.subscribe(Collections.singleton("traffic"));

3,拉取消息

 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {String key = record.key();String value = record.value();log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
}

4,消费位移,释放资源

// 提交消费位移
consumer.commitSync();
// 关闭消费者以释放资源
consumer.close();

二,点对点模式

@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = {"test_topic"})public void handlerMsg(String content) {log.info("接收到消息:消息值:{} ",content);}
}

6,接口

@Slf4j
@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@PostMapping("/sendMessage")public String sendMessage(@RequestParam String content) {kafkaProducer.sendMessage(content);return "ok";}
}

7,测试结果

在这里插入图片描述
接收到消息
在这里插入图片描述

四,参考博文

  1. 解决IDEA无法识别SpringBoot项目
  2. SpringBoot从入门到精通(十二)SpringBoot集成Kafka
  3. Kafka的下载安装以及使用
  4. Kafka消息消费流程详解
  5. Kafka之Consumer使用与基本原理

相关文章:

使用Spring Boot和Kafka实现消息发送和订阅

文章目录 一&#xff0c;新建Spring Boot1&#xff0c;Maven配置2&#xff0c;无法识别为SpringBoot项目3&#xff0c;无效的源发行版4&#xff0c;无法访问SpringApplication5&#xff0c;运行直接Finish6&#xff0c;服务运行成功 二&#xff0c;安装启动Kafka1&#xff0c;下…...

探讨uniapp的组件使用的问题

1 视图容器 1.1 view Flex是Flexible Box的缩写&#xff0c;意为“弹性布局”&#xff0c;用来为盒状模型提供最大的灵活性。 当设置display: flex后&#xff0c;继续给view等容器组件设置flex-direction:row或column&#xff0c;就可以在该容器内按行或列排布子组件。uni-ap…...

【跟小嘉学 Rust 编程】十七、面向对象语言特性

系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 【跟小嘉学…...

mall :rabbit项目源码解析

文章目录 一、mall开源项目1.1 来源1.2 项目转移1.3 项目克隆 二、RabbitMQ 消息中间件2.1 rabbit简介2.2 分布式后端项目的使用流程2.3 分布式后端项目的使用场景 三、安装RabbitMQ(Win10)3.1安装erLang语言&#xff0c;配置环境变量3.2 安装RabbitMQ服务端3.3 测试安装效果 四…...

JDBC连接数据库

目录 一.什么是JDBC 二.JDBC的实现步骤 三.简单使用JDBC 一.什么是JDBC JDBC是Java数据库连接&#xff0c;是java中提供数据库访问的Java API,它为关系型数据库的提供了统一访问规范。 二.JDBC的实现步骤 1.创建数据库连接 这里有两种方式: DataSource创建&#xff0c;提…...

Linux学习之Ubuntu 20中OpenResty的nginx目录里内容和配置文件

参考的文章是《nginx配置详解》 可以参考我以前的文章安装OpenResty。 cd /usr/local/openresty切换目录&#xff0c;ls -l查看目录里边的内容。 我的系统中&#xff0c;nginx目录是/usr/local/openresty/nginx&#xff0c;在这个目录里边有一些目录&#xff0c;如下&#xff…...

使用axi_quad_spi操作spi_flash

文章目录 基本测试情况IP支持的命令 基本测试情况 有spi_flash需要访问&#xff0c;为简单计&#xff0c;选择使用axi_quad_spi进行操作。开始时&#xff0c;将IP配置成如下参数&#xff0c; 这样配置&#xff0c;是想着能够适应各家的FLASH&#xff08;实际使用的则是micron…...

Linux:tomcat (源码包安装)(官网下载-安装-启动-配置-等等等-----从入门到入土)

介绍 Apache Tomcat软件是一个开源实现 Jakarta Servlet、Jakarta Server Pages、Jakarta Expression Language、Jakarta WebSocket、Jakarta Annotations 和 Jakarta Authentication 规范。 这些规范是Jakarta EE平台的一部分。 Apache Tomcat软件是在开放和参与式中开发的。 …...

中科驭数以DPU先进计算技术,夯实下一代金融IT基础设施底座

由中国计算机学会主办的第19届CCF全国高性能计算学术年会&#xff08;CCF HPC China 2023&#xff09;于8月23日至26日在青岛成功召开。在“高性能金融计算”主题论坛上&#xff0c;中科驭数高级副总裁、CTO卢文岩应邀发表了题为《DPU先进计算技术助力下一代交易底座》的演讲&a…...

Android 手游聚合SDK小知识(二) 聚合分包

更新&#xff1a; 在上一篇文章中&#xff0c;我们介绍了如何聚合SDK的基本原理&#xff0c;介绍了聚合SDK的接口设计&#xff0c;那么当CP接入了我们的聚合SDK&#xff0c;给了我们游戏apk包时&#xff0c;这时我们又当如何分发渠道包呢&#xff1f; 分发渠道包&#xff1a;…...

【RISC-V】RISC-V寄存器

一、通用寄存器 32位RISC-V体系结构提供32个32位的整型通用寄存器寄存器别名全称说明X0zero零寄存器可做源寄存器(rs)或目标寄存器(rd)X1ra链接寄存器保存函数返回地址X2sp栈指针寄存器指向栈的地址X3gp全局寄存器用于链接器松弛优化X4tp线程寄存器常用于在OS中保存指向进程控…...

Python爬虫异常处理实践:处理被封禁和网站升级问题

在这篇文章中&#xff0c;我们将一起探讨Python爬虫异常处理实践&#xff0c;特别关注处理被封禁和网站升级问题。让我们一起来看看如何解决这些问题&#xff0c;提高我们爬虫程序的稳定性和可靠性。   首先&#xff0c;我们要了解为什么会遇到这些问题。网站封禁爬虫的原因主…...

重大工程建造云服务平台源码 SpringCloud+Vue

技术架构&#xff1a; 微服务JavaSpring Cloud VueUniApp MySql 开发语言&#xff1a;Java 开发工具&#xff1a;Idea 前端框架&#xff1a;Vue 后端框架&#xff1a;Spring Cloud 数 据 库&#xff1a;MySql 移 动 端&#xff1a;UniApp 系统端口&#xff1a;PC端&…...

MyBatisPlus简单入门

1、简单介绍MyBatisPlus MyBatisPlus是一个MyBatis的增强工具&#xff0c;在MyBatis的基础上只做增强不做改变&#xff0c;完全去SQL化&#xff0c;封装好了大量的CRUD操作。甚至吧CRUD操作封装到了Service层&#xff0c;可以直接在Controller调用现成的CRUD服务层&#xff0c…...

神经网络入门

神经网络的基本骨架 1. nn.Module的使用 所有的模型都要继承 Module 类需要重写初始化函数和运算步骤函数 eg&#xff1a; import torch.nn as nn import torch.nn.functional as Fclass Model(nn.Module): # 继承父类Module def __init__(self): # 重写初始化函数super()…...

【面试经典150题】多数元素

&#x1f517;题目链接 ✈题目描述&#xff1a; 给定一个大小为 n 的数组 nums &#xff0c;返回其中的多数元素。多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 ⌊ n/2 ⌋表示n/2结果向下取…...

c#垃圾回收(Garbage Collection)

在C#中&#xff0c;垃圾回收&#xff08;Garbage Collection&#xff09;是一种自动管理内存的机制。它负责跟踪和释放不再使用的内存&#xff0c;以便程序可以有效地使用内存资源。 C#中的垃圾回收器是由.NET运行时&#xff08;CLR&#xff09;提供和管理的。它使用了一种叫做…...

vue 基于element-plus el-button封装按钮组件

封装组件的原则是&#xff1a;组件只是数据流通的一个管道&#xff0c;不要糅合太多的逻辑在里面&#xff0c;是一个纯组件&#xff0c;还要根据自己项目的业务场景做具体的处理。 // MyButton.vue // 基于element-plus中el-button来封装按钮 <template><el-button c…...

smbus只能再python2.7下运行?不能再python3.8下运行吗?

不是的&#xff0c;SMBus并不只能在Python 2.7下运行&#xff0c;它也可以在Python 3.8及更高版本下运行。SMBus是用于访问系统上的I2C设备&#xff08;Inter-Integrated Circuit&#xff0c;一种串行通信协议&#xff09;的Python库&#xff0c;它应该与Python 3.8兼容。 要在…...

python中is和==的区别

is 和 的区别 在Python中&#xff0c;is和是两个用于比较对象的操作符&#xff0c;它们有不同的作用和用法。 is操作符&#xff1a; is用于比较两个对象的身份标识&#xff0c;即判断两个对象是否引用同一个内存地址的对象。当is操作符用于比较两个对象时&#xff0c;它会判断…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

Spark 之 入门讲解详细版(1)

1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室&#xff08;Algorithms, Machines, and People Lab&#xff09;开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目&#xff0c;8个月后成为Apache顶级项目&#xff0c;速度之快足见过人之处&…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)

设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile&#xff0c;新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...