当前位置: 首页 > news >正文

RocketMQ 5.1.0 源码详解 | Producer 发送流程

文章目录

  • 初始化DefaultMQProducer实例
  • 发送流程
    • DefaultMQProducer#send
    • DefaultMQProducerImpl#send
    • MQClientInstance#updateTopicRouteInfoFromNameServer
      • 使用特定 topic 获取路由信息
      • 使用默认 topic 获取路由信息
    • DefaultMQProducerImpl#sendDefaultImpl
    • 发送流程总结

初始化DefaultMQProducer实例

详细内容见文章
RocketMQ 5.1.0 源码详解 | Producer 启动流程
第一部分

发送流程

DefaultMQProducer#send

只需要执行以下代码即可开始消息的发送流程

try {Message msg = new Message(TOPIC, TAG, "OrderID188", "Hello world".getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);
} catch (Exception e) {e.printStackTrace();
}

RocketMQ 发送普通消息有同步(Sync)发送、异步(Async)发送和单向(Oneway)发送三种方式,send() 方法中只传入 message 则默认为 SYNC 模式

producersend 方法内容如下

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg);
}

可以看到在发送消息时 DefaultMQProducer 也只是一个门面类,具体的实现都是由 DefaultMQProducerImpl 去做的

DefaultMQProducerImpl#send

DefaultMQProducerImplsend 方法内容如下

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

可以看到,基本就是继续调用了几个函数以补齐缺失的参数如超时时间、发送消息的类型和回调函数(由于是同步发送因此回调函数为 null),发送消息的逻辑则主要是在 sendDefaultImpl 方法中实现的

由于此方法内容太多,因此先看看整体的流程

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 确认生产者处于RUNNING状态this.makeSureStateOK();// 检查消息是否合法Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取topic的路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// topicPublishInfo不为空且可用if (topicPublishInfo != null && topicPublishInfo.ok()) {...}// 校验 NameServer 配置是否正确validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

首先检查生产者是否处于 RUNNING 状态,接着检查要发送的消息是否合法,然后会调用 tryToFindTopicPublishInfo 获取路由信息,如果获取成功则进入分支语句中的逻辑,否则校验 NameServer 配置是否正确。如果 NameServer 配置为空则抛出 No name server address 异常,否则抛出 No route info of this topic 异常

由于其他的逻辑相对容易,我们接下来先直接分析 tryToFindTopicPublishInfo 方法的内容

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {// 从本地缓存(ConcurrentMap< String/* topic */,  TopicPublishInfo>)中尝试获取,第一次肯定为空TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 1.尝试从NameServer获取特定topic路由信息并更新本地缓存配置this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}// 如果找到可用的路由信息并返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else { // 2.如果未找到路由信息,则再次尝试使用默认的topic获取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}

可以看到此方法首先会从本地的 topicPublishInfoTable 中寻找 topicPublishInfo,由于之前没有向 topic 发送过消息,因此第一次必然不会从本地找到

此时会首先向 topicPublishInfoTable 中添加空白 topicPublishInfo,然后再调用 mQClientFactory 对象的 updateTopicRouteInfoFromNameServer 方法来更新 topicPublishInfoTabletopicPublishInfo 的数据

又因为是向一个还不存在的 topic 发送消息,因此第一次尝试从 NameServer 获取配置信息并更新本地缓存配置失败,会进行尝试使用默认的 topic 去找路由配置信息

MQClientInstance#updateTopicRouteInfoFromNameServer

由上述章节可知此方法被调用了两次,第一次尝试从 NameServer 获取特定 topic 路由信息并更新本地缓存配置失败,第二次尝试使用默认的 topic 获取路由信息

使用特定 topic 获取路由信息

第一次尝试使用特定 topic 获取路由信息,调用方法为 updateTopicRouteInfoFromNameServer(topic)

public boolean updateTopicRouteInfoFromNameServer(final String topic) {return updateTopicRouteInfoFromNameServer(topic, false, null);
}

此方法又会调用其重载方法,即updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer),其中 isDefault 传入的值为 false

由于方法的内容太多,因此我们只看代码走过的部分

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {// ... 
} else {// 获取指定topic的配置信息topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}

isDefault 的值为 false,因此进入 else 分支,尝试从 NameServer 中获取特定 topic 的路由信息,其中 getTopicRouteInfoFromNameServer 方法通过 Netty 使用 RPC 调用获取 Topic 路由信息,方法内容如下

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)throws RemotingException, MQClientException, InterruptedException {return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}//...}throw new MQClientException(response.getCode(), response.getRemark());
}

但是我们向一个不存在的 topic 发送消息,因此进入 case ResponseCode.TOPIC_NOT_EXIST 分支。又因为 allowTopicNotExist 传入的值为 true,所以打印警告并抛出异常,方法结束

使用默认 topic 获取路由信息

第二次获取时调用了 updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) ,其中 isDefault 传入的值为 true

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {// 从NameServer中获取默认的topic路由信息topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData != null) {// 修正topic路由信息中的读写队列数,使其最大不超过默认的topic队列数for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}
}

上述代码分为两个步骤:

  1. 从 NameServer 中获取默认 topic 即 TBW102 的路由信息
  2. 修正获取到的默认 topic 路由信息

此时我们的 topicRouteData 不为空,且其 QueueData 属性也经过了修正,具体内容如下

TopicRouteData [orderTopicConf=null, queueDatas=[QueueData [brokerName=broker-a, readQueueNums=4, writeQueueNums=4, perm=6, topicSysFlag=0]], brokerDatas=[BrokerData [brokerName=broker-a, brokerAddrs={0=192.168.142.1:10911}, enableActingMaster=false]], filterServerTable={}, topicQueueMappingInfoTable=null
]

接着执行下面的代码

if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);// 与本地缓存中的 topic 发布信息进行比较,如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存boolean changed = topicRouteData.topicRouteDataChanged(old);if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) { // 如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 更新broker地址this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update endpoint map{ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);if (!mqEndPoints.isEmpty()) {topicEndPointsTable.put(topic, mqEndPoints);}}// Update Pub info{// 根据topic路由信息组装TopicPublishInfo对象TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {MQProducerInner impl = entry.getValue();if (impl != null) {// 更新DefaultMQProducerImpl的topicPublishInfoTable表impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info 生产者实例的consumerTable为空if (!consumerTable.isEmpty()) {//...}TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}
}

很明显新获取到的和本地缓存中的 topic 路由信息相比有变化,因此 changed 为 true

接着会根据 topicRouteData 组装TopicPublishInfo 对象,并将其保存到 DefaultMQProducerImpltopicPublishInfoTable 中,key 为 topic 名称,value 为 TopicPublishInfo 对象

最后将 topicRouteData 保存在 topicRouteTable 中,方法结束

DefaultMQProducerImpl#sendDefaultImpl

现在我们已经获取到了要发送的 topic 的发布路由 topicPublishInfo,之后就开始发送了

boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 发送失败后重试最多的次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();// 选择一个MessageQueue发送消息MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {//发送消息...} else {break;}
}

其中 selectOneMessageQueue 方法就是选择一个可用的 MessageQueue 发送消息

在这里插入图片描述

如上图所示,MessageQueue 有一个三元组标识唯一一个队列,即 (topic, brokerName, queueId),最上方的 MessageQueue 的三元组可能是 (TopicTest, broker-a, 0)

当我们得到了要发送的 MessageQueue 后就开始执行发送消息的步骤

mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 向 MessageQueue 发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:// 同步调用方式(SYNC)下如果发送失败则执行失败重试策略,默认重试两次,即最多发送三次if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}
}

通过代码可以看出又调用了 sendKernelImpl 方法发送消息

private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);}SendMessageContext context = null;if (brokerAddr != null) {// 根据配置判断是否使用VIP通道brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating process// 检查消息是否为 MessageBatch 类型if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;// 检查客户端配置中是否设置了命名空间if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}// sysFlag是消息的系统标志位,包含压缩标志位、事务标志位、批量标志位、多队列标志位等int sysFlag = 0;boolean msgBodyCompressed = false;// 尝试压缩消息体if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 检查消息是否为事务消息if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}// 发送消息的校验钩子if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}// 发送消息前的钩子if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}// 设置发送消息的请求头SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);requestHeader.setBname(brokerName);// 如果是重发消息,则设置重发消息的次数if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {// 重发消息的次数String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {// 设置重发消息的次数requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));// 清除消息的重发次数属性,因为消息的重发次数属性是在消息重发时设置的MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}// 消息的最大重发次数String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {// 设置消息的最大重发次数requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));// 清除消息的最大重发次数属性,因为消息的最大重发次数属性是在消息重发时设置的MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;// 防止压缩后的消息体重发时被再次压缩msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}// 防止设置了命名空间的topic重发时被再次设置命名空间msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}// 发送消息后的钩子if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException | InterruptedException | MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}

这段代码虽然比较长,但是结合注释还是挺容易理解的。不过其中在异步 (ASYNC) 发送消息时有下面一段代码可能会让人疑惑

Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;// 防止压缩后的消息体重发时被再次压缩msg.setBody(prevBody);
}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}// 防止设置了命名空间的topic重发时被再次设置命名空间msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}

这段代码主要是克隆了一个和 msg 内容一样的 tmpMessage 并发送,而 msg 本身的 body 被设置成了压缩之前的 body,topic 也被设置成了添加命名空间之前的 topic

发送流程总结

  1. 检查消息是否合法
  2. 获取 topic 路由信息
    1. 先尝试从本地获取路由信息,没有则向 NameServer 获取
      1. 向 NameServer 获取路由信息并更新本地缓存,没有则抛出异常并返回
      2. 从本地获取路由信息
    2. 如果本地扔获取不到路由信息则获取默认路由信息
      1. 向 NameServer 获取默认路由信息,如果获取不到则抛出异常并返回
      2. 修改获取到的默认路由信息为新的 topic 的路由信息
      3. 更新本地路由信息缓存
  3. 获取路由信息成功;失败则跳转到第4步
    1. 选择一个 MessageQueue
    2. MessageQueue 发送消息
      1. 根据配置判断是否使用 VIP 通道
      2. 检查消息是否为 MessageBatch 类型
      3. 检查客户端配置中是否设置了命名空间
      4. 设置消息的标志位 sysFlag
        1. 尝试压缩消息体并更新 sysFlag
        2. 检查消息是否为事务消息并更新 sysFlag
      5. 调用钩子函数
      6. 设置消息请求头
      7. 根据发送消息的方式发送消息
  4. 获取路由信息失败
    1. 校验 NameServer 配置是否正确
    2. 抛出异常结束

相关文章:

RocketMQ 5.1.0 源码详解 | Producer 发送流程

文章目录 初始化DefaultMQProducer实例发送流程DefaultMQProducer#sendDefaultMQProducerImpl#sendMQClientInstance#updateTopicRouteInfoFromNameServer使用特定 topic 获取路由信息使用默认 topic 获取路由信息 DefaultMQProducerImpl#sendDefaultImpl发送流程总结 初始化De…...

电脑ip地址怎么改 ip地址怎么改到别的城市

一、ip地址怎么改到别的城市 1.ip地址怎么改到别的城市&#xff0c;1、重启WIFI路由设备 一般手机或电脑在家或公司上网时都是接入到路由器的WIFI网络,再由路由器分配上网IP地址,如果要更换上网IP那么重启路由器设备后,路由器会向网络运营商进行宽带的重新拨号,此时手机或电脑设…...

Android Studio实现列表展示图片

效果&#xff1a; MainActivity 类 package com.example.tabulation;import android.content.Intent; import android.os.Bundle; import android.view.View;import androidx.appcompat.app.AppCompatActivity; import androidx.recyclerview.widget.LinearLayoutManager; im…...

每天一道leetcode:300. 最长递增子序列(动态规划中等)

今日份题目&#xff1a; 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7] …...

【无监督】2、MAE | 自监督模型提取的图像特征也很能打!(CVPR2022 Oral)

文章目录 一、背景二、方法三、效果 论文&#xff1a;Masked Autoencoders Are Scalable Vision Learners 代码&#xff1a;https://github.com/facebookresearch/mae 出处&#xff1a;CVPR2022 Oral | 何凯明 | FAIR 一、背景 本文的标题突出了两个词&#xff1a; masked…...

pytorch单机多卡后台运行

nohup sh ./train_chat.sh > train_chat20230814.log 2>1&参考资料 Pytorch单机多卡后台运行的解决办法...

linux配置上网 linux adsl拨号上网设置

Linux里面配置ADSL上网是件很麻烦的事。但配置完成之后就能开机自动拨号上网&#xff0c;可谓十分的方便。支持的系统有Redhat,CentOS,SuSE,FreeBSD,Ubuntu等常见的Linux。 工具/原料 ADSL网络&#xff0c;电信&#xff0c;网通&#xff0c;移动等常见宽带。 Linux系统的安装光…...

XML学习基础知识归纳(一)

一、XML基本概述 &#xff08;1&#xff09;概念&#xff1a;XML是可扩展的标记语言&#xff0c;xml文档的后缀名为 .xml &#xff08;2&#xff09;作用&#xff1a;用来用来传输和存储数据&#xff0c;不用于表现和展示数据&#xff0c;这点呢相比于HTML来说是不同的&#…...

2023.8.14论文阅读

文章目录 ESPNet: Efficient Spatial Pyramid of Dilated Convolutions for Semantic Segmentation摘要本文方法实验结果 DeepFusion: Lidar-Camera Deep Fusion for Multi-Modal 3D Object Detection摘要本文方法实验结果 ESPNet: Efficient Spatial Pyramid of Dilated Convo…...

FL Studio for Windows-21.1.0.3713中文直装版功能介绍及系统配置要求

FL Studio 21简称FL水果软件,全称是&#xff1a;Fruity Loops Studio编曲&#xff0c;由于其Logo长的比较像一款水果因此&#xff0c;在大家更多的是喜欢称他为水果萝卜&#xff0c;FL studio21是目前最新的版本&#xff0c;这是一款可以让你的计算机就像是一个全功能的录音室&…...

基于网格变形的二维图像变形算法:C++实现与应用

在计算机图形学中&#xff0c;图像变形是一种常见的技术&#xff0c;它可以改变图像的形状和结构&#xff0c;以满足特定的视觉效果或者应用需求。本文将介绍一种基于网格变形的二维图像变形算法&#xff0c;并使用C进行实现。 一、算法原理 网格变形是一种基于网格的图像变形…...

【数据结构】八大排序详解

&#x1f680; 作者简介&#xff1a;一名在后端领域学习&#xff0c;并渴望能够学有所成的追梦人。 &#x1f40c; 个人主页&#xff1a;蜗牛牛啊 &#x1f525; 系列专栏&#xff1a;&#x1f6f9;数据结构、&#x1f6f4;C &#x1f4d5; 学习格言&#xff1a;博观而约取&…...

VSCode如何设置高亮

一、概述 本文主要介绍在 VSCode 看代码时&#xff0c;怎样使某个单词高亮显示&#xff0c;主要通过以下三步实现&#xff1a; 安装 highlight-words 插件 配置 highlight-words 插件 设置高亮快捷键F8 工作是嵌入式开发的&#xff0c;代码主要是C/C的&#xff0c;之前一直用…...

密钥大全ubuntu

VMware Workstation Tech Preview 20H2 GG1JR-APD1P-0857Q-DQQN9-PU2CA VMware Workstation v16 Pro for Windows&#xff08;反馈失效&#xff09; ZF3R0-FHED2-M80TY-8QYGC-NPKYF YF390-0HF8P-M81RQ-2DXQE-M2UT6 ZF71R-DMX85-08DQY-8YMNC-PPHV8 VMware Workstation v15 f…...

Spring Task入门案例

Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 定位&#xff1a;定时任务框架 作用&#xff1a;定时自动执行某段Java代码 强调&#xff1a;只要是需要定时处理的场景都可以使用Spring Task 1. cron表达式 cron表达式…...

针对Android项目蓝牙如何学习

一、概述(Overview) 蓝牙是一种专有的开放式无线技术标准,用于在固定和移动设备之间进行短距离数据交换(使用2400–2480 MHz ISM波段的短波长无线电传输),从而创建具有高度安全性的个人局域网(PANs)。由电信供应商爱立信(telecoms vendor Ericsson)于1994年创建,[1…...

C++学习笔记总结练习:内存分配器编程实现

内存分配器练习 C内存分配器是用于管理程序运行时内存的工具。它负责分配和释放内存&#xff0c;以满足程序在运行过程中的动态内存需求。在C中&#xff0c;有几种内存分配器可供选择&#xff0c;包括操作系统提供的默认分配器、自定义分配器和第三方库提供的分配器。 默认分配…...

【uniapp】使用Vs Code开发uniapp:

文章目录 一、使用命令行创建uniapp项目&#xff1a;二、安装插件与配置&#xff1a;三、编译和运行:四、修改pinia&#xff1a; 一、使用命令行创建uniapp项目&#xff1a; 二、安装插件与配置&#xff1a; 三、编译和运行: 该项目下的dist》dev》mp-weixin文件导入微信开发者…...

【STM32】高效开发工具CubeMonitor快速上手

工欲善其事必先利其器。拥有一个辅助测试工具&#xff0c;能极大提高开发项目的效率。STM32CubeMonitor系列工具能够实时读取和呈现其变量&#xff0c;从而在运行时帮助微调和诊断STM32应用&#xff0c;类似于一个简单的示波器。它是一款基于流程的图形化编程工具&#xff0c;类…...

React 使用 i18n 翻译换行解决方法

当前问题&#xff1a; json 配置文件 "detail": {"10001": "Top 10 \nBIGGEST WINS" } 按以上方式文本在渲染的时候并不能识别我们加入 \n 要实现换行的意图&#xff0c;通过拆分成两个多语来实现又太低级。 解决方法&#xff1a; 在该多语…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

超短脉冲激光自聚焦效应

前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应&#xff0c;这是一种非线性光学现象&#xff0c;主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场&#xff0c;对材料产生非线性响应&#xff0c;可能…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器&#xff1a;Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

GO协程(Goroutine)问题总结

在使用Go语言来编写代码时&#xff0c;遇到的一些问题总结一下 [参考文档]&#xff1a;https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现&#xff1a; 今天在看到这个教程的时候&#xff0c;在自己的电…...