Spring Cloud Stream实践
概述
不同中间件,有各自的使用方法,代码也不一样。
可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间件时,绑定层使用不同的绑定器即可,也就是把切换中间件需要做相应的修改工作交给绑定层来做。
本文的操作是在 微服务调用链路追踪 的基础上进行。
环境说明
jdk1.8
maven3.6.3
mysql8
spring cloud2021.0.8
spring boot2.7.12
idea2022
rabbitmq3.12.4
步骤
消息生产者
创建子模块stream_producer
添加依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>
刷新依赖
配置application.yml
server:port: 7001
spring:application:name: stream_producerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit
查看Source.class
源码
编写生产者代码,发送一条消息("hello world")到rabbitmq的my-default exchange中
package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@EnableBinding(Source.class)
@SpringBootApplication
public class StreamProductApplication implements CommandLineRunner {@Autowiredprivate MessageChannel output;@Overridepublic void run(String... args) throws Exception {//发送消息// messageBuilder 工具类,创建消息output.send(MessageBuilder.withPayload("hello world").build());}public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}
查看rabbitmq web UI
http://localhost:15672/
看到Exchanges中还没有my-default
运行StreamProductApplication
刷新rabbitmq Web UI,看到了my-dafault的exchange
消息消费者
创建子模块stream_consumer
添加依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>
配置application.yml
server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit
查看内置通道名称为input
编写消息消费者启动类,在启动类监听接收消息
package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamConsumerApplication {@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("监听收到:" + message.getPayload());}public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}
}
运行stream_consumer
消费者服务,监听消息
运行stream_producer
生产者服务,发送消息
查看消费者服务控制台日志,接收到了消息
优化代码
之前把生产和消费的消息都写在启动类中了,代码耦合高。
优化思路是把不同功能的代码分开放。
消息生产者
stream_producer 代码结构如下
package org.example.stream.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(Source.class)
public class MessageSender {@Autowiredprivate MessageChannel output;//通道//发送消息public void send(Object obj){output.send(MessageBuilder.withPayload(obj).build());}
}
修改启动类
package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@SpringBootApplication
public class StreamProductApplication {public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}
pom.xml添加junit依赖
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope>
</dependency>
刷新依赖
编写测试类
在stream_producerm模块的src/test目录下,新建org.example.stream
包,再建出ProducerTest类,代码如下
package org.example.stream;import org.example.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {@Autowiredprivate MessageSender messageSender;//注入发送消息工具类@Testpublic void testSend(){messageSender.send("hello world");}
}
消息消费者
stream_consumer代码结构如下
添加MessageListener类获取消息
package org.example.stream.consumer;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(Sink.class)
public class MessageListener {// 监听binding中的信息@StreamListener(Sink.INPUT)public void input(String message){System.out.println("获取信息:" + message);}
}
修改启动类
package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
public class StreamConsumerApplication {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}}
启动consumer接收消息
执行producer单元测试类ProducerTest的testSend()方法,发送消息
查看consumer控制台输出,接收到信息了
代码解耦后,同样能成功生产消息和消费消息。
自定义消息通道
此前使用默认的消息通道output
和input。
也可以自己定义消息通道,例如:myoutput
和myinput
消息生产者
在org.example.stream
包下新建channel
包,在channel
包下新建MyProcessor接口类
package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产这的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
修改MessageSender
类
package org.example.stream.producer;import org.example.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {@Autowiredprivate MessageChannel myoutput;//通道//发送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload(obj).build());}
}
修改application.yml
cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地myoutput:destination: custom-output
消息消费者
在stream_consumer服务的org.example.stream
包下新建channel
包,在channel
包下新建MyProcessor接口类
package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产者的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
修改MessageListener
类
package org.example.stream.stream;import org.example.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {// 监听binding中的信息@StreamListener(MyProcessor.MYINPUT)public void input(String message){System.out.println("获取信息:" + message);}
}
修改application.yml配置
cloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地myinput:destination: custom-output
测试
启动stream_consumer
运行单元测试的testSend()方法生产消息
查看stream_consumer
控制台,能看到生产的消息,如下
获取信息:hello world
消息分组
采用复制配置方式运行两个消费者
启动第一个消费者(端口为7002)
修改端口为7003,copy configuration,再启动另一个消费者
执行生产者单元测试生产消息,看到两个消费者都接收到了信息
说明:如果有两个消费者,生产一条消息后,两个消费者均能收到信息。
但当我们发送一条消息只需要其中一个消费者消费消息时,这时候就需要用到消息分组,发送一条消息消费者组内只有一个消费者消费到。
我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group
属性即可
重启两个消费者
修改端口号为7002,重新启动第一个消费者
修改端口号为7003,重新启动第二个消费者
生产者生产一条消息
查看消费者接收消息情况,只有一个消费者接收到信息。
消息分区
消息分区就是实现特定消息只往特定机器发送。
修改生产者配置
cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myoutput:destination: custom-outputproducer:partition-key-expression: payload #分区关键字 可以是对象中的id,或对象partition-count: 2 #分区数量
修改消费者1的application.yml配置
server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 0 #当前消费者的索引
启动消费者1
修改消费者2的配置
server:port: 7003
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 1 #当前消费者的索引
修改端口号为7003,当前消费者的索引instance-index
的值修改为1
启动消费者2
生产者发送消息,看到只有Application(2)接收到消息
再用生产者发送一次消息,也是Application(2)接收到消息
说明实现了消息分区
也可以更改发送的数据,看是否能发送到不同消费者
修改生产者,发送数据由hello world
变为hello world1
,同时发送5次
public void testSend(){for (int i = 0; i < 5; i++) {messageSender.send("hello world1");}}
看到hello world1
全部被Application消费
所以消息分区是根据发送的消息不同,发送到不同消费者中。
完成!enjoy it!
相关文章:

Spring Cloud Stream实践
概述 不同中间件,有各自的使用方法,代码也不一样。 可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间…...

高精度算法【Java】(待更新中~)
高进度加法 在Java中可以使用BigInteger进行高精度计算,除此也可以仿照竖式相加的计算原理进行计算。 BigInteger 提供所有 Java 的基本整数操作符的对应物,并提供 java.lang.Math 的所有相关方法。另外,BigInteger 还提供以下运算࿱…...
说一说HTTP1.0、1.1、2.0版本区别和优化
说一说HTTP1.0、1.1、2.0版本区别和优化 HTTP(Hypertext Transfer Protocol)是一种用于传输超文本的应用层协议。 在不同的版本中,HTTP经历了一系列的演进和改进,主要包括HTTP 1.0、HTTP 1.1和HTTP 2.0。 下面详细解释它们之间…...

51.Sentinel微服务保护
目录 (1)初识Sentinel。 (1.1)雪崩问题及解决方案。 (1.1.1)雪崩问题。 (1.1.2)解决雪崩问题的四种方式。 (1.1.3)总结。 (1.2)…...

【Java 进阶篇】Ajax 实现——JQuery 实现方式 `ajax()`
嗨,亲爱的读者们!欢迎来到这篇关于使用 jQuery 中的 ajax() 方法进行 Ajax 请求的博客。在前端开发中,jQuery 提供了简便而强大的工具,其中 ajax() 方法为我们处理异步请求提供了便捷的解决方案。无需手动创建 XMLHttpRequest 对象…...

I.MX6ULL开发笔记(一)——环境搭建、镜像烧录、网络连接
本系列为使用野火IMX6ULL开发的学习笔记,使用的开发板为如下: 具有的硬件资源有如下: 文章目录 一、环境搭建Win11安装WSL安装串口驱动安装串口工具安装Ubuntu与windows文件互传 二、镜像烧录修改串口终端登录前信息 三、fire-config工具配…...

Javaweb之Ajax的详细解析
1.1 Ajax介绍 1.1.1 Ajax概述 我们前端页面中的数据,如下图所示的表格中的学生信息,应该来自于后台,那么我们的后台和前端是互不影响的2个程序,那么我们前端应该如何从后台获取数据呢?因为是2个程序,所以…...

java基于RestTemplate的微服务发起http请求
实现的效果...

django理解02 前后端分离中的问题
前后端分离相对于传统方式的问题 前后端数据交换的问题跨域问题 页面js往自身程序(django服务)发送请求,这是浏览器默认接受响应 而请求其它地方是浏览器认为存在潜在危险。自动隔离请求!!! 跨域问题的解决…...

设计模式-迭代器模式-笔记
动机(Motivaton) 在软件构建过程中,集合对象内部结构常常变化各异。但对于这些集合对象,我们呢希望在不暴露其内部结构的同时,可以让外部客户代码透明地访问其中包含的元素;同时这种“透明遍历”也为“同一…...

【数据结构】C语言实现队列
目录 前言 1. 队列 1.1 队列的概念 1.2 队列的结构 2. 队列的实现 2.1 队列的定义 2.2 队列的初始化 2.3 入队 2.4 出队 2.5 获取队头元素 2.6 获取队尾元素 2.7 判断空队列 2.8 队列的销毁 3. 队列完整源码 Queue.h Queue.c 🎈个人主页:…...

牛客——OR36 链表的回文结构(C语言,配图,快慢指针)
目录 思路一:链表翻转 思路二:快慢指针,分别从头和尾间开始比较 本题是没有对C的支持的,但因为CPP支持C,所以这里就用C写了,可以面向更多用户 链表的回文结构_牛客题霸_牛客网 (nowcoder.com) 思路一&am…...
Docker build 技巧 —— 筑梦之路
实现目标 更快的构建速度 更小的Docker镜像大小 更少的Docker镜像层 充分利用镜像缓存 增加Dockerfile可读性 让Docker容器使用起来更简单 如何实现 编写.dockerignore文件 容器只运行单个应用 将多个RUN指令合并为一个 基础镜像的标签不要用latest 每个RUN指令后删除…...

2 Redis的高级数据结构
1、Bitmaps 首先,最经典的应用场景就是用户日活的统计,比如说签到等。 字段串:“dbydc”,根据对应的ASCII表,最后可以得到对应的二进制,如图所示 一个字符占8位(bit),…...
Hive默认分割符、存储格式与数据压缩
目录 1、Hive默认分割符2、Hive存储格式3、Hive数据压缩 1、Hive默认分割符 Hive创建表时指定的行受限(ROW FORMAT)配置标准HQL为: ... ROW FORMAT DELIMITED FIELDS TERMINATED BY \u0001 COLLECTION ITEMS TERMINATED BY , MAP KEYS TERMI…...
update_engine-FilesystemVerifierAction和PostinstallRunnerAction
在介绍完了DownloadAction之后,还剩下FilesystemVerifierAction和PostinstallRunnerAction,下面开始对其进行分析。 FilesystemVerifierAction 在数据下载完成后,在DownloadAction中会切换到FilesystemVerifierAction void DownloadAction:…...

深度学习乳腺癌分类 计算机竞赛
文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度,召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…...
【Python百宝箱】掌握Python Web开发三剑客:Flask、Django、FastAPI一网打尽
前言 在当今互联网时代,Web应用的开发变得愈发重要和复杂。选择一个合适的Web框架,掌握安全性与认证、数据库与ORM库、前端框架与交互、测试与调试工具等关键知识点,是每个Web开发者都必须面对的挑战。本文将带你深入了解三个流行的Python W…...
【人工智能时代的刑法体系与责任主体概述】
第一节:引言 随着科技的快速发展,人工智能 (Artificial Intelligence, AI) 正日益成为我们生活中不可或缺的一部分。从自动驾驶汽车到语音助手,从智能家居到金融机器人,AI 的广泛应用正不断改变着我们的生活方式和社会结构。然而…...

透视maven打包编译正常,intellj idea编译失败问题的本质
前言 maven多模块类型的项目,在Java的中大型应用中非常常见, 在 module 很多的情况,经常会出现各种各样的编辑依赖错误问题,今天记录一种比较常见的 case : A 子模块依赖 B 子模块,在 Terminal 上终端上 …...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...

04-初识css
一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...

【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...
MySQL 部分重点知识篇
一、数据库对象 1. 主键 定义 :主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 :确保数据的完整性,便于数据的查询和管理。 示例 :在学生信息表中,学号可以作为主键ÿ…...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...
在树莓派上添加音频输入设备的几种方法
在树莓派上添加音频输入设备可以通过以下步骤完成,具体方法取决于设备类型(如USB麦克风、3.5mm接口麦克风或HDMI音频输入)。以下是详细指南: 1. 连接音频输入设备 USB麦克风/声卡:直接插入树莓派的USB接口。3.5mm麦克…...