SpringCloud之Stream框架集成RocketMQ消息中间件
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。
目前 Spring Cloud Stream只支持 RabbitMQ 和 Kafka 的自动化配置。
Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.
在 SpringCloudStream 3.x 版本前是通过 @StreamListener 和 @EnableBinding 进行消息的发送和消费的,springCloudStream 3.x 版本后 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解,不建议使用了;后续的版本更新中替换成函数式的方式实现。
既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?
通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic。
不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封 和 绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:
- 输入 - + -in- + < index >
例如:myTopic-in-0
- 输出 - + -out- + < index >
例如:myTopic-out-0
注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致(后面还会在项目实战中展示一遍)
代码示例:
----------------------------------项目实战--------------------------------------
看下我们项目中的配置,配置文件是放在nacos上面的:
消息发送:
/*** @ClassName MessageParamParentDto* @Author zxd* @Version 1.0.0* @Description TODO* @CreateTime 2023/6/13 11:27 - 星期二*/ @Data public class MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7963819193258646924L;private String routeUrl;}
--------------------------------------------------------------------------------------------------------------
/*** @ClassName MessageParamDto* @Author kch* @Version 1.0.0* @Description 消息队列接收系统消息实体对象* @CreateTime 2022/9/18 15:16 - 星期日*/ @Data public class MessageParamDto extends MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7111819193258646924L;/*** 消息模板code*/@NotNull(message = "消息模板不能为空")private String templateCode;/*** 可变参数,必传字段* 该参数匹配模板字符串中的变量和URL中的变量,所以模板和URL中的变量名不能重复*/@NotNull(message = "参数不能为空")private Map<String, String> params;/*** 消息详情跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/ // private String routerParams;/*** 消息操作跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/ // private String contentPathParams;/*** 接收者租户*/@NotNull(message = "接收者租户ID不能为空")private Long tenantId;/*** 接收人*/@NotNull(message = "接收者用户ID不能为空")@Size(min = 1, message = "接收者用户ID不能为空")private List<RecipientUser> recipientUsers;@Valid@Data@AllArgsConstructor@NoArgsConstructorpublic static class RecipientUser implements Serializable {/*** 接收人id*/@NotNull(message = "接收者用户ID不能为空")private Long recipientId;/*** 接收人手机号*/@Pattern(regexp = RegexPool.MOBILE, message = "手机格式错误")private String phone;}}
-----------------------------------------------------------------------------------------------------------
/*** @ClassName MessageMqBinding* @Author zpp* @Version 1.0.0* @Description TODO* @CreateTime 2023/2/10 15:37 - 星期五*/ public interface MessageMqBinding {/*** 系统消息生产者交换机*/String MESSAGE_MQ_OUTPUT = "dyzsMessageProvider-out-0"; }
----------------------------------------------------------------------------------------
@Slf4j @RestController @RequestMapping("/mq") public class MessageMqController {@Resourceprivate StreamBridge streamBridge;/*** @param :* @Author zpp* @Description 发送系统消息* @Date 2023/2/10 15:27* @Return com.zysy.common.api.entity.Result<java.lang.Boolean>*/@PostMappingpublic Result<Boolean> sendMessage(@RequestBody @Validated MessageParamDto dto) {log.info("接收到系统消息发送请求:{}", JSONObject.toJSONString(dto));MessageMQParamDto paramDto = new MessageMQParamDto(dto);paramDto.setCreateBy(UserUtil.getUserId());paramDto.setCreateDept(UserUtil.getDeptId());List<MessageMQParamDto> paramDtoList = new ArrayList<>();paramDtoList.add(paramDto);MessageBuilder builder = MessageBuilder.withPayload(paramDtoList).setHeader("Content-Type", "application/json");return Result.success(streamBridge.send(MessageMqBinding.MESSAGE_MQ_OUTPUT, builder.build()));}
------------------------------------------------------------------------------------------------------
消息消费:
下图是在代码中配置的消息消费者,这里的函数名称要和上图中的function.definition配置的名称一样;
相关文章:

SpringCloud之Stream框架集成RocketMQ消息中间件
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三…...

与创新者同行!Apache Doris 首届线下峰会即将开启,最新议程公开!|即刻预约
点击此处 即刻报名 Doris Summit Asia 2023 回顾人类的发展史,地球起源于 46 亿年前的原始星云、地球生命最初出现于 35 亿年前的原始海洋、人类物种诞生于数百万年前,而人类生产力的真正提升源于十八世纪六十年代的工业革命,自此以后&#…...

vue解决:Parsing error: No Babel config file detected for ....
报错信息 Parsing error: No Babel config file detected for C:\Users\Admin\Desktop\shabi\work\src\App.vue. Either disable config file checking with requireConfigFile: false, or configure Babel so that it can find the config files. 分析错误:没有检测…...

算法题:K 次取反后最大化的数组和(典型的贪心算法问题)
这道题没有看题解,直接提交,成绩超越99.5%,说明思路是优的。就是考虑的情况里面弯弯绕比较多,需要考虑全面一点。(本题完整题目附在了最后面) 具体思路如下: 1、首先排序,然后从最…...
Go语言中向[]byte数组中增加一个元素
要向http.Request的body中添加一个键值对,可以先将其转换为一个map,然后对其进行修改,最后再将其转回为byte数组。 以下是一个示例代码: import ("net/http""io/ioutil""encoding/json" )type Re…...

CSS 布局案例: 2行、多行每行格数不定,最后一列对齐
布局期望的效果如下: 第二行最后一格与第一行最后一格对齐。每行格数不定。自动拉伸填充整个宽度 实现: 一开始打算用display:flex, 自动分散,但是第二行对齐第一行最后一格控制不了。 使用grid fr均分单位控制。 <!DOCTYPE…...

数据结构--算法、数据结构的基本概念
📕参考:王道 一、算法的基本概念 1.程序数据结构算法 2.算法的特性 (1)有穷性 执行有穷步之后结束,且每一步都可在有穷时间内完成。 (2)确定性 (3)可行性 可通过已经实…...

Edge浏览器下载文件被保存为 .crdownload 文件的问题小记
问题 近期使用Edge浏览器下载文件时,文件都被保存为 .crdownload 格式的文件了,不确定从哪个版本开始的。除非下载未完成导致文件不完整,否则不会被保存为 .crdownload 格式的文件;实际上文件已完成了下载,且手工修改…...

6-10 单链表分段逆转 分数 15
void K_Reverse( List L, int K ) { //此题已经默认size > K 因为当size < K时 反转后将不再符合链表的定义//求出表中元素个数int size 0;for (List cur L->Next; cur ! NULL; cur cur->Next)size; List prv, cur, next, first, head L;//共需要反转 si…...

【单片机】17-温度传感器DS18B20
1.DS18B20相关背景知识 1.温度传感器 (1)测温度的方式:物理(汞柱,气压),电子(金属电性能随温度变化) (2)早期:热敏电阻(模…...

力扣 -- 5. 最长回文子串
解题步骤: 参考代码: class Solution { public:string longestPalindrome(string s) {int ns.size();vector<vector<bool>> dp(n,vector<bool>(n));//最长回文串的起始位置int start0;//最长回文串的长度int len0;for(int in-1;i>…...

SpringCloud源码探析(十)-Web消息推送
1.概述 消息推送在日常使用中的场景比较多,比如有人点赞了我的博客或者关注了我,这时我就会收到一条推送消息,以此来吸引我点击或者打开应用。消息推送的方式主要分为两种:web消息推送和移动端消息推送。它将所要发送的信息&…...
Vue、React和小程序中的组件通信:父传子和子传父
在前端开发中,组件化是一种常见的开发模式,它可以将复杂的用户界面拆分成多个可重用的组件。在Vue、React和小程序中,组件之间的数据和事件传递是非常关键的,其中父传子和子传父是常见的通信方式。本文将介绍在Vue、React和小程序…...

安卓玩机----展讯芯片机型解锁 读写分区工具 操作步骤解析
国内机型大都使用高通和MTK芯片。展讯芯片使用的较少。相对来说高通和mtk机型解锁以及读取分区工具较多。展讯的几乎没有。目前有大佬开发出了一款展讯芯片解锁 与读写分区工具.开源的tools 官方分享说明: 是一款专为 Windows 计算机设计的免费、用户友好的工具&am…...

微软放大招!Bing支持DALL-E3,免费AI绘画等你来体验!
最近 OpenAI 发布了DALL-E3模型,出图效果和Midjourney不相上下,不过要使用它有些门槛,必须是 ChatGPT Plus 账户,而且还要排队,怎么等都等不到,搞得大家都比较焦虑。 不过现在微软在Bing上也支持 DALL-E3 …...
tp5访问的时候必须加index.php,TP5配置隐藏入口index.php文件
PS:这里说的入口文件指的是public/index.php,配置文件就在这个目录下 可以去掉URL地址里面的入口文件index.php,但是需要额外配置WEB服务器的重写规则。 以Apache为例,需要在入口文件的同级添加.htaccess文件(官方默认自带了该文件)&#x…...
16k面试中的10个问题
你好,我是田哥 节前,有位朋友跟我反馈面试中一些问题,这位朋友的基本情况: 坐标:上海,年限:3年不到,期望薪资;16k 下面我们来看看具体问题: 01:请…...

STM32单片机入门学习(六)-光敏传感器控制LED
光敏传感器模块和LED接线 LED负极接B12,正极接VCC 光敏传感模块一DO端接B13,GND接GND,VCC接VCC,AO不接。 如图: 主程序代码:main.c #include "stm32f10x.h" #include "Delay.h" //delay函数所在头文件 #include …...

MFC 鼠标悬停提示框
MFC 鼠标悬停提示框 运行效果 在MFC窗口中添加一个控件 工具栏中拖拽List Box到MFC窗口给List Box添加变量 CListBox m_listbox 增加成员变量 CWnd* m_tip_parent_wnd; CToolTipCtrl m_tip;给m_listbox创建提示框 void create_tip_window(CWnd* tip_wnd, CToolTipCtrl* ti…...
大数据学习,涉及哪些技术?
学习大数据需要涉及多种技术和概念,因为大数据领域非常广泛,涵盖了数据的采集、存储、处理、分析和可视化等多个方面。以下是学习大数据时需要考虑的一些关键技术和概念: 1、数据采集和存储: 数据库管理系统(DBMS&am…...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...

【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...

3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...

使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...