当前位置: 首页 > news >正文

Spring Cloud Stream 3.x+kafka 3.8整合

Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接

  • 前言
  • 一、如何看官方文档(有深入了解需求的人)
  • 二、kafka的安装
    • tar包安装
    • docker安装
  • 三、代码中集成
    • 创建一个测试topic:test
    • producer代码
    • producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
    • Consumer代码
    • Consumer 配置
    • Consumer 2的代码和配置
  • 四、测试
  • 五、结语

前言

上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。

由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。

官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。

一、如何看官方文档(有深入了解需求的人)

1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程
在这里插入图片描述
2.配置建议看中文版本
在这里插入图片描述

二、kafka的安装

tar包安装

  1. 下载链接:kafka_2.13-3.8.0.tgz

  2. 选择一个合适的位置解压

    tar -zxvf kafka_2.12-3.8.0.tgz
    
  3. 启动自带的zookeeper(后台启动)

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    
  4. 修改kafka server的配置文件,便于外网能够访问
    找到bin\config目录下的server.properties文件
    修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://172.16.72.133:9092
    
  5. 启动kafka server(后台启动)

    nohup bin/kafka-server-start.sh config/server.properties &
    
  6. 稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
    首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    

    编辑这些新文件并设置如下属性:

    config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2
    

    broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
    然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来

    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &
    

    但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。

docker安装

  1. 拉取镜像

    docker pull apache/kafka:3.8.0
    
  2. 启动

    docker run -p -d 9092:9092 apache/kafka:3.8.0
    

三、代码中集成

创建一个测试topic:test

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。

producer代码

@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);@Autowiredprivate StreamBridge streamBridge;@RequestMapping("/test1")public void testOne() {Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));streamBridge.send("broadcastMessage-out-0", msg);}
}

自定义消息体SimpleMsg,此类不需要序列化

@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{private String msg;private String time;
}

producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)

key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String

spring:cloud:stream:kafka:binder:##kafka的server地址brokers: 172.16.72.133:9092##如果topic不存在则创建auto-create-topics: trueauto-add-partitions: true #自动分区min-partition-count: 1 #最小分区##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误configuration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-out-0:destination: testcontent-type: application/json

Consumer代码

 @Beanpublic Consumer<Message<SimpleMsg>> broadcastMessage() {return msg -> {log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());};}

Consumer 配置

项目中有更详细的配置,这里为了测试用的简化版

spring:cloud:stream:function:definition: broadcastMessagekafka:binder:brokers: 172.16.72.133:9092auto-create-topics: trueconfiguration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-in-0:destination: testgroup: test-topic-accountcontent-type: application/json

Consumer 2的代码和配置

项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。

四、测试

生产者发送两个消息
在这里插入图片描述
两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:
在这里插入图片描述
friend模块消费者:
在这里插入图片描述

五、结语

到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。

本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git

相关文章:

Spring Cloud Stream 3.x+kafka 3.8整合

Spring Cloud Stream 3.xkafka 3.8整合&#xff0c;文末有完整项目链接 前言一、如何看官方文档(有深入了解需求的人)二、kafka的安装tar包安装docker安装 三、代码中集成创建一个测试topic&#xff1a;testproducer代码producer配置&#xff08;配置的格式&#xff0c;上篇文章…...

JavaScript中的数组

1.数组的概念 数组可以把一组相关的数据一起存放&#xff0c;并提供方便的访问/获取方式数组是指一组数据的集合&#xff0c;其中每个数据称之为元素(element)&#xff0c;在数组中可以存放任意类型的元素&#xff0c;数组是一种将一组数据存储在单个变量名下的优雅方式。 2.…...

UE5运行时动态加载场景角色动画任意搭配-场景角色相机动画音乐加载方法(三)

1、将场景打包为Pak并加载 1、参考这篇文章将场景打包为pak,UE4打包并加载Pak-Windows/iOS/Android不同平台Editor/Runtime不同运行模式兼容 2、在Mount Pak后直接打开Map即可 void UMapManager::OpenMap(FString Path) {UWorld* World = UGlobalManager::GetInstance()->…...

c# 中 中文、英文、数字、空格、标点符号占的字符大小

在C#中&#xff0c;中文、英文、数字、空格和标点符号在不同编码下所占的字节大小是不一样的。常见的编码有UTF-8、UTF-16、GB2312等。以下是在不同编码下各种字符类型所占的字节大小&#xff1a; UTF-8&#xff1a; 中文字符&#xff1a;3个字节 英文字符&#xff1a;1个字…...

前端_003_js扫盲

文章目录 var,let,const严格模式数据类型运算符事件常用对象函数绑定call() ,apply(),bind() 闭包浏览器中事件循环回调和异步Promiseasync和await DOMBOMAjax var,let,const let是var的升级版本&#xff0c;对于块作用域&#xff0c;var无法进行限制&#xff0c;let不会存在该…...

ValueError: You cannot perform fine-tuning on purely quantized models.

在使用peft 微调8bit 或者4bit 模型的时候&#xff0c;可能会报错&#xff1a; You cannot perform fine-tuning on purely quantized models. Please attach trainable adapters on top of the quantized model to correctly perform fine-tuning. Please see: https://huggi…...

DELL R720服务器阵列数据恢复,磁盘状态为Foreign

服务器无法正常进入系统&#xff0c;物理磁盘状态变成了Foreign 虚拟磁盘状态变成了Failed 阵列已经丢失了&#xff0c;需要手工强制导入外部配置 单击 Main Menu 屏幕上的 Configuration Management。单击 Manage Foreign Configuration 单击 Preview Foreign Configurati…...

VMDK 0X80BB0005 VirtualBOX虚拟机错误处理-数据恢复——未来之窗数据恢复

打开虚拟盘文件in7.vmdk 失败. Could not get the storage format of the medium 7\win7.vmdk (VERR_NOT_SUPPORTED). 返回 代码:VBOX_E_IPRT_ERROR (0X80BB0005) 组件:MediumWrap 界面:IMedium {a a3f2dfb1} 被召者:IVirtualBox {768 cd607} 被召者 RC:VBOX_E_OBJECT_NOT_F…...

【Verilog学习日常】—牛客网刷题—Verilog企业真题—VL67

十六进制计数器 描述 请用Verilog设计十六进制递增计数器电路&#xff0c;每个时钟周期递增1。 电路的接口如下图所示。Q[3:0]中&#xff0c;Q[3]是高位。 接口电路图如下&#xff1a; 输入描述&#xff1a; input clk , input rst_n ,…...

51、AVR、ARM、DSP等常用芯片之对比

51芯片 51芯片通常指的是基于8051内核的单片机&#xff0c;这是一种经典的微控制器&#xff08;MCU&#xff09;。虽然关于51芯片的详细现代应用和发展可能因具体型号和厂商而有所不同&#xff0c;但基于8051内核的单片机通常具有以下特点&#xff1a; 结构经典&#xff1a;8…...

PostgreSQL 和Oracle 表压缩的对比

PostgreSQL 和Oracle 表压缩的对比 Oracle 和 PostgreSQL 在表压缩的性能方面存在显著差异&#xff0c;主要体现在实现方式、压缩效果、对系统性能的影响以及适用场景等方面。以下是对两者表压缩性能的详细对比&#xff1a; 1. 实现方式 Oracle 表压缩 Oracle 提供了多种压…...

【pyspark学习从入门到精通3】弹性分布式数据集_1

目录 RDD 的内部工作机制 创建 RDDs Schema 从文件中读取 弹性分布式数据集&#xff08;RDDs&#xff09;是一种分布式的不可变 JVM 对象集合&#xff0c;它允许你非常快速地执行计算&#xff0c;并且它们是 Apache Spark 的支柱。 顾名思义&#xff0c;数据集是分布式的&a…...

宠物健康监测仪健康守护者

在宠物护理领域&#xff0c;一款名为宠物健康监测仪的智能设备正逐渐成为宠物主人的新宠。这款设备不仅仅是一个简单的听诊器&#xff0c;它更像是宠物健康的智能管家&#xff0c;能够实时监测宠物的生理指标&#xff0c;并根据这些数据提供个性化的健康建议。 宠物健康监测仪…...

手写mybatis之解析和使用ResultMap映射参数配置

前言 学习源码是在学习什么呢&#xff1f; 就是为了通过这些源码级复杂模型中&#xff0c;学习系统框架的架构思维、设计原则和设计模式。在这些源码学习手写的过程中&#xff0c;感受、吸收并也是锻炼一种思维习惯&#xff0c;并尝试把这些思路技术迁移到平常的复杂业务设计开…...

LDR6500:低成本一拖二快充线解决方案

随着科技的飞速发展&#xff0c;我们的电子设备日益增多&#xff0c;从智能手机到平板电脑&#xff0c;再到各种可穿戴设备&#xff0c;它们已成为我们日常生活不可或缺的一部分。然而&#xff0c;随之而来的充电问题也日益凸显。为了解决这一难题&#xff0c;Type-C接口一拖二…...

DS线性表之单链表的讲解和实现(2)

文章目录 前言一、链表的概念二、链表的分类三、链表的结构四、前置知识准备五、单链表的模拟实现定义头节点初始化单链表销毁单链表打印单链表申请节点头插数据尾插数据头删数据尾删数据查询数据在pos位置之后插入数据删除pos位置之后的数据 总结 前言 本篇的单链表完全来说是…...

LeetCode 73 Set Matrix Zeroes 题目解析和python代码

题目&#xff1a; Given an m x n integer matrix matrix, if an element is 0, set its entire row and column to 0’s. You must do it in place. Example 1: Input: matrix [[1,1,1],[1,0,1],[1,1,1]] Output: [[1,0,1],[0,0,0],[1,0,1]] Example 2: Input: matrix …...

鸿蒙--WaterFlow 实现商城首页

目录结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ └──Logger.ets // 日志打印类 │ ├──entryability │ │ └──EntryAbility.ets // 程序入口…...

QT 中如何保存matlab 能打开的.mat数据矩阵!

Windows 上安装并使用 MATIO 库来保存 MATLAB 格式的 .mat 文件&#xff0c;需要进行以下步骤&#xff1a; 1. 下载并安装 CMake MATIO 使用 CMake 构建项目&#xff0c;因此你需要先安装 CMake。 前往 CMake 官网下载适用于 Windows 的安装程序并安装。 2. 下载 MATIO 库源…...

菱形继承(多继承)

1. 什么是菱形继承 也就是多继承&#xff0c;C独有的特性。 2. 菱形继承有什么问题&#xff1f; &#xff08;1&#xff09;存在内存浪费&#xff0c;多存一份父类的父类。 &#xff08;2&#xff09;容易造成二义性&#xff08;不知道修改哪一个基本属性&#xff09;。 3. 如…...

Vivado项目文件太多分不清?这份FPGA开发必备的‘文件后缀速查手册’请收好

Vivado项目文件管理终极指南&#xff1a;从后缀识别到高效工作流 当你第一次打开一个成熟的Vivado项目文件夹时&#xff0c;那种面对几十种陌生文件后缀的茫然感&#xff0c;相信每个FPGA开发者都记忆犹新。就像走进了一个满是神秘符号的仓库&#xff0c;每个文件似乎都在向你发…...

ArduinoLog:面向MCU的零开销C++嵌入式日志框架

1. ArduinoLog 项目概述ArduinoLog 是一款专为 Arduino 及兼容嵌入式平台&#xff08;包括 AVR、SAM、ESP8266 等&#xff09;设计的轻量级 C 日志框架。其核心设计哲学是“零运行时开销、零动态内存分配、全编译期可控”&#xff0c;在资源极度受限的微控制器环境中&#xff0…...

MediaPipe人脸检测避坑指南:如何优化检测精度与性能(含模型选择建议)

MediaPipe人脸检测实战优化&#xff1a;从参数调优到模型部署的完整指南 人脸检测作为计算机视觉的基础任务&#xff0c;其性能直接影响后续的面部分析效果。MediaPipe提供的轻量级解决方案在移动端和边缘设备上表现出色&#xff0c;但实际应用中常遇到误检、漏检或性能瓶颈问题…...

PasteMD算力优化成果:Ollama量化后llama3:8b仅需4GB内存,推理速度提升2.3倍

PasteMD算力优化成果&#xff1a;Ollama量化后llama3:8b仅需4GB内存&#xff0c;推理速度提升2.3倍 1. 项目背景与优化挑战 PasteMD是一款基于本地Ollama框架的剪贴板智能美化工具&#xff0c;它能够将杂乱的文本内容一键转换为结构化的Markdown格式。这个工具完全私有化部署…...

Qwen3-14B开源大模型实战:WebUI界面定制+API接口二次开发教程

Qwen3-14B开源大模型实战&#xff1a;WebUI界面定制API接口二次开发教程 1. 开箱即用的私有部署方案 Qwen3-14B作为通义千问最新开源的大语言模型&#xff0c;在14B参数规模下展现出惊人的多任务处理能力。但很多开发者在本地部署时常常遇到环境配置复杂、显存不足、推理速度…...

提升开发效率:IntelliJ IDEA必备插件推荐与安装指南(2023最新版)

2023年IntelliJ IDEA插件生态深度解析&#xff1a;从效率工具到全栈开发支持 JetBrains家族的IntelliJ IDEA早已超越普通代码编辑器的范畴&#xff0c;成为现代开发者手中的瑞士军刀。但鲜有人意识到&#xff0c;真正让这把军刀所向披靡的&#xff0c;是背后超过5000个官方认证…...

Java基础实战:用快马平台快速构建学生成绩管理系统巩固核心知识

最近在复习Java基础知识&#xff0c;发现光看理论很容易遗忘&#xff0c;于是决定通过一个小项目来巩固核心概念。这个简易学生成绩管理系统虽然功能简单&#xff0c;但涵盖了Java基础的多个重要知识点&#xff0c;特别适合像我这样的初学者练手。 项目整体设计思路 首先考虑…...

中国DevOps市场格局重塑:本土合规与全球协作的平衡艺术

中国DevOps市场格局重塑&#xff1a;本土合规与全球协作的平衡艺术 中国企业的DevOps工具链选择正面临前所未有的复杂局面 随着数字经济的深入发展&#xff0c;DevOps工具链已经从单纯的技术选型问题演变为关乎企业数字化转型成败的战略决策。在当前的宏观环境下&#xff0c;…...

Linux 内核中的内核线程:从创建到管理

Linux 内核中的内核线程&#xff1a;从创建到管理 引言 作为一名深耕操作系统和嵌入式开发的工程师&#xff0c;我深知后台任务的重要性。在系统开发中&#xff0c;合理的后台任务管理可以提高系统的响应性和稳定性。在 Linux 内核中&#xff0c;内核线程是执行后台任务的核心机…...

Janus-Pro-7B开发环境搭建:Ubuntu20.04系统配置全攻略

Janus-Pro-7B开发环境搭建&#xff1a;Ubuntu20.04系统配置全攻略 从零开始&#xff0c;手把手带你搭建Janus-Pro-7B多模态AI开发环境 如果你刚接触Janus-Pro-7B这个强大的多模态模型&#xff0c;可能会被环境配置的各种问题困扰。别担心&#xff0c;今天我就带你一步步在Ubunt…...