Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
为什么要使用MQ?
在Spring Boot Event这篇文章中已经通过Guava
或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?
首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker
中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。
源码地址:Gitee
整合RocketMQ
依赖版本
- JDK 17
- Spring Boot 3.2.0
- RocketMQ-Client 5.0.4
- RocketMQ-Starter 2.2.0
可以参考这篇进行RocketMQ安装
Spring Boot 3.0+ 取消了对spring.factories
的支持。所以在导入时需要手动引入RocketMQ的配置类。
引入RocketMQ依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>
解决Spring Boot3+不兼容 spring.factories
rocketmq-spring-boot-starter:2.2.2
版本中:
参考配置文件
# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group,保证唯一group: event-mq-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false
参考Issue
-
方法一 :通过
@Import(RocketMQAutoConfiguration.class)
在配置类中引入 -
方法二:在
resources
资源目录下创建文件夹及文件META-INF/spring
,org.springframework.boot.autoconfigure.AutoConfiguration.imports
。
文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
RocketMQ 使用
解决Spring Boot3+不支持spring.factories的问题
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;/*** 启动类*/
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);}
}
RocketMQ操作工具
RocketMQ Message实体
import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;
import java.util.List;/*** RocketMQ 消息*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {/*** 消息队列主题*/@NotBlank(message = "MQ Topic 不能为空")private String topic;/*** 延迟级别*/@Builder.Defaultprivate DelayLevel delayLevel = DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private List<T> messages;/*** 使用有序消息发送时,指定发送到队列*/private String hashKey;/*** 任务Id,用于日志打印相关信息*/@Builder.Defaultprivate String taskId = IdUtil.fastSimpleUUID();
}
RocketMQTemplate 二次封装
import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/
@Slf4j
@Component
public class RocketMQService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.sendMessageTimeout}")private int sendMessageTimeout;/*** 异步发送消息回调** @param taskId 任务Id* @param topic 消息主题* @return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());}};}/*** 发送同步消息,使用有序发送请设置HashKey** @param message 消息参数*/public <T> void syncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** @param message 消息参数*/public <T> void syncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息,异步返回消息结果** @param message 消息参数*/public <T> void asyncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** @param message 消息参数*/public <T> void asyncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;** @param message 消息参数*/public <T> void sendOneWay(RocketMQMessage<T> message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** @param message 消息体* @param batch 是否为批量操作*/public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]": "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}}
}
定义RocketMQ消费者
import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** MQ消息监听*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("MQListener 接收消息 : {}", message);}
}
定义测试类发送消息
import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/
@SpringBootTest
public class MQTest {@Resourceprivate RocketMQService rocketMQService;@Testpublic void sendMessage() {int count = 1;while (count <= 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count++).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);}
}
测试
相关文章:

Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
为什么要使用MQ? 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢? 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事…...
oracle ORA-01704: string literal too long ORACLE数据库clob类型
当oracle数据表中有clob类型字段时候,insert或update的sql语句中,超过长度就会报错 ORA-01704: string literal too long update xxx set xxx <div><h1>123</h1></div> where id 100;可以修改为 DECLAREstr varchar2(10000…...

微星主板强刷BIOS(以微星X370gaming plus 为例)
(前两天手欠,用U盘通过微星的M-flash升级BIOS 升级过程中老没动静就强制关机了 然后电脑就打不开了) 几种强刷主板BIOS的方式 在网上看到有三种强刷BIOS的方式分别是: 使用夹子编程器 (听说不太好夹)使用微星转接线编程器(只能用于微星主板࿰…...
matlab 图像上生成指定中心,指定大小的矩形窗
用matlab实现在图像上生成指定中心,指定大小的矩形窗(奇数*奇数) function PlaneWin PlaneWindow(CentreCoorX,CentreCoorY,RadiusX,RadiusY,SizeImRow,SizeImColumn) % 在图像上生成指定中心,指定大小的矩形窗(奇数*奇数) % % Input: % CentreCoorX(1*1) % CentreCoorY(1*1)…...
❀My学习小记录之算法❀
目录 算法:) 一、定义 二、特征 三、基本要素 常用设计模式 常用实现方法 四、形式化算法 五、复杂度 时间复杂度 空间复杂度 六、非确定性多项式时间(NP) 七、实现 八、示例 求最大值算法 求最大公约数算法 九、分类 算法:) 一、定义 …...

Hive-high Avaliabl
hive—high Avaliable hive的搭建方式有三种,分别是 1、Local/Embedded Metastore Database (Derby) 2、Remote Metastore Database 3、Remote Metastore Server 一般情况下,我们在学习的时候直接使用hive –service metastore的方式…...

码住!8个小众宝藏的开发者学习类网站
1、simplilearn simplilearn是全球排名第一的在线学习网站,它的课程由世界知名大学、顶级企业和领先的行业机构通过实时在线课程设计和提供,其中包括顶级行业从业者、广受欢迎的培训师和全球领导者。 2、VisuAlgo VisuAlgo是一个免费的在线学习算法和数…...

Postman常见问题及解决方法
1、网络连接问题 如果Postman无法发送请求或接收响应,可以尝试以下操作: 检查网络连接是否正常,包括检查网络设置、代理设置等。 确认请求的URL是否正确,并检查是否使用了正确的HTTP方法(例如GET、POST、PUT等&#…...
ubuntu图形化登录默认只有guest session账号解决方法
新安装的ubuntu16.x 图形化界面登录默认只有guest账号,只有进入guest账号之后再去手动切换root账号很麻烦,但是这样确实很安全。为了方便希望能够在登录图形化界面的时候以root身份/或者自定义其他身份登录。做一下简单的记录。 使用终端命令行编辑文件…...
全国计算机等级考试| 二级Python | 真题及解析(1)
一、选择题 1. 按照“后进先出”原则组织数据的数据结构是____ A栈 B双向链表 C二叉树 D队列 正确答案: A 2. 以下选项的叙述中,正确的是 A在循环队列中,只需要队头指针就能反映队列中元素的动态变化情况 B在循环队列中,只需要队尾指针就能反映队列中元素的动态变…...
Java开发框架和中间件面试题(9)
目录 102.你了解秒杀吗?怎么设计? 103.什么是缓存穿透?怎么解决? 102.你了解秒杀吗?怎么设计? 1.设计难点:并发量大,应用,数据库都承受不了。另外难控制超卖。 2.设计…...

【ARMv8M Cortex-M33 系列 2 -- Cortex-M33 JLink 连接 及 JFlash 烧写介绍】
文章目录 Jlink 工具JLink 命令行示例JFlash 烧写问题Jlink 工具 J-Link 是 SEGGER 提供的一款流行的 JTAG 调试器,它支持多个平台和处理器。JLink.exe 是 J-Link 调试器的命令行接口,它允许用户通过命令行执行一系列操作,例如编程、擦除、调试等。 工具链接: https://ww…...
react pwa应用示例
创建一个基于React的PWA应用,你可以使用create-react-app,它自带PWA支持,但默认是关闭的。以下是创建React PWA应用的步骤: 安装create-react-app 如果你还没有安装,你可以通过npm来安装: npm install -…...

python如何通过日志分析加入黑名单
python通过日志分析加入黑名单 监控nginx日志,若有人攻击,则加入黑名单,操作步骤如下: 1.读取日志文件 2.分隔文件,取出ip 3.将取出的ip放入list,然后判读ip的次数 4.若超过设定的次数,则加…...

RabbitMq知识概述
本文来说下RabbitMq相关的知识与概念 文章目录 概述AMQP协议Exchange 消息如何保证100%投递什么是生产端的可靠性投递可靠性投递保障方案 消息幂等性高并发的情况下如何避免消息重复消费confirm 确认消息、Return返回消息如何实现confirm确认消息return消息机制 消费…...
专业级A链接测试特有
A链接普通 A链接添加链接描述带有blank...

Spring Boot 入参校验及全局异常处理
版本依赖 JDK 17 Spring Boot 3.2.0 源码地址:Gitee Spring Boot validation spring-boot-starter-validation是基于hibernate-validator的实现,在Spring Boot项目中直接导入spring-boot-starter-validation即可。 Valid 和 Validated 的区别 适用范围…...
MySQL 和 MySQL2 的区别
MySQL是最流行的开源关系型数据库管理系统,拥有大量的使用者和广泛的应用场景。而MySQL2是MySQL官方团队推出的新一代MySQL驱动,用于取代老版的MySQL模块,提供更好的性能和更丰富的功能。 本文将介绍MySQL2相较于MySQL有哪些优势以及具体的技术区别。 …...

AutoCAD图纸打印后内容不见
用户反映,在CAD里的对象打印出来不显示。其实,这是在CAD的打印对象颜色的问题。(在9.2以下版本有这种问题,9.2及以上版本已默认此种颜色) 1.当背景色为黑色的时候,这里的颜色是白,如下图 2.当C…...

ASUS华硕ROG幻16 2023款GU603VU VV VI笔记本电脑原厂Win11.22H2系统
链接:https://pan.baidu.com/s/1AgevUZleCHBJgCBcIp5CFQ?pwdhjxy 提取码:hjxy 华硕笔记本2023款幻16原厂Windows11系统自带所有驱动、出厂主题壁纸、Office办公软件、MyASUS华硕电脑管家、Armoury Crate奥创控制中心等预装程序 文件格式࿱…...

IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...

高频面试之3Zookeeper
高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个?3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制(过半机制࿰…...

家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...