Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接
- 前言
- 一、如何看官方文档(有深入了解需求的人)
- 二、kafka的安装
- tar包安装
- docker安装
- 三、代码中集成
- 创建一个测试topic:test
- producer代码
- producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
- Consumer代码
- Consumer 配置
- Consumer 2的代码和配置
- 四、测试
- 五、结语
前言
上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。
由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。
官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。
一、如何看官方文档(有深入了解需求的人)
1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程

2.配置建议看中文版本

二、kafka的安装
tar包安装
-
下载链接:kafka_2.13-3.8.0.tgz
-
选择一个合适的位置解压
tar -zxvf kafka_2.12-3.8.0.tgz -
启动自带的zookeeper(后台启动)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & -
修改kafka server的配置文件,便于外网能够访问
找到bin\config目录下的server.properties文件
修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://172.16.72.133:9092 -
启动kafka server(后台启动)
nohup bin/kafka-server-start.sh config/server.properties & -
稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties编辑这些新文件并设置如下属性:
config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。
docker安装
-
拉取镜像
docker pull apache/kafka:3.8.0 -
启动
docker run -p -d 9092:9092 apache/kafka:3.8.0
三、代码中集成
创建一个测试topic:test
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。
producer代码
@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);@Autowiredprivate StreamBridge streamBridge;@RequestMapping("/test1")public void testOne() {Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));streamBridge.send("broadcastMessage-out-0", msg);}
}
自定义消息体SimpleMsg,此类不需要序列化
@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{private String msg;private String time;
}
producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String
spring:cloud:stream:kafka:binder:##kafka的server地址brokers: 172.16.72.133:9092##如果topic不存在则创建auto-create-topics: trueauto-add-partitions: true #自动分区min-partition-count: 1 #最小分区##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误configuration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-out-0:destination: testcontent-type: application/json
Consumer代码
@Beanpublic Consumer<Message<SimpleMsg>> broadcastMessage() {return msg -> {log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());};}
Consumer 配置
项目中有更详细的配置,这里为了测试用的简化版
spring:cloud:stream:function:definition: broadcastMessagekafka:binder:brokers: 172.16.72.133:9092auto-create-topics: trueconfiguration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-in-0:destination: testgroup: test-topic-accountcontent-type: application/json
Consumer 2的代码和配置
项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。
四、测试
生产者发送两个消息

两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:

friend模块消费者:

五、结语
到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。
本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git
相关文章:
Spring Cloud Stream 3.x+kafka 3.8整合
Spring Cloud Stream 3.xkafka 3.8整合,文末有完整项目链接 前言一、如何看官方文档(有深入了解需求的人)二、kafka的安装tar包安装docker安装 三、代码中集成创建一个测试topic:testproducer代码producer配置(配置的格式,上篇文章…...
JavaScript中的数组
1.数组的概念 数组可以把一组相关的数据一起存放,并提供方便的访问/获取方式数组是指一组数据的集合,其中每个数据称之为元素(element),在数组中可以存放任意类型的元素,数组是一种将一组数据存储在单个变量名下的优雅方式。 2.…...
UE5运行时动态加载场景角色动画任意搭配-场景角色相机动画音乐加载方法(三)
1、将场景打包为Pak并加载 1、参考这篇文章将场景打包为pak,UE4打包并加载Pak-Windows/iOS/Android不同平台Editor/Runtime不同运行模式兼容 2、在Mount Pak后直接打开Map即可 void UMapManager::OpenMap(FString Path) {UWorld* World = UGlobalManager::GetInstance()->…...
c# 中 中文、英文、数字、空格、标点符号占的字符大小
在C#中,中文、英文、数字、空格和标点符号在不同编码下所占的字节大小是不一样的。常见的编码有UTF-8、UTF-16、GB2312等。以下是在不同编码下各种字符类型所占的字节大小: UTF-8: 中文字符:3个字节 英文字符:1个字…...
前端_003_js扫盲
文章目录 var,let,const严格模式数据类型运算符事件常用对象函数绑定call() ,apply(),bind() 闭包浏览器中事件循环回调和异步Promiseasync和await DOMBOMAjax var,let,const let是var的升级版本,对于块作用域,var无法进行限制,let不会存在该…...
ValueError: You cannot perform fine-tuning on purely quantized models.
在使用peft 微调8bit 或者4bit 模型的时候,可能会报错: You cannot perform fine-tuning on purely quantized models. Please attach trainable adapters on top of the quantized model to correctly perform fine-tuning. Please see: https://huggi…...
DELL R720服务器阵列数据恢复,磁盘状态为Foreign
服务器无法正常进入系统,物理磁盘状态变成了Foreign 虚拟磁盘状态变成了Failed 阵列已经丢失了,需要手工强制导入外部配置 单击 Main Menu 屏幕上的 Configuration Management。单击 Manage Foreign Configuration 单击 Preview Foreign Configurati…...
VMDK 0X80BB0005 VirtualBOX虚拟机错误处理-数据恢复——未来之窗数据恢复
打开虚拟盘文件in7.vmdk 失败. Could not get the storage format of the medium 7\win7.vmdk (VERR_NOT_SUPPORTED). 返回 代码:VBOX_E_IPRT_ERROR (0X80BB0005) 组件:MediumWrap 界面:IMedium {a a3f2dfb1} 被召者:IVirtualBox {768 cd607} 被召者 RC:VBOX_E_OBJECT_NOT_F…...
【Verilog学习日常】—牛客网刷题—Verilog企业真题—VL67
十六进制计数器 描述 请用Verilog设计十六进制递增计数器电路,每个时钟周期递增1。 电路的接口如下图所示。Q[3:0]中,Q[3]是高位。 接口电路图如下: 输入描述: input clk , input rst_n ,…...
51、AVR、ARM、DSP等常用芯片之对比
51芯片 51芯片通常指的是基于8051内核的单片机,这是一种经典的微控制器(MCU)。虽然关于51芯片的详细现代应用和发展可能因具体型号和厂商而有所不同,但基于8051内核的单片机通常具有以下特点: 结构经典:8…...
PostgreSQL 和Oracle 表压缩的对比
PostgreSQL 和Oracle 表压缩的对比 Oracle 和 PostgreSQL 在表压缩的性能方面存在显著差异,主要体现在实现方式、压缩效果、对系统性能的影响以及适用场景等方面。以下是对两者表压缩性能的详细对比: 1. 实现方式 Oracle 表压缩 Oracle 提供了多种压…...
【pyspark学习从入门到精通3】弹性分布式数据集_1
目录 RDD 的内部工作机制 创建 RDDs Schema 从文件中读取 弹性分布式数据集(RDDs)是一种分布式的不可变 JVM 对象集合,它允许你非常快速地执行计算,并且它们是 Apache Spark 的支柱。 顾名思义,数据集是分布式的&a…...
宠物健康监测仪健康守护者
在宠物护理领域,一款名为宠物健康监测仪的智能设备正逐渐成为宠物主人的新宠。这款设备不仅仅是一个简单的听诊器,它更像是宠物健康的智能管家,能够实时监测宠物的生理指标,并根据这些数据提供个性化的健康建议。 宠物健康监测仪…...
手写mybatis之解析和使用ResultMap映射参数配置
前言 学习源码是在学习什么呢? 就是为了通过这些源码级复杂模型中,学习系统框架的架构思维、设计原则和设计模式。在这些源码学习手写的过程中,感受、吸收并也是锻炼一种思维习惯,并尝试把这些思路技术迁移到平常的复杂业务设计开…...
LDR6500:低成本一拖二快充线解决方案
随着科技的飞速发展,我们的电子设备日益增多,从智能手机到平板电脑,再到各种可穿戴设备,它们已成为我们日常生活不可或缺的一部分。然而,随之而来的充电问题也日益凸显。为了解决这一难题,Type-C接口一拖二…...
DS线性表之单链表的讲解和实现(2)
文章目录 前言一、链表的概念二、链表的分类三、链表的结构四、前置知识准备五、单链表的模拟实现定义头节点初始化单链表销毁单链表打印单链表申请节点头插数据尾插数据头删数据尾删数据查询数据在pos位置之后插入数据删除pos位置之后的数据 总结 前言 本篇的单链表完全来说是…...
LeetCode 73 Set Matrix Zeroes 题目解析和python代码
题目: Given an m x n integer matrix matrix, if an element is 0, set its entire row and column to 0’s. You must do it in place. Example 1: Input: matrix [[1,1,1],[1,0,1],[1,1,1]] Output: [[1,0,1],[0,0,0],[1,0,1]] Example 2: Input: matrix …...
鸿蒙--WaterFlow 实现商城首页
目录结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ └──Logger.ets // 日志打印类 │ ├──entryability │ │ └──EntryAbility.ets // 程序入口…...
QT 中如何保存matlab 能打开的.mat数据矩阵!
Windows 上安装并使用 MATIO 库来保存 MATLAB 格式的 .mat 文件,需要进行以下步骤: 1. 下载并安装 CMake MATIO 使用 CMake 构建项目,因此你需要先安装 CMake。 前往 CMake 官网下载适用于 Windows 的安装程序并安装。 2. 下载 MATIO 库源…...
菱形继承(多继承)
1. 什么是菱形继承 也就是多继承,C独有的特性。 2. 菱形继承有什么问题? (1)存在内存浪费,多存一份父类的父类。 (2)容易造成二义性(不知道修改哪一个基本属性)。 3. 如…...
DeerFlow效果展示:自动生成的深度研究报告与播客内容惊艳分享
DeerFlow效果展示:自动生成的深度研究报告与播客内容惊艳分享 1. DeerFlow核心能力概览 DeerFlow作为一款深度研究智能助手,整合了语言模型、网络搜索和代码执行能力,能够自动完成从信息收集到内容生成的全流程工作。其核心功能亮点包括&am…...
饭局下半场,别人忙着解酒,我从开局就赢在酒杯里
1. 饭局如战场,后半场才是真正的考验任何一场饭局,都可以被分成两个阶段。前半场,推杯换盏,人人意气风发。酒过三巡,大家还在比拼谁喝得多、谁喝得猛,气氛热烈而体面。但到了后半场,画风开始分裂…...
像素剧本圣殿部署指南:Qwen2.5-14B-Instruct在生产环境中稳定运行的GPU显存优化技巧
像素剧本圣殿部署指南:Qwen2.5-14B-Instruct在生产环境中稳定运行的GPU显存优化技巧 1. 项目概述 像素剧本圣殿(Pixel Script Temple)是一款基于Qwen2.5-14B-Instruct大模型深度微调的专业剧本创作工具。它将先进的AI推理能力与独特的8-Bit…...
用STM32F103的TIM3实现旋转编码器方向判断:AB相相位差处理的5个关键细节
STM32F103旋转编码器方向判断实战:TIM3相位差处理的5个核心技巧 旋转编码器作为工业控制和人机交互中广泛使用的传感器,其方向判断的准确性直接影响系统控制的可靠性。本文将深入探讨基于STM32F103的TIM3定时器实现旋转编码器方向判断的关键技术细节&…...
【卷积神经网络作业实现人脸的关键点定位功能】
下面是完成这道题目的代码:import os import cv2 import numpy as np import pandas as pd import torch import torch.nn as nn from torch.utils.data import Dataset,DataLoader from torchvision import transforms import matplotlib.pyplot as plt1. 数据集定…...
保姆级教程:手把手教你用GLM-4v-9b搭建图片问答机器人
保姆级教程:手把手教你用GLM-4v-9b搭建图片问答机器人 你是不是经常遇到这样的情况:看到一张复杂的图表,想快速了解里面的数据含义;或者收到一张产品图,想知道它的具体型号和功能;又或者辅导孩子作业时&am…...
告别环境冲突!在PyCharm里用Anaconda为ArcGIS 10.2创建专属Arcpy虚拟环境(附32/64位切换指南)
告别环境冲突!在PyCharm里用Anaconda为ArcGIS 10.2创建专属Arcpy虚拟环境(附32/64位切换指南) 当你在处理多个GIS项目时,是否经常遇到这样的困扰:一个项目需要ArcGIS 10.2的32位环境,另一个项目却需要64位…...
造相Z-Image文生图模型v2:3步搭建你的专属AI画师
造相Z-Image文生图模型v2:3步搭建你的专属AI画师 1. 为什么选择Z-Image v2作为你的AI画师 在众多文生图模型中,造相Z-Image v2以其独特的优势脱颖而出。作为阿里通义万相团队开源的高性能模型,它原生支持768768及以上分辨率的高清图像生成&…...
如何选择ComfyUI-FramePackWrapper的模型加载方案?从技术选型到场景适配全解析
如何选择ComfyUI-FramePackWrapper的模型加载方案?从技术选型到场景适配全解析 【免费下载链接】ComfyUI-FramePackWrapper 项目地址: https://gitcode.com/gh_mirrors/co/ComfyUI-FramePackWrapper 在AI视频生成工作流中,模型加载是影响效率与稳…...
提升开发效率:IntelliJ IDEA必备插件推荐与安装指南(2023最新版)
2023年IntelliJ IDEA插件生态深度解析:从效率工具到全栈开发支持 JetBrains家族的IntelliJ IDEA早已超越普通代码编辑器的范畴,成为现代开发者手中的瑞士军刀。但鲜有人意识到,真正让这把军刀所向披靡的,是背后超过5000个官方认证…...
