spring boot 集成rocketmq
集成Spring Boot和RocketMQ
在现代的微服务架构中,消息队列已经成为一种常见的异步处理模式,它能解决服务间的同步调用、耦合度高、流量高峰等问题。RocketMQ是阿里巴巴开源的一款消息中间件,性能优秀,功能齐全,被广泛应用在各种业务场景。
本文将详细介绍如何在Spring Boot项目中集成RocketMQ,实现消息的生产和消费。
开发环境
- JDK 1.8 或更高
- RocketMQ 4.8.0 或更高
- Spring Boot 2.3.1.RELEASE 或更高
- Maven 3.0 或更高
RocketMQ服务器部署
首先,我们需要在本地或服务器上部署RocketMQ。具体的部署步骤可以参考RocketMQ官方文档。为了简化部署,我们可以使用Docker进行部署。
Spring Boot项目创建
我们使用Spring Initializr创建一个新的Spring Boot项目,选择Web、Lombok和RocketMQ Spring Boot Starter为项目依赖。
pom.xml示例:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
配置RocketMQ
在application.properties文件中配置RocketMQ的服务器地址和其他相关参数。
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
在这里,rocketmq.name-server是RocketMQ服务器的地址,rocketmq.producer.group是生产者的组名。
消息生产者
接下来,我们创建一个消息生产者。在Spring Boot项目中,我们可以使用RocketMQTemplate来发送消息。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String send(String message) {rocketMQTemplate.convertAndSend("test-topic", message);return "Message: '" + message + "' sent.";}
}
上述代码中,我们创建了一个RESTful接口/send,当接口被调用时,它将发送一个消息到test-topic主题。
消息消费者
接下来,我们创建一个消息消费者。在Spring Boot项目中,我们可以使用@RocketMQMessageListener注解来定义一个消息消费者。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class ConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.printf("------- StringConsumer received: %s \n", message);}
}
上述代码中,我们定义了一个消息消费者,它将监听test-topic主题的消息,当有新的消息时,它将打印消息内容。
测试
至此,我们已经完成了Spring Boot集成RocketMQ的所有代码。接下来,我们就可以运行Spring Boot项目,并通过访问/send接口来发送消息,查看控制台的输出来验证消息消费者是否可以正常接收消息。
这就是Spring Boot集成RocketMQ的全过程。RocketMQ作为一款功能强大的消息中间件,不仅支持基本的消息生产和消费,还支持许多高级特性,如事务消息、顺序消息、延迟消息等。在实际的项目开发中,我们可以根据业务需求选择合适的消息模型,提高系统的可用性和可靠性。
事务消息
RocketMQ支持发送事务消息,也就是说,在发送消息的同时,我们可以执行本地的数据库操作,只有当本地的数据库操作成功时,消息才会真正被发送出去。
下面是一个发送事务消息的例子:
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.*;@RestController
public class TransactionProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendTransaction")public String sendTransaction(String message) {ExecutorService executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(5000), r -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = rocketMQTemplate.createAndStartTransactionMQProducer("transaction-group",transactionListener,executor);producer.sendMessageInTransaction("test-topic", "TagA", message, null);return "Transaction Message: '" + message + "' sent.";}
}
在上述代码中,我们创建了一个TransactionMQProducer,并设置了一个TransactionListener来处理事务的提交和回滚。当发送事务消息时,我们需要调用sendMessageInTransaction方法。
顺序消息
RocketMQ支持发送顺序消息,也就是说,消息会按照发送的顺序被消费。
下面是一个发送顺序消息的例子:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;@RestController
public class OrderlyProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendOrderly")public String sendOrderly(String message) {for (int i = 0; i < 100; i++) {rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload(message + i).build(), "hashkey");}return "Orderly Message: '" + message + "' sent.";}
}
在上述代码中,我们调用syncSendOrderly方法发送顺序消息。该方法的第三个参数是hashkey,RocketMQ会根据这个key来决定消息发送到哪个队列,具有相同hashkey的消息会发送到同一个队列。
延迟消息
RocketMQ支持发送延迟消息,也就是说,消息不会立即被消费,而是会在指定的时间后被消费。
下面是一个发送延迟消息的例子:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;@RestController
public class DelayProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendDelay")public String sendDelay(String message) {rocketMQTemplate.syncSend("delay_topic", MessageBuilder.withPayload(message).build(), 1000, 4);return "Delay Message: '" + message + "' sent.";}
}
在上述代码中,我们调用syncSend方法发送延迟消息。该方法的第三个参数是延迟时间,第四个参数是延迟级别。
以上就是Spring Boot集成RocketMQ的详细步骤和示例代码,希望对大家有所帮助。
相关文章:
spring boot 集成rocketmq
集成Spring Boot和RocketMQ 在现代的微服务架构中,消息队列已经成为一种常见的异步处理模式,它能解决服务间的同步调用、耦合度高、流量高峰等问题。RocketMQ是阿里巴巴开源的一款消息中间件,性能优秀,功能齐全,被广泛…...
redis Hash类型命令
Redis中的Hash类型有多个常用命令可用于对Hash键进行操作。以下是一些常见的Redis Hash类型命令: HSET:设置Hash字段的值。 它将指定字段与相应的值关联起来,如果字段已经存在,则更新其值,如果字段不存在,…...
P1194 买礼物(最小生成树)(内附封面)
买礼物 题目描述 又到了一年一度的明明生日了,明明想要买 B B B 样东西,巧的是,这 B B B 样东西价格都是 A A A 元。 但是,商店老板说最近有促销活动,也就是: 如果你买了第 I I I 样东西࿰…...
oracle基础语法和备份恢复
Oracle总结 sql命令分类 1.DDL,数据定义语言,create创建/drop销毁 2.DCL,数据库控制语言,grant授权/revoke撤销 3.DML,数据操纵语言,insert/update/delete等sql语句 4.DQL,数据查询语言&am…...
【MATLAB第66期】#源码分享 | 基于MATLAB的PAWN全局敏感性分析模型(有条件参数和无条件参数)
【MATLAB第66期】#源码分享 | 基于MATLAB的PAWN全局敏感性分析模型(有条件参数和无条件参数) 文献参考 Pianosi, F., Wagener, T., 2015. A simple and efficient method for global sensitivity analysis based on cumulative distribution functions.…...
vue2过渡vue3技术差异点指南
基础点 reactive() 定义响应式变量(仅仅引用类型有效:对象数组map,set):reactive(),类似于data中return的数据 例子: import { reactive } from vueexport default {setup() {const state reactive({ count: 0 })function in…...
两个多选框(select)之间值的左右上下移动
<!DOCTYPE html> <html> <head><meta charset"utf-8"><title>两个多选框(select)之间值的左右上下移动</title> </head> <script src"https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>&…...
【设计模式】——模板模式
什么是模板模式? 模板方法模式(Template Method Pattern),又叫模板模式(Template Pattern),在一个抽象类公开定义了执行它的方法的模板。它的子类可以按需要重写方法实现,但调用将以抽象类中定义的方式进行…...
工业机器视觉系统开发流程简介
需求分析和系统设计:与用户合作,明确系统的功能和性能需求,并设计系统的整体架构。 软、硬件选型:根据需求分析结果,选择适合的软、硬件设备,包括光学传感器、相机、光源、图像采集设备、处理器等。 软件…...
【Unity3D】Renderer Feature简介
1 3D 项目迁移到 URP 项目后出现的问题 3D 项目迁移至 URP 项目后,会出现很多渲染问题,如:材质显示异常、GL 渲染不显示、多 Pass 渲染异常、屏幕后处理异常等问题。下面将针对这些问题给出一些简单的解决方案。 URP 官方教程和 API 详见→Un…...
麻了!包含中科院TOP,共16本期刊被标记为“On Hold”状态!
近日,小编从科睿唯安旗下的“Master Journal List”官网查到,除了知名老牌期刊Chemosphere竟然被标记为“On Hold”状态,目前共有7本SCI期刊,1本SSCI期刊,8本ESCI期刊被标记为“On Hold”,究竟是怎么回事呢…...
2.Flink应用
2.1 数据流 DataStream:DataStream是Flink数据流的核心抽象,其上定义了对数据流的一系列操作DataStreamSource:DataStreamSource 是 DataStream 的 起 点 , DataStreamSource 在StreamExecutionEnvironment 中 创 建 ,…...
Matlab进阶绘图第25期—三维密度散点图
三维密度散点图本质上是一种特征渲染的三维散点图,其颜色表示某一点所在区域的密度信息。 除了作图,三维密度散点图绘制的关键还在于密度的计算。 当然,不管是作图还是密度的计算,这些在《Matlab论文插图绘制模板》和《Matlab点…...
C++设计模式之桥接设计模式
文章目录 C桥接设计模式什么是桥接设计模式该模式有什么优缺点优点缺点 如何使用 C桥接设计模式 什么是桥接设计模式 桥接设计模式是一种结构型设计模式,它可以将抽象接口和实现分离开来,以便它们可以独立地变化和扩展。 该模式有什么优缺点 优点 灵…...
论文笔记:SUPERVISED CONTRASTIVE REGRESSION
2022arxiv的论文,没有中,但一作是P大图班本MIT博,可信度应该还是可以的 0 摘要 深度回归模型通常以端到端的方式进行学习,不明确尝试学习具有回归意识的表示。 它们的表示往往是分散的,未能捕捉回归任务的连续性质。…...
Java 多线程并发 CAS 技术详解
一、CAS概念和应用背景 CAS的作用和用途 CAS(Compare and Swap)是一种并发编程中常用的技术,用于解决多线程环境下的并发访问问题。CAS操作是一种原子操作,它可以提供线程安全性,避免了使用传统锁机制所带来的性能开…...
如何压缩高清PDF文件大小?将PDF文件压缩到最小的三个方法
PDF格式是一种非常常用的文档格式,但是有时候我们需要将PDF文件压缩为更小的大小以便于传输和存储。在本文中,我们将介绍三种PDF压缩的方法,包括在线PDF压缩、利用软件PDF压缩以及使用WPS缩小pdf。 首先,在线PDF压缩是最常用的方…...
04 统计语言模型(n元语言模型)
博客配套视频链接: https://space.bilibili.com/383551518?spm_id_from=333.1007.0.0 b 站直接看 配套 github 链接:https://github.com/nickchen121/Pre-training-language-model 配套博客链接:https://www.cnblogs.com/nickchen121/p/15105048.html 预训练 预先训练 我们…...
Linux各目录详解
Linux文件系统是一个树状结构,由多个目录(或文件夹)组成。以下是常见的Linux目录及其功能的详细解释: /(根目录):在Linux文件系统中,所有其他目录和文件都是从根目录派生的。所有的存…...
【css】属性选择器分类
属性选择器类型示例说明[attribute][target]选择带有 target 属性的所有元素[attributevalue][target_blank]选择带有 target“_blank” 属性的所有元素[attribute~value][title~flower]选择带有包含 “flower” 一词的 title 属性的所有元素[attribute|value][lang|en]选择带有…...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...
python爬虫——气象数据爬取
一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用: 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests:发送 …...
对象回调初步研究
_OBJECT_TYPE结构分析 在介绍什么是对象回调前,首先要熟悉下结构 以我们上篇线程回调介绍过的导出的PsProcessType 结构为例,用_OBJECT_TYPE这个结构来解析它,0x80处就是今天要介绍的回调链表,但是先不着急,先把目光…...
