Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接
- 前言
- 一、如何看官方文档(有深入了解需求的人)
- 二、kafka的安装
- tar包安装
- docker安装
- 三、代码中集成
- 创建一个测试topic:test
- producer代码
- producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
- Consumer代码
- Consumer 配置
- Consumer 2的代码和配置
- 四、测试
- 五、结语
前言
上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。
由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。
官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。
一、如何看官方文档(有深入了解需求的人)
1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程

2.配置建议看中文版本

二、kafka的安装
tar包安装
-
下载链接:kafka_2.13-3.8.0.tgz
-
选择一个合适的位置解压
tar -zxvf kafka_2.12-3.8.0.tgz -
启动自带的zookeeper(后台启动)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & -
修改kafka server的配置文件,便于外网能够访问
找到bin\config目录下的server.properties文件
修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://172.16.72.133:9092 -
启动kafka server(后台启动)
nohup bin/kafka-server-start.sh config/server.properties & -
稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties编辑这些新文件并设置如下属性:
config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。
docker安装
-
拉取镜像
docker pull apache/kafka:3.8.0 -
启动
docker run -p -d 9092:9092 apache/kafka:3.8.0
三、代码中集成
创建一个测试topic:test
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。
producer代码
@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);@Autowiredprivate StreamBridge streamBridge;@RequestMapping("/test1")public void testOne() {Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));streamBridge.send("broadcastMessage-out-0", msg);}
}
自定义消息体SimpleMsg,此类不需要序列化
@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{private String msg;private String time;
}
producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String
spring:cloud:stream:kafka:binder:##kafka的server地址brokers: 172.16.72.133:9092##如果topic不存在则创建auto-create-topics: trueauto-add-partitions: true #自动分区min-partition-count: 1 #最小分区##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误configuration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-out-0:destination: testcontent-type: application/json
Consumer代码
@Beanpublic Consumer<Message<SimpleMsg>> broadcastMessage() {return msg -> {log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());};}
Consumer 配置
项目中有更详细的配置,这里为了测试用的简化版
spring:cloud:stream:function:definition: broadcastMessagekafka:binder:brokers: 172.16.72.133:9092auto-create-topics: trueconfiguration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-in-0:destination: testgroup: test-topic-accountcontent-type: application/json
Consumer 2的代码和配置
项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。
四、测试
生产者发送两个消息

两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:

friend模块消费者:

五、结语
到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。
本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git
相关文章:
Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.xkafka 3.8整合,文末有完整项目链接 前言一、如何看官方文档(有深入了解需求的人)二、kafka的安装tar包安装docker安装 三、代码中集成创建一个测试topic:testproducer代码producer配置(配置的格式,上篇文章…...
JavaScript中的数组
1.数组的概念 数组可以把一组相关的数据一起存放,并提供方便的访问/获取方式数组是指一组数据的集合,其中每个数据称之为元素(element),在数组中可以存放任意类型的元素,数组是一种将一组数据存储在单个变量名下的优雅方式。 2.…...
UE5运行时动态加载场景角色动画任意搭配-场景角色相机动画音乐加载方法(三)
1、将场景打包为Pak并加载 1、参考这篇文章将场景打包为pak,UE4打包并加载Pak-Windows/iOS/Android不同平台Editor/Runtime不同运行模式兼容 2、在Mount Pak后直接打开Map即可 void UMapManager::OpenMap(FString Path) {UWorld* World = UGlobalManager::GetInstance()->…...
c# 中 中文、英文、数字、空格、标点符号占的字符大小
在C#中,中文、英文、数字、空格和标点符号在不同编码下所占的字节大小是不一样的。常见的编码有UTF-8、UTF-16、GB2312等。以下是在不同编码下各种字符类型所占的字节大小: UTF-8: 中文字符:3个字节 英文字符:1个字…...
前端_003_js扫盲
文章目录 var,let,const严格模式数据类型运算符事件常用对象函数绑定call() ,apply(),bind() 闭包浏览器中事件循环回调和异步Promiseasync和await DOMBOMAjax var,let,const let是var的升级版本,对于块作用域,var无法进行限制,let不会存在该…...
ValueError: You cannot perform fine-tuning on purely quantized models.
在使用peft 微调8bit 或者4bit 模型的时候,可能会报错: You cannot perform fine-tuning on purely quantized models. Please attach trainable adapters on top of the quantized model to correctly perform fine-tuning. Please see: https://huggi…...
DELL R720服务器阵列数据恢复,磁盘状态为Foreign
服务器无法正常进入系统,物理磁盘状态变成了Foreign 虚拟磁盘状态变成了Failed 阵列已经丢失了,需要手工强制导入外部配置 单击 Main Menu 屏幕上的 Configuration Management。单击 Manage Foreign Configuration 单击 Preview Foreign Configurati…...
VMDK 0X80BB0005 VirtualBOX虚拟机错误处理-数据恢复——未来之窗数据恢复
打开虚拟盘文件in7.vmdk 失败. Could not get the storage format of the medium 7\win7.vmdk (VERR_NOT_SUPPORTED). 返回 代码:VBOX_E_IPRT_ERROR (0X80BB0005) 组件:MediumWrap 界面:IMedium {a a3f2dfb1} 被召者:IVirtualBox {768 cd607} 被召者 RC:VBOX_E_OBJECT_NOT_F…...
【Verilog学习日常】—牛客网刷题—Verilog企业真题—VL67
十六进制计数器 描述 请用Verilog设计十六进制递增计数器电路,每个时钟周期递增1。 电路的接口如下图所示。Q[3:0]中,Q[3]是高位。 接口电路图如下: 输入描述: input clk , input rst_n ,…...
51、AVR、ARM、DSP等常用芯片之对比
51芯片 51芯片通常指的是基于8051内核的单片机,这是一种经典的微控制器(MCU)。虽然关于51芯片的详细现代应用和发展可能因具体型号和厂商而有所不同,但基于8051内核的单片机通常具有以下特点: 结构经典:8…...
PostgreSQL 和Oracle 表压缩的对比
PostgreSQL 和Oracle 表压缩的对比 Oracle 和 PostgreSQL 在表压缩的性能方面存在显著差异,主要体现在实现方式、压缩效果、对系统性能的影响以及适用场景等方面。以下是对两者表压缩性能的详细对比: 1. 实现方式 Oracle 表压缩 Oracle 提供了多种压…...
【pyspark学习从入门到精通3】弹性分布式数据集_1
目录 RDD 的内部工作机制 创建 RDDs Schema 从文件中读取 弹性分布式数据集(RDDs)是一种分布式的不可变 JVM 对象集合,它允许你非常快速地执行计算,并且它们是 Apache Spark 的支柱。 顾名思义,数据集是分布式的&a…...
宠物健康监测仪健康守护者
在宠物护理领域,一款名为宠物健康监测仪的智能设备正逐渐成为宠物主人的新宠。这款设备不仅仅是一个简单的听诊器,它更像是宠物健康的智能管家,能够实时监测宠物的生理指标,并根据这些数据提供个性化的健康建议。 宠物健康监测仪…...
手写mybatis之解析和使用ResultMap映射参数配置
前言 学习源码是在学习什么呢? 就是为了通过这些源码级复杂模型中,学习系统框架的架构思维、设计原则和设计模式。在这些源码学习手写的过程中,感受、吸收并也是锻炼一种思维习惯,并尝试把这些思路技术迁移到平常的复杂业务设计开…...
LDR6500:低成本一拖二快充线解决方案
随着科技的飞速发展,我们的电子设备日益增多,从智能手机到平板电脑,再到各种可穿戴设备,它们已成为我们日常生活不可或缺的一部分。然而,随之而来的充电问题也日益凸显。为了解决这一难题,Type-C接口一拖二…...
DS线性表之单链表的讲解和实现(2)
文章目录 前言一、链表的概念二、链表的分类三、链表的结构四、前置知识准备五、单链表的模拟实现定义头节点初始化单链表销毁单链表打印单链表申请节点头插数据尾插数据头删数据尾删数据查询数据在pos位置之后插入数据删除pos位置之后的数据 总结 前言 本篇的单链表完全来说是…...
LeetCode 73 Set Matrix Zeroes 题目解析和python代码
题目: Given an m x n integer matrix matrix, if an element is 0, set its entire row and column to 0’s. You must do it in place. Example 1: Input: matrix [[1,1,1],[1,0,1],[1,1,1]] Output: [[1,0,1],[0,0,0],[1,0,1]] Example 2: Input: matrix …...
鸿蒙--WaterFlow 实现商城首页
目录结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ └──Logger.ets // 日志打印类 │ ├──entryability │ │ └──EntryAbility.ets // 程序入口…...
QT 中如何保存matlab 能打开的.mat数据矩阵!
Windows 上安装并使用 MATIO 库来保存 MATLAB 格式的 .mat 文件,需要进行以下步骤: 1. 下载并安装 CMake MATIO 使用 CMake 构建项目,因此你需要先安装 CMake。 前往 CMake 官网下载适用于 Windows 的安装程序并安装。 2. 下载 MATIO 库源…...
菱形继承(多继承)
1. 什么是菱形继承 也就是多继承,C独有的特性。 2. 菱形继承有什么问题? (1)存在内存浪费,多存一份父类的父类。 (2)容易造成二义性(不知道修改哪一个基本属性)。 3. 如…...
接口测试中缓存处理策略
在接口测试中,缓存处理策略是一个关键环节,直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性,避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明: 一、缓存处理的核…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...
练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
