当前位置: 首页 > news >正文

【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

文章目录

  • 前言
  • 基本概念
    • 消息和主题相关
    • 发送普通消息
  • 发送顺序消息
  • RocketMQTemplate的API介绍
  • 参考资料:

前言

本文主要有以下内容:

  • 简单消息的发送
  • 顺序消息的发送
  • RocketMQTemplate的API介绍

环境搭建:
RocketMQ的安装教程:在官网上下载bin文件,解压到本地,并配置环境变量,如下图所示:
在这里插入图片描述

在 Spring boot 项目中引入 RocketMQ 依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>

在application.yml增加相关配置:

server:port: 10001
rocketmq:name-server: 127.0.0.1:9876producer:group: springboot_produce_group # 必须指定groupsend-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2consumer:group: springboot_consumer_group

在 Spring Boot 中使用RocketMQ很简单直接注入RocketMQTemplate对象即可:

@Resource
private RocketMQTemplate rocketMQTemplate;

基本概念

消息和主题相关

消息 message:通信交互的载体,分为事务消息,半事务消息,延迟消息,顺序消息等。
主题 topic:一类消息的集合,逻辑概念。
队列 queue:主题由一个队列或者多个队列构成,当消息发送到某一个主题时,需要选择某一个队列。
偏移量 offset:消息追加到主题的队列后会分配一个数值,表示该队列的几条消息。
消费者相关:
消费组 consume group:消费组用于订阅主题消费消息,可以订阅多个主题,一个消费组可以有多个消费者。
广播模式:同一个消费组内的所有消费者都会消费订阅主题的所有消息。即一条消息会被该消费者组的所有消费者消费。
集群模式:同一个消费组内的所有消费者只消费订阅主题的一部分消息,即一条消息只会被改消费组的一个消费者消费。
并发消费:同一个队列的消息由多线程消费且不保证消息的顺序。
顺序消费:保证同一队列的消息按顺序消费。

发送普通消息

创建MsgController,代码如下:

@RestController
@RequestMapping("send/")
@CrossOrigin(allowedHeaders = "*", origins = "*")
@Slf4j
public class MsgController {@Resourceprivate RocketMQTemplate rocketMQTemplate;@GetMapping("normal")public void sendNormalMsg() {Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ Normal_msg").build();rocketMQTemplate.send("normal_msg", msg);}
}

创建消息的消费者,只需要实现RocketMQListener接口中的方法即可,代码如下:

@Component
@RocketMQMessageListener(topic = "normal_msg", consumerGroup = "consumer_normal")
@Slf4j
public class NormalMsgConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("Receive Normal Msg: {}",message);}
}

@RocketMQMessageListener注解用在消费者类上,指定当前类消费的主题。

topic:指定消费者的主题 comsumerGroup:指定消费者组(Consumer Group)名称,用于区分不同的消费者。

启动项目,运行结果如下图所示:
在这里插入图片描述

发送顺序消息

顺序消息:保证同一队列的消息按顺序消费。
在MsgController 中添加如下代码:

@GetMapping("order")
public void sendOrderMsg(){
​log.info("开始发送顺序消息");for (int j = 0; j < 10; j++) {Message<String> sendOrderMsg = MessageBuilder.withPayload("Send Order Msg = " + j + " time: "+ LocalDateTime.now()).build();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}rocketMQTemplate.convertAndSend("msg:order", sendOrderMsg);}log.info("顺序消息发送结束");
}

创建对应topic消息的消费者,代码如下所示:

@Component
@RocketMQMessageListener(topic = "msg",consumerGroup = "consumer_order_group",selectorExpression = "order",messageModel = MessageModel.CLUSTERING,selectorType = SelectorType.TAG)
@Slf4j
public class OrderMsgConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("Receive Order Msg: {}",message);}
}

@RocketMQMessageListener其他属性介绍:

  • selectorExpression: 消息选择表达式,用于过滤消息,只有满足表达式条件的消息才会被消费。默认值为 *,表示订阅所有消息。

全匹配:*,默认值。
属性匹配:指定tag = ‘tagName’,上面的代码就可以改写为"tag = ‘order’"
表达式匹配:需要指定selectType = SelectorType.SQL92,见下面。

  • selectorType:指明了消息选择通过tag的方式,默认值SelectorType.TAG。可选值有SelectorType.SQL92

TAG:支持"tagName"的方式配置,如果有多个标签则用||进行连接
SQL92:关键字有AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL。支持的数据类型有Boolean, String, Decimal, Float number等。使用方式如(a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)

  • messageModel:消息模式,可选值为 MessageModel.CLUSTERING(默认)或 MessageModel.BROADCASTING,分别表示集群模式和广播模式。

重新启动项目,运行结果如下图所示:
在这里插入图片描述

RocketMQTemplate的API介绍

在上面的api使用中,都没有去关注是否消息发送的状态,如是否成功,发送到了哪一个队列等。接下来就介绍一下相关API的使用

带返回值的发送普通消息SendResult syncSend(String destination, Message<?> message);

在MsgController添加如下代码:

@GetMapping("normal_result")
public void sendNormalResultMsg() {Message<String> msg = MessageBuilder.withPayload("normal_return_result").build();SendResult normalMsg = rocketMQTemplate.syncSend("normal_msg", msg);log.info("normalMsg = {}",normalMsg);
}

在这里插入图片描述

如log所示,可以看到发送状态等信息。

发送异步消息,在MsgController中添加如下代码:

@GetMapping("callback")
public void sendNormalResultMsgWithCallback(){Message<String> msg = MessageBuilder.withPayload("normal_return_result").build();rocketMQTemplate.asyncSend("normal_msg", msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("success");}@Overridepublic void onException(Throwable throwable) {log.info("error");}});
}

运行结果如下所示:
在这里插入图片描述

发送顺序消息:在第二部分以及展示过了也可以用如下代码替换

rocketMQTemplate.convertAndSend("msg:order", sendOrderMsg);
// 替换为
rocketMQTemplate.syncSendOrderly("msg:order", sendOrderMsg,String.valueOf(j));

发送单向消息

@GetMapping("oneway")
public void  sendOneWay(){Message<String> oneWay = MessageBuilder.withPayload("Send Order Msg = " + " time: "+ LocalDateTime.now()).build();rocketMQTemplate.sendOneWay("normal_msg",oneWay);
}

运行结果如下图所示:
在这里插入图片描述

发送事务消息:暂不举例,后续补充
发送事务消息带回调:和syncSend()类似,后续补充相关用法。

参考资料:

  • 《RocketMQ 实战》

相关文章:

【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

文章目录 前言基本概念消息和主题相关发送普通消息 发送顺序消息RocketMQTemplate的API介绍参考资料&#xff1a; 前言 本文主要有以下内容&#xff1a; 简单消息的发送顺序消息的发送RocketMQTemplate的API介绍 环境搭建&#xff1a; RocketMQ的安装教程&#xff1a;在官网…...

uniapp:图片验证码检验问题处理

图形验证码功能实现 uniapp&#xff1a;解决图形验证码问题及利用arraybuffer二进制转base64格式图片&#xff08;后端传的图片数据形式&#xff1a;x00\x10JFIF\x00\x01\x02\x00…&#xff09;_❆VE❆的博客-CSDN博客 UI稿&#xff1a; 需求&#xff1a;向后端请求验证码图片&…...

将Visio和Excel导出成没有白边的PDF文件

1、VISIO如何无白边导出pdf格式 在使用Latex时&#xff0c;要导入矢量图eps格式。但是VISIO无法输出eps格式&#xff0c;这就需要将其导出为pdf。但是导出pdf时&#xff0c;往往会有大量的白边。VISIO无白边导出pdf格式的方法如下&#xff1a; 1.文件——开发工具——显示sha…...

String类及其工具类

一、String类 1.字符串对象 String str new String("hello");String对象是final修饰的&#xff0c;不可修改的&#xff0c;修改后的字符串对象是另外一个对象&#xff0c;只是修改了引用地址。每次创建都会创建一个新的对象。 2. 字面量 String s "hello&…...

踩坑(5)整合kafka 报错 java.net.UnknownHostException: 不知道这样的主机

java.net.UnknownHostException: 不知道这样的主机。 (5c0c3c629db9)at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[na:na]at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:933) ~[na:na]at java.ba…...

rust持续学习 get_or_insert_with

通常使用一个值 if(xnull)xsome_valid_value 忽然今天看见一段代码 pub fn get_id() -> u64 { let mut res struct.data.borrow_mut(); *res.get_or_insert_with(||{let mut xx ...... some logiclet id xx.id; id}); }感觉这个名字蛮奇怪的 insert 然后翻了一下代码&a…...

卡尔曼滤波 | Matlab实现无迹kalman滤波仿真

文章目录 效果一览文章概述研究内容程序设计参考资料效果一览 文章概述 卡尔曼滤波 | Matlab实现无迹kalman滤波仿真 研究内容 无迹kalman滤波(UKF)不是采用的将非线性函数线性化的做法。无迹kalman仍然采用的是线性kalman滤波的架构,对于一步预测方程,使用无迹变换(UT)来…...

C++---list常用接口和模拟实现

list---模拟实现 list的简介list函数的使用构造函数迭代器的使用list的capacitylist element accesslist modifiers list的模拟实现构造函数&#xff0c;拷贝构造函数和迭代器begin和endinsert和eraseclear和析构函数 源码 list的简介 list是用双向带头联表实现的一个容器&…...

[openCV]基于赛道追踪的智能车巡线方案V1

import cv2 as cv import os import numpy as npimport time# 遍历文件夹函数 def getFileList(dir, Filelist, extNone):"""获取文件夹及其子文件夹中文件列表输入 dir&#xff1a;文件夹根目录输入 ext: 扩展名返回&#xff1a; 文件路径列表""&quo…...

SpringIoc-个人学习笔记

Spring的Ioc、DI、AOP思想 Ioc Ioc思想&#xff1a;Inversion of Control&#xff0c;控制反转&#xff0c;在创建Bean的权利反转给第三方 DI DI思想&#xff1a;Dependency Injection&#xff0c;依赖注入&#xff0c;强调Bean之间的关系&#xff0c;这种关系由第三方负责去设…...

【一文搞懂泛型】

3.3泛型 3.3.1泛型出现的背景 泛型出现的背景有两点&#xff1a; 第一点是在集合容器中&#xff0c;如果没有指定对应类型的话&#xff0c;那么底层的元素就是object&#xff0c;要对容器中的元素进行存取的时候&#xff0c;取出来的同时需要进行类型转换&#xff0c;如果有…...

概念解析 | 利用MIMO雷达技术实现高性能目标检测的关键技术解析

注1:本文系“概念解析”系列之一,致力于简洁清晰地解释、辨析复杂而专业的概念。本次辨析的概念是:MIMO雷达目标检测技术 参考资料:何子述, 程子扬, 李军, 等. 集中式 MIMO 雷达研究综述[J]. 雷达学报, 2022, 11(5): 805-829. 利用MIMO雷达技术实现高性能目标检测的关键技术解…...

Grafana制作图表-自定义Flink监控图表

简要 有时候我们在官网的Grafana下载的图表是这样的&#xff0c;如下图 #算子的处理时间&#xff0c;就是处理数据的延迟数据抓取&#xff0c;这个的说明看下下面的文章 metrics.latency.interval: 60 metrics.reporter.promgateway.class: org.apache.flink.metrics.prometh…...

【TypeScript】初识TypeScript和变量类型介绍

TypeScript 1&#xff0c;TypeScript是什么?2&#xff0c;类型的缺失带来的影响3&#xff0c;Ts搭建环境-本博主有专门的文章专说明这个4&#xff0c;使用tsc对ts文件进行编译5&#xff0c;TS运行初体验简化Ts运行步骤解决方案1解决方案2&#xff08;常见&#xff09; 开始学习…...

阿里云瑶池 PolarDB 开源官网焕新升级上线

导读近日&#xff0c;阿里云开源云原生数据库 PolarDB 官方网站全新升级上线。作为 PolarDB 开源项目与开发者、生态伙伴、用户沟通的平台&#xff0c;将以开放、共享、促进交流为宗旨&#xff0c;打造开放多元的环境&#xff0c;以实现共享共赢的目标。 立即体验全新官网&…...

泡水书为什么不能再出售

近日&#xff0c;京津冀持续强降雨&#xff0c;多家出版机构位于涿州等地的图书库房受到影响。 中图网11日发文称&#xff0c;其位于涿州的仓储中心被洪水淹了&#xff0c;一库房有400多万册的书籍。 网友纷纷在文章下暖心留言&#xff1a;注意人身安全&#xff0c;泡水的书也…...

Mac 执行 .sh命令报错 command not found

使用终端执行.sh命令&#xff0c;可输入&#xff1a; ./FileName.sh如果提示 Permission denied 权限不足&#xff0c;可增加sudo&#xff0c;命令如下&#xff1a; sudo ./FileName.sh如果提示 command not found 可以这样: chmod ux *.sh sudo ./FileName.sh...

postgresql 使用之 存储架构 触摸真实数据的存储结构以及组织形式,存入数据库的数据原来在这里

存储架构 ​专栏内容&#xff1a; postgresql内核源码分析 手写数据库toadb 并发编程 个人主页&#xff1a;我的主页 座右铭&#xff1a;天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物. 概述 postgresql 数据库服务运行时&#xff0c;数据在磁…...

Node.Js安装与配置教程

目录 1.下载官网 2.选择安装路径 3.添加环境变量 4.验证是否安装成功 5.修改模块下载位置 (1)查看npm默认存放位置 6.在node.js安装目录下&#xff0c;创建两个文件夹 7.修改默认文件夹 8.测试默认位置是否更改成功 9.安装报错解决办法 10.路径未更改成功解决办法 …...

Element-Plus DatePicker获取时间戳

文章目录 0、先上答案1、渔&#xff1f;1-1 Element-Plus 官网1-2 溯源 Day.js 0、先上答案 <!-- 秒 --><el-date-pickerv-model"timeStamp"type"datetime"value-format"X"/><!-- 毫秒 --><el-date-pickerv-model"tim…...

大型知识竞赛的技术保障:构建服务器、网络与备用方案的坚实堡垒

&#x1f3d7;️ 大型知识竞赛的技术保障&#xff1a;构建服务器、网络与备用方案的坚实堡垒稳定 高效 安全 让技术成为竞赛的隐形支撑&#x1f3af; 引言&#xff1a;技术保障是竞赛成功的基石一场成功的大型知识竞赛&#xff0c;其精彩纷呈的背后&#xff0c;离不开一套周…...

STM32 PVD中断防数据丢失实战:手把手教你配置2.9V阈值与紧急保存逻辑

STM32 PVD中断防数据丢失实战&#xff1a;手把手教你配置2.9V阈值与紧急保存逻辑 当嵌入式设备在野外采集数据或进行关键操作时&#xff0c;突然断电可能导致数月积累的传感器数据毁于一旦。我曾在一个农业物联网项目中亲历这种灾难——某次田间设备因电池接触不良断电&#xf…...

实在Agent物流对账全流程自动化方案与落地案例:2026智享财务新标杆

在2026年5月这个生成式AI深度重构实体经济的关键周期&#xff0c;全球物流行业已全面跨入“智能体&#xff08;Agent&#xff09;常态化运营”时代。根据《2026年全球供应链数字化趋势报告》显示&#xff0c;超过65%的大型物流企业已部署了具备自主决策能力的智能体来替代传统的…...

BGP EVPN Type2/3/5路由:VXLAN控制平面的三大支柱

1. 揭开BGP EVPN Type2/3/5路由的神秘面纱 第一次接触VXLAN控制平面时&#xff0c;我被各种路由类型搞得晕头转向。直到在数据中心网络改造项目中踩了几个坑&#xff0c;才真正理解BGP EVPN这三种核心路由就像乐高积木&#xff0c;各自独立却又完美拼合。想象一下&#xff0c;T…...

哔哩下载姬终极指南:三步掌握B站视频批量下载技巧

哔哩下载姬终极指南&#xff1a;三步掌握B站视频批量下载技巧 【免费下载链接】downkyi 哔哩下载姬downkyi&#xff0c;哔哩哔哩网站视频下载工具&#xff0c;支持批量下载&#xff0c;支持8K、HDR、杜比视界&#xff0c;提供工具箱&#xff08;音视频提取、去水印等&#xff0…...

基于Arduino Yun的DIY无线安防摄像头:运动检测、云端同步与实时流媒体

1. 项目概述与核心价值 手头有个闲置的Arduino Yun和USB摄像头&#xff0c;一直琢磨着怎么把它们利用起来&#xff0c;做个有点意思的东西。市面上那些无线监控摄像头功能是挺全&#xff0c;但总觉得少了点“掌控感”&#xff0c;数据存在哪里、怎么访问&#xff0c;都得听厂家…...

用C++和Eigen库手把手实现UR3机械臂逆解(附完整代码与避坑指南)

从理论到实践&#xff1a;基于Eigen库的UR3机械臂逆运动学完整实现指南 在工业自动化和机器人研究领域&#xff0c;六轴协作机械臂因其灵活性和广泛的应用场景而备受关注。UR3作为Universal Robots旗下的紧凑型协作机械臂&#xff0c;凭借其轻量化设计和用户友好特性&#xff0…...

Python封装币安API:从零构建Binance-Claw量化数据工具

1. 项目概述与核心价值最近在GitHub上看到一个挺有意思的项目&#xff0c;叫“Binance-Claw”&#xff0c;作者是Scandalousnessmotley216。光看这个名字&#xff0c;可能有点摸不着头脑&#xff0c;“Claw”是爪子的意思&#xff0c;难道是要“抓取”币安的数据&#xff1f;点…...

从 API Key 管理与审计日志功能看 Taotoken 的企业级安全支持

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 从 API Key 管理与审计日志功能看 Taotoken 的企业级安全支持 对于将大模型能力集成到业务流程中的企业而言&#xff0c;API 访问的…...

HiveWE魔兽地图编辑器:5分钟快速上手指南,告别卡顿创作新时代

HiveWE魔兽地图编辑器&#xff1a;5分钟快速上手指南&#xff0c;告别卡顿创作新时代 【免费下载链接】HiveWE A Warcraft III world editor. 项目地址: https://gitcode.com/gh_mirrors/hi/HiveWE 还在为《魔兽争霸III》原版地图编辑器缓慢的加载速度和繁琐的操作而烦恼…...