Spring Cloud Stream 消息驱动基础入门与实践总结
Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。
【1】概念介绍
① 什么是Spring Cloud Stream
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。简单来讲,就是屏蔽了底层XXMQ,应用层不用关注底层是RabbitMQ还是Kafka
。类似于Spring Data抽离持久层屏蔽底层各种数据库的概念。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
官网文档:https://spring.io/projects/spring-cloud-stream#overview
② stream如何统一底层差异
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
-
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
-
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
-
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
③ Spring Cloud Stream标准流程设计
Stream中的消息通信方式遵循了发布-订阅模式。
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
④ 几个API注解
@EnableBinding:指信道channel和exchange绑定在一起。
@StreamListener:监听队列,用于消费者的队列的消息接收
@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
下面以RabbitMQ为底层MQ来说明如何使用Stream,当然同样要先安装好RabbitMQ。
【2】消息生产者
① pom依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
② yml配置
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称,用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbit
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: send-8001.com #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址
③ 消息服务类
public interface IMessageProvider {public String send();
}@EnableBinding(Source.class)//定义消息推送管道
@Slf4j
public class IMessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;//消息发送通道@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());log.info(serial+"***********************");return serial;}
}
编写控制器发送消息:
@RestController
public class IMessageController {@Resourceprivate IMessageProvider provider;@GetMapping("/sendMessage")public String send(){return provider.send();}
}
【3】消息消费者
① pom依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
② yml配置
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称,用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbitgroup: group1
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: receive-8002.com #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址
③ 消息接收服务
@Component
@Slf4j
@EnableBinding(Sink.class)
public class StreamController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String>message){log.info("消费者1号接收到消息"+message.getPayload()+"\t port:"+serverPort);}}
上面【2】【3】即可实现消息的发送和接收,但是假设有多个消费者,我们还要考虑两个问题:消息的重复消费和消息的持久化。
【4】group分组解决重复消费问题
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。
在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
- 不同组是可以全面消费的(重复消费),
- 同一组内会发生竞争关系,只有其中一个可以消费。
也就是说,两个消费者微服务的group定义为同一个,即可以解决重复消费问题
。
【5】group分组解决消息丢失问题
即,在你服务停机重启期间,消息在不断发送,而服务启动后并没有接收到发送的消息。这是由于你没有配置group属性导致的。
解决方案:配置group属性。
spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称,用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbitgroup: group1 # 这个很重要!!!
相关文章:

Spring Cloud Stream 消息驱动基础入门与实践总结
Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。…...
你好rust
第一次安装rust,记录一下笔记。 几年前就听说过rust,自己一直是个c爱好者,所以比较抵触rust,早年还有什么rust向上突破群。一直比较抵触,直到这几年rust已经渐渐深入到linux内核、云原生可观测以及zend社区当中&#x…...

STM32 printf 重定向到CAN
最近在调试一款电机驱动板 使用的是CAN总线而且板子上只有一个CAN 想移植Easylogger到上面试试easylogger的效果,先实现pritnf的重定向功能来打印输出 只需要添加以下代码即可实现 代码 #include <stdarg.h> uint8_t FDCAN_UserTxBuffer[512]; void FDCAN_p…...

jmeter性能优化之mysql监控sql慢查询语句分析
接上次博客:基础配置 多用户登录并退出jmx文件:百度网盘 提取码:0000 一、练习jmeter脚本检测mysql慢查询 随意找一个脚本(多用户登录并退出),并发数设置300、500后分别查看mysql监控平台 启动后查看,主要查看mysql…...

海南聚广众达电子商务咨询有限公司引领行业变革
在数字化浪潮席卷全球的今天,电商行业正以前所未有的速度发展。海南聚广众达电子商务咨询有限公司,凭借其在抖音电商领域的深厚积累和不断创新,正逐步成为行业的佼佼者。这家以专注、专业、专注为核心理念的公司,不仅为客户提供全…...

Unity API学习之资源的动态加载
资源的动态加载 在实际游戏开发的更新换代中,随着开发的软件不断更新,我们在脚本中需要拖拽赋值的变量会变空,而要想重新拖拽又太花费时间,因此我们就需要用到Resources.Load<文件类型>("文件名")函数来在一开始…...
C++算法——回溯
回溯算法 实现思想 先看一个实例: //暴力枚举的算法 int n 5; for (int a 1; i < n; i) {for (int b 1; b < n; b){for (int c 1; c < n; c){for (int d 1; d < n; d){for (int e 1; e < n; e){//判断 abcde 是否互补相同if (a ! b &&a…...
java的深拷贝和浅拷贝
总结: 深拷贝:无论是基本类型还是引用类型都会创建新的实例。 浅拷贝:对于基本类型就是复制其值,对于引用类型则是复制了指向这些数据类型的内存地址。 浅拷贝(Shallow Copy) 浅拷贝是指在创建新对象时&am…...

AI产品经理,应掌握哪些技术?
美国的麻省理工学院(Massachusetts Institute of Technology)专门负责科技成果转化商用的部门研究表明: 每一块钱的科研投入,需要100块钱与之配套的投资(人、财、物),才能把思想转化为产品&…...

同三维T80004EHL-W-4K30 4K HDMI编码器,支持WEBRTC协议
输入:1路HDMI1路3.5音频,1路HDMI环出1路3.5音频解嵌输出 4K30超高清,支持U盘/移动硬盘/TF卡录制,支持WEBRTC协议,超低延时,支持3个点外网访问 1个主流1个副流输出,可定制选配POE供电模块,WEBR…...

Hi3861 OpenHarmony嵌入式应用入门--点灯
本篇实现对gpio的控制,通过控制输出进行gpio的点灯操作。 硬件 我们来操作IO2,控制绿色的灯。 软件 GPIO API API名称 说明 hi_u32 hi_gpio_deinit(hi_void); GPIO模块初始化 hi_u32 hi_io_set_pull(hi_io_name id, hi_io_pull val); 设置某个IO…...

SaaS案例分享:成功构建销售渠道的实战经验
面对SaaS产品推广的难题,你是否曾感到迷茫,不知如何选择有效的销售渠道?Shopify独立站联盟营销或许能为你提供新的思路。Shopify作为领先的电商解决方案提供商,其独立站功能为众多商家提供了强大的在线销售平台。而联盟营销&#…...

密钥管理简介
首先我们要知道什么是密钥管理? 密钥管理是一种涉及生成、存储、使用和更新密钥的过程。 密钥的种类 我们知道,对称密码主要包括分组密码和序列密码。但有时也可以将杂凑函数和消息认证码划分为这一类,将它们的密钥称为对称密钥;…...

2024中国应急(消防)品牌巡展成都站成功召开!
汇聚品牌力量,共同相聚成都。6月14日,由中国安全产业协会指导,中国安全产业协会应急创新分会、应急救援产业网联合主办,四川省消防协会协办的“一切为了安全”2024年中国应急(消防)品牌巡展-成都站成功举办。该巡展旨在展示中国应…...

ansible-Role角色批量按照node_export节点,并追加信息到Prometheus文件中
文章目录 剧本功能 inventory.yaml文件定义deploy.yaml角色定义node_exporter_lock角色定义任务角色main.yamlnode_exporter_tasks.yml角色触发任务notifyextra_tasks.yml角色prometheus_node_config.j2模板文件 执行命令查看变量 剧本功能 功能1: 批量执行node_ex…...
求最小公倍数 、小球走过路程计算 题目
题目 JAVA11 求最小公倍数分析:代码:大佬代码: JAVA12 小球走过路程计算分析:代码: JAVA11 求最小公倍数 描述 编写一个方法,该方法的返回值是两个不大于100的正整数的最小公倍数。 输入描述:…...
【Android面试八股文】你能说一说为什么IO是耗时操作?
IO(输入/输出)操作之所以是耗时操作,主要是由于以下几个原因: 1. 物理设备的限制 机械动作:传统的硬盘驱动器(HDD)包含旋转的磁盘和移动的磁头,以读取或写入数据。这些机械动作需要时间完成。虽然固态硬盘(SSD)没有机械部件,但它们仍然受到电子信号传输速度的限制。…...
怎样增强 CLike 游戏的社交功能,促进玩家之间的互动和交流?
要增强CLike游戏的社交功能,以促进玩家之间的互动和交流,可以考虑以下几个方面: 添加聊天功能:在游戏中加入实时聊天功能,让玩家可以在游戏内互相交流。可以通过文本聊天或者语音聊天来实现。 社交平台集成࿱…...

12_YouOnlyLookOnce(YOLOv3)新一代实时目标检测技术
1.1 回顾V1和V2 V1:05_YouOnlyLookOnce(YOLOV1)目标检测领域的革命性突破-CSDN博客 V2:07_YouOnlyLookOnce(YOLOv2)Better,Faster,Stronger-CSDN博客 1.2 简介 YOLOv3(You Only Look Once version 3)是…...

安装 Nuxt.js 的步骤和注意事项
title: 安装 Nuxt.js 的步骤和注意事项 date: 2024/6/17 updated: 2024/6/17 author: cmdragon excerpt: Nuxt.js在Vue.js基础上提供的服务器端渲染框架优势,包括提高开发效率、代码维护性和应用性能。指南详细说明了从环境准备、Nuxt.js安装配置到进阶部署技巧&…...

边缘计算医疗风险自查APP开发方案
核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...

MySQL:分区的基本使用
目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区(Partitioning)是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分(分区)可以独立存储、管理和优化,…...
深入浅出Diffusion模型:从原理到实践的全方位教程
I. 引言:生成式AI的黎明 – Diffusion模型是什么? 近年来,生成式人工智能(Generative AI)领域取得了爆炸性的进展,模型能够根据简单的文本提示创作出逼真的图像、连贯的文本,乃至更多令人惊叹的…...

Ubuntu系统复制(U盘-电脑硬盘)
所需环境 电脑自带硬盘:1块 (1T) U盘1:Ubuntu系统引导盘(用于“U盘2”复制到“电脑自带硬盘”) U盘2:Ubuntu系统盘(1T,用于被复制) !!!建议“电脑…...