Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
为什么要使用MQ?
在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?
首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

源码地址:Gitee
整合RocketMQ
依赖版本
- JDK 17
- Spring Boot 3.2.0
- RocketMQ-Client 5.0.4
- RocketMQ-Starter 2.2.0
可以参考这篇进行RocketMQ安装
Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。
引入RocketMQ依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>
解决Spring Boot3+不兼容 spring.factories
rocketmq-spring-boot-starter:2.2.2版本中:

参考配置文件
# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group,保证唯一group: event-mq-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false
参考Issue
-
方法一 :通过
@Import(RocketMQAutoConfiguration.class)在配置类中引入 -
方法二:在
resources资源目录下创建文件夹及文件META-INF/spring,org.springframework.boot.autoconfigure.AutoConfiguration.imports。
文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
RocketMQ 使用
解决Spring Boot3+不支持spring.factories的问题
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;/*** 启动类*/
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);}
}
RocketMQ操作工具
RocketMQ Message实体
import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;
import java.util.List;/*** RocketMQ 消息*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {/*** 消息队列主题*/@NotBlank(message = "MQ Topic 不能为空")private String topic;/*** 延迟级别*/@Builder.Defaultprivate DelayLevel delayLevel = DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private List<T> messages;/*** 使用有序消息发送时,指定发送到队列*/private String hashKey;/*** 任务Id,用于日志打印相关信息*/@Builder.Defaultprivate String taskId = IdUtil.fastSimpleUUID();
}
RocketMQTemplate 二次封装
import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/
@Slf4j
@Component
public class RocketMQService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.sendMessageTimeout}")private int sendMessageTimeout;/*** 异步发送消息回调** @param taskId 任务Id* @param topic 消息主题* @return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());}};}/*** 发送同步消息,使用有序发送请设置HashKey** @param message 消息参数*/public <T> void syncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** @param message 消息参数*/public <T> void syncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息,异步返回消息结果** @param message 消息参数*/public <T> void asyncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** @param message 消息参数*/public <T> void asyncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;** @param message 消息参数*/public <T> void sendOneWay(RocketMQMessage<T> message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** @param message 消息体* @param batch 是否为批量操作*/public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]": "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}}
}
定义RocketMQ消费者
import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** MQ消息监听*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("MQListener 接收消息 : {}", message);}
}
定义测试类发送消息
import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/
@SpringBootTest
public class MQTest {@Resourceprivate RocketMQService rocketMQService;@Testpublic void sendMessage() {int count = 1;while (count <= 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count++).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);}
}
测试

相关文章:
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
为什么要使用MQ? 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢? 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事…...
oracle ORA-01704: string literal too long ORACLE数据库clob类型
当oracle数据表中有clob类型字段时候,insert或update的sql语句中,超过长度就会报错 ORA-01704: string literal too long update xxx set xxx <div><h1>123</h1></div> where id 100;可以修改为 DECLAREstr varchar2(10000…...
微星主板强刷BIOS(以微星X370gaming plus 为例)
(前两天手欠,用U盘通过微星的M-flash升级BIOS 升级过程中老没动静就强制关机了 然后电脑就打不开了) 几种强刷主板BIOS的方式 在网上看到有三种强刷BIOS的方式分别是: 使用夹子编程器 (听说不太好夹)使用微星转接线编程器(只能用于微星主板࿰…...
matlab 图像上生成指定中心,指定大小的矩形窗
用matlab实现在图像上生成指定中心,指定大小的矩形窗(奇数*奇数) function PlaneWin PlaneWindow(CentreCoorX,CentreCoorY,RadiusX,RadiusY,SizeImRow,SizeImColumn) % 在图像上生成指定中心,指定大小的矩形窗(奇数*奇数) % % Input: % CentreCoorX(1*1) % CentreCoorY(1*1)…...
❀My学习小记录之算法❀
目录 算法:) 一、定义 二、特征 三、基本要素 常用设计模式 常用实现方法 四、形式化算法 五、复杂度 时间复杂度 空间复杂度 六、非确定性多项式时间(NP) 七、实现 八、示例 求最大值算法 求最大公约数算法 九、分类 算法:) 一、定义 …...
Hive-high Avaliabl
hive—high Avaliable hive的搭建方式有三种,分别是 1、Local/Embedded Metastore Database (Derby) 2、Remote Metastore Database 3、Remote Metastore Server 一般情况下,我们在学习的时候直接使用hive –service metastore的方式…...
码住!8个小众宝藏的开发者学习类网站
1、simplilearn simplilearn是全球排名第一的在线学习网站,它的课程由世界知名大学、顶级企业和领先的行业机构通过实时在线课程设计和提供,其中包括顶级行业从业者、广受欢迎的培训师和全球领导者。 2、VisuAlgo VisuAlgo是一个免费的在线学习算法和数…...
Postman常见问题及解决方法
1、网络连接问题 如果Postman无法发送请求或接收响应,可以尝试以下操作: 检查网络连接是否正常,包括检查网络设置、代理设置等。 确认请求的URL是否正确,并检查是否使用了正确的HTTP方法(例如GET、POST、PUT等&#…...
ubuntu图形化登录默认只有guest session账号解决方法
新安装的ubuntu16.x 图形化界面登录默认只有guest账号,只有进入guest账号之后再去手动切换root账号很麻烦,但是这样确实很安全。为了方便希望能够在登录图形化界面的时候以root身份/或者自定义其他身份登录。做一下简单的记录。 使用终端命令行编辑文件…...
全国计算机等级考试| 二级Python | 真题及解析(1)
一、选择题 1. 按照“后进先出”原则组织数据的数据结构是____ A栈 B双向链表 C二叉树 D队列 正确答案: A 2. 以下选项的叙述中,正确的是 A在循环队列中,只需要队头指针就能反映队列中元素的动态变化情况 B在循环队列中,只需要队尾指针就能反映队列中元素的动态变…...
Java开发框架和中间件面试题(9)
目录 102.你了解秒杀吗?怎么设计? 103.什么是缓存穿透?怎么解决? 102.你了解秒杀吗?怎么设计? 1.设计难点:并发量大,应用,数据库都承受不了。另外难控制超卖。 2.设计…...
【ARMv8M Cortex-M33 系列 2 -- Cortex-M33 JLink 连接 及 JFlash 烧写介绍】
文章目录 Jlink 工具JLink 命令行示例JFlash 烧写问题Jlink 工具 J-Link 是 SEGGER 提供的一款流行的 JTAG 调试器,它支持多个平台和处理器。JLink.exe 是 J-Link 调试器的命令行接口,它允许用户通过命令行执行一系列操作,例如编程、擦除、调试等。 工具链接: https://ww…...
react pwa应用示例
创建一个基于React的PWA应用,你可以使用create-react-app,它自带PWA支持,但默认是关闭的。以下是创建React PWA应用的步骤: 安装create-react-app 如果你还没有安装,你可以通过npm来安装: npm install -…...
python如何通过日志分析加入黑名单
python通过日志分析加入黑名单 监控nginx日志,若有人攻击,则加入黑名单,操作步骤如下: 1.读取日志文件 2.分隔文件,取出ip 3.将取出的ip放入list,然后判读ip的次数 4.若超过设定的次数,则加…...
RabbitMq知识概述
本文来说下RabbitMq相关的知识与概念 文章目录 概述AMQP协议Exchange 消息如何保证100%投递什么是生产端的可靠性投递可靠性投递保障方案 消息幂等性高并发的情况下如何避免消息重复消费confirm 确认消息、Return返回消息如何实现confirm确认消息return消息机制 消费…...
专业级A链接测试特有
A链接普通 A链接添加链接描述带有blank...
Spring Boot 入参校验及全局异常处理
版本依赖 JDK 17 Spring Boot 3.2.0 源码地址:Gitee Spring Boot validation spring-boot-starter-validation是基于hibernate-validator的实现,在Spring Boot项目中直接导入spring-boot-starter-validation即可。 Valid 和 Validated 的区别 适用范围…...
MySQL 和 MySQL2 的区别
MySQL是最流行的开源关系型数据库管理系统,拥有大量的使用者和广泛的应用场景。而MySQL2是MySQL官方团队推出的新一代MySQL驱动,用于取代老版的MySQL模块,提供更好的性能和更丰富的功能。 本文将介绍MySQL2相较于MySQL有哪些优势以及具体的技术区别。 …...
AutoCAD图纸打印后内容不见
用户反映,在CAD里的对象打印出来不显示。其实,这是在CAD的打印对象颜色的问题。(在9.2以下版本有这种问题,9.2及以上版本已默认此种颜色) 1.当背景色为黑色的时候,这里的颜色是白,如下图 2.当C…...
ASUS华硕ROG幻16 2023款GU603VU VV VI笔记本电脑原厂Win11.22H2系统
链接:https://pan.baidu.com/s/1AgevUZleCHBJgCBcIp5CFQ?pwdhjxy 提取码:hjxy 华硕笔记本2023款幻16原厂Windows11系统自带所有驱动、出厂主题壁纸、Office办公软件、MyASUS华硕电脑管家、Armoury Crate奥创控制中心等预装程序 文件格式࿱…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
