当前位置: 首页 > news >正文

学习RocketMQ(记录了个人艰难学习RocketMQ的笔记)

一、部署单点RocketMQ

Docker 部署 RocketMQ (图文并茂超详细)_docker 部署rocketmq-CSDN博客

这个博主讲的很好,可食用,替大家实践了一遍

二、原理篇

为什么使用RocketMQ:

为什么选择RocketMQ | RocketMQ

关于一些原理,感觉官网讲的也非常透彻

领域模型概述 | RocketMQ

还有一些功能特性:

普通消息 | RocketMQ

本文的实操篇只是讲了发送普通消息

关于中间件对比,下面我之前有看过一些很好的文章:

Kafka、RabbitMQ、RocketMQ等消息中间件的对比_rabbimq rocket 差异-CSDN博客

rpc和zmq性能对比 rpc mq区别_mob6454cc70642f的技术博客_51CTO博客

RabbitMQ,RocketMQ,Kafka--区别/对比/选型_51CTO博客_rocketmq rabbitmq kafka选型

三、实操篇

先讲讲原理:

如果你需要不同业务,就需要不同消费者组,不要想着同一个消费者组可以通过订阅不同主题达到不同业务,因为同一个消费者组内的功能必须是一致的,可以换个角度想,既然你是一个业务,一个业务就是一个主题嘛,你用不同的业务实现,就多添加几个消费者组,分别订阅那个主题(业务),然后通过不同的Tag区分就行了,而且而且,不要想着说:一个消费者组一个主题通过不同Tag来区分,虽然我在刚刚学习的时候也这样子想,结果踩了一天的坑,看了好多博客好文来理解,在文末也有关于为什么不能这样子做。

1、引入依赖

RocketMQ的依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>

demo案例的全部依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bluefoxyu</groupId><artifactId>RocketMQ-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>3.2.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.27</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.16</version></dependency></dependencies></project>

2、启动自动装配

2.2.3 版本的RocketMQ 没有适配 SpringBoot3,只适配SpringBoot2,所以需要自己去配置好自动装配。可以参考我下面这篇文章:

Springboot3+自动装配_springboot3自动装配-CSDN博客

在项目中的 resources 目录下创建 META-INF/spring 文件夹,并创建下面这个文件。

org.springframework.boot.autoconfigure.AutoConfiguration.imports

# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3、配置application.yml

server:port: 8080spring:profiles:active: devrocketmq:name-server: xxx:9876 # NameServer 地址producer:group: rocketmq-v3-demo-sent-message-group_bluefoxyu # 全局发送者组定义send-message-timeout: 2000# 发送消息失败时的重试次数。设置为 1 表示如果发送失败,会再重试一次(总共尝试两次)。适用于同步发送消息失败时的重试次数。retry-times-when-send-failed: 1# 异步发送失败时的重试次数。设置为 1 表示在异步发送失败时会再尝试一次。适用于异步发送消息失败时的重试次数。retry-times-when-send-async-failed: 1logging:level:com:bluefoxyu:producer: infoconsumer: infocontroller: info

4、启动类

相比这个不必多说了。

RocketMQApplication:
package com.bluefoxyu;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@Slf4j
@SpringBootApplication
public class RocketMQApplication {public static void main(String[] args) {SpringApplication.run(RocketMQApplication.class, args);}
}

5、编写一个统一格式的消息对象

package com.bluefoxyu.message;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serial;
import java.io.Serializable;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageEvent implements Serializable {@Serialprivate static final long serialVersionUID = 1L;private String body;private String keys;}

上述实体类实现了Serializable接口,能够正常被序列化或者反序列化。

6、生产者

编写一个生产者,统一做好发送消息的一个模板,方便简化接口实现发送消息的代码编写,显得更加优雅一点,说到发送消息,就需要知道发送到哪个主题,然后哪些消费者组去消费,然后还有每条消息的唯一标识key,唯一标识可以用uuid生成,也可以用redis生成一个增长的不重复的id,这里使用uuid简化。

注意:如果你的项目里面只有一个消费者组,只有一个消费业务,这样子是不需要传Tag(过滤标签)的,但是正常情况都会有多个消息队列任务,下面提供两种重载的方法。

code:

package com.bluefoxyu.producer;import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 封装全体的消息生产者*/
@Slf4j
@Component
@RequiredArgsConstructor
public class GeneralMessageProducer {private final RocketMQTemplate rocketMQTemplate;/*** 发送普通消息** @param topic            消息发送主题,用于标识同一类业务逻辑的消息* @param keys             消息索引键,可根据关键字精确查找某条消息* @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/public SendResult sendMessage(String topic, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).build();// 2000L 表示发送消息的超时时间为 2000 毫秒,即 2 秒sendResult = rocketMQTemplate.syncSend(topic,message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}/*** 发送普通消息** @param topic            消息发送主题,用于标识同一类业务逻辑的消息* @param tag              消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。* @param keys             消息索引键,可根据关键字精确查找某条消息* @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/public SendResult sendMessage(String topic, String tag, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {// 构建消息的 destination (主题和标签)StringBuilder destinationBuilder = StrUtil.builder().append(topic);if (StrUtil.isNotBlank(tag)) {destinationBuilder.append(":").append(tag);  // 设置tag}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag) // 设置消息的标签.build();// 2000L 表示发送消息的超时时间为 2000 毫秒,即 2 秒sendResult = rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}}

7、定义一个constant

package com.bluefoxyu.constant;/*** RocketMQ 常量类* @author bluefoxyu*/
public class RocketMQConstant {/*** Group 消费者组定义*/public static final String GENERAL_MESSAGE_CONSUMER_GROUP = "general_message_consumer_group";public static final String MESSAGE_CONSUMER_GROUP_A = "message_consumer_group_A";public static final String MESSAGE_CONSUMER_GROUP_B = "message_consumer_group_B";public static final String MESSAGE_CONSUMER_GROUP_C = "message_consumer_group_C";/*** Topic 主题定义*/public static final String MESSAGE_TOPIC_1 = "message_topic_1";public static final String MESSAGE_TOPIC_2 = "message_topic_2";/*** Tag 标签*/public static final String MESSAGE_TAG_A = "message_tag_A";public static final String MESSAGE_TAG_B = "message_tag_B";public static final String MESSAGE_TAG_C = "message_tag_C";}

8、多/单个消费者订阅一个主题

1.实现消费者

这里需要实现监听的消息的实体类类型是什么,GeneralMessageEvent 是我们之前封装的统一消息对象

implements RocketMQListener<GeneralMessageEvent>

在onMessage方法中,通过

JSON.toJSONString(message)

就可以拿到解析好的消息内容,也就是我们真正需要发送的消息,下面我编写三个消费者来进行消费,不过绑定的都是同一个主题,类似负载均衡的功能,这里只用一个消费者也是一样的,因为后续还需要测其他功能,所以这里我写了三个消费者。

GeneralMessageConsumer1:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer1 implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者GeneralMessageConsumer1] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
GeneralMessageConsumer2:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer2 implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者GeneralMessageConsumer2] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
GeneralMessageConsumer3:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer3 implements RocketMQListener<GeneralMessageEvent> {@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者GeneralMessageConsumer3] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}

2.编写接口发送消息

发送消息需要发送这四要素:

  1.  topic 主题 
  2.  key 唯一标识
  3. message 需要发送的消息
package com.bluefoxyu.controller;import com.bluefoxyu.producer.GeneralMessageProducer;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;@RestController
@RequiredArgsConstructor
public class controller {private final GeneralMessageProducer generalMessageDemoProduce;@PostMapping("/send/topic1/general-messageA")public String sendTopic1GeneralMessageA() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for A").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);// 返回发送成功的状态名return sendResult.getSendStatus().name();}@PostMapping("/send/topic1/general-messageB")public String sendTopic1GeneralMessageB() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for B").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);return sendResult.getSendStatus().name();}@PostMapping("/send/topic1/general-messageC")public String sendTopic1GeneralMessageC() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for C").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);return sendResult.getSendStatus().name();}}

3.接口测试

准备这三个测试接口:

开始分别测试三个接口这里就不一一展示了。

看控制台:

如结果消费了三次

9、测试多个消费者分别订阅不同主题

如果相同消费组的三个消费者组分别订阅不同主题,会怎么样呢。修改的代码如下,

当然,哈哈哈哈哈哈,就是消费不到消息(对于小白的我也被困扰了好久),由于是有问题的,代码就不粘贴了【狗头】。如下:

这里参考了一篇大佬的文章:

rocketmq问题汇总-一个consumerGroup只对应一个topic_org.apache.rocketmq.client.exception.mqbrokerexcep-CSDN博客

看完后悟了很多,大概意思就是一个消费者组中的职责应该是一致的,应该都去订阅相同主题的,如果一个消费者订阅了两个主题,那么其他同组的消费者也应该订阅那两个主题,参考评论区这几大佬的评论:

这个大佬就说的很透彻了:

10、一个消费者订阅多个主题

在上面说了既然一个消费者可以订阅多个主题,但是前提条件是同一个消费组中必须订阅相同主题,那应该怎么实现呢。

直接给代码:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 对应主题* consumerGroup 指定消费的分组* RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class GeneralMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消费者GeneralMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));System.out.println("General message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消费者GeneralMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消费失败,异常消息为:{}",e.getMessage());}}}
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 对应主题* consumerGroup 指定消费的分组* RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagAMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消费者TagAMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));System.out.println("tagA message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消费者TagAMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消费失败,异常消息为:{}",e.getMessage());}}
}
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 对应主题* consumerGroup 指定消费的分组* RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagBMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消费者TagBMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));System.out.println("tagB message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消费者TagBMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消费失败,异常消息为:{}",e.getMessage());}}
}

分别测试三个接口:

参考这位大佬的博客:

rocketmq (消费者消费同一个消费组,不同的topic)_rocketmq一个消费组消费多个topic-CSDN博客

11、多个消费者组订阅相同主题

这个业务经常是有的,希望订阅同一种业务,但是有不同的实现,这时候就需要使用Tag过滤标签来区分了。

1、实现消费者
MessageConsumerA:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_A,selectorExpression = MESSAGE_TAG_A
)
public class MessageConsumerA implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者MessageConsumerA] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
MessageConsumerB:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_B,selectorExpression = MESSAGE_TAG_B
)
public class MessageConsumerB implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者MessageConsumerB] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
MessageConsumerC:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_C,selectorExpression = MESSAGE_TAG_C
)
public class MessageConsumerC implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者MessageConsumerC] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}

2、编写接口发送消息

再controller添加那三个接口

    @PostMapping("/send/topic2/messageA")public String sendTopic2MessageA() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for A").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_A,keys,generalMessageEvent);// 返回发送成功的状态名return sendResult.getSendStatus().name();}@PostMapping("/send/topic2/messageB")public String sendTopic3MessageB() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for B").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_B,keys,generalMessageEvent);return sendResult.getSendStatus().name();}@PostMapping("/send/topic2/messageC")public String sendTopic2MessageC() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for C").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_C,keys,generalMessageEvent);return sendResult.getSendStatus().name();}
3、接口测试

消费成功!

也可以参考一个佬的博客:RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?_rocketmq一个topic多个tag-CSDN博客

当然,如果订阅不同主题也是没问题的,这里就不作演示了。

四、文末大佬好文

最后加上两个大佬好文,感觉讲的都很好:

面试官:RocketMQ同一个消费组内的消费者订阅了不同tag,会有问题吗?_rocketmq 订阅多个tag-CSDN博客

面试官:RocketMQ一个消费组内订阅同一个主题不同的TAG为什么会丢消息_为什么rocketmq相同消费组不同tag会有问题-CSDN博客

对于一个消费者组订阅同一个主题不同tag会丢消息,在前几天从0到1学习的时候,以为是可以的,但是踩了大坑。

相关文章:

学习RocketMQ(记录了个人艰难学习RocketMQ的笔记)

一、部署单点RocketMQ Docker 部署 RocketMQ (图文并茂超详细)_docker 部署rocketmq-CSDN博客 这个博主讲的很好&#xff0c;可食用&#xff0c;替大家实践了一遍 二、原理篇 为什么使用RocketMQ&#xff1a; 为什么选择RocketMQ | RocketMQ 关于一些原理&#xff0c;感觉…...

【设计模式】策略模式定义及其实现代码示例

文章目录 一、策略模式1.1 策略模式的定义1.2 策略模式的参与者1.3 策略模式的优点1.4 策略模式的缺点1.5 策略模式的使用场景 二、策略模式简单实现2.1 案例描述2.2 实现代码 三、策略模式的代码优化3.1 优化思路3.2 抽象策略接口3.3 上下文3.4 具体策略实现类3.5 测试 参考资…...

list与iterator的之间的区别,如何用斐波那契数列探索yield

问题 list与iterator的之间的区别是什么&#xff1f;如何用斐波那契数列探索yield&#xff1f; 2 方法 将数据转换成list,通过对list索引和切片操作&#xff0c;以及可以进行添加、删除和修改元素。 iterator是一种对象&#xff0c;用于遍历可迭代对象&#xff08;如列表、元组…...

抖音店铺数据也就是抖店,如何使用小店数据集来挖掘价值?

​ 抖音商家现在基本达到二百多万家抖店&#xff0c;有一些公司可能会根据开放的数据研究行业分布、GMV等等&#xff0c;就像是也出了专业的一些平台如“蝉妈妈”、“达多多”&#xff0c;对我来说受限制就是难受。 当然也有很多大型合法的数据平台有抖店数据集&#xff0c;但…...

KubeVirt 安装和配置 Windows虚拟机

本文将将介绍如何安装 KubeVirt 和使用 KubeVirt 配置 Windows 虚拟机。 前置条件 准备 Ubuntu 操作系统&#xff0c;一定要安装图形化界面。 安装 Docker&#xff08;最新版本&#xff09; 安装 libvirt 和 TigerVNC&#xff1a; apt install libvirt-daemon-system libvir…...

CM API方式设置YARN队列资源

简述 对于CDH版本我们可以参考Fayson的文章,本次是CDP7.1.7 CM7.4.4 ,下面只演示一个设置队列容量百分比的示例,其他请参考cloudera官网。 获取cookies文件 生成cookies.txt文件 curl -i -k -v -c cookies.txt -u admin:admin http://192.168.242.100:7180/api/v44/clusters …...

Mysql常用语法一篇文章速成

文章目录 前言前置环境数据库的增删改查查询数据查询所有条件查询多条件查询模糊查询分页查询排序查询分组查询⭐️⭐️关联查询关联分页查询 添加数据insert插入多条记录不指定列名(适用于所有列都有值的情况) 更新数据更新多条记录更新多个列更新不满足条件的记录 删除统计数…...

Intel nuc x15 重装系统步骤和注意事项(LAPKC71F、LAPKC71E、LAPKC51E)

注意本教程的对象是11代CPU&#xff0c;英伟达独显的nuc x15&#xff0c;不是12代arc显卡的。 x15安装win11 24h2&#xff0c;如果在装系统时联网&#xff0c;windows自动下载的最新驱动有兼容问题&#xff0c;会导致【英特尔显卡控制中心】装不上&#xff0c;或者【英特尔nuc…...

Linux之实战命令59:iwlist应用实例(九十三)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…...

数据库_SQLite3

下载 1、更新软件源&#xff1a; sudo apt-get update 2、下载SQLite3&#xff1a; sudo apt-get install sqlite3 3、验证&#xff1a; sqlite3启动数据库&#xff0c;出现以下界面代表运行正常。输入 .exit 可以退出数据库 4、安装sqlite3的库 sudo apt-get install l…...

.Net Framework里演示怎么样使用StringBuilder、Math.Min和String.Format

StringBuilder、Math.Min和String.Format, 这几个功能都是我们经常使用的功能, 但是怎么样正确地使用,还是得向微软的开发人员学习。 他们在写.Net Framework的源码时,就会大量使用。 因此,我们可以多看看这分代码,就可以理解他们怎么样使用的。 他们的使用方式,一…...

Oracle创建存储过程,创建定时任务

在Oracle数据库中&#xff0c;创建存储过程和定时任务&#xff08;也称为调度任务&#xff09;是常见的数据库管理任务。以下是创建存储过程和定时任务的步骤和说明。 创建存储过程 创建存储过程的sql脚本 create or replace procedure 存储过程名称... is begin脚本逻辑...…...

<HarmonyOS第一课>应用/元服务上架的课后习题

善者&#xff0c;吾善之&#xff1b; 不善者&#xff0c;吾亦善之&#xff0c;德善。 信者&#xff0c;吾信之&#xff1b; 不信者&#xff0c;吾亦信之&#xff0c;德信。 圣人在天下&#xff0c;歙歙焉为天下浑其心&#xff0c;百姓皆注其耳目&#xff0c;圣人皆孩之。 通过&…...

【Python】探索函数的奥秘:从基础到高级的深度解析(下)

目录 &#x1f354; 函数的参数进阶 1、函数的参数 2、函数的参数类型(调用) 2.1 位置参数 2.2 关键词参数&#xff08;Python特有&#xff09; 3、函数定义时缺省参数&#xff08;参数默认值&#xff09; 4、不定长参数 4.1 不定长元组&#xff08;位置&#xff09;参数…...

ima.copilot:智慧因你而生

在数字化时代&#xff0c;信息的获取、处理和创作已经成为我们日常工作和学习中不可或缺的一部分。腾讯公司推出的ima.copilot&#xff08;简称ima&#xff09;正是为了满足这一需求&#xff0c;它是一款由腾讯混元大模型提供技术支持的智能工作台产品&#xff0c;旨在通过智能…...

Vue-$el属性

原博客地址&#xff1a;深入 Vue.js 的心脏&#xff1a;全面剖析 $el 属性_vue $el-CSDN博客 目录 1 $el是什么 1.1 $el本质 1.2 访问$el时机 1.3 $el与模板的关系 2 $el使用场景 2.1 集成第三方库 2.2 操作DOM元素样式 2.3 处理焦点和事件 2.4 实现自定义指令 3 $e…...

LLC Power Switches and Resonant Tank 笔记

1.概述 上面是一个典型的LLC电路。注意Lm是励磁电感&#xff0c;就是次级线圈空载时的主变压器电感&#xff0c;据说在计算谐振频率时无需关心。然后&#xff0c;作为DCDC电源&#xff0c;它通过调整谐振频率&#xff0c;来改变输出的电流。负载越大&#xff0c;频率越低&#…...

Python 如何在 Web 环境中使用 Matplotlib 进行数据可视化

Python Matplotlib 在 Web 环境中的可视化 数据可视化是数据科学和分析中一个至关重要的部分&#xff0c;它能帮助我们更好地理解和解释数据。在现代应用中&#xff0c;越来越多的开发者希望能够将数据可视化结果展示在网页上。Matplotlib 是 Python 中最常用的数据可视化库之…...

C#-数组:一维数组、二维数组、交错数组

数组&#xff1a;声明初始化过后&#xff0c;就不能在原有的基础上进行 添加 或者 删除 了 一&#xff1a;一维数组 一般将一维数组简称为数组 1.1 数组的声明 int[] arr1; 没有分配房间。初始化后就分配房间了int[] arr2 new int[5]; 存在默认值&#xff0c;为0int[] arr3…...

动态规划应该如何学习?

动态规划如何学习 参考灵神的视频和题解做的笔记&#xff08;灵神YYDS&#xff0c;以后也都会用这套逻辑去思考&#xff09; 枚举选哪个&#xff1a; 动态规划入门&#xff1a;从记忆化搜索到递推_哔哩哔哩_bilibili 746. 使用最小花费爬楼梯 - 力扣&#xff08;LeetCode&a…...

CTF show Web 红包题第六弹

提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框&#xff0c;很难让人不联想到SQL注入&#xff0c;但提示都说了不是SQL注入&#xff0c;所以就不往这方面想了 ​ 先查看一下网页源码&#xff0c;发现一段JavaScript代码&#xff0c;有一个关键类ctfs…...

《Playwright:微软的自动化测试工具详解》

Playwright 简介:声明内容来自网络&#xff0c;将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具&#xff0c;支持 Chrome、Firefox、Safari 等主流浏览器&#xff0c;提供多语言 API&#xff08;Python、JavaScript、Java、.NET&#xff09;。它的特点包括&a…...

C# SqlSugar:依赖注入与仓储模式实践

C# SqlSugar&#xff1a;依赖注入与仓储模式实践 在 C# 的应用开发中&#xff0c;数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护&#xff0c;许多开发者会选择成熟的 ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;SqlSugar 就是其中备受…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验

Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...

使用SSE解决获取状态不一致问题

使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件&#xff0c;这个上传文件是整体功能的一部分&#xff0c;文件在上传的过程中…...

若依登录用户名和密码加密

/*** 获取公钥&#xff1a;前端用来密码加密* return*/GetMapping("/getPublicKey")public RSAUtil.RSAKeyPair getPublicKey() {return RSAUtil.rsaKeyPair();}新建RSAUti.Java package com.ruoyi.common.utils;import org.apache.commons.codec.binary.Base64; im…...

GB/T 43887-2024 核级柔性石墨板材检测

核级柔性石墨板材是指以可膨胀石墨为原料、未经改性和增强、用于核工业的核级柔性石墨板材。 GB/T 43887-2024核级柔性石墨板材检测检测指标&#xff1a; 测试项目 测试标准 外观 GB/T 43887 尺寸偏差 GB/T 43887 化学成分 GB/T 43887 密度偏差 GB/T 43887 拉伸强度…...

RKNN开发环境搭建2-RKNN Model Zoo 环境搭建

目录 1.简介2.环境搭建2.1 启动 docker 环境2.2 安装依赖工具2.3 下载 RKNN Model Zoo2.4 RKNN模型转化2.5编译C++1.简介 RKNN Model Zoo基于 RKNPU SDK 工具链开发, 提供了目前主流算法的部署例程. 例程包含导出RKNN模型, 使用 Python API, CAPI 推理 RKNN 模型的流程.   本…...