spring-cloud-stream
系列文章目录
第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
第十五章 RabbitMQ 延迟队列
第十六章 spring-cloud-stream

文章目录
- 系列文章目录
- @[TOC](文章目录)
- 前言
- 1、stream设计思想
- 2、编码常用的注解
- 3、编码步骤
- 3.1、添加依赖
- 3.2、修改配置文件
- 3.3、生产
- 3.4、消费
- 3.5、延迟队列
- 3.5.1、修改配置文件
- 3.5.2、生产端
- 3.5.2、消息确认机制 消费端
- 总结
文章目录
- 系列文章目录
- @[TOC](文章目录)
- 前言
- 1、stream设计思想
- 2、编码常用的注解
- 3、编码步骤
- 3.1、添加依赖
- 3.2、修改配置文件
- 3.3、生产
- 3.4、消费
- 3.5、延迟队列
- 3.5.1、修改配置文件
- 3.5.2、生产端
- 3.5.2、消息确认机制 消费端
- 总结
前言
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。
1、stream设计思想


- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
2、编码常用的注解

| 组成 | 说明 |
|---|---|
| Middleware | 中间件,目前只支持RabbitMQ和Kafka |
| Binder | Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。 |
| @Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
| @Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
| @StreamListener | 监听队列,用于消费者的队列的消息接收 |
| @EnableBinding | 指信道channel和exchange绑定在一起 |
3、编码步骤
3.1、添加依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3.2、修改配置文件
server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit: #定义的名称,用于bidding整合type: rabbit #消息组件类型environment: #配置rabbimq连接环境spring:rabbitmq:host: localhost #rabbitmq 服务器的地址port: 5672 #rabbitmq 服务器端口username: tiger #rabbitmq 用户名password: tiger #rabbitmq 密码virtual-host: tiger_vh #虚拟路径bindings: #服务的整合处理saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup #分组
3.3、生产
/*** 订单消息输出通道处理器*/
@Component
public interface OrderOutputChannelProcesor {@Output("saveOrderOutput")MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).build());log.info("消息发送成功:" + userInfo);}
}
3.4、消费
/*** 订单消息输入通道处理器*/
@Component
public interface OrderInputChannelProcesor {@Input("saveOrderInput")SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {@StreamListener("saveOrderInput")public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());}
}
3.5、延迟队列
安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.5.1、修改配置文件
server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit: #定义的名称,用于bidding整合type: rabbit #消息组件类型environment: #配置rabbimq连接环境spring:rabbitmq:host: localhost #rabbitmq 服务器的地址port: 5672 #rabbitmq 服务器端口username: tiger #rabbitmq 用户名password: tiger #rabbitmq 密码virtual-host: tiger_vh #虚拟路径bindings: #服务的整合处理saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder-delay #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder-delay #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup #分组rabbit:bindings: #服务的整合处理saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道producer:delayed-exchange: truesaveOrderInput:consumer:delayed-exchange: true
3.5.2、生产端
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());log.info("消息发送成功:" + userInfo);}
}
3.5.2、消息确认机制 消费端
rabbit:bindings: #服务的整合处理saveOrderInput:consumer:acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);/** deliveryTag:Channel的消息投递的唯一标识符。* multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;* 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。* requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;* 如果设置为false,则消息被丢弃或发送到死信Exchange。*/try {channel.basicAck(delieverTag,true);} catch (IOException e) {e.printStackTrace();}
}
定义交换机类型为direct
rabbit:bindings: #服务的整合处理saveOrderInput:consumer:bindingRoutingKey: orderRoutingKeybindQueue: trueexchangeType: directsaveOrderOutput:producer:routingKeyExpression: orderRoutingKeyexchangeType: direct
总结
spring-cloud-stream目前支持RabbitMQ和Kafka,与spring-cloud无缝集成,非常方便。
相关文章:
spring-cloud-stream
系列文章目录 第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 第四章 Spring Cloud Netflix 之 Eureka 第五章 Spring Cloud Netflix 之 Ribbon 第六章 Spring Cloud 之 OpenFeign 第七章 Spring Cloud 之 GateWay 第八章 Sprin…...
2.0 熟悉CheatEngine修改器
Cheat Engine 一般简称为CE,它是一款功能强大的开源内存修改工具,其主要功能包括、内存扫描、十六进制编辑器、动态调试功能于一体,且该工具自身附带了脚本工具,可以用它很方便的生成自己的脚本窗体,CE工具可以帮助用户…...
微信小程序数据交互和缓存
目录 前言: 数据交互 1. 发起网络请求 2. WebSocket 2.1实时数据库 3. 微信支付 数据缓存 1. 页面级缓存 2. 内存级缓存 3. 数据缓存策略 优化用户体验 总结 前言: 在开发微信小程序时,数据交互和缓存是非常重要的方面。本文将介…...
kubernetes集群编排——k8s认证授权
pod绑定sa [rootk8s2 ~]# kubectl create sa admin [rootk8s2 secret]# vim pod5.yaml apiVersion: v1 kind: Pod metadata:name: mypod spec:serviceAccountName: admincontainers:- name: nginximage: nginxkubectl apply -f pod5.yamlkubectl get pod -o yaml 认证 [rootk8s…...
rabbitmq下载安装教程
1.首先需要下载erlang和rabbitmq安装包: 官网下载比较慢,通过网盘下载: 链接:https://pan.baidu.com/s/1fM2BrJqefyzUDZD4tfZLIg 提取码:5hsu 2.安装,傻瓜式安装就可以,可以自定义自己要安装的目…...
数据分析实战 | SVM算法——病例自动诊断分析
目录 一、数据分析及对象 二、目的及分析任务 三、方法及工具 四、数据读入 五、数据理解 六、数据准备 七、模型训练 八、模型应用及评价 一、数据分析及对象 CSV文件——“bc_data.csv” 数据集链接:https://download.csdn.net/download/m0_70452407/88…...
Splunk Connect for Kafka – Connecting Apache Kafka with Splunk
1: 背景: 1: splunk 有时要去拉取kafka 上的数据: 下面要用的有用的插件:Splunk Connect for Kafka 先说一下这个Splunk connect for kafka 是什么: What is Splunk Connect for Kafka? Spunk Connect for Kafka is a “sink connector” built on the Kafka Connect…...
Unity | Shader(着色器)和material(材质)的关系
一、前言 在上一篇文章中 【精选】Unity | Shader基础知识(什么是shader)_unity shader_菌菌巧乐兹的博客-CSDN博客 我们讲了什么是shader,今天我们讲一下shder和material的关系 二、在unity中shader的本质 unity中,shader就…...
Leetcode—69.x的平方根【简单】
2023每日刷题(二十七) Leetcode—69.x的平方根 直接法实现代码 int mySqrt(int x) {long long i 0;while(i * i < x) {i;}if(i * i > x) {return i - 1;}return i; }运行结果 二分法实现代码 int mySqrt(int x) {long long left 0, right (l…...
再探单例模式
再探单例模式 一:故事背景二:单例重点三:总结提升 一:故事背景 最近在进行单例模式的复习,今天进行一下对应的总结,分析一下各个设计模式。今天从最简单的单例模式开始。 二:单例重点 概念 一…...
Postman使用json提取器和正则表达式实现接口的关联
近期在复习Postman的基础知识,在小破站上跟着百里老师系统复习了一遍,也做了一些笔记,希望可以给大家一点点启发。 一)使用json提取器实现接口关联 实际项目场景,在财务信息页面,需要上传一个营业执照&…...
【11.10】现代密码学1——密码学发展史:密码学概述、安全服务、香农理论、现代密码学
密码学发展史 写在最前面密码学概述现代密码学量子密码学基本术语加解密的通信模型对称加密PKI通信工作流程 古典密码与分析古代密码的加密古典密码的分析 安全服务香农理论现代密码学乘积密码方案代换-置换网络安全性概念可证明安全性——规约(*规约证明的方案——…...
时间序列预测实战(九)PyTorch实现LSTM-ARIMA融合移动平均进行长期预测
一、本文介绍 本文带来的是利用传统时间序列预测模型ARIMA(注意:ARIMA模型不属于机器学习)和利用PyTorch实现深度学习模型LSTM进行融合进行预测,主要思想是->先利用ARIMA先和移动平均结合处理数据的线性部分(例如趋势和季节性)…...
由日期计算当天是星期几
题目 输入:一个合法的公历日期,格式为“XXXXXXXX”,分别代表年(4 位)、月(2 位)、日(2 位)。 输出:当日对应星期几的英语缩写(3 个字母ÿ…...
springboot模板引擎
1.服务端渲染时相比与前后端分离开发 原理是 跳过前端这一层 直接到服务端 通过数据和模板 生成页面返回前端 springboot包含如下模板引擎 典型如thymeleaf 1>导入依赖 2>查看路径 模板页面在 public static final String DEFAULT_PREFIX “classpath:/templates/”; 即…...
如何判断从本机上传到服务器的文件数据内容是一致的?用md5加密算法!
问题场景 最近在帮导师做横向,我想把整个项目环境放到服务器中,需要把一个很大的数据文件传到服务器,传上去很方便,但是涉及到文件的压缩上传和服务器内解压环节,不是太确定文件在本机和服务器的数据内容是否一致。 解…...
Ubuntu 20.04 DNS解析原理, 解决resolv.conf被覆盖问题
------------------------------------------------------------------ author: hjjdebug date: 2023年 11月 09日 星期四 14:01:11 CST description: Ubuntu 20.04 DNS解析原理, 解决resolv.conf被覆盖问题 ----------------------------------------------------------------…...
探索经典算法:贪心、分治、动态规划等
1.贪心算法 贪心算法是一种常见的算法范式,通常在解决最优化问题中使用。 贪心算法是一种在每一步选择中都采取当前状态下最优决策的算法范式。其核心思想是选择每一步的最佳解决方案,以期望达到最终的全局最优解。这种算法特点在于只考虑局部最优解&am…...
【Linux】编译Linux内核
之所以编译内核,是因为gem5全系统仿真需要vmlinux文件,在此记录一下以备后面需要。 此过程编译之后会获得vmlinux和bzImage两个文件; 主要参考知行大佬的编译内核与gem5官方教程 文章目录 一、Linux源码下载二、安装编译依赖三、编译1. 内核编…...
网页判断版本更新
一、需求解析 为什么我会想到这个技术呢,是因为我有一次发现,我司的用户在使用网页的时候,经常会出现一个页面放很久,下班也不关这个页面,这样就会导致页面的代码长时间处于不更新的状态。 在使用到一个功能出了bug&a…...
剧本创作新选择:如何用Trelby免费开源软件提升写作效率
剧本创作新选择:如何用Trelby免费开源软件提升写作效率 【免费下载链接】trelby The free, multiplatform, feature-rich screenwriting program! 项目地址: https://gitcode.com/gh_mirrors/tr/trelby 你是否曾为剧本格式调整而烦恼?是否在寻找一…...
拆穿名词诈骗!用大白话理解晦涩难懂的AI概念谒
1. 架构背景与演进动力 1.1 从单体到碎片化:.NET 的开源征程 在.NET Framework 时代,构建系统主要围绕 Windows 操作系统紧密集成,采用传统的封闭式开发模式。然而,随着.NET Core 的推出,微软开启了彻底的开源与跨平台…...
AI原生缓存架构生死线:当缓存失效导致LLM幻觉率上升22%,你还有3天重构窗口期
第一章:AI原生缓存架构的范式迁移与危机本质 2026奇点智能技术大会(https://ml-summit.org) 传统缓存系统建立在确定性访问模式与静态数据生命周期假设之上,而大语言模型推理、RAG实时检索、多模态流式生成等AI原生工作负载正持续冲击这一根基…...
手把手教你定制Ubuntu安装镜像:集成autoinstall配置,打造开箱即用的系统U盘
深度定制Ubuntu安装镜像:从autoinstall集成到U盘封装实战指南 当我们需要为实验室批量部署开发环境、为企业客户预装专用系统,或是为嵌入式设备打造专属镜像时,传统的手动安装方式显然效率低下。本文将带您深入探索如何将Ubuntu的autoinstall…...
嵌入式NFC开发:轻量级NDEF解析库NDefLib详解
1. NDefLib 库概述NDefLib 是一个面向嵌入式系统的轻量级 NFC 标签操作工具库,专为读写 Type 4 NFC 标签上的 NDEF(NFC Data Exchange Format)消息而设计。其核心定位并非替代完整的 NFC 协议栈(如 ISO/IEC 14443-4、ISO/IEC 7816…...
无需代码!AcousticSense AI音乐分类工具5分钟部署指南
无需代码!AcousticSense AI音乐分类工具5分钟部署指南 1. 让AI听懂音乐:视觉化流派分析新体验 你是否遇到过这样的情况:听到一首好歌却说不清它属于什么风格?或者需要整理上千首音乐却苦于手动分类?AcousticSense AI…...
终极指南:如何用FanControl实现Windows系统风扇精准控制
终极指南:如何用FanControl实现Windows系统风扇精准控制 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trending/…...
告别无效流量!亚马逊关键词挖掘:新手 7 天精准获客不浪费
亚马逊日常运营,关键词选不对,广告全白费:花大价钱投热门大词,点击多、转化少,ACoS 居高不下;自己想的关键词没人搜,广告预算花不出去,零曝光零订单;只盯着 10 几个…...
【ESP32-S3】智能小车中的编码电机PID调整技巧
【ESP32-S3】智能小车中的编码电机PID调整技巧PID 微调参数对照表推荐调试顺序(最安全)常用成品参数PID 微调参数对照表 参数作用太大表现太小表现建议起始值合理范围调整方向Kp 比例反应快慢、跟紧目标速度电机抖、嗡嗡响、抽搐、振荡反应慢、无力、速…...
Backbone:深度解析DLA中的迭代与分层聚合机制
1. 理解DLA的核心设计思想 第一次接触Deep Layer Aggregation(DLA)时,最让我困惑的是:为什么现有的网络结构需要新的聚合方式?经过几个项目的实践验证,我发现传统网络在特征融合方面存在明显短板。比如在做…...
