当前位置: 首页 > article >正文

【消息队列RocketMQ】五、RocketMQ 实战应用与生态拓展

本篇文章主要将结合前面几篇文章的基础讲解,来演示RocketMQ的实际场景中的应用。

一、RocketMQ 实战应用场景​

1.1 电商系统中的应用​

在电商系统中,RocketMQ 承担着重要角色。以双十一大促活动为例,短时间内会产生海量的订单请求、库存变更请求和支付请求。​

订单处理:用户下单后,订单服务将下单消息发送至 RocketMQ。订单相关的后续操作,如库存扣减、优惠券核销、物流信息生成等,都通过订阅该订单消息实现异步处理。通过配置broker.conf文件,可设置消息的持久化策略,保证订单消息不丢失。例如,将flushDiskType设置为SYNC_FLUSH,确保消息实时刷盘 。

flushDiskType=SYNC_FLUSH

库存同步:当库存发生变化时,库存服务发送库存变更消息到 RocketMQ,其他依赖库存信息的服务(如商品展示服务、订单服务)订阅该消息,实现库存数据的实时同步。在 CentOS 7 系统中,可使用以下命令启动 Producer 发送库存变更消息:

nohup sh tools.sh org.apache.rocketmq.example.quickstart.Producer &

1.2 金融领域的应用​

在金融行业,对数据一致性和可靠性要求极高。RocketMQ 的事务消息特性在此发挥关键作用。​

转账业务:以跨行转账为例,在转账操作中,涉及到转出账户扣减和转入账户增加两个操作。通过 RocketMQ 的事务消息,确保这两个操作要么都成功,要么都失败。在生产者代码中,需要实现TransactionListener接口来处理事务消息。

TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,如账户扣款try {// 模拟扣款操作return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 检查本地事务状态return LocalTransactionState.COMMIT_MESSAGE;}
});

对账系统:每日交易结束后,各业务系统将交易数据发送至 RocketMQ,对账系统订阅这些消息,进行数据核对。通过调整broker.conf中的transactionCheckInterval参数,可设置事务消息的回查间隔,确保事务的最终一致性。

transactionCheckInterval=60000 # 单位毫秒,设置1分钟回查一次

1.3 日志处理系统​

日志处理是 RocketMQ 的常见应用场景之一。​

日志收集:各应用服务将日志信息发送到 RocketMQ,日志收集服务订阅相关 Topic,将日志数据存储到分布式文件系统(如 HDFS)中。在 CentOS 7 上,可通过修改 Producer 的配置,设置日志消息的 Topic 和标签。

DefaultMQProducer producer = new DefaultMQProducer("log_producer_group");
producer.setNamesrvAddr("localhost:9876");
Message msg = new Message("LogTopic", "InfoTag", logContent.getBytes());
producer.send(msg);

日志分析:数据分析服务从 RocketMQ 中消费日志消息,进行实时数据分析,如统计接口调用频率、用户行为分析等。为了提高日志消息的消费效率,可在broker.conf中调整defaultTopicQueueNums参数,增加 Topic 的队列数量。

defaultTopicQueueNums=8

二、RocketMQ 与其他技术的集成​

2.1 与 Spring Cloud 的集成​

Spring Cloud 是一套微服务框架,与 RocketMQ 集成后,可实现微服务之间的异步通信和解耦。​

引入依赖:在 Spring Cloud 项目的pom.xml文件中添加 RocketMQ 相关依赖。

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

配置文件修改:在application.yml文件中配置 RocketMQ 的 NameServer 地址和 Producer 组名。

rocketmq:name-server: localhost:9876producer:group: spring_cloud_producer_group

发送与消费消息:在 Spring Boot 应用中,通过注入RocketMQTemplate发送消息,创建@RocketMQMessageListener注解的消费者类接收消息。

// 发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String message) {rocketMQTemplate.convertAndSend("SpringCloudTopic", message);
}// 消费消息
@Component
@RocketMQMessageListener(topic = "SpringCloudTopic", consumerGroup = "spring_cloud_consumer_group")
public class SpringCloudConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);}
}

2.2 与 Kubernetes 的集成​

Kubernetes 是容器编排工具,将 RocketMQ 部署在 Kubernetes 集群中,可提高集群的资源利用率和管理效率。​

创建 RocketMQ 的 Kubernetes 资源文件:包括Deployment、Service和PersistentVolumeClaim等文件。以Deployment为例:

apiVersion: apps/v1
kind: Deployment
metadata:name: rocketmq-broker
spec:replicas: 2selector:matchLabels:app: rocketmq-brokertemplate:metadata:labels:app: rocketmq-brokerspec:containers:- name: rocketmq-brokerimage: rocketmqinc/rocketmq-broker:4.9.4ports:- containerPort: 10911- containerPort: 10909volumeMounts:- name: data-volumemountPath: /home/rocketmq/storevolumes:- name: data-volumepersistentVolumeClaim:claimName: rocketmq-pvc

部署到 Kubernetes 集群:在 CentOS 7 上,通过kubectl命令将资源文件应用到集群中。

kubectl apply -f rocketmq-deployment.yaml
kubectl apply -f rocketmq-service.yaml
kubectl apply -f rocketmq-pvc.yaml

三、RocketMQ 生态拓展​

3.1 社区生态发展​

RocketMQ 作为 Apache 顶级项目,拥有活跃的社区。社区成员不断贡献新功能、修复 Bug,推动 RocketMQ 的版本迭代。例如,在 RocketMQ 5.0 版本中,引入了新的存储引擎和通信协议,进一步提升了性能和可扩展性。开发者可以通过 Apache 官方网站(RocketMQ · 官方网站 | RocketMQ)获取最新版本信息和技术文档,也可以在 GitHub 仓库(https://github.com/apache/rocketmq)参与开源项目的开发和讨论。​

3.2 周边工具生态​

管理工具

RocketMQ Console 是一款可视化管理工具,方便用户对 RocketMQ 集群进行监控和管理。在 CentOS 7 上,可通过以下步骤部署:​

        1、下载 RocketMQ Console 的 Jar 包:访问 RocketMQ Console 的 GitHub 仓库(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console),在releases页面下载最新版本的 Jar 包,也可以使用wget命令在 CentOS 7 终端下载,例如:

wget https://github.com/apache/rocketmq-externals/releases/download/v1.0.1/rocketmq-console-ng-1.0.1.jar

        2、执行命令启动:使用以下命令启动 RocketMQ Console,其中--rocketmq.config.namesrvAddr指定 NameServer 的地址和端口,--server.port指定 RocketMQ Console 的服务端口:

nohup java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876 --server.port=8080 &

         3、访问与使用:通过浏览器访问http://localhost:8080,进入 RocketMQ Console 界面。在该界面中,用户可以直观地查看 Topic、Consumer Group、Broker 等信息,支持创建、删除 Topic,查看 Consumer 的消费进度、消息堆积情况等操作。例如,在 Topic 管理页面,可查看每个 Topic 的消息数量、队列分布,还能手动调整 Topic 的读写权限 。

        4、配置优化:如果需要调整 RocketMQ Console 的配置,如增加内存分配,可修改启动命令为:

nohup java -Xmx512m -Xms256m -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876 --server.port=8080 &

-Xmx512m设置最大堆内存为 512MB,-Xms256m设置初始堆内存为 256MB,以适应大规模集群监控需求。 

    

监控工具

Prometheus 和 Grafana 可以与 RocketMQ 集成,实现对 RocketMQ 集群的监控。通过配置 Prometheus 的采集规则,获取 RocketMQ 的指标数据(如消息发送量、消费延迟等),然后在 Grafana 中进行可视化展示。

        1、Prometheus 配置:首先,在 CentOS 7 上安装 Prometheus。下载 Prometheus 的二进制包,解压后进入目录,编辑prometheus.yml配置文件,添加 RocketMQ 的监控指标采集任务。例如:

global:scrape_interval: 15sscrape_configs:- job_name: 'rocketmq'static_configs:- targets: ['localhost:9876']  # NameServer地址,根据实际情况修改metrics_path: /metrics  # RocketMQ暴露指标的路径params:module: [rocketmq_exporter]

保存配置后,使用以下命令启动 Prometheus:

nohup./prometheus --config.file=prometheus.yml &

Prometheus 会按照配置定期从 RocketMQ 中采集消息发送量、消费延迟、Broker 负载等指标数据。​

        2. Grafana 配置:安装 Grafana 后,访问其 Web 界面(默认地址为http://localhost:3000),使用默认账号密码(admin/admin)登录。在 Grafana 中,首先添加 Prometheus 作为数据源,在数据源配置页面,输入 Prometheus 的地址和端口(如http://localhost:9090,9090 为 Prometheus 默认端口),保存测试连接成功后,即可导入 RocketMQ 相关的监控仪表盘模板。可以从 Grafana 官方网站(https://grafana.com/grafana/dashboards/)搜索 RocketMQ 相关模板,下载 JSON 文件后,在 Grafana 中通过 “+” -> “Import” 导入模板,就能直观地查看 RocketMQ 集群的各项监控指标图表,如消息吞吐量趋势图、Consumer 消费速率对比图等 。

测试工具:

除了基础的管理和监控工具,RocketMQ 生态中还有用于性能测试的工具,如rocketmq-tools自带的压力测试功能。在 CentOS 7 的 RocketMQ 安装目录bin文件夹下,通过以下命令可以进行简单的消息发送压力测试:

sh tools.sh org.apache.rocketmq.example.perf.PerfTestProducer -t TestTopic -n 100000 -m 1024

上述命令中,-t指定测试的 Topic 为TestTopic,-n表示发送 100000 条消息,-m指定每条消息大小为 1024 字节。通过调整这些参数,可以模拟不同场景下的消息发送压力,测试 RocketMQ 集群的性能表现。同时,还可以使用Jmeter与 RocketMQ 结合,进行更复杂的性能测试。在Jmeter中,需要添加 “Java 请求”,引入 RocketMQ 的客户端依赖包,编写 Java 代码实现消息的发送和接收测试,从而全面评估 RocketMQ 在高并发场景下的性能和稳定性。

数据迁移工具:

在实际应用中,当需要对 RocketMQ 集群进行升级、数据迁移等操作时,rocketmq-migration-tool可以发挥重要作用。该工具可以实现不同 RocketMQ 集群之间的数据迁移,支持 Topic、Consumer Group 等信息的迁移。使用时,先在 CentOS 7 系统上下载工具包,配置源集群和目标集群的 NameServer 地址、迁移规则等信息,通过执行迁移命令,即可将消息数据、消费进度等从源集群迁移到目标集群。例如:

java -jar rocketmq-migration-tool.jar --sourceNamesrvAddr=source-namesrv:9876 --targetNamesrvAddr=target-namesrv:9876 --topic=MigrationTopic

上述命令将MigrationTopic的相关数据从源集群(source-namesrv:9876)迁移到目标集群(target-namesrv:9876),保障了业务系统在集群变更过程中的数据完整性和连续性。

相关文章:

【消息队列RocketMQ】五、RocketMQ 实战应用与生态拓展

本篇文章主要将结合前面几篇文章的基础讲解&#xff0c;来演示RocketMQ的实际场景中的应用。 一、RocketMQ 实战应用场景​ 1.1 电商系统中的应用​ 在电商系统中&#xff0c;RocketMQ 承担着重要角色。以双十一大促活动为例&#xff0c;短时间内会产生海量的订单请求、库存…...

volatile怎么保证可见性和有序性?(个人理解)

volatile怎么保证可见性和有序性&#xff1f; volatile变量会在字段修饰符中显示ACC_VOLATILE。通过插入内存屏障指令&#xff0c;禁止指令重排序。不管前面与后面任何指令&#xff0c;都不能与内存屏障指令进行重排&#xff0c;保证前后的指令按顺序执行 。同时保证数据修改的…...

计算机组成与体系结构:直接内存映射(Direct Memory Mapping)

目录 CPU地址怎么找到真实的数据&#xff1f; 内存映射的基本单位和结构 1. Pages&#xff08;页&#xff09;——虚拟地址空间的基本单位 2. Frames&#xff08;页框&#xff09;——物理内存空间的基本单位 3. Blocks&#xff08;块&#xff09;——主存和缓存之间的数据…...

RAGFlow:构建高效检索增强生成流程的技术解析

引言 在当今信息爆炸的时代&#xff0c;如何从海量数据中快速准确地获取所需信息并生成高质量内容已成为人工智能领域的重要挑战。检索增强生成&#xff08;Retrieval-Augmented Generation, RAG&#xff09;技术应运而生&#xff0c;它将信息检索与大型语言模型&#xff08;L…...

STM32提高篇: 蓝牙通讯

STM32提高篇: 蓝牙通讯 一.蓝牙通讯介绍1.蓝牙技术类型 二.蓝牙协议栈1.蓝牙芯片架构2.BLE低功耗蓝牙协议栈框架 三.ESP32-C3中的蓝牙功能1.广播2.扫描3.通讯 四.发送和接收 一.蓝牙通讯介绍 蓝牙&#xff0c;是一种利用低功率无线电&#xff0c;支持设备短距离通信的无线电技…...

SpringMVC处理请求映射路径和接收参数

目录 springmvc处理请求映射路径 案例&#xff1a;访问 OrderController类的pirntUser方法报错&#xff1a;java.lang.IllegalStateException&#xff1a;映射不明确 核心错误信息 springmvc接收参数 一 &#xff0c;常见的字符串和数字类型的参数接收方式 1.1 请求路径的…...

高质量学术引言如何妙用ChatGPT?如何写提示词

目录 1、引言究竟是什么&#xff1f; 2、引言如何构建&#xff1f;&#xff1f; 在学术写作领域&#xff0c;巧妙利用人工智能来构建文章的引言和理论框架是一个尚待探索的领域。小编在这篇文章中探讨一种独特的方法&#xff0c;即利用 ChatGPT 作为工具来构建引言和理论框架…...

【程序员 NLP 入门】词嵌入 - 上下文中的窗口大小是什么意思? (★小白必会版★)

&#x1f31f; 嗨&#xff0c;你好&#xff0c;我是 青松 &#xff01; &#x1f308; 希望用我的经验&#xff0c;让“程序猿”的AI学习之路走的更容易些&#xff0c;若我的经验能为你前行的道路增添一丝轻松&#xff0c;我将倍感荣幸&#xff01;共勉~ 【程序员 NLP 入门】词…...

从物理到预测:数据驱动的深度学习的结构化探索及AI推理

在当今科学探索的时代&#xff0c;理解的前沿不再仅仅存在于我们书写的方程式中&#xff0c;也存在于我们收集的数据和构建的模型中。在物理学和机器学习的交汇处&#xff0c;一个快速发展的领域正在兴起&#xff0c;它不仅观察宇宙&#xff0c;更是在学习宇宙。 AI推理 我们…...

各种各样的bug合集

一、连不上数据库db 1.可能是密码一大包东西不对&#xff1b; 2.可能是里面某个port和数据库不一样&#xff08;针对于修改了数据库但是连不上的情况&#xff09;&#xff1b; 3.可能是git代码没拉对&#xff0c;再拉一下代码。❤ 二、没有这个包 可能是可以#注释掉。❤ …...

大模型AI的“双刃剑“:数据安全与可靠性挑战与破局之道

在数字经济蓬勃发展的浪潮中&#xff0c;数据要素已然成为驱动经济社会创新发展的核心引擎。从智能制造到智慧城市&#xff0c;从电子商务到金融科技&#xff0c;数据要素的深度融合与广泛应用&#xff0c;正以前所未有的力量重塑着产业格局与经济形态。 然而&#xff0c;随着…...

如何使用 CompletableFuture、Function 和 Optional 优雅地处理异步编程?

当异步遇上函数式编程&#xff0c;代码变得更优雅 在日常开发中&#xff0c;很多时候我们需要处理异步任务、函数转换和空值检查。传统的回调方式和空值判断常常让代码看起来繁琐而难以维护。幸运的是&#xff0c;Java 提供了 CompletableFuture、Function 和 Optional&#x…...

基于大模型的结肠癌全病程预测与诊疗方案研究

目录 一、引言 1.1 研究背景与意义 1.2 研究目的与创新点 二、结肠癌概述 2.1 流行病学特征 2.2 发病机制与危险因素 2.3 临床症状与诊断方法 三、大模型技术原理与应用现状 3.1 大模型的基本原理 3.2 在医疗领域的应用情况 3.3 在结肠癌预测中的潜力分析 四、术前…...

操作系统概述与安装

主流操作系统概述 信创平台概述 虚拟机软件介绍与安装 windows server 安装 centos7 安装 银河麒麟V10 安装 一&#xff1a;主流服务器操作系统 &#xff08;1&#xff09;Windows Server 发展历程&#xff1a; 1993年推出第一代 WindowsNT&#xff08;企业级内核&am…...

算法设计与分析(基础)

问题列表 一、 算法的定义与特征&#xff0c;算法设计的基本步骤二、 算法分析的目的是什么&#xff1f;如何评价算法&#xff0c;如何度量算法的复杂性&#xff1f;三、 递归算法、分治法、贪婪法、动态规划法、回溯法的基本思想方法。四、 同一个问题&#xff0c;如TSP&#…...

多线程(线程安全)

一、线程安全的风险来源 1.1 后厨的「订单撞单」现象 场景&#xff1a;两服务员同时录入客人点单到同一个菜单本 问题&#xff1a; 订单可能被覆盖菜品数量统计错误 Java中的表现&#xff1a; public class OrderServlet extends HttpServlet {private int totalOrders 0…...

开发了一个b站视频音频提取器

B站资源提取器-说明书 一、功能说明 本程序可自动解密并提取B站客户端缓存的视频资源&#xff0c;支持以下功能&#xff1a; - 自动识别视频缓存目录 - 将加密的.m4s音频文件转换为标准MP3格式 - 将加密的.m4s视频文件转换为标准MP4格式&#xff08;合并音视频流&#xff09;…...

基于javaweb的SpringBoot校园服务平台系统设计与实现(源码+文档+部署讲解)

技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论文…...

北京SMT贴片加工工艺优化要点

内容概要 在北京地区SMT贴片加工领域&#xff0c;工艺优化是实现高可靠电子组装的系统性工程。本文以精密化生产需求为导向&#xff0c;围绕制程关键节点展开技术剖析&#xff0c;从钢网印刷的锡膏成型控制到贴装环节的视觉定位精度&#xff0c;逐步构建全流程优化模型。通过分…...

PHYBench:首个大规模物理场景下的复杂推理能力评估基准

2025-04-23, 由北京大学物理学院和人工智能研究所等机构共同创建的 PHYBench 数据集&#xff0c;这是一个专门用于评估大型语言模型在物理场景下的复杂推理能力的高质量基准。该数据集包含 500 道精心策划的物理问题&#xff0c;覆盖力学、电磁学、热力学、光学、现代物理和高级…...

将输入帧上下文打包到下一个帧的预测模型中用于视频生成

Paper Title: Packing Input Frame Context in Next-Frame Prediction Models for Video Generation 论文发布于2025年4月17日 Abstract部分 在这篇论文中,FramePack是一种新提出的网络结构,旨在解决视频生成中的两个主要问题:遗忘和漂移。 具体来说,遗忘指的是在生成视…...

使用localStorage的方式存储数据,刷新之后,无用户消息,需要重新登录,,localStorage 与 sessionStorage 的区别

1 localStorage 与 sessionStorage 的区别: 特性localStoragesessionStorage存储时长永久存储,除非手动删除或者清空浏览器缓存会话存储,浏览器关闭后数据丢失数据生命周期持久存在,直到被明确删除(即使关闭浏览器也不会消失)当前会话结束后数据自动清空(关闭标签页或浏…...

第15章:MCP服务端项目开发实战:性能优化

第15章:MCP服务端项目开发实战:性能优化 在构建和部署 MCP(Memory, Context, Planning)驱动的 AI Agent 系统时,性能和可扩展性是关键的考量因素。随着用户量、数据量和交互复杂度的增加,系统需要能够高效地处理请求,并能够平滑地扩展以应对更高的负载。本章将探讨 MCP…...

MOA Transformer:一种基于多尺度自注意力机制的图像分类网络

MOA Transformer&#xff1a;一种基于多尺度自注意力机制的图像分类网络 引言 近年来&#xff0c;Transformer 架构在自然语言处理领域取得了巨大的成功&#xff0c;并逐渐扩展到计算机视觉领域。Swin Transformer 就是其中一个典型的成功案例。它通过引入“无卷积”架构&…...

Red:1靶场环境部署及其渗透测试笔记(Vulnhub )

环境介绍&#xff1a; 靶机下载&#xff1a; https://download.vulnhub.com/red/Red.ova 本次实验的环境需要用到VirtualBox&#xff08;桥接网卡&#xff09;&#xff0c;VMware&#xff08;桥接网卡&#xff09;两台虚拟机&#xff08;网段都在192.168.152.0/24&#xff0…...

从 Java 到 Kotlin:在现有项目中迁移的最佳实践!

全文目录&#xff1a; 开篇语 1. 为什么选择 Kotlin&#xff1f;1.1 Kotlin 与 Java 的兼容性1.2 Kotlin 的优势1.3 Kotlin 的挑战 2. Kotlin 迁移最佳实践2.1 渐进式迁移2.1.1 步骤一&#xff1a;将 Kotlin 集成到现有的构建工具中2.1.2 步骤二&#xff1a;逐步迁移2.1.3 步骤…...

Java Collections工具类指南

一、Collections工具类概述 java.util.Collections是Java集合框架中提供的工具类&#xff0c;包含大量静态方法用于操作和返回集合。这些方法主要分为以下几类&#xff1a; 排序操作查找和替换同步控制不可变集合特殊集合视图其他实用方法 二、排序操作 1. 自然排序 List&…...

深入详解人工智能数学基础——概率论中的KL散度在变分自编码器中的应用

🧑 博主简介:CSDN博客专家、CSDN平台优质创作者,高级开发工程师,数学专业,10年以上C/C++, C#, Java等多种编程语言开发经验,拥有高级工程师证书;擅长C/C++、C#等开发语言,熟悉Java常用开发技术,能熟练应用常用数据库SQL server,Oracle,mysql,postgresql等进行开发应用…...

测试模版x

本篇技术博文摘要 &#x1f31f; 引言 &#x1f4d8; 在这个变幻莫测、快速发展的技术时代&#xff0c;与时俱进是每个IT工程师的必修课。我是盛透侧视攻城狮&#xff0c;一名什么都会一丢丢的网络安全工程师&#xff0c;也是众多技术社区的活跃成员以及多家大厂官方认可人员&a…...

Openharmony 和 HarmonyOS 区别?

文章目录 OpenHarmony 与 HarmonyOS 的区别&#xff1a;开源生态与商业发行版的定位差异一、定义与定位二、技术架构对比1. OpenHarmony2. HarmonyOS 三、应用场景差异四、开发主体与生态支持五、关键区别总结六、如何选择&#xff1f;未来展望 OpenHarmony 与 HarmonyOS 的区别…...