玩转SpringCloud Stream
背景及痛点
现如今消息中间件(MQ)在互联网项目中被广泛的应用,特别是大数据行业应用的特别的多,现在市面上也流行这多个消息中间件框架,比如ActiveMQ、RabbitMQ、RocketMQ、Kafka等,这些消息中间件各有各的优劣,但是想要解决的问题都基本相同。由于每个框架都有它自己的使用方式,这无疑是增加了开发者的学习成本以及添加相同的业务复杂度。框架的变更或者多个中间件的混合使用使得业务逻辑代码中中间件的切换、项目的维护和开发都会变得更加繁琐。
有没有一种技术让我们不再需要关注具体MQ的使用细节,我们只需要专注业务逻辑的开发,让程序根据实际项目的使用自己去适配绑定,自动在各种MQ内切换呢?springcloud stream便为此而生。
关于stream
我们用一句话来描述stream就是:屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型
官方定义SpringCloud Stream是一个构建消息驱动微服务的框架,应用通过inputs或者outputs来与SpringCloud Stream中的binder对象交互,我们通过配置来绑定消息中间件,而SpringCloud Stream的binder对象负责与消息中间件交互,所以我们只需要搞清楚如何与SpringCloud Stream交互即可方便的使用消息中间件。
SpringCloud Stream通过Spring Integration来连接消息代理中间件以实现消息事件驱动,它提供了个性化的自动化配置,引用了发布订阅、消费组、分区的三个核心概念,但是目前仅支持RabbitMQ和Kafka
设计思想
在此之前
生产者和消费者通过消息媒介(queue等)传递信息内容(Message),消息必须通过特定的通道(MessageChannel),通过消息的发布与订阅来决定消息的发送和消费(publish/subscrib)。
引入中间件
现在假如我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有Exchange,而Kafka有topiche和Partitions分区
(binder中,input对于消费者,output对应生产者。)
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,但是后面因为业务需求,需要改用另外一种消息队列进行迁移,这时候无疑就是一 个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
屏蔽底层差异
在没有绑定器(Builder)这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间件,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
处理架构
Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
其遵循了发布-订阅模式,主要使用的就是Topic主题进行广播,RabbitMQ就是Exchange,在Kafka中就是Topic
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
stream流程
-
Binder:很方便的连接中间件,屏蔽差异 -
Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置 -
Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
常用api和注解
使用示例
基本环境
注册中心:Eureka,可以是其他。-
消息中间件:RabbitMQrabbitmq:host: localhostport: 5672username: guestpassword: guest
生产端
依赖
<!--stream rabbit -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:port: 8801spring:application:name: cloud-stream-providercloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit # 消息组件的类型environment: #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbiteureka:client:#表示是否将自己注册进EurekaServer默认为trueregister-with-eureka: true#是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡fetch-registry: trueservice-url:#单机版defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: trueinstance-id: sender01
定义接口
这里需要定义一个接口并实现它,方便其他业务调用。
public interface IMessageProvider {/*** 发送接口* @param msg* @return*/public String send(String msg);
}
接口实现
接口实现中需要添加
@EnableBinding注解,并引入Source.class,为什么引入Source.class呢?因为它是生产者,我们参考stream流程图就可以知道
import com.martain.study.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;@EnableBinding(Source.class)
public class MessageProvider implements IMessageProvider {/*** 注入消息发送管道*/@Resourceprivate MessageChannel output;@Overridepublic String send(String msg) {output.send(MessageBuilder.withPayload(msg).build());System.out.println("******send message:"+msg);return msg;}
}
定义测试controller
@RestController
public class TestController {@AutowiredIMessageProvider messageProvider;@GetMapping("/sendMsg")public String sendMsg(){String msg = UUID.randomUUID().toString();return messageProvider.send(msg);}}
启动类
@SpringBootApplication
public class StreamProviderApplication8801 {public static void main(String[] args) {SpringApplication.run(StreamProviderApplication8801.class,args);}
}
服务启动之后,多次请求/sendMsg,发送了多条消息。
消费端
依赖
<!--stream rabbit --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--eureka client--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
配置文件
与生产者类似,只是bindings中的output改成了input
server:port: 8802
spring:application:name: cloud-stream-consumecloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit # 消息组件的类型environment: #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbiteureka:client:#表示是否将自己注册进EurekaServer默认为trueregister-with-eureka: true#是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡fetch-registry: trueservice-url:#单机版defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: trueinstance-id: recover01 接收服务
接收服务只需要再类名前添加
@EnableBinding()注解,并引入Sink.class类,而实际接收的方法中需要添加@StreamListener(Sink.INPUT)注解。
package com.martain.study.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; @Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {/*** 获取本服务的端口*/@Value("${server.port}")private String serverPort;/*** 这里表示监听sink的input* @param message*/@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("**** recv msg :"+message.getPayload()+" in port "+serverPort);}
} 启动类
@SpringBootApplication
public class StreamConsumerApplication8802 {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication8802.class,args);}
} 启动生产服务后,在启动消费服务,多次请求生产服务发送消息,我们可以发现消费者能很快的消费这些消息。
消息分组
当我们有多个
消费者时,这个时候生产者生产一条消息,会发现所有的消费者都会消费这个消息。比如在一些订单系统的场景中,如果一个订单被多个处理服务一起获取到,就容易造成数据错误,那我们如何避免这种情况呢?这时我们就可以使用Stream的消息分组来解决重复消费问题。
如何实现Stream的消息分组呢?我们只要简单的在yml文件中配置spring.cloud.stream.bindings.input.group即可。示例如下:
...
spring:application:name: cloud-stream-consumecloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit # 消息组件的类型environment: #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbitgroup: groupA # 配置分组...
如果没有设置该属性,当消费服务启动时,会有个随机的组名。
如果我们将所有的消费服务的group熟悉都设置成一致的话,这些服务就会在同一个组里面,从而能够保证消息只被应用消费一次。
同一组的消费者是竞争关系,不可以重复消费。
消息持久化
当生产者在持续生产消息,消费服务突然挂了,使得拥有许多消息并没有被消费,如果消费没有配置分组的话,消费服务重启是无法消费未消费的消息的,如果配置了分组的话,当消费服务重启之后可以自动去消费未消费的数据。
喜欢的朋友记得点赞、收藏、关注哦!!!
相关文章:
玩转SpringCloud Stream
背景及痛点 现如今消息中间件(MQ)在互联网项目中被广泛的应用,特别是大数据行业应用的特别的多,现在市面上也流行这多个消息中间件框架,比如ActiveMQ、RabbitMQ、RocketMQ、Kafka等,这些消息中间件各有各的优劣,但是想…...
嵌入式经常用到串口,如何判断串口数据接收完成?
说起通信,首先想到的肯定是串口,日常中232和485的使用比比皆是,数据的发送、接收是串口通信最基础的内容。这篇文章主要讨论串口接收数据的断帧操作。 空闲中断断帧 一些mcu(如:stm32f103)在出厂时就已经在…...
iOS App的启动与优化
App的启动流程 App启动分为冷启动和热启动 冷启动:从0开始启动App热启动:App已经在内存中,但是后台还挂着,再次点击图标启动App。 一般对App启动的优化都是针对冷启动。 App冷启动可分为三个阶段: dyld:…...
导出指定文件夹下的文件结构 工具模块-Python
python模块代码 import os import json import xml.etree.ElementTree as ET from typing import List, Optional, Dict, Union from pathlib import Path class DirectoryTreeExporter:def __init__(self,root_path: str,output_file: str,fmt: str txt,show_root: boo…...
Leetcode - 周赛436
目录 一、3446. 按对角线进行矩阵排序二、3447. 将元素分配给有约束条件的组三、3448. 统计可以被最后一个数位整除的子字符串数目四、3449. 最大化游戏分数的最小值 一、3446. 按对角线进行矩阵排序 题目链接 本题可以暴力枚举,在确定了每一个对角线的第一个元素…...
【pytest】编写自动化测试用例命名规范README
API_autoTest 项目介绍 1. pytest命名规范 测试文件: 文件名需要以 test_ 开头或者以 _test.py 结尾。例如,test_login.py、user_management_test.py 这样的命名方式,pytest 能够自动识别并将其作为测试文件来执行其中的测试用例。 测试类…...
Compose常用UI组件
Compose常用UI组件 概述Modifier 修饰符常用Modifier修饰符作用域限定Modifier Modifier 实现原理Modifier.Element链的构建链的解析 常用基础组件常用布局组件列表组件 概述 Compose 预置了很多基础组件,如 Button,TextField,TopAppBar等&a…...
斐波那契数列模型:在动态规划的丝绸之路上追寻斐波那契的足迹(上)
文章目录 引言递归与动态规划的对比递归解法的初探动态规划的优雅与高效自顶向下的记忆化搜索自底向上的迭代法 性能分析与比较小结 引言 斐波那契数列,这一数列如同一条无形的丝线,穿越千年时光,悄然延续其魅力。其定义简单而优美ÿ…...
Hackthebox- Season7- Titanic 简记 [Easy]
简记 ip重定向到 http://titanic.htb,先添加hosts 收集子域名 wfuzz -c -u http://titanic.htb/ -w /usr/share/seclists/Discovery/DNS/subdomains-top1million-20000.txt -H Host:FUZZ.titanic.htb --hl 9 ******************************************************** * Wfu…...
Sa-Token 根据官方文档简单实现登录认证的示例
Sa-Token 根据官方文档实现登录鉴权测试 功能实现步骤依赖配置文件启动类创建 controller启动项目测试不用密码登录查看cookie状态 密码登录查看cookie状态 修改token名称 Apipost 测试无 cookie 模式【使用 token】后端将 token 返回到前端修改代码:测试࿱…...
rustdesk编译修改名字
最近,我用Rust重写了一个2W行C代码的linux内核模块。在此记录一点经验。我此前没写过内核模块,认识比较疏浅,有错误欢迎指正。 为什么要重写? 这个模块2W行代码量看起来不多,却在线上时常故障,永远改不完。…...
BS5852英国家具防火安全条款主要包括哪几个方面呢?
什么是BS5852检测? BS5852是英国针对家用家具的强制性安全要求,主要测试家具在受到燃烧香烟和火柴等火源时的可燃性。这个标准通常分为四个部分进行测试,但实际应用中主要测试第一部分和第二部分,包括烟头测试和利用乙炔火焰模拟…...
【运维】源码编译安装cmake
背景: 已经在本地源码编译安装gcc/g,现在源码安装cmake 下载源码 下载地址:CMake - Upgrade Your Software Build System 安装步骤: ./bootstrap --prefix/usr/local/cmake make make install 错误处理 1、提示找不到libmpc.…...
检测网络安全漏洞 工具
实验一的名称为信息收集和漏洞扫描 实验环境:VMware下的kali linux2021和Windows7 32,网络设置均为NAT,这样子两台机器就在一个网络下。攻击的机器为kali,被攻击的机器为Windows 7。 理论知识记录: 1.信息收集的步骤 2.ping命令…...
frameworks 之 Activity添加View
frameworks 之 Activity添加View 1 LaunchActivityItem1.1 Activity 创建1.2 PhoneWindow 创建1.3 DecorView 创建 2 ResumeActivityItem 讲解 Activity加载View的时机和流程 涉及到的类如下 frameworks/base/core/java/android/app/Activity.javaframeworks/base/services/cor…...
UWB技术中的两种调制方式:PPM与PAM
Ultra-Wideband (UWB) 技术以其低功耗、宽频谱和高精度定位的特点,广泛应用于物联网(IoT)、智能家居、资产追踪和无线通信等领域。在UWB中,信号的调制方式对于数据传输的效率和精度起着至关重要的作用。本文将深入探讨UWB中常用的…...
达梦:用户和模式
目录标题 数据库管理系统与用户权限管理**四权分立****用户管理与权限划分****用户管理界面与权限控制****用户创建与管理****实操**1. **默认创建用户与模式**:2. **用户权限和角色分配**:3. **命令行管理用户与角色**:4. 模式也可以创建 **…...
23. AI-大语言模型-DeepSeek
文章目录 前言一、DeepSeek是什么1. 简介2. 产品版本3. 特征4. 地址链接5. 三种访问方式1. 网页端和APP2. DeepSeek API 二、DeepSeek可以做什么1. 应用场景2. 文本生成1. 文本创作2. 摘要与改写3. 结构化生成 3. 自然语言理解与分析1. 语义分析2. 文本分类3. 知识推理 4. 编程…...
Spring-GPT智谱清言AI项目(附源码)
一、项目介绍 本项目是Spring AI第三方调用整合智谱请言(官网是:https://open.bigmodel.cn)的案例,回答响应流式输出显示,这里使用的是免费模型,需要其他模型可以去 https://www.bigmodel.cn/pricing 切换…...
计算机网络(涵盖OSI,TCP/IP,交换机,路由器,局域网)
一、网络通信基础 (一)网络通信的概念 网络通信是指终端设备之间通过计算机网络进行的信息传递与交流。它类似于现实生活中的物品传递过程:数据(物品)被封装成报文(包裹),通过网络…...
Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
Unity3D中Gfx.WaitForPresent优化方案
前言 在Unity中,Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染(即CPU被阻塞),这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案: 对惹,这里有一个游戏开发交流小组&…...
大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...
【解密LSTM、GRU如何解决传统RNN梯度消失问题】
解密LSTM与GRU:如何让RNN变得更聪明? 在深度学习的世界里,循环神经网络(RNN)以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而,传统RNN存在的一个严重问题——梯度消失&#…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
SpringAI实战:ChatModel智能对话全解
一、引言:Spring AI 与 Chat Model 的核心价值 🚀 在 Java 生态中集成大模型能力,Spring AI 提供了高效的解决方案 🤖。其中 Chat Model 作为核心交互组件,通过标准化接口简化了与大语言模型(LLM࿰…...
