【RocketMQ系列五】消息示例-顺序消息延迟消息广播消息的实现
1. 前言
上一篇文章我们介绍了简单消息的实现,本文将主要来介绍顺序消息的实现,顺序消息分为局部顺序消息和全局顺序消息。
顺序消息指的是消费者在消费消息时,按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。
顺序消息分为 全局顺序消息和局部顺序消息。
全局顺序消息就是全局使用一个queue。
局部顺序消息就是 有顺序依赖的消息放在同一个queue中,多个queue并行消费。
2. 局部顺序消息
默认情况下RocketMQ会根据轮询的方式将消息发送到某个broker中的某个队列中,这样的话就不能保证消息是有序的。
比如在购物网站下单场景下:有 1. 创建订单---->2. 订单支付---->3. 订单发货---->4. 订单完成 四条消息。这四条消息逻辑上肯定是有序的。但是如果采用RocketMQ默认的消息投递方式,那么同一个订单,有可能创建订单被投递到了 MessageQueue1,订单支付的话被投递到了MessageQueue2。 由于消息在不同的MessageQueue中,消费者在消费的时候就可能会出现订单支付的消息先于创建订单的消息。
局部顺序消息就是要保证同一笔订单4条消息都放在同一个queue中,这样的话就不会出现订单支付的消息先于创建订单的消息被消费。就像下图所示:

局部顺序消息消费者在消费某个topic的某个队列中的消息的时候是顺序的。消费者使用MessageListenerOrderly类来进行消息监听。
2.1. 定义生产者
-
这里定义了名为part_order_topic_test的topic。运行程序之后该topic可以路由到broker-a 以及broker-b 两个broker。

public class OrderProducer {// 局部顺序消费,核心就是自己选择Queue,保证需要顺序保障的消息落到同一个队列中public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();for (int i = 0; i < 10; i++) {int orderId = i;for (int j = 0; j < 5; j++) {// 构建消息体,tags和key 只是做一个简单区分Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId, ("局部顺序消息处理_" + orderId + ";step_" + j).getBytes());SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {@Override//这里的arg参数就是外面的orderIdpublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg;int index = orderId % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", send);}}defaultMQProducer.shutdown();}
}
-
在发送消息的时候实现MessageQueueSelector接口用于在发送消息的时候指定队列。其中,
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)方法有三个参数:其中,mqs表示当前topic所路由的全部队列数,这里就是8个队列,broker-a有4个队列,broker-b有4个队列。msg就是传入的消息体,arg 就是传入的orderId。 -
这里根据orderId与队列数求模取余来获取消息应该发送到哪个队列中,这样就保证了相同的orderId的消息会落到同一个队列中
Integer orderId = (Integer) arg; int index = orderId % mqs.size(); return mqs.get(index);
生产者运行结果(部分截图)

从运行结果可以看出相同orderId的消息被投递到了同一个MessageQueue中,而相同MessageQueue队列天然是有顺序的。
2.2.定义消费者
说完了生产者,接着来说说消费者。消费者的逻辑主要是在消费的时候需要实现 MessageListenerOrderly 类来进行消息监听。核心代码是:
// 2.订阅消费消息defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println("消费得到的消息是={}" + msg);System.out.println("消息体内容是={}" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});
这里启动了三个消费者,不管消费者消费的顺序如何,相同的orderId下的5条消息都是被顺序消费的。



3. 碰到的问题
在首次调试的时候出现了一个 broker is full 的错误。这是由于磁盘空间不足导致的,可以通过 df -h 命令查看当前磁盘空间的占用情况,当磁盘空间使用率超过90%的话则会报此错。

4. 全局顺序消息
全局顺序消息是指消费者消费全部消息都是顺序的,只能让所有的消息都发送到同一个MessageQueue中来实现,在高并发场景下会非常影响效率。
5. 广播消息
广播消息是向主题(topic)的所有订阅者发送消息,订阅同一个topic的多个消费者,都能全量收到生产者发送的所有消息。
广播消息的生产者与普通同步消息的生产者实现是一致的,不同的是消费者的消息模式不同。这里给出消费者实现的不同之处。
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");// 设置消费者的模式是广播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);//从第一位开始消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
6. 延迟消息
延迟消息与普通消息的不同之处在于,它们要在指定的时间之后才会被传递。生产者并不会延迟发送消息,而是发送到topic里面,消费者延迟指定的时间进行消费。
6.1. 延迟消息生产者
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");defaultMQProducer.start();for (int i = 0; i < 100; i++) {Message message = new Message("Schedule_topic", ("延迟消息测试" + i).getBytes());//设置延迟级别,默认有18个延迟级别,这个消息将延迟10秒消费message.setDelayTimeLevel(3);defaultMQProducer.send(message);}System.out.println("所有延迟消息发送完成");defaultMQProducer.shutdown();
延迟消息生产者与普通消息生产者主要的区别是延迟消息需要调用 setDelayTimeLevel 方法设置延迟级别,这里设置级别是3,则是延迟10秒。RocketMQ提供了18种延迟级别。可以在 RocketMQ的仪表板中的集群中的broker配置中找到。

延迟消息的消费者与普通消息的消费者相同的。RocketMQ内部通过名为SCHEDULE_TOPIC_XXXX 的topic来存放延迟消息。

7.批量消息
批量发送消息提高了传递消息的性能。官方建议批量消息的总大小不应超过1M,实际不应超过4M。如果超过4M的批量消息需要进行分批处理。同时设置broker的配置参数为4M(在broker的配置文件中修改:maxMessageSize=4194304)。核心代码如下:
//4.创建消息List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100*100; i++) {// 创建消息,指定topic,以及消息体messageList.add(new Message("batch_topic", ("飞哥测试批量消息" + i).getBytes()));}//批量消息消息小于4M的处理SendResult send = defaultMQProducer.send(messageList);System.out.println(send);
8.过滤消息
使用tag过滤
在大多数情况下,标签是一种简单而有用的设计,可以用来选择你想要的消息。
首先是根据tag来过滤消息,生产者在发送消息的时候指定该消息的tag标签,消费者则可以根据tag来过滤消息。
8.1. 过滤消息生产者
这里定义了三个tag,分别是tagA,tagB以及tagC,生产者在生产消息的时候给每个消息指定不同的tag。
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("TagFilterTest", tags[i % tags.length], ("飞哥tag消息过滤" + tags[i % tags.length]).getBytes());SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();
8.2. 过滤消息的消费者
消费者过滤出了标签带有tagA以及tagC的消息进行消费。这里其实是broker将consumer需要的消息推给消费者。
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息体=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消费者已经启动");

使用SQL过滤
SQL 功能可以通过发送消息时输入的属性进行一些计算,在RocketMQ定义的语法下,可以实现一些有趣的逻辑。
语法
RocketMQ只定义了一些基本的语法类支持这个特性。
1. 数值比较:如 `>`,`>=`,`<=`,`BETWEEN`,`=`;
2. 字符比较:如 `=`,'<>',`IN`;
3. `IS NULL` 或 `IS NOT NULL` ;
4. 逻辑`AND`,`OR`,`NOT`;
常量类型有:
1. 数字,如 123,
2. 字符,如 'abc',必须用单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE` 或 `FALSE`;
SQL过滤生产者
生产者主要设置属性过滤 message.putUserProperty("a", String.valueOf(i)); 表示第一条消息键值对是 a=0,第二条消息键值对是a=1。
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("SQLFilterTest", tags[i % tags.length], ("飞哥sql消息过滤" + tags[i % tags.length]).getBytes());message.putUserProperty("a", String.valueOf(i));SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();
SQL过滤消费者:
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))"+" and (a is null and a between 0 and 3)"));defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息体=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消费者已经启动");
如果运行报 The broker does not support consumer to filter message by SQL92

则需要修改 broker.conf 文件,增加如下配置:
# 开启对 propertyfilter的支持
enablePropertyFilter = true
filterSupportRetry = true
然后重启broker。
总结
本文介绍了局部顺序消息,全局顺序消息,广播消息,延迟消息,以及如何批量发送消息和过滤消息。
相关文章:
【RocketMQ系列五】消息示例-顺序消息延迟消息广播消息的实现
1. 前言 上一篇文章我们介绍了简单消息的实现,本文将主要来介绍顺序消息的实现,顺序消息分为局部顺序消息和全局顺序消息。 顺序消息指的是消费者在消费消息时,按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。 顺序消息分为…...
hdfs dfsadmin -safemode无法退出安全模式
退出安全模式 第一种:正常退出安全模式 hdfs dfsadmin -safemode leave如提示Safe mode is OFF,那就说明退出成功,但有时候这个命令也没办法退出安全模式,就需要使用强制退出 第二种:强制退出安全模式 hdfs dfsadmin …...
git 新建 branch 推送 到服务器
通常情况下,需要开发一个模块,从 master 新建立了一个 分支,newbranch,如果推送到服务器; 1:从远程 master 建立本地分支 newbranch; git checkout -b newbranch origin/master 2:当修改完成代码…...
安全渗透测试基础知识之网络基础知识
一、OSI七层模型 7应用层6表示层5会话层4传输层3网络层2数据链路层1物理层1.物理层 提供通信介质和接口标准 网线 2.网络链路层 提供二层寻扯/MAC地址和二层通信(交换机)功能 协议:以太网Ethernet 3.网络层 提供三层寻扯/IP地址和三层通信(路由器...
Unity Editor 打包指定资源(AssetBundle)和加载指定资源
前言: 一般用于ui资源打包和加载,代码比较简单没什么好说的,直接上代码。 打包代码: [MenuItem("Assets/打包指定的预设")]public static void BuildAsset() {var selectObject Selection.activeObject;if (selectObje…...
网站批量替换关键词方法
注意替换操作之前先对文件做好备份 1.下载http://downinfo.myhostadmin.net/ultrareplace5.02.rar 解压出来,运行UltraReplace.exe 2.点击菜单栏中的配置,全选所有文件类型,或者根据自己的需求选择部分,如htm、html、php、asp等 3.若替换单个文件,点击文件,若是要…...
RabbitMQ的LazyQueue
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如: 消费者宕机或出现网络故障消息发送量激增,超过了消费者处理速度消费者处理业务发生阻塞 一旦…...
面试经典150题——Day16
文章目录 一、题目二、题解 一、题目 42. Trapping Rain Water Given n non-negative integers representing an elevation map where the width of each bar is 1, compute how much water it can trap after raining. Example 1: Input: height [0,1,0,2,1,0,1,3,2,1,2,…...
从零开始搭建第一个django项目
目录 配置环境创建 Django 项目和 APP项目组成 子目录文件组成应用文件组成 配置 settings.py启动项目 数据表创建models.pyDjango-models的常用字段和常用配置 Django-admin 引入admin后台和管理员外键views.pyurls.pypostman接口测试 QuerySetInstance功能APIView 的概念…...
Godot2D角色导航-自动寻路教程(Godot获取导航路径)
文章目录 开始准备获取路径全局点坐标 开始准备 首先创建一个导航场景,具体内容参考下列文章: Godot实现角色随鼠标移动 然后我们需要设置它的导航目标位置,具体关于位置的讲解在下面这个文章: Godot设置导航代理的目标位置 获取…...
用c++写一个高精度计算的减法运算
这段代码是一个用C编写的程序,它实现了两个大整数的减法运算。 #include<iostream> #include<cstdio> #include<cstring> using namespace std;int main(){int a[256],b[256],c[256],lena,lenb,lenc,i;char n[256],n1[256]"1001",n2[2…...
基于白鲸优化的BP神经网络(分类应用) - 附代码
基于白鲸优化的BP神经网络(分类应用) - 附代码 文章目录 基于白鲸优化的BP神经网络(分类应用) - 附代码1.鸢尾花iris数据介绍2.数据集整理3.白鲸优化BP神经网络3.1 BP神经网络参数设置3.2 白鲸算法应用 4.测试结果:5.M…...
Matlab遗传算法工具箱——一个例子搞懂遗传算法
解决问题 我们一般使用遗传算法是用来处理最优解问题的,下面是一个最优解问题的例子 打开遗传算法工具箱 ①在Matlab界面找到应用程序选项,点击应用程序(英文版的Matlab可以点击App选项) ②找到Optimization工具箱,点击打开 创建所需要…...
Coreldraw2020最新64位电脑完整版本下载教程
安装之前所有的杀毒软件都要退出。无论是360,腾讯管家,或者电脑自带的安全中心,要不然会阻止安装。 CorelDRAW2020版win下载如下:https://wm.makeding.com/iclk/?zoneid55678 CorelDRAW2020版mac下载如下:https://wm.makeding.com/iclk/?…...
第一节——vue安装+前端工程化
作者:尤雨溪 官网:简介 | Vue.js 脚手架文档 创建一个项目 | Vue CLI 一、概念(了解) 是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是,Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注视图层&…...
vue集成钉钉单点登录
初始环境判断 判断是否是来自钉钉环境的访问,返回:boolean类型值 window.navigator.userAgent.includes("DingTalk")前端引入vue中钉钉相关的依赖,并获取钉钉的临时授权码 import * as dingtalk from dingtalk-jsapi; let that …...
凉鞋的 Godot 笔记 203. 变量的常用类型
203. 变量的常用类型 在上一篇,我们对变量进行了概述和简介,知识地图如下: 我们已经接触了,变量的字符串类型,以及一些功能。 在这一篇,我们尝试多接触一些变量的类型。 首先是整数类型。 整数类型 整…...
【现场问题】批量新建工作流的问题
批量建工作流的优势和劣势 关于批量建工作流的优势缺点 关于批量建工作流的优势 不需要手动,直接一键建立,同时节点的批量建立也成功了 缺点 1、机器识别,一次性成形,没有办法手动的去干涉这东西 2、大数据量的表需要单独处理的…...
动态规划14(Leetcode516最长回文子序列)
代码: class Solution {public int longestPalindromeSubseq(String s) {int n s.length();int[][] dp new int[n][n];for(int in-1;i>0;i--){dp[i][i] 1;char c1 s.charAt(i);for(int ji1;j<n;j){char c2 s.charAt(j);if(c1c2){dp[i][j] dp[i1][j-1]2…...
写一个简单的解释器(0) 简介和目标
解释语言和编译语言 编译语言,是指其编译器生成的可执行文件为机器码,可以直接在计算机上运行的语言,比如说 C/C \texttt{C/C} C/C 。 解释语言,是指经由解释器生成的可执行文件为字节码文件,只能运行在特殊的虚拟机…...
无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止
<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet: https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...
学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...
CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
数据结构:递归的种类(Types of Recursion)
目录 尾递归(Tail Recursion) 什么是 Loop(循环)? 复杂度分析 头递归(Head Recursion) 树形递归(Tree Recursion) 线性递归(Linear Recursion)…...
企业大模型服务合规指南:深度解析备案与登记制度
伴随AI技术的爆炸式发展,尤其是大模型(LLM)在各行各业的深度应用和整合,企业利用AI技术提升效率、创新服务的步伐不断加快。无论是像DeepSeek这样的前沿技术提供者,还是积极拥抱AI转型的传统企业,在面向公众…...
使用 uv 工具快速部署并管理 vLLM 推理环境
uv:现代 Python 项目管理的高效助手 uv:Rust 驱动的 Python 包管理新时代 在部署大语言模型(LLM)推理服务时,vLLM 是一个备受关注的方案,具备高吞吐、低延迟和对 OpenAI API 的良好兼容性。为了提高部署效…...
多模态学习路线(2)——DL基础系列
目录 前言 一、归一化 1. Layer Normalization (LN) 2. Batch Normalization (BN) 3. Instance Normalization (IN) 4. Group Normalization (GN) 5. Root Mean Square Normalization(RMSNorm) 二、激活函数 1. Sigmoid激活函数(二分类&…...
SE(Secure Element)加密芯片与MCU协同工作的典型流程
以下是SE(Secure Element)加密芯片与MCU协同工作的典型流程,综合安全认证、数据保护及防篡改机制: 一、基础认证流程(参数保护方案) 密钥预置 SE芯片与MCU分别预置相同的3DES密钥(Key1、Key2…...
