当前位置: 首页 > news >正文

RocketMQ源码学习笔记:Producer发送消息流程

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、Overview
  • 2、验证消息
  • 3、查找路由
  • 4、选择消息发送队列
    • 4.1、选择队列的策略
    • 4.2、源码阅读
      • 4.2.1、轮询规避
      • 4.2.2、故障延迟规避
        • 4.2.2.1、计算规避时间
        • 4.2.2.2、选择队列
      • 4.2.3、ThreadLocal的使用
  • 5、发送消息
    • 5.1、客户端建立的时间


1、Overview

消息发送主要可以分成下面四个步骤。

  1. 验证消息
  2. 查找路由
  3. 选择队列
  4. 消息发送

之后从源码查看四个步骤的具体内容。

我们建立一个DefaultMQProducer之后,调用DefaultMQProducer#send()方法就可发送信息。

查看send()的代码最终会来到DefaultMQProducerImpl#sendDefaultImpl(),我们从这里开始看源码。

2、验证消息

发送前必然验证一下消息。

主要是检验消息的状态,一些必要的值不能为空等。

this.makeSureStateOK();
// 1、检查消息
Validators.checkMessage(msg, this.defaultMQProducer);

公司内部想设置一些新的规则用来发送前拦截信息就适合放在checkMessage()里。

这部分没有太多内容。


3、查找路由

所谓的路由是指可用的Broker的信息,包括地址,具体的消息队列等。

下面这一句获取到路由。

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

DefaultMQProducer内部缓存这路由信息,维护在ConcurrentHashMap<String, TopicPublishInfo>

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

tryToFindTopicPublishInfo()中会先检查路由信息是否存在,不存在还需要从NameServer中获取路由列表。

this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);

更新的时候会加上一个ReentrantLock,更新结束后释放。

获取到路由信息后开始选择队列发送消息。


4、选择消息发送队列

4.1、选择队列的策略

得到路由信息后就开始选择消息队列发送信息。

选择队列有两种策略

  • 轮询规避:轮询选择队列。如果上次发送消息失败,那就消息需要重新发送,这时就需要规避掉上次发送失败的队列,寻找下一个队列发送。
  • 故障延迟策略:在选择队列发送时根据以往发送时长判断该队列的Broker是否可用。对于发送失败的BrokerProducer会规避该Broker一段时间。

这是发送消息的流程图。

假设我们的Broker是集群,有两个Broker。消息会选择其中一个Broker发送消息,如果失败就重试,直到发送成功或者超过重试次数。
在这里插入图片描述


4.2、源码阅读

这里会探索源码如何实现这两种队列选择策略。

选择队列的入口在DefaultMQProducerImpl#sendDefaultImpl -> this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
在这里插入图片描述
从入口进入查看代码,最终在MQFaultStrategy#selectOneMessageQueue,代码通过sendLatencyFaultEnable这个字段来选择不同的选择策略。

在这里插入图片描述

4.2.1、轮询规避

这是轮询规避的源码。
在这里插入图片描述
其中lastBrokerName是上一次消息发送时选择的broker。这代表该消息上一次发送失败了,所以记录着上一次失败的broker以在这次选择Broker时规避他。

所以lastBrokerName==null时该消息是第一次发送,不需要规避,直接随机选择一个队列发送。

如果上一次发送失败,则开始轮询选择一个队列,保证这个新选出的队列和上一个不同后就可以返回。


4.2.2、故障延迟规避

4.2.2.1、计算规避时间

故障延迟规避策略需要记录发送时间并计算。在看选择Broker的代码时需要看看源码如何记录发送时间并计算出规避时间的。

计算规避时间的代码在this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);。这时消息正常发送会调用一次这个方法;如果出现异常在catch快也会调用这个方法计算规避时间。
在这里插入图片描述

进入这个方法,代码如下。

需要注意isolation=true时表示消息发送出现异常,这时便认为延迟时长是30000ms

同时也可以看到sendLatencyFaultEnable==true表示开启故障规避策略,这种情况才需要计算规避时间。选择Broker时也是通过这个属性判断使用过故障规避还是轮询规避。
在这里插入图片描述

规避时间的计算比较简单,阿里根据自己的经验设置了一个对照表来计算时间,如下图所示。比如延迟是550ms以内的Broker不用规避;延迟在550~1000ms的需要规避30s

在这里插入图片描述

这里跳过计算规避时间的代码细节,进入下一行代码this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);

代码如下,它其实就是将不可用的Broker维护在faultItemTable中,并且记录着解禁时间。以后选择Broker会通过这个集合查看Broker是否可用。
在这里插入图片描述

4.2.2.2、选择队列

我们回到选择Broker的代码MQFaultStrategy#selectOneMessageQueue,下图是相关代码。

在这里插入图片描述
它的大概流程是,轮询队列,如果可用就返回。实在找不到可用的就随机选择一个Broker发送。

它通过latencyFaultTolerance.isAvailable(mq.getBrokerName())判断队列是否可用,里面实际就是通过前面讲到的faultItemTable来查看队列是否可用。


4.2.3、ThreadLocal的使用

在选择队列时,无论是轮询规避还是故障延迟规避都需要循环遍历messageQueue找到适合的queue发送信息。

获取下标的方式用到了ThreadLocal。如下图所示,sendWhichQueue本质上就是一个ThreadLocal<Integer>对象。

请添加图片描述
生产者发送信息时可能会有多个线程同时发信息。

这些线程发送信息时应该各自维护一个消息队列的下标,这样每个线程发送信息时才会比较均匀地向每个队列都发送信息。

另外这些线程发送信息时可能会指定消息队列的id,所以线程各自维护一个消息队列的下标是很有必要的。

这个场景就很适合ThreadLocal,选择消息队列时用ThreadLocal来维护下标。



5、发送消息

5.1、客户端建立的时间

客户端发送消息时,建立HTTP连接是在send()方法中而不是在start()方法中。

站在设计者的角度需要考虑到,开发者在start()方法后可能还需要过一段时间才会真正发送信息,甚至不发信息。

那么建立HTTP连接放在start()就比较浪费资源,所以建立HTTP连接放在了send()方法中。

相关文章:

RocketMQ源码学习笔记:Producer发送消息流程

这是本人学习的总结&#xff0c;主要学习资料如下 马士兵教育rocketMq官方文档 目录 1、Overview2、验证消息3、查找路由4、选择消息发送队列4.1、选择队列的策略4.2、源码阅读4.2.1、轮询规避4.2.2、故障延迟规避4.2.2.1、计算规避时间4.2.2.2、选择队列 4.2.3、ThreadLocal的…...

kotlin flow collect collectLatest 区别

在 Kotlin 协程库中&#xff0c;collect 和 collectLatest 都是用于收集 Flow 中发射的数据的方法&#xff0c;但它们在处理数据和响应新数据的方式上有所不同。 collect collect 是一个挂起函数&#xff0c;用于收集 Flow 中发射的所有数据。它会按顺序处理每一个发射的数据…...

ELK集群搭建

ELK集群搭建 文章目录 ELK集群搭建1.环境准备2.Elasticsearch环境搭建1.创建es账户并设置密码2.选择对应版本进行下载3.编辑配置文件4.设置JVM堆大小 #7.0默认为4G5.创建es数据及日志存储目录6.修改安装目录和存储目录权限 3.系统优化1.增加最大文件打开数2.增加最大进程数3.增…...

zookeeper+kafka消息队列集群部署

一.消息队列 1、什么是消息队列 消息&#xff08;Message&#xff09;是指在应用间传送的数据。消息可以非常简单&#xff0c;比如只包含文本字符串&#xff0c;也可以更复杂&#xff0c;可能包含嵌入对象。 消息队列&#xff08;MessageQueue&#xff09;是一种在软件系统中用…...

LLM_入门指南(零基础搭建大模型)

本文主要介绍大模型的prompt&#xff0c;并且给出实战教程。即使零基础也可以实现大模型的搭建。 内容&#xff1a;初级阶段的修炼心法&#xff0c;帮助凝聚和提升内力&#xff0c;为后续修炼打下基础。 1、prompt 1.1含义和作用 prompt就是提示工程的意思。在大型语言模型中…...

Element Plus 与 Vue 3:构建现代化 Web 应用的完美搭档

引言 Element Plus是基于Vue 3的组件库&#xff0c;它继承了Element UI的优秀基因&#xff0c;为Vue 3应用提供了丰富的界面组件。Element Plus不仅拥有与Element UI相同的高质量组件&#xff0c;还针对Vue 3进行了优化和更新&#xff0c;确保了与Vue 3的无缝集成。 环境准备…...

线程间通信与变量修改感知:几种常用方法

线程间通信与变量修改感知&#xff1a;几种常用方法 1. 使用volatile关键字2. 使用synchronized关键字3. 使用wait/notify/notifyAll机制4. 使用轮询&#xff08;Polling&#xff09; &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Java…...

前后端通信 —— HTTP/HTTPS

目录 一、HTTP/HTTPS 简介 1、HTTP 2、HTTPS 二、HTTP 工作过程 三、HTTP 消息 1、HTTP消息结构 2、HTTP消息示例 四、HTTP 方法&#xff08;常用&#xff09; 1、GET 2、POST 3、PUT 4、DELETE 5、GET与POST对比 五、HTTP 状态码&#xff08;常用&#xff09; …...

人工智能 (AI) 应用:一个高精度ASD 诊断和照护支持系统

自闭症谱系障碍&#xff08;ASD&#xff09;是一种多方面的神经发育状况&#xff0c;影响全球大约1/100的儿童&#xff0c;而在中国&#xff0c;这一比例高达1.8%&#xff08;引用自《中国0&#xff5e;6岁儿童孤独症谱系障碍筛查患病现状》&#xff09;&#xff0c;男童为2.6%…...

C# 1.方法

方法组成&#xff1a; 1.修饰符&#xff1a;public一般定义共有的 2.方法返回值&#xff1a;void 无返回值; 非void&#xff0c;可以写成其他类型例如int&#xff0c;float&#xff0c;string,string[]等 3.方法名&#xff1a;Add 大驼峰命名法&#xff0c;每一个首字符大写。…...

【C++进阶学习】第七弹——AVL树——树形结构存储数据的经典模块

二叉搜索树&#xff1a;【C进阶学习】第五弹——二叉搜索树——二叉树进阶及set和map的铺垫-CSDN博客 目录 一、AVL树的概念 二、AVL树的原理与实现 AVL树的节点 AVL树的插入 AVL树的旋转 AVL树的打印 AVL树的检查 三、实现AVL树的完整代码 四、总结 前言&#xff1a…...

px,em,rem之间的关系换算

px,em,rem之间的换算 px&#xff1a;普通大小 em&#xff1a;相对单位&#xff0c;相对于父元素的字体大小 rem&#xff1a;相对单位&#xff0c;相对于根元素&#xff08;html&#xff09;的字体大小 <!DOCTYPE html> <html lang"en"> <head>…...

HTTP——POST请求详情

POST请求 【传输实体文本】向指定资源提交数据进行处理请求&#xff08;例如提交表单或者上传文件&#xff09;。数据被包含在POST请求体中。POST 请求可能会导致新的资源的建立或已有资源的修改。 场景&#xff1a; 1. 提交用户注册信息。 2. 提交修改的用户信息。 常见的…...

外包干了1个月,技术明显退步。。。

有一种打工人的羡慕&#xff0c;叫做“大厂”。 真是年少不知大厂香&#xff0c;错把青春插稻秧。 但是&#xff0c;在深圳有一群比大厂员工更庞大的群体&#xff0c;他们顶着大厂的“名”&#xff0c;做着大厂的工作&#xff0c;还可以享受大厂的伙食&#xff0c;却没有大厂…...

LeetCode加油站(贪心算法/暴力,分析其时间和空间复杂度)

题目描述 一.原本暴力算法 最初的想法是&#xff1a;先比较gas数组和cost数组的大小&#xff0c;找到可以作为起始点的站点(因为如果你起始点的油还不能到达下一个站点&#xff0c;就不能作为起始点)。当找到过后&#xff0c;再去依次顺序跑一圈&#xff0c;如果剩余的油为负数…...

5.1 软件工程基础知识-软件工程概述

软件工程诞生原因 软件工程基本原理&#xff08;容易被考到&#xff09; 软件生存周期 能力成熟度模型 - CMM 能力成熟度模型 - CMMI 真题...

HttpUtil工具

http工具 用到的依赖 <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.13</version></dependency><dependency><groupId>org.apache.httpcomponent…...

并发编程-锁的分类

锁的分类 可重入锁&不可重入锁 可重入&#xff1a;当一个线程获取某个锁后&#xff0c;再次获取这个锁的时候是可以直接拿到的。不可重入&#xff1a;当一个线程获取某个锁之后&#xff0c;再次获取这个锁的时候拿不到&#xff0c;必须等自己先释放锁再获取。synchronized…...

K8S系列-Kubernetes基本概念及Pod、Deployment、Service的使用

一、Kubernetes 的基本概念和术语 一、资源对象 ​ Kubernetes 的基本概念和术语大多是围绕资源对象 Resource Object 来说的&#xff0c;而资源对象在总体上可分为以下两类: 1、某种资源的对象 ​ 例如节点 Node) Pod 服务 (Service) 、存储卷 (Volume&#xff09;。 2、…...

在VSCode上创建Vue项目详细教程

1.前期环境准备 搭建Vue项目使用的是Vue-cli 脚手架。前期环境需要准备Node.js环境&#xff0c;就像Java开发要依赖JDK环境一样。 1.1 Node.js环境配置 1&#xff09;具体安装步骤操作即可&#xff1a; npm 安装教程_如何安装npm-CSDN博客文章浏览阅读836次。本文主要在Win…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序

一、开发环境准备 ​​工具安装​​&#xff1a; 下载安装DevEco Studio 4.0&#xff08;支持HarmonyOS 5&#xff09;配置HarmonyOS SDK 5.0确保Node.js版本≥14 ​​项目初始化​​&#xff1a; ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)

UniApp 集成腾讯云 IM 富媒体消息全攻略&#xff08;地理位置/文件&#xff09; 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型&#xff0c;核心实现方式&#xff1a; 标准消息类型&#xff1a;直接使用 SDK 内置类型&#xff08;文件、图片等&#xff09;自…...

写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里

写一个shell脚本&#xff0c;把局域网内&#xff0c;把能ping通的IP和不能ping通的IP分类&#xff0c;并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...

【系统架构设计师-2025上半年真题】综合知识-参考答案及部分详解(回忆版)

更多内容请见: 备考系统架构设计师-专栏介绍和目录 文章目录 【第1题】【第2题】【第3题】【第4题】【第5题】【第6题】【第7题】【第8题】【第9题】【第10题】【第11题】【第12题】【第13题】【第14题】【第15题】【第16题】【第17题】【第18题】【第19题】【第20~21题】【第…...

【动态规划】B4336 [中山市赛 2023] 永别|普及+

B4336 [中山市赛 2023] 永别 题目描述 你做了一个梦&#xff0c;梦里有一个字符串&#xff0c;这个字符串无论正着读还是倒着读都是一样的&#xff0c;例如&#xff1a; a b c b a \tt abcba abcba 就符合这个条件。 但是你醒来时不记得梦中的字符串是什么&#xff0c;只记得…...

Ansible+Zabbix-agent2快速实现对多主机监控

ansible Ansible 是一款开源的自动化工具&#xff0c;用于配置管理&#xff08;Configuration Management&#xff09;、应用部署&#xff08;Application Deployment&#xff09;、任务自动化&#xff08;Task Automation&#xff09;和编排&#xff08;Orchestration&#xf…...