玩转SpringCloud Stream
背景及痛点
现如今消息中间件(MQ)在互联网项目中被广泛的应用,特别是大数据行业应用的特别的多,现在市面上也流行这多个消息中间件框架,比如ActiveMQ
、RabbitMQ
、RocketMQ
、Kafka
等,这些消息中间件各有各的优劣,但是想要解决的问题都基本相同。由于每个框架都有它自己的使用方式,这无疑是增加了开发者的学习成本以及添加相同的业务复杂度。框架的变更或者多个中间件的混合使用使得业务逻辑代码中中间件的切换、项目的维护和开发都会变得更加繁琐。
有没有一种技术让我们不再需要关注具体MQ的使用细节,我们只需要专注业务逻辑的开发,让程序根据实际项目的使用自己去适配绑定,自动在各种MQ内切换呢?springcloud stream
便为此而生。
关于stream
我们用一句话来描述stream就是:屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型
官方定义SpringCloud Stream
是一个构建消息驱动微服务的框架,应用通过inputs
或者outputs
来与SpringCloud Stream
中的binder
对象交互,我们通过配置来绑定消息中间件,而SpringCloud Stream
的binder
对象负责与消息中间件交互,所以我们只需要搞清楚如何与SpringCloud Stream
交互即可方便的使用消息中间件。
SpringCloud Stream
通过Spring Integration
来连接消息代理中间件以实现消息事件驱动,它提供了个性化的自动化配置,引用了发布订阅
、消费组
、分区
的三个核心概念,但是目前仅支持RabbitMQ
和Kafka
设计思想
在此之前

生产者和消费者通过消息媒介(queue等)传递信息内容(Message),消息必须通过特定的通道(MessageChannel),通过消息的发布与订阅来决定消息的发送和消费(publish/subscrib
)。
引入中间件
现在假如我们用到了RabbitMQ
和Kafka
,由于这两个消息中间件的架构上的不同,像RabbitMQ
有Exchange
,而Kafka
有topiche
和Partitions
分区

(binder中,input对于消费者,output对应生产者。)
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,但是后面因为业务需求,需要改用另外一种消息队列进行迁移,这时候无疑就是一 个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream
给我们提供了一种解耦合的方式。
屏蔽底层差异
在没有绑定器(Builder
)这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间件,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
处理架构
Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

其遵循了发布-订阅模式,主要使用的就是Topic主题进行广播,RabbitMQ就是Exchange,在Kafka中就是Topic
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
stream流程

-
Binder
:很方便的连接中间件,屏蔽差异 -
Channel
:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置 -
Source和Sink
:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
常用api和注解

使用示例
基本环境
注册中心
:Eureka,可以是其他。-
消息中间件
:RabbitMQrabbitmq:host: localhostport: 5672username: guestpassword: guest
生产端
依赖
<!--stream rabbit -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:port: 8801spring:application:name: cloud-stream-providercloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit # 消息组件的类型environment: #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbiteureka:client:#表示是否将自己注册进EurekaServer默认为trueregister-with-eureka: true#是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡fetch-registry: trueservice-url:#单机版defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: trueinstance-id: sender01
定义接口
这里需要定义一个接口并实现它,方便其他业务调用。
public interface IMessageProvider {/*** 发送接口* @param msg* @return*/public String send(String msg);
}
接口实现
接口实现中需要添加
@EnableBinding
注解,并引入Source.class
,为什么引入Source.class
呢?因为它是生产者,我们参考stream流程图就可以知道
import com.martain.study.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;@EnableBinding(Source.class)
public class MessageProvider implements IMessageProvider {/*** 注入消息发送管道*/@Resourceprivate MessageChannel output;@Overridepublic String send(String msg) {output.send(MessageBuilder.withPayload(msg).build());System.out.println("******send message:"+msg);return msg;}
}
定义测试controller
@RestController
public class TestController {@AutowiredIMessageProvider messageProvider;@GetMapping("/sendMsg")public String sendMsg(){String msg = UUID.randomUUID().toString();return messageProvider.send(msg);}}
启动类
@SpringBootApplication
public class StreamProviderApplication8801 {public static void main(String[] args) {SpringApplication.run(StreamProviderApplication8801.class,args);}
}
服务启动之后,多次请求/sendMsg
,发送了多条消息。

消费端
依赖
<!--stream rabbit --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--eureka client--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
配置文件
与生产者类似,只是bindings中的output改成了input
server:port: 8802
spring:application:name: cloud-stream-consumecloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit # 消息组件的类型environment: #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbiteureka:client:#表示是否将自己注册进EurekaServer默认为trueregister-with-eureka: true#是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡fetch-registry: trueservice-url:#单机版defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: trueinstance-id: recover01
接收服务
接收服务只需要再类名前添加
@EnableBinding()
注解,并引入Sink.class
类,而实际接收的方法中需要添加@StreamListener(Sink.INPUT)
注解。
package com.martain.study.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; @Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {/*** 获取本服务的端口*/@Value("${server.port}")private String serverPort;/*** 这里表示监听sink的input* @param message*/@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("**** recv msg :"+message.getPayload()+" in port "+serverPort);}
}
启动类
@SpringBootApplication
public class StreamConsumerApplication8802 {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication8802.class,args);}
}
启动生产服务后,在启动消费服务,多次请求生产服务发送消息,我们可以发现消费者能很快的消费这些消息。

消息分组
当我们有多个
消费者
时,这个时候生产者生产一条消息,会发现所有的消费者都会消费这个消息。比如在一些订单系统的场景中,如果一个订单被多个处理服务一起获取到,就容易造成数据错误,那我们如何避免这种情况呢?这时我们就可以使用Stream的消息分组
来解决重复消费问题。
如何实现Stream的消息分组呢?我们只要简单的在yml文件中配置spring.cloud.stream.bindings.input.group
即可。示例如下:
...
spring:application:name: cloud-stream-consumecloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit # 消息组件的类型environment: #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbitgroup: groupA # 配置分组...
如果没有设置该属性,当消费服务启动时,会有个随机的组名
。
如果我们将所有的消费服务的group
熟悉都设置成一致的话,这些服务就会在同一个组里面,从而能够保证消息只被应用消费一次。
同一组的消费者是竞争关系,不可以重复消费。
消息持久化
当生产者在持续生产消息,消费服务突然挂了,使得拥有许多消息并没有被消费,如果消费没有配置分组的话,消费服务重启是无法消费未消费的消息的,如果配置了分组的话,当消费服务重启之后可以自动去消费未消费的数据。

喜欢的朋友记得点赞、收藏、关注哦!!!
相关文章:

玩转SpringCloud Stream
背景及痛点 现如今消息中间件(MQ)在互联网项目中被广泛的应用,特别是大数据行业应用的特别的多,现在市面上也流行这多个消息中间件框架,比如ActiveMQ、RabbitMQ、RocketMQ、Kafka等,这些消息中间件各有各的优劣,但是想…...

嵌入式经常用到串口,如何判断串口数据接收完成?
说起通信,首先想到的肯定是串口,日常中232和485的使用比比皆是,数据的发送、接收是串口通信最基础的内容。这篇文章主要讨论串口接收数据的断帧操作。 空闲中断断帧 一些mcu(如:stm32f103)在出厂时就已经在…...

iOS App的启动与优化
App的启动流程 App启动分为冷启动和热启动 冷启动:从0开始启动App热启动:App已经在内存中,但是后台还挂着,再次点击图标启动App。 一般对App启动的优化都是针对冷启动。 App冷启动可分为三个阶段: dyld:…...
导出指定文件夹下的文件结构 工具模块-Python
python模块代码 import os import json import xml.etree.ElementTree as ET from typing import List, Optional, Dict, Union from pathlib import Path class DirectoryTreeExporter:def __init__(self,root_path: str,output_file: str,fmt: str txt,show_root: boo…...

Leetcode - 周赛436
目录 一、3446. 按对角线进行矩阵排序二、3447. 将元素分配给有约束条件的组三、3448. 统计可以被最后一个数位整除的子字符串数目四、3449. 最大化游戏分数的最小值 一、3446. 按对角线进行矩阵排序 题目链接 本题可以暴力枚举,在确定了每一个对角线的第一个元素…...
【pytest】编写自动化测试用例命名规范README
API_autoTest 项目介绍 1. pytest命名规范 测试文件: 文件名需要以 test_ 开头或者以 _test.py 结尾。例如,test_login.py、user_management_test.py 这样的命名方式,pytest 能够自动识别并将其作为测试文件来执行其中的测试用例。 测试类…...

Compose常用UI组件
Compose常用UI组件 概述Modifier 修饰符常用Modifier修饰符作用域限定Modifier Modifier 实现原理Modifier.Element链的构建链的解析 常用基础组件常用布局组件列表组件 概述 Compose 预置了很多基础组件,如 Button,TextField,TopAppBar等&a…...

斐波那契数列模型:在动态规划的丝绸之路上追寻斐波那契的足迹(上)
文章目录 引言递归与动态规划的对比递归解法的初探动态规划的优雅与高效自顶向下的记忆化搜索自底向上的迭代法 性能分析与比较小结 引言 斐波那契数列,这一数列如同一条无形的丝线,穿越千年时光,悄然延续其魅力。其定义简单而优美ÿ…...

Hackthebox- Season7- Titanic 简记 [Easy]
简记 ip重定向到 http://titanic.htb,先添加hosts 收集子域名 wfuzz -c -u http://titanic.htb/ -w /usr/share/seclists/Discovery/DNS/subdomains-top1million-20000.txt -H Host:FUZZ.titanic.htb --hl 9 ******************************************************** * Wfu…...

Sa-Token 根据官方文档简单实现登录认证的示例
Sa-Token 根据官方文档实现登录鉴权测试 功能实现步骤依赖配置文件启动类创建 controller启动项目测试不用密码登录查看cookie状态 密码登录查看cookie状态 修改token名称 Apipost 测试无 cookie 模式【使用 token】后端将 token 返回到前端修改代码:测试࿱…...

rustdesk编译修改名字
最近,我用Rust重写了一个2W行C代码的linux内核模块。在此记录一点经验。我此前没写过内核模块,认识比较疏浅,有错误欢迎指正。 为什么要重写? 这个模块2W行代码量看起来不多,却在线上时常故障,永远改不完。…...
BS5852英国家具防火安全条款主要包括哪几个方面呢?
什么是BS5852检测? BS5852是英国针对家用家具的强制性安全要求,主要测试家具在受到燃烧香烟和火柴等火源时的可燃性。这个标准通常分为四个部分进行测试,但实际应用中主要测试第一部分和第二部分,包括烟头测试和利用乙炔火焰模拟…...

【运维】源码编译安装cmake
背景: 已经在本地源码编译安装gcc/g,现在源码安装cmake 下载源码 下载地址:CMake - Upgrade Your Software Build System 安装步骤: ./bootstrap --prefix/usr/local/cmake make make install 错误处理 1、提示找不到libmpc.…...

检测网络安全漏洞 工具
实验一的名称为信息收集和漏洞扫描 实验环境:VMware下的kali linux2021和Windows7 32,网络设置均为NAT,这样子两台机器就在一个网络下。攻击的机器为kali,被攻击的机器为Windows 7。 理论知识记录: 1.信息收集的步骤 2.ping命令…...
frameworks 之 Activity添加View
frameworks 之 Activity添加View 1 LaunchActivityItem1.1 Activity 创建1.2 PhoneWindow 创建1.3 DecorView 创建 2 ResumeActivityItem 讲解 Activity加载View的时机和流程 涉及到的类如下 frameworks/base/core/java/android/app/Activity.javaframeworks/base/services/cor…...
UWB技术中的两种调制方式:PPM与PAM
Ultra-Wideband (UWB) 技术以其低功耗、宽频谱和高精度定位的特点,广泛应用于物联网(IoT)、智能家居、资产追踪和无线通信等领域。在UWB中,信号的调制方式对于数据传输的效率和精度起着至关重要的作用。本文将深入探讨UWB中常用的…...

达梦:用户和模式
目录标题 数据库管理系统与用户权限管理**四权分立****用户管理与权限划分****用户管理界面与权限控制****用户创建与管理****实操**1. **默认创建用户与模式**:2. **用户权限和角色分配**:3. **命令行管理用户与角色**:4. 模式也可以创建 **…...

23. AI-大语言模型-DeepSeek
文章目录 前言一、DeepSeek是什么1. 简介2. 产品版本3. 特征4. 地址链接5. 三种访问方式1. 网页端和APP2. DeepSeek API 二、DeepSeek可以做什么1. 应用场景2. 文本生成1. 文本创作2. 摘要与改写3. 结构化生成 3. 自然语言理解与分析1. 语义分析2. 文本分类3. 知识推理 4. 编程…...

Spring-GPT智谱清言AI项目(附源码)
一、项目介绍 本项目是Spring AI第三方调用整合智谱请言(官网是:https://open.bigmodel.cn)的案例,回答响应流式输出显示,这里使用的是免费模型,需要其他模型可以去 https://www.bigmodel.cn/pricing 切换…...

计算机网络(涵盖OSI,TCP/IP,交换机,路由器,局域网)
一、网络通信基础 (一)网络通信的概念 网络通信是指终端设备之间通过计算机网络进行的信息传递与交流。它类似于现实生活中的物品传递过程:数据(物品)被封装成报文(包裹),通过网络…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...
Cesium1.95中高性能加载1500个点
一、基本方式: 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...

高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...

HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...

Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...