SpringBoot整合RocketMQ与客户端注意事项
SpringBoot整合RocketMQ
引入依赖(5.3.0比较稳定)
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency></dependencies>
配置文件
rocketmq.name-server=192.168.234.141:9876
#配了发送者,才会初始化RocketMQTemplate或者不配置,自己根据业务注入不同的RocketMQTemplate
rocketmq.producer.group=springBootGroup
rocketmq.producer.send-message-timeout=10000
rocketmq.#如果这里不配,那就需要在消费者的注解中配。
#rocketmq.consumer.topic=
rocketmq.consumer.group=testGroup#rocketmq.pull-consumer.group=
#rocketmq.pull-consumer.topic=
消费者注册到spring容器
@Component
//@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY,messageModel= MessageModel.BROADCASTING)
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}
}
生产者消息在需要生产消息的地方注入生产者调用方法
@Component
public class SpringProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Resourceprivate DefaultMQProducer defaultMQProducer;public void sendMessage(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);//pull消息需要配置rocketmq.consumer.topic和rocketmq.consumer.group参数//this.rocketMQTemplate.receive();//事务消息//this.rocketMQTemplate.sendMessageInTransaction}}
事务监听
2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
//@RocketMQTransactionListener(rocketMQTemplateBeanName = "ExtRocketMQTemplate")
@RocketMQTransactionListener()
public class MyTransactionImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {}
}
客户端注意事项
消息的ID,Key和Tag
- MessageId是RocketMQ内部给每条消息分配的唯⼀索引
- key是Message中的补充信息
针对key这⼀个属性,建议在业务中可以添加⼀些带有业务唯⼀性的数据,作为MessageId的补充。RocketMQ基于Keys属性,实现了消息溯源、消息压缩等⼀系列功能。 - 通过Tag进⾏消息过滤性能⾮常⾼
最佳实践
⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识。tags可以由应⽤⾃由设置,只有⽣产者在发送消息设置了tags,消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤:message.setTags(“TagA”)。
Kafka的⼀⼤问题是Topic过多,会造成Partition⽂件过多,影响性能。⽽RocketMQ中的Topic完全不会对消息转发性能有影响。但是Topic过多,还是会加⼤RocketMQ的元数据维护的性能消耗。所以,在使⽤时,还是需要对Topic进⾏合理的分配。使⽤Tag区分消息时,尽量直接使⽤Tag过滤,不要使⽤复杂的SQL过滤。因为消息过滤机制虽然可以减少⽹络IO,但是毕竟会加⼤Broker端的消息处理压⼒。所以,消息过滤的逻辑,还是越简单越好。
消费者端进行幂等控制
在MQ系统中,对于消息幂等有三种实现语义:
- at most once 最多⼀次:每条消息最多只会被消费⼀次
- at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
- exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次
消息幂等的必要性
在互联⽹应⽤中,尤其在⽹络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
- 发送时消息重复
当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。 - 投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断。 为了保证消息⾄少被消费⼀次,消息队列 RocketMQ 的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。 - 负载均衡时消息重复(包括但不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
而要处理这个问题,RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进行传递。
关注错误消息重试
多关注重试队列,可以及时了解消费者端的运⾏情况。这个队列中出现了⼤量的消息,就意味着消费者的运行出现了问题,要及时跟踪进行干预。
RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
这个重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这个重试时间可以将源码中的org.apache.rocketmq.example.quickstart.Consumer⾥的消息监听器返回状态改为RECONSUME_LATER测试⼀下。
如果消息重试16次后仍然失败,消息将不再投递。转为进⼊死信队列。
手动处理死信队列
当⼀条消息消费失败,RocketMQ就会⾃动进⾏消息重试。而如果消息超过最⼤重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的⼀种特殊队列:死信队列。
意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,⼀般需要人工去查看死信队列中的消息,对错误原因进⾏排查。然后对死信消息进行处理,⽐如转发到正常的Topic重新进⾏消费,或者丢弃。
死信队列的名称是%DLQ%+ConsumGroup
死信队列的特征:
- ⼀个死信队列对应⼀个ConsumGroup,而不是对应某个消费者实例。
- 如果⼀个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
- ⼀个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
- 死信队列中的消息不会再被消费者正常消费。
- 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
默认创建出来的死信队列,他⾥⾯的消息是⽆法读取的,在控制台和消费者中都⽆法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要⼿动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台操作)。
相关文章:

SpringBoot整合RocketMQ与客户端注意事项
SpringBoot整合RocketMQ 引入依赖(5.3.0比较稳定) <dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version&…...
Github 2025-06-04 C开源项目日报 Top7
根据Github Trendings的统计,今日(2025-06-04统计)共有7个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量C项目7C++项目1Assembly项目1jq:轻量灵活的命令行JSON处理器 创建周期:4207 天开发语言:C协议类型:OtherStar数量:27698 个Fork数量:1538 …...
大二下期末
一.Numpy(Numerical Python) Numpy库是Python用于科学计算的基础包,也是大量Python数学和科学计算包的基础。不少数据处理和分析包都是在Numpy的基础上开发的,如后面介绍的Pandas包。 Numpy的核心基础是ndarray(N-di…...
LeetCode 热题 100 74. 搜索二维矩阵
LeetCode 热题 100 | 74. 搜索二维矩阵 大家好,今天我们来解决一道经典的算法题——搜索二维矩阵。这道题在 LeetCode 上被标记为中等难度,要求我们在一个满足特定条件的二维矩阵中查找一个目标值。如果目标值在矩阵中,返回 true;…...
解决 VSCode 中无法识别 Node.js 的问题
当 VSCode 无法识别 Node.js 时,通常会出现以下症状: 代码提示缺失require 等 Node.js API 被标记为错误调试功能无法正常工作终端无法运行 Node.js 命令 常见原因及解决方案 1. Node.js 未安装或未正确配置 解决方法: 确保已安…...

Mysql的卸载与安装
确保卸载干净mysql 不然在进行mysal安装时候会出现不一的页面和问题 1、卸载 在应用页面将查询到的mysql相关应用卸载 2、到c盘下将残留的软件包进行数据删除 3、删除programData下的mysql数据 4、检查系统中的mysql是否存在 cmd中执行 sc deleted mysql80 5、删除注册表中的…...
ES101系列09 | 运维、监控与性能优化
本篇文章主要讲解 ElasticSearch 中 DevOps 与性能优化的内容,包括集群部署最佳实践、容量规划、读写性能优化和缓存、熔断器等。 集群部署最佳实践 在生产环境中建议设置单一角色的节点。 Dedicated master eligible nodes:负责集群状态的管理。使用…...
Java常用的判空方法
文章目录 Java常用的判空方法JDK 自带的判空方法1. 使用 或 ! 运算符2. 使用 equals 方法3. Objects.isNull / Objects.nonNull4. Objects.equals4. JDK8 中的 Optional 第三方工具包1. Apache Commons Lang32. Google Guava3. Lombok 注解4. Vavr(函数式风格&…...

Excel处理控件Aspose.Cells教程:使用 C# 在 Excel 中创建组合图表
可视化项目时间线对于有效规划和跟踪至关重要。在本篇教程中,您将学习如何使用 C# 在 Excel 中创建组合图。只需几行代码,即可自动生成动态、美观的组合图。无论您是在构建项目管理工具还是处理内部报告,本指南都将向您展示如何将任务数据转换…...

【多线程初阶】阻塞队列 生产者消费者模型
文章目录 一、阻塞队列二、生产者消费者模型(一)概念(二)生产者消费者的两个重要优势(阻塞队列的运用)1) 解耦合(不一定是两个线程之间,也可以是两个服务器之间)2) 削峰填谷 (三)生产者消费者模型付出的代价 三、标准库中的阻塞队列(一)观察模型的运行效果(二)观察阻塞效果1) 队…...

《100天精通Python——基础篇 2025 第5天:巩固核心知识,选择题实战演练基础语法》
目录 一、踏上Python之旅二、Python输入与输出三、变量与基本数据类型四、运算符五、流程控制 一、踏上Python之旅 1.想要输出 I Love Python,应该使用()函数。 A.printf() B.print() C.println() D.Print() 在Python中想要在屏幕中输出内容,应该使用print()函数…...

机器人夹爪的选型与ROS通讯——机器人抓取系统基础系列(六)
文章目录 前言一、夹爪的选型1.1 任务需求分析1.2 软体夹爪的选型 二、夹爪的ROS通讯2.1 夹爪的通信方式介绍2.2 串口助手测试2.3 ROS通讯节点实现 总结Reference: 前言 本文将介绍夹爪的选型方法和通讯方式。以鞋子这类操作对象为例,将详细阐述了对应的夹爪选型过…...

第二十八章 RTC——实时时钟
第二十八章 RTC——实时时钟 目录 第二十八章 RTC——实时时钟 1 RTC实时时钟简介 2 RTC外设框图剖析 3 UNIX时间戳 4 与RTC控制相关的库函数 4.1 等待时钟同步和操作完成 4.2 使能备份域涉及RTC配置 4.3 设置RTC时钟分频 4.4 设置、获取RTC计数器及闹钟 5 实时时…...

使用 DuckLake 和 DuckDB 构建 S3 数据湖实战指南
本文介绍了由 DuckDB 和 DuckLake 组成的轻量级数据湖方案,旨在解决传统数据湖(如HadoopHive)元数据管理复杂、查询性能低及厂商锁定等问题。该方案为中小规模数据湖场景提供了简单、高性能且无厂商锁定的替代选择。 1. 什么是 DuckLake 和 D…...

大语言模型提示词(LLM Prompt)工程系统性学习指南:从理论基础到实战应用的完整体系
文章目录 前言:为什么提示词工程成为AI时代的核心技能一、提示词的本质探源:认知科学与逻辑学的理论基础1.1 认知科学视角下的提示词本质信息处理理论的深层机制图式理论的实际应用认知负荷理论的优化策略 1.2 逻辑学框架下的提示词架构形式逻辑的三段论…...

如何基于Mihomo Party http端口配置git与bash命令行代理
如何基于Mihomo Party http端口配置git与bash命令行代理 1. 确定Mihomo Party http端口配置 点击内核设置后即可查看 默认7892端口,开启允许局域网连接 2. 配置git代理 配置本机代理可以使用 127.0.0.1 配置局域网内其它机代理需要使用本机的非回环地址 IP&am…...
CMake 为 Debug 版本的库或可执行文件添加 d 后缀
在使用 CMake 构建项目时,我们经常需要区分 Debug 和 Release 构建版本。一个常见的做法是为 Debug 版本的库或可执行文件添加后缀(如 d),例如 libmylibd.so 或 myappd.exe。 本文将介绍几种在 CMake 中实现为 Debug 版本自动添加 d 后缀的方法。 方法一:使用 CMAKE_DEBU…...
Linux 特殊权限位详解:SetUID, SetGID, Sticky Bit
Linux 特殊权限位详解:SetUID, SetGID, Sticky Bit 在Linux权限系统中,除了基本的读、写(w)、执行(x)权限外,还有三个特殊权限位:SetUID、SetGID和Sticky Bit。这些权限位提供了更精细的权限控制机制,尤其在需要临时提升权限或管理共享资源时非常有用。 一、SetUID (s位…...

埃文科技智能数据引擎产品入选《中国网络安全细分领域产品名录》
嘶吼安全产业研究院发布《中国网络安全细分领域产品名录》,埃文科技智能数据引擎产品成功入选数据分级分类产品名录。 在数字化转型加速的今天,网络安全已成为企业生存与发展的核心基石,为了解这一蓬勃发展的产业格局,嘶吼安全产业…...
使用VTK还是OpenGL集成到qt程序里哪个好?
在Qt程序中集成VTK与OpenGL:选择哪个更好? 在Qt程序中实现三维可视化时,开发者常常面临一个选择:是使用VTK(Visualization Toolkit)还是OpenGL(Open Graphics Library)。这两种技术…...
Java-IO流之打印流详解
Java-IO流之打印流详解 一、打印流概述1.1 什么是打印流1.2 打印流的特点1.3 打印流的应用场景 二、PrintStream详解2.1 基本概念2.2 构造函数2.3 核心方法2.4 使用示例 三、PrintWriter详解3.1 基本概念3.2 构造函数3.3 核心方法3.4 使用示例 四、PrintStream与PrintWriter的比…...
高效图像处理:使用 Pillow 进行格式转换与优化
高效图像处理:使用 Pillow 进行格式转换与优化 1. 背景引入 在图像处理应用中,格式转换、裁剪、压缩等操作是常见需求。Python 的 Pillow 库基于 PIL(Python Imaging Library),提供 轻量、强大 的图像处理能力,广泛用于 Web 开发、数据分析、机器学习 等领域。 本文将…...
Github 2025-06-06 Java开源项目日报Top10
根据Github Trendings的统计,今日(2025-06-06统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Java项目10TypeScript项目1Java实现的算法集合:使用Gitpod.io进行编辑和贡献 创建周期:2883 天开发语言:Java协议类型:MIT LicenseStar数量…...
使用 Ansible 在 Windows 服务器上安装 SSL 证书
在本教程中,我将向您展示如何使用 Ansible 在 Windows 服务器上安装 SSL 证书。使用 Ansible 自动化 SSL 证书安装过程可以提高 IT 运营的效率、一致性和协作性。我将介绍以下步骤: 将 SSL 证书文件复制到服务器将 PFX 证书导入指定的存储区获取导入的证…...
厂区能源监控系统:网关赋能下的高效能源管理与环保监测
在现代工业生产领域,能源的有效利用与环境保护是企业实现可持续发展的两大关键要素。厂区能源监控系统借助先进的信息技术与自动化控制手段,对厂区内能源消耗及污水处理等核心环节展开实时监控与精细化管理。其中,御控网关作为系统关键枢纽&a…...
CentOS 7 如何安装llvm-project-10.0.0?
CentOS 7 如何安装llvm-project-10.0.0? 需要先升级gcc至7.5版本,详见CentOS 7如何编译安装升级gcc版本?一文 # 备份之前的yum .repo文件至 /tmp/repo_bak 目录 mkdir -p /tmp/repo_bak && cd /etc/yum.repo.d && /bin/mv ./*.repo …...
Cursor 1.0 的核心功能亮点及技术价值分析
Cursor 1.0 的核心功能亮点及技术价值分析 结合官方更新和开发者实测整理: 🛠️ 一、BugBot:智能自动化代码审查 功能亮点:深度集成 GitHub,自动扫描 Pull Request(PR)中的潜在 Bug(…...
软考 系统架构设计师系列知识点之杂项集萃(83)
接前一篇文章:软考 系统架构设计师系列知识点之杂项集萃(82) 第150题 体系结构权衡分析方法(Architecture Tradeoff Analysis Method,ATAM)是一种常见的系统架构评估框架,该框架主要关注系统的…...

NLP学习路线图(二十六):自注意力机制
一、为何需要你?序列建模的困境 在你出现之前,循环神经网络(RNN)及其变种LSTM、GRU是处理序列数据(如文本、语音、时间序列)的主流工具。它们按顺序逐个处理输入元素,将历史信息压缩在一个隐藏…...

Unity3D仿星露谷物语开发60之定制角色其他部位
1、目标 上一篇中定制了角色的衬衫、手臂。 本篇中将定制角色其他部位的图形,包括:裤子、发型、皮肤、帽子等。 2、定制裤子 (1)修改ApplyCharacterCustomisation.cs脚本 我们需要设置一个输入框选择裤子的颜色。 // Select …...