微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
~~微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题~~
- RocketMQ-延时消息
- 实现延时消息
- RocketMQ-消息过滤
- Tag标签过滤
- SQL标签过滤
- 管控台搜索问题
RocketMQ-延时消息
给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫描,判断是否到达要求时间
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
但这是默认的,我们可以修改
想修改可以去rocketmq的conf文件夹,修改broker.conf配置参数
该时间是指消息在中间件里面存储的时间
实现延时消息
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息消费时间:"+new Date()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
生产者类:
public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");//连接nameServerproducer.setNamesrvAddr("43.143.161.59:9876");//启动生产者producer.start();//设置消息发送的目的地String topic = "helloTopic";//发送消息Message msg = new Message(topic,("延时消息,发送时间:"+new Date()).getBytes(Charset.defaultCharset()));//设置消息延时级别msg.setDelayTimeLevel(3);producer.sendOneway(msg);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);//关闭资源producer.shutdown();}
}
RocketMQ-消息过滤
Tag标签过滤
用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效。
生产者类:
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("tagProduceGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "tagFilterTopic";Message msg1 = new Message(topic,"TagA",("消息A").getBytes(Charset.defaultCharset()));Message msg2 = new Message(topic,"TagB",("消息B").getBytes(Charset.defaultCharset()));Message msg3 = new Message(topic,"TagC",("消息C").getBytes(Charset.defaultCharset()));producer.sendOneway(msg1);producer.sendOneway(msg2);producer.sendOneway(msg3);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);producer.shutdown();}
}
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("tagFilterTopic","TagA || TagC");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果
SQL标签过滤
可以过滤内容,像写where一样
生产者类:
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("sqlProduceGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "sqlFilterTopic";Message msg1 = new Message(topic,("美女A,年龄22,体重45").getBytes(Charset.defaultCharset()));msg1.putUserProperty("age","22");msg1.putUserProperty("weight","45");Message msg2 = new Message(topic,("美女B,年龄25,体重60").getBytes(Charset.defaultCharset()));msg2.putUserProperty("age","25");msg2.putUserProperty("weight","60");Message msg3 = new Message(topic,("美女C,年龄40,体重70").getBytes(Charset.defaultCharset()));msg3.putUserProperty("age","40");msg3.putUserProperty("weight","70");producer.sendOneway(msg1);producer.sendOneway(msg2);producer.sendOneway(msg3);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);producer.shutdown();}
}
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age>23 and weight>60"));consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果:
原因是因为默认是不支持sql过滤的,需要更改配置文件之后重启broker服务
在文件最后一行添加enablePropertyFilter=true即可
随后重新启动broker服务
之后运行结果
管控台搜索问题
为什么有时候管控台的消息都没有显示收到此消息,但消费者却能消费?
因为时间问题,因为我们的rocketmq是部署在虚拟机上的,当我们虚拟机和windows时间是同步的时候,消息是没有问题的,控制台显示时间内上下波动一小时的消息,但当虚拟机关掉的时候,时间是不动的,windows的时间却因为电脑里面的一个物理小电池,时间还在正常运行,两者时间不同步了,造成我们发消息是虚拟机的时间,控制台显示的是windows的时间,但消费因为并没有按照时间过滤,所以还是可以接收的到,把时间改一下又可以看到消息了
相关文章:

微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
~~微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题~~ RocketMQ-延时消息实现延时消息RocketMQ-消息过滤Tag标签过滤SQL标签过滤管控台搜索问题RocketMQ-延时消息 给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫…...
js发送邮件(node.js)
以前看别人博客留言或者评论文章时必须填写邮箱信息,感觉甚是麻烦。 后来才知道是为了在博主回复后让访客收到邮件,用心良苦。 于是我也在新增留言和文章评论的接口里,新增了给自己发送邮件提醒的功能。 我用的QQ邮箱,具体如下…...
English Learning - Day58 一周高频问题汇总 2023.2.12 周日
English Learning - Day58 一周高频问题汇总 2023.2.12 周日这周主要内容继续说说状语从句结果状语从句这周主要内容 DAY58【周日总结】 一周高频问题汇总 (打卡作业详见 Day59) 一近期主要讲了 一 01.主动脉修饰 以下是最常问到的知识点拓展ÿ…...

【微电网】基于风光储能和需求响应的微电网日前经济调度(Python代码实现)
目录 1 概述 2 知识点及数学模型 3 算例实现 3.1算例介绍 3.2风光参与的模型求解 3.3 风光和储能参与的模型求解 3.5 风光储能和需求响应都参与模型求解 3.6 结果分析对比 4 Python代码及算例数据 1 概述 近年来,微电网、清洁能源等已成为全球关注的热点…...

四种方式的MySQL安装
mysql安装常见的方法有四种序号 安装方式 说明1 yum\rpm简单、快速,不能定制参数2二进制 解压,简单配置就可使用 免安装 mysql-a.b.c-linux2.x-x86_64.tar.gz3源码编译 可以定制参数,安装时间长 mysql-a.b.c.tar.gz4源码制成rpm包 把源码制…...
软考高级信息系统项目管理师系列之九:项目范围管理
软考高级信息系统项目管理师系列之九:项目范围管理 一、范围管理输入、输出、工具和技术表二、范围管理概述三、规划范围管理四、收集需求1.收集需求:2.需求分类3.收集需求的工具与技术4.收集需求过程主要输出5.需求文件内容6.需求管理7.可跟踪性8.双向可跟踪性9.需求跟踪矩阵…...

【项目精选】javaEE健康管理系统(论文+开题报告+答辩PPT+源代码+数据库+讲解视频)
点击下载源码 javaEE健康管理系统主要功能包括:教师登录退出、教师饮食管理、教师健康日志、体检管理等等。本系统结构如下: (1)用户模块: 实现登录功能 实现用户登录的退出 实现用户注册 (2)教…...
ctfshow nodejs
web 334 大小写转换特殊字符绕过。 “ı”.toUpperCase() ‘I’,“ſ”.toUpperCase() ‘S’。 “K”.toLowerCase() ‘k’. payload: CTFſHOW 123456web 335 通过源码可知 eval(xxx),eval 中可以执行 js 代码,那么我们可以依此执行系…...
无线传感器原理及方法|重点理论知识|2021年19级|期末考试
Min-Max定位 【P63】 最小最大法的基本思想是依据未知节点到各锚节点的距离测量值及锚节点的坐标构造若干个边界框,即以参考节点为圆心,未知节点到该锚节点的距离测量值为半径所构成圆的外接矩形,计算外接矩形的质心为未知节点的估计坐标。 多边定位法的浮点运算量大,计算代…...
带你写出符合 Promise/A+ 规范 Promise 的源码
Promise是前端面试中的高频问题,如果你能根据PromiseA的规范,写出符合规范的源码,那么我想,对于面试中的Promise相关的问题,都能够给出比较完美的答案。 我的建议是,对照规范多写几次实现,也许…...
回流与重绘
触发回流与重绘条件👉回流当渲染树中部分或者全部元素的尺寸、结构或者属性发生变化时,浏览器会重新渲染部分或者全部文档的过程就称为 回流。引起回流原因1.页面的首次渲染2.浏览器的窗口大小发生变化3.元素的内容发生变化4.元素的尺寸或者位置发生变化…...

openpyxl表格的简单实用
示例:创建简单的电子表格和条形图 在这个例子中,我们将从头开始创建一个工作表并添加一些数据,然后绘制它。我们还将探索一些有限的单元格样式和格式。 我们将在工作表上输入的数据如下: 首先,让我们加载 openpyxl 并创建一个新工作簿。并获取活动表。我们还将输入我们…...

【寒假day4】leetcode刷题
🌈一、选择题❤1.下列哪一个是析构函数的特征( )。A: 析构函数定义只能在类体内 B: 一个类中只能定义一个析构函数 C: 析构函数名与类名相同 D: 析构函数可以有一个或多个参数答案:B答案解析:析构函数是构造函…...
【竞赛题】6355. 统计公平数对的数目
题目: 给你一个下标从 0 开始、长度为 n 的整数数组 nums ,和两个整数 lower 和 upper ,返回 公平数对的数目 。 如果 (i, j) 数对满足以下情况,则认为它是一个 公平数对 : 0 < i < j < n,且 l…...

Redis集群搭建(主从、哨兵、分片)
1.单机安装Redis 首先需要安装Redis所需要的依赖: yum install -y gcc tcl然后将课前资料提供的Redis安装包上传到虚拟机的任意目录: 例如,我放到了/tmp目录: 解压缩: tar -xzf redis-6.2.4.tar.gz解压后࿱…...
Dart语法基础补充
Asynchrony support Dart 库中充满了返回 Future 或 Stream 对象的函数。 这些函数是异步的:它们在设置一个可能耗时的操作(例如 I/O)后返回,而不等待该操作完成。 async 和 await 关键字支持异步编程,让编写看起来类…...

Nginx - 深入理解nginx的处理请求、进程关系和配置文件重载
概述 Nginx的系统学习整理的第三篇博客,主要介绍nginx的应用场景和架构基础,以便更好的理解,再生产环境中进行性能调优。 Nginx的三个主要应用场景 1.静态资源服务,通过本地文件系统提供服务 2.反向代理服务,强大的性…...
华为OD机试 - 服务依赖(Python)| 真题含思路
服务依赖 题目 在某系统中有众多服务,每个服务用字符串(只包含字母和数字,长度<=10)唯一标识,服务间可能有依赖关系,如A依赖B,则当 B 故障时导致 A 也故障。 传递具有依赖性,如 A依赖 B,B 依赖 C,当 C 故障时导致 B 故障,也导致 A 故障。给出所有依赖关系以及当…...

html的表单标签(form)
目录标题1、表单标签主要有三大类:2、表单标签中常见的属性3、例子代码及结果4、注意:5、表单中特殊的属性表单标签可以用来数据交互,而前面学的六个标签只能发送不能接收。 表单标签的作用就是数据交互1、表单标签主要有三大类: …...

手把手教你部署ruoyi前后端分离版本
下载源码(当前版本3.8.5)RuoYi-Vue: 🎉 基于SpringBoot,Spring Security,JWT,Vue & Element 的前后端分离权限管理系统,同时提供了 Vue3 的版本 (gitee.com)创建数据库(一定要是这三个&…...

TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...

linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...
Webpack性能优化:构建速度与体积优化策略
一、构建速度优化 1、升级Webpack和Node.js 优化效果:Webpack 4比Webpack 3构建时间降低60%-98%。原因: V8引擎优化(for of替代forEach、Map/Set替代Object)。默认使用更快的md4哈希算法。AST直接从Loa…...
苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会
在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...