SpringBoot整合RocketMQ
前言
在当今快速发展的软件开发领域,构建高效、稳定的应用系统是每个开发者的追求。Spring Boot 作为一款极具影响力的开发框架,凭借其强大的自动化配置和便捷的开发特性,极大地简化了项目搭建过程。使用 Spring Boot,我们无需再为框架之间的兼容性、适用版本等繁杂问题而烦恼。只需简单添加一个配置,就能轻松引入所需的各种功能和组件,实现快速开发。
一、技术介绍
1.1 消息队列
消息队列中间件是分布式系统中不可或缺的重要组件。在分布式系统中,各个服务之间相互独立又紧密协作,消息队列就像是一座桥梁,连接着不同的服务,实现它们之间的高效通信。其主要功能在于解决应用耦合、异步消息处理以及流量削峰等关键问题。
1.2 RocketMQ
RocketMQ 是一款基于队列模型的消息中间件,具有高性能、高可靠、高实时和分布式的显著特点。它采用 Java 语言开发,由阿里巴巴团队开发并开源。RocketMQ 的高性能体现在其能够支持高并发的消息读写操作,满足大规模分布式系统的性能需求。
二、使用步骤
2.1 引入 Maven 依赖
首先需要在项目的pom.xml文件中引入相关的 Maven 依赖。可以直接复制都是通用的
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.1</version><relativePath/>
</parent>
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-common</artifactId><version>4.8.0</version></dependency>
</dependencies>
引入了rocketmq-spring-boot-starter依赖,它提供了 Spring Boot 与 RocketMQ 集成的相关功能。同时还引入了rocketmq-client和rocketmq-common依赖,它们分别包含了 RocketMQ 客户端的核心功能和公共工具类。
2.2 封装 RocketMQ 工具类
为了更方便地在项目中使用 RocketMQ 发送消息,作者封装了一个 RocketMQ 工具类,将常用的消息发送操作封装起来。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;@Component
public class RocketMqHelper {private static final Logger LOG = LoggerFactory.getLogger(RocketMqHelper.class);/*** rocketmq模板注入*/@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostConstructpublic void init() {LOG.info("---RocketMq助手初始化---");}/*** 发送异步消息的基础方法** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间* @param delayLevel 延迟消息的级别*/private void asyncSendBase(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {if (topic == null || message == null) {LOG.error("发送异步消息时,topic或message不能为空");return;}if (sendCallback == null) {sendCallback = getDefaultSendCallBack();}rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体*/public void asyncSend(Enum topic, Message<?> message) {asyncSend(topic.name(), message, null, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数*/public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {asyncSend(topic.name(), message, sendCallback, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体*/public void asyncSend(String topic, Message<?> message) {asyncSend(topic, message, null, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {asyncSend(topic, message, sendCallback, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {asyncSend(topic, message, sendCallback, timeout, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间* @param delayLevel 延迟消息的级别*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {asyncSendBase(topic, message, sendCallback, timeout, delayLevel);}/*** 发送顺序消息的基础方法** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键* @param timeout 超时时间*/private void syncSendOrderlyBase(String topic, Message<?> message, String hashKey, long timeout) {if (topic == null || message == null || hashKey == null) {LOG.error("发送顺序消息时,topic、message或hashKey不能为空");return;}LOG.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键*/public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {syncSendOrderly(topic.name(), message, hashKey, 0);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键*/public void syncSendOrderly(String topic, Message<?> message, String hashKey) {syncSendOrderly(topic, message, hashKey, 0);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键* @param timeout 超时时间*/public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {syncSendOrderlyBase(topic, message, hashKey, timeout);}/*** 默认CallBack函数** @return*/private SendCallback getDefaultSendCallBack() {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {LOG.info("---发送MQ成功---");}@Overridepublic void onException(Throwable throwable) {LOG.error("---发送MQ失败---", throwable);}};}@PreDestroypublic void destroy() {LOG.info("---RocketMq助手注销---");}
}
2.3 配置文件
在application.yml配置文件中,配置 RocketMQ 的相关参数,包括 Name Server 地址、生产者配置等配置信息,大部分也是通用的,端口号可以自己适配。
server:port: 8088
#rocketmq配置
rocketmq:name-server: 127.0.0.1:9876# 生产者配置 producer:isOnOff: on# 发送同一类消息的设置为同一个group,保证唯一group: rocketmq-groupgroupName: rocketmq-group# 服务地址namesrvAddr: 127.0.0.1:9876# 消息最大长度 默认1024*4(4M)maxMessageSize: 4096# 发送消息超时时间,默认3000sendMsgTimeout: 3000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2
2.4 单元测试
为了验证 RocketMQ 的集成是否成功,编写单元测试代码,发送消息并监听消息的接收情况。
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;@SpringBootTest
public class RocketMQTest {@Autowiredprivate RocketMqHelper rocketMqHelper;@Testpublic void testProducter() {Student stu= new Student();stu.setName("abc");stu.setScore(19);rocketMqHelper.asyncSend("STUDENT_ADD", MessageBuilder.withPayload(stu).build());}
}
异步发送student对象给RocketMQ的broker
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "STUDENT_ADD")
public class PersonMqListener implements RocketMQListener<Person> {@Overridepublic void onMessage(Student stu) {System.out.println("接收到消息,开始消费..name:" + stu.getName() + ",age:" + stu.getAge());}
}
消费逻辑是重写RocketMQlistener的父类方法就行
相关文章:
SpringBoot整合RocketMQ
前言 在当今快速发展的软件开发领域,构建高效、稳定的应用系统是每个开发者的追求。Spring Boot 作为一款极具影响力的开发框架,凭借其强大的自动化配置和便捷的开发特性,极大地简化了项目搭建过程。使用 Spring Boot,我们无需再…...
深入理解 YUV Planar 和色度二次采样 —— 视频处理的核心技术
深入理解 YUV Planar 和色度二次采样 —— 视频处理的核心技术 在现代视频处理和编码中,YUV 颜色空间和**色度二次采样(Chroma Subsampling)**是两个非常重要的概念。它们的结合不仅能够显著减少视频数据量,还能在保持较高视觉质量的同时优化存储和传输效率。而 YUV Plana…...
项目顺利交付,几个关键阶段
年前离放假还有10天的时候,来了一个应急项目, 需要在放假前一天完成一个演示版本的项目,过年期间给甲方领导看。 本想的最后几天摸摸鱼,这么一来,非但摸鱼不了,还得加班。 还在虽然累,但也是…...
第七天 开始学习ArkTS基础,理解声明式UI编程思想
学习 ArkTS 的声明式 UI 编程思想是掌握 HarmonyOS 应用开发的核心基础。以下是一份简洁高效的学习指南,帮助你快速入门: 一、ArkTS 声明式 UI 核心思想 数据驱动 UI f(state):UI 是应用状态的函数,状态变化自动触发 UI 更新。单…...
windows C++ Fiber (协程)
协程,也叫微线程,多个协程在逻辑上是并发的,实际并发由用户控件。 在windows上引入了纤程(fiber)。 WinBase.h 中函数原型 #if(_WIN32_WINNT > 0x0400)// // Fiber begin //#pragma region Application Family or OneCore Family or Game…...
游戏引擎学习第89天
回顾 由于一直没有渲染器,终于决定开始动手做一个渲染器,虽然开始时并不确定该如何进行,但一旦开始做,发现这其实是正确的决定。因此,接下来可能会花一到两周的时间来编写渲染器,甚至可能更长时间…...
2025新鲜出炉--前端面试题(一)
文章目录 1. vue3有用过吗, 和vue2之间有哪些区别2. vue-router有几种路由, 分别怎么实现3. webpack和rollup这两个什么区别, 你会怎么选择4. 你能简单介绍一下webpack项目的构建流程吗5. webpack平时有手写过loader和plugin吗6. webpack这块你平时做过哪些优化吗?7…...
教程 | i.MX RT1180 ECAT_digital_io DEMO 搭建(一)
本文介绍 i.MX RT1180 EtherCAT digital io DEMO 搭建,Master 使用 TwinCAT ,由于步骤较多,分为上下两篇,本文为第一篇,主要介绍使用 TwinCAT 控制前的一些准备。 原厂 SDK 提供了 evkmimxrt1180_ecat_examples_digit…...
Pyecharts系列课程04——折线图/面积图(Line)
本章我们学习在Pyecharts中折线图(Line)的使用。折线图通用应用于数据的趋势分析。 折线图 我们现在有两组数据,x_data是2024年的月份,y_data为对应张三甲每个月的用电量。 # 家庭每月用电量趋势 x_data ["1月", &q…...
变压器-000000
最近一个项目是木田12V的充电器,要设计变压器,输出是12V,电压大于1.5A12.6*1.518.9W. 也就是可以将变压器当成初级输入的一个负载。输入端18.9W. 那么功率UI 。因为变压器的输入是线性上升的,所以电压为二份之一,也就是1/2*功率…...
凝思60重置密码
凝思系统重置密码 - 赛博狗尾草 - 博客园 问题描述 凝思系统进入单用户模式,在此模式下,用户可以访问修复错误配置的文件。也可以在此模式下安装显卡驱动,解决和已加载驱动的冲突问题。 适用范围 linx-6.0.60 linx-6.0.80 linx-6.0.100…...
linux——网络计算机{序列化及反序列化(JSON)(ifdef的用法)}
linux——网络(服务器的永久不挂——守护进程)-CSDN博客 目录 一、序列化与反序列化 1. 推荐 JSON 库 2. 使用 nlohmann/json 示例 安装方法 基础用法 输出结果 3. 常见操作 4. 其他库对比 5. 选择建议 二、ifdef宏的用法 基本语法 核心用途…...
【教程】docker升级镜像
转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,欢迎[点赞、收藏、关注]哦~ 目录 自动升级 手动升级 无论哪种方式,最重要的是一定要通过-v参数做数据的持久化! 自动升级 使用watchtower,可…...
迅为RK3568开发板篇OpenHarmony实操HDF驱动控制LED-编写应用APP
在应用代码中我们实现如下功能: 当应用程序启动后会获取命令行参数。如果命令行没有参数,LED 灯将循环闪烁;如果命令行带有参数,则根据传输的参数控制 LED 灯的开启或关闭。通过 HdfIoServiceBind 绑定 LED灯的 HDF 服务ÿ…...
python代码
python\main_script.py from multiprocessing import Process import subprocessdef call_script(args):# 创建一个新的进程来运行script_to_call.pyprocess Process(targetrun_script, args(args[0], args[1]))process.start()process2 Process(targetrun_script, args(arg…...
React 打印插件 -- react-to-print
一、安装依赖 npm install react-to-print 二、使用 import { useReactToPrint } from "react-to-print"; import React, { useRef, forwardRef } from react;const Content () > {const contentRef useRef(null);const reactToPrintFn useReactToPrint({ c…...
探索C语言简易计算器程序的实现与优化
在C语言编程学习中,实现一个简易计算器是一个常见且有趣的练习项目。它不仅能帮助我们巩固基本的语法知识,如函数、循环、分支结构,还能让我们深入理解程序设计的逻辑。接下来,我们将分析三段实现简易计算器功能的C语言代码&#…...
深入了解 MySQL:从基础到高级特性
引言 在当今数字化时代,数据的存储和管理至关重要。MySQL 作为一款广泛使用的开源关系型数据库管理系统(RDBMS),凭借其高性能、可靠性和易用性,成为众多开发者和企业的首选。本文将详细介绍 MySQL 的基础概念、安装启…...
OSPF基础(1):工作过程、状态机、更新
OSPF基础 1、技术背景(与RIP密不可分,因为RIP中存在的问题) RIP中存在最大跳数为15的限制,不能适应大规模组网周期性发送全部路由信息,占用大量的带宽资源以路由收敛速度慢以跳数作为度量值存在路由环路可能性每隔30秒…...
工业相机如何获得更好的图像色彩
如何获得更好的图像色彩 大部分的工业自动化检测中对物体的色彩信息并不敏感,因此会使用黑白的相机,但是在显微镜成像、颜色分类识别等领域,相机的色彩还原就显得格外重要,在调节相机色彩方面的参数时,有以下几个方面需…...
【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力
引言: 在人工智能快速发展的浪潮中,快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型(LLM)。该模型代表着该领域的重大突破,通过独特方式融合思考与非思考…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
uniapp 字符包含的相关方法
在uniapp中,如果你想检查一个字符串是否包含另一个子字符串,你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的,但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...
rknn toolkit2搭建和推理
安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 ,不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源(最常用) conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...
