当前位置: 首页 > news >正文

Rocketmq技术详解

Rocketmq技术详解

运维部署 docker-compose.yml

version: '3.5'
services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rmqbrokerports:- 10909:10909- 10911:10911volumes:- ./logs:/opt/logs- ./store:/opt/store- ./conf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- 8080:8080environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge

然后在与docker-compose.yml同级下面相应的建立三个文件夹conflogsstore。然后在conf文件夹下面建立broker.conf配置文件,所有文件的目录位置如下所示。

docker-compose.yml
conf- broker.conf
logs
store

然后在编写broker.conf配置文件里面的内容

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.# 所属集群名字
brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a# 0 表示 Master,> 0 表示 Slave
brokerId=0# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.1.16# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口
listenPort=10911# 删除文件时间点,默认凌晨4点
deleteWhen=04# 文件保留时间,默认48小时
fileReservedTime=120# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128

配置文件中的内容我们只需要改动一点即可,即brokerIP1 这个属性,我们将其更改为我们本机的ip,可以利用ipconfig进行查看。

修改完以后我们直接在docker-compose.yml文件所在的位置输入命令docker-compose up即可启动。启动成功以后在浏览器中输入http://localhost:8080/即可看到管理页面,就表示我们搭建成功了。

在这里插入图片描述](https://img-blog.csdnimg.cn/ec5fd2f43fd7417e888a459e78191e67.png)

架构设计


1 技术架构(参考来源于官网)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T2Lk6A0V-1678195927847)(null)]

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
    1. Remoting Module:整个Broker的实体,负责处理来自Client端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

img

2 部署架构

img

RocketMQ 网络部署特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

3.内容总结:

结合部署架构图,描述集群工作流程:
1、启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

2、Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

3、Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

4、Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

总结:
Rocketmq发送消息的时候,先启动NameServer,NameServer成功启动会先去和Broken连接,这时候NameServer和Broken就有心跳。当生产者Producer发送消息的时候,先跟NameServer连接,判断发送的Topic在哪些Broken上,然后按轮询选择Broken中的一个队列。然后Producer会和Broken直接建立连接,以后所有发送的消息(同个Topic)都是直接和Broken连接,消费者也是这样流程。

补充:
集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

ps:表示改消费组有一条未消费

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-X2gChwW9-1678195925250)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730162405864.png)]

数量2 指的是 consumer_topic-queue-three 有两个消费组都是叫这个名字

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FJR3Pt9Q-1678195925250)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730170724386.png)]

4.运行演示

4.1.1同步发送

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

说几个概念

4.1.1.1 生产者组(Producer Group)

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

4.1.1.2 消费者组(Consumer Group)

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

4.1.1.3 集群消费(Clustering)(默认是集群消费)

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

4.1.1.4 广播消费(Broadcasting)

广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

例如代码:生成者发送消息

//同步发送
public void sync() {Message<String> message = new Message<>();message.setId(UUID.randomUUID().toString());message.setContent("Hello, springboot-ac-rocketmq !");rocketMQTemplate.convertAndSend("topic-queue-one", message);rocketMQTemplate.convertAndSend("topic-queue-two", "Hello, springboot-ac-rocketmq !");
}

消费者消费消息

@Slf4j
@Component
public class RocketmqConsumer {@Component@RocketMQMessageListener(topic = "topic-queue-one", consumerGroup = "consumer_topic-queue-one")public class ConsumerOne implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {log.info("consumer-one received message: {}", message);}}@Component@RocketMQMessageListener(topic = "topic-queue-two", consumerGroup = "consumer_topic-queue-two")public class ConsumerTwo implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("哈哈哈哈我进来消费  topic-queue-two 消息啦");log.info("consumer-two received message: {}", message);}}
}

运行后打断点发送,队列是先进后去,所以topic-queue-two先消费

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iIjUIwKK-1678195925251)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730141236228.png)]

消费完再消费这个topic-queue-one

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fSKE77jk-1678195925251)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730141257637.png)]

4.1.2异步发送(比较重要)

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

看代码 生成者发送消息

public void async() {Message<String> message = new Message<>();message.setId(UUID.randomUUID().toString());message.setContent("Hello,I am asyncSend !");rocketMQTemplate.asyncSend("async-one", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("send successful");}@Overridepublic void onException(Throwable throwable) {log.info("send fail; {}", throwable.getMessage());}});
}

消费者代码

@Component
@RocketMQMessageListener(topic = "async-one", consumerGroup = "consumer_topic-queue-three")
public class ConsumerThreee implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {System.out.println("哈哈哈哈我进来消费  async-one 消息啦");log.info("consumer-two received message: {}", message);}
}

运行后打断点发现可以正常消费

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NOyxeHFE-1678195925252)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730144409629.png)]

从运行的结果看,15:37分是发送成功的,可是消费是15:39分

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FOVWziW7-1678195925252)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730153959095.png)]

4.1.3 单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

rocketMQTemplate.sendOneWay("topic-oneWay", "send one-way message");

一调接口里面就返回成功,可是消费等了一会菜消费

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Fuco6JD-1678195925253)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730160923982.png)]

5.消费消息的顺序性(比较重要)

5.1简介:

  1. 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
  2. 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

技术原理:

就是用hashKey作为每个队列的唯一标志,在电商中,一般是引订单id作为hashKey

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Fq3lhg0Q-1678195925253)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730172305154.png)]

5.2 demo演示

模拟2个队列,id1和id2进行操作

id1的消息有10,30

id2的消息有 20,40

演示代码如下:

    private final String id1 = "10086";private final String id2 = "10087";/**** hashKey为订单id*/
public void testSendSyncOrderly1() {Message<String> stringMessage = new Message<>();stringMessage.setId(id1);String message = "10";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id1);
}/**** hashKey为订单id*/
public void testSendSyncOrderly2() {Message<String> stringMessage = new Message<>();stringMessage.setId(id2);String message = "20";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id2);
}/**** hashKey为订单id*/
public void testSendSyncOrderly3() {Message<String> stringMessage = new Message<>();stringMessage.setId(id1);String message = "30";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id1);
}/**** hashKey为订单id*/
public void testSendSyncOrderly4() {Message<String> stringMessage = new Message<>();stringMessage.setId(id2);String message = "40";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id2);
}

消费端代码

@Component
@Slf4j
@RocketMQMessageListener(topic = "topic-orderly",consumerGroup = "orderly-consumer-group", consumeMode = ConsumeMode.ORDERLY
)public class OrderConsumer implements RocketMQListener<Message> {int sumId1 = 0;int sumId2 = 0;@Overridepublic void onMessage(Message message) {if(message.getId().equals("10086")){sumId1 = sumId1+Integer.parseInt((String)message.getContent());}else{sumId2 = sumId2+Integer.parseInt((String)message.getContent());}System.out.println("开始消费");log.info("========{}=======", sumId1);log.info("========{}=======", sumId2);System.out.println("消费结束");}}

发现最后消费是

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1WoZe9r9-1678195925253)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220730172621544.png)]

证明是分区有序性。

6.延时消息样例

应用场景:
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

CONSUME_FROM_LAST_OFFSET,  //第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
CONSUME_FROM_MIN_OFFSET, 
CONSUME_FROM_MAX_OFFSET,
CONSUME_FROM_FIRST_OFFSET, //第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP; //第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费  (一般选这个)

消费端要实现这个类RocketMQPushConsumerLifecycleListener,代码如下:

/**** 延时消费*/
@Component
@Slf4j
public class OffsetConsumerByHjt {@Component@RocketMQMessageListener(topic = "topic-offset-by-hjt", consumerGroup = "topic-offset-by-hjt-consumer")public class OfferConsumerBy implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(Message message) {System.out.println("哈哈哈哈我进来消费");String result = result(message.getBody());System.out.println("输出 result "+result);log.info("topic-offset-by-hjt: {}", new String(message.getBody()));}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {//第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费defaultMQPushConsumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);}}public static String result(byte[] decrypt) {try {String result = new String(decrypt, "UTF-8");return result;} catch (UnsupportedEncodingException var2) {var2.printStackTrace();return null;}}
}

生成者代码,还是按顺序消费测试

   /**** hjt写的延时消费demo*/public void sendByHjt() throws Exception {Message message = new Message();//生产者DefaultMQProducer producer = new DefaultMQProducer("topic-offset-by-hjt-product");producer.setNamesrvAddr("192.168.1.219:9876");producer.start();for(int i = 0;i<5;i++){message.setTopic("topic-offset-by-hjt");message.setBody(("我是延迟消费啊啊" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";//4对于的就是延迟30smessage.setDelayTimeLevel(4);producer.send(message, new SendCallback() {//成功后执行的方法@Overridepublic void onSuccess(SendResult sendResult) {log.info("延迟消费成功");}//失败后执行的方法@Overridepublic void onException(Throwable throwable) {log.error("还未到指定的消费时间");}});}//关闭生产者producer.shutdown();}

30s后发现已进来消费

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zZ3ahzdE-1678195925254)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220801160808898.png)]

为什么能同时消费这么多数据,因为rocketmq在那一瞬间同时去队列中拿数据,那一瞬间一起消费掉。

看了下后台,发现读和写都是4个队列。其中 perm为6 指的是可读可写的队列为6

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-geVd3zmw-1678195925254)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220801161056706.png)]

6.根据Tag演示

1.生成者代码

/*** @author hjt* @date 2019/8/21*/
@Component
@Slf4j
public class TagProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void sendTagsMessage() {String[] tags = new String[]{"A", "B", "C", "D"};String message = "tags message :  ";for (int i = 0; i < tags.length; i++) {rocketMQTemplate.syncSend("topic-tags:" + tags[i], message + tags[i]);}}
}

2.消费者代码

/*** @author hjt* @date 2019/8/21*/
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic-tags",consumerGroup = "tags-consumer-group",selectorExpression = "A||C")
public class TagConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("messgaetag:"+message);log.info("======={}=======", message);}
}

运行结果: 因为 selectorExpression = “A||C” 选择A和C

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JN0sDuGu-1678195925254)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220801163823708.png)]

ps:注意

rocketMQTemplate.syncSend("topic-tags:" + tags[i], message + tags[i]);  //topic-tags: 一定要有冒号

7.分布式事务

(1)相关概念

RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:

1、Half(Prepare) Message——半消息(预处理消息)
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。

2、Message Status Check——消息状态回查
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GafVnHM9-1678195925255)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220802092407538.png)]

1、A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。

2、当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。

3、执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)

4.1)、如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。

4.2)、如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。

4.3)、如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。

什么情况会回查

也会有两种情况

1)执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit或者rollback)导致最终返回UNKNOW,那么就会回查。2) 本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在brock端它还是个Half Message(半消息),这也会回查。

特别注意: 如果回查,那么一定要先查看当前事务的执行情况,再看是否需要重新执行本地事务。

想象下如果出现第二种情况而引起的回查,如果不先查看当前事务的执行情况,而是直接执行事务,那么就相当于成功执行了两个本地事务。

为什么说MQ是最终一致性事务

通过上面这幅图,我们可以看出,在上面举例事务不一致的两种情况中,永远不会发生

A账户减100 (失败),B账户加100 (成功)

因为:如果A服务本地事务都失败了,那B服务永远不会执行任何操作,因为消息压根就不会传到B服务。

那么 A账户减100 (成功),B账户加100 (失败) 会不会可能存在的。

答案是会的

因为A服务只负责当我消息执行成功了,保证消息能够送达到B,至于B服务接到消息后最终执行结果A并不管。

那B服务失败怎么办?

如果B最终执行失败,几乎可以断定就是代码有问题所以才引起的异常,因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功。

如果是代码的原因引起多次重试失败后,也没有关系,将该异常记录下来,由人工处理,人工兜底处理后,就可以让事务达到最终的一致性。

补充说明

(2)消息事务样例

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

    事务消息使用上的限制

    1. 事务消息不支持延时消息和批量消息。
    2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
    3. 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
    4. 事务性消息可能不止一次被检查或消费。
    5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
    6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

(3)演示demo

看 生产者代码

package com.hjt.transaction;import com.hjt.message.Message;
import com.hjt.message.MessageTransaction;
import com.hjt.transaction.mapper.TransactionMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.UUID;/*** @author hjt* @date 2019/8/20*/
@Component
@Slf4j
public class TransactionProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void produce() {MessageTransaction<String> message = new MessageTransaction<>();//在正在的业务中 Aid和Bid应该是前端已经知道是啥,传给后端,比如A的userId和B的UserIdmessage.setAId(UUID.randomUUID().toString());message.setBId(UUID.randomUUID().toString());message.setContent("B即将要+100元,A要减100元");log.info("========sending message=========:{}",message);
//        rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null); 2.0.3有这个版本 tx-grouprocketMQTemplate.sendMessageInTransaction( "topic-tx", MessageBuilder.withPayload(message).build(), null);log.info("========finish send =========");}}

监听者代码

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.StringUtils;import java.util.concurrent.ConcurrentHashMap;/*** @author hjt* @date 2019/8/20*/
@Slf4j
//@RocketMQTransactionListener(txProducerGroup = "tx-group")  2.0.3的版本有这个
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {/**** 存放事务的状态 支持并发的场景*/private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {log.info("==============进到这里说明 Half Message 发送成功");//获取队列中的事务idString rocketmqTransactionId = getRocketmqTransactionId(msg);try{//模拟 执行A服务-100元操作int redMoneyByA = -100;//定义// 0 是中间状态  1 是提交事务状态 2是回滚事务localTrans.put(rocketmqTransactionId,1);//模拟 执行A服务-100元操作失败
//            int redMoneyExceptionByA = 100/0;return RocketMQLocalTransactionState.UNKNOWN;}catch (Exception e){// 执行A服务-100元操作出现异常就  事务回查 调用下面的checkLocalTransaction方法localTrans.put(rocketmqTransactionId,2);log.error("插入数据库失败,原因为:{}",e.getMessage());return RocketMQLocalTransactionState.UNKNOWN;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {log.info("============== 模拟回查本地事务 checkLocalTransaction");Object payload = msg.getPayload();MessageHeaders headers = msg.getHeaders();System.out.println("输出:" + payload);System.out.println("输出:" + headers);String rocketmqTransactionId = getRocketmqTransactionId(msg);//查rocketmqTransactionId的事务状态Integer status = localTrans.get(rocketmqTransactionId);if(null!=status){switch (status){case 0:log.info("============== 模拟回查本地事务结束 提交状态为:UNKNOWN");return RocketMQLocalTransactionState.UNKNOWN;case 1:log.info("============== 模拟回查本地事务结束 提交状态为:COMMIT");return RocketMQLocalTransactionState.COMMIT;case 2:log.info("============== 模拟回查本地事务结束 提交状态为:ROLLBACK");return RocketMQLocalTransactionState.ROLLBACK;}}log.info("============== 模拟回查本地事务结束,提交状态为 ROLLBACK");return RocketMQLocalTransactionState.ROLLBACK;}/**** 获取事务id* @param msg* @return*/public  String getRocketmqTransactionId(Message msg){JSONObject json = JSONUtil.parseObj(msg.getHeaders(), false, true);String rocketmqTransactionId = (String)json.get("rocketmq_TRANSACTION_ID");String topic = (String)json.get("rocketmq_TOPIC");log.info("=======事务id========{}",rocketmqTransactionId);log.info("=======topic========{}",topic);if(!StringUtils.isEmpty(rocketmqTransactionId)){return rocketmqTransactionId;}return "";}

消费者代码

package com.hjt.transaction;import com.hjt.message.MessageTransaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** @author hjt* @date 2019/8/20*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "topic-tx", consumerGroup = "tx-consumer-group")
public class TransactionConsumer implements RocketMQListener<MessageTransaction> {@Overridepublic void onMessage(MessageTransaction message) {log.info("topic-tx received message: {}", message);log.info("消费端开始消费信息 执行B服务加100操作");//执行B服务加100的操作try{//B服务加100int addMoneyByB = 100;}//如果B服务加100失败,可是A已经减100成功了,这时候要把异常记录下来,人工进行处理catch (Exception e){log.error("B服务加100异常,需要人工处理,异常信息为:{}",e.getMessage());//用一张异常表单独记录  该消息的id 可以作为异常表的主键String id = message.getBId();}}}

rocketmq会稍微等一点时间再去执行checkLocalTransaction方法

正常运行结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ui4HRcDI-1678195925255)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220802153757798.png)]

模拟下执行A操作异常的时候

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NSg9hSop-1678195925255)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220802153838039.png)]

运行结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YFGOirOj-1678195925255)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220802154033384.png)]

相关文章:

Rocketmq技术详解

Rocketmq技术详解 运维部署 docker-compose.yml version: 3.5 services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxisw…...

TeeChart VCL/FMX v2023 crack

TeeChart VCL/FMX v2023 crack TeeChart Pro VCL允许您为所有领域(包括商业、工程、金融、统计、科学、医疗、实时和网络)创建通用和专用图表和绘图应用程序。TeeChart Pro VCL具有多种图表类型的图表库&#xff0c;包括2D或3D线条、条形图、水平条、区域、点、饼图、箭头、气泡…...

[Java·算法·困难]LeetCode32. 最长有效括号

每天一题&#xff0c;防止痴呆题目示例分析思路1题解1分析思路2题解2分析思路3题解3&#x1f449;️ 力扣原文 题目 给你一个只包含 ( 和 ) 的字符串&#xff0c;找出最长有效&#xff08;格式正确且连续&#xff09;括号子串的长度。 示例 输入&#xff1a;s "(()&q…...

pytorch如何搭建一个最简单的模型,

一、搭建模型的步骤 在 PyTorch 中&#xff0c;可以使用 torch.nn 模块来搭建深度学习模型。具体步骤如下&#xff1a; 定义一个继承自 torch.nn.Module 的类&#xff0c;这个类将作为我们自己定义的模型。 在类的构造函数 __init__() 中定义网络的各个层和参数。可以使用 to…...

JS实现css的hover效果,兼容移动端

Hi I’m Shendi JS实现css的hover效果&#xff0c;兼容移动端 功能概述 CSS的hover即触碰时触发&#xff0c;在电脑端鼠标触碰&#xff0c;移动端手指触摸 有的时候光靠css实现不了一些效果&#xff0c;例如元素触发hover&#xff0c;其他元素触发动画效果&#xff0c;所以需要…...

企业微信的后台怎么进入和管理?

企业微信管理后台&#xff0c;只有企业的管理员才可以进企业微信后台&#xff0c;普通员工想要进入后台、可以联系管理员将你设置为后台管理员。 一、怎么进入企业微信后台 管理员进入企业微信后台有两种路径&#xff1b; 路径一&#xff1a; 企业管理员直接在浏览器搜索企…...

【2223sW2】LOG2

写在前面 好好学习&#xff0c;走出宿舍&#xff0c;走向毕设&#xff01; 一些心路历程记录&#xff0c;很少有代码出现 因为鬼知道哪条代码到时候变成毕设的一部分了咧&#xff0c;还是不要给自己的查重挖坑罢了 23.3.2 检验FFT 早上师兄帮忙看了一眼我画的丑图&#xff…...

buuctf-web-[SUCTF 2018]MultiSQL1

打开界面&#xff0c;全部点击一遍&#xff0c;只有注册和登录功能可以使用注册一个账号&#xff0c;注册admin提示用户存在&#xff0c;可能有二次注入&#xff0c;注册admin自动加了一个字符&#xff0c;无法二次注入&#xff0c;点击其他功能点换浏览器重新登录后&#xff0…...

GitLab创建仓库分配权限

文章目录创建仓库分配权限参考资料创建仓库 点击“New project”创建新项目 分配权限 点击左侧菜单栏“Members”成员&#xff0c;菜单 “Invite member”邀请成员&#xff0c;添加人员&#xff1b;“Invite group”邀请组织&#xff0c;添加一个组织所有成员下面输入框搜索…...

代码随想录-51-110.平衡二叉树

目录前言题目1.求高度和深度的区别节点的高度节点的深度2. 本题思路分析&#xff1a;3. 算法实现4. pop函数的算法复杂度5. 算法坑点前言 在本科毕设结束后&#xff0c;我开始刷卡哥的“代码随想录”&#xff0c;每天一节。自己的总结笔记均会放在“算法刷题-代码随想录”该专…...

项目实战典型案例27——对生产环境以及生产数据的敬畏之心

对生产环境以及生产数据的敬畏之心一&#xff1a;背景介绍总结升华一&#xff1a;背景介绍 本篇博客是对项目开发中出现的对生产环境以及生产数据的敬畏之心行的总结并进行的改进。目的是将经历转变为自己的经验。通过博客的方式分享给大家&#xff0c;大家一起共同进步和提高…...

如何查找你的IP地址?通过IP地址能直接定位到你家!

我们ip地址分为A、B、C、D、E共5类&#xff0c;每一类地址范围不同&#xff0c;从A到Eip地址范围依次递减&#xff0c;其中哦&#xff0c;D和E是保留地址&#xff0c;我们用不了。A、B、C3类地址很多都被美国这样的西方国家分走了&#xff0c;而留给我们的就剩有限的地址了&…...

Containers--array类

Array 类 简介 Array 类是一个固定大小的数组&#xff0c;它的大小在编译时就已经确定了。Array 类的大小是固定的&#xff0c;因此它的大小不能改变。 数组是固定大小的序列容器:它们以严格的线性顺序保存特定数量的元素。 在内部&#xff0c;数组除了包含的元素之外不保留…...

LinqConnect兼容性并支持Visual Studio 2022版本

LinqConnect兼容性并支持Visual Studio 2022版本 现在支持Microsoft Visual Studio 2022版本17.5预览版。 添加了Microsoft.NET 7兼容性。 共享代码-共享相同的代码&#xff0c;以便在不同的平台上处理数据。LinqConnect是一种数据库连接解决方案&#xff0c;适用于不同的基于.…...

流量监管与整形

流量监管与整形概览流量监管介绍流量监管令牌桶流量监管的具体实现单桶单速流量监管双桶单速流量监管双桶双速流量监管流量整形介绍GTS&#xff08;Generic Traffic Shaping&#xff09;LR&#xff08;Line Rate&#xff09;流量整形与流量监管的区别概览 流量整形是对报文的速…...

详解init 容器

什么是init容器 init 容器是一种特殊容器&#xff0c;在 Pod 内的应用容器启动之前运行。Init 容器可以包括一些应用镜像中不存在的实用工具和安装脚本。 你可以在 Pod 的规约中与用来描述应用容器的 containers 数组平行的位置指定 Init 容器 每个 Pod 中可以包含多个容器&…...

RequestResponseBodyMethodProcessor

既是一个参数解析器&#xff0c;也是一个返回结果处理器。 1.持有消息转换器的集合 protected final List<HttpMessageConverter<?>> messageConverters;2.作为参数解析器&#xff0c;例如对RequestBody标识的参数进行解析 判断是否支持当前类型的参数 Overrid…...

函数的极限

目录 函数的极限 函数极限的定义&#xff1a; 例题&#xff1a; 左右极限&#xff1a; 自变量趋于无穷大时函数的极限&#xff1a; 例题&#xff1a; 函数极限的性质&#xff1a; 函数极限与数列极限之间的关系&#xff1a; 函数的极限 函数极限的定义&#xff1a; 一句…...

dnf命令使用

1. 简介 DNF是新一代的rpm软件包管理器。他首先出现在 Fedora 18 这个发行版中。而最近&#xff0c;它取代了yum&#xff0c;正式成为 Fedora 22 的包管理器 DNF包管理器克服了YUM包管理器的一些瓶颈&#xff0c;提升了包括用户体验&#xff0c;内存占用&#xff0c;依赖分析…...

CLIP CLAP

文章目录CLIPabstractintroCLAP: LEARNING AUDIO CONCEPTS FROM NATURAL LANGUAGE SUPERVISIONabstractmethodCLIP open AI2021.2代码&预训练模型 abstract 原有的基于有监督数据训练的计算机分类任务&#xff0c;在面对新的分类目标时泛化性和可用性都会变差&#xff1…...

Java的Random类

在Java中&#xff0c;java.util.Random 类是日常开发中最常用的伪随机数生成器。它基于线性同余算法生成随机数&#xff0c;只要给定相同的初始值&#xff08;种子 seed&#xff09;&#xff0c;就能生成完全相同的随机数序列。 &#x1f3b2; Random 类的基础使用 使用 Random…...

对比使用Taotoken前后,个人开发者的月度AI调用成本变化

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 对比使用Taotoken前后&#xff0c;个人开发者的月度AI调用成本变化 在原型开发与日常编码辅助中&#xff0c;频繁调用大模型API已成…...

图解人工智能(10)人工智能的发展历程

人工智能自20世纪50年代发展至今&#xff0c;经历了若干次高潮和低谷。每到陷入困境的时候&#xff0c;总有一些科学家勇敢地打破传统思想的束缚&#xff0c;创造出新理论、新方法&#xff0c;使人工智能重现生机。例如&#xff0c;在符号主义陷入危机的时候&#xff0c;费根鲍…...

3步免费获取公式识别神器:img2latex-mathpix本地部署终极指南

3步免费获取公式识别神器&#xff1a;img2latex-mathpix本地部署终极指南 【免费下载链接】img2latex-mathpix Mathpix has changed their billing policy and no longer has free monthly API requests. This repo is now archived and will not receive any updates for the …...

工业视觉杂散物检测系统方案设计

构建一套完整可靠的工业视觉检测系统&#xff0c;核心在于将其无缝嵌入到现有的装配流程中。下面是一个从系统架构部署、执行标准、再到具体模块技术选型的完整实施方案&#xff0c;希望能帮你构建一套精准且高效的检测闭环。 &#x1f3d7;️ 系统总体架构 一个完整的检测系统…...

Cursor Pro永久免费使用终极指南:如何绕过试用限制完整教程

Cursor Pro永久免费使用终极指南&#xff1a;如何绕过试用限制完整教程 【免费下载链接】cursor-free-vip [Support 0.45]&#xff08;Multi Language 多语言&#xff09;自动注册 Cursor Ai &#xff0c;自动重置机器ID &#xff0c; 免费升级使用Pro 功能: Youve reached you…...

BBDown终极指南:5分钟掌握B站视频本地化完整解决方案

BBDown终极指南&#xff1a;5分钟掌握B站视频本地化完整解决方案 【免费下载链接】BBDown Bilibili Downloader. 一个命令行式哔哩哔哩下载器. 项目地址: https://gitcode.com/gh_mirrors/bb/BBDown 在数字内容爆炸的时代&#xff0c;你是否曾为无法离线观看B站优质视频…...

微信消息自动转发终极指南:5分钟实现跨群智能消息同步

微信消息自动转发终极指南&#xff1a;5分钟实现跨群智能消息同步 【免费下载链接】wechat-forwarding 在微信群之间转发消息 项目地址: https://gitcode.com/gh_mirrors/we/wechat-forwarding 在微信群管理和协作场景中&#xff0c;消息的自动转发与同步是提升效率的关…...

轴承剩余寿命预测 | 基于BP神经网络的轴承剩余寿命预测MATLAB实现!

研究背景 该代码基于IEEE PHM 2012数据挑战赛的轴承全寿命加速退化实验数据&#xff0c;旨在利用数据驱动方法预测滚动轴承的剩余使用寿命&#xff08;RUL&#xff09;。实验中轴承在恒定负载下持续运行至失效&#xff0c;期间通过水平/竖直加速度传感器以25.6 kHz采样频率每隔…...

开源项目remote2mac:用Windows远程桌面无缝控制macOS

1. 项目概述&#xff1a;远程桌面连接的另一条路如果你是一名需要在Windows电脑上远程控制macOS设备的开发者、设计师或者运维人员&#xff0c;那么“远程桌面”这个需求对你来说一定不陌生。传统的方案&#xff0c;比如微软的RDP&#xff08;远程桌面协议&#xff09;对Window…...