当前位置: 首页 > news >正文

SpringCloud之Stream框架集成RocketMQ消息中间件

        Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

     目前 Spring Cloud Stream只支持 RabbitMQ 和 Kafka 的自动化配置。

     Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.

在 SpringCloudStream 3.x 版本前是通过 @StreamListener 和 @EnableBinding 进行消息的发送和消费的,springCloudStream 3.x 版本后 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解,不建议使用了;后续的版本更新中替换成函数式的方式实现。

既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?

通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic。

不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封 和 绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:

  • 输入 - + -in- + < index >

     例如:myTopic-in-0

  • 输出 - + -out- + < index >

       例如:myTopic-out-0

注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致(后面还会在项目实战中展示一遍)

代码示例:

----------------------------------项目实战--------------------------------------

看下我们项目中的配置,配置文件是放在nacos上面的:

消息发送:

/*** @ClassName MessageParamParentDto* @Author zxd* @Version 1.0.0* @Description TODO* @CreateTime 2023/6/13 11:27 - 星期二*/
@Data
public class MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7963819193258646924L;private  String routeUrl;}

--------------------------------------------------------------------------------------------------------------

/*** @ClassName MessageParamDto* @Author kch* @Version 1.0.0* @Description 消息队列接收系统消息实体对象* @CreateTime 2022/9/18 15:16 - 星期日*/
@Data
public class MessageParamDto  extends MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7111819193258646924L;/*** 消息模板code*/@NotNull(message = "消息模板不能为空")private String templateCode;/*** 可变参数,必传字段* 该参数匹配模板字符串中的变量和URL中的变量,所以模板和URL中的变量名不能重复*/@NotNull(message = "参数不能为空")private Map<String, String> params;/*** 消息详情跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/
//    private String routerParams;/*** 消息操作跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/
//    private String contentPathParams;/*** 接收者租户*/@NotNull(message = "接收者租户ID不能为空")private Long tenantId;/*** 接收人*/@NotNull(message = "接收者用户ID不能为空")@Size(min = 1, message = "接收者用户ID不能为空")private List<RecipientUser> recipientUsers;@Valid@Data@AllArgsConstructor@NoArgsConstructorpublic static class RecipientUser implements Serializable {/*** 接收人id*/@NotNull(message = "接收者用户ID不能为空")private Long recipientId;/*** 接收人手机号*/@Pattern(regexp = RegexPool.MOBILE, message = "手机格式错误")private String phone;}}

-----------------------------------------------------------------------------------------------------------

/*** @ClassName MessageMqBinding* @Author zpp* @Version 1.0.0* @Description TODO* @CreateTime 2023/2/10 15:37 - 星期五*/
public interface MessageMqBinding {/*** 系统消息生产者交换机*/String MESSAGE_MQ_OUTPUT = "dyzsMessageProvider-out-0";
}

----------------------------------------------------------------------------------------

@Slf4j
@RestController
@RequestMapping("/mq")
public class MessageMqController {@Resourceprivate StreamBridge streamBridge;/*** @param :* @Author zpp* @Description 发送系统消息* @Date 2023/2/10 15:27* @Return com.zysy.common.api.entity.Result<java.lang.Boolean>*/@PostMappingpublic Result<Boolean> sendMessage(@RequestBody @Validated MessageParamDto dto) {log.info("接收到系统消息发送请求:{}", JSONObject.toJSONString(dto));MessageMQParamDto paramDto = new MessageMQParamDto(dto);paramDto.setCreateBy(UserUtil.getUserId());paramDto.setCreateDept(UserUtil.getDeptId());List<MessageMQParamDto> paramDtoList = new ArrayList<>();paramDtoList.add(paramDto);MessageBuilder builder = MessageBuilder.withPayload(paramDtoList).setHeader("Content-Type", "application/json");return Result.success(streamBridge.send(MessageMqBinding.MESSAGE_MQ_OUTPUT, builder.build()));}

------------------------------------------------------------------------------------------------------

消息消费:

          下图是在代码中配置的消息消费者,这里的函数名称要和上图中的function.definition配置的名称一样;

相关文章:

SpringCloud之Stream框架集成RocketMQ消息中间件

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现&#xff0c;并引入了发布-订阅、消费组、分区这三…...

与创新者同行!Apache Doris 首届线下峰会即将开启,最新议程公开!|即刻预约

点击此处 即刻报名 Doris Summit Asia 2023 回顾人类的发展史&#xff0c;地球起源于 46 亿年前的原始星云、地球生命最初出现于 35 亿年前的原始海洋、人类物种诞生于数百万年前&#xff0c;而人类生产力的真正提升源于十八世纪六十年代的工业革命&#xff0c;自此以后&#…...

vue解决:Parsing error: No Babel config file detected for ....

报错信息 Parsing error: No Babel config file detected for C:\Users\Admin\Desktop\shabi\work\src\App.vue. Either disable config file checking with requireConfigFile: false, or configure Babel so that it can find the config files. 分析错误&#xff1a;没有检测…...

算法题:K 次取反后最大化的数组和(典型的贪心算法问题)

这道题没有看题解&#xff0c;直接提交&#xff0c;成绩超越99.5%&#xff0c;说明思路是优的。就是考虑的情况里面弯弯绕比较多&#xff0c;需要考虑全面一点。&#xff08;本题完整题目附在了最后面&#xff09; 具体思路如下&#xff1a; 1、首先排序&#xff0c;然后从最…...

Go语言中向[]byte数组中增加一个元素

要向http.Request的body中添加一个键值对&#xff0c;可以先将其转换为一个map&#xff0c;然后对其进行修改&#xff0c;最后再将其转回为byte数组。 以下是一个示例代码&#xff1a; import ("net/http""io/ioutil""encoding/json" )type Re…...

CSS 布局案例: 2行、多行每行格数不定,最后一列对齐

布局期望的效果如下&#xff1a; 第二行最后一格与第一行最后一格对齐。每行格数不定。自动拉伸填充整个宽度 实现&#xff1a; 一开始打算用display:flex&#xff0c; 自动分散&#xff0c;但是第二行对齐第一行最后一格控制不了。 使用grid fr均分单位控制。 <!DOCTYPE…...

数据结构--算法、数据结构的基本概念

&#x1f4d5;参考&#xff1a;王道 一、算法的基本概念 1.程序数据结构算法 2.算法的特性 &#xff08;1&#xff09;有穷性 执行有穷步之后结束&#xff0c;且每一步都可在有穷时间内完成。 &#xff08;2&#xff09;确定性 &#xff08;3&#xff09;可行性 可通过已经实…...

Edge浏览器下载文件被保存为 .crdownload 文件的问题小记

问题 近期使用Edge浏览器下载文件时&#xff0c;文件都被保存为 .crdownload 格式的文件了&#xff0c;不确定从哪个版本开始的。除非下载未完成导致文件不完整&#xff0c;否则不会被保存为 .crdownload 格式的文件&#xff1b;实际上文件已完成了下载&#xff0c;且手工修改…...

6-10 单链表分段逆转 分数 15

void K_Reverse( List L, int K ) { //此题已经默认size > K 因为当size < K时 反转后将不再符合链表的定义//求出表中元素个数int size 0;for (List cur L->Next; cur ! NULL; cur cur->Next)size; List prv, cur, next, first, head L;//共需要反转 si…...

【单片机】17-温度传感器DS18B20

1.DS18B20相关背景知识 1.温度传感器 &#xff08;1&#xff09;测温度的方式&#xff1a;物理&#xff08;汞柱&#xff0c;气压&#xff09;&#xff0c;电子&#xff08;金属电性能随温度变化&#xff09; &#xff08;2&#xff09;早期&#xff1a;热敏电阻&#xff08;模…...

力扣 -- 5. 最长回文子串

解题步骤&#xff1a; 参考代码&#xff1a; class Solution { public:string longestPalindrome(string s) {int ns.size();vector<vector<bool>> dp(n,vector<bool>(n));//最长回文串的起始位置int start0;//最长回文串的长度int len0;for(int in-1;i>…...

SpringCloud源码探析(十)-Web消息推送

1.概述 消息推送在日常使用中的场景比较多&#xff0c;比如有人点赞了我的博客或者关注了我&#xff0c;这时我就会收到一条推送消息&#xff0c;以此来吸引我点击或者打开应用。消息推送的方式主要分为两种&#xff1a;web消息推送和移动端消息推送。它将所要发送的信息&…...

Vue、React和小程序中的组件通信:父传子和子传父

在前端开发中&#xff0c;组件化是一种常见的开发模式&#xff0c;它可以将复杂的用户界面拆分成多个可重用的组件。在Vue、React和小程序中&#xff0c;组件之间的数据和事件传递是非常关键的&#xff0c;其中父传子和子传父是常见的通信方式。本文将介绍在Vue、React和小程序…...

安卓玩机----展讯芯片机型解锁 读写分区工具 操作步骤解析

国内机型大都使用高通和MTK芯片。展讯芯片使用的较少。相对来说高通和mtk机型解锁以及读取分区工具较多。展讯的几乎没有。目前有大佬开发出了一款展讯芯片解锁 与读写分区工具.开源的tools 官方分享说明&#xff1a; 是一款专为 Windows 计算机设计的免费、用户友好的工具&am…...

微软放大招!Bing支持DALL-E3,免费AI绘画等你来体验!

最近 OpenAI 发布了DALL-E3模型&#xff0c;出图效果和Midjourney不相上下&#xff0c;不过要使用它有些门槛&#xff0c;必须是 ChatGPT Plus 账户&#xff0c;而且还要排队&#xff0c;怎么等都等不到&#xff0c;搞得大家都比较焦虑。 不过现在微软在Bing上也支持 DALL-E3 …...

tp5访问的时候必须加index.php,TP5配置隐藏入口index.php文件

PS&#xff1a;这里说的入口文件指的是public/index.php,配置文件就在这个目录下 可以去掉URL地址里面的入口文件index.php&#xff0c;但是需要额外配置WEB服务器的重写规则。 以Apache为例&#xff0c;需要在入口文件的同级添加.htaccess文件(官方默认自带了该文件)&#x…...

16k面试中的10个问题

你好&#xff0c;我是田哥 节前&#xff0c;有位朋友跟我反馈面试中一些问题&#xff0c;这位朋友的基本情况&#xff1a; 坐标&#xff1a;上海&#xff0c;年限&#xff1a;3年不到&#xff0c;期望薪资&#xff1b;16k 下面我们来看看具体问题&#xff1a; 01&#xff1a;请…...

STM32单片机入门学习(六)-光敏传感器控制LED

光敏传感器模块和LED接线 LED负极接B12,正极接VCC 光敏传感模块一DO端接B13,GND接GND&#xff0c;VCC接VCC,AO不接。 如图&#xff1a; 主程序代码&#xff1a;main.c #include "stm32f10x.h" #include "Delay.h" //delay函数所在头文件 #include …...

MFC 鼠标悬停提示框

MFC 鼠标悬停提示框 运行效果 在MFC窗口中添加一个控件 工具栏中拖拽List Box到MFC窗口给List Box添加变量 CListBox m_listbox 增加成员变量 CWnd* m_tip_parent_wnd; CToolTipCtrl m_tip;给m_listbox创建提示框 void create_tip_window(CWnd* tip_wnd, CToolTipCtrl* ti…...

大数据学习,涉及哪些技术?

学习大数据需要涉及多种技术和概念&#xff0c;因为大数据领域非常广泛&#xff0c;涵盖了数据的采集、存储、处理、分析和可视化等多个方面。以下是学习大数据时需要考虑的一些关键技术和概念&#xff1a; 1、数据采集和存储&#xff1a; 数据库管理系统&#xff08;DBMS&am…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息&#xff0c;对客户进行统一管理&#xff0c;可以把所有客户信息录入系统&#xff0c;进行维护和统计功能。可通过文件的方式保存相关录入数据&#xff0c;对…...

在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7

在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤&#xff1a; 第一步&#xff1a; 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为&#xff1a; // 改为 v…...