Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接
- 前言
- 一、如何看官方文档(有深入了解需求的人)
- 二、kafka的安装
- tar包安装
- docker安装
- 三、代码中集成
- 创建一个测试topic:test
- producer代码
- producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
- Consumer代码
- Consumer 配置
- Consumer 2的代码和配置
- 四、测试
- 五、结语
前言
上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。
由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。
官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。
一、如何看官方文档(有深入了解需求的人)
1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程

2.配置建议看中文版本

二、kafka的安装
tar包安装
-
下载链接:kafka_2.13-3.8.0.tgz
-
选择一个合适的位置解压
tar -zxvf kafka_2.12-3.8.0.tgz -
启动自带的zookeeper(后台启动)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & -
修改kafka server的配置文件,便于外网能够访问
找到bin\config目录下的server.properties文件
修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://172.16.72.133:9092 -
启动kafka server(后台启动)
nohup bin/kafka-server-start.sh config/server.properties & -
稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties编辑这些新文件并设置如下属性:
config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。
docker安装
-
拉取镜像
docker pull apache/kafka:3.8.0 -
启动
docker run -p -d 9092:9092 apache/kafka:3.8.0
三、代码中集成
创建一个测试topic:test
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。
producer代码
@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);@Autowiredprivate StreamBridge streamBridge;@RequestMapping("/test1")public void testOne() {Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));streamBridge.send("broadcastMessage-out-0", msg);}
}
自定义消息体SimpleMsg,此类不需要序列化
@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{private String msg;private String time;
}
producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String
spring:cloud:stream:kafka:binder:##kafka的server地址brokers: 172.16.72.133:9092##如果topic不存在则创建auto-create-topics: trueauto-add-partitions: true #自动分区min-partition-count: 1 #最小分区##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误configuration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-out-0:destination: testcontent-type: application/json
Consumer代码
@Beanpublic Consumer<Message<SimpleMsg>> broadcastMessage() {return msg -> {log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());};}
Consumer 配置
项目中有更详细的配置,这里为了测试用的简化版
spring:cloud:stream:function:definition: broadcastMessagekafka:binder:brokers: 172.16.72.133:9092auto-create-topics: trueconfiguration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-in-0:destination: testgroup: test-topic-accountcontent-type: application/json
Consumer 2的代码和配置
项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。
四、测试
生产者发送两个消息

两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:

friend模块消费者:

五、结语
到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。
本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git
相关文章:
Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.xkafka 3.8整合,文末有完整项目链接 前言一、如何看官方文档(有深入了解需求的人)二、kafka的安装tar包安装docker安装 三、代码中集成创建一个测试topic:testproducer代码producer配置(配置的格式,上篇文章…...
JavaScript中的数组
1.数组的概念 数组可以把一组相关的数据一起存放,并提供方便的访问/获取方式数组是指一组数据的集合,其中每个数据称之为元素(element),在数组中可以存放任意类型的元素,数组是一种将一组数据存储在单个变量名下的优雅方式。 2.…...
UE5运行时动态加载场景角色动画任意搭配-场景角色相机动画音乐加载方法(三)
1、将场景打包为Pak并加载 1、参考这篇文章将场景打包为pak,UE4打包并加载Pak-Windows/iOS/Android不同平台Editor/Runtime不同运行模式兼容 2、在Mount Pak后直接打开Map即可 void UMapManager::OpenMap(FString Path) {UWorld* World = UGlobalManager::GetInstance()->…...
c# 中 中文、英文、数字、空格、标点符号占的字符大小
在C#中,中文、英文、数字、空格和标点符号在不同编码下所占的字节大小是不一样的。常见的编码有UTF-8、UTF-16、GB2312等。以下是在不同编码下各种字符类型所占的字节大小: UTF-8: 中文字符:3个字节 英文字符:1个字…...
前端_003_js扫盲
文章目录 var,let,const严格模式数据类型运算符事件常用对象函数绑定call() ,apply(),bind() 闭包浏览器中事件循环回调和异步Promiseasync和await DOMBOMAjax var,let,const let是var的升级版本,对于块作用域,var无法进行限制,let不会存在该…...
ValueError: You cannot perform fine-tuning on purely quantized models.
在使用peft 微调8bit 或者4bit 模型的时候,可能会报错: You cannot perform fine-tuning on purely quantized models. Please attach trainable adapters on top of the quantized model to correctly perform fine-tuning. Please see: https://huggi…...
DELL R720服务器阵列数据恢复,磁盘状态为Foreign
服务器无法正常进入系统,物理磁盘状态变成了Foreign 虚拟磁盘状态变成了Failed 阵列已经丢失了,需要手工强制导入外部配置 单击 Main Menu 屏幕上的 Configuration Management。单击 Manage Foreign Configuration 单击 Preview Foreign Configurati…...
VMDK 0X80BB0005 VirtualBOX虚拟机错误处理-数据恢复——未来之窗数据恢复
打开虚拟盘文件in7.vmdk 失败. Could not get the storage format of the medium 7\win7.vmdk (VERR_NOT_SUPPORTED). 返回 代码:VBOX_E_IPRT_ERROR (0X80BB0005) 组件:MediumWrap 界面:IMedium {a a3f2dfb1} 被召者:IVirtualBox {768 cd607} 被召者 RC:VBOX_E_OBJECT_NOT_F…...
【Verilog学习日常】—牛客网刷题—Verilog企业真题—VL67
十六进制计数器 描述 请用Verilog设计十六进制递增计数器电路,每个时钟周期递增1。 电路的接口如下图所示。Q[3:0]中,Q[3]是高位。 接口电路图如下: 输入描述: input clk , input rst_n ,…...
51、AVR、ARM、DSP等常用芯片之对比
51芯片 51芯片通常指的是基于8051内核的单片机,这是一种经典的微控制器(MCU)。虽然关于51芯片的详细现代应用和发展可能因具体型号和厂商而有所不同,但基于8051内核的单片机通常具有以下特点: 结构经典:8…...
PostgreSQL 和Oracle 表压缩的对比
PostgreSQL 和Oracle 表压缩的对比 Oracle 和 PostgreSQL 在表压缩的性能方面存在显著差异,主要体现在实现方式、压缩效果、对系统性能的影响以及适用场景等方面。以下是对两者表压缩性能的详细对比: 1. 实现方式 Oracle 表压缩 Oracle 提供了多种压…...
【pyspark学习从入门到精通3】弹性分布式数据集_1
目录 RDD 的内部工作机制 创建 RDDs Schema 从文件中读取 弹性分布式数据集(RDDs)是一种分布式的不可变 JVM 对象集合,它允许你非常快速地执行计算,并且它们是 Apache Spark 的支柱。 顾名思义,数据集是分布式的&a…...
宠物健康监测仪健康守护者
在宠物护理领域,一款名为宠物健康监测仪的智能设备正逐渐成为宠物主人的新宠。这款设备不仅仅是一个简单的听诊器,它更像是宠物健康的智能管家,能够实时监测宠物的生理指标,并根据这些数据提供个性化的健康建议。 宠物健康监测仪…...
手写mybatis之解析和使用ResultMap映射参数配置
前言 学习源码是在学习什么呢? 就是为了通过这些源码级复杂模型中,学习系统框架的架构思维、设计原则和设计模式。在这些源码学习手写的过程中,感受、吸收并也是锻炼一种思维习惯,并尝试把这些思路技术迁移到平常的复杂业务设计开…...
LDR6500:低成本一拖二快充线解决方案
随着科技的飞速发展,我们的电子设备日益增多,从智能手机到平板电脑,再到各种可穿戴设备,它们已成为我们日常生活不可或缺的一部分。然而,随之而来的充电问题也日益凸显。为了解决这一难题,Type-C接口一拖二…...
DS线性表之单链表的讲解和实现(2)
文章目录 前言一、链表的概念二、链表的分类三、链表的结构四、前置知识准备五、单链表的模拟实现定义头节点初始化单链表销毁单链表打印单链表申请节点头插数据尾插数据头删数据尾删数据查询数据在pos位置之后插入数据删除pos位置之后的数据 总结 前言 本篇的单链表完全来说是…...
LeetCode 73 Set Matrix Zeroes 题目解析和python代码
题目: Given an m x n integer matrix matrix, if an element is 0, set its entire row and column to 0’s. You must do it in place. Example 1: Input: matrix [[1,1,1],[1,0,1],[1,1,1]] Output: [[1,0,1],[0,0,0],[1,0,1]] Example 2: Input: matrix …...
鸿蒙--WaterFlow 实现商城首页
目录结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ └──Logger.ets // 日志打印类 │ ├──entryability │ │ └──EntryAbility.ets // 程序入口…...
QT 中如何保存matlab 能打开的.mat数据矩阵!
Windows 上安装并使用 MATIO 库来保存 MATLAB 格式的 .mat 文件,需要进行以下步骤: 1. 下载并安装 CMake MATIO 使用 CMake 构建项目,因此你需要先安装 CMake。 前往 CMake 官网下载适用于 Windows 的安装程序并安装。 2. 下载 MATIO 库源…...
菱形继承(多继承)
1. 什么是菱形继承 也就是多继承,C独有的特性。 2. 菱形继承有什么问题? (1)存在内存浪费,多存一份父类的父类。 (2)容易造成二义性(不知道修改哪一个基本属性)。 3. 如…...
告别迷茫!在嵌入式Linux上用libwebsockets v4.0实现WebSocket客户端(含SSL配置避坑)
嵌入式Linux实战:libwebsockets v4.0客户端开发与SSL避坑指南 当树莓派的GPIO引脚需要与云端实时同步数据时,WebSocket往往是嵌入式开发者的首选协议。但面对内存仅512MB的ARMv7开发板,选用一个既支持SSL加密又能兼容C99标准的轻量级库&#…...
高考解析几何“秒杀”技巧:用极点极线快速搞定椭圆定点定值难题
高考解析几何“秒杀”技巧:用极点极线快速搞定椭圆定点定值难题 解析几何作为高考数学的压轴题型,常常让考生望而生畏。面对复杂的计算和抽象的条件,如何在有限时间内快速找到突破口?极点极线理论作为高等几何中的重要工具&#x…...
iOS越狱终极指南:解锁iPhone隐藏功能的3个关键步骤
iOS越狱终极指南:解锁iPhone隐藏功能的3个关键步骤 【免费下载链接】Jailbreak iOS 26.4 - 26, 17 - 17.7.5 & iOS 18 - 18.7.3 Jailbreak Tools, Cydia/Sileo/Zebra Tweaks & Jailbreak News Updates || AI Jailbreak Finder 👇 项目地址: ht…...
Linuxbonding链路异常定位实战
Linuxbonding链路异常定位实战这是一篇面向中级 Linux 使用者的技术文章,主题聚焦在bonding链路,重点讨论链路聚合、冗余切换和接口状态。在真实生产环境中,bonding链路相关问题往往不会以单一错误形式出现,而是混杂在日志、权限、…...
终极罗技PUBG鼠标宏配置指南:5步告别压枪烦恼
终极罗技PUBG鼠标宏配置指南:5步告别压枪烦恼 【免费下载链接】logitech-pubg PUBG no recoil script for Logitech gaming mouse / 绝地求生 罗技 鼠标宏 项目地址: https://gitcode.com/gh_mirrors/lo/logitech-pubg 还在为《绝地求生》中疯狂上跳的枪口而…...
Cursor IDE事件日志分析工具:Python实现开发者行为可视化与效率洞察
1. 项目概述:一个为开发者“把脉”的智能分析工具如果你是一名开发者,尤其是深度使用Cursor这类AI编程助手的开发者,你肯定有过这样的体验:面对一个复杂的项目,你向AI助手提了无数个问题,生成了大量代码片段…...
AI驱动代码审查:Cursor与Git工作流融合实践
1. 项目概述:当AI代码助手遇上代码审查最近在GitHub上看到一个挺有意思的项目,叫guinacio/cursor-review。光看名字,你可能会觉得这又是一个普通的代码审查工具,但点进去仔细研究,你会发现它的核心思路非常巧妙&#x…...
CN2628 可用太阳能供电 5 伏特低压差电压调制集成电路
概述: CN2628是一款可用太阳能供电的低噪声线性电压调制集成电路,采用固定5.0V输出电压,最大 输出电流可达1安培,在5.5V到7V的输入电压范围内输出电压精度可达1%。CN2628工作电流只有520微安,而且同输入和输出的压差没有关系。 CN…...
AI量化交易实战:从机器学习模型到加密货币对冲基金系统构建
1. 项目概述:一个面向加密货币的AI对冲基金框架最近几年,AI在量化交易领域的应用已经从实验室走向了实战,尤其是在波动性极高的加密货币市场。如果你对量化交易和机器学习感兴趣,并且想找一个能直接上手、结构清晰的实战项目来学习…...
Arm Neoverse CMN-700多芯片架构与一致性哈希解析
1. Arm Neoverse CMN-700多芯片架构解析在现代高性能计算领域,多芯片系统架构已成为突破单芯片性能瓶颈的关键技术路径。Arm Neoverse CMN-700作为第二代一致性网状网络控制器,其设计哲学体现在三个维度:首先是通过模块化设计实现计算单元的可…...
