Spring Cloud Stream实践
概述
不同中间件,有各自的使用方法,代码也不一样。

可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间件时,绑定层使用不同的绑定器即可,也就是把切换中间件需要做相应的修改工作交给绑定层来做。

本文的操作是在 微服务调用链路追踪 的基础上进行。
环境说明
jdk1.8
maven3.6.3
mysql8
spring cloud2021.0.8
spring boot2.7.12
idea2022
rabbitmq3.12.4
步骤
消息生产者
创建子模块stream_producer
添加依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>
刷新依赖
配置application.yml
server:port: 7001
spring:application:name: stream_producerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit
查看Source.class源码

编写生产者代码,发送一条消息("hello world")到rabbitmq的my-default exchange中
package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@EnableBinding(Source.class)
@SpringBootApplication
public class StreamProductApplication implements CommandLineRunner {@Autowiredprivate MessageChannel output;@Overridepublic void run(String... args) throws Exception {//发送消息// messageBuilder 工具类,创建消息output.send(MessageBuilder.withPayload("hello world").build());}public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}
查看rabbitmq web UI
http://localhost:15672/
看到Exchanges中还没有my-default
运行StreamProductApplication
刷新rabbitmq Web UI,看到了my-dafault的exchange

消息消费者
创建子模块stream_consumer
添加依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>
配置application.yml
server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit
查看内置通道名称为input

编写消息消费者启动类,在启动类监听接收消息
package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamConsumerApplication {@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("监听收到:" + message.getPayload());}public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}
}
运行stream_consumer消费者服务,监听消息
运行stream_producer生产者服务,发送消息
查看消费者服务控制台日志,接收到了消息

优化代码
之前把生产和消费的消息都写在启动类中了,代码耦合高。
优化思路是把不同功能的代码分开放。
消息生产者
stream_producer 代码结构如下

package org.example.stream.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(Source.class)
public class MessageSender {@Autowiredprivate MessageChannel output;//通道//发送消息public void send(Object obj){output.send(MessageBuilder.withPayload(obj).build());}
}
修改启动类
package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@SpringBootApplication
public class StreamProductApplication {public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}
pom.xml添加junit依赖
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope>
</dependency>
刷新依赖
编写测试类
在stream_producerm模块的src/test目录下,新建org.example.stream包,再建出ProducerTest类,代码如下
package org.example.stream;import org.example.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {@Autowiredprivate MessageSender messageSender;//注入发送消息工具类@Testpublic void testSend(){messageSender.send("hello world");}
}
消息消费者
stream_consumer代码结构如下

添加MessageListener类获取消息
package org.example.stream.consumer;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(Sink.class)
public class MessageListener {// 监听binding中的信息@StreamListener(Sink.INPUT)public void input(String message){System.out.println("获取信息:" + message);}
}
修改启动类
package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
public class StreamConsumerApplication {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}}
启动consumer接收消息
执行producer单元测试类ProducerTest的testSend()方法,发送消息
查看consumer控制台输出,接收到信息了

代码解耦后,同样能成功生产消息和消费消息。
自定义消息通道
此前使用默认的消息通道output和input。
也可以自己定义消息通道,例如:myoutput和myinput
消息生产者
在org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类
package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产这的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
修改MessageSender类
package org.example.stream.producer;import org.example.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {@Autowiredprivate MessageChannel myoutput;//通道//发送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload(obj).build());}
}
修改application.yml
cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地myoutput:destination: custom-output

消息消费者
在stream_consumer服务的org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类
package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产者的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
修改MessageListener类
package org.example.stream.stream;import org.example.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {// 监听binding中的信息@StreamListener(MyProcessor.MYINPUT)public void input(String message){System.out.println("获取信息:" + message);}
}
修改application.yml配置
cloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地myinput:destination: custom-output

测试
启动stream_consumer
运行单元测试的testSend()方法生产消息
查看stream_consumer控制台,能看到生产的消息,如下
获取信息:hello world
消息分组
采用复制配置方式运行两个消费者
启动第一个消费者(端口为7002)
修改端口为7003,copy configuration,再启动另一个消费者
执行生产者单元测试生产消息,看到两个消费者都接收到了信息

说明:如果有两个消费者,生产一条消息后,两个消费者均能收到信息。
但当我们发送一条消息只需要其中一个消费者消费消息时,这时候就需要用到消息分组,发送一条消息消费者组内只有一个消费者消费到。
我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可
重启两个消费者
修改端口号为7002,重新启动第一个消费者
修改端口号为7003,重新启动第二个消费者
生产者生产一条消息
查看消费者接收消息情况,只有一个消费者接收到信息。

消息分区
消息分区就是实现特定消息只往特定机器发送。
修改生产者配置
cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myoutput:destination: custom-outputproducer:partition-key-expression: payload #分区关键字 可以是对象中的id,或对象partition-count: 2 #分区数量

修改消费者1的application.yml配置
server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 0 #当前消费者的索引

启动消费者1
修改消费者2的配置
server:port: 7003
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 1 #当前消费者的索引
修改端口号为7003,当前消费者的索引instance-index的值修改为1
启动消费者2
生产者发送消息,看到只有Application(2)接收到消息

再用生产者发送一次消息,也是Application(2)接收到消息


说明实现了消息分区
也可以更改发送的数据,看是否能发送到不同消费者
修改生产者,发送数据由hello world变为hello world1,同时发送5次
public void testSend(){for (int i = 0; i < 5; i++) {messageSender.send("hello world1");}}
看到hello world1全部被Application消费


所以消息分区是根据发送的消息不同,发送到不同消费者中。
完成!enjoy it!
相关文章:
Spring Cloud Stream实践
概述 不同中间件,有各自的使用方法,代码也不一样。 可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间…...
高精度算法【Java】(待更新中~)
高进度加法 在Java中可以使用BigInteger进行高精度计算,除此也可以仿照竖式相加的计算原理进行计算。 BigInteger 提供所有 Java 的基本整数操作符的对应物,并提供 java.lang.Math 的所有相关方法。另外,BigInteger 还提供以下运算࿱…...
说一说HTTP1.0、1.1、2.0版本区别和优化
说一说HTTP1.0、1.1、2.0版本区别和优化 HTTP(Hypertext Transfer Protocol)是一种用于传输超文本的应用层协议。 在不同的版本中,HTTP经历了一系列的演进和改进,主要包括HTTP 1.0、HTTP 1.1和HTTP 2.0。 下面详细解释它们之间…...
51.Sentinel微服务保护
目录 (1)初识Sentinel。 (1.1)雪崩问题及解决方案。 (1.1.1)雪崩问题。 (1.1.2)解决雪崩问题的四种方式。 (1.1.3)总结。 (1.2)…...
【Java 进阶篇】Ajax 实现——JQuery 实现方式 `ajax()`
嗨,亲爱的读者们!欢迎来到这篇关于使用 jQuery 中的 ajax() 方法进行 Ajax 请求的博客。在前端开发中,jQuery 提供了简便而强大的工具,其中 ajax() 方法为我们处理异步请求提供了便捷的解决方案。无需手动创建 XMLHttpRequest 对象…...
I.MX6ULL开发笔记(一)——环境搭建、镜像烧录、网络连接
本系列为使用野火IMX6ULL开发的学习笔记,使用的开发板为如下: 具有的硬件资源有如下: 文章目录 一、环境搭建Win11安装WSL安装串口驱动安装串口工具安装Ubuntu与windows文件互传 二、镜像烧录修改串口终端登录前信息 三、fire-config工具配…...
Javaweb之Ajax的详细解析
1.1 Ajax介绍 1.1.1 Ajax概述 我们前端页面中的数据,如下图所示的表格中的学生信息,应该来自于后台,那么我们的后台和前端是互不影响的2个程序,那么我们前端应该如何从后台获取数据呢?因为是2个程序,所以…...
java基于RestTemplate的微服务发起http请求
实现的效果...
django理解02 前后端分离中的问题
前后端分离相对于传统方式的问题 前后端数据交换的问题跨域问题 页面js往自身程序(django服务)发送请求,这是浏览器默认接受响应 而请求其它地方是浏览器认为存在潜在危险。自动隔离请求!!! 跨域问题的解决…...
设计模式-迭代器模式-笔记
动机(Motivaton) 在软件构建过程中,集合对象内部结构常常变化各异。但对于这些集合对象,我们呢希望在不暴露其内部结构的同时,可以让外部客户代码透明地访问其中包含的元素;同时这种“透明遍历”也为“同一…...
【数据结构】C语言实现队列
目录 前言 1. 队列 1.1 队列的概念 1.2 队列的结构 2. 队列的实现 2.1 队列的定义 2.2 队列的初始化 2.3 入队 2.4 出队 2.5 获取队头元素 2.6 获取队尾元素 2.7 判断空队列 2.8 队列的销毁 3. 队列完整源码 Queue.h Queue.c 🎈个人主页:…...
牛客——OR36 链表的回文结构(C语言,配图,快慢指针)
目录 思路一:链表翻转 思路二:快慢指针,分别从头和尾间开始比较 本题是没有对C的支持的,但因为CPP支持C,所以这里就用C写了,可以面向更多用户 链表的回文结构_牛客题霸_牛客网 (nowcoder.com) 思路一&am…...
Docker build 技巧 —— 筑梦之路
实现目标 更快的构建速度 更小的Docker镜像大小 更少的Docker镜像层 充分利用镜像缓存 增加Dockerfile可读性 让Docker容器使用起来更简单 如何实现 编写.dockerignore文件 容器只运行单个应用 将多个RUN指令合并为一个 基础镜像的标签不要用latest 每个RUN指令后删除…...
2 Redis的高级数据结构
1、Bitmaps 首先,最经典的应用场景就是用户日活的统计,比如说签到等。 字段串:“dbydc”,根据对应的ASCII表,最后可以得到对应的二进制,如图所示 一个字符占8位(bit),…...
Hive默认分割符、存储格式与数据压缩
目录 1、Hive默认分割符2、Hive存储格式3、Hive数据压缩 1、Hive默认分割符 Hive创建表时指定的行受限(ROW FORMAT)配置标准HQL为: ... ROW FORMAT DELIMITED FIELDS TERMINATED BY \u0001 COLLECTION ITEMS TERMINATED BY , MAP KEYS TERMI…...
update_engine-FilesystemVerifierAction和PostinstallRunnerAction
在介绍完了DownloadAction之后,还剩下FilesystemVerifierAction和PostinstallRunnerAction,下面开始对其进行分析。 FilesystemVerifierAction 在数据下载完成后,在DownloadAction中会切换到FilesystemVerifierAction void DownloadAction:…...
深度学习乳腺癌分类 计算机竞赛
文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度,召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…...
【Python百宝箱】掌握Python Web开发三剑客:Flask、Django、FastAPI一网打尽
前言 在当今互联网时代,Web应用的开发变得愈发重要和复杂。选择一个合适的Web框架,掌握安全性与认证、数据库与ORM库、前端框架与交互、测试与调试工具等关键知识点,是每个Web开发者都必须面对的挑战。本文将带你深入了解三个流行的Python W…...
【人工智能时代的刑法体系与责任主体概述】
第一节:引言 随着科技的快速发展,人工智能 (Artificial Intelligence, AI) 正日益成为我们生活中不可或缺的一部分。从自动驾驶汽车到语音助手,从智能家居到金融机器人,AI 的广泛应用正不断改变着我们的生活方式和社会结构。然而…...
透视maven打包编译正常,intellj idea编译失败问题的本质
前言 maven多模块类型的项目,在Java的中大型应用中非常常见, 在 module 很多的情况,经常会出现各种各样的编辑依赖错误问题,今天记录一种比较常见的 case : A 子模块依赖 B 子模块,在 Terminal 上终端上 …...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...
让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
