RocketMQ的简单使用
这里需要创建2.x版本的springboot项目
导入依赖
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.6</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies>
定义配置文件
server:port: 3000rocketmq:name-server: xxx.xxx.xxx.xxx:9876 # NameServer 地址producer:group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义
生产者定义
这里的生产者有两个,一个是普通的,一个是延时。
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.messaging.support.MessageBuilder;@Component
@Slf4j
public class GeneralMessageDemoProduce {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent) {SendResult sendResult;try{StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}
延时的
@Component
@Slf4j
public class ScheduleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent ) {SendResult sendResult;try {StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L,6);log.info("[延时消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[延时消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}
消费者定义
这里也是两个消费者,普通的和延时的不在同一个主题的内
@Slf4j
@Component
@RocketMQMessageListener(topic = "rocketmq-yhy_topic",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ消息,消息体:{}", JSON.toJSONString(message));}
}
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "Delay",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume_Delay implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ的延时消息,消息体:{}", JSON.toJSONString(message));}
}
发送消息
这里直接在启动类发送。
@SpringBootApplication
@RestController
public class RocketMQDemoApplication {@Autowiredprivate GeneralMessageDemoProduce generalMessageDemoProduce;@Autowiredprivate ScheduleProducer scheduleProducer;@PostMapping("/test/send/general-message")public String sendGeneralMessage() {String keys= UUID.randomUUID().toString();MessageEvent messageEvent=new MessageEvent("消息具体内容——yhy",keys);SendResult sendResult=generalMessageDemoProduce.sendMessage("rocketmq-yhy_topic","general",keys,messageEvent);SendResult sendResult2=scheduleProducer.sendMessage("Delay","general",keys,messageEvent);System.out.println(sendResult.getSendStatus().name() );System.out.println(sendResult2.getSendStatus().name());return sendResult.getSendStatus().name();}public static void main(String[] args) {SpringApplication.run(RocketMQDemoApplication.class, args);}
}
postman触发

相关文章:
RocketMQ的简单使用
这里需要创建2.x版本的springboot项目 导入依赖 <dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency>&…...
速盾:服务器有cdn 带宽上限建议多少
CDN(内容传输网络)是一种通过分布在全球不同地点的服务器来提供高效内容分发的技术。当用户请求访问某个网站时,CDN会根据用户的地理位置,将内容从离用户最近的服务器上提供给用户,这样可以减少延迟和带宽消耗…...
智慧工地安全+绿色施工方案
塔机监测 塔吊监测可以实现对塔机监测、群塔防碰撞、塔机区域防护和吊钩可视化 1司机身份识别认证:只有司机在监控设备进行刷卡、指纹、人脸、虹膜验证身份后才能进行设备的作业操作。 2运行工况采集与显示:清晰实时显示起重机械设备运行工况,主要显示的内容:起重量、起…...
SQL Server 存储过程:BBS论坛(表结构文档下载及30个存储过程)
基于 Asp.Net 和 SQL Server 实现了一个BBS论坛,论坛功能比较强大,论坛大部分业务逻辑基于存储过程实现,记录一下。 BBS论坛存储过程清单 序号存储过程功能说明1sp_bbs_admin_add添加管理员2sp_bbs_admin_del删除系统管理员3sp_bbs_admin_m…...
03 Python进阶:MySQL - mysql-connector
mysql-connector安装 要在 Python 中使用 MySQL 数据库,你需要安装 MySQL 官方提供的 MySQL Connector/Python。下面是安装 MySQL Connector/Python 的步骤: 首先,确保你已经安装了 Python,如果没有安装,可以在 Python…...
InnoDB 行记录格式(“存储一行行数据的结构“)
1.行格式 1.1 Compact行格式 1.1.1 示意图 1.1.2 准备一下 1)建表 mysql> CREATE TABLE record_format_demo (-> c1 VARCHAR(10),-> c2 VARCHAR(10) NOT NULL,-> c3 CHAR(10),-> c4 VARCHAR(10)-> ) CHARSETascii ROW_FORMATCOM…...
【洛谷】P9236 [蓝桥杯 2023 省 A] 异或和之和
题目链接 P9236 [蓝桥杯 2023 省 A] 异或和之和 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路 1. 暴力求解 直接枚举出所有子数组,求每个子数组的异或和,再对所有的异或和求和 枚举所有子数组的时间复杂度为O(N^2)&…...
ThreadLocal加切面实现线程级别的方法缓存
1、实现效果 当一个请求线程多次请求A方法时,只会触发一次A方法的实际调用,会将方法结果缓存起来,避免多次调用。 2、实现过程 1. 需要一个注解ThreadLocalCache,在需要缓存的方法上加上该注解 2. 需要一个切面,借助ThreadLocal,将结果缓存起来,利用环绕通知来实现方法拦截从…...
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流 文介绍了如何使用 Apache Flume 将 CSV 格式的数据从本地文件系统导入到 Apache Kafka 中,以实现实时数据流处理。通过 Flume 的配置和操作步骤,我们可以轻松地将数据从 CSV 文件中读取并发…...
对代理模式的理解
目录 一、前言二、案例1 代码2 自定义代理类【静态代理】2.1 一个接口多个实现,到底注入哪个依赖呢?2.1.1 Primary注解2.1.2 Resource注解(指定name属性)2.1.3 Qualifier注解 2.2 面向接口编程2.3 如果没接口咋办呢?2.…...
#QT项目实战(天气预报)
1.IDE:QTCreator 2.实验: 3.记录: (1)调用API的Url a.调用API获取IP whois.pconline.com.cn/ipJson.jsp?iphttp://whois.pconline.com.cn/ipJson.jsp?ip if(window.IPCallBack) {IPCallBack({"ip":&quo…...
数据挖掘|关联分析与Apriori算法详解
数据挖掘|关联分析与Apriori算法 1. 关联分析2. 关联规则相关概念2.1 项目2.2 事务2.3 项目集2.4 频繁项目集2.5 支持度2.6 置信度2.7 提升度2.8 强关联规则2.9 关联规则的分类 3. Apriori算法3.1 Apriori算法的Python实现3.2 基于mlxtend库的Apriori算法的Python实现 1. 关联分…...
ChatGPT Excel 大师
原文:ChatGPT Excel Mastery 译者:飞龙 协议:CC BY-NC-SA 4.0 序言 欢迎来到 Excel 掌握的变革之旅,在这里,尖端技术和永恒专业知识在“ChatGPT Excel 掌握:释放专家技巧和窍门的力量”中融合。在当今快节…...
C 语言中的 end, _end 符号
使用 man 3 end 可以看到相关符号的解释 这些符号不是在 C 语言文件和头文件中定义的,它们是 ld 在链接所有 .o 文件的时候自己添加的。 end 和 _end 的地址,就是最终程序的堆的起始地址 要打印它们的话,一个样例程序在下面: …...
绿联 安装PDF工具
这是一个强大的本地托管的基于 Web 的 PDF 操作工具,使用 docker,允许您对 PDF 文件执行各种操作,例如拆分、合并、转换、重组、添加图像、旋转、压缩等。这个本地托管的 Web 应用程序最初是 100% ChatGPT 制作的应用程序,现已发展…...
备战蓝桥杯---数论相关问题
目录 一、最大公约数和最小公倍数 二、素数判断 三、同余 四、唯一分解定理 五、约数个数定理 六、约数和定理 五、快速幂 六、费马小定理 七、逆元 一、最大公约数和最小公倍数 文章链接:最大公约数和最小公倍数 二、素数判断 文章链接:在J…...
苹果手表Apple Watch录了两个半小时的录音,却只能播放4秒,同步到手机也一样,还能修复好吗?
好多人遇到这个情况,用苹果手表Apple Watch录音,有的录1个多小时,有的录了3、4小时,甚至更长时间,因为手表没电,忘记保存等原因造成录音损坏,都是只能播放4秒,同步到手机也一样&…...
RGB三通道和灰度值的理解
本文都是来自于chatGPT的回答!!! 目录 Q1:像素具有什么属性?Q2:图像的色彩是怎么实现的?Q3:灰度值和颜色值是一个概念吗?Q4:是不是像素具有灰度值,也有三个颜色分量RGB?Q5:灰度图像是没有色彩的吗?Q6: 彩色图像是既具有灰度值也具有RGB三…...
ARM、X86、RISC-V三分天下
引入: 简单的介绍一下X86、ARM、RISC-V三种cpu架构的区别和应用场景。 目录 简单概念讲解 1. X86架构 2. ARM架构 3. RISC-V架构 应用场景 X86、ARM和RISC-V是三种不同的CPU架构,它们在设计理念、指令集和应用场景上有一些区别。 简单概念讲解 1. X…...
力控机器人原理及力控制实现
力控机器人原理及力控制实现 力控机器人是一种能够感知力量并具有实时控制能力的机器人系统。它们可以在与人类进行精准协作和合作时,将力传感技术(Force Sensing Technology)和控制算法(Control Algorithm)结合起来&a…...
19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
网络编程(UDP编程)
思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
