当前位置: 首页 > article >正文

RocketMQ 消息发送核心源码解析:DefaultMQProducerImpl.send () 方法深度剖析

引言

在分布式系统中,消息队列是实现异步通信、服务解耦和流量削峰的关键组件。Apache RocketMQ 作为一款高性能、高可靠的消息中间件,被广泛应用于各类互联网场景。其中,消息发送是最基础也是最重要的功能之一。本文将深入剖析 RocketMQ 中 DefaultMQProducerImpl.send() 方法的实现原理,带你了解消息发送的核心流程和关键技术点。

一、DefaultMQProducerImpl 概述

DefaultMQProducerImpl 是 RocketMQ 消息生产者的核心实现类,它实现了消息发送的具体逻辑。其类层次结构如下:

DefaultMQProducerImpl├── 实现了 MQProducerInner 接口├── 依赖于 MQClientInstance 进行网络通信├── 包含 TopicPublishInfoManager 管理主题路由信息├── 使用 SendMessageHookList 支持发送钩子扩展

 核心属性

 private final InternalLogger log = ClientLogger.getLog();/*** 随机数*/private final Random random = new Random();/*** 关联的消息生产者的组件*/private final DefaultMQProducer defaultMQProducer;/*** topic 发布信息的映射表*/private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =new ConcurrentHashMap<String, TopicPublishInfo>();/*** 发送消息的钩子List**/private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();/*** 完成事务消息的钩子list*/private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();/*** rpc钩子*/private final RPCHook rpcHook;/*** 异步发送线程池队列*/private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;/*** 默认异步发送消息的线程池*/private final ExecutorService defaultAsyncSenderExecutor;/*** 检查请求的队列*/protected BlockingQueue<Runnable> checkRequestQueue;/*** 检查请求的线程池*/protected ExecutorService checkExecutor;/*** 服务的状态*/private ServiceState serviceState = ServiceState.CREATE_JUST;/*** 客户端的实例*/private MQClientInstance mQClientFactory;/*** 检查禁用钩子的list*/private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();/*** 容错策略*/private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();/*** 异步化发送消息的线程池*/private ExecutorService asyncSenderExecutor;

二、send () 方法的核心流程

2.1 方法签名与重载

DefaultMQProducerImpl 提供了多个重载的 send() 方法,支持同步、异步和单向发送模式:

// 同步发送
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;// 异步发送
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;// 单向发送
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;// 带超时参数的同步发送
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;// 带队列选择器的发送
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

2.2 核心发送流程

所有 send() 方法最终都会调用 sendDefaultImpl() 方法,这是消息发送的核心实现:

    private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;//根据topic进行查找topic的路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;//设置总的重试次数int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();//进行选择一个消息队列MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);//选择出来的队列不为nullif (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}//走一个网络通信逻辑sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {continue;} else {if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}

三、关键步骤详解

3.1 获取主题路由信息

 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {//从本地的Map中进行获取Topic的路由信息TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//本地的Map中没有这个Topic的路由信息,那么就进行从NameServer中获取路由信息if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 从远程的NameServer中获取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}//topic的路由信息是可用的,那么就直接返回这个topicPublishInfoif (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//从远程的NameServer中获取路由信息 在进行返回this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}

3.2 消息队列选择

/*** 选择一个可用的队列* @param tpInfo topic的路由信息* @param lastBrokerName 上一次发送时候的broker* @return*/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {try {//对这个queue进行累加一下indexint index = tpInfo.getSendWhichQueue().incrementAndGet();//遍历topic里的每个consumeQueuefor (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {//针对index 进行取模int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;//轮询选择一个messageQueueMessageQueue mq = tpInfo.getMessageQueueList().get(pos);//判断这个broker是否可用if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}//遍历完之后,发现没有可用的broker 至少选择一个brokerfinal String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {return new MessageQueue(mq.getTopic(), notBestBroker, tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);} else {return mq;}} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);}

3.3 底层消息发送

private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();//获取broker地址String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}int sysFlag = 0;boolean msgBodyCompressed = false;if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}//发送消息的请求头SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);requestHeader.setBname(mq.getBrokerName());if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC: //同步发送long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//同步发送消息sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}

3.4 调用NettyClient发送消息

 public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {//获取一个当前的时间戳long beginStartTime = System.currentTimeMillis();//这里获取一个channel 这个channel 就是Broker跟NameServer之间的连接final Channel channel = this.getAndCreateChannel(addr);//如果连接存在并且连接是活跃的 那么就可以发送请求if (channel != null && channel.isActive()) {try {//执行发送前的一些操作 先不关心doBeforeRpcHooks(addr, request);long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");}//这里就是真正发送网络请求的地方RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);//执行完成后的一些操作 先不关心doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque();try {//初始化响应的future对象final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);//获取请求的网络连接地址final SocketAddress addr = channel.remoteAddress();channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});//等待响应 使用countDownLatch进行同步的等待RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);//没有返回结果if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {//请求超时了 发送成功了throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {//请求压根就没有发送成功throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}//如果响应成功return responseCommand;} finally {//最终一定会把响应从table中删除this.responseTable.remove(opaque);}}

 3.5 BrokerController接收请求

        客户端发送消息给BrokerController,BrokerController会进行调用SendMessageProcessor进行处理发送过来的消息。

    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default://解析出来一个发送消息请求的headerSendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}//构建一个消息的上下文mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}

  处理单个消息的逻辑:asyncSendMessage,调用底层的存储逻辑进行异步增加消息。

this.brokerController.getMessageStore().asyncPutMessage(msgInner);

最终会进行调用 CommitLog#asyncPutMessage的方法,根据决定是调用CommitLog#asyncPutMessage还是调用 DLedgerCommitLog#asyncPutMessage方法。如果是调用 DLedgerCommitLog#asyncPutMessage方法,最终会进行调用DLedgerServer#handleAppend。

保存完消息之后,ReputMessageService线程会调用CommitLogDispatcherBuildConsumeQueue(向ConsumeQueue中添加数据)和CommitLogDispatcherBuildIndex(向索引中添加数据) 这两个分发组件。

四、性能优化与最佳实践

4.1 性能优化建议

  1. 批量发送:对于小消息,使用批量发送提高吞吐量

List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "TagA", "Message 1".getBytes()));
messages.add(new Message("TopicTest", "TagA", "Message 2".getBytes()));
producer.send(messages);

   2.异步发送:对响应时间敏感的场景使用异步模式

producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理成功响应}@Overridepublic void onException(Throwable e) {// 处理异常}
});

3.合理配置重试次数:根据业务需求调整 retryTimesWhenSendFailed

4.启用故障延迟机制:避免向故障 Broker 发送消息

4.2 监控与告警

  1. 发送成功率:监控 SendStatus.SEND_OK 的比例

  2. 响应时间:监控消息发送的平均响应时间

  3. 异常日志:关注 sendDefaultImpl 方法中的异常捕获

  4. Broker 状态:监控 Broker 的可用性和负载情况

相关文章:

RocketMQ 消息发送核心源码解析:DefaultMQProducerImpl.send () 方法深度剖析

引言 在分布式系统中&#xff0c;消息队列是实现异步通信、服务解耦和流量削峰的关键组件。Apache RocketMQ 作为一款高性能、高可靠的消息中间件&#xff0c;被广泛应用于各类互联网场景。其中&#xff0c;消息发送是最基础也是最重要的功能之一。本文将深入剖析 RocketMQ 中…...

BiliNote部署实践

​ 开源地址&#xff1a; https://github.com/JefferyHcool/BiliNote &#x1f680; 快速开始 1. 克隆仓库 git clone https://github.com/JefferyHcool/BiliNote.git cd BiliNote mv .env.example .env2. 启动后端&#xff08;FastAPI&#xff09; cd backend pip insta…...

deepseek问答记录:请讲解一下transformers.HfArgumentParser()

1. 核心概念&#xff1a; transformers.HfArgumentParser 是 Hugging Face Transformers 库提供的一个命令行参数解析器。它基于 Python 内置的 argparse 模块&#xff0c;但进行了专门增强&#xff0c;目的是为了更简单、更优雅地管理机器学习&#xff08;尤其是 NLP 任务&am…...

bismark OT CTOT OB CTOB 以及mapping后的bam文件中的XG,XR列的含义

首先&#xff0c;OT&#xff0c;OB&#xff0c;CTOT&#xff0c;CTOB都是描述测序reads的&#xff0c;而不是描述参考基因组的。 bisul-fate建库会将DNA双链文库中非甲基化的C转化成U。转化结束后&#xff0c;被转化的U和互补链的G并不配对。此时正链&#xff08;&#xff0c;…...

new语法

在C中&#xff0c;new 是用于动态内存分配的操作符&#xff0c;允许在运行时请求内存空间。以下是 new 的完整语法和用法说明&#xff1a; 1. 基本语法 1.1 单一对象分配 type* pointer new type(initializer);作用&#xff1a;分配一个 type 类型的对象&#xff0c;并返回…...

npm、yarn幽灵依赖问题

很好&#xff01;我们来专门讲讲**幽灵依赖&#xff08;Phantom Dependency&#xff09;**是什么&#xff0c;以及为什么 pnpm 对这个问题非常严格。 &#x1f47b; 什么是幽灵依赖&#xff1f; 幽灵依赖&#xff08;Phantom Dependency&#xff09;&#xff0c;指的是&#x…...

Android Native 之 adbd进程分析

目录 1、adbd守护进程 2、adbd权限降级 3、adbd命令解析 1&#xff09;adb shell 2&#xff09;adb root 3&#xff09;adb reboot 4、案例 1&#xff09;案例之实现不需要执行adb root命令自动具有root权限 2&#xff09;案例之实现不需要RSA认证直接能够使用adb she…...

CAN通讯协议中各种参数解析

1.各种参数缩写 2.多帧传输时间参数解析 - Sender&#xff08;左侧&#xff09; 指的是 多帧数据的发送者&#xff0c;也就是&#xff1a; ECU&#xff08;被测系统 / 响应方&#xff09; - Receiver&#xff08;右侧&#xff09; 指的是 多帧数据的接收者&#xff0c;也就是…...

网络攻防技术三:网络脆弱性分析

文章目录 一、影响安全的因素二、计算机网络三、网络体系结构脆弱性1、因特网容易被攻击的特性 四、典型网络协议安全性分析&#xff08;重要&#xff09;1、IPv42、RIP&#xff08;UDP)3、ICMP(UDP)4、ARP5、OSPF(IP数据报&#xff09;6、BGP(TCP)7、UDP8、TCP9、DNS(UDP)10、…...

(八)登录认证与学生写作画像

本次将赵昱琨同学之前完成的学生写作画像与智能学习路径规划的后端与目前已有的后端框架进行整合。同时为了实现学生写作画像与智能学习路径规划&#xff0c;需要在之前简易的登录系统上进行重构&#xff0c;所以本次大规模重写了登录模块&#xff0c;同时发现很多过去冗余的代…...

Netty学习example示例

文章目录 simpleServer端NettyServerNettyServerHandler Client端NettyClientNettyClientHandler tcp&#xff08;粘包和拆包&#xff09;Server端NettyTcpServerNettyTcpServerHandler Client端NettyTcpClientNettyTcpClientHandler protocolcodecCustomMessageDecoderCustomM…...

几种常用的Agent的Prompt格式

一、基础框架范式&#xff08;Google推荐标准&#xff09; 1. 角色与职能定义 <Role_Definition> 你是“项目专家”&#xff08;Project Pro&#xff09;&#xff0c;作为家居园艺零售商的首席AI助手&#xff0c;专注于家装改造领域。你的核心使命&#xff1a; 1. 协助…...

数据库运维管理系统在AI方向的实践

引言 关系型数据库(如MySQL、PostgreSQL、SQL Server、Oracle等)作为核心数据存储平台,承载着关键业务系统的运行。数据库的运维管理(DBA)工作变得愈发复杂和重要,涉及性能监控、故障诊断、容量规划、安全审计、自动化运维等多个方面。传统的数据库运维依赖人工经验,效…...

[RoarCTF 2019]Easy Calc

查看源代码 <!--Ive set up WAF to ensure security.--> <script>$(#calc).submit(function(){$.ajax({url:"calc.php?num"encodeURIComponent($("#content").val()),type:GET,success:function(data){$("#result").html(<div …...

[Windows]在Win上安装bash和zsh - 一个脚本搞定

目录 前言安装步骤配置要求下载安装脚本启动程序 前言 Windows是一个很流行的系统, 但是在Windows上安装bash和zsh一直是一个让人头疼的问题. 本蛙特意打包了一个程序, 用于一站式解决这一类的问题. 安装步骤 配置要求 系统: Windows软件: Powershell 5.1或以上 下载安装…...

ubuntu系统上运行jar程序输出时间时区不对

springboot项目打包jar文件在ubuntu系统上运行&#xff0c;发现在系统和日志里面&#xff0c;显示和打印的当前时间时区都是UTC0&#xff0c;通过timedatectl命令设置系统时区为Asia/Shanghai&#xff0c;命令date -R发现系统已经修改成功&#xff0c;但是发现springboot仍然输…...

React 播客专栏 Vol.18|React 第二阶段复习 · 样式与 Hooks 全面整合

视频版 &#x1f399; 欢迎回到《前端达人 React播客书单》第 18 期。 今天&#xff0c;我们将对第二阶段的内容进行系统复盘&#xff0c;重点是两个关键词&#xff1a;样式 与 Hooks。 样式&#xff0c;决定组件“长什么样”Hooks&#xff0c;决定组件“怎么动起来” 我们不但…...

从认识AI开始-----解密LSTM:RNN的进化之路

前言 我在上一篇文章中介绍了 RNN&#xff0c;它是一个隐变量模型&#xff0c;主要通过隐藏状态连接时间序列&#xff0c;实现了序列信息的记忆与建模。然而&#xff0c;RNN在实践中面临严重的“梯度消失”与“长期依赖建模困难”问题&#xff1a; 难以捕捉相隔很远的时间步之…...

leetcode0513. 找树左下角的值-meidum

1 题目&#xff1a;找树左下角的值 官方标定难度&#xff1a;中 给定一个二叉树的 根节点 root&#xff0c;请找出该二叉树的 最底层 最左边 节点的值。 假设二叉树中至少有一个节点。 示例 1: 输入: root [2,1,3] 输出: 1 示例 2: 输入: [1,2,3,4,null,5,6,null,null,7]…...

命令行式本地与服务器互传文件

文章目录 1. 背景2. 传输方式2.1 SCP 协议传输2.2 SFTP 协议传输 3. 注意 命令行式本地与服务器互传文件 1. 背景 多设备协同工作中&#xff0c;因操作系统的不同&#xff0c;我们经常需要将另外一个系统中的文件传输到本地PC进行浏览、编译。多设备文件互传&#xff0c;在嵌入…...

MPTCP 聚合吞吐

只破不立假把式&#xff0c;前面连续喷 MPTCP 是个错误&#xff0c;今天说说如何克服。 到底谁在阻碍 MPTCP 聚合吞吐一定要搞清楚&#xff0c;是算法硬伤&#xff0c;是数据不足。前文说过&#xff0c;将一个窗口内的数据多路径 spray 有损吞吐&#xff0c;想要聚合吞吐&…...

JavaScript性能优化实战技术文章大纲

代码层面优化 避免全局变量污染&#xff0c;使用let和const替代var&#xff0c;减少作用域链查找开销。 // 反例&#xff1a;全局变量 var globalVar 低效;// 正例&#xff1a;局部变量 function optimized() {const localVar 高效; }减少DOM操作&#xff0c;合并多次操作或…...

LabelImg: 开源图像标注工具指南

LabelImg: 开源图像标注工具指南 1. 简介 LabelImg 是一个图形化的图像标注工具&#xff0c;使用 Python 和 Qt 开发。它是目标检测任务中最常用的标注工具之一&#xff0c;支持 PASCAL VOC 和 YOLO 格式的标注输出。该工具开源、免费&#xff0c;并且跨平台支持 Windows、Lin…...

计算机网络 TCP篇常见面试题总结

目录 TCP 的三次握手与四次挥手详解 1. 三次握手&#xff08;Three-Way Handshake&#xff09; 2. 四次挥手&#xff08;Four-Way Handshake&#xff09; TCP 为什么可靠&#xff1f; 1. 序列号与确认应答&#xff08;ACK&#xff09; 2. 超时重传&#xff08;Retransmis…...

树欲静而风不止,子欲养而亲不待

2025年6月2日&#xff0c;13~26℃&#xff0c;一般 待办&#xff1a; 物理2 、物理 学生重修 职称材料的最后检查 教学技能大赛PPT 遇见&#xff1a;使用通义创作了一副照片&#xff0c;很好看&#xff01;都有想用来创作自己的头像了&#xff01; 提示词如下&#xff1a; A b…...

Kotlin中的::操作符详解

Kotlin提供了::操作符&#xff0c;用于创建对类或对象的成员(函数、属性)的引用。这种机制叫做成员引用(Member Reference)。这是Kotlin高阶函数和函数式编程的重要组成部分。 简化函数传递 在Java中&#xff0c;我们这样传方法&#xff1a; list.forEach(item -> System.…...

【Linux】(1)—进程概念-③Linux进程概念与PCB

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Linux进程概念与PCB 前言 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 什么是进程&#xff1f; 进程可以理解为"正在执行的…...

神经网络中的梯度消失与梯度爆炸

在深层次的神经网络中很容易出现梯度消失与梯度爆炸的问题。这篇博客就详细介绍一下为什么会产生梯度消失与梯度爆炸的问题&#xff0c;以及如何解决。 首先梯度是什么 类比快递员送包裹&#xff1a; 神经网络训练时&#xff0c;需要根据预测错误&#xff08;损失函数&#…...

深入详解编译与链接:翻译环境和运行环境,翻译环境:预编译+编译+汇编+链接,运行环境

目录 一、翻译环境和运行环境 二、翻译环境&#xff1a;预编译编译汇编链接 &#xff08;一&#xff09;预处理&#xff08;预编译&#xff09; &#xff08;二&#xff09;编译 1、词法分析 2、语法分析 3、语义分析 &#xff08;三&#xff09;汇编 &#xff08;四&…...

系统架构设计师案例分析----经典架构风格特点

这次的考试太大意了&#xff0c;很多知识点有印象&#xff0c;但不能完整的描述出来。今年11月的考试&#xff0c;要认真备考&#xff0c;从现在开始&#xff0c;把案例分析和论文内容整理出来&#xff0c;一是方便记忆&#xff0c;二是和各位考一起分享。欢迎各位拍砖。 这段…...