当前位置: 首页 > news >正文

02-RocketMQ开发模型

目录汇总:RocketMQ从入门到精通汇总
上一篇:01-RocketMQ整体理解与快速实战


上一部分,我们可以搭建RocketMQ集群,然后也可以用命令行往RocketMQ写入消息并进行消费了。这一部分我们就来看怎么在项目中用上RocketMQ。

一、RocketMQ原生API使用

​ 使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里我们用SpringBoot来搭建一系列消息生产者和消息消费者,来访问我们之前搭建的RocketMQ集群。

1、测试环境搭建

​ 首先创建一个基于Maven的SpringBoot工程,引入如下依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version>
</dependency>

另外还与一些依赖,例如openmessage、acl等扩展功能还需要添加对应的依赖。具体可以参见RocketMQ源码中的example模块。在RocketMQ源码包中的example模块提供了非常详尽的测试代码,也可以拿来直接调试。我们这里就用源码包中的示例来连接我们自己搭建的RocketMQ集群来进行演示。

RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较老,但是还是都可以运行的。所以我们还是以官网上的顺序进行学习。

​ 但是在调试这些代码的时候要注意一个问题:这些测试代码中的生产者和消费者都需要依赖NameServer才能运行,只需要将NameServer指向我们自己搭建的RocketMQ集群,而不需要管Broker在哪里,就可以连接我们自己的自己的RocketMQ集群。而RocketMQ提供的生产者和消费者寻找NameServer的方式有两种:

​1. 在代码中指定namesrvAddr属性。例如:consumer.setNamesrvAddr(“127.0.0.1:9876”);

​2. 通过NAMESRV_ADDR环境变量来指定。多个NameServer之间用分号连接。

2、RocketMQ的编程模型

​ 然后RocketMQ的生产者和消费者的编程模型都是有个比较固定的步骤的,掌握这个固定的步骤,对于我们学习源码以及以后使用都是很有帮助的。

  • 消息发送者的固定步骤
  1. 创建消息生产者producer,并制定生产者组名
  2. 指定Nameserver地址
  3. 启动producer
  4. 创建消息对象,指定主题Topic、Tag和消息体
  5. 发送消息
  6. 关闭生产者producer
  • 消息消费者的固定步骤
  1. 创建消费者Consumer,制定消费者组名
  2. 指定Nameserver地址
  3. 订阅主题Topic和Tag
  4. 设置回调函数,处理消息
  5. 启动消费者consumer

3、RocketMQ的消息样例

​ 那我们来逐一连接下RocketMQ都支持哪些类型的消息:

3.1 基本样例

​ 基本样例部分我们使用消息生产者分别通过三种方式发送消息,同步发送、异步发送以及单向发送。

​ 然后使用消费者来消费这些消息。

​ 1. 同步发送消息的样例见:org.apache.rocketmq.example.simple.Producer

等待消息返回后再继续进行下面的操作。

​ 2. 异步发送消息的样例见:org.apache.rocketmq.example.simple.AsyncProducer

这个示例有个比较有趣的地方就是引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。

​ 3. 单向发送消息的样例:

public class OnewayProducer {public static void main(String[] args) throws Exception{//Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();for (int i = 0; i < 100; i++) {//Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//Call send message to deliver message to one of brokers.producer.sendOneway(msg);}//Wait for sending to completeThread.sleep(5000);        producer.shutdown();}
}

关键点就是使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。就是只管把消息发出去就行了。

  1. 使用消费者消费消息

​ 消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。

​ 拉模式的样例见:org.apache.rocketmq.example.simple.PullConsumer

​ 推模式的样例见:org.apache.rocketmq.example.simple.PushConsumer

通常情况下,用推模式比较简单。

实际上RocketMQ的推模式也是由拉模式封装出来的。

4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。

3.2 顺序消息

顺序消息生产者样例见:org.apache.rocketmq.example.order.Producer

顺序消息消费者样例见:org.apache.rocketmq.example.order.Consumer

验证时,可以启动多个Consumer实例,观察下每一个订单的消息分配以及每个订单下多个步骤的消费顺序。

不管订单在多个Consumer实例之前是如何分配的,每个订单下的多条消息顺序都是固定从0~5的。

RocketMQ保证的是消息的局部有序,而不是全局有序。

先从控制台上看下List mqs是什么。

再回看我们的样例,实际上,RocketMQ也只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序。所以这就涉及到了RocketMQ消息有序的原理。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。

首先在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。

而Broker中一个队列内的消息是可以保证有序的。

然后在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。

3.3 广播消息

​ 广播消息的消息生产者样例见:org.apache.rocketmq.example.broadcast.PushConsumer

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。


3.4 延迟消息

​ 延迟消息的生产者案例

public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// Launch producerproducer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// This message will be delivered to consumer 10 seconds later.message.setDelayTimeLevel(3);// Send the messageproducer.send(message);}// Shutdown producer after use.producer.shutdown();}}

延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。

那会延迟多久呢?延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);

开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这从哪里看出来的?其实从rocketmq-console控制台就能看出来。而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改。

那这么好用的延迟消息是怎么实现的?这18个延迟级别除了在延迟消息中用,还有什么地方用到了?别急,我们会在后面部分进行详细讲解。

3.5 批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

批量消息的消息生产者样例见:org.apache.rocketmq.example.batch.SimpleBatchProducer和org.apache.rocketmq.example.batch.SplitBatchProducer

相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB

实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。

3.6 过滤消息

在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。

使用Tag过滤消息的消息生产者案例见:org.apache.rocketmq.example.filter.TagFilterProducer

使用Tag过滤消息的消息消费者案例见:org.apache.rocketmq.example.filter.TagFilterConsumer

主要是看消息消费者。consumer.subscribe(“TagFilterTest”, “TagA || TagC”); 这句只订阅TagA和TagC的消息。

TAG是RocketMQ中特有的一个消息属性。RocketMQ的最佳实践中就建议,使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。

但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。 这时候,可以使用SQL表达式来对消息进行过滤。

SQL过滤的消息生产者案例见:org.apache.rocketmq.example.filter.SqlFilterProducer

SQL过滤的消息消费者案例见:org.apache.rocketmq.example.filter.SqlFilterConsumer

这个模式的关键是在消费者端使用MessageSelector.bySql(String sql)返回的一个MessageSelector。这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。

SQL92语法:

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:

数值,比如:123,3.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。

大家想一下,这个消息过滤是在Broker端进行的还是在Consumer端进行的?

3.7 事务消息

这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。

​ 首先,我们了解下什么是事务消息。官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

​ 其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。

事务消息生产者的案例见:org.apache.rocketmq.example.transaction.TransactionProducer

事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。源码中的案例有点复杂,我这里准备了一个更清晰明了的事务监听器示例

public class TransactionListenerImpl implements TransactionListener {//在提交完事务消息后执行。//返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。//返回ROLLBACK_MESSAGE状态的消息会被丢弃。//返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {String tags = msg.getTags();//TagA的消息会立即被消费者消费到if(StringUtils.contains(tags,"TagA")){return LocalTransactionState.COMMIT_MESSAGE;//TagB的消息会被丢弃}else if(StringUtils.contains(tags,"TagB")){return LocalTransactionState.ROLLBACK_MESSAGE;//其他消息会等待Broker进行事务状态回查。}else{return LocalTransactionState.UNKNOW;}}//在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String tags = msg.getTags();//TagC的消息过一段时间会被消费者消费到if(StringUtils.contains(tags,"TagC")){return LocalTransactionState.COMMIT_MESSAGE;//TagD的消息也会在状态回查时被丢弃掉}else if(StringUtils.contains(tags,"TagD")){return LocalTransactionState.ROLLBACK_MESSAGE;//剩下TagE的消息会在多次状态回查后最终丢弃}else{return LocalTransactionState.UNKNOW;}}
}

​ 然后,我们要了解下事务消息的使用限制:

​ 1. 事务消息不支持延迟消息和批量消息。

​ 2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。

回查次数是由BrokerConfig.transactionCheckMax这个参数来配置的,默认15次,可以在broker.conf中覆盖。
然后实际的检查次数会在message中保存一个用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。这个属性值大于transactionCheckMax,就会丢弃。 这个用户属性值会按回查次数递增,也可以在Producer中自行覆盖这个属性。

​ 3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。

由BrokerConfig.transactionTimeOut这个参数来配置。默认6秒,可以在broker.conf中进行修改。
另外,也可以给消息配置一个MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性来给消息指定一个特定的消息回查时间。
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”); 这样就是10秒。

​ 4. 事务性消息可能不止一次被检查或消费。

​ 5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

​ 6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

​ 接下来,我们还要了解下事务消息的实现机制,参见下图:
在这里插入图片描述

事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。

​ 最后,我们还需要思考下事务消息的作用。

​ 大家想一下这个事务消息跟分布式事务有什么关系?为什么扯到了分布式事务相关的两阶段提交上了?事务消息只保证了发送者本地事务和发送消息这两个操作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务,RocketMQ提供的事务消息也是目前业内最佳的降级方案。

3.8 ACL权限控制

​ 权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在$ROCKETMQ_HOME/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。

注意,如果要在自己的客户端中使用RocketMQ的ACL功能,还需要引入一个单独的依赖包

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.7.1</version>
</dependency>

而Broker端具体的配置信息可以参见源码包下docs/cn/acl/user_guide.md。主要是在broker.conf中打开acl的标志:aclEnable=true。然后就可以用plain_acl.yml来进行权限配置了。并且这个配置文件是热加载的,也就是说要修改配置时,只要修改配置文件就可以了,不用重启Broker服务。我们来简单分析下源码中的plan_acl.yml的配置:

#全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*accounts:
#第一个账户
- accessKey: RocketMQsecretKey: 12345678whiteRemoteAddress:admin: false defaultTopicPerm: DENY #默认Topic访问策略是拒绝defaultGroupPerm: SUB #默认Group访问策略是只允许订阅topicPerms:- topicA=DENY #topicA拒绝- topicB=PUB|SUB #topicB允许发布和订阅消息- topicC=SUB #topicC只允许订阅groupPerms:# the group should convert to retry topic- groupA=DENY- groupB=PUB|SUB- groupC=SUB
#第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源
- accessKey: rocketmq2secretKey: 12345678whiteRemoteAddress: 192.168.1.*# if it is admin, it could access all resourcesadmin: true

二、SpringBoot整合RocketMQ

1、快速实战

这部分我们看下SpringBoot如何快速集成RocketMQ。

在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。

我们创建一个maven工程,引入关键依赖:

<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.1.6.RELEASE</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency></dependencies>

rocketmq-spring-boot-starter:2.1.1引入的SpringBoot包版本是2.0.5.RELEASE,这里把SpringBoot的依赖包升级了一下。

然后我们以SpringBoot的方式,快速创建一个简单的Demo

启动类:

@SpringBootApplication
public class RocketMQScApplication {public static void main(String[] args) {SpringApplication.run(RocketMQScApplication.class,args);}
}

配置文件 application.properties

#NameServer地址
rocketmq.name-server=192.168.232.128:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup

消息生产者

package com.roy.rocket.basic;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;/*** @author :楼兰* @date :Created in 2020/10/22* @description:**/
@Component
public class SpringProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;//发送普通消息的示例public void sendMessage(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);}//发送事务消息的示例public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {Message<String> message = MessageBuilder.withPayload(msg).build();String destination =topic+":"+tags[i % tags.length];SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);System.out.printf("%s%n", sendResult);Thread.sleep(10);}}
}

消息消费者

package com.roy.rocket.basic;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @author :楼兰* @date :Created in 2020/10/22* @description:**/
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}
}

SpringBoot集成RocketMQ,消费者部分的核心就在这个@RocketMQMessageListener注解上。所有消费者的核心功能也都会集成到这个注解中。所以我们还要注意下这个注解里面的属性:

例如:消息过滤可以由里面的selectorType属性和selectorExpression来定制

消息有序消费还是并发消费则由consumeMode属性定制。

消费者是集群部署还是广播部署由messageModel属性定制。

然后关于事务消息,还需要配置一个事务消息监听器:

package com.roy.rocket.config;import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;import java.util.concurrent.ConcurrentHashMap;/*** @author :楼兰* @date :Created in 2020/11/5* @description:**/@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {Object id = msg.getHeaders().get("id");String destination = arg.toString();localTrans.put(id,destination);org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);String tags = message.getTags();if(StringUtils.contains(tags,"TagA")){return RocketMQLocalTransactionState.COMMIT;}else if(StringUtils.contains(tags,"TagB")){return RocketMQLocalTransactionState.ROLLBACK;}else{return RocketMQLocalTransactionState.UNKNOWN;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {//SpringBoot的消息对象中,并没有transactionId这个属性。跟原生API不一样。
//        String destination = localTrans.get(msg.getTransactionId());return RocketMQLocalTransactionState.COMMIT;}
}

这样我们启动应用后,就能够通过访问 http://localhost:8080/MQTest/sendMessage?message=123 接口来发送一条简单消息。并在SpringConsumer中消费到。

也可以通过访问http://localhost:8080/MQTest/sendTransactionMessage?message=123 ,来发送一条事务消息。

这里可以看到,对事务消息,SpringBoot进行封装时,就缺少了transactionId,这在事务控制中是非常关键的。

2、其他更多消息类型

对于其他的消息类型,文档中就不一一记录了。具体可以参见源码中的junit测试案例。

3、总结

  • SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。具体所有的配置信息可以参见org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。
  • SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,这在使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指定。
  • 最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。apache有一个官方的rocketmq-spring示例,地址:https://github.com/apache/rocketmq-spring.git 以后如果版本更新了,可以参考下这个示例代码。

三、SpringCloudStream整合RocketMQ

​ SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。

1、快速实战

​ 创建Maven工程,引入依赖:

<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.7.1</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2.2.3.RELEASE</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.3.RELEASE</version></dependency></dependencies>

应用启动类:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;/*** @author :楼兰* @date :Created in 2020/10/22* @description:**/
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {public static void main(String[] args) {SpringApplication.run(ScRocketMQApplication.class,args);}
}

注意这个@EnableBinding({Source.class, Sink.class})注解,这是SpringCloudStream引入的Binder配置。

然后增加配置文件application.properties

#ScStream通用的配置以spring.cloud.stream开头
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic
#rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头
#spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876

SpringCloudStream中,一个binding对应一个消息通道。这其中配置的input,是在Sink.class中定义的,对应一个消息消费者。而output,是在Source.class中定义的,对应一个消息生产者。

然后就可以增加消息消费者:

package com.roy.scrocket.basic;import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;/*** @author :楼兰* @date :Created in 2020/10/22* @description:**/
@Component
public class ScConsumer {@StreamListener(Sink.INPUT)public void onMessage(String messsage){System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT);}
}

消息生产者:

package com.roy.scrocket.basic;import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** @author :楼兰* @date :Created in 2020/10/22* @description:**/
@Component
public class ScProducer {@Resourceprivate Source source;public void sendMessage(String msg){Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_TAGS, "testTag");MessageHeaders messageHeaders = new MessageHeaders(headers);Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);this.source.output().send(message);}
}

最后增加一个Controller类用于测试:

package com.roy.scrocket.controller;import com.roy.scrocket.basic.ScProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @author :楼兰* @date :Created in 2020/10/27* @description:**/
@RestController
@RequestMapping("/MQTest")
public class MQTestController {@Resourceprivate ScProducer producer;@RequestMapping("/sendMessage")public String sendMessage(String message){producer.sendMessage(message);return "消息发送完成";}
}

启动应用后,就可以访问http://localhost:8080/MQTest/sendMessage?message=123,给RocketMQ发送一条消息到TestTopic,并在ScConsumer中消费到了。

2、总结

  • 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。
  • SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。
  • 总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。

【送人玫瑰,手留余香,感谢你的点赞


下一篇:03-RocketMQ高级原理

相关文章:

02-RocketMQ开发模型

目录汇总&#xff1a;RocketMQ从入门到精通汇总 上一篇&#xff1a;01-RocketMQ整体理解与快速实战 上一部分&#xff0c;我们可以搭建RocketMQ集群&#xff0c;然后也可以用命令行往RocketMQ写入消息并进行消费了。这一部分我们就来看怎么在项目中用上RocketMQ。 一、RocketMQ…...

第83步 时间序列建模实战:Catboost回归建模

基于WIN10的64位系统演示 一、写在前面 这一期&#xff0c;我们介绍Catboost回归。 同样&#xff0c;这里使用这个数据&#xff1a; 《PLoS One》2015年一篇题目为《Comparison of Two Hybrid Models for Forecasting the Incidence of Hemorrhagic Fever with Renal Syndr…...

开源任务调度框架

本文主要介绍一下任务调度框架Flowjob的整体结构&#xff0c;以及整体的心路历程。 功能介绍 flowjob主要用于搭建统一的任务调度平台&#xff0c;方便各个业务方进行接入使用。 项目在设计的时候&#xff0c;考虑了扩展性、稳定性、伸缩性等相关问题&#xff0c;可以作为公司…...

Android Native 开发 要点记录

Android Studio 中写 C 代码 android studio创建C项目_android studio native c-CSDN博客 项目配置参考 【CMake】CMakeLists.txt的超傻瓜手把手教程&#xff08;附实例源码&#xff09;_【cmake】cmakelists.txt的超傻瓜手把手教程(附实例源码)-CSDN博客 CMakeLists.txt 讲解…...

数据库中查询所有表信息,查询所有字段信息

MYSQL中 所有表信息 information_schema.tables表 SELECT * FROM information_schema.tables -- TABLE_NAME 表名 -- TABLE_COMMENT 表中文名所有字段信息 information_schema.COLUMNS表 SELECT * FROM information_schema.tables -- TABLE_SCHEMA 数据库名 -- COLUMN…...

改进智能优化算法常用指标一键导出为EXCEL,最优值,平均值,标准差,最差值,中位数,秩和检验,箱线图...

声明&#xff1a;对于作者的原创代码&#xff0c;禁止转售倒卖&#xff0c;违者必究&#xff01; 为了突出改进智能优化算法的效果&#xff0c;常常会将改进的智能算法与其他算法进行对比。 在一些期刊论文中&#xff0c;经常会看到一个超级大的表格&#xff0c;统计着每个算法…...

在asp.net中,实现类似安卓界面toast的方法(附更多弹窗样式)

目录 一、背景 二、操作方法 2.1修改前 2.2修改后 三、总结 附&#xff1a;参考文章&#xff1a; 一、背景 最近在以前的asp.net网页中&#xff0c;每次点击确定都弹窗&#xff0c;然后还要弹窗点击确认&#xff0c;太麻烦了&#xff0c;这次想升级一下&#xff0c;实现…...

一站式解决方案:Qt 跨平台开发灵活可靠

一站式解决方案&#xff1a;Qt 跨平台开发灵活可靠 Qt 是一种跨平台开发工具&#xff0c;为开发者提供了一站式解决方案。无论您的项目目标是 Windows、Linux、macOS、嵌入式系统还是移动平台&#xff0c;Qt 都能胜任。这种跨平台的特性不仅节省开支&#xff0c;还推动了战略的…...

将cpu版本的pytorch换成gpu版本

1.首先激活虚拟环境 winRcmd 打开dos命令窗口 查看虚拟环境列表 conda env list 激活虚拟环境 2.将原来的pytorch_cpu版本换成gpu版本 注意&#xff1a;安装gpu版本的pytorch时并不需要先卸载原来的cpu版本pytorch,安装时会自己替换的 打开pytorch官网查看以前版本 Previo…...

Ubuntu安装QQ

原文网址&#xff1a;2023在Ubuntu安装最新版QQ Linux v3.1.0 - 哔哩哔哩 作者&#xff1a;sprlightning https://www.bilibili.com/read/cv22100663/ 出处&#xff1a;bilibili 2022年末QQ推出了QQ Linux v3.0系列&#xff0c;目前最新版是今年2月24日推出的v3.1.0版本。注意…...

【Python】实现excel文档中指定工作表数据的更新操作

在做数值计算时&#xff0c;个人比较习惯利用excel文档的公式做数值计算进行对比&#xff0c;检查异常&#xff0c;虽然计算量大后&#xff0c;excel计算会比较缓慢&#xff0c;但设计简单&#xff0c;易排错 但一般测试过程中使用到的数据都不是最终数值&#xff0c;会不停根据…...

力扣(LeetCode)2731. 移动机器人(C++)

脑经急转弯排序 碰撞只改变运动方向&#xff0c;速度始终如"1"&#xff0c;且机器人视为无差别的&#xff0c;所以碰撞等于擦肩而过&#xff01;"机器人碰撞&#xff0c;到底撞没撞&#xff0c;如撞。"因此只考虑每个机器人单方向移动&#xff0c;d秒后停…...

vite和webpack

vite和webpack 文章目录 vite和webpackvite介绍什么是vite为什么使用vitevite优缺点热更新的实现原理 webpack介绍什么是webpackwebpack 优缺点 Vite 为什么比 Webpack 快vite和webpack的区别面试问题Vite为什么比webpack快&#xff1f; vite介绍 什么是vite Vite 是新型前端…...

MinIO图片正常上传不可查看,MinIO通过页面无法设置桶为public

项目场景&#xff1a;国产中标麒麟操作系统部署MinIO正常启动后发现图片能正常上传&#xff0c;但是匿名浏览该图片的时候无法查看。通过网络查询解决方案&#xff0c;得出的结论是&#xff1a;需要把当前上传文件的桶设置为public,由于创建桶默认是private且不可通过浏览器进行…...

Linux 指令心法(七)`cat` 查看、合并和创建文本文件

文章目录 命令的概述和用途命令的用法命令行选项和参数的详细说明命令的示例命令的注意事项或提示 命令的概述和用途 cat 是 “concatenate” 的缩写&#xff0c;它是一个 Linux 和 Unix 系统中的命令&#xff0c;用于查看、合并和创建文本文件。cat 主要用于以下几个方面&…...

解决docker开启MySQL的binlog无法成功。docker内部报错:mysql: [ERROR] unknown variable

1. 报错信息 2. 操作流程 整个流程是这样的&#xff1a; 我愉快的输入docker ps&#xff0c;查看MySQL的docker 容器id 执行指令docker exec -it 8a \bin\bash进入容器内部执行vim /etc/my.cnf&#xff0c;打开配置文件按照网上说的&#xff0c;添加如下配置信息退出docker容…...

c,python ,java,c++ c#在控制台打印彩色文本

在C语言、Java和C#中&#xff0c;你可以通过使用特定的控制字符或库来设置文本颜色。下面分别演示如何在这三种编程语言中实现文本颜色的设置&#xff1a; 在C语言中实现文本颜色设置&#xff1a; C语言中的颜色设置通常依赖于特定的终端或操作系统。以下是一种使用C语言的方…...

MySQL数据库技术笔记(5)

聚合函数&#xff1a; count(): 统计某种数据的数量 sum(): 统计某种数据的总和 max(): 某种数据的最大值 min(): 某种数据的最小值 avg(): 某种数据的平均值 排序的用法 : 关键字 order by 升序 : ASC &#xff08;从小到大排序&#xff09; 默认为升序 降序 : DESC…...

python生成随机数

在Python中生成随机数可以使用内置的random模块。以下是一些生成随机数的示例&#xff1a; 生成一个0到1之间的随机浮点数&#xff1a; import random random_float random.random() print(random_float) 生成一个指定范围内的随机整数&#xff1a; import random random_int…...

Twitter优化秘籍:置顶、列表、受众增长

在 Twitter 上&#xff0c;将你的一条推送文置顶到个人数据顶部是提高可见性和吸引关注者的绝佳方式。无论你是个人用户还是企业&#xff0c;此功能都可以让你的重要信息常驻在众人眼前&#xff0c;即使你发布了新的推文。接下来&#xff0c;我们将分享一些优化建议&#xff0c…...

vscode更改为中文版本

方式一 在扩展里安装chinese插件 方式二 1.Ctrl&#xff0b; Shift &#xff0b;P&#xff08;commandshiftP&#xff09; 2.输入Configure display Language 3.选择zh-cn 这时候vscode会提示需要重启&#xff0c;点击restart重启vscode&#xff0c;重启后vscode就会显示中…...

【Linux系统KVM虚拟机实战】LVM逻辑卷之磁盘扩容

【Linux系统KVM虚拟机实战】LVM逻辑卷之磁盘扩容 一、LVM与KVM介绍1.1 LVM介绍1.2 KVM介绍1.2.1 KVM简介1.2.2 KVM优点二、本次实践介绍2.1 本次实践简介2.2 环境规划三、虚拟机环境检查3.1 检查KVM虚拟机磁盘空间3.2 KVM虚拟机检查系统情况3.3 检查物理磁盘分区3.4 查看PV状态…...

史上最全 结构型模式之 桥接 外观 组合 享元模式

史上最全 结构型模式之 代理 适配器 装饰者 模式-CSDN博客 5.4 桥接模式 5.4.1 概述 现在有一个需求&#xff0c;需要创建不同的图形&#xff0c;并且每个图形都有可能会有不同的颜色。我们可以利用继承的方式来设计类的关系&#xff1a; 我们可以发现有很多的类&#xff0c;假…...

KBU810-ASEMI高性能整流桥KBU810

编辑&#xff1a;ll KBU810-ASEMI高性能整流桥KBU810 型号&#xff1a;KBU810 品牌&#xff1a;ASEMI 封装&#xff1a;KBU-4 恢复时间&#xff1a;&#xff1e;50ns 正向电流&#xff1a;8A 反向耐压&#xff1a;1000V 芯片个数&#xff1a;4 引脚数量&#xff1a;4 …...

uniapp快速入门系列(2)- Vue基础知识

章节二&#xff1a;Vue基础知识 Vue的介绍和特性Vue的简单易用Vue的灵活高效 Vue的常用指令和组件v-bind指令v-on指令Vue组件的通信方式父子组件通信兄弟组件通信 总结 Vue的介绍和特性 Vue是一款轻量级的JavaScript框架&#xff0c;用于构建用户界面。它的特点是简单易用、灵…...

mac(M1)安装anaconda3

首先下载 然后正常安装即可&#xff0c;之所以我现在测试了anaconda,因为我发现miniconda后&#xff0c;jupyter notebook的安装就出现问题&#xff0c;所以就直接卸载miniconda&#xff0c;而直接安装anaconda了 (base) yxkbogon ~ % pip list Package …...

vscode远程ssh服务器且更改服务器别名

目录 1、打开VS Code并确保已安装"Remote - SSH"扩展。如果尚未安装&#xff0c;请在扩展市场中搜索并安装它。 2、单击左下角的"Remote Explorer"图标&#xff0c;打开远程资源管理器。 3、在远程资源管理器中&#xff0c;单击右上角的齿轮图标&#x…...

【算法笔记】LCR 086. 分割回文串

基本思想是使用回溯法&#xff0c;回溯法都可以将问题划分为一个解空间树&#xff1a;假设字符串s为"aab"&#xff0c;那么我们可以使用深度优先搜索去构建解空间树&#xff1a; dfs遍历出来的第一个序列是[a, a, b]&#xff0c;显然该序列都是回文子串&#xff0c;…...

centos 安装svn

卸载 yum remove subversion安装 yum -y install subversion仓库目录 mkdir -p /home/svn/project版本目录 svnadmin create /home/svn/project主目录切换 cd /home/svn/project/conf服务配置 vim svnserve.confanon-access read auth-access write …...

Java中的类加载器双亲委派模型机制

Java中的类加载器双亲委派模型机制 Java中的类加载器双亲委派模型是一种类加载机制&#xff0c;用于加载Java类文件。它有助于维护类加载器的层次结构&#xff0c;并确保类的唯一性。以下是关于类加载器双亲委派模型的详细解释、作用、优缺点&#xff0c;以及示例说明。 双亲…...