当前位置: 首页 > article >正文

SpringBoot整合RocketMQ实战:从消息发送到消费的完整流程(含Docker部署指南)

SpringBoot与RocketMQ深度整合实战从基础到Docker化部署在当今分布式系统架构中消息队列已成为解耦服务、削峰填谷的关键组件。RocketMQ作为阿里巴巴开源的高性能分布式消息中间件凭借其高吞吐、低延迟和强一致性的特点在电商、金融等领域得到广泛应用。本文将带您从零开始完整实现SpringBoot与RocketMQ的整合并包含Docker环境下的部署方案为开发者提供一站式解决方案。1. 环境准备与基础配置1.1 依赖引入与基础配置首先创建一个SpringBoot项目推荐使用Spring Initializr初始化在pom.xml中添加必要的依赖dependencies !-- RocketMQ SpringBoot Starter -- dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId version2.2.2/version /dependency !-- 其他辅助依赖 -- dependency groupIdorg.projectlombok/groupId artifactIdlombok/artifactId optionaltrue/optional /dependency /dependencies在application.yml中配置RocketMQ连接信息rocketmq: name-server: 127.0.0.1:9876 producer: group: PRODUCER_GROUP_DEMO send-message-timeout: 3000 retry-times-when-send-failed: 2注意生产环境建议将配置信息放在配置中心而非直接写在配置文件中1.2 RocketMQ核心概念快速回顾在开始编码前有必要了解RocketMQ的几个核心概念Topic消息主题一级消息类型Tag消息标签二级消息类型用于细粒度过滤Producer Group生产者组Consumer Group消费者组NameServer轻量级注册中心Broker消息存储与转发服务器消息发送类型对比类型特点适用场景同步发送等待Broker响应重要通知、订单创建异步发送回调通知结果日志记录、非关键流程单向发送不关心结果性能监控数据上报2. 消息生产与消费实战2.1 消息生产者实现创建生产者服务类演示多种消息发送方式Service Slf4j public class MessageProducerService { private final RocketMQTemplate rocketMQTemplate; private static final String ORDER_TOPIC ORDER_TOPIC; private static final String LOG_TOPIC LOG_TOPIC; public MessageProducerService(RocketMQTemplate rocketMQTemplate) { this.rocketMQTemplate rocketMQTemplate; } // 同步发送订单消息 public SendResult sendOrderMessage(Order order) { return rocketMQTemplate.syncSend( ORDER_TOPIC :CREATE, MessageBuilder.withPayload(order) .setHeader(RocketMQHeaders.KEYS, order.getOrderNo()) .build() ); } // 异步发送日志消息 public void sendLogAsync(LogMessage logMessage) { rocketMQTemplate.asyncSend(LOG_TOPIC, logMessage, new SendCallback() { Override public void onSuccess(SendResult sendResult) { log.info(日志发送成功: {}, sendResult.getMsgId()); } Override public void onException(Throwable e) { log.error(日志发送失败, e); } }); } // 发送延时消息订单超时取消 public void sendDelayMessage(String orderNo, int delayLevel) { rocketMQTemplate.syncSend( ORDER_TOPIC :CANCEL, MessageBuilder.withPayload(orderNo).build(), 3000, delayLevel ); } }2.2 消息消费者实现消费者端实现消息监听处理Slf4j Service RocketMQMessageListener( topic ORDER_TOPIC, selectorExpression CREATE, consumerGroup ORDER_CONSUMER_GROUP ) public class OrderCreateConsumer implements RocketMQListenerOrder { Override public void onMessage(Order order) { log.info(收到订单创建消息: {}, order.getOrderNo()); // 业务处理逻辑 } } Service RocketMQMessageListener( topic ORDER_TOPIC, selectorExpression CANCEL, consumerGroup ORDER_CONSUMER_GROUP ) public class OrderCancelConsumer implements RocketMQListenerString { Override public void onMessage(String orderNo) { log.info(收到订单取消消息: {}, orderNo); // 取消订单业务逻辑 } }提示实际项目中建议将不同业务的消费者拆分到不同类中保持单一职责3. Docker化部署RocketMQ集群3.1 单机版快速部署对于开发和测试环境可以使用以下docker-compose.yml快速启动RocketMQ服务version: 3 services: namesrv: image: apache/rocketmq:4.9.4 container_name: rocketmq-namesrv ports: - 9876:9876 command: sh mqnamesrv volumes: - ./logs:/home/rocketmq/logs broker: image: apache/rocketmq:4.9.4 container_name: rocketmq-broker ports: - 10909:10909 - 10911:10911 environment: - NAMESRV_ADDRrocketmq-namesrv:9876 command: sh mqbroker -c /home/rocketmq/conf/broker.conf volumes: - ./broker.conf:/home/rocketmq/conf/broker.conf - ./store:/home/rocketmq/store - ./logs:/home/rocketmq/logs depends_on: - namesrvbroker.conf配置文件示例brokerClusterName DefaultCluster brokerName broker-a brokerId 0 deleteWhen 04 fileReservedTime 48 brokerRole ASYNC_MASTER flushDiskType ASYNC_FLUSH启动命令docker-compose up -d3.2 生产环境集群部署建议对于生产环境建议采用多节点部署方案NameServer集群至少部署2-3个节点Broker集群主从架构2m-2s不同Broker组部署在不同物理机网络配置节点间网络延迟1ms万兆网络最佳集群配置参数对比参数开发环境生产环境flushDiskTypeASYNC_FLUSHSYNC_FLUSHbrokerRoleASYNC_MASTERSYNC_MASTERfileReservedTime24 (小时)72 (小时)sendMessageThreadPoolNums1632-644. 高级特性与最佳实践4.1 消息过滤与顺序消费RocketMQ支持两种消息过滤方式Tag过滤在消费者端通过selectorExpression指定SQL92过滤通过消息属性进行过滤顺序消息实现示例// 生产者 public SendResult sendOrderedMessage(Order order) { return rocketMQTemplate.syncSendOrderly( ORDER_TOPIC, MessageBuilder.withPayload(order).build(), order.getOrderNo() // 使用订单号作为ShardingKey ); } // 消费者 Service RocketMQMessageListener( topic ORDER_TOPIC, consumerGroup ORDER_PAY_GROUP, consumeMode ConsumeMode.ORDERLY ) public class OrderPayConsumer implements RocketMQListenerOrder { Override public void onMessage(Order order) { // 保证同一订单的顺序处理 } }4.2 消息轨迹与监控启用消息轨迹追踪rocketmq: producer: trace-enabled: true consumer: trace-enabled: true推荐监控方案组合RocketMQ控制台官方提供的Web管理界面Prometheus Grafana指标监控与告警ELK日志收集与分析4.3 常见问题解决方案消息重复消费实现幂等处理使用Redis分布式锁数据库唯一索引消息堆积处理增加消费者实例优化消费逻辑性能设置合理的批量消费参数RocketMQMessageListener( topic BULK_TOPIC, consumerGroup BULK_CONSUMER, consumeMessageBatchMaxSize 20, // 每次拉取最大消息数 pullBatchSize 32 // 每次RPC调用拉取消息数 )在实际项目中使用RocketMQ时建议从项目初期就规划好消息Topic和Tag的命名规范避免后期混乱。我们团队采用的规范是业务域_子域_操作如order_payment_create表示订单支付业务的创建操作。

相关文章:

SpringBoot整合RocketMQ实战:从消息发送到消费的完整流程(含Docker部署指南)

SpringBoot与RocketMQ深度整合实战:从基础到Docker化部署 在当今分布式系统架构中,消息队列已成为解耦服务、削峰填谷的关键组件。RocketMQ作为阿里巴巴开源的高性能分布式消息中间件,凭借其高吞吐、低延迟和强一致性的特点,在电商…...

2026最新版!AI免费tokens全攻略,零成本玩转OpenClaw

原文链接:2026最新版!AI免费tokens全攻略,零成本玩转OpenClaw...

树莓派5实战:用NCNN跑通YOLOv5目标检测(附完整代码)

树莓派5实战:用NCNN跑通YOLOv5目标检测(附完整代码) 最近在捣鼓树莓派5,想在上面跑点“硬核”的视觉应用,比如实时目标检测。市面上方案不少,但要么太重,动辄几百兆的框架塞不进小小的SD卡&…...

Web原生数据库工具选型指南:SQLynx vs Navicat在云环境下的真实表现

Web原生数据库工具选型指南:SQLynx vs Navicat在云环境下的真实表现 最近和几个技术团队负责人聊天,话题总绕不开一个痛点:数据库管理工具在云时代好像有点“水土不服”。过去,我们习惯在本地装个客户端,连上数据库就开…...

GTE模型在在线教育中的应用:学习资源智能推荐

GTE模型在在线教育中的应用:学习资源智能推荐 1. 引言 在线教育平台面临着一个共同的难题:如何从海量的学习资源中,为每个学生找到最适合的内容?传统的关键词匹配方式往往力不从心,学生搜索"机器学习入门"…...

Intel(R) Wireless-AC 9560网络适配器故障排查指南(从设备管理器到网络重置)

1. 当你的Wi-Fi突然“消失”:从设备管理器开始诊断 不知道你有没有遇到过这种情况:正用着笔记本电脑,突然发现右下角的Wi-Fi图标不见了,或者它变成了一个地球仪,提示你“未连接”。你点开网络列表,空空如也…...

5分钟快速上手腾讯混元翻译模型HY-MT1.5-1.8B,开箱即用

5分钟快速上手腾讯混元翻译模型HY-MT1.5-1.8B,开箱即用 你是不是也遇到过这样的场景?想给海外客户发一封邮件,对着翻译软件纠结半天,总觉得词不达意;或者想快速翻译一份技术文档,却发现免费的在线工具要么…...

从医疗设备到工业控制:Multisim电路设计的5个实战技巧(以呼叫系统为例)

从医疗设备到工业控制:Multisim电路设计的5个实战技巧(以呼叫系统为例) 很多硬件工程师在从教学案例转向实际工业项目时,总会遇到一个尴尬的境地:仿真跑得风生水起,一到实际打板就问题频出。这中间的鸿沟&a…...

通达OA header伪造漏洞实战:从原理到未授权访问

1. 通达OA身份认证绕过漏洞初探 第一次听说通达OA这个漏洞时,我正在给客户做安全审计。当时发现一个奇怪的现象:明明没有登录,却能直接访问后台管理页面。后来深入研究才发现,原来是header伪造导致的身份认证绕过问题。这个漏洞影…...

国产MCU USB多协议转换器设计与实现

1. 项目概述USB多协议转换器是一种面向嵌入式系统调试、传感器数据汇聚与工业现场通信协同的硬件桥接设备。其核心目标是将单一USB主机接口统一映射为多路异构物理层通信通道,实现上位机对底层多样化外设的集中管控与数据调度。本设计基于国产高性能Cortex-M4F内核M…...

STM32 TM1637数码管驱动:IIC时序解析与Proteus仿真验证

1. 从零开始:为什么选择STM32和TM1637这对“黄金搭档”? 大家好,我是老李,一个在嵌入式领域摸爬滚打了十多年的“老码农”。今天想和大家聊聊一个非常经典且实用的组合:用STM32的GPIO口去驱动TM1637数码管模块。很多刚…...

Ubuntu 22.04 LTS 服务器 SSH 密钥配置与自动化部署实践

1. 从零开始:为什么SSH密钥是服务器管理的基石 如果你刚接触服务器运维,或者还在用密码登录你的Ubuntu 22.04服务器,那今天这篇分享可能会彻底改变你的工作流。我管理过上百台服务器,从早期的密码登录到后来的密钥认证&#xff0c…...

STM32G070多传感器融合终端设计:温湿度/空气质量/称重/RTC一体化嵌入式系统

1. 项目概述本项目是一款集成环境参数监测、实时时钟显示与便携式电子称重功能的嵌入式终端设备,面向嵌入式学习、环境监测原型开发及小型IoT节点应用场景。系统以STM32G070CBT6为主控核心,运行FreeRTOS实时操作系统,通过多任务协同调度实现温…...

探秘RestTemplateBuilder:为何连接超时设置频频‘失效’及最佳实践

1. 从一次深夜告警说起:你的超时设置真的生效了吗? 我记得很清楚,那是一个周五的晚上,正准备下班,突然手机开始疯狂震动。监控系统显示,我们一个核心服务的接口响应时间飙到了60秒以上,大量请求…...

构建城市可信数据空间:从标准到实践的全方位指南

1. 城市数据困局:我们为什么需要一个“可信”的空间? 想象一下,你所在的城市,交通部门掌握着实时车流数据,环保部门监测着空气质量,卫健委管理着医疗资源分布,而商业平台则记录着市民的消费习惯…...

基于AIR001的FRS数字对讲机设计与实现

1. 项目概述本项目是一款基于AIR001主控芯片与SR_FRS_2WUS无线对讲模块构建的便携式数字对讲终端,定位于轻量级、低功耗、高可用性的短距语音通信场景。系统在城市复杂电磁环境下实测通信距离超过1公里,语音清晰可辨,具备完整的频道管理、亚音…...

从CVSS2.0评分到漏洞证书:详解CNVD漏洞评级背后的逻辑

从CVSS2.0评分到漏洞证书:详解CNVD漏洞评级背后的逻辑 在数字化安全领域,漏洞评级体系如同医疗行业的急诊分诊系统,决定了有限资源应当优先分配给哪些威胁。CNVD作为国家级漏洞库,其评级机制直接影响着数千万互联网资产的防御优先…...

⚖️Lychee-Rerank多场景落地:制造业BOM文档检索、电力规程匹配、航空手册查检

Lychee-Rerank多场景落地:制造业BOM文档检索、电力规程匹配、航空手册查检 1. 引言:当精准匹配成为刚需 想象一下,你是一位制造业的工程师,面对一份包含上千个零部件的BOM(物料清单)文档,需要…...

从内网到外网:手把手教你用FFmpeg+RTSP实现远程视频监控(2023最新版)

2023年跨网络视频监控实战:基于FFmpeg与RTSP的高效部署指南 在智能安防需求激增的当下,远程视频监控已成为中小企业、家庭农场乃至个人工作室的刚需配置。传统方案常受限于网络边界,而现代技术栈让内网摄像头穿透NAT成为可能——无需昂贵硬件…...

Linux服务器外网访问失败的5个常见坑点(附详细排查命令)

Linux服务器外网访问失败的5个系统性排查指南 刚部署完项目却发现外网无法访问?这可能是每个Linux运维新手都会遇到的"成人礼"。不同于零散的问题解决,本文将用系统化的排查思路,带你从底层网络原理到实操命令,彻底掌握…...

Dify 2026 API网关安全攻防推演(2024Q4最新CISA红队渗透报告深度解码)

第一章:Dify 2026 API网关安全态势全景概览Dify 2026 版本将API网关安全能力提升至企业级零信任架构标准,全面覆盖认证、授权、流量审计、策略执行与威胁响应五大核心维度。其安全态势不再依赖单点防护组件,而是通过统一策略引擎驱动动态策略…...

原子操作 CAS 与锁实现

原子操作 CAS 与锁实现 文章目录原子操作 CAS 与锁实现1. CPU 缓存架构与缓存一致性1.1 为什么需要 CPU 缓存?1.2 写回策略与缓存不一致问题1.3 缓存一致性协议:MESI 与总线嗅探2. 原子操作:不可分割的执行单元2.1 什么是原子操作&#xff1f…...

商旅MICE平台怎么选?2026高性价比平台推荐|含核心功能测评

2026年中国十大商旅MICE平台综合推荐与深度解析 随着企业数字化转型的加速和全球业务拓展的需求增长,商旅MICE(会议、奖励旅游、会议展览)管理已成为企业战略的重要组成部分。到2026年,中国商旅管理市场预计将突破5000亿规模&…...

本地部署千问大模型

下载千问大模型大家可以从魔搭社区平台,下载各种版本的各种大模型,尽量能在自己的电脑上运行,所以这边下载1.5B的版本下载后的文件夹里的东西不要动即可环境准备在开始之前,先统一环境。本文基于transformersPyTorch,支…...

C# 基于OpenCv的视觉工作流-章34-投影向量

C# 基于OpenCv的视觉工作流-章34-投影向量 本章目标: 一、投影向量;一、 投影向量 投影向量分为行投影、列投影,原理是将各行/行像素值进行汇总统计。 本例中对汇总统计进行求取平均值,截取高出平均值的部分进行数量统计。 OpenCv…...

Using Vulkan -- Queues

应用程序通过 VkQueue 提交工作,通常以 VkCommandBuffer 对象或稀疏绑定的形式提交。 提交到同一个 VkQueue 的命令缓冲区按提交顺序开始执行,但开始后允许独立推进并乱序完成。 提交到不同队列的命令缓冲区彼此之间是无序的,除非使用 VkSe…...

无人机高空工程车辆识别 高清工程车辆识别 高清车辆识别 高清铲车压路机识别 无人机矿场行人识别 深度学习yolo第10558期

工程车辆识别计算机视觉数据集数据集概览 本数据集基于高空视角遥感影像构建,聚焦工程场景目标识别,为目标检测模型提供标准化标注样本,支撑工地监测与工程管理场景应用。项目内容类别数量4类类别名称汽车、人员、工程车1、工程车2图像数量50…...

MySQL的安装和卸载组件

目录安装组件:卸载MySQL的组件手动删除目录检查服务📝前言: 我们前面学习了如何安装MySQL,但如果有一些组件需要再安装或者是卸载,可以通过下面的方法执行这个是:MySQL安装和组件安装🔗 通过搜索…...

告别手动截图!Python+SCPI让示波器自动采集数据

在日常测试工作里,频繁手动操作示波器调节参数、截图、记录数据,不仅效率低下,还容易出现操作失误和数据遗漏。借助Python/Labview/C#SCPI指令实现远程自动化控制,就能让罗德示波器自动完成电压波形采集、界面截图与原始数据保存&…...

【力扣-42. 接雨水】Python笔记

题目回顾题目编号:42 题目名称:接雨水 题目难度:困难 输入示例:height [0,1,0,2,1,0,1,3,2,1] 输出示例:6给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接…...