今天学学消息队列RocketMQ:消息类型
RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。
普通消息
普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息适用于对顺序没有要求的基础消费需求。这里的Topic和MessageQueue是多对多关系。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
} 

顺序消息
当我开始对消息的时序性有要求的时候,普通消息就无法满足我们的需求了。当我们要求顺序消费的时候,我们的Topic就不能采用多对多的形式,存放在多个MessageQueue中,而是需要牺牲一部分性能,存放在一个MessageQueue中。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> mqs.get(0), null);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println("RocketMQ test msg " + i);System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 跟普通消费不同的地方,这里采用了顺序消费方法。consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();
} 

延时消息
通过指定的通知时间间隔,让消息不会立刻被消费者收到。
// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());// 指定延时等级// DelayTimeLevel对应的延时时间在服务端定义:rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";msg.setDelayTimeLevel(4);// 按毫秒延时msg.setDelayTimeMs(1L);// 按秒延时msg.setDelayTimeSec(1);SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
} 
30秒的延迟

事务消息
事务消息是RocketMQ提供的一种高级的消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
事务消息的生命周期
(1)初始化:半事务消息被生产者构建并完成初始化,待发送到RocketMQ服务端的状态。
(2)事务待提交:半事务消息被发送到服务端,和普通消息不同,半事务消息并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
(3)消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
(4)提交事务待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见。
(5)消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一段时间后没有收到消费者的响应,服务端会对消息进行重试处理。
(6)消费完成:消费者完成了消费动作,并向服务端提交了消费结果,服务端标记当前消息已经被处理。
(7)消息删除:服务端按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

// 生产者
public static void main(String[] args) throws MQClientException {TransactionMQProducer producer = new TransactionMQProducer("rmq-group");TransactionListener listener = new TransactionListenerImpl();producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.setTransactionListener(listener);producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println(new Date());System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
} 
控制台打印
// 生产者控制台打印
Wed Jul 26 17:55:45 CST 2023 RocketMQ test msg 0 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:45 CST 2023
7F00000154F073D16E938497DE420000
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
93============================
Wed Jul 26 17:55:46 CST 2023 RocketMQ test msg 1 : executeLocalTransaction:未知状态
Wed Jul 26 17:55:46 CST 2023
7F00000154F073D16E938497E2340001
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
94============================
Wed Jul 26 17:55:47 CST 2023 RocketMQ test msg 2 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:47 CST 2023
7F00000154F073D16E938497E6230002
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
95============================
Wed Jul 26 17:55:48 CST 2023 RocketMQ test msg 3 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:48 CST 2023
7F00000154F073D16E938497EA180003
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
96============================
Wed Jul 26 17:55:49 CST 2023 RocketMQ test msg 4 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:49 CST 2023
7F00000154F073D16E938497EE0B0004
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
97============================
Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:50 CST 2023
7F00000154F073D16E938497F1F70005
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
98============================
Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:51 CST 2023
7F00000154F073D16E938497F5EC0006
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
99============================
Wed Jul 26 17:55:52 CST 2023 RocketMQ test msg 7 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:52 CST 2023
7F00000154F073D16E938497F9E50007
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
100============================
Wed Jul 26 17:55:53 CST 2023 RocketMQ test msg 8 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:53 CST 2023
7F00000154F073D16E938497FDD30008
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
101============================
Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:54 CST 2023
7F00000154F073D16E93849801C90009
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
102============================// 消费者控制台打印
Receive New Messages: Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 At Wed Jul 26 17:55:50 CST 2023 
Receive New Messages: Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 At Wed Jul 26 17:55:51 CST 2023 
Receive New Messages: Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 At Wed Jul 26 17:55:54 CST 2023  
这里消费者只收到了执行事务成功的5,6,9。
相关文章:
今天学学消息队列RocketMQ:消息类型
RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。 普通消息 普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息…...
小程序附件下载并预览功能
一、实现的功能: 1、word、excel、图片等实现下载并预览 2、打开文件后显示文件名称 二、代码: // 判断文件类型whatFileType(url) {let sr url.lastIndexOf("."); // 最后一次出现的位置let fileType url.substr(sr 1); // 截取url的…...
数据库缓存服务——NoSQL之Redis配置与优化
目录 一、缓存概念 1.1 系统缓存 1.2 缓存保存位置及分层结构 1.2.1 DNS缓存 1.2.2 应用层缓存 1.2.3 数据层缓存 1.2.4 硬件缓存 二、关系型数据库与非关系型数据库 2.1 关系型数据库 2.2 非关系型数据库 2.3 关系型数据库和非关系型数据库区别: 2.4 非…...
【雕爷学编程】MicroPython动手做(13)——掌控板之RGB三色灯
知识点:什么是掌控板? 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片,支持WiFi和蓝牙双模通信,可作为物联网节点,实现物联网应用。同时掌控板上集成了OLED…...
.Net Core上传组件_.Net Core图片上传组件_Uploader7.0
一、.Net Core上传组件Uploader7.0简介 1.当前版本v7.0,前端框架丰富升级 2.前端jquery框架封装,cover.js, 腾讯云cos-js-sdk-v5.min.js 3.后端,支持Asp.Net 和 Asp.Net Core 矿建 4.数据传输模式支持:WebScoket 、Ajax、Form 模式上传到…...
Exadata磁盘损坏导致磁盘组无法mount恢复(oracle一体机磁盘组异常恢复)---惜分飞
Oracle Exadata客户,在换盘过程中,cell节点又一块磁盘损坏,导致datac1磁盘组(该磁盘组是normal方式冗余)无法mount Thu Jul 20 22:01:21 2023 SQL> alter diskgroup datac1 mount force NOTE: cache registered group DATAC1 number1 incarn0x0728ad12 NOTE: ca…...
左值引用与右值引用的区别?右值引用的意义?
左值引用与右值引用的区别?右值引用的意义? 1 区别1.1 功能差异1.2 左值引用1.3 右值引用1.3.1 实现移动语义1.3.2 实现完美转发 2 引用的作用3 区分左值和右值3.1 左值3.2 右值 1 区别 左值引用是对左值的引用;右值引用是对右值的引用。 &…...
2023年深圳杯数学建模D题基于机理的致伤工具推断
2023年深圳杯数学建模 D题 基于机理的致伤工具推断 原题再现: 致伤工具的推断一直是法医工作中的热点和难点。由于作用位置、作用方式的不同,相同的致伤工具在人体组织上会形成不同的损伤形态,不同的致伤工具也可能形成相同的损伤形态。致伤…...
Vue的router学习
,前端路由的核心是什么呢?改变URL,但是页面不进行整体的刷新。 vue-router是基于路由和组件的  路由用于设定访问路径, 将路径和组件映射起来;  在vue-router的单页面应用中, 页面的路径的改变就是组件的切换; 使用router需要…...
Inpaint Anything: 自动化抹除视频元素
自动化抹除视频元素 不用逐帧抠图,直接SAM Tracking Video Inpainting就能实现自动化抹除奔跑吧idol。 https://github.com/geekyutao/Inpaint-Anything 目录 网站演示参考文献 网站 https://huggingface.co/spaces/InpaintAI/Inpaint-Anything 演示 原理就是&a…...
Flutter 开发者工具 Android Studio 开发Flutter应用
Flutter 开发者工具 在 Android Studio 开发Flutter应用 🔥 Android Studio 版本更新 🔥 Android Studio Check for Update Connection failed  解决方案 如果是运行的是32位的android studio需要在andriod studio的启动目录下找到studio.exe.vmoptio…...
后端byte[]传给前端接收默认变成string字符串
创建时间:2023.7.28 建议:最好直接用字符串,我是没办法要求保密,存取都是字符串,程序里面是byte数组 既然他到前端会转换成字符串那么就是被转码了 那我们反向转码就好了 这是在后端处理,反正前端也是乱…...
UE5 动画蓝图模板(Animation Blueprint Template)
文章目录 前言准备内容创建动画蓝图使用动画蓝图模板示例1示例2总结前言 本文基于虚幻5.2版本介绍制作动画蓝图模板,本教程要求使用虚幻5.0及以上版本。 准备内容 使用第三人称游戏内容包,已添加可忽略。 选择第三人称游戏,添加到项目。 创建动画蓝图 在 Characters 文件…...
Log4j源码解析
Log4j源码解析 主要流程 Logger logger Logger.getLogger(Main.class); 1、通过Logger.getLogger(Class clazz) 或 Logger.getLogger(String name)进入。 2、加载LogManager进jvm, 执行静态代码块执行初始化, 创建出RepositorySelector实例及LoggerRepository实例(Hierarchy…...
Docker 容器访问宿主机服务
docker 网络简介 docker 在安装时会默认创建三个网络:bridge(默认网络模式)、 none 、host。 host 直接和宿主机共用网络。bridge 网络隔离,通过虚拟网桥(一般是 docker0)与宿主机通信。none 禁用网络功能…...
Go 发送邮件
要在Go中发送电子邮件,您可以使用第三方库,如 gomail 。以下是一个使用 gomail 发送电子邮件的示例代码: package main import ("fmt""gopkg.in/gomail.v2" ) func main() {// 创建邮件消息m : gomail.NewMessage()m.Se…...
Spring AOP 的概念及其作用
一、什么是 Spring AOP? 在介绍 Spring AOP 之前,首先要了解一下什么是 AOP ? AOP ( Aspect Oriented Programming ):面向切面编程,它是一种思想, 它是对某一类事情的集中处 理 。…...
python基础1——环境安装
文章目录 一、Windows安装二、Linux安装三、pycharm安装3.1 软件安装3.2 个性化设置3.3 基本使用3.3.1 定义变量3.3.2 查看数据类型3.3.3 运算符3.3.4 操作符3.3.5 转义符 一、Windows安装 1、下载软件安装包,官网 2、开始安装。 2.查看是否安装成功。 3.安装…...
uniapp 中 的progress加载进度条 的使用,在 页面显示数据加载的进度条,使用户的使用体验效果更好
学习目标: 学习目标如下: 例如: uniapp 中 的progress加载进度条 的使用,在 页面显示数据加载的进度条,使用户的使用体验效果更好 学习内容: 学习内容如下所示: 相关属性的说明 进度条的显…...
【尚硅谷】第01章:随堂复习与企业真题(Java语言概述)
来源:尚硅谷Java零基础全套视频教程(宋红康2023版,java入门自学必备) 基本都是宋老师发的资料里面的内容,只不过补充几个资料里没直接给出答案的问题的答案。 不想安装markdown笔记的app所以干脆在这里发一遍。 第01章:随堂复习…...
【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...
使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
基于TurtleBot3在Gazebo地图实现机器人远程控制
1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...
vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...
