当前位置: 首页 > news >正文

RocketMQ 5.1.0 源码详解 | Producer 发送流程

文章目录

  • 初始化DefaultMQProducer实例
  • 发送流程
    • DefaultMQProducer#send
    • DefaultMQProducerImpl#send
    • MQClientInstance#updateTopicRouteInfoFromNameServer
      • 使用特定 topic 获取路由信息
      • 使用默认 topic 获取路由信息
    • DefaultMQProducerImpl#sendDefaultImpl
    • 发送流程总结

初始化DefaultMQProducer实例

详细内容见文章
RocketMQ 5.1.0 源码详解 | Producer 启动流程
第一部分

发送流程

DefaultMQProducer#send

只需要执行以下代码即可开始消息的发送流程

try {Message msg = new Message(TOPIC, TAG, "OrderID188", "Hello world".getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);
} catch (Exception e) {e.printStackTrace();
}

RocketMQ 发送普通消息有同步(Sync)发送、异步(Async)发送和单向(Oneway)发送三种方式,send() 方法中只传入 message 则默认为 SYNC 模式

producersend 方法内容如下

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg);
}

可以看到在发送消息时 DefaultMQProducer 也只是一个门面类,具体的实现都是由 DefaultMQProducerImpl 去做的

DefaultMQProducerImpl#send

DefaultMQProducerImplsend 方法内容如下

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

可以看到,基本就是继续调用了几个函数以补齐缺失的参数如超时时间、发送消息的类型和回调函数(由于是同步发送因此回调函数为 null),发送消息的逻辑则主要是在 sendDefaultImpl 方法中实现的

由于此方法内容太多,因此先看看整体的流程

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 确认生产者处于RUNNING状态this.makeSureStateOK();// 检查消息是否合法Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取topic的路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// topicPublishInfo不为空且可用if (topicPublishInfo != null && topicPublishInfo.ok()) {...}// 校验 NameServer 配置是否正确validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

首先检查生产者是否处于 RUNNING 状态,接着检查要发送的消息是否合法,然后会调用 tryToFindTopicPublishInfo 获取路由信息,如果获取成功则进入分支语句中的逻辑,否则校验 NameServer 配置是否正确。如果 NameServer 配置为空则抛出 No name server address 异常,否则抛出 No route info of this topic 异常

由于其他的逻辑相对容易,我们接下来先直接分析 tryToFindTopicPublishInfo 方法的内容

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {// 从本地缓存(ConcurrentMap< String/* topic */,  TopicPublishInfo>)中尝试获取,第一次肯定为空TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 1.尝试从NameServer获取特定topic路由信息并更新本地缓存配置this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}// 如果找到可用的路由信息并返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else { // 2.如果未找到路由信息,则再次尝试使用默认的topic获取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}

可以看到此方法首先会从本地的 topicPublishInfoTable 中寻找 topicPublishInfo,由于之前没有向 topic 发送过消息,因此第一次必然不会从本地找到

此时会首先向 topicPublishInfoTable 中添加空白 topicPublishInfo,然后再调用 mQClientFactory 对象的 updateTopicRouteInfoFromNameServer 方法来更新 topicPublishInfoTabletopicPublishInfo 的数据

又因为是向一个还不存在的 topic 发送消息,因此第一次尝试从 NameServer 获取配置信息并更新本地缓存配置失败,会进行尝试使用默认的 topic 去找路由配置信息

MQClientInstance#updateTopicRouteInfoFromNameServer

由上述章节可知此方法被调用了两次,第一次尝试从 NameServer 获取特定 topic 路由信息并更新本地缓存配置失败,第二次尝试使用默认的 topic 获取路由信息

使用特定 topic 获取路由信息

第一次尝试使用特定 topic 获取路由信息,调用方法为 updateTopicRouteInfoFromNameServer(topic)

public boolean updateTopicRouteInfoFromNameServer(final String topic) {return updateTopicRouteInfoFromNameServer(topic, false, null);
}

此方法又会调用其重载方法,即updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer),其中 isDefault 传入的值为 false

由于方法的内容太多,因此我们只看代码走过的部分

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {// ... 
} else {// 获取指定topic的配置信息topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}

isDefault 的值为 false,因此进入 else 分支,尝试从 NameServer 中获取特定 topic 的路由信息,其中 getTopicRouteInfoFromNameServer 方法通过 Netty 使用 RPC 调用获取 Topic 路由信息,方法内容如下

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)throws RemotingException, MQClientException, InterruptedException {return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}//...}throw new MQClientException(response.getCode(), response.getRemark());
}

但是我们向一个不存在的 topic 发送消息,因此进入 case ResponseCode.TOPIC_NOT_EXIST 分支。又因为 allowTopicNotExist 传入的值为 true,所以打印警告并抛出异常,方法结束

使用默认 topic 获取路由信息

第二次获取时调用了 updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) ,其中 isDefault 传入的值为 true

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {// 从NameServer中获取默认的topic路由信息topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData != null) {// 修正topic路由信息中的读写队列数,使其最大不超过默认的topic队列数for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}
}

上述代码分为两个步骤:

  1. 从 NameServer 中获取默认 topic 即 TBW102 的路由信息
  2. 修正获取到的默认 topic 路由信息

此时我们的 topicRouteData 不为空,且其 QueueData 属性也经过了修正,具体内容如下

TopicRouteData [orderTopicConf=null, queueDatas=[QueueData [brokerName=broker-a, readQueueNums=4, writeQueueNums=4, perm=6, topicSysFlag=0]], brokerDatas=[BrokerData [brokerName=broker-a, brokerAddrs={0=192.168.142.1:10911}, enableActingMaster=false]], filterServerTable={}, topicQueueMappingInfoTable=null
]

接着执行下面的代码

if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);// 与本地缓存中的 topic 发布信息进行比较,如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存boolean changed = topicRouteData.topicRouteDataChanged(old);if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) { // 如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 更新broker地址this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update endpoint map{ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);if (!mqEndPoints.isEmpty()) {topicEndPointsTable.put(topic, mqEndPoints);}}// Update Pub info{// 根据topic路由信息组装TopicPublishInfo对象TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {MQProducerInner impl = entry.getValue();if (impl != null) {// 更新DefaultMQProducerImpl的topicPublishInfoTable表impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info 生产者实例的consumerTable为空if (!consumerTable.isEmpty()) {//...}TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}
}

很明显新获取到的和本地缓存中的 topic 路由信息相比有变化,因此 changed 为 true

接着会根据 topicRouteData 组装TopicPublishInfo 对象,并将其保存到 DefaultMQProducerImpltopicPublishInfoTable 中,key 为 topic 名称,value 为 TopicPublishInfo 对象

最后将 topicRouteData 保存在 topicRouteTable 中,方法结束

DefaultMQProducerImpl#sendDefaultImpl

现在我们已经获取到了要发送的 topic 的发布路由 topicPublishInfo,之后就开始发送了

boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 发送失败后重试最多的次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();// 选择一个MessageQueue发送消息MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {//发送消息...} else {break;}
}

其中 selectOneMessageQueue 方法就是选择一个可用的 MessageQueue 发送消息

在这里插入图片描述

如上图所示,MessageQueue 有一个三元组标识唯一一个队列,即 (topic, brokerName, queueId),最上方的 MessageQueue 的三元组可能是 (TopicTest, broker-a, 0)

当我们得到了要发送的 MessageQueue 后就开始执行发送消息的步骤

mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 向 MessageQueue 发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:// 同步调用方式(SYNC)下如果发送失败则执行失败重试策略,默认重试两次,即最多发送三次if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}
}

通过代码可以看出又调用了 sendKernelImpl 方法发送消息

private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);}SendMessageContext context = null;if (brokerAddr != null) {// 根据配置判断是否使用VIP通道brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating process// 检查消息是否为 MessageBatch 类型if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;// 检查客户端配置中是否设置了命名空间if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}// sysFlag是消息的系统标志位,包含压缩标志位、事务标志位、批量标志位、多队列标志位等int sysFlag = 0;boolean msgBodyCompressed = false;// 尝试压缩消息体if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 检查消息是否为事务消息if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}// 发送消息的校验钩子if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}// 发送消息前的钩子if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}// 设置发送消息的请求头SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);requestHeader.setBname(brokerName);// 如果是重发消息,则设置重发消息的次数if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {// 重发消息的次数String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {// 设置重发消息的次数requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));// 清除消息的重发次数属性,因为消息的重发次数属性是在消息重发时设置的MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}// 消息的最大重发次数String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {// 设置消息的最大重发次数requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));// 清除消息的最大重发次数属性,因为消息的最大重发次数属性是在消息重发时设置的MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;// 防止压缩后的消息体重发时被再次压缩msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}// 防止设置了命名空间的topic重发时被再次设置命名空间msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}// 发送消息后的钩子if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException | InterruptedException | MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}

这段代码虽然比较长,但是结合注释还是挺容易理解的。不过其中在异步 (ASYNC) 发送消息时有下面一段代码可能会让人疑惑

Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;// 防止压缩后的消息体重发时被再次压缩msg.setBody(prevBody);
}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}// 防止设置了命名空间的topic重发时被再次设置命名空间msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}

这段代码主要是克隆了一个和 msg 内容一样的 tmpMessage 并发送,而 msg 本身的 body 被设置成了压缩之前的 body,topic 也被设置成了添加命名空间之前的 topic

发送流程总结

  1. 检查消息是否合法
  2. 获取 topic 路由信息
    1. 先尝试从本地获取路由信息,没有则向 NameServer 获取
      1. 向 NameServer 获取路由信息并更新本地缓存,没有则抛出异常并返回
      2. 从本地获取路由信息
    2. 如果本地扔获取不到路由信息则获取默认路由信息
      1. 向 NameServer 获取默认路由信息,如果获取不到则抛出异常并返回
      2. 修改获取到的默认路由信息为新的 topic 的路由信息
      3. 更新本地路由信息缓存
  3. 获取路由信息成功;失败则跳转到第4步
    1. 选择一个 MessageQueue
    2. MessageQueue 发送消息
      1. 根据配置判断是否使用 VIP 通道
      2. 检查消息是否为 MessageBatch 类型
      3. 检查客户端配置中是否设置了命名空间
      4. 设置消息的标志位 sysFlag
        1. 尝试压缩消息体并更新 sysFlag
        2. 检查消息是否为事务消息并更新 sysFlag
      5. 调用钩子函数
      6. 设置消息请求头
      7. 根据发送消息的方式发送消息
  4. 获取路由信息失败
    1. 校验 NameServer 配置是否正确
    2. 抛出异常结束

相关文章:

RocketMQ 5.1.0 源码详解 | Producer 发送流程

文章目录 初始化DefaultMQProducer实例发送流程DefaultMQProducer#sendDefaultMQProducerImpl#sendMQClientInstance#updateTopicRouteInfoFromNameServer使用特定 topic 获取路由信息使用默认 topic 获取路由信息 DefaultMQProducerImpl#sendDefaultImpl发送流程总结 初始化De…...

电脑ip地址怎么改 ip地址怎么改到别的城市

一、ip地址怎么改到别的城市 1.ip地址怎么改到别的城市&#xff0c;1、重启WIFI路由设备 一般手机或电脑在家或公司上网时都是接入到路由器的WIFI网络,再由路由器分配上网IP地址,如果要更换上网IP那么重启路由器设备后,路由器会向网络运营商进行宽带的重新拨号,此时手机或电脑设…...

Android Studio实现列表展示图片

效果&#xff1a; MainActivity 类 package com.example.tabulation;import android.content.Intent; import android.os.Bundle; import android.view.View;import androidx.appcompat.app.AppCompatActivity; import androidx.recyclerview.widget.LinearLayoutManager; im…...

每天一道leetcode:300. 最长递增子序列(动态规划中等)

今日份题目&#xff1a; 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7] …...

【无监督】2、MAE | 自监督模型提取的图像特征也很能打!(CVPR2022 Oral)

文章目录 一、背景二、方法三、效果 论文&#xff1a;Masked Autoencoders Are Scalable Vision Learners 代码&#xff1a;https://github.com/facebookresearch/mae 出处&#xff1a;CVPR2022 Oral | 何凯明 | FAIR 一、背景 本文的标题突出了两个词&#xff1a; masked…...

pytorch单机多卡后台运行

nohup sh ./train_chat.sh > train_chat20230814.log 2>1&参考资料 Pytorch单机多卡后台运行的解决办法...

linux配置上网 linux adsl拨号上网设置

Linux里面配置ADSL上网是件很麻烦的事。但配置完成之后就能开机自动拨号上网&#xff0c;可谓十分的方便。支持的系统有Redhat,CentOS,SuSE,FreeBSD,Ubuntu等常见的Linux。 工具/原料 ADSL网络&#xff0c;电信&#xff0c;网通&#xff0c;移动等常见宽带。 Linux系统的安装光…...

XML学习基础知识归纳(一)

一、XML基本概述 &#xff08;1&#xff09;概念&#xff1a;XML是可扩展的标记语言&#xff0c;xml文档的后缀名为 .xml &#xff08;2&#xff09;作用&#xff1a;用来用来传输和存储数据&#xff0c;不用于表现和展示数据&#xff0c;这点呢相比于HTML来说是不同的&#…...

2023.8.14论文阅读

文章目录 ESPNet: Efficient Spatial Pyramid of Dilated Convolutions for Semantic Segmentation摘要本文方法实验结果 DeepFusion: Lidar-Camera Deep Fusion for Multi-Modal 3D Object Detection摘要本文方法实验结果 ESPNet: Efficient Spatial Pyramid of Dilated Convo…...

FL Studio for Windows-21.1.0.3713中文直装版功能介绍及系统配置要求

FL Studio 21简称FL水果软件,全称是&#xff1a;Fruity Loops Studio编曲&#xff0c;由于其Logo长的比较像一款水果因此&#xff0c;在大家更多的是喜欢称他为水果萝卜&#xff0c;FL studio21是目前最新的版本&#xff0c;这是一款可以让你的计算机就像是一个全功能的录音室&…...

基于网格变形的二维图像变形算法:C++实现与应用

在计算机图形学中&#xff0c;图像变形是一种常见的技术&#xff0c;它可以改变图像的形状和结构&#xff0c;以满足特定的视觉效果或者应用需求。本文将介绍一种基于网格变形的二维图像变形算法&#xff0c;并使用C进行实现。 一、算法原理 网格变形是一种基于网格的图像变形…...

【数据结构】八大排序详解

&#x1f680; 作者简介&#xff1a;一名在后端领域学习&#xff0c;并渴望能够学有所成的追梦人。 &#x1f40c; 个人主页&#xff1a;蜗牛牛啊 &#x1f525; 系列专栏&#xff1a;&#x1f6f9;数据结构、&#x1f6f4;C &#x1f4d5; 学习格言&#xff1a;博观而约取&…...

VSCode如何设置高亮

一、概述 本文主要介绍在 VSCode 看代码时&#xff0c;怎样使某个单词高亮显示&#xff0c;主要通过以下三步实现&#xff1a; 安装 highlight-words 插件 配置 highlight-words 插件 设置高亮快捷键F8 工作是嵌入式开发的&#xff0c;代码主要是C/C的&#xff0c;之前一直用…...

密钥大全ubuntu

VMware Workstation Tech Preview 20H2 GG1JR-APD1P-0857Q-DQQN9-PU2CA VMware Workstation v16 Pro for Windows&#xff08;反馈失效&#xff09; ZF3R0-FHED2-M80TY-8QYGC-NPKYF YF390-0HF8P-M81RQ-2DXQE-M2UT6 ZF71R-DMX85-08DQY-8YMNC-PPHV8 VMware Workstation v15 f…...

Spring Task入门案例

Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 定位&#xff1a;定时任务框架 作用&#xff1a;定时自动执行某段Java代码 强调&#xff1a;只要是需要定时处理的场景都可以使用Spring Task 1. cron表达式 cron表达式…...

针对Android项目蓝牙如何学习

一、概述(Overview) 蓝牙是一种专有的开放式无线技术标准,用于在固定和移动设备之间进行短距离数据交换(使用2400–2480 MHz ISM波段的短波长无线电传输),从而创建具有高度安全性的个人局域网(PANs)。由电信供应商爱立信(telecoms vendor Ericsson)于1994年创建,[1…...

C++学习笔记总结练习:内存分配器编程实现

内存分配器练习 C内存分配器是用于管理程序运行时内存的工具。它负责分配和释放内存&#xff0c;以满足程序在运行过程中的动态内存需求。在C中&#xff0c;有几种内存分配器可供选择&#xff0c;包括操作系统提供的默认分配器、自定义分配器和第三方库提供的分配器。 默认分配…...

【uniapp】使用Vs Code开发uniapp:

文章目录 一、使用命令行创建uniapp项目&#xff1a;二、安装插件与配置&#xff1a;三、编译和运行:四、修改pinia&#xff1a; 一、使用命令行创建uniapp项目&#xff1a; 二、安装插件与配置&#xff1a; 三、编译和运行: 该项目下的dist》dev》mp-weixin文件导入微信开发者…...

【STM32】高效开发工具CubeMonitor快速上手

工欲善其事必先利其器。拥有一个辅助测试工具&#xff0c;能极大提高开发项目的效率。STM32CubeMonitor系列工具能够实时读取和呈现其变量&#xff0c;从而在运行时帮助微调和诊断STM32应用&#xff0c;类似于一个简单的示波器。它是一款基于流程的图形化编程工具&#xff0c;类…...

React 使用 i18n 翻译换行解决方法

当前问题&#xff1a; json 配置文件 "detail": {"10001": "Top 10 \nBIGGEST WINS" } 按以上方式文本在渲染的时候并不能识别我们加入 \n 要实现换行的意图&#xff0c;通过拆分成两个多语来实现又太低级。 解决方法&#xff1a; 在该多语…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

LeetCode - 394. 字符串解码

题目 394. 字符串解码 - 力扣&#xff08;LeetCode&#xff09; 思路 使用两个栈&#xff1a;一个存储重复次数&#xff0c;一个存储字符串 遍历输入字符串&#xff1a; 数字处理&#xff1a;遇到数字时&#xff0c;累积计算重复次数左括号处理&#xff1a;保存当前状态&a…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek

文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama&#xff08;有网络的电脑&#xff09;2.2.3 安装Ollama&#xff08;无网络的电脑&#xff09;2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案

在大数据时代&#xff0c;海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构&#xff0c;在处理大规模数据抓取任务时展现出强大的能力。然而&#xff0c;随着业务规模的不断扩大和数据抓取需求的日益复杂&#xff0c;传统…...