当前位置: 首页 > article >正文

RocketMQ系列第三篇:Java原生基础使用实操,手把手写生产者消费者Demo

文章目录一、本篇前言理论落地从部署到代码实操二、前置准备项目环境必备配置1. 基础环境要求2. 导入RocketMQ核心Maven依赖三、核心基础RocketMQ消息核心对象说明1. DefaultMQProducer消息生产者核心类2. DefaultMQPushConsumer消息消费者核心类3. Message消息实体对象四、Java实操一三种常用生产者消息发送Demo1. 同步发送消息生产最常用金融/订单核心业务2. 异步发送消息高并发吞吐业务3. 单向发送消息极低优先级无需确认业务五、Java实操二消费者订阅消费消息完整Demo六、代码运行顺序控制台验证步骤七、新手常见踩坑问题快速排查一、本篇前言理论落地从部署到代码实操前面两篇我们已经搞定了RocketMQ核心概念工作原理、单机集群环境安装部署服务已经稳稳跑在服务器上。环境搭好只是基础真正开发工作中我们都是通过Java代码对接RocketMQ实现消息生产发送、订阅消费业务逻辑。本篇零基础新手跟着步骤复制代码就能快速跑通创建Topic、发消息、收消息全链路彻底弄懂Java和RocketMQ的基础交互逻辑为后续SpringBoot整合、高阶消息类型使用打好编码根基。二、前置准备项目环境必备配置1. 基础环境要求已搭建完成RocketMQ单机/集群环境NameServer、Broker正常启动运行Java开发环境JDK8及以上IDEA/Eclipse开发工具Maven项目工程普通Java项目即可无需Spring框架服务器防火墙开放9876、10911端口本地电脑能正常连通RocketMQ服务。2. 导入RocketMQ核心Maven依赖在pom.xml文件中引入RocketMQ官方Java客户端依赖版本和服务端版本保持一致即可兼容性拉满稳定无冲突。!-- RocketMQ Java客户端核心依赖 --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client-java/artifactIdversion5.1.4/version/dependency依赖刷新下载完成后即可开始编写生产者、消费者核心代码所有API均为官方原生标准接口无第三方封装简单易懂好上手。三、核心基础RocketMQ消息核心对象说明写代码前先记三个核心基础对象所有后续编码都围绕这三个对象展开不用死记看懂用途即可1. DefaultMQProducer消息生产者核心类负责连接RocketMQ服务、创建生产实例、发送各类业务消息必须指定生产组名称和NameServer地址启动后才能正常投递消息。2. DefaultMQPushConsumer消息消费者核心类业务开发最常用的消费者模式消费者主动监听订阅的TopicBroker推送消息回调处理自动负载均衡、自动维护消费偏移量无需手动管控消费进度开箱即用。3. Message消息实体对象消息封装载体构造方法核心四个参数Topic消息主题、Tag消息标签、Key业务唯一标识、Body消息体真实业务数据字节数组精准匹配之前学的核心概念。四、Java实操一三种常用生产者消息发送DemoRocketMQ原生Java API提供三种核心消息发送模式适配不同业务场景下面逐个编写可直接运行的完整代码附带场景说明和详细注释。1. 同步发送消息生产最常用金融/订单核心业务适用场景支付下单、订单创建、资金扣款等必须保证消息发送成功的核心业务发送消息后阻塞等待Broker返回发送结果确认成功再执行业务后续逻辑可靠性最高。importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 同步消息生产者Demo * 场景核心业务必须确认消息发送成功 */publicclassSyncProducerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建生产者实例指定生产组名称自定义同业务生产者组名一致DefaultMQProducerproducernewDefaultMQProducer(order_sync_producer_group);// 2. 设置RocketMQ NameServer地址替换为自己服务器IP:9876producer.setNamesrvAddr(127.0.0.1:9876);// 3. 启动生产者producer.start();System.out.println(同步生产者启动成功);// 4. 循环发送5条测试消息for(inti1;i5;i){// 5. 构建消息实体Topic主题、Tag标签、业务Key、消息体内容MessagemessagenewMessage(order_test_topic,// 消息主题自定义命名order_create_tag,// 消息标签订单创建标签order_key_00i,// 业务唯一Key用于消息排查追踪(订单编号00i订单创建成功).getBytes(RemotingHelper.DEFAULT_CHARSET));// 6. 同步发送消息等待Broker返回发送结果SendResultsendResultproducer.send(message);// 打印发送结果状态、消息ID等信息System.out.println(第i条消息发送结果sendResult);}// 7. 发送完成后关闭生产者实际项目常驻服务无需关闭producer.shutdown();}}2. 异步发送消息高并发吞吐业务适用场景日志埋点、短信通知、运营推送等高并发、不等待响应业务发送消息后不阻塞主线程通过回调接口接收发送成功或失败结果吞吐量远高于同步发送。importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendCallback;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 异步消息生产者Demo * 场景高并发业务无需同步等待响应追求吞吐量 */publicclassAsyncProducerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建生产者实例指定生产组DefaultMQProducerproducernewDefaultMQProducer(order_async_producer_group);// 2. 设置NameServer地址producer.setNamesrvAddr(127.0.0.1:9876);// 3. 启动生产者producer.start();System.out.println(异步生产者启动成功);// 4. 循环发送5条异步消息for(inti1;i5;i){MessagemessagenewMessage(order_test_topic,order_notice_tag,notice_key_00i,(短信通知用户00i支付成功).getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 异步发送注册回调函数处理发送结果producer.send(message,newSendCallback(){// 发送成功回调OverridepublicvoidonSuccess(SendResultsendResult){System.out.println(异步消息发送成功sendResult.getMsgId());}// 发送失败回调处理异常重试、日志记录OverridepublicvoidonException(Throwablee){System.err.println(异步消息发送失败异常信息e.getMessage());e.printStackTrace();}});}// 异步发送无需等待短暂休眠保证回调执行完成Thread.sleep(1000);producer.shutdown();}}3. 单向发送消息极低优先级无需确认业务适用场景系统日志统计、简单埋点上报等无需确认发送结果、不关心是否投递成功的低优先级业务只管发送无需响应性能极致最高。importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 单向发送生产者Demo * 场景日志埋点、简单统计无需确认发送结果 */publicclassOneWayProducerDemo{publicstaticvoidmain(String[]args)throwsException{DefaultMQProducerproducernewDefaultMQProducer(log_oneway_producer_group);producer.setNamesrvAddr(127.0.0.1:9876);producer.start();System.out.println(单向生产者启动成功);// 发送埋点日志消息MessagemessagenewMessage(log_test_topic,log_click_tag,click_key_001,用户页面点击行为埋点日志.getBytes(RemotingHelper.DEFAULT_CHARSET));// 单向发送无返回值、无回调producer.sendOneway(message);System.out.println(单向消息发送完成无需确认结果);producer.shutdown();}}五、Java实操二消费者订阅消费消息完整Demo生产者发送消息后必须通过消费者订阅对应Topic和Tag才能拉取并处理业务消息。生产环境默认使用PushConsumer模式代码如下常驻运行持续监听消息。importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;importorg.apache.rocketmq.common.message.MessageExt;importjava.util.List;/** * 消息消费者Demo * 订阅order_test_topic主题消费对应消息 */publicclassDefaultConsumerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建消费者实例指定消费组名称DefaultMQPushConsumerconsumernewDefaultMQPushConsumer(order_consumer_group);// 2. 设置NameServer连接地址consumer.setNamesrvAddr(127.0.0.1:9876);// 3. 订阅需要消费的Topic和Tag*代表订阅该主题下所有Tag消息consumer.subscribe(order_test_topic,*);// 4. 注册消息监听回调收到消息后执行业务处理consumer.registerMessageListener((ListMessageExtmessageExtList,ConsumeConcurrentlyContextcontext)-{// 循环处理每一条消费到的消息for(MessageExtmessageExt:messageExtList){// 获取消息主题、标签、业务Key、消息体内容StringtopicmessageExt.getTopic();StringtagmessageExt.getTags();StringmsgKeymessageExt.getKeys();StringmsgBodynewString(messageExt.getBody());// 打印消费到的消息信息模拟业务处理逻辑System.out.println(收到RocketMQ消息);System.out.println(消息Topictopic);System.out.println(消息Tagtag);System.out.println(业务KeymsgKey);System.out.println(消息内容msgBody);System.out.println();}// 返回消费成功状态Broker更新消费偏移量不再重复消费returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5. 启动消费者常驻监听消息consumer.start();System.out.println(消费者启动成功持续监听消费消息中...);}}消费核心关键点代码最后返回CONSUME_SUCCESS代表消费成功Broker记录消费位置如果消费异常返回RECONSUME_LATERRocketMQ会自动重试消费重试多次失败后自动转入死信队列和之前讲的死信概念完美对应。六、代码运行顺序控制台验证步骤第一步确保RocketMQ NameServer、Broker全部正常启动无报错日志第二步先运行消费者代码常驻监听Topic消息等待消息投递第三步运行任意一个生产者代码发送测试消息第四步查看消费者控制台正常打印消息内容代表生产消费全链路通第五步打开RocketMQ可视化Dashboard查看Topic消息生产数量、消费堆积、死信情况可视化验证运行状态。七、新手常见踩坑问题快速排查连接不上NameServerIP地址写错、9876端口防火墙未开放、RocketMQ服务未启动生产者发消息失败报错Broker未关联NameServer、autoCreateTopicEnable未开启Topic不存在消费者收不到消息订阅Topic名称和生产者不一致、消费组名称重复、消费者启动晚于生产者程序启动内存报错本地开发无需修改JVM内存服务端已在上篇安装时优化配置。

相关文章:

RocketMQ系列第三篇:Java原生基础使用实操,手把手写生产者消费者Demo

文章目录一、本篇前言:理论落地,从部署到代码实操二、前置准备:项目环境必备配置1. 基础环境要求2. 导入RocketMQ核心Maven依赖三、核心基础:RocketMQ消息核心对象说明1. DefaultMQProducer:消息生产者核心类2. Defaul…...

告别VSCode C++插件卡顿!ROS开发用clangd实现丝滑补全的保姆级配置

告别VSCode C插件卡顿!ROS开发用clangd实现丝滑补全的保姆级配置 在ROS开发中,代码补全的流畅度直接影响开发效率。许多开发者习惯使用VSCode进行ROS项目开发,但原生的C/C插件在大型项目中的表现往往不尽如人意——补全速度慢、误报错误、占用…...

深度神经网络中的不等式紧性分析与工程实践

1. 项目背景与核心价值深度神经网络中的不等式分析一直是理论研究的难点和热点。子加性与子乘性不等式作为描述网络层间关系的重要数学工具,其紧性分析直接关系到我们对神经网络表达能力、泛化性能和优化过程的理解。在实际应用中,这类分析能够帮助我们设…...

3步搞定RTL8821CE无线网卡:Linux驱动安装终极指南

3步搞定RTL8821CE无线网卡:Linux驱动安装终极指南 【免费下载链接】rtl8821ce 项目地址: https://gitcode.com/gh_mirrors/rt/rtl8821ce 还在为Linux系统下Realtek RTL8821CE无线网卡无法正常工作而烦恼吗?这款高性能的802.11ac无线芯片在Window…...

KVCache-Factory:LLM推理加速的缓存工厂设计与实战

1. 项目概述:一个为LLM推理加速而生的缓存工厂如果你最近在折腾大语言模型(LLM)的本地部署或者API调用,大概率会遇到一个头疼的问题:推理速度慢,尤其是当输入序列(Prompt)很长&#…...

Command line is too long. Shorten the command line via JAR manifest or via a classpath file

这种情况一般是在本地通过windows启动才会触发的,原因是启动时是使用命令行启动,而windows的启动命令是8191 个字符,超过的话就会报这个异常 1.启动命令行:2.异常:Error running ${启动类} Error running ${启动类}. Command line is too long. Shorten the command line via …...

完美光标库原理与应用:贝塞尔曲线实现平滑跟随动画

1. 项目概述:从“完美光标”说起最近在折腾一个需要高度自定义光标交互的前端项目,遇到了一个挺有意思的库——caterpi11ar/perfect-cursor。乍一看这个名字,你可能会觉得它又是一个处理鼠标样式的CSS库,但实际上,它解…...

告别记忆负担:用快马ai将自然语言秒变精准gitbash命令

作为一个经常和Git打交道的开发者,我深知那些复杂的Git命令有多让人头疼。特别是刚入门的时候,光是记住git rebase和git merge的区别就够喝一壶的。最近我发现了一个特别实用的方法,用AI来帮我们生成Git命令,简直就像有个随身的Gi…...

Tessy单元测试避坑指南:手把手解决9个最常见的头文件导入与编译错误

Tessy单元测试避坑实战:9类头文件与编译错误的深度解析与解决方案 嵌入式开发者在初次接触Tessy进行C/C单元测试时,头文件导入与编译环节堪称"新手坟场"。本文将从工程配置底层逻辑出发,系统梳理九类高频错误的诊断方法与解决路径&…...

基于MCP协议的代码智能体:从代码理解到精准操作

1. 项目概述:一个为开发者赋能的代码生成与理解工具最近在GitHub上看到一个挺有意思的项目,叫opencode-mcp,作者是AlaeddineMessadi。第一眼看到这个仓库名,我下意识地以为又是一个基于大语言模型的代码生成工具,毕竟“…...

别再只用snmputil了!Windows下net-snmp 5.5.0完整安装与SNMPv3配置实战

别再只用snmputil了!Windows下net-snmp 5.5.0完整安装与SNMPv3配置实战 如果你还在用snmputil这类功能受限的工具管理Windows网络设备,可能会错过SNMP协议90%的高级功能。作为运维工程师,我经历过从snmputil到net-snmp的升级过程——就像从自…...

AI接口代理服务器:统一多模型调用,集成缓存与流式响应

1. 项目概述与核心价值最近在折腾AI应用开发,特别是想给现有系统快速集成一个智能对话或代码补全能力时,发现了一个宝藏级的开源项目:lucgagan/completions。这个项目在GitHub上不算特别火爆,但它的定位非常精准——它不是一个庞大…...

嵌入式系统电源与时钟管理技术解析

1. 嵌入式系统电源与时钟管理架构解析在移动设备和物联网终端爆炸式增长的今天,嵌入式系统的能效比成为产品竞争力的关键指标。我曾参与一款智能穿戴设备的开发,当系统在动态电压频率调节(DVFS)和SmartReflex技术加持下&#xff0…...

Blender顶点权重混合修改器,除了合并还能做什么?3个你可能不知道的实用技巧

Blender顶点权重混合修改器:超越合并的3个高阶应用技巧 在角色绑定和布料模拟中,顶点权重是控制模型变形的核心数据层。大多数Blender用户只把顶点权重混合修改器当作简单的合并工具,却忽略了它在权重微调领域的强大潜力。今天我们将打破常规…...

Go语言重构AI编码助手:gocode的极速架构与多智能体实战

1. 项目概述:为什么我们需要一个全新的AI编码助手如果你和我一样,每天都在终端里敲代码,那你肯定对AI编码助手不陌生。从早期的GitHub Copilot Chat到后来惊艳全场的Claude Code,这些工具确实改变了我们写代码的方式。但用久了&am…...

通过TaotokenCLI工具一键配置团队统一的大模型开发环境

通过TaotokenCLI工具一键配置团队统一的大模型开发环境 1. 安装Taotoken CLI工具 Taotoken CLI提供两种安装方式,适合不同使用场景。对于需要频繁调用CLI的团队管理员,推荐全局安装: npm install -g taotoken/taotoken若仅需临时使用或避免…...

维普 AIGC 率太高不用愁!这几款降重工具一次解决查重率和 AI 痕迹两个难题

毕业季论文查重、AIGC 检测双重压力拉满!不少同学熬大夜改稿,维普查重率仍飘红,AIGC 疑似率更是居高不下,反复修改却越改越乱,甚至影响论文核心逻辑。其实不用死磕手动改写,2026 年多款双效降重神器已实现 …...

一文帮你搞懂JavaScript的核心概念

JavaScript的核心概念介绍JavaScript作为现代Web开发的基石,掌握其核心概念对开发者至关重要。以下从语言特性、运行机制和关键组件三个维度展开分析。变量与作用域JavaScript采用var、let、const三种变量声明方式。var存在变量提升特性,函数作用域&…...

【农业物联网PHP可视化实战指南】:手把手教你用Laravel+Chart.js实时渲染土壤温湿度数据流

更多请点击: https://intelliparadigm.com 第一章:农业物联网数据可视化项目概述 农业物联网数据可视化项目旨在将田间部署的温湿度传感器、土壤水分探头、光照强度计及气象站等设备采集的实时数据,通过统一协议汇聚至边缘网关,并…...

保姆级避坑指南:在VMware虚拟机Ubuntu20.04上搞定RobotiQ 2F-85夹爪的ROS Noetic驱动

虚拟机环境下的RobotiQ夹爪ROS驱动避坑实战手册 在机器人开发领域,虚拟化环境与物理硬件的联动调试一直是令人头疼的难题。特别是当RobotiQ 2F-85这样的工业级夹爪遇上VMware虚拟化的Ubuntu系统,各种"坑"接踵而至——从rosdep的神秘报错到串口…...

为什么你的AI策略在R 4.5中年化衰减超42%?——揭秘RcppParallel加速失效、xts时区错位与回测引擎底层Bug

更多请点击: https://intelliparadigm.com 第一章:R 4.5量化投资AI策略回测的系统性失效诊断 当R语言升级至4.5版本后,大量基于quantstrat、blotter与TTR构建的AI驱动回测框架出现静默性失效——非报错崩溃,而是信号生成偏移、滑…...

Dify+PLC/SCADA文档智能检索落地全记录(含OPC UA语义对齐技术细节)

更多请点击: https://intelliparadigm.com 第一章:DifyPLC/SCADA文档智能检索落地全记录(含OPC UA语义对齐技术细节) 在工业自动化系统中,PLC与SCADA文档常以PDF、Word及HTML混合格式分散存储,导致运维人员…...

为AI Agent构建全链路可观测性:基于OpenTelemetry与Apache Doris的运维实践

1. 项目概述:为AI Agent装上“全链路透视镜”如果你正在大规模使用OpenClaw这类AI Agent调度平台,我猜你肯定遇到过这样的场景:某个关键的业务流程突然卡住了,你只知道最终结果不对,但完全不清楚是哪个Agent出的问题、…...

如何让小爱音箱播放任何音乐:10分钟快速搭建私人音乐库

如何让小爱音箱播放任何音乐:10分钟快速搭建私人音乐库 【免费下载链接】xiaomusic 使用小爱音箱播放音乐,音乐使用 yt-dlp 下载。 项目地址: https://gitcode.com/GitHub_Trending/xia/xiaomusic 想让你的小爱音箱播放自己喜欢的音乐&#xff0c…...

HCIP的stp(生成树)2

一、TCN BPDU 报文二、配置BPDU的工作过程1.只有根设备会主动发送配置BPDU2.一开始,所有交换机运行之后,都认为自己是根网桥,则会主动发送配置BPDU,从所有激活STP的接口 上发出,此时也能接收到其他设备的配置BPDU&…...

AI编程助手量化回测技能库:基于VectorBT的专业策略开发实战

1. 项目概述:为AI编程助手打造的量化回测技能库 如果你正在用Claude Code、Cursor或者GitHub Copilot这类AI编程工具来写量化交易策略,那你肯定遇到过这样的场景:脑子里有个策略想法,想让AI助手帮你快速实现回测,结果…...

基于MLX框架在Mac本地部署与优化多模态大模型实战指南

1. 项目概述:在Mac上本地运行多模态大模型的利器如果你是一名Mac用户,同时又对当前火热的视觉语言模型(VLM)和全模态模型(Omni Model)感兴趣,那么你很可能已经受够了云端API的延迟、高昂的成本&…...

TmuxAI:无缝融入终端的AI助手,重塑开发工作流

1. TmuxAI:你的终端智能副驾,如何重塑我的开发工作流 作为一名常年与终端为伴的开发者,我的工作流几乎被 tmux 和 vim 完全定义。分屏、会话管理、快速切换,这些操作早已成为肌肉记忆。但即便是最熟练的终端使用者&#xff0…...

AI算力核心:Token吞吐量决定一切!你了解Token的真正价值吗?

文章深入解析了Token作为AI处理信息最小单元的重要性,指出Token吞吐量是算力的核心标尺。Token不仅是算力消耗、成本计价的标尺,也是产业竞争的关键。文章详细解释了Token的定义、技术本质,以及它与算力的关系,强调Token是算力的“…...

手把手教你用RK3588开发板+ModelBox,5分钟搞定疲劳驾驶检测Demo

基于RK3588开发板的边缘AI疲劳驾驶检测实战指南 在智能交通和工业安全领域,实时监测驾驶员状态已成为刚需。本文将带你使用Rockchip RK3588开发板配合ModelBox框架,从零构建一个完整的疲劳驾驶检测系统。不同于传统方案需要昂贵设备和复杂部署&#xff0…...