RocketMQ源码学习笔记:Producer发送消息流程
这是本人学习的总结,主要学习资料如下
- 马士兵教育
- rocketMq官方文档
目录
- 1、Overview
- 2、验证消息
- 3、查找路由
- 4、选择消息发送队列
- 4.1、选择队列的策略
- 4.2、源码阅读
- 4.2.1、轮询规避
- 4.2.2、故障延迟规避
- 4.2.2.1、计算规避时间
- 4.2.2.2、选择队列
- 4.2.3、ThreadLocal的使用
- 5、发送消息
- 5.1、客户端建立的时间
1、Overview
消息发送主要可以分成下面四个步骤。
- 验证消息
- 查找路由
- 选择队列
- 消息发送
之后从源码查看四个步骤的具体内容。
我们建立一个DefaultMQProducer之后,调用DefaultMQProducer#send()方法就可发送信息。
查看send()的代码最终会来到DefaultMQProducerImpl#sendDefaultImpl(),我们从这里开始看源码。
2、验证消息
发送前必然验证一下消息。
主要是检验消息的状态,一些必要的值不能为空等。
this.makeSureStateOK();
// 1、检查消息
Validators.checkMessage(msg, this.defaultMQProducer);
公司内部想设置一些新的规则用来发送前拦截信息就适合放在checkMessage()里。
这部分没有太多内容。
3、查找路由
所谓的路由是指可用的Broker的信息,包括地址,具体的消息队列等。
下面这一句获取到路由。
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
在DefaultMQProducer内部缓存这路由信息,维护在ConcurrentHashMap<String, TopicPublishInfo>中
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
在tryToFindTopicPublishInfo()中会先检查路由信息是否存在,不存在还需要从NameServer中获取路由列表。
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
更新的时候会加上一个ReentrantLock,更新结束后释放。
获取到路由信息后开始选择队列发送消息。
4、选择消息发送队列
4.1、选择队列的策略
得到路由信息后就开始选择消息队列发送信息。
选择队列有两种策略
- 轮询规避:轮询选择队列。如果上次发送消息失败,那就消息需要重新发送,这时就需要规避掉上次发送失败的队列,寻找下一个队列发送。
- 故障延迟策略:在选择队列发送时根据以往发送时长判断该队列的
Broker是否可用。对于发送失败的Broker,Producer会规避该Broker一段时间。
这是发送消息的流程图。
假设我们的Broker是集群,有两个Broker。消息会选择其中一个Broker发送消息,如果失败就重试,直到发送成功或者超过重试次数。

4.2、源码阅读
这里会探索源码如何实现这两种队列选择策略。
选择队列的入口在DefaultMQProducerImpl#sendDefaultImpl -> this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

从入口进入查看代码,最终在MQFaultStrategy#selectOneMessageQueue,代码通过sendLatencyFaultEnable这个字段来选择不同的选择策略。

4.2.1、轮询规避
这是轮询规避的源码。

其中lastBrokerName是上一次消息发送时选择的broker。这代表该消息上一次发送失败了,所以记录着上一次失败的broker以在这次选择Broker时规避他。
所以lastBrokerName==null时该消息是第一次发送,不需要规避,直接随机选择一个队列发送。
如果上一次发送失败,则开始轮询选择一个队列,保证这个新选出的队列和上一个不同后就可以返回。
4.2.2、故障延迟规避
4.2.2.1、计算规避时间
故障延迟规避策略需要记录发送时间并计算。在看选择Broker的代码时需要看看源码如何记录发送时间并计算出规避时间的。
计算规避时间的代码在this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);。这时消息正常发送会调用一次这个方法;如果出现异常在catch快也会调用这个方法计算规避时间。

进入这个方法,代码如下。
需要注意isolation=true时表示消息发送出现异常,这时便认为延迟时长是30000ms。
同时也可以看到sendLatencyFaultEnable==true表示开启故障规避策略,这种情况才需要计算规避时间。选择Broker时也是通过这个属性判断使用过故障规避还是轮询规避。

规避时间的计算比较简单,阿里根据自己的经验设置了一个对照表来计算时间,如下图所示。比如延迟是550ms以内的Broker不用规避;延迟在550~1000ms的需要规避30s。

这里跳过计算规避时间的代码细节,进入下一行代码this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);。
代码如下,它其实就是将不可用的Broker维护在faultItemTable中,并且记录着解禁时间。以后选择Broker会通过这个集合查看Broker是否可用。

4.2.2.2、选择队列
我们回到选择Broker的代码MQFaultStrategy#selectOneMessageQueue,下图是相关代码。

它的大概流程是,轮询队列,如果可用就返回。实在找不到可用的就随机选择一个Broker发送。
它通过latencyFaultTolerance.isAvailable(mq.getBrokerName())判断队列是否可用,里面实际就是通过前面讲到的faultItemTable来查看队列是否可用。
4.2.3、ThreadLocal的使用
在选择队列时,无论是轮询规避还是故障延迟规避都需要循环遍历messageQueue找到适合的queue发送信息。
获取下标的方式用到了ThreadLocal。如下图所示,sendWhichQueue本质上就是一个ThreadLocal<Integer>对象。

生产者发送信息时可能会有多个线程同时发信息。
这些线程发送信息时应该各自维护一个消息队列的下标,这样每个线程发送信息时才会比较均匀地向每个队列都发送信息。
另外这些线程发送信息时可能会指定消息队列的id,所以线程各自维护一个消息队列的下标是很有必要的。
这个场景就很适合ThreadLocal,选择消息队列时用ThreadLocal来维护下标。
5、发送消息
5.1、客户端建立的时间
客户端发送消息时,建立HTTP连接是在send()方法中而不是在start()方法中。
站在设计者的角度需要考虑到,开发者在start()方法后可能还需要过一段时间才会真正发送信息,甚至不发信息。
那么建立HTTP连接放在start()就比较浪费资源,所以建立HTTP连接放在了send()方法中。
相关文章:
RocketMQ源码学习笔记:Producer发送消息流程
这是本人学习的总结,主要学习资料如下 马士兵教育rocketMq官方文档 目录 1、Overview2、验证消息3、查找路由4、选择消息发送队列4.1、选择队列的策略4.2、源码阅读4.2.1、轮询规避4.2.2、故障延迟规避4.2.2.1、计算规避时间4.2.2.2、选择队列 4.2.3、ThreadLocal的…...
kotlin flow collect collectLatest 区别
在 Kotlin 协程库中,collect 和 collectLatest 都是用于收集 Flow 中发射的数据的方法,但它们在处理数据和响应新数据的方式上有所不同。 collect collect 是一个挂起函数,用于收集 Flow 中发射的所有数据。它会按顺序处理每一个发射的数据…...
ELK集群搭建
ELK集群搭建 文章目录 ELK集群搭建1.环境准备2.Elasticsearch环境搭建1.创建es账户并设置密码2.选择对应版本进行下载3.编辑配置文件4.设置JVM堆大小 #7.0默认为4G5.创建es数据及日志存储目录6.修改安装目录和存储目录权限 3.系统优化1.增加最大文件打开数2.增加最大进程数3.增…...
zookeeper+kafka消息队列集群部署
一.消息队列 1、什么是消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。 消息队列(MessageQueue)是一种在软件系统中用…...
LLM_入门指南(零基础搭建大模型)
本文主要介绍大模型的prompt,并且给出实战教程。即使零基础也可以实现大模型的搭建。 内容:初级阶段的修炼心法,帮助凝聚和提升内力,为后续修炼打下基础。 1、prompt 1.1含义和作用 prompt就是提示工程的意思。在大型语言模型中…...
Element Plus 与 Vue 3:构建现代化 Web 应用的完美搭档
引言 Element Plus是基于Vue 3的组件库,它继承了Element UI的优秀基因,为Vue 3应用提供了丰富的界面组件。Element Plus不仅拥有与Element UI相同的高质量组件,还针对Vue 3进行了优化和更新,确保了与Vue 3的无缝集成。 环境准备…...
线程间通信与变量修改感知:几种常用方法
线程间通信与变量修改感知:几种常用方法 1. 使用volatile关键字2. 使用synchronized关键字3. 使用wait/notify/notifyAll机制4. 使用轮询(Polling) 💖The Begin💖点点关注,收藏不迷路💖 在Java…...
前后端通信 —— HTTP/HTTPS
目录 一、HTTP/HTTPS 简介 1、HTTP 2、HTTPS 二、HTTP 工作过程 三、HTTP 消息 1、HTTP消息结构 2、HTTP消息示例 四、HTTP 方法(常用) 1、GET 2、POST 3、PUT 4、DELETE 5、GET与POST对比 五、HTTP 状态码(常用) …...
人工智能 (AI) 应用:一个高精度ASD 诊断和照护支持系统
自闭症谱系障碍(ASD)是一种多方面的神经发育状况,影响全球大约1/100的儿童,而在中国,这一比例高达1.8%(引用自《中国0~6岁儿童孤独症谱系障碍筛查患病现状》),男童为2.6%…...
C# 1.方法
方法组成: 1.修饰符:public一般定义共有的 2.方法返回值:void 无返回值; 非void,可以写成其他类型例如int,float,string,string[]等 3.方法名:Add 大驼峰命名法,每一个首字符大写。…...
【C++进阶学习】第七弹——AVL树——树形结构存储数据的经典模块
二叉搜索树:【C进阶学习】第五弹——二叉搜索树——二叉树进阶及set和map的铺垫-CSDN博客 目录 一、AVL树的概念 二、AVL树的原理与实现 AVL树的节点 AVL树的插入 AVL树的旋转 AVL树的打印 AVL树的检查 三、实现AVL树的完整代码 四、总结 前言:…...
px,em,rem之间的关系换算
px,em,rem之间的换算 px:普通大小 em:相对单位,相对于父元素的字体大小 rem:相对单位,相对于根元素(html)的字体大小 <!DOCTYPE html> <html lang"en"> <head>…...
HTTP——POST请求详情
POST请求 【传输实体文本】向指定资源提交数据进行处理请求(例如提交表单或者上传文件)。数据被包含在POST请求体中。POST 请求可能会导致新的资源的建立或已有资源的修改。 场景: 1. 提交用户注册信息。 2. 提交修改的用户信息。 常见的…...
外包干了1个月,技术明显退步。。。
有一种打工人的羡慕,叫做“大厂”。 真是年少不知大厂香,错把青春插稻秧。 但是,在深圳有一群比大厂员工更庞大的群体,他们顶着大厂的“名”,做着大厂的工作,还可以享受大厂的伙食,却没有大厂…...
LeetCode加油站(贪心算法/暴力,分析其时间和空间复杂度)
题目描述 一.原本暴力算法 最初的想法是:先比较gas数组和cost数组的大小,找到可以作为起始点的站点(因为如果你起始点的油还不能到达下一个站点,就不能作为起始点)。当找到过后,再去依次顺序跑一圈,如果剩余的油为负数…...
5.1 软件工程基础知识-软件工程概述
软件工程诞生原因 软件工程基本原理(容易被考到) 软件生存周期 能力成熟度模型 - CMM 能力成熟度模型 - CMMI 真题...
HttpUtil工具
http工具 用到的依赖 <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.13</version></dependency><dependency><groupId>org.apache.httpcomponent…...
并发编程-锁的分类
锁的分类 可重入锁&不可重入锁 可重入:当一个线程获取某个锁后,再次获取这个锁的时候是可以直接拿到的。不可重入:当一个线程获取某个锁之后,再次获取这个锁的时候拿不到,必须等自己先释放锁再获取。synchronized…...
K8S系列-Kubernetes基本概念及Pod、Deployment、Service的使用
一、Kubernetes 的基本概念和术语 一、资源对象 Kubernetes 的基本概念和术语大多是围绕资源对象 Resource Object 来说的,而资源对象在总体上可分为以下两类: 1、某种资源的对象 例如节点 Node) Pod 服务 (Service) 、存储卷 (Volume)。 2、…...
在VSCode上创建Vue项目详细教程
1.前期环境准备 搭建Vue项目使用的是Vue-cli 脚手架。前期环境需要准备Node.js环境,就像Java开发要依赖JDK环境一样。 1.1 Node.js环境配置 1)具体安装步骤操作即可: npm 安装教程_如何安装npm-CSDN博客文章浏览阅读836次。本文主要在Win…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...
Rust 开发环境搭建
环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行: rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu 2、Hello World fn main() { println…...
jdbc查询mysql数据库时,出现id顺序错误的情况
我在repository中的查询语句如下所示,即传入一个List<intager>的数据,返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致,会导致返回的id是从小到大排列的,但我不希望这样。 Query("SELECT NEW com…...
Matlab实现任意伪彩色图像可视化显示
Matlab实现任意伪彩色图像可视化显示 1、灰度原始图像2、RGB彩色原始图像 在科研研究中,如何展示好看的实验结果图像非常重要!!! 1、灰度原始图像 灰度图像每个像素点只有一个数值,代表该点的亮度(或…...
aardio 自动识别验证码输入
技术尝试 上周在发学习日志时有网友提议“在网页上识别验证码”,于是尝试整合图像识别与网页自动化技术,完成了这套模拟登录流程。核心思路是:截图验证码→OCR识别→自动填充表单→提交并验证结果。 代码在这里 import soImage; import we…...
