芋道 Spring Cloud Alibaba 消息队列 RocketMQ 入门
1. 概述
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
Spring Cloud Stream
2.1 Spring Cloud Stream 是什么
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,它通过 Spring Integration 与消息中间件(如 RabbitMQ、Kafka、RocketMQ)进行连接。
2.2 核心概念
- Binder:与消息中间件集成的组件,负责创建对应的 Binding。
- Binding:消息中间件与应用程序之间的桥梁,分为 Input Binding(用于消费消息)和 Output Binding(用于生产消息)。
2.3 Broker 的角色
Broker 是消息队列中间件的代理服务器,负责存储消息、转发消息。例如,在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储,同时为消费者的拉取请求作准备。

三、快速入门
3.1 搭建生产者
3.1.1 引入依赖
在 pom.xml 中引入 Spring Cloud Alibaba RocketMQ 相关依赖:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
3.1.2 配置文件
在 application.yaml 中添加 Spring Cloud Alibaba RocketMQ 相关配置:
spring:application:name: demo-producer-applicationcloud:stream:bindings:demo01-output:destination: DEMO-TOPIC-01content-type: application/jsonrocketmq:binder:name-server: 127.0.0.1:9876bindings:demo01-output:producer:group: testsync: true
3.1.3 创建 MySource 接口
声明名字为 Output Binding:
public interface MySource {@Output("demo01-output")MessageChannel demo01Output();
}
3.1.4 创建 Demo01Message 类
作为示例消息:
public class Demo01Message {private Integer id;// getter 和 setter 方法
}
3.1.5 创建 Demo01Controller 类
提供发送消息的 HTTP 接口:
@RestController
@RequestMapping("/demo01")
public class Demo01Controller {@Autowiredprivate MySource mySource;@GetMapping("/send")public boolean send() {Demo01Message message = new Demo01Message().setId(new Random().nextInt());Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();return mySource.demo01Output().send(springMessage);}
}
3.1.6 创建 ProducerApplication 类
启动应用:
@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}
}
3.2 搭建消费者
3.2.1 引入依赖
与生产者类似,引入 Spring Cloud Alibaba RocketMQ 相关依赖。
3.2.2 配置文件
在 application.yaml 中添加消费者相关的配置:
spring:application:name: demo-consumer-applicationcloud:stream:bindings:demo01-input:destination: DEMO-TOPIC-01content-type: application/jsongroup: demo01-consumer-group-DEMO-TOPIC-01rocketmq:binder:name-server: 127.0.0.1:9876bindings:demo01-input:consumer:enabled: truebroadcasting: false
3.2.3 创建 MySink 接口
声明名字为 Input Binding:
public interface MySink {String DEMO01_INPUT = "demo01-input";@Input(DEMO01_INPUT)SubscribableChannel demo01Input();
}
3.2.4 创建 Demo01Message 类
与生产者一致。
3.2.5 创建 Demo01Consumer 类
消费消息:
@Component
public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);}
}
3.2.6 创建 ConsumerApplication 类
启动应用:
@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}
四、定时消息
4.1 定时消息的概念
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
4.2 实现定时消息
在发送消息时,通过设置消息的延迟级别来实现定时消息。例如:
@GetMapping("/send_delay")
public boolean sendDelay() {Demo01Message message = new Demo01Message().setId(new Random().nextInt());Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 设置延迟级别为 3,10 秒后消费.build();return mySource.demo01Output().send(springMessage);
}
五、消费重试
5.1 消费重试的机制
当消息消费失败时,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer,让 Consumer 有机会重新消费消息。
5.2 配置消费重试
在配置文件中设置消费重试相关的配置项:
spring:cloud:stream:bindings:demo01-input:consumer:max-attempts: 1rocketmq:bindings:demo01-input:consumer:delay-level-when-next-consume: 0
六、消费异常处理机制
6.1 异常处理的方式
Spring Cloud Stream 提供了通用的消费异常处理机制,可以通过 @ServiceActivator 或 @StreamListener 注解订阅错误通道,实现自定义的异常处理逻辑。
6.2 实现异常处理
在消费者中添加异常处理方法:
@Component
public class Demo01Consumer {// ...@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")public void handleError(ErrorMessage errorMessage) {logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));}@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)public void globalHandleError(ErrorMessage errorMessage) {logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));}
}
七、广播消费
7.1 广播消费的概念
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
7.2 配置广播消费
在配置文件中设置 broadcasting 配置项为 true:
spring:cloud:stream:bindings:demo01-input:consumer:broadcasting: true
八、顺序消息
8.1 顺序消息的概念
RocketMQ 支持普通顺序消息和完全严格顺序消息,确保消息按顺序消费。
8.2 实现顺序消息
在生产者中设置分区 key 表达式,在消费者中设置顺序消费:
# 生产者配置
spring:cloud:stream:bindings:demo01-output:producer:partition-key-expression: payload['id']rocketmq:bindings:demo01-output:producer:group: testsync: true# 消费者配置
spring:cloud:stream:bindings:demo01-input:consumer:orderly: true
九、消息过滤
9.1 消息过滤的方式
RocketMQ 提供基于 Tag 和 SQL92 的消息过滤方式。
9.2 基于 Tag 过滤
在生产者中设置消息的 Tag,在消费者中设置过滤的 Tag:
// 生产者
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_TAGS, "yunai").build();// 消费者配置
spring:cloud:stream:bindings:demo01-input:consumer:tags: yunai || yutou
9.3 基于 SQL92 过滤
在消费者中设置 SQL92 过滤表达式:
spring:cloud:stream:bindings:demo01-input:consumer:sql: "id > 100"
十、事务消息
10.1 事务消息的概念
RocketMQ 提供完整的事务消息功能,确保分布式事务的最终一致性。
10.2 实现事务消息
在生产者中发送事务消息,并实现事务监听器:
@GetMapping("/send_transaction")
public boolean sendTransaction() {Demo01Message message = new Demo01Message().setId(new Random().nextInt());Args args = new Args().setArgs1(1).setArgs2("2");Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader("args", JSON.toJSONString(args)).build();return mySource.demo01Output().send(springMessage);
}@RocketMQTransactionListener(txProducerGroup = "test")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务逻辑return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 回查本地事务状态return RocketMQLocalTransactionState.COMMIT;}
}
十一、监控端点
11.1 监控端点的作用
Spring Cloud Stream 提供了自定义监控端点,用于获取 Binding 和 Channel 信息,以及 RocketMQ 客户端的健康状态。
11.2 配置监控端点
在 pom.xml 中引入 Spring Boot Actuator 相关依赖,并在配置文件中开放监控端点:
management:endpoints:web:exposure:include: '*'endpoint:health:enabled: trueshow-details: ALWAYS
十二、更多的配置项信息
12.1 RocketMQ Binder Properties
配置项包括 name-server、access-key、secret-key 等。
12.2 RocketMQ Consumer Properties
配置项包括 enable、tags、sql、broadcasting、orderly 等。
12.3 RocketMQ Provider Properties
配置项包括 enable、group、maxMessageSize、transactional 等。
十三、接入阿里云的消息队列 RocketMQ
13.1 配置阿里云 RocketMQ
在配置文件中设置访问阿里云 RocketMQ 的账号、Namesrv 地址等参数:
spring:cloud:stream:bindings:demo01-output:destination: TOPIC_YUNAI_TESTrocketmq:binder:name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80access-key: ${ALIYUN_ACCESS_KEY}secret-key: ${ALIYUN_SECRET_KEY}bindings:demo01-output:producer:group: GID_PRODUCER_GROUP_YUNAI_TESTsync: true
总结
本文详细介绍了如何在 Spring Cloud Alibaba 中使用 RocketMQ 作为消息队列,从基础概念到快速入门,再到高级特性,如定时消息、消费重试、广播消费等,帮助开发者全面了解并应用 RocketMQ 到实际项目中。
相关文章:
芋道 Spring Cloud Alibaba 消息队列 RocketMQ 入门
1. 概述 RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销…...
【Go】切片
知识点关键概念切片声明var slice []int初始化切片slice : []int{1,2,3}make() 创建切片make([]int, len, cap)获取长度和容量len(slice), cap(slice)追加元素slice append(slice, value)切片截取slice[start:end](返回子切片)拷贝切片copy(dest, src)&…...
html css js网页制作成品——HTML+CSS+js迪奥口红网站网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...
PPT 转高精度图片 API 接口
PPT 转高精度图片 API 接口 文件处理 / 图片处理,将 PPT 文件转换为图片序列。 1. 产品功能 支持将 PPT 文件转换为高质量图片序列;支持 .ppt 和 .pptx 格式;保持原始 PPT 的布局和样式;转换后的图片支持永久访问;全…...
python学习笔记--实现简单的爬虫(二)
任务:爬取B站上最爱欢迎的编程课程 网址:编程-哔哩哔哩_bilibili 打开网页的代码模块,如下图: 标题均位于class_"bili-video-card__info--tit"的h3标签中,下面通过代码来实现,需要说明的是URL中…...
【颠覆性缓存架构】Caffeine双引擎缓存实战:CPU和内存双优化,命中率提升到92%,内存减少75%
千万级QPS验证!Caffeine智能双缓存实现 92%命中率,内存减少75% 摘要: 本文揭秘千万级流量场景下的缓存革命性方案!基于Caffeine打造智能双模式缓存系统,通过冷热数据分离存储与精准资源分配策略,实现CPU利…...
STM32八股【2】-----ARM架构
1、架构包含哪几部分内容 寄存器处理模式流水线MMU指令集中断FPU总线架构 2、以STM32为例进行介绍 2.1 寄存器 寄存器名称作用R0-R3通用寄存器用于数据传递、计算及函数参数传递;R0 也用于存储函数返回值。R4-R12通用寄存器用于存储局部变量,减少频繁…...
智能汽车图像及视频处理方案,支持视频智能包装能力
美摄科技的智能汽车图像及视频处理方案,通过深度学习算法与先进的色彩管理技术,能够自动调整图像中的亮度、对比度、饱和度等关键参数,确保在各种光线条件下,图像都能呈现出最接近人眼的自然色彩与细节层次。这不仅提升了驾驶者的…...
<C#> 详细介绍.net 三种依赖注入:AddTransient、AddScoped、AddSingleton 的区别
在 .NET 8 里,AddTransient、AddScoped 和 AddSingleton 均为依赖注入容器用于注册服务的方法,不过它们的生命周期管理方式存在差异。下面为你详细介绍这三种方法的区别。 1. AddTransient AddTransient 方法所注册的服务,每次被请求时都会…...
jenkins+1panel面板java运行环境自动化部署java项目
本文章不包含1panel面板安装、jenkins部署、jenkins连接git服务器等操作教程,如有需要可以抽空后期补上 jenkins安装插件Publish Over SSH 在系统配置添加服务器 查看项目的工作空间 项目Configure->构Post Steps选择Send files or execute commands over SSH…...
C语言 【实现电脑关机小游戏】非常好玩
引言 在时间限制内做出正确的回答,时间一到,电脑自动关机,听起来是不是很有意思,下面来看看怎么实现吧。 注意:该游戏只在windows系统下可以玩, 一、游戏原理: 在Windows系统下,通…...
备份比赛数据【算法赛】
0备份比赛数据【算法赛】 - 蓝桥云课 问题描述 蓝桥杯大赛的组委会最近遇到了一个棘手的问题。他们有 N 台电脑需要备份比赛数据,每台电脑所需的备份时间分别为 A1,A2,…,AN 分钟。 备份必须按编号顺序依次进行,即先第 1 台,再第 2 …...
[网络安全] 滥用Azure内置Contributor角色横向移动至Azure VM
本文来源于团队的超辉老师,其系统分析了Azure RBAC角色模型及其在权限滥用场景下的攻击路径。通过利用AADInternals工具提升用户至Contributor角色,攻击者可在Azure VM中远程执行命令,创建后门账户,实现横向移动。文中详述了攻击步…...
人工智能(AI)系统化学习路线
一、为什么需要系统化学习AI? 人工智能技术正在重塑各行各业,但许多初学者容易陷入误区: ❌ 盲目跟风:直接学习TensorFlow/PyTorch,忽视数学与算法基础。 ❌ 纸上谈兵:只看理论不写代码,无法解…...
Ubuntu系统使用nmcli配置静态IP
1. 配置静态IP 以下命令请全部加上sudo, 否则很可能会报错!!! 列出可用的网络连接 nmcli connection show找到你的 WiFi 连接名称(如 "WiFi名称")。 设置静态 IP 地址、网关和 DNS nmcli connection modif…...
vue3,element-plus 表格单选、多选、反选、全选
准备 定义数据 // 表格 const table ref(); // 表格数据 import type { User } from "/interface"; const tableData ref<User[]>([]); // 表格选集 const tableSelection ref<User[]>([]); // 表格选择行 const tableSelectedRow ref<User>…...
ngx_http_core_server_name
定义在 src\http\ngx_http_core_module.c static char * ngx_http_core_server_name(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {ngx_http_core_srv_conf_t *cscf conf;u_char ch;ngx_str_t *value;ngx_uint_t i;ngx_…...
如何提升库存系统的高并发和稳定性:算法与设计模式
库存系统是企业运营的核心模块,尤其是在电商、零售和供应链管理中,系统的高并发和稳定性直接影响订单处理的准确性和效率。面对海量订单、复杂的库存管理需求,如何在高并发环境下确保库存数据的准确性和系统的稳定性?本文将从架构…...
【Linux】从开发到系统管理深入理解环境变量
文章目录 前言一、环境变量概念1.1 为什么需要环境变量?1.2 环境变量的本质特征 二、环境变量PATH2.1 PATH的运作机制2.2 常见环境变量及其作用2.3 环境变量操作指南 三、再谈环境变量3.1main函数命令行参数解析3.2 环境变量的继承机制3.3 本地变量与内部构建命令 总…...
C++相关
1.定义pos时最好用无符号整型 如uint8_t size_t 编译器可能会有(有符号/无符号不匹配)的警告 总的来说就是符号一致 2.遇到俩个lambda相互调用的情况 使用std:funtion前置声明 3.回顾了虚函数,定义virtual 就是虚函数 一般是父类指针指向子…...
智算中心系统化建设与运营框架
智算中心系统化建设与运营框架 围绕智算中心全生命周期,从政策驱动到技术落地构建完整解决方案: 一、政策与产业生态 政策支撑体系 算力补贴机制: 国家层面:工信部“东数西算”工程对西部智算中心给予电价优惠(0.3元/…...
空气质量查询API:助力健康生活与环境监测的智能工具
引言 随着工业化和城市化的快速发展,空气质量问题日益受到人们的关注。空气质量不仅影响我们的日常生活,还直接关系到我们的健康。因此,了解空气质量指数(AQI)以及各项污染物的浓度,对于保障人们的健康至关…...
【CGE】社会核算矩阵构建(一):SAM基本结构
【CGE】社会核算矩阵构建(一):SAM基本结构 社会核算矩阵构建(一):SAM基本结构一、SAM的概念和基本特点二、SAM的基本结构1.开放经济体的SAM表结构2.SAM表各账户的主要核算内容(1)社会…...
Ubuntu 系统部署 Ollama + DeepSeek + Docker + Ragflow
🌹作者主页:青花锁 🌹简介:Java领域优质创作者🏆、Java微服务架构公号作者😄 🌹简历模板、学习资料、面试题库、技术互助 🌹文末获取联系方式 📝 Mysql数据库规范 一、Ol…...
深入探究 JVM 堆的垃圾回收机制(二)— 回收
GC Roots 枚举需要遍历整个应用程序的上下文,而在进行可达性分析或者垃圾回收时,如果我们还是进行全堆扫描及收集,那么会非常耗时。JVM 将堆分为新生代及老生代,它们的回收频率及算法不一样。 1 回收算法 在进行可达性分析时&am…...
第三讲 | C/C++内存管理完全手册
C/C内存管理 一、 C/C内存分布二、 C语言中动态内存管理方式:malloc/calloc/realloc/free三、 C内存管理方式1. new/delete操作内置类型2. new和delete操作自定义类型 四、operator new和operator delete函数(重点)五、new和delete的实现原理…...
2021年蓝桥杯第十二届CC++大学B组真题及代码
目录 1A:空间(填空5分_单位转换) 2B:卡片(填空5分_模拟) 3C:直线(填空10分_数学排序) 4D:货物摆放(填空10分_质因数) 5E…...
秒杀业务优化之从分布式锁到基于消息队列的异步秒杀
一、业务场景介绍 优惠券、门票等限时抢购常常出现在各类应用中,这样的业务一般为了引流宣传而降低利润,所以一旦出现问题将造成较大损失,那么在业务中就要求我们对这类型商品严格限时、限量、每位用户限一次、准确无误的创建订单,…...
IntelliJ IDEA 将 Spring Boot 项目远程部署到服务器
使用 IntelliJ IDEA 将 Spring Boot 项目远程部署到服务器的详细步骤,涵盖多种常见方法: 方法一:通过 SSH Maven 插件直接部署 1. 服务器环境准备 确保服务器已安装: Java 运行环境(与项目 JDK 版本一致࿰…...
Qt 重入和线程安全
重入和线程安全 在整个文档中,"重入"和 "线程安全 "这两个术语被用来标记类和函数,以表明它们在多线程应用程序中的使用方式: 线程安全函数可以同时被多个线程调用,即使调用使用的是共享数据,因…...
