今天学学消息队列RocketMQ:消息类型
RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。
普通消息
普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息适用于对顺序没有要求的基础消费需求。这里的Topic和MessageQueue是多对多关系。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}

顺序消息
当我开始对消息的时序性有要求的时候,普通消息就无法满足我们的需求了。当我们要求顺序消费的时候,我们的Topic就不能采用多对多的形式,存放在多个MessageQueue中,而是需要牺牲一部分性能,存放在一个MessageQueue中。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> mqs.get(0), null);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println("RocketMQ test msg " + i);System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 跟普通消费不同的地方,这里采用了顺序消费方法。consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();
}

延时消息
通过指定的通知时间间隔,让消息不会立刻被消费者收到。
// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());// 指定延时等级// DelayTimeLevel对应的延时时间在服务端定义:rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";msg.setDelayTimeLevel(4);// 按毫秒延时msg.setDelayTimeMs(1L);// 按秒延时msg.setDelayTimeSec(1);SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}
30秒的延迟

事务消息
事务消息是RocketMQ提供的一种高级的消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
事务消息的生命周期
(1)初始化:半事务消息被生产者构建并完成初始化,待发送到RocketMQ服务端的状态。
(2)事务待提交:半事务消息被发送到服务端,和普通消息不同,半事务消息并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
(3)消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
(4)提交事务待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见。
(5)消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一段时间后没有收到消费者的响应,服务端会对消息进行重试处理。
(6)消费完成:消费者完成了消费动作,并向服务端提交了消费结果,服务端标记当前消息已经被处理。
(7)消息删除:服务端按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

// 生产者
public static void main(String[] args) throws MQClientException {TransactionMQProducer producer = new TransactionMQProducer("rmq-group");TransactionListener listener = new TransactionListenerImpl();producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.setTransactionListener(listener);producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println(new Date());System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}
控制台打印
// 生产者控制台打印
Wed Jul 26 17:55:45 CST 2023 RocketMQ test msg 0 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:45 CST 2023
7F00000154F073D16E938497DE420000
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
93============================
Wed Jul 26 17:55:46 CST 2023 RocketMQ test msg 1 : executeLocalTransaction:未知状态
Wed Jul 26 17:55:46 CST 2023
7F00000154F073D16E938497E2340001
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
94============================
Wed Jul 26 17:55:47 CST 2023 RocketMQ test msg 2 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:47 CST 2023
7F00000154F073D16E938497E6230002
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
95============================
Wed Jul 26 17:55:48 CST 2023 RocketMQ test msg 3 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:48 CST 2023
7F00000154F073D16E938497EA180003
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
96============================
Wed Jul 26 17:55:49 CST 2023 RocketMQ test msg 4 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:49 CST 2023
7F00000154F073D16E938497EE0B0004
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
97============================
Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:50 CST 2023
7F00000154F073D16E938497F1F70005
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
98============================
Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:51 CST 2023
7F00000154F073D16E938497F5EC0006
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
99============================
Wed Jul 26 17:55:52 CST 2023 RocketMQ test msg 7 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:52 CST 2023
7F00000154F073D16E938497F9E50007
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
100============================
Wed Jul 26 17:55:53 CST 2023 RocketMQ test msg 8 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:53 CST 2023
7F00000154F073D16E938497FDD30008
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
101============================
Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:54 CST 2023
7F00000154F073D16E93849801C90009
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
102============================// 消费者控制台打印
Receive New Messages: Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 At Wed Jul 26 17:55:50 CST 2023
Receive New Messages: Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 At Wed Jul 26 17:55:51 CST 2023
Receive New Messages: Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 At Wed Jul 26 17:55:54 CST 2023
这里消费者只收到了执行事务成功的5,6,9。
相关文章:
今天学学消息队列RocketMQ:消息类型
RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。 普通消息 普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息…...
小程序附件下载并预览功能
一、实现的功能: 1、word、excel、图片等实现下载并预览 2、打开文件后显示文件名称 二、代码: // 判断文件类型whatFileType(url) {let sr url.lastIndexOf("."); // 最后一次出现的位置let fileType url.substr(sr 1); // 截取url的…...
数据库缓存服务——NoSQL之Redis配置与优化
目录 一、缓存概念 1.1 系统缓存 1.2 缓存保存位置及分层结构 1.2.1 DNS缓存 1.2.2 应用层缓存 1.2.3 数据层缓存 1.2.4 硬件缓存 二、关系型数据库与非关系型数据库 2.1 关系型数据库 2.2 非关系型数据库 2.3 关系型数据库和非关系型数据库区别: 2.4 非…...
【雕爷学编程】MicroPython动手做(13)——掌控板之RGB三色灯
知识点:什么是掌控板? 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片,支持WiFi和蓝牙双模通信,可作为物联网节点,实现物联网应用。同时掌控板上集成了OLED…...
.Net Core上传组件_.Net Core图片上传组件_Uploader7.0
一、.Net Core上传组件Uploader7.0简介 1.当前版本v7.0,前端框架丰富升级 2.前端jquery框架封装,cover.js, 腾讯云cos-js-sdk-v5.min.js 3.后端,支持Asp.Net 和 Asp.Net Core 矿建 4.数据传输模式支持:WebScoket 、Ajax、Form 模式上传到…...
Exadata磁盘损坏导致磁盘组无法mount恢复(oracle一体机磁盘组异常恢复)---惜分飞
Oracle Exadata客户,在换盘过程中,cell节点又一块磁盘损坏,导致datac1磁盘组(该磁盘组是normal方式冗余)无法mount Thu Jul 20 22:01:21 2023 SQL> alter diskgroup datac1 mount force NOTE: cache registered group DATAC1 number1 incarn0x0728ad12 NOTE: ca…...
左值引用与右值引用的区别?右值引用的意义?
左值引用与右值引用的区别?右值引用的意义? 1 区别1.1 功能差异1.2 左值引用1.3 右值引用1.3.1 实现移动语义1.3.2 实现完美转发 2 引用的作用3 区分左值和右值3.1 左值3.2 右值 1 区别 左值引用是对左值的引用;右值引用是对右值的引用。 &…...
2023年深圳杯数学建模D题基于机理的致伤工具推断
2023年深圳杯数学建模 D题 基于机理的致伤工具推断 原题再现: 致伤工具的推断一直是法医工作中的热点和难点。由于作用位置、作用方式的不同,相同的致伤工具在人体组织上会形成不同的损伤形态,不同的致伤工具也可能形成相同的损伤形态。致伤…...
Vue的router学习
,前端路由的核心是什么呢?改变URL,但是页面不进行整体的刷新。 vue-router是基于路由和组件的 路由用于设定访问路径, 将路径和组件映射起来; 在vue-router的单页面应用中, 页面的路径的改变就是组件的切换; 使用router需要…...
Inpaint Anything: 自动化抹除视频元素
自动化抹除视频元素 不用逐帧抠图,直接SAM Tracking Video Inpainting就能实现自动化抹除奔跑吧idol。 https://github.com/geekyutao/Inpaint-Anything 目录 网站演示参考文献 网站 https://huggingface.co/spaces/InpaintAI/Inpaint-Anything 演示 原理就是&a…...
Flutter 开发者工具 Android Studio 开发Flutter应用
Flutter 开发者工具 在 Android Studio 开发Flutter应用 🔥 Android Studio 版本更新 🔥 Android Studio Check for Update Connection failed 解决方案 如果是运行的是32位的android studio需要在andriod studio的启动目录下找到studio.exe.vmoptio…...
后端byte[]传给前端接收默认变成string字符串
创建时间:2023.7.28 建议:最好直接用字符串,我是没办法要求保密,存取都是字符串,程序里面是byte数组 既然他到前端会转换成字符串那么就是被转码了 那我们反向转码就好了 这是在后端处理,反正前端也是乱…...
UE5 动画蓝图模板(Animation Blueprint Template)
文章目录 前言准备内容创建动画蓝图使用动画蓝图模板示例1示例2总结前言 本文基于虚幻5.2版本介绍制作动画蓝图模板,本教程要求使用虚幻5.0及以上版本。 准备内容 使用第三人称游戏内容包,已添加可忽略。 选择第三人称游戏,添加到项目。 创建动画蓝图 在 Characters 文件…...
Log4j源码解析
Log4j源码解析 主要流程 Logger logger Logger.getLogger(Main.class); 1、通过Logger.getLogger(Class clazz) 或 Logger.getLogger(String name)进入。 2、加载LogManager进jvm, 执行静态代码块执行初始化, 创建出RepositorySelector实例及LoggerRepository实例(Hierarchy…...
Docker 容器访问宿主机服务
docker 网络简介 docker 在安装时会默认创建三个网络:bridge(默认网络模式)、 none 、host。 host 直接和宿主机共用网络。bridge 网络隔离,通过虚拟网桥(一般是 docker0)与宿主机通信。none 禁用网络功能…...
Go 发送邮件
要在Go中发送电子邮件,您可以使用第三方库,如 gomail 。以下是一个使用 gomail 发送电子邮件的示例代码: package main import ("fmt""gopkg.in/gomail.v2" ) func main() {// 创建邮件消息m : gomail.NewMessage()m.Se…...
Spring AOP 的概念及其作用
一、什么是 Spring AOP? 在介绍 Spring AOP 之前,首先要了解一下什么是 AOP ? AOP ( Aspect Oriented Programming ):面向切面编程,它是一种思想, 它是对某一类事情的集中处 理 。…...
python基础1——环境安装
文章目录 一、Windows安装二、Linux安装三、pycharm安装3.1 软件安装3.2 个性化设置3.3 基本使用3.3.1 定义变量3.3.2 查看数据类型3.3.3 运算符3.3.4 操作符3.3.5 转义符 一、Windows安装 1、下载软件安装包,官网 2、开始安装。 2.查看是否安装成功。 3.安装…...
uniapp 中 的progress加载进度条 的使用,在 页面显示数据加载的进度条,使用户的使用体验效果更好
学习目标: 学习目标如下: 例如: uniapp 中 的progress加载进度条 的使用,在 页面显示数据加载的进度条,使用户的使用体验效果更好 学习内容: 学习内容如下所示: 相关属性的说明 进度条的显…...
【尚硅谷】第01章:随堂复习与企业真题(Java语言概述)
来源:尚硅谷Java零基础全套视频教程(宋红康2023版,java入门自学必备) 基本都是宋老师发的资料里面的内容,只不过补充几个资料里没直接给出答案的问题的答案。 不想安装markdown笔记的app所以干脆在这里发一遍。 第01章:随堂复习…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
短视频矩阵系统文案创作功能开发实践,定制化开发
在短视频行业迅猛发展的当下,企业和个人创作者为了扩大影响力、提升传播效果,纷纷采用短视频矩阵运营策略,同时管理多个平台、多个账号的内容发布。然而,频繁的文案创作需求让运营者疲于应对,如何高效产出高质量文案成…...
莫兰迪高级灰总结计划简约商务通用PPT模版
莫兰迪高级灰总结计划简约商务通用PPT模版,莫兰迪调色板清新简约工作汇报PPT模版,莫兰迪时尚风极简设计PPT模版,大学生毕业论文答辩PPT模版,莫兰迪配色总结计划简约商务通用PPT模版,莫兰迪商务汇报PPT模版,…...
数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !
我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
跨平台商品数据接口的标准化与规范化发展路径:淘宝京东拼多多的最新实践
在电商行业蓬勃发展的当下,多平台运营已成为众多商家的必然选择。然而,不同电商平台在商品数据接口方面存在差异,导致商家在跨平台运营时面临诸多挑战,如数据对接困难、运营效率低下、用户体验不一致等。跨平台商品数据接口的标准…...
中科院1区顶刊|IF14+:多组学MR联合单细胞时空分析,锁定心血管代谢疾病的免疫治疗新靶点
中科院1区顶刊|IF14:多组学MR联合单细胞时空分析,锁定心血管代谢疾病的免疫治疗新靶点 当下,免疫与代谢性疾病的关联研究已成为生命科学领域的前沿热点。随着研究的深入,我们愈发清晰地认识到免疫系统与代谢系统之间存在着极为复…...
