当前位置: 首页 > news >正文

RocketMQ 5.1.0 源码详解 | Producer 发送流程

文章目录

  • 初始化DefaultMQProducer实例
  • 发送流程
    • DefaultMQProducer#send
    • DefaultMQProducerImpl#send
    • MQClientInstance#updateTopicRouteInfoFromNameServer
      • 使用特定 topic 获取路由信息
      • 使用默认 topic 获取路由信息
    • DefaultMQProducerImpl#sendDefaultImpl
    • 发送流程总结

初始化DefaultMQProducer实例

详细内容见文章
RocketMQ 5.1.0 源码详解 | Producer 启动流程
第一部分

发送流程

DefaultMQProducer#send

只需要执行以下代码即可开始消息的发送流程

try {Message msg = new Message(TOPIC, TAG, "OrderID188", "Hello world".getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);
} catch (Exception e) {e.printStackTrace();
}

RocketMQ 发送普通消息有同步(Sync)发送、异步(Async)发送和单向(Oneway)发送三种方式,send() 方法中只传入 message 则默认为 SYNC 模式

producersend 方法内容如下

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg);
}

可以看到在发送消息时 DefaultMQProducer 也只是一个门面类,具体的实现都是由 DefaultMQProducerImpl 去做的

DefaultMQProducerImpl#send

DefaultMQProducerImplsend 方法内容如下

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

可以看到,基本就是继续调用了几个函数以补齐缺失的参数如超时时间、发送消息的类型和回调函数(由于是同步发送因此回调函数为 null),发送消息的逻辑则主要是在 sendDefaultImpl 方法中实现的

由于此方法内容太多,因此先看看整体的流程

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 确认生产者处于RUNNING状态this.makeSureStateOK();// 检查消息是否合法Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取topic的路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// topicPublishInfo不为空且可用if (topicPublishInfo != null && topicPublishInfo.ok()) {...}// 校验 NameServer 配置是否正确validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

首先检查生产者是否处于 RUNNING 状态,接着检查要发送的消息是否合法,然后会调用 tryToFindTopicPublishInfo 获取路由信息,如果获取成功则进入分支语句中的逻辑,否则校验 NameServer 配置是否正确。如果 NameServer 配置为空则抛出 No name server address 异常,否则抛出 No route info of this topic 异常

由于其他的逻辑相对容易,我们接下来先直接分析 tryToFindTopicPublishInfo 方法的内容

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {// 从本地缓存(ConcurrentMap< String/* topic */,  TopicPublishInfo>)中尝试获取,第一次肯定为空TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 1.尝试从NameServer获取特定topic路由信息并更新本地缓存配置this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}// 如果找到可用的路由信息并返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else { // 2.如果未找到路由信息,则再次尝试使用默认的topic获取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}

可以看到此方法首先会从本地的 topicPublishInfoTable 中寻找 topicPublishInfo,由于之前没有向 topic 发送过消息,因此第一次必然不会从本地找到

此时会首先向 topicPublishInfoTable 中添加空白 topicPublishInfo,然后再调用 mQClientFactory 对象的 updateTopicRouteInfoFromNameServer 方法来更新 topicPublishInfoTabletopicPublishInfo 的数据

又因为是向一个还不存在的 topic 发送消息,因此第一次尝试从 NameServer 获取配置信息并更新本地缓存配置失败,会进行尝试使用默认的 topic 去找路由配置信息

MQClientInstance#updateTopicRouteInfoFromNameServer

由上述章节可知此方法被调用了两次,第一次尝试从 NameServer 获取特定 topic 路由信息并更新本地缓存配置失败,第二次尝试使用默认的 topic 获取路由信息

使用特定 topic 获取路由信息

第一次尝试使用特定 topic 获取路由信息,调用方法为 updateTopicRouteInfoFromNameServer(topic)

public boolean updateTopicRouteInfoFromNameServer(final String topic) {return updateTopicRouteInfoFromNameServer(topic, false, null);
}

此方法又会调用其重载方法,即updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer),其中 isDefault 传入的值为 false

由于方法的内容太多,因此我们只看代码走过的部分

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {// ... 
} else {// 获取指定topic的配置信息topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}

isDefault 的值为 false,因此进入 else 分支,尝试从 NameServer 中获取特定 topic 的路由信息,其中 getTopicRouteInfoFromNameServer 方法通过 Netty 使用 RPC 调用获取 Topic 路由信息,方法内容如下

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)throws RemotingException, MQClientException, InterruptedException {return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}//...}throw new MQClientException(response.getCode(), response.getRemark());
}

但是我们向一个不存在的 topic 发送消息,因此进入 case ResponseCode.TOPIC_NOT_EXIST 分支。又因为 allowTopicNotExist 传入的值为 true,所以打印警告并抛出异常,方法结束

使用默认 topic 获取路由信息

第二次获取时调用了 updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) ,其中 isDefault 传入的值为 true

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {// 从NameServer中获取默认的topic路由信息topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData != null) {// 修正topic路由信息中的读写队列数,使其最大不超过默认的topic队列数for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}
}

上述代码分为两个步骤:

  1. 从 NameServer 中获取默认 topic 即 TBW102 的路由信息
  2. 修正获取到的默认 topic 路由信息

此时我们的 topicRouteData 不为空,且其 QueueData 属性也经过了修正,具体内容如下

TopicRouteData [orderTopicConf=null, queueDatas=[QueueData [brokerName=broker-a, readQueueNums=4, writeQueueNums=4, perm=6, topicSysFlag=0]], brokerDatas=[BrokerData [brokerName=broker-a, brokerAddrs={0=192.168.142.1:10911}, enableActingMaster=false]], filterServerTable={}, topicQueueMappingInfoTable=null
]

接着执行下面的代码

if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);// 与本地缓存中的 topic 发布信息进行比较,如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存boolean changed = topicRouteData.topicRouteDataChanged(old);if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) { // 如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 更新broker地址this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update endpoint map{ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);if (!mqEndPoints.isEmpty()) {topicEndPointsTable.put(topic, mqEndPoints);}}// Update Pub info{// 根据topic路由信息组装TopicPublishInfo对象TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {MQProducerInner impl = entry.getValue();if (impl != null) {// 更新DefaultMQProducerImpl的topicPublishInfoTable表impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info 生产者实例的consumerTable为空if (!consumerTable.isEmpty()) {//...}TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}
}

很明显新获取到的和本地缓存中的 topic 路由信息相比有变化,因此 changed 为 true

接着会根据 topicRouteData 组装TopicPublishInfo 对象,并将其保存到 DefaultMQProducerImpltopicPublishInfoTable 中,key 为 topic 名称,value 为 TopicPublishInfo 对象

最后将 topicRouteData 保存在 topicRouteTable 中,方法结束

DefaultMQProducerImpl#sendDefaultImpl

现在我们已经获取到了要发送的 topic 的发布路由 topicPublishInfo,之后就开始发送了

boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 发送失败后重试最多的次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();// 选择一个MessageQueue发送消息MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {//发送消息...} else {break;}
}

其中 selectOneMessageQueue 方法就是选择一个可用的 MessageQueue 发送消息

在这里插入图片描述

如上图所示,MessageQueue 有一个三元组标识唯一一个队列,即 (topic, brokerName, queueId),最上方的 MessageQueue 的三元组可能是 (TopicTest, broker-a, 0)

当我们得到了要发送的 MessageQueue 后就开始执行发送消息的步骤

mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 向 MessageQueue 发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:// 同步调用方式(SYNC)下如果发送失败则执行失败重试策略,默认重试两次,即最多发送三次if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}
}

通过代码可以看出又调用了 sendKernelImpl 方法发送消息

private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);}SendMessageContext context = null;if (brokerAddr != null) {// 根据配置判断是否使用VIP通道brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating process// 检查消息是否为 MessageBatch 类型if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;// 检查客户端配置中是否设置了命名空间if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}// sysFlag是消息的系统标志位,包含压缩标志位、事务标志位、批量标志位、多队列标志位等int sysFlag = 0;boolean msgBodyCompressed = false;// 尝试压缩消息体if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 检查消息是否为事务消息if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}// 发送消息的校验钩子if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}// 发送消息前的钩子if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}// 设置发送消息的请求头SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);requestHeader.setBname(brokerName);// 如果是重发消息,则设置重发消息的次数if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {// 重发消息的次数String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {// 设置重发消息的次数requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));// 清除消息的重发次数属性,因为消息的重发次数属性是在消息重发时设置的MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}// 消息的最大重发次数String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {// 设置消息的最大重发次数requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));// 清除消息的最大重发次数属性,因为消息的最大重发次数属性是在消息重发时设置的MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;// 防止压缩后的消息体重发时被再次压缩msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}// 防止设置了命名空间的topic重发时被再次设置命名空间msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}// 发送消息后的钩子if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException | InterruptedException | MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}

这段代码虽然比较长,但是结合注释还是挺容易理解的。不过其中在异步 (ASYNC) 发送消息时有下面一段代码可能会让人疑惑

Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;// 防止压缩后的消息体重发时被再次压缩msg.setBody(prevBody);
}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}// 防止设置了命名空间的topic重发时被再次设置命名空间msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}

这段代码主要是克隆了一个和 msg 内容一样的 tmpMessage 并发送,而 msg 本身的 body 被设置成了压缩之前的 body,topic 也被设置成了添加命名空间之前的 topic

发送流程总结

  1. 检查消息是否合法
  2. 获取 topic 路由信息
    1. 先尝试从本地获取路由信息,没有则向 NameServer 获取
      1. 向 NameServer 获取路由信息并更新本地缓存,没有则抛出异常并返回
      2. 从本地获取路由信息
    2. 如果本地扔获取不到路由信息则获取默认路由信息
      1. 向 NameServer 获取默认路由信息,如果获取不到则抛出异常并返回
      2. 修改获取到的默认路由信息为新的 topic 的路由信息
      3. 更新本地路由信息缓存
  3. 获取路由信息成功;失败则跳转到第4步
    1. 选择一个 MessageQueue
    2. MessageQueue 发送消息
      1. 根据配置判断是否使用 VIP 通道
      2. 检查消息是否为 MessageBatch 类型
      3. 检查客户端配置中是否设置了命名空间
      4. 设置消息的标志位 sysFlag
        1. 尝试压缩消息体并更新 sysFlag
        2. 检查消息是否为事务消息并更新 sysFlag
      5. 调用钩子函数
      6. 设置消息请求头
      7. 根据发送消息的方式发送消息
  4. 获取路由信息失败
    1. 校验 NameServer 配置是否正确
    2. 抛出异常结束

相关文章:

RocketMQ 5.1.0 源码详解 | Producer 发送流程

文章目录 初始化DefaultMQProducer实例发送流程DefaultMQProducer#sendDefaultMQProducerImpl#sendMQClientInstance#updateTopicRouteInfoFromNameServer使用特定 topic 获取路由信息使用默认 topic 获取路由信息 DefaultMQProducerImpl#sendDefaultImpl发送流程总结 初始化De…...

电脑ip地址怎么改 ip地址怎么改到别的城市

一、ip地址怎么改到别的城市 1.ip地址怎么改到别的城市&#xff0c;1、重启WIFI路由设备 一般手机或电脑在家或公司上网时都是接入到路由器的WIFI网络,再由路由器分配上网IP地址,如果要更换上网IP那么重启路由器设备后,路由器会向网络运营商进行宽带的重新拨号,此时手机或电脑设…...

Android Studio实现列表展示图片

效果&#xff1a; MainActivity 类 package com.example.tabulation;import android.content.Intent; import android.os.Bundle; import android.view.View;import androidx.appcompat.app.AppCompatActivity; import androidx.recyclerview.widget.LinearLayoutManager; im…...

每天一道leetcode:300. 最长递增子序列(动态规划中等)

今日份题目&#xff1a; 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7] …...

【无监督】2、MAE | 自监督模型提取的图像特征也很能打!(CVPR2022 Oral)

文章目录 一、背景二、方法三、效果 论文&#xff1a;Masked Autoencoders Are Scalable Vision Learners 代码&#xff1a;https://github.com/facebookresearch/mae 出处&#xff1a;CVPR2022 Oral | 何凯明 | FAIR 一、背景 本文的标题突出了两个词&#xff1a; masked…...

pytorch单机多卡后台运行

nohup sh ./train_chat.sh > train_chat20230814.log 2>1&参考资料 Pytorch单机多卡后台运行的解决办法...

linux配置上网 linux adsl拨号上网设置

Linux里面配置ADSL上网是件很麻烦的事。但配置完成之后就能开机自动拨号上网&#xff0c;可谓十分的方便。支持的系统有Redhat,CentOS,SuSE,FreeBSD,Ubuntu等常见的Linux。 工具/原料 ADSL网络&#xff0c;电信&#xff0c;网通&#xff0c;移动等常见宽带。 Linux系统的安装光…...

XML学习基础知识归纳(一)

一、XML基本概述 &#xff08;1&#xff09;概念&#xff1a;XML是可扩展的标记语言&#xff0c;xml文档的后缀名为 .xml &#xff08;2&#xff09;作用&#xff1a;用来用来传输和存储数据&#xff0c;不用于表现和展示数据&#xff0c;这点呢相比于HTML来说是不同的&#…...

2023.8.14论文阅读

文章目录 ESPNet: Efficient Spatial Pyramid of Dilated Convolutions for Semantic Segmentation摘要本文方法实验结果 DeepFusion: Lidar-Camera Deep Fusion for Multi-Modal 3D Object Detection摘要本文方法实验结果 ESPNet: Efficient Spatial Pyramid of Dilated Convo…...

FL Studio for Windows-21.1.0.3713中文直装版功能介绍及系统配置要求

FL Studio 21简称FL水果软件,全称是&#xff1a;Fruity Loops Studio编曲&#xff0c;由于其Logo长的比较像一款水果因此&#xff0c;在大家更多的是喜欢称他为水果萝卜&#xff0c;FL studio21是目前最新的版本&#xff0c;这是一款可以让你的计算机就像是一个全功能的录音室&…...

基于网格变形的二维图像变形算法:C++实现与应用

在计算机图形学中&#xff0c;图像变形是一种常见的技术&#xff0c;它可以改变图像的形状和结构&#xff0c;以满足特定的视觉效果或者应用需求。本文将介绍一种基于网格变形的二维图像变形算法&#xff0c;并使用C进行实现。 一、算法原理 网格变形是一种基于网格的图像变形…...

【数据结构】八大排序详解

&#x1f680; 作者简介&#xff1a;一名在后端领域学习&#xff0c;并渴望能够学有所成的追梦人。 &#x1f40c; 个人主页&#xff1a;蜗牛牛啊 &#x1f525; 系列专栏&#xff1a;&#x1f6f9;数据结构、&#x1f6f4;C &#x1f4d5; 学习格言&#xff1a;博观而约取&…...

VSCode如何设置高亮

一、概述 本文主要介绍在 VSCode 看代码时&#xff0c;怎样使某个单词高亮显示&#xff0c;主要通过以下三步实现&#xff1a; 安装 highlight-words 插件 配置 highlight-words 插件 设置高亮快捷键F8 工作是嵌入式开发的&#xff0c;代码主要是C/C的&#xff0c;之前一直用…...

密钥大全ubuntu

VMware Workstation Tech Preview 20H2 GG1JR-APD1P-0857Q-DQQN9-PU2CA VMware Workstation v16 Pro for Windows&#xff08;反馈失效&#xff09; ZF3R0-FHED2-M80TY-8QYGC-NPKYF YF390-0HF8P-M81RQ-2DXQE-M2UT6 ZF71R-DMX85-08DQY-8YMNC-PPHV8 VMware Workstation v15 f…...

Spring Task入门案例

Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 定位&#xff1a;定时任务框架 作用&#xff1a;定时自动执行某段Java代码 强调&#xff1a;只要是需要定时处理的场景都可以使用Spring Task 1. cron表达式 cron表达式…...

针对Android项目蓝牙如何学习

一、概述(Overview) 蓝牙是一种专有的开放式无线技术标准,用于在固定和移动设备之间进行短距离数据交换(使用2400–2480 MHz ISM波段的短波长无线电传输),从而创建具有高度安全性的个人局域网(PANs)。由电信供应商爱立信(telecoms vendor Ericsson)于1994年创建,[1…...

C++学习笔记总结练习:内存分配器编程实现

内存分配器练习 C内存分配器是用于管理程序运行时内存的工具。它负责分配和释放内存&#xff0c;以满足程序在运行过程中的动态内存需求。在C中&#xff0c;有几种内存分配器可供选择&#xff0c;包括操作系统提供的默认分配器、自定义分配器和第三方库提供的分配器。 默认分配…...

【uniapp】使用Vs Code开发uniapp:

文章目录 一、使用命令行创建uniapp项目&#xff1a;二、安装插件与配置&#xff1a;三、编译和运行:四、修改pinia&#xff1a; 一、使用命令行创建uniapp项目&#xff1a; 二、安装插件与配置&#xff1a; 三、编译和运行: 该项目下的dist》dev》mp-weixin文件导入微信开发者…...

【STM32】高效开发工具CubeMonitor快速上手

工欲善其事必先利其器。拥有一个辅助测试工具&#xff0c;能极大提高开发项目的效率。STM32CubeMonitor系列工具能够实时读取和呈现其变量&#xff0c;从而在运行时帮助微调和诊断STM32应用&#xff0c;类似于一个简单的示波器。它是一款基于流程的图形化编程工具&#xff0c;类…...

React 使用 i18n 翻译换行解决方法

当前问题&#xff1a; json 配置文件 "detail": {"10001": "Top 10 \nBIGGEST WINS" } 按以上方式文本在渲染的时候并不能识别我们加入 \n 要实现换行的意图&#xff0c;通过拆分成两个多语来实现又太低级。 解决方法&#xff1a; 在该多语…...

Android Wi-Fi 连接失败日志分析

1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分&#xff1a; 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析&#xff1a; CTR…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包

文章目录 现象&#xff1a;mysql已经安装&#xff0c;但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时&#xff0c;可能是因为以下几个原因&#xff1a;1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

智能AI电话机器人系统的识别能力现状与发展水平

一、引言 随着人工智能技术的飞速发展&#xff0c;AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术&#xff0c;在客户服务、营销推广、信息查询等领域发挥着越来越重要…...

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来&#xff0c;在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂&#xff0c;网络攻击、数据泄露等事件频发&#xff0c;现行法律已难以完全适应新的风险挑战。 2025年3月28日&#xff0c;国家网信办会同相关部门起草了《网络安全…...

DBLP数据库是什么?

DBLP&#xff08;Digital Bibliography & Library Project&#xff09;Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高&#xff0c;数据库文献更新速度很快&#xff0c;很好地反映了国际计算机科学学术研…...

数据结构:递归的种类(Types of Recursion)

目录 尾递归&#xff08;Tail Recursion&#xff09; 什么是 Loop&#xff08;循环&#xff09;&#xff1f; 复杂度分析 头递归&#xff08;Head Recursion&#xff09; 树形递归&#xff08;Tree Recursion&#xff09; 线性递归&#xff08;Linear Recursion&#xff09;…...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...