微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
~~微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题~~
- RocketMQ-延时消息
- 实现延时消息
- RocketMQ-消息过滤
- Tag标签过滤
- SQL标签过滤
- 管控台搜索问题
RocketMQ-延时消息
给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫描,判断是否到达要求时间
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
但这是默认的,我们可以修改


想修改可以去rocketmq的conf文件夹,修改broker.conf配置参数
该时间是指消息在中间件里面存储的时间
实现延时消息
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息消费时间:"+new Date()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
生产者类:
public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");//连接nameServerproducer.setNamesrvAddr("43.143.161.59:9876");//启动生产者producer.start();//设置消息发送的目的地String topic = "helloTopic";//发送消息Message msg = new Message(topic,("延时消息,发送时间:"+new Date()).getBytes(Charset.defaultCharset()));//设置消息延时级别msg.setDelayTimeLevel(3);producer.sendOneway(msg);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);//关闭资源producer.shutdown();}
}
RocketMQ-消息过滤
Tag标签过滤
用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效。
生产者类:
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("tagProduceGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "tagFilterTopic";Message msg1 = new Message(topic,"TagA",("消息A").getBytes(Charset.defaultCharset()));Message msg2 = new Message(topic,"TagB",("消息B").getBytes(Charset.defaultCharset()));Message msg3 = new Message(topic,"TagC",("消息C").getBytes(Charset.defaultCharset()));producer.sendOneway(msg1);producer.sendOneway(msg2);producer.sendOneway(msg3);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);producer.shutdown();}
}
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("tagFilterTopic","TagA || TagC");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果

SQL标签过滤
可以过滤内容,像写where一样
生产者类:
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("sqlProduceGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "sqlFilterTopic";Message msg1 = new Message(topic,("美女A,年龄22,体重45").getBytes(Charset.defaultCharset()));msg1.putUserProperty("age","22");msg1.putUserProperty("weight","45");Message msg2 = new Message(topic,("美女B,年龄25,体重60").getBytes(Charset.defaultCharset()));msg2.putUserProperty("age","25");msg2.putUserProperty("weight","60");Message msg3 = new Message(topic,("美女C,年龄40,体重70").getBytes(Charset.defaultCharset()));msg3.putUserProperty("age","40");msg3.putUserProperty("weight","70");producer.sendOneway(msg1);producer.sendOneway(msg2);producer.sendOneway(msg3);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);producer.shutdown();}
}
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age>23 and weight>60"));consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果:

原因是因为默认是不支持sql过滤的,需要更改配置文件之后重启broker服务

在文件最后一行添加enablePropertyFilter=true即可
随后重新启动broker服务

之后运行结果

管控台搜索问题
为什么有时候管控台的消息都没有显示收到此消息,但消费者却能消费?
因为时间问题,因为我们的rocketmq是部署在虚拟机上的,当我们虚拟机和windows时间是同步的时候,消息是没有问题的,控制台显示时间内上下波动一小时的消息,但当虚拟机关掉的时候,时间是不动的,windows的时间却因为电脑里面的一个物理小电池,时间还在正常运行,两者时间不同步了,造成我们发消息是虚拟机的时间,控制台显示的是windows的时间,但消费因为并没有按照时间过滤,所以还是可以接收的到,把时间改一下又可以看到消息了

相关文章:
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
~~微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题~~ RocketMQ-延时消息实现延时消息RocketMQ-消息过滤Tag标签过滤SQL标签过滤管控台搜索问题RocketMQ-延时消息 给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫…...
js发送邮件(node.js)
以前看别人博客留言或者评论文章时必须填写邮箱信息,感觉甚是麻烦。 后来才知道是为了在博主回复后让访客收到邮件,用心良苦。 于是我也在新增留言和文章评论的接口里,新增了给自己发送邮件提醒的功能。 我用的QQ邮箱,具体如下…...
English Learning - Day58 一周高频问题汇总 2023.2.12 周日
English Learning - Day58 一周高频问题汇总 2023.2.12 周日这周主要内容继续说说状语从句结果状语从句这周主要内容 DAY58【周日总结】 一周高频问题汇总 (打卡作业详见 Day59) 一近期主要讲了 一 01.主动脉修饰 以下是最常问到的知识点拓展ÿ…...
【微电网】基于风光储能和需求响应的微电网日前经济调度(Python代码实现)
目录 1 概述 2 知识点及数学模型 3 算例实现 3.1算例介绍 3.2风光参与的模型求解 3.3 风光和储能参与的模型求解 3.5 风光储能和需求响应都参与模型求解 3.6 结果分析对比 4 Python代码及算例数据 1 概述 近年来,微电网、清洁能源等已成为全球关注的热点…...
四种方式的MySQL安装
mysql安装常见的方法有四种序号 安装方式 说明1 yum\rpm简单、快速,不能定制参数2二进制 解压,简单配置就可使用 免安装 mysql-a.b.c-linux2.x-x86_64.tar.gz3源码编译 可以定制参数,安装时间长 mysql-a.b.c.tar.gz4源码制成rpm包 把源码制…...
软考高级信息系统项目管理师系列之九:项目范围管理
软考高级信息系统项目管理师系列之九:项目范围管理 一、范围管理输入、输出、工具和技术表二、范围管理概述三、规划范围管理四、收集需求1.收集需求:2.需求分类3.收集需求的工具与技术4.收集需求过程主要输出5.需求文件内容6.需求管理7.可跟踪性8.双向可跟踪性9.需求跟踪矩阵…...
【项目精选】javaEE健康管理系统(论文+开题报告+答辩PPT+源代码+数据库+讲解视频)
点击下载源码 javaEE健康管理系统主要功能包括:教师登录退出、教师饮食管理、教师健康日志、体检管理等等。本系统结构如下: (1)用户模块: 实现登录功能 实现用户登录的退出 实现用户注册 (2)教…...
ctfshow nodejs
web 334 大小写转换特殊字符绕过。 “ı”.toUpperCase() ‘I’,“ſ”.toUpperCase() ‘S’。 “K”.toLowerCase() ‘k’. payload: CTFſHOW 123456web 335 通过源码可知 eval(xxx),eval 中可以执行 js 代码,那么我们可以依此执行系…...
无线传感器原理及方法|重点理论知识|2021年19级|期末考试
Min-Max定位 【P63】 最小最大法的基本思想是依据未知节点到各锚节点的距离测量值及锚节点的坐标构造若干个边界框,即以参考节点为圆心,未知节点到该锚节点的距离测量值为半径所构成圆的外接矩形,计算外接矩形的质心为未知节点的估计坐标。 多边定位法的浮点运算量大,计算代…...
带你写出符合 Promise/A+ 规范 Promise 的源码
Promise是前端面试中的高频问题,如果你能根据PromiseA的规范,写出符合规范的源码,那么我想,对于面试中的Promise相关的问题,都能够给出比较完美的答案。 我的建议是,对照规范多写几次实现,也许…...
回流与重绘
触发回流与重绘条件👉回流当渲染树中部分或者全部元素的尺寸、结构或者属性发生变化时,浏览器会重新渲染部分或者全部文档的过程就称为 回流。引起回流原因1.页面的首次渲染2.浏览器的窗口大小发生变化3.元素的内容发生变化4.元素的尺寸或者位置发生变化…...
openpyxl表格的简单实用
示例:创建简单的电子表格和条形图 在这个例子中,我们将从头开始创建一个工作表并添加一些数据,然后绘制它。我们还将探索一些有限的单元格样式和格式。 我们将在工作表上输入的数据如下: 首先,让我们加载 openpyxl 并创建一个新工作簿。并获取活动表。我们还将输入我们…...
【寒假day4】leetcode刷题
🌈一、选择题❤1.下列哪一个是析构函数的特征( )。A: 析构函数定义只能在类体内 B: 一个类中只能定义一个析构函数 C: 析构函数名与类名相同 D: 析构函数可以有一个或多个参数答案:B答案解析:析构函数是构造函…...
【竞赛题】6355. 统计公平数对的数目
题目: 给你一个下标从 0 开始、长度为 n 的整数数组 nums ,和两个整数 lower 和 upper ,返回 公平数对的数目 。 如果 (i, j) 数对满足以下情况,则认为它是一个 公平数对 : 0 < i < j < n,且 l…...
Redis集群搭建(主从、哨兵、分片)
1.单机安装Redis 首先需要安装Redis所需要的依赖: yum install -y gcc tcl然后将课前资料提供的Redis安装包上传到虚拟机的任意目录: 例如,我放到了/tmp目录: 解压缩: tar -xzf redis-6.2.4.tar.gz解压后࿱…...
Dart语法基础补充
Asynchrony support Dart 库中充满了返回 Future 或 Stream 对象的函数。 这些函数是异步的:它们在设置一个可能耗时的操作(例如 I/O)后返回,而不等待该操作完成。 async 和 await 关键字支持异步编程,让编写看起来类…...
Nginx - 深入理解nginx的处理请求、进程关系和配置文件重载
概述 Nginx的系统学习整理的第三篇博客,主要介绍nginx的应用场景和架构基础,以便更好的理解,再生产环境中进行性能调优。 Nginx的三个主要应用场景 1.静态资源服务,通过本地文件系统提供服务 2.反向代理服务,强大的性…...
华为OD机试 - 服务依赖(Python)| 真题含思路
服务依赖 题目 在某系统中有众多服务,每个服务用字符串(只包含字母和数字,长度<=10)唯一标识,服务间可能有依赖关系,如A依赖B,则当 B 故障时导致 A 也故障。 传递具有依赖性,如 A依赖 B,B 依赖 C,当 C 故障时导致 B 故障,也导致 A 故障。给出所有依赖关系以及当…...
html的表单标签(form)
目录标题1、表单标签主要有三大类:2、表单标签中常见的属性3、例子代码及结果4、注意:5、表单中特殊的属性表单标签可以用来数据交互,而前面学的六个标签只能发送不能接收。 表单标签的作用就是数据交互1、表单标签主要有三大类: …...
手把手教你部署ruoyi前后端分离版本
下载源码(当前版本3.8.5)RuoYi-Vue: 🎉 基于SpringBoot,Spring Security,JWT,Vue & Element 的前后端分离权限管理系统,同时提供了 Vue3 的版本 (gitee.com)创建数据库(一定要是这三个&…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...
简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...
定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...
12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...
python爬虫——气象数据爬取
一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用: 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests:发送 …...
