Spring Cloud Stream实践
概述
不同中间件,有各自的使用方法,代码也不一样。

可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间件时,绑定层使用不同的绑定器即可,也就是把切换中间件需要做相应的修改工作交给绑定层来做。

本文的操作是在 微服务调用链路追踪 的基础上进行。
环境说明
jdk1.8
maven3.6.3
mysql8
spring cloud2021.0.8
spring boot2.7.12
idea2022
rabbitmq3.12.4
步骤
消息生产者
创建子模块stream_producer
添加依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>
刷新依赖
配置application.yml
server:port: 7001
spring:application:name: stream_producerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit
查看Source.class源码

编写生产者代码,发送一条消息("hello world")到rabbitmq的my-default exchange中
package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@EnableBinding(Source.class)
@SpringBootApplication
public class StreamProductApplication implements CommandLineRunner {@Autowiredprivate MessageChannel output;@Overridepublic void run(String... args) throws Exception {//发送消息// messageBuilder 工具类,创建消息output.send(MessageBuilder.withPayload("hello world").build());}public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}
查看rabbitmq web UI
http://localhost:15672/
看到Exchanges中还没有my-default
运行StreamProductApplication
刷新rabbitmq Web UI,看到了my-dafault的exchange

消息消费者
创建子模块stream_consumer
添加依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>
配置application.yml
server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit
查看内置通道名称为input

编写消息消费者启动类,在启动类监听接收消息
package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamConsumerApplication {@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("监听收到:" + message.getPayload());}public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}
}
运行stream_consumer消费者服务,监听消息
运行stream_producer生产者服务,发送消息
查看消费者服务控制台日志,接收到了消息

优化代码
之前把生产和消费的消息都写在启动类中了,代码耦合高。
优化思路是把不同功能的代码分开放。
消息生产者
stream_producer 代码结构如下

package org.example.stream.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(Source.class)
public class MessageSender {@Autowiredprivate MessageChannel output;//通道//发送消息public void send(Object obj){output.send(MessageBuilder.withPayload(obj).build());}
}
修改启动类
package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@SpringBootApplication
public class StreamProductApplication {public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}
pom.xml添加junit依赖
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope>
</dependency>
刷新依赖
编写测试类
在stream_producerm模块的src/test目录下,新建org.example.stream包,再建出ProducerTest类,代码如下
package org.example.stream;import org.example.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {@Autowiredprivate MessageSender messageSender;//注入发送消息工具类@Testpublic void testSend(){messageSender.send("hello world");}
}
消息消费者
stream_consumer代码结构如下

添加MessageListener类获取消息
package org.example.stream.consumer;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(Sink.class)
public class MessageListener {// 监听binding中的信息@StreamListener(Sink.INPUT)public void input(String message){System.out.println("获取信息:" + message);}
}
修改启动类
package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
public class StreamConsumerApplication {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}}
启动consumer接收消息
执行producer单元测试类ProducerTest的testSend()方法,发送消息
查看consumer控制台输出,接收到信息了

代码解耦后,同样能成功生产消息和消费消息。
自定义消息通道
此前使用默认的消息通道output和input。
也可以自己定义消息通道,例如:myoutput和myinput
消息生产者
在org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类
package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产这的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
修改MessageSender类
package org.example.stream.producer;import org.example.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {@Autowiredprivate MessageChannel myoutput;//通道//发送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload(obj).build());}
}
修改application.yml
cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地myoutput:destination: custom-output

消息消费者
在stream_consumer服务的org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类
package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产者的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
修改MessageListener类
package org.example.stream.stream;import org.example.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {// 监听binding中的信息@StreamListener(MyProcessor.MYINPUT)public void input(String message){System.out.println("获取信息:" + message);}
}
修改application.yml配置
cloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地myinput:destination: custom-output

测试
启动stream_consumer
运行单元测试的testSend()方法生产消息
查看stream_consumer控制台,能看到生产的消息,如下
获取信息:hello world
消息分组
采用复制配置方式运行两个消费者
启动第一个消费者(端口为7002)
修改端口为7003,copy configuration,再启动另一个消费者
执行生产者单元测试生产消息,看到两个消费者都接收到了信息

说明:如果有两个消费者,生产一条消息后,两个消费者均能收到信息。
但当我们发送一条消息只需要其中一个消费者消费消息时,这时候就需要用到消息分组,发送一条消息消费者组内只有一个消费者消费到。
我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可
重启两个消费者
修改端口号为7002,重新启动第一个消费者
修改端口号为7003,重新启动第二个消费者
生产者生产一条消息
查看消费者接收消息情况,只有一个消费者接收到信息。

消息分区
消息分区就是实现特定消息只往特定机器发送。
修改生产者配置
cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myoutput:destination: custom-outputproducer:partition-key-expression: payload #分区关键字 可以是对象中的id,或对象partition-count: 2 #分区数量

修改消费者1的application.yml配置
server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 0 #当前消费者的索引

启动消费者1
修改消费者2的配置
server:port: 7003
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 1 #当前消费者的索引
修改端口号为7003,当前消费者的索引instance-index的值修改为1
启动消费者2
生产者发送消息,看到只有Application(2)接收到消息

再用生产者发送一次消息,也是Application(2)接收到消息


说明实现了消息分区
也可以更改发送的数据,看是否能发送到不同消费者
修改生产者,发送数据由hello world变为hello world1,同时发送5次
public void testSend(){for (int i = 0; i < 5; i++) {messageSender.send("hello world1");}}
看到hello world1全部被Application消费


所以消息分区是根据发送的消息不同,发送到不同消费者中。
完成!enjoy it!
相关文章:
Spring Cloud Stream实践
概述 不同中间件,有各自的使用方法,代码也不一样。 可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间…...
高精度算法【Java】(待更新中~)
高进度加法 在Java中可以使用BigInteger进行高精度计算,除此也可以仿照竖式相加的计算原理进行计算。 BigInteger 提供所有 Java 的基本整数操作符的对应物,并提供 java.lang.Math 的所有相关方法。另外,BigInteger 还提供以下运算࿱…...
说一说HTTP1.0、1.1、2.0版本区别和优化
说一说HTTP1.0、1.1、2.0版本区别和优化 HTTP(Hypertext Transfer Protocol)是一种用于传输超文本的应用层协议。 在不同的版本中,HTTP经历了一系列的演进和改进,主要包括HTTP 1.0、HTTP 1.1和HTTP 2.0。 下面详细解释它们之间…...
51.Sentinel微服务保护
目录 (1)初识Sentinel。 (1.1)雪崩问题及解决方案。 (1.1.1)雪崩问题。 (1.1.2)解决雪崩问题的四种方式。 (1.1.3)总结。 (1.2)…...
【Java 进阶篇】Ajax 实现——JQuery 实现方式 `ajax()`
嗨,亲爱的读者们!欢迎来到这篇关于使用 jQuery 中的 ajax() 方法进行 Ajax 请求的博客。在前端开发中,jQuery 提供了简便而强大的工具,其中 ajax() 方法为我们处理异步请求提供了便捷的解决方案。无需手动创建 XMLHttpRequest 对象…...
I.MX6ULL开发笔记(一)——环境搭建、镜像烧录、网络连接
本系列为使用野火IMX6ULL开发的学习笔记,使用的开发板为如下: 具有的硬件资源有如下: 文章目录 一、环境搭建Win11安装WSL安装串口驱动安装串口工具安装Ubuntu与windows文件互传 二、镜像烧录修改串口终端登录前信息 三、fire-config工具配…...
Javaweb之Ajax的详细解析
1.1 Ajax介绍 1.1.1 Ajax概述 我们前端页面中的数据,如下图所示的表格中的学生信息,应该来自于后台,那么我们的后台和前端是互不影响的2个程序,那么我们前端应该如何从后台获取数据呢?因为是2个程序,所以…...
java基于RestTemplate的微服务发起http请求
实现的效果...
django理解02 前后端分离中的问题
前后端分离相对于传统方式的问题 前后端数据交换的问题跨域问题 页面js往自身程序(django服务)发送请求,这是浏览器默认接受响应 而请求其它地方是浏览器认为存在潜在危险。自动隔离请求!!! 跨域问题的解决…...
设计模式-迭代器模式-笔记
动机(Motivaton) 在软件构建过程中,集合对象内部结构常常变化各异。但对于这些集合对象,我们呢希望在不暴露其内部结构的同时,可以让外部客户代码透明地访问其中包含的元素;同时这种“透明遍历”也为“同一…...
【数据结构】C语言实现队列
目录 前言 1. 队列 1.1 队列的概念 1.2 队列的结构 2. 队列的实现 2.1 队列的定义 2.2 队列的初始化 2.3 入队 2.4 出队 2.5 获取队头元素 2.6 获取队尾元素 2.7 判断空队列 2.8 队列的销毁 3. 队列完整源码 Queue.h Queue.c 🎈个人主页:…...
牛客——OR36 链表的回文结构(C语言,配图,快慢指针)
目录 思路一:链表翻转 思路二:快慢指针,分别从头和尾间开始比较 本题是没有对C的支持的,但因为CPP支持C,所以这里就用C写了,可以面向更多用户 链表的回文结构_牛客题霸_牛客网 (nowcoder.com) 思路一&am…...
Docker build 技巧 —— 筑梦之路
实现目标 更快的构建速度 更小的Docker镜像大小 更少的Docker镜像层 充分利用镜像缓存 增加Dockerfile可读性 让Docker容器使用起来更简单 如何实现 编写.dockerignore文件 容器只运行单个应用 将多个RUN指令合并为一个 基础镜像的标签不要用latest 每个RUN指令后删除…...
2 Redis的高级数据结构
1、Bitmaps 首先,最经典的应用场景就是用户日活的统计,比如说签到等。 字段串:“dbydc”,根据对应的ASCII表,最后可以得到对应的二进制,如图所示 一个字符占8位(bit),…...
Hive默认分割符、存储格式与数据压缩
目录 1、Hive默认分割符2、Hive存储格式3、Hive数据压缩 1、Hive默认分割符 Hive创建表时指定的行受限(ROW FORMAT)配置标准HQL为: ... ROW FORMAT DELIMITED FIELDS TERMINATED BY \u0001 COLLECTION ITEMS TERMINATED BY , MAP KEYS TERMI…...
update_engine-FilesystemVerifierAction和PostinstallRunnerAction
在介绍完了DownloadAction之后,还剩下FilesystemVerifierAction和PostinstallRunnerAction,下面开始对其进行分析。 FilesystemVerifierAction 在数据下载完成后,在DownloadAction中会切换到FilesystemVerifierAction void DownloadAction:…...
深度学习乳腺癌分类 计算机竞赛
文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度,召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…...
【Python百宝箱】掌握Python Web开发三剑客:Flask、Django、FastAPI一网打尽
前言 在当今互联网时代,Web应用的开发变得愈发重要和复杂。选择一个合适的Web框架,掌握安全性与认证、数据库与ORM库、前端框架与交互、测试与调试工具等关键知识点,是每个Web开发者都必须面对的挑战。本文将带你深入了解三个流行的Python W…...
【人工智能时代的刑法体系与责任主体概述】
第一节:引言 随着科技的快速发展,人工智能 (Artificial Intelligence, AI) 正日益成为我们生活中不可或缺的一部分。从自动驾驶汽车到语音助手,从智能家居到金融机器人,AI 的广泛应用正不断改变着我们的生活方式和社会结构。然而…...
透视maven打包编译正常,intellj idea编译失败问题的本质
前言 maven多模块类型的项目,在Java的中大型应用中非常常见, 在 module 很多的情况,经常会出现各种各样的编辑依赖错误问题,今天记录一种比较常见的 case : A 子模块依赖 B 子模块,在 Terminal 上终端上 …...
日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...
23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
Android15默认授权浮窗权限
我们经常有那种需求,客户需要定制的apk集成在ROM中,并且默认授予其【显示在其他应用的上层】权限,也就是我们常说的浮窗权限,那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...
C# 表达式和运算符(求值顺序)
求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如,已知表达式3*52,依照子表达式的求值顺序,有两种可能的结果,如图9-3所示。 如果乘法先执行,结果是17。如果5…...
