当前位置: 首页 > news >正文

【RocketMQ系列五】消息示例-顺序消息延迟消息广播消息的实现

1. 前言

上一篇文章我们介绍了简单消息的实现,本文将主要来介绍顺序消息的实现,顺序消息分为局部顺序消息和全局顺序消息。

顺序消息指的是消费者在消费消息时,按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。

顺序消息分为 全局顺序消息和局部顺序消息。

全局顺序消息就是全局使用一个queue。

局部顺序消息就是 有顺序依赖的消息放在同一个queue中,多个queue并行消费。

2. 局部顺序消息

默认情况下RocketMQ会根据轮询的方式将消息发送到某个broker中的某个队列中,这样的话就不能保证消息是有序的。

比如在购物网站下单场景下:有 1. 创建订单---->2. 订单支付---->3. 订单发货---->4. 订单完成 四条消息。这四条消息逻辑上肯定是有序的。但是如果采用RocketMQ默认的消息投递方式,那么同一个订单,有可能创建订单被投递到了 MessageQueue1,订单支付的话被投递到了MessageQueue2。 由于消息在不同的MessageQueue中,消费者在消费的时候就可能会出现订单支付的消息先于创建订单的消息。

局部顺序消息就是要保证同一笔订单4条消息都放在同一个queue中,这样的话就不会出现订单支付的消息先于创建订单的消息被消费。就像下图所示:

局部顺序消息

局部顺序消息消费者在消费某个topic的某个队列中的消息的时候是顺序的。消费者使用MessageListenerOrderly类来进行消息监听。

2.1. 定义生产者

  1. 这里定义了名为part_order_topic_test的topic。运行程序之后该topic可以路由到broker-a 以及broker-b 两个broker。

    image-20231003154231683

public class OrderProducer {// 局部顺序消费,核心就是自己选择Queue,保证需要顺序保障的消息落到同一个队列中public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();for (int i = 0; i < 10; i++) {int orderId = i;for (int j = 0; j < 5; j++) {// 构建消息体,tags和key 只是做一个简单区分Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId, ("局部顺序消息处理_" + orderId + ";step_" + j).getBytes());SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {@Override//这里的arg参数就是外面的orderIdpublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg;int index = orderId % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", send);}}defaultMQProducer.shutdown();}
}
  1. 在发送消息的时候实现MessageQueueSelector接口用于在发送消息的时候指定队列。其中, public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 方法有三个参数:其中,mqs表示当前topic所路由的全部队列数,这里就是8个队列,broker-a有4个队列,broker-b有4个队列。msg就是传入的消息体,arg 就是传入的orderId。

  2. 这里根据orderId与队列数求模取余来获取消息应该发送到哪个队列中,这样就保证了相同的orderId的消息会落到同一个队列中

    Integer orderId = (Integer) arg;
    int index = orderId % mqs.size();
    return mqs.get(index);
    
生产者运行结果(部分截图)

image-20231003160039915

从运行结果可以看出相同orderId的消息被投递到了同一个MessageQueue中,而相同MessageQueue队列天然是有顺序的。

2.2.定义消费者

说完了生产者,接着来说说消费者。消费者的逻辑主要是在消费的时候需要实现 MessageListenerOrderly 类来进行消息监听。核心代码是:

	// 2.订阅消费消息defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println("消费得到的消息是={}" + msg);System.out.println("消息体内容是={}" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});

这里启动了三个消费者,不管消费者消费的顺序如何,相同的orderId下的5条消息都是被顺序消费的。

image-20231010125014684

image-20231010125043838

image-20231010125110337

3. 碰到的问题

在首次调试的时候出现了一个 broker is full 的错误。这是由于磁盘空间不足导致的,可以通过 df -h 命令查看当前磁盘空间的占用情况,当磁盘空间使用率超过90%的话则会报此错。

image-20231003131237358

4. 全局顺序消息

全局顺序消息是指消费者消费全部消息都是顺序的,只能让所有的消息都发送到同一个MessageQueue中来实现,在高并发场景下会非常影响效率。

5. 广播消息

广播消息是向主题(topic)的所有订阅者发送消息,订阅同一个topic的多个消费者,都能全量收到生产者发送的所有消息。

广播消息的生产者与普通同步消息的生产者实现是一致的,不同的是消费者的消息模式不同。这里给出消费者实现的不同之处。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");// 设置消费者的模式是广播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);//从第一位开始消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

6. 延迟消息

延迟消息与普通消息的不同之处在于,它们要在指定的时间之后才会被传递。生产者并不会延迟发送消息,而是发送到topic里面,消费者延迟指定的时间进行消费。

6.1. 延迟消息生产者

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");defaultMQProducer.start();for (int i = 0; i < 100; i++) {Message message = new Message("Schedule_topic", ("延迟消息测试" + i).getBytes());//设置延迟级别,默认有18个延迟级别,这个消息将延迟10秒消费message.setDelayTimeLevel(3);defaultMQProducer.send(message);}System.out.println("所有延迟消息发送完成");defaultMQProducer.shutdown();

延迟消息生产者与普通消息生产者主要的区别是延迟消息需要调用 setDelayTimeLevel 方法设置延迟级别,这里设置级别是3,则是延迟10秒。RocketMQ提供了18种延迟级别。可以在 RocketMQ的仪表板中的集群中的broker配置中找到。

image-20231003200021490

延迟消息的消费者与普通消息的消费者相同的。RocketMQ内部通过名为SCHEDULE_TOPIC_XXXX 的topic来存放延迟消息。

image-20231003201410410

7.批量消息

批量发送消息提高了传递消息的性能。官方建议批量消息的总大小不应超过1M,实际不应超过4M。如果超过4M的批量消息需要进行分批处理。同时设置broker的配置参数为4M(在broker的配置文件中修改:maxMessageSize=4194304)。核心代码如下:

	//4.创建消息List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100*100; i++) {// 创建消息,指定topic,以及消息体messageList.add(new Message("batch_topic", ("飞哥测试批量消息" + i).getBytes()));}//批量消息消息小于4M的处理SendResult send = defaultMQProducer.send(messageList);System.out.println(send);

8.过滤消息

使用tag过滤

在大多数情况下,标签是一种简单而有用的设计,可以用来选择你想要的消息。

首先是根据tag来过滤消息,生产者在发送消息的时候指定该消息的tag标签,消费者则可以根据tag来过滤消息。

8.1. 过滤消息生产者

这里定义了三个tag,分别是tagA,tagB以及tagC,生产者在生产消息的时候给每个消息指定不同的tag。

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("TagFilterTest", tags[i % tags.length], ("飞哥tag消息过滤" + tags[i % tags.length]).getBytes());SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();

8.2. 过滤消息的消费者

消费者过滤出了标签带有tagA以及tagC的消息进行消费。这里其实是broker将consumer需要的消息推给消费者。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息体=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消费者已经启动");

image-20231003212919155

使用SQL过滤

SQL 功能可以通过发送消息时输入的属性进行一些计算,在RocketMQ定义的语法下,可以实现一些有趣的逻辑。

语法

RocketMQ只定义了一些基本的语法类支持这个特性。

1. 数值比较:如 `>`,`>=`,`<=`,`BETWEEN`,`=`;
2. 字符比较:如 `=`,'<>',`IN`;
3. `IS NULL` 或 `IS NOT NULL` ;
4. 逻辑`AND`,`OR`,`NOT`;

常量类型有:

1. 数字,如 123,
2. 字符,如 'abc',必须用单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE` 或 `FALSE`;

SQL过滤生产者

生产者主要设置属性过滤 message.putUserProperty("a", String.valueOf(i)); 表示第一条消息键值对是 a=0,第二条消息键值对是a=1。

	DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("SQLFilterTest", tags[i % tags.length], ("飞哥sql消息过滤" + tags[i % tags.length]).getBytes());message.putUserProperty("a", String.valueOf(i));SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();

SQL过滤消费者:

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))"+" and (a is null and a between 0 and 3)"));defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息体=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消费者已经启动");

如果运行报 The broker does not support consumer to filter message by SQL92

image-20231003221207618

则需要修改 broker.conf 文件,增加如下配置:

# 开启对 propertyfilter的支持
enablePropertyFilter = true 
filterSupportRetry = true

然后重启broker。

总结

本文介绍了局部顺序消息,全局顺序消息,广播消息,延迟消息,以及如何批量发送消息和过滤消息。

相关文章:

【RocketMQ系列五】消息示例-顺序消息延迟消息广播消息的实现

1. 前言 上一篇文章我们介绍了简单消息的实现&#xff0c;本文将主要来介绍顺序消息的实现&#xff0c;顺序消息分为局部顺序消息和全局顺序消息。 顺序消息指的是消费者在消费消息时&#xff0c;按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。 顺序消息分为…...

hdfs dfsadmin -safemode无法退出安全模式

退出安全模式 第一种&#xff1a;正常退出安全模式 hdfs dfsadmin -safemode leave如提示Safe mode is OFF&#xff0c;那就说明退出成功&#xff0c;但有时候这个命令也没办法退出安全模式&#xff0c;就需要使用强制退出 第二种&#xff1a;强制退出安全模式 hdfs dfsadmin …...

git 新建 branch 推送 到服务器

通常情况下&#xff0c;需要开发一个模块&#xff0c;从 master 新建立了一个 分支&#xff0c;newbranch&#xff0c;如果推送到服务器&#xff1b; 1&#xff1a;从远程 master 建立本地分支 newbranch&#xff1b; git checkout -b newbranch origin/master 2:当修改完成代码…...

安全渗透测试基础知识之网络基础知识

一、OSI七层模型 7应用层6表示层5会话层4传输层3网络层2数据链路层1物理层1.物理层 提供通信介质和接口标准 网线 2.网络链路层 提供二层寻扯/MAC地址和二层通信(交换机)功能 协议:以太网Ethernet 3.网络层 提供三层寻扯/IP地址和三层通信(路由器...

Unity Editor 打包指定资源(AssetBundle)和加载指定资源

前言&#xff1a; 一般用于ui资源打包和加载&#xff0c;代码比较简单没什么好说的&#xff0c;直接上代码。 打包代码&#xff1a; [MenuItem("Assets/打包指定的预设")]public static void BuildAsset() {var selectObject Selection.activeObject;if (selectObje…...

网站批量替换关键词方法

注意替换操作之前先对文件做好备份 1.下载http://downinfo.myhostadmin.net/ultrareplace5.02.rar 解压出来,运行UltraReplace.exe 2.点击菜单栏中的配置&#xff0c;全选所有文件类型,或者根据自己的需求选择部分,如htm、html、php、asp等 3.若替换单个文件,点击文件,若是要…...

RabbitMQ的LazyQueue

在默认情况下&#xff0c;RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下&#xff0c;这会导致消息积压&#xff0c;比如&#xff1a; 消费者宕机或出现网络故障消息发送量激增&#xff0c;超过了消费者处理速度消费者处理业务发生阻塞 一旦…...

面试经典150题——Day16

文章目录 一、题目二、题解 一、题目 42. Trapping Rain Water Given n non-negative integers representing an elevation map where the width of each bar is 1, compute how much water it can trap after raining. Example 1: Input: height [0,1,0,2,1,0,1,3,2,1,2,…...

从零开始搭建第一个django项目

目录 配置环境创建 Django 项目和 APP项目组成  ‍子目录文件组成应用文件组成 配置 settings.py启动项目 数据表创建models.pyDjango-models的常用字段和常用配置 Django-admin 引入admin后台和管理员外键views.pyurls.pypostman接口测试 QuerySetInstance功能APIView 的概念…...

Godot2D角色导航-自动寻路教程(Godot获取导航路径)

文章目录 开始准备获取路径全局点坐标 开始准备 首先创建一个导航场景&#xff0c;具体内容参考下列文章&#xff1a; Godot实现角色随鼠标移动 然后我们需要设置它的导航目标位置&#xff0c;具体关于位置的讲解在下面这个文章&#xff1a; Godot设置导航代理的目标位置 获取…...

用c++写一个高精度计算的减法运算

这段代码是一个用C编写的程序&#xff0c;它实现了两个大整数的减法运算。 #include<iostream> #include<cstdio> #include<cstring> using namespace std;int main(){int a[256],b[256],c[256],lena,lenb,lenc,i;char n[256],n1[256]"1001",n2[2…...

基于白鲸优化的BP神经网络(分类应用) - 附代码

基于白鲸优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于白鲸优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.白鲸优化BP神经网络3.1 BP神经网络参数设置3.2 白鲸算法应用 4.测试结果&#xff1a;5.M…...

Matlab遗传算法工具箱——一个例子搞懂遗传算法

解决问题 我们一般使用遗传算法是用来处理最优解问题的&#xff0c;下面是一个最优解问题的例子 打开遗传算法工具箱 ①在Matlab界面找到应用程序选项&#xff0c;点击应用程序(英文版的Matlab可以点击App选项) ②找到Optimization工具箱&#xff0c;点击打开 创建所需要…...

Coreldraw2020最新64位电脑完整版本下载教程

安装之前所有的杀毒软件都要退出。无论是360&#xff0c;腾讯管家&#xff0c;或者电脑自带的安全中心&#xff0c;要不然会阻止安装。 CorelDRAW2020版win下载如下:https://wm.makeding.com/iclk/?zoneid55678 CorelDRAW2020版mac下载如下:https://wm.makeding.com/iclk/?…...

第一节——vue安装+前端工程化

作者&#xff1a;尤雨溪 官网&#xff1a;简介 | Vue.js 脚手架文档 创建一个项目 | Vue CLI 一、概念&#xff08;了解&#xff09; 是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是&#xff0c;Vue 被设计为可以自底向上逐层应用。Vue 的核心库只关注视图层&…...

vue集成钉钉单点登录

初始环境判断 判断是否是来自钉钉环境的访问&#xff0c;返回&#xff1a;boolean类型值 window.navigator.userAgent.includes("DingTalk")前端引入vue中钉钉相关的依赖&#xff0c;并获取钉钉的临时授权码 import * as dingtalk from dingtalk-jsapi; let that …...

凉鞋的 Godot 笔记 203. 变量的常用类型

203. 变量的常用类型 在上一篇&#xff0c;我们对变量进行了概述和简介&#xff0c;知识地图如下&#xff1a; 我们已经接触了&#xff0c;变量的字符串类型&#xff0c;以及一些功能。 在这一篇&#xff0c;我们尝试多接触一些变量的类型。 首先是整数类型。 整数类型 整…...

【现场问题】批量新建工作流的问题

批量建工作流的优势和劣势 关于批量建工作流的优势缺点 关于批量建工作流的优势 不需要手动&#xff0c;直接一键建立&#xff0c;同时节点的批量建立也成功了 缺点 1、机器识别&#xff0c;一次性成形&#xff0c;没有办法手动的去干涉这东西 2、大数据量的表需要单独处理的…...

动态规划14(Leetcode516最长回文子序列)

代码&#xff1a; class Solution {public int longestPalindromeSubseq(String s) {int n s.length();int[][] dp new int[n][n];for(int in-1;i>0;i--){dp[i][i] 1;char c1 s.charAt(i);for(int ji1;j<n;j){char c2 s.charAt(j);if(c1c2){dp[i][j] dp[i1][j-1]2…...

写一个简单的解释器(0) 简介和目标

解释语言和编译语言 编译语言&#xff0c;是指其编译器生成的可执行文件为机器码&#xff0c;可以直接在计算机上运行的语言&#xff0c;比如说 C/C \texttt{C/C} C/C 。 解释语言&#xff0c;是指经由解释器生成的可执行文件为字节码文件&#xff0c;只能运行在特殊的虚拟机…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

工业安全零事故的智能守护者:一体化AI智能安防平台

前言&#xff1a; 通过AI视觉技术&#xff0c;为船厂提供全面的安全监控解决方案&#xff0c;涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面&#xff0c;能够实现对应负责人反馈机制&#xff0c;并最终实现数据的统计报表。提升船厂…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

可靠性+灵活性:电力载波技术在楼宇自控中的核心价值

可靠性灵活性&#xff1a;电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中&#xff0c;电力载波技术&#xff08;PLC&#xff09;凭借其独特的优势&#xff0c;正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据&#xff0c;无需额外布…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码&#xff0c;写上注释 当然可以&#xff01;这段代码是 Qt …...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

【2025年】解决Burpsuite抓不到https包的问题

环境&#xff1a;windows11 burpsuite:2025.5 在抓取https网站时&#xff0c;burpsuite抓取不到https数据包&#xff0c;只显示&#xff1a; 解决该问题只需如下三个步骤&#xff1a; 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...