SpringBoot整合RocketMQ
前言
在当今快速发展的软件开发领域,构建高效、稳定的应用系统是每个开发者的追求。Spring Boot 作为一款极具影响力的开发框架,凭借其强大的自动化配置和便捷的开发特性,极大地简化了项目搭建过程。使用 Spring Boot,我们无需再为框架之间的兼容性、适用版本等繁杂问题而烦恼。只需简单添加一个配置,就能轻松引入所需的各种功能和组件,实现快速开发。
一、技术介绍
1.1 消息队列
消息队列中间件是分布式系统中不可或缺的重要组件。在分布式系统中,各个服务之间相互独立又紧密协作,消息队列就像是一座桥梁,连接着不同的服务,实现它们之间的高效通信。其主要功能在于解决应用耦合、异步消息处理以及流量削峰等关键问题。
1.2 RocketMQ
RocketMQ 是一款基于队列模型的消息中间件,具有高性能、高可靠、高实时和分布式的显著特点。它采用 Java 语言开发,由阿里巴巴团队开发并开源。RocketMQ 的高性能体现在其能够支持高并发的消息读写操作,满足大规模分布式系统的性能需求。
二、使用步骤
2.1 引入 Maven 依赖
首先需要在项目的pom.xml文件中引入相关的 Maven 依赖。可以直接复制都是通用的
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.1</version><relativePath/>
</parent>
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-common</artifactId><version>4.8.0</version></dependency>
</dependencies>
引入了rocketmq-spring-boot-starter依赖,它提供了 Spring Boot 与 RocketMQ 集成的相关功能。同时还引入了rocketmq-client和rocketmq-common依赖,它们分别包含了 RocketMQ 客户端的核心功能和公共工具类。
2.2 封装 RocketMQ 工具类
为了更方便地在项目中使用 RocketMQ 发送消息,作者封装了一个 RocketMQ 工具类,将常用的消息发送操作封装起来。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;@Component
public class RocketMqHelper {private static final Logger LOG = LoggerFactory.getLogger(RocketMqHelper.class);/*** rocketmq模板注入*/@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostConstructpublic void init() {LOG.info("---RocketMq助手初始化---");}/*** 发送异步消息的基础方法** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间* @param delayLevel 延迟消息的级别*/private void asyncSendBase(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {if (topic == null || message == null) {LOG.error("发送异步消息时,topic或message不能为空");return;}if (sendCallback == null) {sendCallback = getDefaultSendCallBack();}rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体*/public void asyncSend(Enum topic, Message<?> message) {asyncSend(topic.name(), message, null, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数*/public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {asyncSend(topic.name(), message, sendCallback, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体*/public void asyncSend(String topic, Message<?> message) {asyncSend(topic, message, null, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {asyncSend(topic, message, sendCallback, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {asyncSend(topic, message, sendCallback, timeout, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间* @param delayLevel 延迟消息的级别*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {asyncSendBase(topic, message, sendCallback, timeout, delayLevel);}/*** 发送顺序消息的基础方法** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键* @param timeout 超时时间*/private void syncSendOrderlyBase(String topic, Message<?> message, String hashKey, long timeout) {if (topic == null || message == null || hashKey == null) {LOG.error("发送顺序消息时,topic、message或hashKey不能为空");return;}LOG.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键*/public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {syncSendOrderly(topic.name(), message, hashKey, 0);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键*/public void syncSendOrderly(String topic, Message<?> message, String hashKey) {syncSendOrderly(topic, message, hashKey, 0);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键* @param timeout 超时时间*/public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {syncSendOrderlyBase(topic, message, hashKey, timeout);}/*** 默认CallBack函数** @return*/private SendCallback getDefaultSendCallBack() {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {LOG.info("---发送MQ成功---");}@Overridepublic void onException(Throwable throwable) {LOG.error("---发送MQ失败---", throwable);}};}@PreDestroypublic void destroy() {LOG.info("---RocketMq助手注销---");}
}
2.3 配置文件
在application.yml配置文件中,配置 RocketMQ 的相关参数,包括 Name Server 地址、生产者配置等配置信息,大部分也是通用的,端口号可以自己适配。
server:port: 8088
#rocketmq配置
rocketmq:name-server: 127.0.0.1:9876# 生产者配置 producer:isOnOff: on# 发送同一类消息的设置为同一个group,保证唯一group: rocketmq-groupgroupName: rocketmq-group# 服务地址namesrvAddr: 127.0.0.1:9876# 消息最大长度 默认1024*4(4M)maxMessageSize: 4096# 发送消息超时时间,默认3000sendMsgTimeout: 3000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2
2.4 单元测试
为了验证 RocketMQ 的集成是否成功,编写单元测试代码,发送消息并监听消息的接收情况。
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;@SpringBootTest
public class RocketMQTest {@Autowiredprivate RocketMqHelper rocketMqHelper;@Testpublic void testProducter() {Student stu= new Student();stu.setName("abc");stu.setScore(19);rocketMqHelper.asyncSend("STUDENT_ADD", MessageBuilder.withPayload(stu).build());}
}
异步发送student对象给RocketMQ的broker
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "STUDENT_ADD")
public class PersonMqListener implements RocketMQListener<Person> {@Overridepublic void onMessage(Student stu) {System.out.println("接收到消息,开始消费..name:" + stu.getName() + ",age:" + stu.getAge());}
}
消费逻辑是重写RocketMQlistener的父类方法就行
相关文章:
SpringBoot整合RocketMQ
前言 在当今快速发展的软件开发领域,构建高效、稳定的应用系统是每个开发者的追求。Spring Boot 作为一款极具影响力的开发框架,凭借其强大的自动化配置和便捷的开发特性,极大地简化了项目搭建过程。使用 Spring Boot,我们无需再…...
深入理解 YUV Planar 和色度二次采样 —— 视频处理的核心技术
深入理解 YUV Planar 和色度二次采样 —— 视频处理的核心技术 在现代视频处理和编码中,YUV 颜色空间和**色度二次采样(Chroma Subsampling)**是两个非常重要的概念。它们的结合不仅能够显著减少视频数据量,还能在保持较高视觉质量的同时优化存储和传输效率。而 YUV Plana…...
项目顺利交付,几个关键阶段
年前离放假还有10天的时候,来了一个应急项目, 需要在放假前一天完成一个演示版本的项目,过年期间给甲方领导看。 本想的最后几天摸摸鱼,这么一来,非但摸鱼不了,还得加班。 还在虽然累,但也是…...
第七天 开始学习ArkTS基础,理解声明式UI编程思想
学习 ArkTS 的声明式 UI 编程思想是掌握 HarmonyOS 应用开发的核心基础。以下是一份简洁高效的学习指南,帮助你快速入门: 一、ArkTS 声明式 UI 核心思想 数据驱动 UI f(state):UI 是应用状态的函数,状态变化自动触发 UI 更新。单…...
windows C++ Fiber (协程)
协程,也叫微线程,多个协程在逻辑上是并发的,实际并发由用户控件。 在windows上引入了纤程(fiber)。 WinBase.h 中函数原型 #if(_WIN32_WINNT > 0x0400)// // Fiber begin //#pragma region Application Family or OneCore Family or Game…...
游戏引擎学习第89天
回顾 由于一直没有渲染器,终于决定开始动手做一个渲染器,虽然开始时并不确定该如何进行,但一旦开始做,发现这其实是正确的决定。因此,接下来可能会花一到两周的时间来编写渲染器,甚至可能更长时间…...
2025新鲜出炉--前端面试题(一)
文章目录 1. vue3有用过吗, 和vue2之间有哪些区别2. vue-router有几种路由, 分别怎么实现3. webpack和rollup这两个什么区别, 你会怎么选择4. 你能简单介绍一下webpack项目的构建流程吗5. webpack平时有手写过loader和plugin吗6. webpack这块你平时做过哪些优化吗?7…...
教程 | i.MX RT1180 ECAT_digital_io DEMO 搭建(一)
本文介绍 i.MX RT1180 EtherCAT digital io DEMO 搭建,Master 使用 TwinCAT ,由于步骤较多,分为上下两篇,本文为第一篇,主要介绍使用 TwinCAT 控制前的一些准备。 原厂 SDK 提供了 evkmimxrt1180_ecat_examples_digit…...
Pyecharts系列课程04——折线图/面积图(Line)
本章我们学习在Pyecharts中折线图(Line)的使用。折线图通用应用于数据的趋势分析。 折线图 我们现在有两组数据,x_data是2024年的月份,y_data为对应张三甲每个月的用电量。 # 家庭每月用电量趋势 x_data ["1月", &q…...
变压器-000000
最近一个项目是木田12V的充电器,要设计变压器,输出是12V,电压大于1.5A12.6*1.518.9W. 也就是可以将变压器当成初级输入的一个负载。输入端18.9W. 那么功率UI 。因为变压器的输入是线性上升的,所以电压为二份之一,也就是1/2*功率…...
凝思60重置密码
凝思系统重置密码 - 赛博狗尾草 - 博客园 问题描述 凝思系统进入单用户模式,在此模式下,用户可以访问修复错误配置的文件。也可以在此模式下安装显卡驱动,解决和已加载驱动的冲突问题。 适用范围 linx-6.0.60 linx-6.0.80 linx-6.0.100…...
linux——网络计算机{序列化及反序列化(JSON)(ifdef的用法)}
linux——网络(服务器的永久不挂——守护进程)-CSDN博客 目录 一、序列化与反序列化 1. 推荐 JSON 库 2. 使用 nlohmann/json 示例 安装方法 基础用法 输出结果 3. 常见操作 4. 其他库对比 5. 选择建议 二、ifdef宏的用法 基本语法 核心用途…...
【教程】docker升级镜像
转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,欢迎[点赞、收藏、关注]哦~ 目录 自动升级 手动升级 无论哪种方式,最重要的是一定要通过-v参数做数据的持久化! 自动升级 使用watchtower,可…...
迅为RK3568开发板篇OpenHarmony实操HDF驱动控制LED-编写应用APP
在应用代码中我们实现如下功能: 当应用程序启动后会获取命令行参数。如果命令行没有参数,LED 灯将循环闪烁;如果命令行带有参数,则根据传输的参数控制 LED 灯的开启或关闭。通过 HdfIoServiceBind 绑定 LED灯的 HDF 服务ÿ…...
python代码
python\main_script.py from multiprocessing import Process import subprocessdef call_script(args):# 创建一个新的进程来运行script_to_call.pyprocess Process(targetrun_script, args(args[0], args[1]))process.start()process2 Process(targetrun_script, args(arg…...
React 打印插件 -- react-to-print
一、安装依赖 npm install react-to-print 二、使用 import { useReactToPrint } from "react-to-print"; import React, { useRef, forwardRef } from react;const Content () > {const contentRef useRef(null);const reactToPrintFn useReactToPrint({ c…...
探索C语言简易计算器程序的实现与优化
在C语言编程学习中,实现一个简易计算器是一个常见且有趣的练习项目。它不仅能帮助我们巩固基本的语法知识,如函数、循环、分支结构,还能让我们深入理解程序设计的逻辑。接下来,我们将分析三段实现简易计算器功能的C语言代码&#…...
深入了解 MySQL:从基础到高级特性
引言 在当今数字化时代,数据的存储和管理至关重要。MySQL 作为一款广泛使用的开源关系型数据库管理系统(RDBMS),凭借其高性能、可靠性和易用性,成为众多开发者和企业的首选。本文将详细介绍 MySQL 的基础概念、安装启…...
OSPF基础(1):工作过程、状态机、更新
OSPF基础 1、技术背景(与RIP密不可分,因为RIP中存在的问题) RIP中存在最大跳数为15的限制,不能适应大规模组网周期性发送全部路由信息,占用大量的带宽资源以路由收敛速度慢以跳数作为度量值存在路由环路可能性每隔30秒…...
工业相机如何获得更好的图像色彩
如何获得更好的图像色彩 大部分的工业自动化检测中对物体的色彩信息并不敏感,因此会使用黑白的相机,但是在显微镜成像、颜色分类识别等领域,相机的色彩还原就显得格外重要,在调节相机色彩方面的参数时,有以下几个方面需…...
5分钟掌握B站音频提取:BilibiliDown开源工具的完整指南
5分钟掌握B站音频提取:BilibiliDown开源工具的完整指南 【免费下载链接】BilibiliDown (GUI-多平台支持) B站 哔哩哔哩 视频下载器。支持稍后再看、收藏夹、UP主视频批量下载|Bilibili Video Downloader 😳 项目地址: https://gitcode.com/gh_mirrors/…...
如何用Planck-Pi实现低成本嵌入式开发?基于F1C200s的全栈方案解析
如何用Planck-Pi实现低成本嵌入式开发?基于F1C200s的全栈方案解析 【免费下载链接】Planck-Pi Super TINY & Low-cost Linux Develop-Kit Based On F1C200s. 项目地址: https://gitcode.com/gh_mirrors/pl/Planck-Pi Planck-Pi作为一款基于全志F1C200s芯…...
2026年了,为什么很多企业做了智慧气象,结果还是没把风险降下来?
上个月,和一位新能源集团的运营负责人聊天,他抛出一个百思不得其解的问题:“我们花了300多万上了智慧气象系统,接了精细化预报,预警信息每天推送到手机、电脑、大屏,三个渠道同步。结果上个月一场雷暴&…...
程序替换与shell
程序替换函数execlexeclpexecvexecvpexecvpeexecle一共介绍七个函数 这里全都是以exec开头的 执行任何程序, 需要: 1.找到它 加载它(路劲加程序名) 2.怎么执行(例如ls,你想带什么选项呀,如 -l -a -d之类&a…...
AudioLDM-S极速音效生成:5分钟搞定游戏音效,小白也能当音效师
AudioLDM-S极速音效生成:5分钟搞定游戏音效,小白也能当音效师 1. 游戏音效制作的新纪元 想象一下这样的场景:你正在开发一款独立游戏,需要一个"科幻飞船引擎启动"的音效。传统方式可能需要花费数小时搜索音效库、购买…...
基于FPGA的伺服驱动系统:电流环控制与多环路反馈、SVPWM及编码器协议实现的研究
伺服驱动FPGA电流环,包含坐标变换,电流环,速度环,位置环,电机反馈接口,SVPWM,编码器协议,电流环和编码器协议都是FPGA里实现的伺服驱动系统里玩FPGA可不是闹着玩的,尤其是…...
Retinaface+CurricularFace模型在智能门禁系统中的实战应用
RetinafaceCurricularFace模型在智能门禁系统中的实战应用 1. 引言 想象一下这样的场景:每天早晨上班高峰期,办公楼入口排着长队等待刷卡进门;访客需要在前台登记身份证,保安还要手动核对信息。这种传统门禁方式不仅效率低下&am…...
打造掌机媒体中心:wiliwili跨设备播放全攻略
打造掌机媒体中心:wiliwili跨设备播放全攻略 【免费下载链接】wiliwili 专为手柄控制设计的第三方跨平台B站客户端,目前可以运行在PC全平台、PSVita、PS4 和 Nintendo Switch上 项目地址: https://gitcode.com/GitHub_Trending/wi/wiliwili 在移动…...
tao-8k部署避坑指南:Xinference日志排查、WebUI访问与调用验证
tao-8k部署避坑指南:Xinference日志排查、WebUI访问与调用验证 1. 环境准备与快速部署 在开始部署tao-8k模型之前,我们先来了解一下这个模型的基本情况。tao-8k是由Hugging Face开发者amu研发并开源的专业文本嵌入模型,它能够将文本转换为高…...
《C语言学习:判断语句if-else》5
写在前面:本笔记为个人学习各平台C语言系列课程所作,仅供交流学习,不得作他用。1. if基本用法if(/*条件*/){/*做法*/ } //如果满足条件,则做大括号中的事情圆括号中是条件,或者说一个表达式。当它是0,则不执…...
