系列十二、Java操作RocketMQ之带标签Tag的消息
一、带标签的Tag消息
1.1、概述
RocketMQ提供消息过滤的功能,通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候,根据业务逻辑可能需要区分,比如带有tagA标签的消息被消费者A消费,带有tagB标签的消息被消费者B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才能区别对待。
1.2、什么时候该用Topic,什么时候该用Tag
不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:
(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;
(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;
(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;
(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;
总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。
二、案例代码
2.1、pom
同案例五
2.2、RocketMQConstant
同案例五
2.3、消费者
2.3.1、TagConsumer1
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer1 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer1Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","NBA");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer1]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer1] start success");}}
2.3.2、TagConsumer2
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer2 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer2Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","RUN");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer2]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer2] start success");}}
2.3.3、TagConsumer3
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer3 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer3Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","STAR || CAR");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer3]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer3] start success");}}
2.3.4、TagConsumer4
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer4 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer4Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","CAR || MOBILE || TOURISM");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer4]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer4] start success");}}
2.4、TagProducer
package org.star.tag.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;import java.nio.charset.StandardCharsets;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:22* @Description: Tag消息生产者*/
@Slf4j
public class TagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("TagProducer");producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);producer.start();log.info("Tag消息生产者 start success");String[] tags = new String[]{"NBA", "RUN", "STAR","CAR","MOBILE","TOURISM"};for (int i = 0; i < 6; i++) {String tag = tags[i % tags.length];String content = "";if ("NBA".equals(tag)) {content = "NBA message,消息编号[" + i + "]";} else if ("RUN".equals(tag)) {content = "RUN message,消息编号[" + i + "]";} else if ("STAR".equals(tag)) {content = "STAR message,消息编号[" + i + "]";} else if ("CAR".equals(tag)) {content = "CAR message,消息编号[" + i + "]";} else if ("MOBILE".equals(tag)) {content = "MOBILE message,消息编号[" + i + "]";} else if ("TOURISM".equals(tag)) {content = "TOURISM message,消息编号[" + i + "]";}log.info("当前tag:{},消息内容:{}", tag, content);Message message = new Message("TagTopic", tag, content.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);log.info("sendStatus:{},brokerName:{},queueId:{},msgId:{}", result.getSendStatus(), result.getMessageQueue().getBrokerName(), result.getMessageQueue().getQueueId(), result.getMsgId());}producer.shutdown();}}
2.5 、控制台打印结果
# 生产者
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - Tag消息生产者 start success
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - 当前tag:NBA,消息内容:NBA message,消息编号[0]
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D530000
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - 当前tag:RUN,消息内容:RUN message,消息编号[1]
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D5C0001
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - 当前tag:STAR,消息内容:STAR message,消息编号[2]
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:2,msgId:0AA867618F8018B4AAC2262C1D640002
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - 当前tag:CAR,消息内容:CAR message,消息编号[3]
09:53:45.322 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:3,msgId:0AA867618F8018B4AAC2262C1D680003
09:53:45.323 [main] INFO org.star.tag.producer.TagProducer - 当前tag:MOBILE,消息内容:MOBILE message,消息编号[4]
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D6B0004
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - 当前tag:TOURISM,消息内容:TOURISM message,消息编号[5]
09:53:45.329 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D6E0005# 消费者TagConsumer1
09:53:45.310 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer1 - 消费者[TagConsumer1]收到消息,消息详情:NBA message,消息编号[0]# 消费者TagConsumer2
09:53:45.316 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer2 - 消费者[TagConsumer2]收到消息,消息详情:RUN message,消息编号[1]# 消费者TagConsumer3
09:53:45.322 [ConsumeMessageThread_3] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息,消息详情:STAR message,消息编号[2]
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息,消息详情:CAR message,消息编号[3]# 消费者TagConsumer4
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:CAR message,消息编号[3]
09:53:45.344 [ConsumeMessageThread_6] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:MOBILE message,消息编号[4]
09:53:45.344 [ConsumeMessageThread_5] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:TOURISM message,消息编号[5]
相关文章:
系列十二、Java操作RocketMQ之带标签Tag的消息
一、带标签的Tag消息 1.1、概述 RocketMQ提供消息过滤的功能,通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候,根据业务逻辑可能需要区分,比如带有tagA标签的消息被消费者A消费,带有tagB标签的消息被消费者B消费&…...
Java面向对象学习笔记-1
前言 “Java 学习笔记” 是为初学者和希望加深对Java编程语言的理解的人们编写的。Java是一门广泛应用于软件开发领域的强大编程语言,它的语法和概念对于初学者来说可能有些复杂。这份学习笔记的目的是帮助读者逐步学习Java的基本概念,并提供了一系列示…...
el-table根据data动态生成列和行
css //el-table-column加上fixed后会导致悬浮样式丢失,用下面方法可以避免 .el-table__body .el-table__row.hover-row td{background-color: #083a78 !important; } .el-table tbody tr:hover>td {background: #171F34 !important; }html <el-table ref&quo…...

【c++】如何有效地利用命名空间?
🌱博客主页:青竹雾色间 😘博客制作不易欢迎各位👍点赞⭐收藏➕关注 ✨人生如寄,多忧何为 ✨ 目录 前言什么是命名空间?命名空间的语法命名空间的使用避免命名冲突命名空间的嵌套总结 前言 当谈到C编…...
Go语言传参
为了让新手尽快熟悉go的使用,特记录此文,不必谢我,转载请注明! Go 语言中参数传递的各种效果,主要内容包括: 传值效果指针传递结构体传递map 传递channel 传递切片传递错误传递传递效果示例传递方式选择原文连接:https://mp.weixin.qq.com/s?__biz=MzA5Mzk4Njk1OA==&…...

SAP PI 配置SSL链接接口报错问题处理Peer certificate rejected by ChainVerifier
出现这种情况一般无非是没有正确导入证书或者证书过期的情况 第一种,如果没有导入证书的话,需要在NWA中的证书与验证-》CAs中导入管理员提供的证书,这里需要注意的是,需要导入完整的证书链。 第二种如果是证书过期的,…...

【MyBatisⅡ】动态 SQL
目录 🎒1 if 标签 🫖2 trim 标签 👠3 where 标签 🦺4 set 标签 🎨5 foreach 标签 动态 sql 是Mybatis的强⼤特性之⼀,能够完成不同条件下不同的 sql 拼接。 在 xml 里面写判断条件。 动态SQL 在数据库里…...

音视频入门基础理论知识
文章目录 前言一、视频1、视频的概念2、常见的视频格式3、视频帧4、帧率5、色彩空间6、采用 YUV 的优势7、RGB 和 YUV 的换算 二、音频1、音频的概念2、采样率和采样位数①、采样率②、采样位数 3、音频编码4、声道数5、码率6、音频格式 三、编码1、为什么要编码2、视频编码①、…...

Pytorch中如何加载数据、Tensorboard、Transforms的使用
一、Pytorch中如何加载数据 在Pytorch中涉及到如何读取数据,主要是两个类一个类是Dataset、Dataloader Dataset 提供一种方式获取数据,及其对应的label。主要包含以下两个功能: 如何获取每一个数据以及label 告诉我们总共有多少的数据 Datal…...
python如何使用打开文件对话框选择文件?
python如何使用打开文件对话框选择文件? ━━━━━━━━━━━━━━━━━━━━━━ 在Python中,可以使用Tkinter库中的filedialog子模块来打开一个文件对话框以供用户选择文件。以下是一个简单的例子,演示如何使用tkinter.filedialog打…...

虚拟化和容器
文章目录 1 介绍1.1 简介1.2 虚拟化工作原理1.3 两大核心组件:QEMU、KVMQEMUKVM 1.4 发展历史1.5 虚拟化类型1.6 云计算与虚拟化1.7 HypervisorHypervisor分为两大类 1.8 虚拟化 VS 容器 2 虚拟化应用dockerdocker 与虚拟机的区别 K8Swine 参考 1 介绍 1.1 简介 虚…...
LeetCode-78-子集
题目描述: 给你一个整数数组 nums ,数组中的元素 互不相同。返回该数组所有可能的子集(幂集)。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 题目链接:LeetCode-78-子集 解题思路:递归回溯 题…...
js对象转json文件
目录 需求1.首先寻找类似需求的数据2.对数据进行转换3.将转换后的数据转为json文件4.完整代码 需求 需求:在做项目时,遇到了需要制作地址列表的功能,这一般都会用到一些开源的组件库,但是有个问题是不同组件库之间的城市列表数据结…...

【免费模板】2023数学建模国赛word+latex模板免费分享
无需转发 免费获取2023国赛模板,获取方式见文末 模板文件预览如下: 模板参考格式如下: (题目)XXXXXX 摘 要: 开头段:需要充分概括论文内容,一般两到三句话即可,长度控…...

基于HBuilder X平台下的 驾校报名考试管理系统 uniapp 微信小程序3n9o5
本课题研究的是基于HBuilder X系统平台下的驾校管理系统,开发这款驾校管理系统主要是为了帮助学员可以不用约束时间与地点进行查看教练信息、考场信息等内容。本文详细讲述了驾校管理系统的界面设计及使用,主要包括界面的实现、控件的使用、界面的布局和…...

电商3D资产优化管线的自动化
如果你曾经尝试将从 CAD 程序导出的 3D 模型上传到 WebGL 或 AR 服务,那么可能会遇到最大文件大小、永无休止的进度条和糟糕的帧速率等问题。 为了创作良好的在线交互体验,优化 3D 数据的大小和性能至关重要。 这也有利于你的盈利,因为较小的…...

Android 大图显示优化方案-加载Gif 自定义解码器
基于Glide做了图片显示的优化,尤其是加载Gif图的优化,原生Glide加载Gif图性能较低。在原生基础上做了自定义解码器的优化,提升Glide性能 Glide加载大图和Gif 尤其是列表存在gif时,会有明显卡顿,cpu和内存占用较高&…...
Leetcode.664 奇怪的打印机
题目链接 Leetcode.664 奇怪的打印机 hard 题目描述 有台奇怪的打印机有以下两个特殊要求: 打印机每次只能打印由 同一个字符 组成的序列。每次可以在从起始到结束的任意位置打印新字符,并且会覆盖掉原来已有的字符。 给你一个字符串 s ,你…...

正中优配:散户怎么实现T+0?散户在股市上怎么变相T+0?
T0是指当天买入的标的物,在当天就能卖出的买卖方式,其中,在a股市场上,散户能够通过一些办法直接地完成T0买卖方式,接下来,正中优配为大家预备了相关内容,以供参阅。 散户在股票市场上࿰…...

ZooInspector
一、在window,使用我们先打开Zookeeper,目录bin下的zkServer.cmd,把Zookeeper运行起来 编辑https://img.111com.net/attachment/art/187687/5f0c25fbe580c.png 二、可以使用目录bin下的zkCli.cmd,查询Zookeeper数据的方式,但是…...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...

Linux中《基础IO》详细介绍
目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改,实现简单cat命令 输出信息到显示器,你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...

【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道
文/法律实务观察组 在债务重组领域,专业机构的核心价值不仅在于减轻债务数字,更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明,合法债务优化需同步实现三重平衡: 法律刚性(债…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...

恶补电源:1.电桥
一、元器件的选择 搜索并选择电桥,再multisim中选择FWB,就有各种型号的电桥: 电桥是用来干嘛的呢? 它是一个由四个二极管搭成的“桥梁”形状的电路,用来把交流电(AC)变成直流电(DC)。…...

【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...

MeshGPT 笔记
[2311.15475] MeshGPT: Generating Triangle Meshes with Decoder-Only Transformers https://library.scholarcy.com/try 真正意义上的AI生成三维模型MESHGPT来袭!_哔哩哔哩_bilibili GitHub - lucidrains/meshgpt-pytorch: Implementation of MeshGPT, SOTA Me…...