当前位置: 首页 > article >正文

RocketMQ 深度解析:消息中间件核心原理与实践指南

一、RocketMQ 概述

1.1 什么是 RocketMQ

RocketMQ 是阿里巴巴开源的一款分布式消息中间件,后捐赠给 Apache 基金会成为顶级项目。它具有低延迟、高并发、高可用、高可靠等特点,广泛应用于订单交易、消息推送、流计算、IoT 等场景。

1.2 核心特性

  • 高吞吐量:单机支持 10 万级 TPS
  • 低延迟:毫秒级消息投递
  • 高可用:主从架构,支持多副本
  • 消息可靠:支持消息持久化、事务消息
  • 扩展性强:支持集群部署,可水平扩展
  • 丰富的消息类型:顺序消息、定时消息、事务消息等

二、RocketMQ 核心架构

2.1 架构组成

RocketMQ 由四个核心组件构成:

  1. NameServer:轻量级注册中心,负责 Broker 的注册与发现
  2. Broker:消息存储与转发服务器
  3. Producer:消息生产者
  4. Consumer:消息消费者
[Producer]  →  [NameServer]  ←  [Consumer]↓                ↑[Broker] ←→ [Broker]

2.2 核心概念

  • Topic:消息主题,一级消息类型
  • Message Queue:消息队列,Topic 的分区单位
  • Tag:消息标签,二级消息类型
  • Group:生产者/消费者组
  • Offset:消息在队列中的位置标识

三、消息生产与消费

3.1 消息发送模式

3.1.1 同步发送
public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 同步发送,等待Broker返回结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);producer.shutdown();}
}
3.1.2 异步发送
public class AsyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送,设置回调函数producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("Send success: %s%n", sendResult);}@Overridepublic void onException(Throwable e) {System.out.printf("Send failed: %s%n", e);}});Thread.sleep(5000); // 等待回调完成producer.shutdown();}
}
3.1.3 单向发送
// 只发送消息,不关心结果
producer.sendOneway(msg);

3.2 消息消费模式

3.2.1 集群消费(CLUSTERING)
public class ClusterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");// 集群模式(默认)consumer.setMessageModel(MessageModel.CLUSTERING);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}
3.2.2 广播消费(BROADCASTING)
// 广播模式(所有消费者都收到全量消息)
consumer.setMessageModel(MessageModel.BROADCASTING);

四、高级特性

4.1 顺序消息

生产者

// 确保相同订单ID的消息发送到同一个队列
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}
}, orderId); // orderId作为选择队列的依据

消费者

consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 保证顺序消费return ConsumeOrderlyStatus.SUCCESS;}
});

4.2 事务消息

public class TransactionProducer {public static void main(String[] args) throws Exception {TransactionMQProducer producer = new TransactionMQProducer("transaction_group");producer.setNamesrvAddr("localhost:9876");// 设置事务监听器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 模拟业务处理System.out.println("Executing local transaction: " + msg);Thread.sleep(1000);return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 检查本地事务状态return LocalTransactionState.COMMIT_MESSAGE;}});producer.start();Message msg = new Message("TransactionTopic", null, "Transaction Message".getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送事务消息TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(5000);producer.shutdown();}
}

4.3 延迟消息

// 设置延迟级别(1-18分别对应1s,5s,10s,30s,1m...2h)
msg.setDelayTimeLevel(3); // 10秒后投递

五、RocketMQ 集群部署

5.1 集群模式

  1. 单Master模式:开发测试用,不可靠
  2. 多Master模式:高性能,无单点故障
  3. 多Master多Slave模式(异步复制):性能与可靠性的平衡
  4. 多Master多Slave模式(同步双写):高可靠性,性能略低

5.2 部署建议

  • NameServer:至少2台,保证高可用
  • Broker
    • 生产环境推荐多Master多Slave
    • 每个Master配置至少1个Slave
    • 主从分布在不同的物理机器

六、性能优化

6.1 生产者优化

  1. 批量发送

    List<Message> messages = new ArrayList<>();
    // 添加多条消息
    producer.send(messages);
    
  2. 合理设置发送超时

    producer.setSendMsgTimeout(3000); // 3秒
    
  3. 关闭VIP通道(非阿里云环境):

    producer.setVipChannelEnabled(false);
    

6.2 消费者优化

  1. 增加消费线程数

    consumer.setConsumeThreadMin(20);
    consumer.setConsumeThreadMax(64);
    
  2. 设置批量消费

    consumer.setConsumeMessageBatchMaxSize(10); // 每次最多消费10条
    
  3. 跳过堆积消息(特殊场景):

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    

七、常见问题解决方案

7.1 消息重复消费

解决方案

  1. 消费端实现幂等处理
  2. 使用数据库唯一键约束
  3. 使用Redis等缓存记录已处理消息ID

7.2 消息堆积

解决方案

  1. 增加消费者实例
  2. 提高消费者并行度
  3. 优化消费逻辑,减少处理时间
  4. 临时扩容Topic队列数

7.3 消息丢失

预防措施

  1. 生产端使用同步发送+重试机制
  2. Broker配置同步刷盘
    flushDiskType=SYNC_FLUSH
    
  3. 主从同步双写模式

八、监控与管理

8.1 控制台部署

RocketMQ 提供可视化控制台,可监控:

  • 消息堆积情况
  • 消费者延迟
  • Broker运行状态
  • Topic/Queue分布

8.2 关键监控指标

  1. 生产消费TPS
  2. 消息堆积量
  3. 消费延迟时间
  4. Broker CPU/Memory
  5. 磁盘IO使用率

九、最佳实践

  1. Topic命名规范:业务线_子系统_功能,如"Trade_Order_Notify"
  2. Tag使用原则:细化消息分类,如"PaySuccess", “PayFailed”
  3. 消息大小控制:建议不超过1MB
  4. Producer/Consumer分组:按业务功能划分
  5. 消息Key设置:便于问题追踪
    msg.setKeys("ORDER_10086");
    

十、总结

RocketMQ 作为一款成熟的分布式消息中间件,在电商、金融、IoT等领域有着广泛应用。掌握其核心原理、部署架构和优化技巧,能够帮助开发者构建高性能、高可靠的消息系统。在实际应用中,需要根据业务场景合理选择消息模式,并做好监控与运维工作,确保消息系统的稳定运行。

相关文章:

RocketMQ 深度解析:消息中间件核心原理与实践指南

一、RocketMQ 概述 1.1 什么是 RocketMQ RocketMQ 是阿里巴巴开源的一款分布式消息中间件&#xff0c;后捐赠给 Apache 基金会成为顶级项目。它具有低延迟、高并发、高可用、高可靠等特点&#xff0c;广泛应用于订单交易、消息推送、流计算、IoT 等场景。 1.2 核心特性 高吞…...

使用Docker Compose部署Dify

目录 1. 克隆项目代码2. 准备配置文件3. 配置环境变量4. 启动服务5. 验证部署6. 访问服务注意事项 1. 克隆项目代码 首先&#xff0c;克隆Dify项目的1.4.0版本&#xff1a; git clone https://github.com/langgenius/dify.git --branch 1.4.02. 准备配置文件 进入docker目录…...

基于 Vue3 与 exceljs 实现自定义导出 Excel 模板

在开发中&#xff0c;我们需要常常为用户提供更多的数据录入方式&#xff0c;Excel 模板导出与导入是一个常见的功能点。本文将介绍如何使用 Vue3、exceljs 和 file-saver 实现一个自定义导出 Excel 模板&#xff0c;并在特定列添加下拉框选择的数据验证功能。 技术选型 excelj…...

杰发科技AC7840——CSE硬件加密模块使用(1)

1. 简介 2. 功能概述 3. 简单的代码分析 测试第二个代码例程 初始化随机数 这里的CSE_CMD_RND在FuncID中体现了 CSE_SECRET_KEY在17个用户KEY中体现 最后的读取RNG值&#xff0c;可以看出计算结果在PRAM中。 总的来看 和示例说明一样&#xff0c;CSE 初次使用&#xff0c;添加…...

前端地图数据格式标准及应用

前端地图数据格式标准及应用 坐标系EPSGgeojson标准格式基于OGC标准的地图服务shapefile文件3D模型数据常见地图框架 坐标系EPSG EPSG&#xff08;European Petroleum Survey Group&#xff09;是一个国际组织&#xff0c;负责维护和管理地理坐标系统和投影系统的标准化编码 E…...

threejs几何体BufferGeometry顶点

1. 几何体顶点位置数据和点模型 本章节主要目的是给大家讲解几何体geometry的顶点概念,相对偏底层一些&#xff0c;不过掌握以后&#xff0c;你更容易深入理解Threejs的几何体和模型对象。 缓冲类型几何体BufferGeometry threejs的长方体BoxGeometry、球体SphereGeometry等几…...

向量数据库选型实战指南:Milvus架构深度解析与技术对比

导读&#xff1a;随着大语言模型和AI应用的快速普及&#xff0c;传统数据库在处理高维向量数据时面临的性能瓶颈日益凸显。当文档经过嵌入模型处理生成768到1536维的向量后&#xff0c;传统B-Tree索引的检索效率会出现显著下降&#xff0c;而现代应用对毫秒级响应的严苛要求使得…...

java方法重写学习笔记

方法重写介绍 子类和父类有两个返回值&#xff0c;参数&#xff0c;名称都一样的方法&#xff0c; 子类的方法会覆盖父类的方法。 调用 public class Overide01 {public static void main(String[] args) {Dog dog new Dog();dog.cry();} }Animal类 public class Animal {…...

解决WPF短暂的白色闪烁(白色闪屏)

在 WPF 应用程序启动时出现 短暂的白色闪烁&#xff08;白色闪屏&#xff09;&#xff0c;通常是由于以下原因导致的&#xff1a; 主要原因 WPF 默认窗口背景是白色&#xff0c;在加载 UI 之前会短暂显示白色背景。 解决方案 设置窗口背景为透明或黑色&#xff08;推荐&…...

如何在Java中处理PDF文档(教程)

在开发文档管理系统、自动化工具或商业应用程序时&#xff0c;Java开发者常需处理PDF文档的编辑需求。无论是添加页面、调整内容尺寸、插入水印还是添加注释&#xff0c;选择一套可靠易用的Java PDF开发工具包至关重要。 JPedal&#xff08;Java PDF开发工具包&#xff09;的新…...

TensorBoard安装与基本操作指南(PyTorch)

文章目录 什么是TensorBoard&#xff1f;TensorBoardX与TensorBoard的依赖关系易混关系辨析Pytorch安装TensorBoard并验证1. TensorBoard安装和访问2. TensorBoard主要界面介绍实用技巧 什么是TensorBoard&#xff1f; TensorBoard是TensorFlow生态系统中的一款强大的可视化工…...

基于PyTorch的残差网络图像分类实现指南

以下是一份超过6000字的详细技术文档&#xff0c;介绍如何在Python环境下使用PyTorch框架实现ResNet进行图像分类任务&#xff0c;并部署在服务器环境运行。内容包含完整代码实现、原理分析和工程实践细节。 基于PyTorch的残差网络图像分类实现指南 目录 残差网络理论基础服务…...

2025/5/25 学习日记 linux进阶命令学习

tree:以树状结构显示目录下的文件和子目录&#xff0c;方便直观查看文件系统结构。 -d&#xff1a;仅显示目录&#xff0c;不显示文件。-L [层数]&#xff1a;限制显示的目录层级&#xff08;如 -L 2 表示显示当前目录下 2 层子目录&#xff09;。-h&#xff1a;以人类可读的格…...

【MPC控制 - 从ACC到自动驾驶】4 MPC的“实战演练”:ACC Simulink仿真与结果深度解读

【MPC控制 - 从ACC到自动驾驶】MPC的“实战演练”&#xff1a;ACC Simulink仿真与结果深度解读 在过去的几天里&#xff0c;我们一起&#xff1a; Day 1: 认识了ACC这位聪明的“跟车小能手”和MPC这位“深谋远虑的棋手”。Day 2: 给汽车“画了像”&#xff0c;建立了它的纵向…...

【时时三省】Python 语言----牛客网刷题笔记

目录 1,常用函数 1,input() 2,map() 3,split() 4,range() 5, 切片 6,列表推导式 山不在高,有仙则名。水不在深,有龙则灵。 ----CSDN 时时三省 1,常用函数 1,input() 该函数遇到 换行停止接收,返回类型为字符串 2,map() 该函数出镜率较高,目的是将一个可迭…...

OPENEULER搭建私有云存储服务器

一、关闭防火墙和selinux 二、下载相关软件 下载nginx&#xff0c;mariadb、php、nextcloud 下载nextcloud&#xff1a; sudo wget https://download.nextcloud.com/server/releases/nextcloud-30.0.1.zip sudo unzip nextcloud-30.0.1.zip -d /var/www/html/ sudo chown -R…...

PyQt学习系列10-性能优化与调试技巧

PyQt学习系列笔记&#xff08;Python Qt框架&#xff09; 第十课&#xff1a;PyQt的性能优化与调试技巧 课程目标 掌握 PyQt应用的性能优化策略&#xff08;内存管理、渲染优化、多线程&#xff09;学习 调试技巧&#xff08;日志输出、断点设置、性能分析工具&#xff09;解…...

卷积神经网络(CNN)深度讲解

卷积神经网络&#xff08;CNN&#xff09; 本篇博客参考自大佬的开源书籍&#xff0c;帮助大家从头开始学习卷积神经网络&#xff0c;谢谢各位的支持了&#xff0c;在此期待各位能与我共同进步​ 卷积神经网络&#xff08;CNN&#xff09;是一种特殊的深度学习网络结构&#x…...

Docker部署Zookeeper集群

简介 ZooKeeper 是一个开源的分布式协调服务&#xff0c;由 Apache 软件基金会开发和维护。它主要用于管理和协调分布式系统中的多个节点&#xff0c;以解决分布式环境下的常见问题&#xff0c;如配置管理、服务发现、分布式锁等。ZooKeeper 提供了一种可靠的机制&#xff0c;…...

数据结构—(概述)

目录 一 数据结构&#xff0c;相关概念 1. 数据结构&#xff1a; 2. 数据(Data): 3. 数据元素(Data Element): 4. 数据项&#xff1a; 5. 数据对象(Data Object): 6. 容器&#xff08;container&#xff09;&#xff1a; 7. 结点&#xff08;Node&#xff09;&#xff…...

python打卡day34

GPU训练及类的call方法 知识点回归&#xff1a; CPU性能的查看&#xff1a;看架构代际、核心数、线程数GPU性能的查看&#xff1a;看显存、看级别、看架构代际GPU训练的方法&#xff1a;数据和模型移动到GPU device上类的call方法&#xff1a;为什么定义前向传播时可以直接写作…...

华为OD机试真题—— 流水线(2025B卷:100分)Java/python/JavaScript/C/C++/GO最佳实现

2025 B卷 100分 题型 本专栏内全部题目均提供Java、python、JavaScript、C、C++、GO六种语言的最佳实现方式; 并且每种语言均涵盖详细的问题分析、解题思路、代码实现、代码详解、3个测试用例以及综合分析; 本文收录于专栏:《2025华为OD真题目录+全流程解析+备考攻略+经验分…...

【数据架构01】数据技术架构篇

✅ 9张高质量数据架构图&#xff1a;大数据平台功能架构、数据全生命周期管理图、AI技术融合架构等&#xff1b; &#x1f680;无论你是数据架构师、治理专家&#xff0c;还是数字化转型负责人&#xff0c;这份资料库都能为你提供体系化参考&#xff0c;高效解决“架构设计难、…...

【安全攻防与漏洞​】​​HTTPS中的常见攻击与防御​​

HTTPS 中常见攻击与防御策略涵盖中间人攻击&#xff08;MITM&#xff09;、SSL剥离、重放攻击等&#xff0c;帮助构建安全的 HTTPS 通信环境&#xff1a; 一、中间人攻击&#xff08;MITM&#xff09; 攻击原理 场景&#xff1a;攻击者通过伪造证书或劫持网络流量&#xff0c…...

esp32cmini SK6812 2个方式

1 #include <SPI.h> // ESP32-C系列的SPI引脚 #define MOSI_PIN 7 // ESP32-C3/C6的SPI MOSI引脚 #define NUM_LEDS 30 // LED灯带实际LED数量 - 确保与实际数量匹配&#xff01; #define SPI_CLOCK 10000000 // SPI时钟频率 // 颜色结构体 st…...

【数据集】30 m地表温度LST数据集

目录 数据概述🔧研究目标与意义🧠 算法核心组成1. 地表比辐射率(LSE)估算2. 大气校正(Atmospheric Correction)LST反演流程图📊 精度验证与评估结果参考《Generating the 30-m land surface temperature product over continental China and USA from Landsat 5/7/8 …...

【CATIA的二次开发07】草图编辑器对象结构及应用

【CATIA的二次开发07】草图编辑器对象结构及应用 草图编辑器(SketchEditor)是用于创建和编辑2D草图的核心对象。其对象结构遵循CATIA的层级关系,以下是详细说明及代码示例: 一、核心对象结构图 Application │ └─ Documents│└─ Document (.CATPart)│└─ Part│└─…...

IT | 词汇科普手册Ⅱ

目录 1.报文(Message) 2.Token(令牌) Token vs. Cookie Token vs. Key "碰一碰"支付 3.NFC 4.Nginx 5.JSON 6.前置机 前置机vs.Nginx反向代理 以PDA、WMS举例前置机场景 7.RabbitMQ 核心功能 1.报文(Message) 报文&#xff08;Message&#xff09;​​是系统或组件之…...

【 java 基础问题 第一篇 】

目录 1.概念 1.1.java的特定有哪些&#xff1f; 1.2.java有哪些优势哪些劣势&#xff1f; 1.3.java为什么可以跨平台&#xff1f; 1.4JVM,JDK,JRE它们有什么区别&#xff1f; 1.5.编译型语言与解释型语言的区别&#xff1f; 2.数据类型 2.1.long与int类型可以互转吗&…...

以前端的角度理解 Kubernetes(K8s)

作为一名前端开发者&#xff0c;我们每天都在与 React、Vue、Webpack 等工具打交道&#xff0c;而 Kubernetes&#xff08;K8s&#xff09;听起来更像是后端或运维的“专属领域”。但实际上&#xff0c;K8s 的核心思想和前端开发中的某些模式高度相似。那么咱们用熟悉的类比帮助…...