当前位置: 首页 > news >正文

从零开始读RocketMq源码(二)Message的发送详解

目录

前言

准备

消息发送方式

深入源码

消息发送模式

选择发送方式

同步发送消息

校验消息体

获取Topic订阅信息

高级特性-消息重投

选择消息队列-负载均衡

装载消息体发送消息

压缩消息内容

构造发送message的请求的Header

更新broker故障信息

异步发送消息

总结


前言

上一篇我们已经对RocketMq生产者启动源码进行了学习《从零开始读RocketMq源码(一)生产者启动》那么本篇我们将对生产者发送消息的源码进行学习

准备

如果没看前一篇的,这里还是要强调本篇的rocketmq版本

首先我们从github上拉取rocketmqd的源码链接到本地,使用idea打开。

源码地址:https://github.com/apache/rocketmq

目前最新版本为:5.2.0

那么我们在idea上切换分支为 release-5.2.0

注:请保持和本篇的版本一直,方便后面文章中给出的代码块定位

消息发送方式

在读源码之前我们先了解下mq支持的发送消息的类型。

消息的发送方式有三种,但我们最常用的是同步的方式发送

  • sync 同步:消息发送后,必须等待消息的发送结果返回后,才能发送下一条消息
  • async 异步:消息发送后,不用等待返回结果,直接发送下一条数据,但会设置一个回调方法接收返回结果
  • oneway 单向:消息发送后,不会返回结果,也不会等待,也不会设置回调方法。适用场景日志收集、监控数据和快速通知等对可靠性要求不高但需要高性能的场景

深入源码

首先进入外层的producer.send()方法中

//源码位置:
//包名:org.apache.rocketmq.example.simple
//文件名:Producer
//行数:42
SendResult sendResult = producer.send(msg);

消息发送模式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:431
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));//批量发送if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {return sendByAccumulator(msg, null, null);} else {//单条发送return sendDirect(msg, null, null);}
}
  1. 自动批处理发送 -sendByAccumulator()
  • 该方法用于将消息累积到一个批处理容器中,等待足够的消息数量或达到某个时间间隔后,再进行批量发送。
  • 可以显著减少发送次数,提高吞吐量。

     2. 直接发送 -sendDirect()

  • 适用于即时发送或消息已经是批处理消息的情况

本章的重点就是直接发送消息,这也是开发中使用最频发的方式

选择发送方式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:720
public SendResult sendDirect(Message msg, MessageQueue mq,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// send in sync modeif (sendCallback == null) {if (mq == null) {//同步不指定队列return this.defaultMQProducerImpl.send(msg);} else {//同步指定队列return this.defaultMQProducerImpl.send(msg, mq);}} else {if (mq == null) {//异步不指定队列this.defaultMQProducerImpl.send(msg, sendCallback);} else {//异步指定队列this.defaultMQProducerImpl.send(msg, mq, sendCallback);}return null;}
}

有上面代码可以知道,方法中提供了三个参数设置:

  • msg :消息体,这个为必填项
  • sendCallback :消息回调对象,如果这个参数不为空,则为异步发送,为空则为同步发送
  • mq :指定的队列(指定与不指定的区别在于后续是否需要对队列负载均衡,下面源码中会讲到)

根据最开始生产者发送消息,我们只传入了msg,所以本次重点看同步不指定队列代码实现

同步发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1525
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

跟踪代码我们可以看到,方法中我们默认设置了CommunicationMode.SYNC 同步发送模式,并且回调参数为空,以及设置了默认超时时间3s

校验消息体

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:704
Validators.checkMessage(msg, this.defaultMQProducer);

该方法就是校验消息内容是否合规

  • 校验消息内容是否不为空,消息大小是否超过最大值maxMessageSize = 1024 * 1024 * 4; // 4M
  • 校验消息发送的topic是否为不为空,以及topic的长度是否超过默认最长值127

获取Topic订阅信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:709
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

该方法通过消息体中的topic名称获取topic的订阅信息,该方法在我们上一篇生产者启动中已经出现过了,深入方法内部其实就是先从本地topicPublishInfoTable map中获取数据,没有则从远程nameserver中拉取

高级特性-消息重投

这是rocketMq中一个重要的特性,消息如果投递失败了,会重新投递

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:715
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

这段代码就是获取总过重投的次数:

不难看出,只有发送方式为同步发送时才为1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() =3次,其余发送方式都只有一次机会。

只有同步发送消息才支持消息重投,如果第一次投递失败了,mq还回重试2次投递

找到上面源码位置往下看,其实可以看到下面代码就是使用了一个for循环来进行重投

选择消息队列-负载均衡

通过上面我们知道,最开始并没有指定队列,所以需要程序来获取一个队列。

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:724
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);

因为自动创建的topic,会被默认分配4个队列(生产环境为手动创建topic以及设置队列数量),所以我们必须使用负载均衡保证队列的合理分配到不同队列上,减轻单个队列的压力

  • topicPublishInfo:为消息发送到指定topic的订阅信息
  • lastBrokerName :为上一次选择的broker名称(如果在集群模式下,topic也会存在于多个broker上,因此记录上一次选择的broker名称可以避免连续选择同一个 Broker,从而实现更好的负载均衡和容错处理
  • resetIndex :重置队列索引位置(根据源码逻辑可知,当消息进行重新投递时会重置topic订阅消息中队列的索引位置)

深入上面源码会发现,队列负载均衡的算法获取索引策略默认就是轮询

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:TopicPublishInfo
//行数:101
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

负载均衡策略

  1. 轮询策略 (Round-Robin)
  2. 随机策略 (Random)
  3. 一致性哈希策略 (Consistent Hashing)
  4. 权重随机策略 (Weighted Random)
  5. 最少连接策略 (Least Connections)

装载消息体发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:740
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

该方法就是发现消息的核心方法了,不管是同步发送还是异步发送都会执行该方法

做一些发送消息前的准备,接下深入该方法查看

压缩消息内容

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:898
if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;
}
  • 首先判断消息是否大于4k( compressMsgBodyOverHowmuch = 1024 * 4),大于则进行压缩,小于则不处理
//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1070
byte[] data = compressor.compress(body, compressLevel);
  • 传入消息体以及压缩的等级,这里大佬们提供了三种压缩实现,分别基于三种不同的压缩框架

在我们日常工作中,如果需要压缩内容,也可以参考大佬们的实现,学习源码不仅仅是了解框架的本身,也要吸取优秀的地方合理运用

构造发送message的请求的Header

message是Producer发送给Broker的一个请求,我们可以把内容抽象成两部分组成:请求头请求体

  • 请求体就是消息本身数据
  • 请求头 SendMessageRequestHeader 则包含了各种必要的数据,比如topicmessaeQueue等等,更多可直接查看请求头对象源码

最后就是使用基于netty实现的远程调用发送消息到broker中

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1016
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);

更新broker故障信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:742
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);

程序执行到这个位置,说明前面消息发送的流程全部执行完成了,那么我们也知道了消息发送的结果,从而知道broker服务的状态情况,我们需要把当前的broker故障情况更新到 faultItemTable 本地map中,供后续对broker服务的故障规避faultItemTable 该map在前一篇生产者启动中也提到过。

异步发送消息

选择发送方式代码中当sendCallback!=null时则进入异步发送消息

跟踪源码我们可知,异步发送其实就是创建了一个单独的线程,使用Runnable对象实现,因为会返回一个执行结果

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:550
Runnable runnable = new Runnable() {@Overridepublic void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);} catch (Exception e) {newCallBack.onException(e);}} else {newCallBack.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
};
  • sendDefaultImpl() 该方法就是和同步发送调用的同一个了,唯一区别就是类型 CommunicationMode.ASYNC 和存在回调方法newCallBack
  • executeAsyncMessageSend() 执行异步消息发送

总结

本篇对生产者发送消息源码进行了跟踪学习,你是否也有所收获呢。下一篇我们将对rocketMq的核心组件Broker进行源码解读,Broker负责接收和存储消息,管理消息队列,并将消息分发给消费者, 是担任连接生产者和消费者,确保消息的高效传输和存储,保证系统的可靠性和性能的重要角色。

相关文章:

从零开始读RocketMq源码(二)Message的发送详解

目录 前言 准备 消息发送方式 深入源码 消息发送模式 选择发送方式 同步发送消息 校验消息体 获取Topic订阅信息 高级特性-消息重投 选择消息队列-负载均衡 装载消息体发送消息 压缩消息内容 构造发送message的请求的Header 更新broker故障信息 异步发送消息 …...

怎样优化 PostgreSQL 中对布尔类型数据的查询?

文章目录 一、索引的合理使用1. 常规 B-tree 索引2. 部分索引 二、查询编写技巧1. 避免不必要的类型转换2. 逻辑表达式的优化 三、表结构设计1. 避免过度细分的布尔列2. 规范化与反规范化 四、数据分布与分区1. 数据分布的考虑2. 表分区 五、数据库参数调整1. 相关配置参数2. 定…...

mysql在linux系统下重置root密码

mysql在linux系统下重置root密码 登录服务器时候mysql密码忘记了,没办法只能重置,找了一圈,把行之有效的方法介绍在这里。 错误展示: 我还以为yes就可以了呢,这是不行的意思。 关掉mysql服务 sudo systemctl stop …...

设计模式探索:观察者模式

1. 观察者模式 1.1 什么是观察者模式 观察者模式用于建立一种对象与对象之间的依赖关系,当一个对象发生改变时将自动通知其他对象,其他对象会相应地作出反应。 在观察者模式中有如下角色: Subject(抽象主题/被观察者&#xf…...

Perl语言入门到高级学习

Perl语言介绍 Perl,全称为Practical Extraction and Report Language,即“实用报表提取语言”,是一种高级、通用、直译式、动态的编程语言。Perl最初由Larry Wall设计,并于1987年12月18日首次发布。经过多年的不断发展和更新,Perl已经成为一种功能丰富且应用广泛的计算机程…...

DOM 基本操作 - 获取元素

theme: smartblue 一、简介 1.1 概念 文档对象模型(Document Object Model),是 W3C 组织推荐的处理可拓展标记语言的标准编程接口。 1.2 DOM 树 二、 获取元素 获取页面中的元素主要可以使用以几种方式: - 根据 ID 获取 - 根据 标签名 获取 - 通过 HTML5 新增的方法…...

Google 搜索引擎:便捷高效、精准查询,带来无与伦比的搜索体验

Google搜索引擎不仅具备检索功能,实则是引领探索万千世界的神秘钥匙。试想,无论何时何地,只需轻触屏幕,所需信息即可唾手可得。便捷与高效,令人叹为观止。其界面设计简约直观,操控体验犹如与未来对话&#…...

tomcat的介绍与优化

tomcat介绍 tomcat和php一样,都是用来处理动态页面的。 tomcat也可以作为web应用服务器,开源的。 php .php tomcat .jsp nginx .html tomcat 是用java代码写的程序,运行的是javaweb应用程序 tomcat的特点和功能: 1.servlet容器…...

Python 插入、替换、提取、或删除Excel中的图片

Excel是主要用于处理表格和数据的工具,我们也能在其中插入、编辑或管理图片,为工作表增添视觉效果,提升报告的吸引力。本文将详细介绍如何使用Python操作Excel中的图片,包含以下4个基础示例: 文章目录 Python 在Excel…...

紧凑型建模的veriloga语句要怎么看?

说点人话,真传一句话,那些一堆公式似是而非的东西,都是半懂不懂的人沽名钓誉用的。 其实建模,归根结底明白几个东西就行了。 1.什么是你的输入和输出信号? 2.你对输入输出信号要建立什么功能关系? 那我们看…...

大语言模型系列-Transformer介绍

大语言模型系列:Transformer介绍 引言 在自然语言处理(NLP)领域,Transformer模型已经成为了许多任务的标准方法。自从Vaswani等人在2017年提出Transformer以来,它已经彻底改变了NLP模型的设计。本文将介绍Transforme…...

JavaDS —— 顺序表ArrayList

顺序表 顺序表是用一段物理地址连续的存储单元依次存储数据元素的线性结构,一般情况下采用数组存储。在数组上完成数据的增删查改。在物理和逻辑上都是连续的。 模拟实现 下面是我们要自己模拟实现的方法: 首先我们要创建一个顺序表,顺序表…...

Sphinx 搜索配置

官方文档 http://sphinxsearch.com/docs/sphinx3.html 支持中文,英文,日文,韩文,俄罗斯语搜索 版本是 官网3.6.1版本 文件 sphinx.conf.dist 的windows 配置,官网下载下来后微微配置即可。 # Minimal Sphinx confi…...

如何在不关闭防火墙的情况下,让两台设备ping通

问题现象 如题,做虚拟机实验的时候,有一台linux系统的虚拟机配置的ip地址是192.168.172.181 物理主机的ip地址是192.168.172.1 此时物理主机可以ping通虚拟机 但是虚拟机不能ping通物理主机 此时我们可以想到,有可能是物理主机防火墙的原因。…...

windows USB 设备驱动开发-USB 等时传输

客户端驱动程序可以生成 USB 请求块 (URB) 以在 USB 设备中向/从常时等量端点传输数据。虽然USB设备一向以非等时传输出名,USB提供的是一种串行数据,而非等时,但是USB仍然设计了等时传输的机制,但根据笔者的经验,等时传…...

【文件共享 windows和linux】Windows Server 2016上开启文件夹共享,并在CentOS 7.4上访问和下载文件

要在Windows Server 2016上开启文件夹共享,并在CentOS 7.4上访问和下载文件,请按照以下步骤操作: 在Windows Server 2016上开启文件夹共享: 启用SMB服务: 打开“服务器管理器”。选择“文件和存储服务” > “共享…...

【知网CNKI-注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...

【Python_GUI】tkinter常用组件——文本类组件

文本时窗口中必不可少的一部分,tkinter模块中,有3种常用的文本类组件,通过这3种组件,可以在窗口中显示以及输入单行文本、多行文本、图片等。 Label标签组件 Label组件的基本使用 Label组件是窗口中比较常用的组件,…...

zdppy+onlyoffice+vue3解决文档加载和文档强制保存时弹出警告的问题

解决过程 第一次排查 最开始排查的是官方文档说的 https://api.onlyoffice.com/editors/troubleshooting#key 解决方案。参考的是官方的 https://github.com/ONLYOFFICE/document-server-integration/releases/latest/download/Python.Example.zip 基于Django的Python代码。 …...

C语言从头学31——与字符串变量相关的几个函数

strlen、strcpy、strcat、strcmp、sprintf这些函数都是与字符串相关的,除了sprintf是定义在stdio.h中外,其余几个都定义在string.h中,比较新的编译器版本stdio.h中已经含有string.h的内容,所以编程时不需要再包含string.h这个头文…...

大话软工笔记—需求分析概述

需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...

Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?

Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...

PHP和Node.js哪个更爽?

先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时,性能会显著下降。以下是优化思路和简易实现方法: 一、核心优化思路 减少 JOIN 数量 数据冗余:添加必要的冗余字段(如订单表直接存储用户名)合并表:将频繁关联的小表合并成…...