当前位置: 首页 > news >正文

rocketmq-product-send方法源码分析

先看有哪些send方法

在这里插入图片描述

首先说红圈的

有3个红圈。归类成3种发送方式。假设前提条件,发送的topic,有3个broker,每个broker总共4个write队列,总共有12个队列。

  • 普通发送。负载均衡12个队列。指定超时时间
  • 指定MessageQueue,发送,指定超时时间
  • 指定selector器,指定特定参数,指定超时时间。一般用于局部有序,比如相同userId的,到同一个队列

默认超时时间时3秒

再说蓝圈

  • sendDefaultImpl 负载均衡的方式,选择队列。然后调sendKernelImpl
  • sendSelectImpl 指定队列selector和arg的方式,选择队列。然后调sendKernelImpl
  • sendKernelImpl 最核心的方式。这里已经明确队列,做真实的消息发送

很明显,只需要简单解读sendDefaultImpl和sendSelectImpl如何选择队列。然后重点在于查看sendKernelImpl方法实现

sendDefaultImpl选择队列分析

先看源码

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

第一步,通过topic查找路由信息tryToFindTopicPublishInfo
先从内存中获取。内存是DefaultMQProducerImpl#topicPublishInfoTable
如果内存没有,则从nameserver获取
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)

内存是什么时候添加的呢?是有定时器任务更新的。详情看我写的文章rocketmq-push模式-消费侧重平衡-类流程图分析

第二步、设定默认重试3次(包含首次),选择topic的其中一个队列
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}

可以发现,topic对应的TopicPublishInfo,维护者一个ThreadLocalIndex对象。
每个线程先会获取一个index,然后对index取模,得到某一个队列。
这意味着,sendDefaultImpl中,队列的负载均衡是线程独立的。每个线程维护着自己的index,每发送一次,index+1。

public int incrementAndGet() {Integer index = this.threadLocalIndex.get();if (null == index) {index = Math.abs(random.nextInt());this.threadLocalIndex.set(index);}this.threadLocalIndex.set(++index);return Math.abs(index & POSITIVE_MASK);}

第三步、选择完MessageQueue后,调用sendKernelImpl发送消息

sendSelectImpl选择队列分析

先看源码

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl

第一步,通过topic查找路由信息tryToFindTopicPublishInfo。分析同上
第二步,通过MessageQueueSelector,找出发送的MessageQueue
MessageQueueSelector的实现方式,可以自定义。提供了2种
SelectMessageQueueByRandom 随机一个
SelectMessageQueueByHash 根据arg的hashcode取模一个。适合局部有序

public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}return mqs.get(value);}
}

第三步、选择完MessageQueue后,调用sendKernelImpl发送消息

sendKernelImpl发送分析

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
第一步、通过MessageQueue,获取对应的master节点地址
第二步、设置消息的唯一id。详情看以下实现。明显是客户端生成的,(由于不是分布式唯一ID的创建方式,有点怀疑会重复。后续查看)
org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID
第三步、对消息body做消息压缩
第四步、判断该消息是否是事务消息。给sysFlag位标志变量加标志
第五步、发送前可做一些自定义的检查CheckForbiddenHook、SendMessageHook
第六步、构建SendMessageRequestHeader requestHeader,将msg的一些内容设置到header上
第七部、根据发送模式communicationMode,调用不同的sendMessage方法
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage

switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;
}

第八步、最终会调用NettyRemotingClient的发送方法
SYNC:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync
ONEWAY:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway
ASYNC:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync

总结

product的发送有几种API模式,其实目的都是为了选择MessageQueue

  • 默认的发送,是根据topic的队列,做负载均衡的方式,topicPublishInfo内部维护着ThreadLocalIndex对象,做线程级别的负载均衡。而且默认都3次重试机会,意味可以选择不同队列做发送;
  • 指定messageQueue,是调用方明确知道发送的MessageQueue,这种失败不会做重试;
  • 指定MessageQueueSelector等,这种是通过传入的参数,计算出对应的MessageQueue,这种失败不会做重试,适合作为局部有序的发送方式

选择好队列后,就会调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl方法,主要是构建SendMessageRequestHeader,执行自定义的发送before和after的处理。
sendKernelImpl最终会调用NettyRemotingClient提供的接口,分别处理SYNC、ONEWAY、ASYNC的三种模式

相关文章:

rocketmq-product-send方法源码分析

先看有哪些send方法 首先说红圈的 有3个红圈。归类成3种发送方式。假设前提条件&#xff0c;发送的topic&#xff0c;有3个broker&#xff0c;每个broker总共4个write队列&#xff0c;总共有12个队列。 普通发送。负载均衡12个队列。指定超时时间指定MessageQueue,发送&#…...

python flask中使用or查询和and查询,还有同时使用or、and的情况

在 Flask 中处理数据库查询时&#xff0c;通常会结合使用 ORM 工具&#xff0c;例如 SQLAlchemy。以下是 or 查询、and 查询以及两者同时使用的示例。 文章目录 基础准备1. 使用 or_ 查询2. 使用 and_ 查询3. 同时使用 or_ 和 and_4. 更加复杂的嵌套查询 基础准备 假设有一个…...

【第一天】零基础入门刷题Python-算法篇-数据结构与算法的介绍(持续更新)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Python数据结构与算法的详细介绍1.基本概念2.Python中的数据结构1. 列表&#xff08;List&#xff09;2. 元组&#xff08;Tuple&#xff09;3. 字典&#…...

租房管理系统实现智能化租赁提升用户体验与运营效率

内容概要 在当今快速发展的租赁市场中&#xff0c;租房管理系统的智能化转型显得尤为重要。它不仅帮助房东和租客之间建立更高效的沟通桥梁&#xff0c;还优化了整个租赁流程。通过智能化技术&#xff0c;这套系统能够自动处理资产管理、合同签署、财务管理等所有关键环节。这…...

python3+TensorFlow 2.x(四)反向传播

目录 反向传播算法 反向传播算法基本步骤&#xff1a; 反向中的参数变化 总结 反向传播算法 反向传播算法&#xff08;Backpropagation&#xff09;是训练人工神经网络时使用的一个重要算法&#xff0c;它是通过计算梯度并优化神经网络的权重来最小化误差。反向传播算法的核…...

Flutter 使用 flutter_inappwebview 加载 App 本地 HTML 文件

在 Flutter 开发中&#xff0c;加载本地 HTML 文件是一个常见的需求&#xff0c;尤其是在需要展示离线内容或自定义页面时。flutter_inappwebview 是一个功能强大的插件&#xff0c;支持加载本地文件和网络资源。本文将详细介绍如何使用 flutter_inappwebview 加载 App 本地 HT…...

Word常见问题:嵌入图片无法显示完整

场景&#xff1a;在Word中&#xff0c;嵌入式图片显示不全&#xff0c;一部分图片在文字下方。如&#xff1a; 问题原因&#xff1a;因段落行距导致 方法一 快捷方式 选中图片&#xff0c;通过"ctrl1"快捷调整为1倍行距 方法二 通过工具栏调整 选中图片&#xff0…...

为AI聊天工具添加一个知识系统 之68 详细设计 之9 三种中台和时间度量 之1

本文要点 要点 在维度0上 被分离出来 的业务中台 需求、技术中台要求、和数据中台请求 &#xff08;分别在时间层/空间层/时空层上 对应一个不同种类槽的容器&#xff0c;分别表示业务特征Feature[3]/技术方面Aspect[3]/数据流Fluent[3]&#xff09; 在维度1~3的运动过程中 从…...

On to OpenGL and 3D computer graphics

2. On to OpenGL and 3D computer graphics 声明&#xff1a;该代码来自&#xff1a;Computer Graphics Through OpenGL From Theory to Experiments&#xff0c;仅用作学习参考 2.1 First Program Square.cpp完整代码 /// // square.cpp // // OpenGL program to draw a squ…...

从曾国藩的经历看如何打破成长中的瓶颈

《曾国藩传》是一部充满智慧与人生哲理的传记&#xff0c;而曾国藩本人更是一个从“最笨”到“最智慧”的奇人。看他的成长与蜕变&#xff0c;不仅能感受到他如何超越自己的局限&#xff0c;也能从中获得关于人性、社会和历史的重要启示。曾国藩的一生让人深思&#xff0c;正是…...

JavaWeb学习-SpringBotWeb开发入门(HTTP协议)

(一)SpringBotWeb开发步骤 (1)创建springboot工程,并勾选开发相关依赖 (2)定义HelloController类,添加方法hello,并添加注解 (3)运行测试 (二)HTTP入门概述 创建请求页面 package com.itheima.demo3; /*请求处理类,加上注解标识为请求处理类*/import org.spr…...

数据库用户管理

数据库用户管理 1.创建用户 MySQL在安装是&#xff0c;会默认创建一个名位root的用户&#xff0c;该用户拥有超级权限&#xff0c;可以控制整个MySQL服务器。 在对MySQL的日常管理和操作中&#xff0c;通常创建一些具有适当权限的用户&#xff0c;尽可能的不用或少用root登录…...

BGP边界网关协议(Border Gateway Protocol)路由聚合详解

一、路由聚合 1、意义 在大规模的网络中&#xff0c;BGP路由表十分庞大&#xff0c;给设备造成了很大的负担&#xff0c;同时使发生路由振荡的几率也大大增加&#xff0c;影响网络的稳定性。 路由聚合是将多条路由合并的机制&#xff0c;它通过只向对等体发送聚合后的路由而…...

ASP.NET Core WebAPI的异步及返回值

目录 Action方法的异步 Action方法参数 捕捉URL占位符 捕捉QueryString的值 JSON报文体 其他方式 Action方法的异步 Action方法既可以同步也可以异步。异步Action方法的名字一般不需要以Async结尾。Web API中Action方法的返回值如果是普通数据类型&#xff0c;那么返回值…...

「 机器人 」仿生扑翼飞行器中的“被动旋转机制”概述

前言 在仿生扑翼飞行器的机翼设计中,模仿昆虫翼的被动旋转机制是一项关键技术。其核心思想在于:机翼旋转角度(攻角)并非完全通过主动伺服来控制,而是利用空气动力和惯性力的作用,自然地实现被动调节。以下对这种设计的背景、原理与优势进行详细说明。 1. 背景:昆虫的被动…...

「 机器人 」扑翼飞行器的数据驱动建模核心方法

前言 数据驱动建模可充分利用扑翼飞行器的已有运行数据,改进动力学模型与控制策略,并对未建模动态做出更精确的预测。在复杂的非线性飞行环境中,该方法能有效弥补传统解析建模的不足,具有较高的研究与应用价值。以下针对主要研究方向和实现步骤进行整理与阐述。 1. 数据驱动…...

个人网站搭建

搭建 LNMP环境搭建&#xff1a; LNMP环境指&#xff1a;Linux Nginx MySQL/MariaDB PHP&#xff0c;在debian上安装整体需要300MB的磁盘空间。MariaDB 是 MySQL 的一个分支&#xff0c;由 MySQL 的原开发者维护&#xff0c;通常在性能和优化上有所改进。由于其轻量化和与M…...

飞书项目流程入门指导手册

飞书项目流程入门指导手册 参考资料准备工作新建空间国际化配置新建工作项字段管理新建字段对接标识授权角色 流程管理基础说明流程节点配置流程节点的布局配置页面上布局按钮布局配置 流程节点驳回流程图展示自动化字段修改 局限性 参考资料 飞书官方参考文档&#xff1a;飞书…...

xss靶场

xss-labs下载地址&#xff1a;GitHub - do0dl3/xss-labs: xss 跨站漏洞平台 xss常见触发标签&#xff1a;XSS跨站脚本攻击实例与防御策略-CSDN博客 level-1 首先查看网页的源代码发现get传参的name的值test插入了html里头&#xff0c;还回显了payload的长度。 <!DOCTYPE …...

XML实体注入漏洞攻与防

JAVA中的XXE攻防 回显型 无回显型 cve-2014-3574...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别

UnsatisfiedLinkError 在对接硬件设备中&#xff0c;我们会遇到使用 java 调用 dll文件 的情况&#xff0c;此时大概率出现UnsatisfiedLinkError链接错误&#xff0c;原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用&#xff0c;结果 dll 未实现 JNI 协…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

关于 WASM:1. WASM 基础原理

一、WASM 简介 1.1 WebAssembly 是什么&#xff1f; WebAssembly&#xff08;WASM&#xff09; 是一种能在现代浏览器中高效运行的二进制指令格式&#xff0c;它不是传统的编程语言&#xff0c;而是一种 低级字节码格式&#xff0c;可由高级语言&#xff08;如 C、C、Rust&am…...

QT3D学习笔记——圆台、圆锥

类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体&#xff08;对象或容器&#xff09;QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质&#xff08;定义颜色、反光等&#xff09;QFirstPersonC…...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

elementUI点击浏览table所选行数据查看文档

项目场景&#xff1a; table按照要求特定的数据变成按钮可以点击 解决方案&#xff1a; <el-table-columnprop"mlname"label"名称"align"center"width"180"><template slot-scope"scope"><el-buttonv-if&qu…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...