Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接
- 前言
- 一、如何看官方文档(有深入了解需求的人)
- 二、kafka的安装
- tar包安装
- docker安装
- 三、代码中集成
- 创建一个测试topic:test
- producer代码
- producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
- Consumer代码
- Consumer 配置
- Consumer 2的代码和配置
- 四、测试
- 五、结语
前言
上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。
由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。
官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。
一、如何看官方文档(有深入了解需求的人)
1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程

2.配置建议看中文版本

二、kafka的安装
tar包安装
-
下载链接:kafka_2.13-3.8.0.tgz
-
选择一个合适的位置解压
tar -zxvf kafka_2.12-3.8.0.tgz -
启动自带的zookeeper(后台启动)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & -
修改kafka server的配置文件,便于外网能够访问
找到bin\config目录下的server.properties文件
修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://172.16.72.133:9092 -
启动kafka server(后台启动)
nohup bin/kafka-server-start.sh config/server.properties & -
稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties编辑这些新文件并设置如下属性:
config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。
docker安装
-
拉取镜像
docker pull apache/kafka:3.8.0 -
启动
docker run -p -d 9092:9092 apache/kafka:3.8.0
三、代码中集成
创建一个测试topic:test
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。
producer代码
@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);@Autowiredprivate StreamBridge streamBridge;@RequestMapping("/test1")public void testOne() {Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));streamBridge.send("broadcastMessage-out-0", msg);}
}
自定义消息体SimpleMsg,此类不需要序列化
@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{private String msg;private String time;
}
producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String
spring:cloud:stream:kafka:binder:##kafka的server地址brokers: 172.16.72.133:9092##如果topic不存在则创建auto-create-topics: trueauto-add-partitions: true #自动分区min-partition-count: 1 #最小分区##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误configuration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-out-0:destination: testcontent-type: application/json
Consumer代码
@Beanpublic Consumer<Message<SimpleMsg>> broadcastMessage() {return msg -> {log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());};}
Consumer 配置
项目中有更详细的配置,这里为了测试用的简化版
spring:cloud:stream:function:definition: broadcastMessagekafka:binder:brokers: 172.16.72.133:9092auto-create-topics: trueconfiguration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-in-0:destination: testgroup: test-topic-accountcontent-type: application/json
Consumer 2的代码和配置
项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。
四、测试
生产者发送两个消息

两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:

friend模块消费者:

五、结语
到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。
本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git
相关文章:
Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.xkafka 3.8整合,文末有完整项目链接 前言一、如何看官方文档(有深入了解需求的人)二、kafka的安装tar包安装docker安装 三、代码中集成创建一个测试topic:testproducer代码producer配置(配置的格式,上篇文章…...
JavaScript中的数组
1.数组的概念 数组可以把一组相关的数据一起存放,并提供方便的访问/获取方式数组是指一组数据的集合,其中每个数据称之为元素(element),在数组中可以存放任意类型的元素,数组是一种将一组数据存储在单个变量名下的优雅方式。 2.…...
UE5运行时动态加载场景角色动画任意搭配-场景角色相机动画音乐加载方法(三)
1、将场景打包为Pak并加载 1、参考这篇文章将场景打包为pak,UE4打包并加载Pak-Windows/iOS/Android不同平台Editor/Runtime不同运行模式兼容 2、在Mount Pak后直接打开Map即可 void UMapManager::OpenMap(FString Path) {UWorld* World = UGlobalManager::GetInstance()->…...
c# 中 中文、英文、数字、空格、标点符号占的字符大小
在C#中,中文、英文、数字、空格和标点符号在不同编码下所占的字节大小是不一样的。常见的编码有UTF-8、UTF-16、GB2312等。以下是在不同编码下各种字符类型所占的字节大小: UTF-8: 中文字符:3个字节 英文字符:1个字…...
前端_003_js扫盲
文章目录 var,let,const严格模式数据类型运算符事件常用对象函数绑定call() ,apply(),bind() 闭包浏览器中事件循环回调和异步Promiseasync和await DOMBOMAjax var,let,const let是var的升级版本,对于块作用域,var无法进行限制,let不会存在该…...
ValueError: You cannot perform fine-tuning on purely quantized models.
在使用peft 微调8bit 或者4bit 模型的时候,可能会报错: You cannot perform fine-tuning on purely quantized models. Please attach trainable adapters on top of the quantized model to correctly perform fine-tuning. Please see: https://huggi…...
DELL R720服务器阵列数据恢复,磁盘状态为Foreign
服务器无法正常进入系统,物理磁盘状态变成了Foreign 虚拟磁盘状态变成了Failed 阵列已经丢失了,需要手工强制导入外部配置 单击 Main Menu 屏幕上的 Configuration Management。单击 Manage Foreign Configuration 单击 Preview Foreign Configurati…...
VMDK 0X80BB0005 VirtualBOX虚拟机错误处理-数据恢复——未来之窗数据恢复
打开虚拟盘文件in7.vmdk 失败. Could not get the storage format of the medium 7\win7.vmdk (VERR_NOT_SUPPORTED). 返回 代码:VBOX_E_IPRT_ERROR (0X80BB0005) 组件:MediumWrap 界面:IMedium {a a3f2dfb1} 被召者:IVirtualBox {768 cd607} 被召者 RC:VBOX_E_OBJECT_NOT_F…...
【Verilog学习日常】—牛客网刷题—Verilog企业真题—VL67
十六进制计数器 描述 请用Verilog设计十六进制递增计数器电路,每个时钟周期递增1。 电路的接口如下图所示。Q[3:0]中,Q[3]是高位。 接口电路图如下: 输入描述: input clk , input rst_n ,…...
51、AVR、ARM、DSP等常用芯片之对比
51芯片 51芯片通常指的是基于8051内核的单片机,这是一种经典的微控制器(MCU)。虽然关于51芯片的详细现代应用和发展可能因具体型号和厂商而有所不同,但基于8051内核的单片机通常具有以下特点: 结构经典:8…...
PostgreSQL 和Oracle 表压缩的对比
PostgreSQL 和Oracle 表压缩的对比 Oracle 和 PostgreSQL 在表压缩的性能方面存在显著差异,主要体现在实现方式、压缩效果、对系统性能的影响以及适用场景等方面。以下是对两者表压缩性能的详细对比: 1. 实现方式 Oracle 表压缩 Oracle 提供了多种压…...
【pyspark学习从入门到精通3】弹性分布式数据集_1
目录 RDD 的内部工作机制 创建 RDDs Schema 从文件中读取 弹性分布式数据集(RDDs)是一种分布式的不可变 JVM 对象集合,它允许你非常快速地执行计算,并且它们是 Apache Spark 的支柱。 顾名思义,数据集是分布式的&a…...
宠物健康监测仪健康守护者
在宠物护理领域,一款名为宠物健康监测仪的智能设备正逐渐成为宠物主人的新宠。这款设备不仅仅是一个简单的听诊器,它更像是宠物健康的智能管家,能够实时监测宠物的生理指标,并根据这些数据提供个性化的健康建议。 宠物健康监测仪…...
手写mybatis之解析和使用ResultMap映射参数配置
前言 学习源码是在学习什么呢? 就是为了通过这些源码级复杂模型中,学习系统框架的架构思维、设计原则和设计模式。在这些源码学习手写的过程中,感受、吸收并也是锻炼一种思维习惯,并尝试把这些思路技术迁移到平常的复杂业务设计开…...
LDR6500:低成本一拖二快充线解决方案
随着科技的飞速发展,我们的电子设备日益增多,从智能手机到平板电脑,再到各种可穿戴设备,它们已成为我们日常生活不可或缺的一部分。然而,随之而来的充电问题也日益凸显。为了解决这一难题,Type-C接口一拖二…...
DS线性表之单链表的讲解和实现(2)
文章目录 前言一、链表的概念二、链表的分类三、链表的结构四、前置知识准备五、单链表的模拟实现定义头节点初始化单链表销毁单链表打印单链表申请节点头插数据尾插数据头删数据尾删数据查询数据在pos位置之后插入数据删除pos位置之后的数据 总结 前言 本篇的单链表完全来说是…...
LeetCode 73 Set Matrix Zeroes 题目解析和python代码
题目: Given an m x n integer matrix matrix, if an element is 0, set its entire row and column to 0’s. You must do it in place. Example 1: Input: matrix [[1,1,1],[1,0,1],[1,1,1]] Output: [[1,0,1],[0,0,0],[1,0,1]] Example 2: Input: matrix …...
鸿蒙--WaterFlow 实现商城首页
目录结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ └──Logger.ets // 日志打印类 │ ├──entryability │ │ └──EntryAbility.ets // 程序入口…...
QT 中如何保存matlab 能打开的.mat数据矩阵!
Windows 上安装并使用 MATIO 库来保存 MATLAB 格式的 .mat 文件,需要进行以下步骤: 1. 下载并安装 CMake MATIO 使用 CMake 构建项目,因此你需要先安装 CMake。 前往 CMake 官网下载适用于 Windows 的安装程序并安装。 2. 下载 MATIO 库源…...
菱形继承(多继承)
1. 什么是菱形继承 也就是多继承,C独有的特性。 2. 菱形继承有什么问题? (1)存在内存浪费,多存一份父类的父类。 (2)容易造成二义性(不知道修改哪一个基本属性)。 3. 如…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
从零实现富文本编辑器#5-编辑器选区模型的状态结构表达
先前我们总结了浏览器选区模型的交互策略,并且实现了基本的选区操作,还调研了自绘选区的实现。那么相对的,我们还需要设计编辑器的选区表达,也可以称为模型选区。编辑器中应用变更时的操作范围,就是以模型选区为基准来…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...
全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...
基于matlab策略迭代和值迭代法的动态规划
经典的基于策略迭代和值迭代法的动态规划matlab代码,实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...
