rocketmq-product-send方法源码分析
先看有哪些send方法

首先说红圈的
有3个红圈。归类成3种发送方式。假设前提条件,发送的topic,有3个broker,每个broker总共4个write队列,总共有12个队列。
- 普通发送。负载均衡12个队列。指定超时时间
- 指定MessageQueue,发送,指定超时时间
- 指定selector器,指定特定参数,指定超时时间。一般用于局部有序,比如相同userId的,到同一个队列
默认超时时间时3秒
再说蓝圈
- sendDefaultImpl 负载均衡的方式,选择队列。然后调sendKernelImpl
- sendSelectImpl 指定队列selector和arg的方式,选择队列。然后调sendKernelImpl
- sendKernelImpl 最核心的方式。这里已经明确队列,做真实的消息发送
很明显,只需要简单解读sendDefaultImpl和sendSelectImpl如何选择队列。然后重点在于查看sendKernelImpl方法实现
sendDefaultImpl选择队列分析
先看源码
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
第一步,通过topic查找路由信息tryToFindTopicPublishInfo
先从内存中获取。内存是DefaultMQProducerImpl#topicPublishInfoTable
如果内存没有,则从nameserver获取
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)
内存是什么时候添加的呢?是有定时器任务更新的。详情看我写的文章rocketmq-push模式-消费侧重平衡-类流程图分析
第二步、设定默认重试3次(包含首次),选择topic的其中一个队列
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}
可以发现,topic对应的TopicPublishInfo,维护者一个ThreadLocalIndex对象。
每个线程先会获取一个index,然后对index取模,得到某一个队列。
这意味着,sendDefaultImpl中,队列的负载均衡是线程独立的。每个线程维护着自己的index,每发送一次,index+1。
public int incrementAndGet() {Integer index = this.threadLocalIndex.get();if (null == index) {index = Math.abs(random.nextInt());this.threadLocalIndex.set(index);}this.threadLocalIndex.set(++index);return Math.abs(index & POSITIVE_MASK);}
第三步、选择完MessageQueue后,调用sendKernelImpl发送消息
sendSelectImpl选择队列分析
先看源码
private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
第一步,通过topic查找路由信息tryToFindTopicPublishInfo。分析同上
第二步,通过MessageQueueSelector,找出发送的MessageQueue
MessageQueueSelector的实现方式,可以自定义。提供了2种
SelectMessageQueueByRandom 随机一个
SelectMessageQueueByHash 根据arg的hashcode取模一个。适合局部有序
public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}return mqs.get(value);}
}
第三步、选择完MessageQueue后,调用sendKernelImpl发送消息
sendKernelImpl发送分析
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
第一步、通过MessageQueue,获取对应的master节点地址
第二步、设置消息的唯一id。详情看以下实现。明显是客户端生成的,(由于不是分布式唯一ID的创建方式,有点怀疑会重复。后续查看)
org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID
第三步、对消息body做消息压缩
第四步、判断该消息是否是事务消息。给sysFlag位标志变量加标志
第五步、发送前可做一些自定义的检查CheckForbiddenHook、SendMessageHook
第六步、构建SendMessageRequestHeader requestHeader,将msg的一些内容设置到header上
第七部、根据发送模式communicationMode,调用不同的sendMessage方法
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;
}
第八步、最终会调用NettyRemotingClient的发送方法
SYNC:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync
ONEWAY:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway
ASYNC:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync
总结
product的发送有几种API模式,其实目的都是为了选择MessageQueue
- 默认的发送,是根据topic的队列,做负载均衡的方式,topicPublishInfo内部维护着ThreadLocalIndex对象,做线程级别的负载均衡。而且默认都3次重试机会,意味可以选择不同队列做发送;
- 指定messageQueue,是调用方明确知道发送的MessageQueue,这种失败不会做重试;
- 指定MessageQueueSelector等,这种是通过传入的参数,计算出对应的MessageQueue,这种失败不会做重试,适合作为局部有序的发送方式
选择好队列后,就会调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl方法,主要是构建SendMessageRequestHeader,执行自定义的发送before和after的处理。
sendKernelImpl最终会调用NettyRemotingClient提供的接口,分别处理SYNC、ONEWAY、ASYNC的三种模式
相关文章:
rocketmq-product-send方法源码分析
先看有哪些send方法 首先说红圈的 有3个红圈。归类成3种发送方式。假设前提条件,发送的topic,有3个broker,每个broker总共4个write队列,总共有12个队列。 普通发送。负载均衡12个队列。指定超时时间指定MessageQueue,发送&#…...
centos下设置服务器开机自启动 redis
在客户服务器中,服务器重启,发现 Redis 没有重启, 可以按照类似的步骤来创建自启动脚本,并将它添加到定时任务中。 解决办法: 1. 创建自启动脚本 进入服务器并创建脚本文件,例如 /usr/local/bin/redis_…...
【Linux】APT 密钥管理:官方推荐的解决方案应对 apt-key 弃用
引言 在 Ubuntu 和 Debian 系统中,apt-key 命令用于管理 GPG 密钥,验证来自软件包存储库的包是否合法并且未被篡改。然而,从 Debian 11 和 Ubuntu 22.04 开始,apt-key 被弃用,并将在未来的版本中完全移除。因此&#…...
69.在 Vue 3 中使用 OpenLayers 拖拽实现放大区域的效果(DragPan)
引言 在现代 Web 开发中,地图功能已经成为许多应用的重要组成部分。OpenLayers 是一个功能强大的开源地图库,支持多种地图源和交互操作。Vue 3 是一个流行的前端框架,以其响应式数据和组件化开发著称。本文将介绍如何在 Vue 3 中集成 OpenLa…...
77,【1】.[CISCN2019 华东南赛区]Web4
有句英文,看看什么意思 好像也可以不看 进入靶场 点击蓝色字体 我勒个豆,百度哇 所以重点应该在url上,属于任意文件读取类型 接下来该判断框架了 常见的web框架如下 一,Python 框架 1.Flask URL 示例 1:http://…...
手撕B-树
一、概述 1.历史 B树(B-Tree)结构是一种高效存储和查询数据的方法,它的历史可以追溯到1970年代早期。B树的发明人Rudolf Bayer和Edward M. McCreight分别发表了一篇论文介绍了B树。这篇论文是1972年发表于《ACM Transactions on Database S…...
SQL 指南
SQL 指南 引言 SQL(Structured Query Language,结构化查询语言)是一种用于管理关系数据库系统的标准计算机语言。自1970年代问世以来,SQL已经成为了数据库管理和数据操作的事实标准。本文旨在为初学者和有经验的数据库用户提供一个全面的SQL指南,涵盖SQL的基础知识、高级…...
一文简单回顾复习Java基础概念
还是和往常一样,我以提问的方式回顾复习,今天回顾下Java小白入门应该知道的一些基础知识 Java语言有哪些特点呢? Java语言的特点有: 面向对象,主要是封装、继承、多态;平台无关性,“一次编写…...
Git上传了秘钥如何彻底修改包括历史记录【从安装到实战详细版】
使用 BFG Repo-Cleaner 清除 Git 仓库中的敏感信息 1. 背景介绍 在使用 Git 进行版本控制时,有时会不小心将敏感信息(如 API 密钥、密码等)提交到仓库中。即使后续删除,这些信息仍然存在于 Git 的历史记录中。本文将介绍如何使用…...
GCC之编译(8)AR打包命令
GCC之(8)AR二进制打包命令 Author: Once Day Date: 2025年1月23日 一位热衷于Linux学习和开发的菜鸟,试图谱写一场冒险之旅,也许终点只是一场白日梦… 漫漫长路,有人对你微笑过嘛… 全系列文章请查看专栏: Linux实践记录_Once-Day的博客-C…...
Linux二进制部署K8s集群的平滑升级教程
一、升级前的准备工作 备份集群配置和数据 备份/etc/kubernetes/目录,其中包含Kubernetes集群的配置文件。 备份/var/lib/etcd/目录,其中存储了etcd数据库的数据。 使用etcdctl工具备份etcd数据: bash复制 ETCDCTL_API3 etcdctl snapshot s…...
2.1.3 第一个工程,点灯!
新建工程 点击菜单栏左上角,新建工程或者选择“文件”-“新建工程”,选择工程类型“标准工程”选择设备类型和编程语言,并指定工程文件名及保存路径,如下图所示: 选择工程类型为“标准工程” 选择主模块机型&#x…...
图像处理算法研究的程序框架
目录 1 程序框架简介 2 C#图像读取、显示、保存模块 3 C动态库图像算法模块 4 C#调用C动态库 5 演示Demo 5.1 开发环境 5.2 功能介绍 5.3 下载地址 参考 1 程序框架简介 一个图像处理算法研究的常用程序逻辑框架,如下图所示 在该框架中,将图像处…...
计算机工程:解锁未来科技之门!
计算机工程与应用是一个充满无限可能性的领域。随着科技的迅猛发展,计算机技术已经深深渗透到我们生活的方方面面,从医疗、金融到教育,无一不在彰显着计算机工程的巨大魅力和潜力。 在医疗行业,计算机技术的应用尤为突出。比如&a…...
Linux初识——基本指令(2)
本文将继续从上篇末尾讲起,讲解我们剩下的基本指令 一、剩余的基本指令 1、mv mv指令是move(移动)的缩写,其功能为:1.剪切文件、目录。2.重命名 先演示下重命名,假设我想把当前目录下的di34改成dir5 那…...
单片机-STM32 WIFI模块--ESP8266 (十二)
1.WIFI模块--ESP8266 名字由来: Wi-Fi这个术语被人们普遍误以为是指无线保真(Wireless Fidelity),并且即便是Wi-Fi联盟本身也经常在新闻稿和文件中使用“Wireless Fidelity”这个词,Wi-Fi还出现在ITAA的一个论文中。…...
[C++技能提升]类注册
最近在做AI信息在各个平台流转的框架设计,想要设计一种可以灵活扩展、不改变原有代码的框架,了解到了类注册。 具体需求是这样的:AI算法在客户本地电脑和云端都有部署,原先AI在这两个平台下的输出格式并不统一,且每个…...
OpenHarmony 5.0.2 Release来了!
版本概述 OpenHarmony 5.0.2 Release版本对标准系统的能力进行持续完善,以快速迭代的方式推出API 14,相比5.0.1 Release版本,重点做出了如下特性新增或增强: 进一步增强ArkUI、图形图像的能力,提供更多组件的高级属性…...
80,【4】BUUCTF WEB [SUCTF 2018]MultiSQL
53,【3】BUUCTF WEB october 2019 Twice SQLinjection-CSDN博客 上面这个链接是我第一次接触二次注入 这道题也涉及了 对二次注入不熟悉的可以看看 BUUCTF出了点问题,打不开,以下面这两篇wp作为学习对象 [SUCTF 2018]MultiSQL-CSDN博客 …...
NR_shell运行流程简析
nr_shell 是一套开源 shell 框架,基于框架可创建终端交互功能。 为了记录终端输入指令,以及进行解析处理,nr_shell 提供了一套 cmd 结构体,具体如下:typedef struct static_cmd_function_struct {char cmd[NR_SHELL_CM…...
Prometheus部署及linux、mysql、monog、redis、RocketMQ、java_jvm监控配置
Prometheus部署及linux、mysql、monog、redis、RocketMQ、java_jvm监控配置 1.Prometheus部署1.2.Prometheus修改默认端口 2.grafana可视化页面部署3.alertmanager部署4.监控配置4.1.主机监控node-exporter4.2.监控mysql数据库mysqld_exporter4.3.监控mongod数据库mongodb_expo…...
问题排查 - TC397 CORE2 50MS/100MS任务不运行
1、问题描述 CORE2 的任务运行次数的计数值OsTask_100ms_Core2 - task_cnt[12]、OsTask_50ms_Core2 - task_cnt[16]不在累加,但是其他任务OsAlarm_1ms_Core2、OsAlarm_5ms_Core2、OsAlarm_10ms_Core2、OsAlarm_20ms_Core2 任务计数值累加正常。 如果是任务栈溢出&a…...
Spring FatJar写文件到RCE分析
背景 现在生产环境部署 spring boot 项目一般都是将其打包成一个 FatJar,即把所有依赖的第三方 jar 也打包进自身的 app.jar 中,最后以 java -jar app.jar 形式来运行整个项目。 运行时项目的 classpath 包括 app.jar 中的 BOOT-INF/classes 目录和 BO…...
百度APP iOS端磁盘优化实践(上)
01 概览 在APP的开发中,磁盘管理已成为不可忽视的部分。随着功能的复杂化和数据量的快速增长,如何高效管理磁盘空间直接关系到用户体验和APP性能。本文将结合磁盘管理的实践经验,详细介绍iOS沙盒环境下的文件存储规范,探讨业务缓…...
蓝桥杯之c++入门(一)【第一个c++程序】
目录 前言一、第⼀个C程序1.1 基础程序1.2 main函数1.3 字符串1.4 头文件1.5 cin 和 cout 初识1.6 名字空间1.7 注释 二、四道简单习题(点击跳转链接)练习1:Hello,World!练习2:打印飞机练习3:第⼆个整数练习4ÿ…...
14-6-1C++STL的list
(一)list容器的基本概念 list容器简介: 1.list是一个双向链表容器,可高效地进行插入删除元素 2.list不可以随机存取元素,所以不支持at.(pos)函数与[ ]操作符 (二)list容器头部和尾部的操作 list对象的默…...
【AI论文】Sigma:对查询、键和值进行差分缩放,以实现高效语言模型
摘要:我们推出了Sigma,这是一个专为系统领域设计的高效大型语言模型,其独特之处在于采用了包括DiffQKV注意力机制在内的新型架构,并在我们精心收集的系统领域数据上进行了预训练。DiffQKV注意力机制通过根据查询(Q&…...
InceptionV1_V2
目录 不同大小的感受野去提取特征 经典 Inception 网络的设计思路与运行流程 背景任务:图像分类(以 CIFAR-10 数据集为例) Inception 网络的设计思路 Inception 网络的运行流程 打个比方 多个损失函数的理解 1. 为什么需要多个损失函数&#…...
ORB-SLAM2源码学习:Initializer.cc⑧: Initializer::CheckRT检验三角化结果
前言 ORB-SLAM2源码学习:Initializer.cc⑦: Initializer::Triangulate特征点对的三角化_cv::svd::compute-CSDN博客 经过上面的三角化我们成功得到了三维点,但是经过三角化成功的三维点并不一定是有效的,需要筛选才能作为初始化地图点。 …...
【ArcGIS微课1000例】0141:提取多波段影像中的单个波段
文章目录 一、波段提取函数二、加载单波段导出问题描述:如下图所示,img格式的时序NDVI数据有24个波段。现在需要提取某一个波段,该怎样操作? 一、波段提取函数 首先加载多波段数据。点击【窗口】→【影像分析】。 选择需要处理的多波段影像,点击下方的【添加函数】。 在多…...
