微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
~~微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题~~
- RocketMQ-延时消息
- 实现延时消息
- RocketMQ-消息过滤
- Tag标签过滤
- SQL标签过滤
- 管控台搜索问题
RocketMQ-延时消息
给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫描,判断是否到达要求时间
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
但这是默认的,我们可以修改
想修改可以去rocketmq的conf文件夹,修改broker.conf配置参数
该时间是指消息在中间件里面存储的时间
实现延时消息
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");//设置nameServer地址consumer.setNamesrvAddr("43.143.161.59:9876");//设置订阅的主题consumer.subscribe("helloTopic","*");//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息消费时间:"+new Date()+",消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
生产者类:
public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");//连接nameServerproducer.setNamesrvAddr("43.143.161.59:9876");//启动生产者producer.start();//设置消息发送的目的地String topic = "helloTopic";//发送消息Message msg = new Message(topic,("延时消息,发送时间:"+new Date()).getBytes(Charset.defaultCharset()));//设置消息延时级别msg.setDelayTimeLevel(3);producer.sendOneway(msg);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);//关闭资源producer.shutdown();}
}
RocketMQ-消息过滤
Tag标签过滤
用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效。
生产者类:
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("tagProduceGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "tagFilterTopic";Message msg1 = new Message(topic,"TagA",("消息A").getBytes(Charset.defaultCharset()));Message msg2 = new Message(topic,"TagB",("消息B").getBytes(Charset.defaultCharset()));Message msg3 = new Message(topic,"TagC",("消息C").getBytes(Charset.defaultCharset()));producer.sendOneway(msg1);producer.sendOneway(msg2);producer.sendOneway(msg3);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);producer.shutdown();}
}
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("tagFilterTopic","TagA || TagC");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果
SQL标签过滤
可以过滤内容,像写where一样
生产者类:
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("sqlProduceGroup");producer.setNamesrvAddr("43.143.161.59:9876");producer.start();String topic = "sqlFilterTopic";Message msg1 = new Message(topic,("美女A,年龄22,体重45").getBytes(Charset.defaultCharset()));msg1.putUserProperty("age","22");msg1.putUserProperty("weight","45");Message msg2 = new Message(topic,("美女B,年龄25,体重60").getBytes(Charset.defaultCharset()));msg2.putUserProperty("age","25");msg2.putUserProperty("weight","60");Message msg3 = new Message(topic,("美女C,年龄40,体重70").getBytes(Charset.defaultCharset()));msg3.putUserProperty("age","40");msg3.putUserProperty("weight","70");producer.sendOneway(msg1);producer.sendOneway(msg2);producer.sendOneway(msg3);System.out.println("消息发送完毕.");TimeUnit.SECONDS.sleep(5);producer.shutdown();}
}
消费者类:
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");consumer.setNamesrvAddr("43.143.161.59:9876");consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age>23 and weight>60"));consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s = new String(msg.getBody(), Charset.defaultCharset());System.out.println("消息的内容:"+s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}
运行结果:
原因是因为默认是不支持sql过滤的,需要更改配置文件之后重启broker服务
在文件最后一行添加enablePropertyFilter=true即可
随后重新启动broker服务
之后运行结果
管控台搜索问题
为什么有时候管控台的消息都没有显示收到此消息,但消费者却能消费?
因为时间问题,因为我们的rocketmq是部署在虚拟机上的,当我们虚拟机和windows时间是同步的时候,消息是没有问题的,控制台显示时间内上下波动一小时的消息,但当虚拟机关掉的时候,时间是不动的,windows的时间却因为电脑里面的一个物理小电池,时间还在正常运行,两者时间不同步了,造成我们发消息是虚拟机的时间,控制台显示的是windows的时间,但消费因为并没有按照时间过滤,所以还是可以接收的到,把时间改一下又可以看到消息了
相关文章:

微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
~~微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题~~ RocketMQ-延时消息实现延时消息RocketMQ-消息过滤Tag标签过滤SQL标签过滤管控台搜索问题RocketMQ-延时消息 给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫…...
js发送邮件(node.js)
以前看别人博客留言或者评论文章时必须填写邮箱信息,感觉甚是麻烦。 后来才知道是为了在博主回复后让访客收到邮件,用心良苦。 于是我也在新增留言和文章评论的接口里,新增了给自己发送邮件提醒的功能。 我用的QQ邮箱,具体如下…...
English Learning - Day58 一周高频问题汇总 2023.2.12 周日
English Learning - Day58 一周高频问题汇总 2023.2.12 周日这周主要内容继续说说状语从句结果状语从句这周主要内容 DAY58【周日总结】 一周高频问题汇总 (打卡作业详见 Day59) 一近期主要讲了 一 01.主动脉修饰 以下是最常问到的知识点拓展ÿ…...

【微电网】基于风光储能和需求响应的微电网日前经济调度(Python代码实现)
目录 1 概述 2 知识点及数学模型 3 算例实现 3.1算例介绍 3.2风光参与的模型求解 3.3 风光和储能参与的模型求解 3.5 风光储能和需求响应都参与模型求解 3.6 结果分析对比 4 Python代码及算例数据 1 概述 近年来,微电网、清洁能源等已成为全球关注的热点…...

四种方式的MySQL安装
mysql安装常见的方法有四种序号 安装方式 说明1 yum\rpm简单、快速,不能定制参数2二进制 解压,简单配置就可使用 免安装 mysql-a.b.c-linux2.x-x86_64.tar.gz3源码编译 可以定制参数,安装时间长 mysql-a.b.c.tar.gz4源码制成rpm包 把源码制…...
软考高级信息系统项目管理师系列之九:项目范围管理
软考高级信息系统项目管理师系列之九:项目范围管理 一、范围管理输入、输出、工具和技术表二、范围管理概述三、规划范围管理四、收集需求1.收集需求:2.需求分类3.收集需求的工具与技术4.收集需求过程主要输出5.需求文件内容6.需求管理7.可跟踪性8.双向可跟踪性9.需求跟踪矩阵…...

【项目精选】javaEE健康管理系统(论文+开题报告+答辩PPT+源代码+数据库+讲解视频)
点击下载源码 javaEE健康管理系统主要功能包括:教师登录退出、教师饮食管理、教师健康日志、体检管理等等。本系统结构如下: (1)用户模块: 实现登录功能 实现用户登录的退出 实现用户注册 (2)教…...
ctfshow nodejs
web 334 大小写转换特殊字符绕过。 “ı”.toUpperCase() ‘I’,“ſ”.toUpperCase() ‘S’。 “K”.toLowerCase() ‘k’. payload: CTFſHOW 123456web 335 通过源码可知 eval(xxx),eval 中可以执行 js 代码,那么我们可以依此执行系…...
无线传感器原理及方法|重点理论知识|2021年19级|期末考试
Min-Max定位 【P63】 最小最大法的基本思想是依据未知节点到各锚节点的距离测量值及锚节点的坐标构造若干个边界框,即以参考节点为圆心,未知节点到该锚节点的距离测量值为半径所构成圆的外接矩形,计算外接矩形的质心为未知节点的估计坐标。 多边定位法的浮点运算量大,计算代…...
带你写出符合 Promise/A+ 规范 Promise 的源码
Promise是前端面试中的高频问题,如果你能根据PromiseA的规范,写出符合规范的源码,那么我想,对于面试中的Promise相关的问题,都能够给出比较完美的答案。 我的建议是,对照规范多写几次实现,也许…...
回流与重绘
触发回流与重绘条件👉回流当渲染树中部分或者全部元素的尺寸、结构或者属性发生变化时,浏览器会重新渲染部分或者全部文档的过程就称为 回流。引起回流原因1.页面的首次渲染2.浏览器的窗口大小发生变化3.元素的内容发生变化4.元素的尺寸或者位置发生变化…...

openpyxl表格的简单实用
示例:创建简单的电子表格和条形图 在这个例子中,我们将从头开始创建一个工作表并添加一些数据,然后绘制它。我们还将探索一些有限的单元格样式和格式。 我们将在工作表上输入的数据如下: 首先,让我们加载 openpyxl 并创建一个新工作簿。并获取活动表。我们还将输入我们…...

【寒假day4】leetcode刷题
🌈一、选择题❤1.下列哪一个是析构函数的特征( )。A: 析构函数定义只能在类体内 B: 一个类中只能定义一个析构函数 C: 析构函数名与类名相同 D: 析构函数可以有一个或多个参数答案:B答案解析:析构函数是构造函…...
【竞赛题】6355. 统计公平数对的数目
题目: 给你一个下标从 0 开始、长度为 n 的整数数组 nums ,和两个整数 lower 和 upper ,返回 公平数对的数目 。 如果 (i, j) 数对满足以下情况,则认为它是一个 公平数对 : 0 < i < j < n,且 l…...

Redis集群搭建(主从、哨兵、分片)
1.单机安装Redis 首先需要安装Redis所需要的依赖: yum install -y gcc tcl然后将课前资料提供的Redis安装包上传到虚拟机的任意目录: 例如,我放到了/tmp目录: 解压缩: tar -xzf redis-6.2.4.tar.gz解压后࿱…...
Dart语法基础补充
Asynchrony support Dart 库中充满了返回 Future 或 Stream 对象的函数。 这些函数是异步的:它们在设置一个可能耗时的操作(例如 I/O)后返回,而不等待该操作完成。 async 和 await 关键字支持异步编程,让编写看起来类…...

Nginx - 深入理解nginx的处理请求、进程关系和配置文件重载
概述 Nginx的系统学习整理的第三篇博客,主要介绍nginx的应用场景和架构基础,以便更好的理解,再生产环境中进行性能调优。 Nginx的三个主要应用场景 1.静态资源服务,通过本地文件系统提供服务 2.反向代理服务,强大的性…...
华为OD机试 - 服务依赖(Python)| 真题含思路
服务依赖 题目 在某系统中有众多服务,每个服务用字符串(只包含字母和数字,长度<=10)唯一标识,服务间可能有依赖关系,如A依赖B,则当 B 故障时导致 A 也故障。 传递具有依赖性,如 A依赖 B,B 依赖 C,当 C 故障时导致 B 故障,也导致 A 故障。给出所有依赖关系以及当…...

html的表单标签(form)
目录标题1、表单标签主要有三大类:2、表单标签中常见的属性3、例子代码及结果4、注意:5、表单中特殊的属性表单标签可以用来数据交互,而前面学的六个标签只能发送不能接收。 表单标签的作用就是数据交互1、表单标签主要有三大类: …...

手把手教你部署ruoyi前后端分离版本
下载源码(当前版本3.8.5)RuoYi-Vue: 🎉 基于SpringBoot,Spring Security,JWT,Vue & Element 的前后端分离权限管理系统,同时提供了 Vue3 的版本 (gitee.com)创建数据库(一定要是这三个&…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...

IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...