SpringBoot整合RocketMQ
前言
在当今快速发展的软件开发领域,构建高效、稳定的应用系统是每个开发者的追求。Spring Boot 作为一款极具影响力的开发框架,凭借其强大的自动化配置和便捷的开发特性,极大地简化了项目搭建过程。使用 Spring Boot,我们无需再为框架之间的兼容性、适用版本等繁杂问题而烦恼。只需简单添加一个配置,就能轻松引入所需的各种功能和组件,实现快速开发。
一、技术介绍
1.1 消息队列
消息队列中间件是分布式系统中不可或缺的重要组件。在分布式系统中,各个服务之间相互独立又紧密协作,消息队列就像是一座桥梁,连接着不同的服务,实现它们之间的高效通信。其主要功能在于解决应用耦合、异步消息处理以及流量削峰等关键问题。
1.2 RocketMQ
RocketMQ 是一款基于队列模型的消息中间件,具有高性能、高可靠、高实时和分布式的显著特点。它采用 Java 语言开发,由阿里巴巴团队开发并开源。RocketMQ 的高性能体现在其能够支持高并发的消息读写操作,满足大规模分布式系统的性能需求。
二、使用步骤
2.1 引入 Maven 依赖
首先需要在项目的pom.xml文件中引入相关的 Maven 依赖。可以直接复制都是通用的
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.1</version><relativePath/>
</parent>
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-common</artifactId><version>4.8.0</version></dependency>
</dependencies>
引入了rocketmq-spring-boot-starter依赖,它提供了 Spring Boot 与 RocketMQ 集成的相关功能。同时还引入了rocketmq-client和rocketmq-common依赖,它们分别包含了 RocketMQ 客户端的核心功能和公共工具类。
2.2 封装 RocketMQ 工具类
为了更方便地在项目中使用 RocketMQ 发送消息,作者封装了一个 RocketMQ 工具类,将常用的消息发送操作封装起来。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;@Component
public class RocketMqHelper {private static final Logger LOG = LoggerFactory.getLogger(RocketMqHelper.class);/*** rocketmq模板注入*/@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostConstructpublic void init() {LOG.info("---RocketMq助手初始化---");}/*** 发送异步消息的基础方法** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间* @param delayLevel 延迟消息的级别*/private void asyncSendBase(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {if (topic == null || message == null) {LOG.error("发送异步消息时,topic或message不能为空");return;}if (sendCallback == null) {sendCallback = getDefaultSendCallBack();}rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体*/public void asyncSend(Enum topic, Message<?> message) {asyncSend(topic.name(), message, null, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数*/public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {asyncSend(topic.name(), message, sendCallback, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体*/public void asyncSend(String topic, Message<?> message) {asyncSend(topic, message, null, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {asyncSend(topic, message, sendCallback, 0, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {asyncSend(topic, message, sendCallback, timeout, 0);}/*** 发送异步消息** @param topic 消息Topic* @param message 消息实体* @param sendCallback 回调函数* @param timeout 超时时间* @param delayLevel 延迟消息的级别*/public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {asyncSendBase(topic, message, sendCallback, timeout, delayLevel);}/*** 发送顺序消息的基础方法** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键* @param timeout 超时时间*/private void syncSendOrderlyBase(String topic, Message<?> message, String hashKey, long timeout) {if (topic == null || message == null || hashKey == null) {LOG.error("发送顺序消息时,topic、message或hashKey不能为空");return;}LOG.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键*/public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {syncSendOrderly(topic.name(), message, hashKey, 0);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键*/public void syncSendOrderly(String topic, Message<?> message, String hashKey) {syncSendOrderly(topic, message, hashKey, 0);}/*** 发送顺序消息** @param topic 消息Topic* @param message 消息实体* @param hashKey 哈希键* @param timeout 超时时间*/public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {syncSendOrderlyBase(topic, message, hashKey, timeout);}/*** 默认CallBack函数** @return*/private SendCallback getDefaultSendCallBack() {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {LOG.info("---发送MQ成功---");}@Overridepublic void onException(Throwable throwable) {LOG.error("---发送MQ失败---", throwable);}};}@PreDestroypublic void destroy() {LOG.info("---RocketMq助手注销---");}
}
2.3 配置文件
在application.yml配置文件中,配置 RocketMQ 的相关参数,包括 Name Server 地址、生产者配置等配置信息,大部分也是通用的,端口号可以自己适配。
server:port: 8088
#rocketmq配置
rocketmq:name-server: 127.0.0.1:9876# 生产者配置 producer:isOnOff: on# 发送同一类消息的设置为同一个group,保证唯一group: rocketmq-groupgroupName: rocketmq-group# 服务地址namesrvAddr: 127.0.0.1:9876# 消息最大长度 默认1024*4(4M)maxMessageSize: 4096# 发送消息超时时间,默认3000sendMsgTimeout: 3000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2
2.4 单元测试
为了验证 RocketMQ 的集成是否成功,编写单元测试代码,发送消息并监听消息的接收情况。
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;@SpringBootTest
public class RocketMQTest {@Autowiredprivate RocketMqHelper rocketMqHelper;@Testpublic void testProducter() {Student stu= new Student();stu.setName("abc");stu.setScore(19);rocketMqHelper.asyncSend("STUDENT_ADD", MessageBuilder.withPayload(stu).build());}
}
异步发送student对象给RocketMQ的broker
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "STUDENT_ADD")
public class PersonMqListener implements RocketMQListener<Person> {@Overridepublic void onMessage(Student stu) {System.out.println("接收到消息,开始消费..name:" + stu.getName() + ",age:" + stu.getAge());}
}
消费逻辑是重写RocketMQlistener的父类方法就行
相关文章:
SpringBoot整合RocketMQ
前言 在当今快速发展的软件开发领域,构建高效、稳定的应用系统是每个开发者的追求。Spring Boot 作为一款极具影响力的开发框架,凭借其强大的自动化配置和便捷的开发特性,极大地简化了项目搭建过程。使用 Spring Boot,我们无需再…...
深入理解 YUV Planar 和色度二次采样 —— 视频处理的核心技术
深入理解 YUV Planar 和色度二次采样 —— 视频处理的核心技术 在现代视频处理和编码中,YUV 颜色空间和**色度二次采样(Chroma Subsampling)**是两个非常重要的概念。它们的结合不仅能够显著减少视频数据量,还能在保持较高视觉质量的同时优化存储和传输效率。而 YUV Plana…...
项目顺利交付,几个关键阶段
年前离放假还有10天的时候,来了一个应急项目, 需要在放假前一天完成一个演示版本的项目,过年期间给甲方领导看。 本想的最后几天摸摸鱼,这么一来,非但摸鱼不了,还得加班。 还在虽然累,但也是…...
第七天 开始学习ArkTS基础,理解声明式UI编程思想
学习 ArkTS 的声明式 UI 编程思想是掌握 HarmonyOS 应用开发的核心基础。以下是一份简洁高效的学习指南,帮助你快速入门: 一、ArkTS 声明式 UI 核心思想 数据驱动 UI f(state):UI 是应用状态的函数,状态变化自动触发 UI 更新。单…...
windows C++ Fiber (协程)
协程,也叫微线程,多个协程在逻辑上是并发的,实际并发由用户控件。 在windows上引入了纤程(fiber)。 WinBase.h 中函数原型 #if(_WIN32_WINNT > 0x0400)// // Fiber begin //#pragma region Application Family or OneCore Family or Game…...
游戏引擎学习第89天
回顾 由于一直没有渲染器,终于决定开始动手做一个渲染器,虽然开始时并不确定该如何进行,但一旦开始做,发现这其实是正确的决定。因此,接下来可能会花一到两周的时间来编写渲染器,甚至可能更长时间…...
2025新鲜出炉--前端面试题(一)
文章目录 1. vue3有用过吗, 和vue2之间有哪些区别2. vue-router有几种路由, 分别怎么实现3. webpack和rollup这两个什么区别, 你会怎么选择4. 你能简单介绍一下webpack项目的构建流程吗5. webpack平时有手写过loader和plugin吗6. webpack这块你平时做过哪些优化吗?7…...
教程 | i.MX RT1180 ECAT_digital_io DEMO 搭建(一)
本文介绍 i.MX RT1180 EtherCAT digital io DEMO 搭建,Master 使用 TwinCAT ,由于步骤较多,分为上下两篇,本文为第一篇,主要介绍使用 TwinCAT 控制前的一些准备。 原厂 SDK 提供了 evkmimxrt1180_ecat_examples_digit…...
Pyecharts系列课程04——折线图/面积图(Line)
本章我们学习在Pyecharts中折线图(Line)的使用。折线图通用应用于数据的趋势分析。 折线图 我们现在有两组数据,x_data是2024年的月份,y_data为对应张三甲每个月的用电量。 # 家庭每月用电量趋势 x_data ["1月", &q…...
变压器-000000
最近一个项目是木田12V的充电器,要设计变压器,输出是12V,电压大于1.5A12.6*1.518.9W. 也就是可以将变压器当成初级输入的一个负载。输入端18.9W. 那么功率UI 。因为变压器的输入是线性上升的,所以电压为二份之一,也就是1/2*功率…...
凝思60重置密码
凝思系统重置密码 - 赛博狗尾草 - 博客园 问题描述 凝思系统进入单用户模式,在此模式下,用户可以访问修复错误配置的文件。也可以在此模式下安装显卡驱动,解决和已加载驱动的冲突问题。 适用范围 linx-6.0.60 linx-6.0.80 linx-6.0.100…...
linux——网络计算机{序列化及反序列化(JSON)(ifdef的用法)}
linux——网络(服务器的永久不挂——守护进程)-CSDN博客 目录 一、序列化与反序列化 1. 推荐 JSON 库 2. 使用 nlohmann/json 示例 安装方法 基础用法 输出结果 3. 常见操作 4. 其他库对比 5. 选择建议 二、ifdef宏的用法 基本语法 核心用途…...
【教程】docker升级镜像
转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,欢迎[点赞、收藏、关注]哦~ 目录 自动升级 手动升级 无论哪种方式,最重要的是一定要通过-v参数做数据的持久化! 自动升级 使用watchtower,可…...
迅为RK3568开发板篇OpenHarmony实操HDF驱动控制LED-编写应用APP
在应用代码中我们实现如下功能: 当应用程序启动后会获取命令行参数。如果命令行没有参数,LED 灯将循环闪烁;如果命令行带有参数,则根据传输的参数控制 LED 灯的开启或关闭。通过 HdfIoServiceBind 绑定 LED灯的 HDF 服务ÿ…...
python代码
python\main_script.py from multiprocessing import Process import subprocessdef call_script(args):# 创建一个新的进程来运行script_to_call.pyprocess Process(targetrun_script, args(args[0], args[1]))process.start()process2 Process(targetrun_script, args(arg…...
React 打印插件 -- react-to-print
一、安装依赖 npm install react-to-print 二、使用 import { useReactToPrint } from "react-to-print"; import React, { useRef, forwardRef } from react;const Content () > {const contentRef useRef(null);const reactToPrintFn useReactToPrint({ c…...
探索C语言简易计算器程序的实现与优化
在C语言编程学习中,实现一个简易计算器是一个常见且有趣的练习项目。它不仅能帮助我们巩固基本的语法知识,如函数、循环、分支结构,还能让我们深入理解程序设计的逻辑。接下来,我们将分析三段实现简易计算器功能的C语言代码&#…...
深入了解 MySQL:从基础到高级特性
引言 在当今数字化时代,数据的存储和管理至关重要。MySQL 作为一款广泛使用的开源关系型数据库管理系统(RDBMS),凭借其高性能、可靠性和易用性,成为众多开发者和企业的首选。本文将详细介绍 MySQL 的基础概念、安装启…...
OSPF基础(1):工作过程、状态机、更新
OSPF基础 1、技术背景(与RIP密不可分,因为RIP中存在的问题) RIP中存在最大跳数为15的限制,不能适应大规模组网周期性发送全部路由信息,占用大量的带宽资源以路由收敛速度慢以跳数作为度量值存在路由环路可能性每隔30秒…...
工业相机如何获得更好的图像色彩
如何获得更好的图像色彩 大部分的工业自动化检测中对物体的色彩信息并不敏感,因此会使用黑白的相机,但是在显微镜成像、颜色分类识别等领域,相机的色彩还原就显得格外重要,在调节相机色彩方面的参数时,有以下几个方面需…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
Android15默认授权浮窗权限
我们经常有那种需求,客户需要定制的apk集成在ROM中,并且默认授予其【显示在其他应用的上层】权限,也就是我们常说的浮窗权限,那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...
