RocketMQ的简单使用
这里需要创建2.x版本的springboot项目
导入依赖
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.6</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies>
定义配置文件
server:port: 3000rocketmq:name-server: xxx.xxx.xxx.xxx:9876 # NameServer 地址producer:group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义
生产者定义
这里的生产者有两个,一个是普通的,一个是延时。
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.messaging.support.MessageBuilder;@Component
@Slf4j
public class GeneralMessageDemoProduce {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent) {SendResult sendResult;try{StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}
延时的
@Component
@Slf4j
public class ScheduleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent ) {SendResult sendResult;try {StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L,6);log.info("[延时消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[延时消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}
消费者定义
这里也是两个消费者,普通的和延时的不在同一个主题的内
@Slf4j
@Component
@RocketMQMessageListener(topic = "rocketmq-yhy_topic",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ消息,消息体:{}", JSON.toJSONString(message));}
}
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "Delay",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume_Delay implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ的延时消息,消息体:{}", JSON.toJSONString(message));}
}
发送消息
这里直接在启动类发送。
@SpringBootApplication
@RestController
public class RocketMQDemoApplication {@Autowiredprivate GeneralMessageDemoProduce generalMessageDemoProduce;@Autowiredprivate ScheduleProducer scheduleProducer;@PostMapping("/test/send/general-message")public String sendGeneralMessage() {String keys= UUID.randomUUID().toString();MessageEvent messageEvent=new MessageEvent("消息具体内容——yhy",keys);SendResult sendResult=generalMessageDemoProduce.sendMessage("rocketmq-yhy_topic","general",keys,messageEvent);SendResult sendResult2=scheduleProducer.sendMessage("Delay","general",keys,messageEvent);System.out.println(sendResult.getSendStatus().name() );System.out.println(sendResult2.getSendStatus().name());return sendResult.getSendStatus().name();}public static void main(String[] args) {SpringApplication.run(RocketMQDemoApplication.class, args);}
}
postman触发

相关文章:
RocketMQ的简单使用
这里需要创建2.x版本的springboot项目 导入依赖 <dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency>&…...
速盾:服务器有cdn 带宽上限建议多少
CDN(内容传输网络)是一种通过分布在全球不同地点的服务器来提供高效内容分发的技术。当用户请求访问某个网站时,CDN会根据用户的地理位置,将内容从离用户最近的服务器上提供给用户,这样可以减少延迟和带宽消耗…...
智慧工地安全+绿色施工方案
塔机监测 塔吊监测可以实现对塔机监测、群塔防碰撞、塔机区域防护和吊钩可视化 1司机身份识别认证:只有司机在监控设备进行刷卡、指纹、人脸、虹膜验证身份后才能进行设备的作业操作。 2运行工况采集与显示:清晰实时显示起重机械设备运行工况,主要显示的内容:起重量、起…...
SQL Server 存储过程:BBS论坛(表结构文档下载及30个存储过程)
基于 Asp.Net 和 SQL Server 实现了一个BBS论坛,论坛功能比较强大,论坛大部分业务逻辑基于存储过程实现,记录一下。 BBS论坛存储过程清单 序号存储过程功能说明1sp_bbs_admin_add添加管理员2sp_bbs_admin_del删除系统管理员3sp_bbs_admin_m…...
03 Python进阶:MySQL - mysql-connector
mysql-connector安装 要在 Python 中使用 MySQL 数据库,你需要安装 MySQL 官方提供的 MySQL Connector/Python。下面是安装 MySQL Connector/Python 的步骤: 首先,确保你已经安装了 Python,如果没有安装,可以在 Python…...
InnoDB 行记录格式(“存储一行行数据的结构“)
1.行格式 1.1 Compact行格式 1.1.1 示意图 1.1.2 准备一下 1)建表 mysql> CREATE TABLE record_format_demo (-> c1 VARCHAR(10),-> c2 VARCHAR(10) NOT NULL,-> c3 CHAR(10),-> c4 VARCHAR(10)-> ) CHARSETascii ROW_FORMATCOM…...
【洛谷】P9236 [蓝桥杯 2023 省 A] 异或和之和
题目链接 P9236 [蓝桥杯 2023 省 A] 异或和之和 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路 1. 暴力求解 直接枚举出所有子数组,求每个子数组的异或和,再对所有的异或和求和 枚举所有子数组的时间复杂度为O(N^2)&…...
ThreadLocal加切面实现线程级别的方法缓存
1、实现效果 当一个请求线程多次请求A方法时,只会触发一次A方法的实际调用,会将方法结果缓存起来,避免多次调用。 2、实现过程 1. 需要一个注解ThreadLocalCache,在需要缓存的方法上加上该注解 2. 需要一个切面,借助ThreadLocal,将结果缓存起来,利用环绕通知来实现方法拦截从…...
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流 文介绍了如何使用 Apache Flume 将 CSV 格式的数据从本地文件系统导入到 Apache Kafka 中,以实现实时数据流处理。通过 Flume 的配置和操作步骤,我们可以轻松地将数据从 CSV 文件中读取并发…...
对代理模式的理解
目录 一、前言二、案例1 代码2 自定义代理类【静态代理】2.1 一个接口多个实现,到底注入哪个依赖呢?2.1.1 Primary注解2.1.2 Resource注解(指定name属性)2.1.3 Qualifier注解 2.2 面向接口编程2.3 如果没接口咋办呢?2.…...
#QT项目实战(天气预报)
1.IDE:QTCreator 2.实验: 3.记录: (1)调用API的Url a.调用API获取IP whois.pconline.com.cn/ipJson.jsp?iphttp://whois.pconline.com.cn/ipJson.jsp?ip if(window.IPCallBack) {IPCallBack({"ip":&quo…...
数据挖掘|关联分析与Apriori算法详解
数据挖掘|关联分析与Apriori算法 1. 关联分析2. 关联规则相关概念2.1 项目2.2 事务2.3 项目集2.4 频繁项目集2.5 支持度2.6 置信度2.7 提升度2.8 强关联规则2.9 关联规则的分类 3. Apriori算法3.1 Apriori算法的Python实现3.2 基于mlxtend库的Apriori算法的Python实现 1. 关联分…...
ChatGPT Excel 大师
原文:ChatGPT Excel Mastery 译者:飞龙 协议:CC BY-NC-SA 4.0 序言 欢迎来到 Excel 掌握的变革之旅,在这里,尖端技术和永恒专业知识在“ChatGPT Excel 掌握:释放专家技巧和窍门的力量”中融合。在当今快节…...
C 语言中的 end, _end 符号
使用 man 3 end 可以看到相关符号的解释 这些符号不是在 C 语言文件和头文件中定义的,它们是 ld 在链接所有 .o 文件的时候自己添加的。 end 和 _end 的地址,就是最终程序的堆的起始地址 要打印它们的话,一个样例程序在下面: …...
绿联 安装PDF工具
这是一个强大的本地托管的基于 Web 的 PDF 操作工具,使用 docker,允许您对 PDF 文件执行各种操作,例如拆分、合并、转换、重组、添加图像、旋转、压缩等。这个本地托管的 Web 应用程序最初是 100% ChatGPT 制作的应用程序,现已发展…...
备战蓝桥杯---数论相关问题
目录 一、最大公约数和最小公倍数 二、素数判断 三、同余 四、唯一分解定理 五、约数个数定理 六、约数和定理 五、快速幂 六、费马小定理 七、逆元 一、最大公约数和最小公倍数 文章链接:最大公约数和最小公倍数 二、素数判断 文章链接:在J…...
苹果手表Apple Watch录了两个半小时的录音,却只能播放4秒,同步到手机也一样,还能修复好吗?
好多人遇到这个情况,用苹果手表Apple Watch录音,有的录1个多小时,有的录了3、4小时,甚至更长时间,因为手表没电,忘记保存等原因造成录音损坏,都是只能播放4秒,同步到手机也一样&…...
RGB三通道和灰度值的理解
本文都是来自于chatGPT的回答!!! 目录 Q1:像素具有什么属性?Q2:图像的色彩是怎么实现的?Q3:灰度值和颜色值是一个概念吗?Q4:是不是像素具有灰度值,也有三个颜色分量RGB?Q5:灰度图像是没有色彩的吗?Q6: 彩色图像是既具有灰度值也具有RGB三…...
ARM、X86、RISC-V三分天下
引入: 简单的介绍一下X86、ARM、RISC-V三种cpu架构的区别和应用场景。 目录 简单概念讲解 1. X86架构 2. ARM架构 3. RISC-V架构 应用场景 X86、ARM和RISC-V是三种不同的CPU架构,它们在设计理念、指令集和应用场景上有一些区别。 简单概念讲解 1. X…...
力控机器人原理及力控制实现
力控机器人原理及力控制实现 力控机器人是一种能够感知力量并具有实时控制能力的机器人系统。它们可以在与人类进行精准协作和合作时,将力传感技术(Force Sensing Technology)和控制算法(Control Algorithm)结合起来&a…...
1564286-24-3,Cy5 DBCO SE,应用于生物分子标记、分子成像
一.名称英文名称:Cy5 DBCO NHS Ester,Cy5 DBCO SE,Cyanine5 DBCO NHS Ester,Cy5 Dibenzocyclooctyne NHS Ester中文名称:Cy5-二苯并环辛炔-NHS 酯,花菁染料Cy5-二苯并环辛炔-琥珀酰亚胺酯CAS 号:…...
DeepSeek-OCR 2技术突破:动态视觉token重排效果展示
DeepSeek-OCR 2技术突破:动态视觉token重排效果展示 1. 引言 想象一下,当你阅读一份复杂的学术论文时,眼睛不会机械地从左上角扫到右下角,而是会自然地跳过标题、关注图表、追踪公式推导,甚至在不同的文本栏之间灵活…...
League Akari:英雄联盟玩家的终极智能辅助工具实战指南
League Akari:英雄联盟玩家的终极智能辅助工具实战指南 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 你是否厌倦了在…...
行业观察2026年3月五家geo优化服务商实测对比与选型决策指南
在2026年3月的智能商业环境中,企业竞争的焦点已从信息曝光转向认知塑造。随着生成式AI全面接管用户的信息获取与决策流程,品牌若无法在AI的“思考”过程中占据一席之地,便意味着在未来的商业对话中失语。第三方独立数据显示,2025年…...
3分钟完成Axure RP中文界面汉化:终极完整指南
3分钟完成Axure RP中文界面汉化:终极完整指南 【免费下载链接】axure-cn Chinese language file for Axure RP. Axure RP 简体中文语言包,不定期更新。支持 Axure 9、Axure 10。 项目地址: https://gitcode.com/gh_mirrors/ax/axure-cn 还在为Axu…...
MCP 测试文章 1774508531523
这是一篇来自 MCP Server 的测试文章 测试正常工作!...
Unity Enter Play Mode Settings 搭配手动Reload全攻略:既保速度又保数据安全
Unity开发效率革命:Enter Play Mode Settings与智能Reload的黄金组合 在Unity项目开发的中后期,随着代码量膨胀和资源规模增长,每次按下Play按钮后的等待时间逐渐成为效率杀手。传统工作流中,脚本修改后的自动Reload机制像一把双刃…...
Qt新手必看:MinGW和MSVC构建套件到底怎么选?保姆级对比指南
Qt构建套件选择指南:MinGW与MSVC深度对比与实战决策 刚接触Qt开发的初学者,往往在配置开发环境的第一步就陷入选择困难——面对MinGW和MSVC这两个构建套件选项,究竟该如何抉择?这个看似简单的选择背后,实则关系到后续开…...
告别CANFD高速丢帧!手把手教你配置STM32 FDCAN的收发器延时补偿(TDC)
攻克CANFD高速通信难题:STM32 FDCAN延时补偿实战指南 当CANFD的波特率飙升至10Mb/s时,许多工程师突然发现原本稳定的通信开始频繁丢帧——这往往不是代码逻辑问题,而是物理层信号延时在作祟。本文将带您深入STM32 FDCAN的Transceiver Delay C…...
30分钟快速搭建企业级工作流系统:RuoYi-Flowable-Plus完整指南
30分钟快速搭建企业级工作流系统:RuoYi-Flowable-Plus完整指南 【免费下载链接】RuoYi-Flowable-Plus 本项目基于 RuoYi-Vue-Plus 进行二次开发扩展Flowable工作流功能,支持在线表单设计和丰富的工作流程设计能力。如果觉得这个项目不错,麻烦…...
