当前位置: 首页 > news >正文

从零开始读RocketMq源码(二)Message的发送详解

目录

前言

准备

消息发送方式

深入源码

消息发送模式

选择发送方式

同步发送消息

校验消息体

获取Topic订阅信息

高级特性-消息重投

选择消息队列-负载均衡

装载消息体发送消息

压缩消息内容

构造发送message的请求的Header

更新broker故障信息

异步发送消息

总结


前言

上一篇我们已经对RocketMq生产者启动源码进行了学习《从零开始读RocketMq源码(一)生产者启动》那么本篇我们将对生产者发送消息的源码进行学习

准备

如果没看前一篇的,这里还是要强调本篇的rocketmq版本

首先我们从github上拉取rocketmqd的源码链接到本地,使用idea打开。

源码地址:https://github.com/apache/rocketmq

目前最新版本为:5.2.0

那么我们在idea上切换分支为 release-5.2.0

注:请保持和本篇的版本一直,方便后面文章中给出的代码块定位

消息发送方式

在读源码之前我们先了解下mq支持的发送消息的类型。

消息的发送方式有三种,但我们最常用的是同步的方式发送

  • sync 同步:消息发送后,必须等待消息的发送结果返回后,才能发送下一条消息
  • async 异步:消息发送后,不用等待返回结果,直接发送下一条数据,但会设置一个回调方法接收返回结果
  • oneway 单向:消息发送后,不会返回结果,也不会等待,也不会设置回调方法。适用场景日志收集、监控数据和快速通知等对可靠性要求不高但需要高性能的场景

深入源码

首先进入外层的producer.send()方法中

//源码位置:
//包名:org.apache.rocketmq.example.simple
//文件名:Producer
//行数:42
SendResult sendResult = producer.send(msg);

消息发送模式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:431
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));//批量发送if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {return sendByAccumulator(msg, null, null);} else {//单条发送return sendDirect(msg, null, null);}
}
  1. 自动批处理发送 -sendByAccumulator()
  • 该方法用于将消息累积到一个批处理容器中,等待足够的消息数量或达到某个时间间隔后,再进行批量发送。
  • 可以显著减少发送次数,提高吞吐量。

     2. 直接发送 -sendDirect()

  • 适用于即时发送或消息已经是批处理消息的情况

本章的重点就是直接发送消息,这也是开发中使用最频发的方式

选择发送方式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:720
public SendResult sendDirect(Message msg, MessageQueue mq,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// send in sync modeif (sendCallback == null) {if (mq == null) {//同步不指定队列return this.defaultMQProducerImpl.send(msg);} else {//同步指定队列return this.defaultMQProducerImpl.send(msg, mq);}} else {if (mq == null) {//异步不指定队列this.defaultMQProducerImpl.send(msg, sendCallback);} else {//异步指定队列this.defaultMQProducerImpl.send(msg, mq, sendCallback);}return null;}
}

有上面代码可以知道,方法中提供了三个参数设置:

  • msg :消息体,这个为必填项
  • sendCallback :消息回调对象,如果这个参数不为空,则为异步发送,为空则为同步发送
  • mq :指定的队列(指定与不指定的区别在于后续是否需要对队列负载均衡,下面源码中会讲到)

根据最开始生产者发送消息,我们只传入了msg,所以本次重点看同步不指定队列代码实现

同步发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1525
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

跟踪代码我们可以看到,方法中我们默认设置了CommunicationMode.SYNC 同步发送模式,并且回调参数为空,以及设置了默认超时时间3s

校验消息体

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:704
Validators.checkMessage(msg, this.defaultMQProducer);

该方法就是校验消息内容是否合规

  • 校验消息内容是否不为空,消息大小是否超过最大值maxMessageSize = 1024 * 1024 * 4; // 4M
  • 校验消息发送的topic是否为不为空,以及topic的长度是否超过默认最长值127

获取Topic订阅信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:709
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

该方法通过消息体中的topic名称获取topic的订阅信息,该方法在我们上一篇生产者启动中已经出现过了,深入方法内部其实就是先从本地topicPublishInfoTable map中获取数据,没有则从远程nameserver中拉取

高级特性-消息重投

这是rocketMq中一个重要的特性,消息如果投递失败了,会重新投递

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:715
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

这段代码就是获取总过重投的次数:

不难看出,只有发送方式为同步发送时才为1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() =3次,其余发送方式都只有一次机会。

只有同步发送消息才支持消息重投,如果第一次投递失败了,mq还回重试2次投递

找到上面源码位置往下看,其实可以看到下面代码就是使用了一个for循环来进行重投

选择消息队列-负载均衡

通过上面我们知道,最开始并没有指定队列,所以需要程序来获取一个队列。

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:724
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);

因为自动创建的topic,会被默认分配4个队列(生产环境为手动创建topic以及设置队列数量),所以我们必须使用负载均衡保证队列的合理分配到不同队列上,减轻单个队列的压力

  • topicPublishInfo:为消息发送到指定topic的订阅信息
  • lastBrokerName :为上一次选择的broker名称(如果在集群模式下,topic也会存在于多个broker上,因此记录上一次选择的broker名称可以避免连续选择同一个 Broker,从而实现更好的负载均衡和容错处理
  • resetIndex :重置队列索引位置(根据源码逻辑可知,当消息进行重新投递时会重置topic订阅消息中队列的索引位置)

深入上面源码会发现,队列负载均衡的算法获取索引策略默认就是轮询

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:TopicPublishInfo
//行数:101
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

负载均衡策略

  1. 轮询策略 (Round-Robin)
  2. 随机策略 (Random)
  3. 一致性哈希策略 (Consistent Hashing)
  4. 权重随机策略 (Weighted Random)
  5. 最少连接策略 (Least Connections)

装载消息体发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:740
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

该方法就是发现消息的核心方法了,不管是同步发送还是异步发送都会执行该方法

做一些发送消息前的准备,接下深入该方法查看

压缩消息内容

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:898
if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;
}
  • 首先判断消息是否大于4k( compressMsgBodyOverHowmuch = 1024 * 4),大于则进行压缩,小于则不处理
//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1070
byte[] data = compressor.compress(body, compressLevel);
  • 传入消息体以及压缩的等级,这里大佬们提供了三种压缩实现,分别基于三种不同的压缩框架

在我们日常工作中,如果需要压缩内容,也可以参考大佬们的实现,学习源码不仅仅是了解框架的本身,也要吸取优秀的地方合理运用

构造发送message的请求的Header

message是Producer发送给Broker的一个请求,我们可以把内容抽象成两部分组成:请求头请求体

  • 请求体就是消息本身数据
  • 请求头 SendMessageRequestHeader 则包含了各种必要的数据,比如topicmessaeQueue等等,更多可直接查看请求头对象源码

最后就是使用基于netty实现的远程调用发送消息到broker中

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1016
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);

更新broker故障信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:742
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);

程序执行到这个位置,说明前面消息发送的流程全部执行完成了,那么我们也知道了消息发送的结果,从而知道broker服务的状态情况,我们需要把当前的broker故障情况更新到 faultItemTable 本地map中,供后续对broker服务的故障规避faultItemTable 该map在前一篇生产者启动中也提到过。

异步发送消息

选择发送方式代码中当sendCallback!=null时则进入异步发送消息

跟踪源码我们可知,异步发送其实就是创建了一个单独的线程,使用Runnable对象实现,因为会返回一个执行结果

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:550
Runnable runnable = new Runnable() {@Overridepublic void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);} catch (Exception e) {newCallBack.onException(e);}} else {newCallBack.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
};
  • sendDefaultImpl() 该方法就是和同步发送调用的同一个了,唯一区别就是类型 CommunicationMode.ASYNC 和存在回调方法newCallBack
  • executeAsyncMessageSend() 执行异步消息发送

总结

本篇对生产者发送消息源码进行了跟踪学习,你是否也有所收获呢。下一篇我们将对rocketMq的核心组件Broker进行源码解读,Broker负责接收和存储消息,管理消息队列,并将消息分发给消费者, 是担任连接生产者和消费者,确保消息的高效传输和存储,保证系统的可靠性和性能的重要角色。

相关文章:

从零开始读RocketMq源码(二)Message的发送详解

目录 前言 准备 消息发送方式 深入源码 消息发送模式 选择发送方式 同步发送消息 校验消息体 获取Topic订阅信息 高级特性-消息重投 选择消息队列-负载均衡 装载消息体发送消息 压缩消息内容 构造发送message的请求的Header 更新broker故障信息 异步发送消息 …...

怎样优化 PostgreSQL 中对布尔类型数据的查询?

文章目录 一、索引的合理使用1. 常规 B-tree 索引2. 部分索引 二、查询编写技巧1. 避免不必要的类型转换2. 逻辑表达式的优化 三、表结构设计1. 避免过度细分的布尔列2. 规范化与反规范化 四、数据分布与分区1. 数据分布的考虑2. 表分区 五、数据库参数调整1. 相关配置参数2. 定…...

mysql在linux系统下重置root密码

mysql在linux系统下重置root密码 登录服务器时候mysql密码忘记了,没办法只能重置,找了一圈,把行之有效的方法介绍在这里。 错误展示: 我还以为yes就可以了呢,这是不行的意思。 关掉mysql服务 sudo systemctl stop …...

设计模式探索:观察者模式

1. 观察者模式 1.1 什么是观察者模式 观察者模式用于建立一种对象与对象之间的依赖关系,当一个对象发生改变时将自动通知其他对象,其他对象会相应地作出反应。 在观察者模式中有如下角色: Subject(抽象主题/被观察者&#xf…...

Perl语言入门到高级学习

Perl语言介绍 Perl,全称为Practical Extraction and Report Language,即“实用报表提取语言”,是一种高级、通用、直译式、动态的编程语言。Perl最初由Larry Wall设计,并于1987年12月18日首次发布。经过多年的不断发展和更新,Perl已经成为一种功能丰富且应用广泛的计算机程…...

DOM 基本操作 - 获取元素

theme: smartblue 一、简介 1.1 概念 文档对象模型(Document Object Model),是 W3C 组织推荐的处理可拓展标记语言的标准编程接口。 1.2 DOM 树 二、 获取元素 获取页面中的元素主要可以使用以几种方式: - 根据 ID 获取 - 根据 标签名 获取 - 通过 HTML5 新增的方法…...

Google 搜索引擎:便捷高效、精准查询,带来无与伦比的搜索体验

Google搜索引擎不仅具备检索功能,实则是引领探索万千世界的神秘钥匙。试想,无论何时何地,只需轻触屏幕,所需信息即可唾手可得。便捷与高效,令人叹为观止。其界面设计简约直观,操控体验犹如与未来对话&#…...

tomcat的介绍与优化

tomcat介绍 tomcat和php一样,都是用来处理动态页面的。 tomcat也可以作为web应用服务器,开源的。 php .php tomcat .jsp nginx .html tomcat 是用java代码写的程序,运行的是javaweb应用程序 tomcat的特点和功能: 1.servlet容器…...

Python 插入、替换、提取、或删除Excel中的图片

Excel是主要用于处理表格和数据的工具,我们也能在其中插入、编辑或管理图片,为工作表增添视觉效果,提升报告的吸引力。本文将详细介绍如何使用Python操作Excel中的图片,包含以下4个基础示例: 文章目录 Python 在Excel…...

紧凑型建模的veriloga语句要怎么看?

说点人话,真传一句话,那些一堆公式似是而非的东西,都是半懂不懂的人沽名钓誉用的。 其实建模,归根结底明白几个东西就行了。 1.什么是你的输入和输出信号? 2.你对输入输出信号要建立什么功能关系? 那我们看…...

大语言模型系列-Transformer介绍

大语言模型系列:Transformer介绍 引言 在自然语言处理(NLP)领域,Transformer模型已经成为了许多任务的标准方法。自从Vaswani等人在2017年提出Transformer以来,它已经彻底改变了NLP模型的设计。本文将介绍Transforme…...

JavaDS —— 顺序表ArrayList

顺序表 顺序表是用一段物理地址连续的存储单元依次存储数据元素的线性结构,一般情况下采用数组存储。在数组上完成数据的增删查改。在物理和逻辑上都是连续的。 模拟实现 下面是我们要自己模拟实现的方法: 首先我们要创建一个顺序表,顺序表…...

Sphinx 搜索配置

官方文档 http://sphinxsearch.com/docs/sphinx3.html 支持中文,英文,日文,韩文,俄罗斯语搜索 版本是 官网3.6.1版本 文件 sphinx.conf.dist 的windows 配置,官网下载下来后微微配置即可。 # Minimal Sphinx confi…...

如何在不关闭防火墙的情况下,让两台设备ping通

问题现象 如题,做虚拟机实验的时候,有一台linux系统的虚拟机配置的ip地址是192.168.172.181 物理主机的ip地址是192.168.172.1 此时物理主机可以ping通虚拟机 但是虚拟机不能ping通物理主机 此时我们可以想到,有可能是物理主机防火墙的原因。…...

windows USB 设备驱动开发-USB 等时传输

客户端驱动程序可以生成 USB 请求块 (URB) 以在 USB 设备中向/从常时等量端点传输数据。虽然USB设备一向以非等时传输出名,USB提供的是一种串行数据,而非等时,但是USB仍然设计了等时传输的机制,但根据笔者的经验,等时传…...

【文件共享 windows和linux】Windows Server 2016上开启文件夹共享,并在CentOS 7.4上访问和下载文件

要在Windows Server 2016上开启文件夹共享,并在CentOS 7.4上访问和下载文件,请按照以下步骤操作: 在Windows Server 2016上开启文件夹共享: 启用SMB服务: 打开“服务器管理器”。选择“文件和存储服务” > “共享…...

【知网CNKI-注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...

【Python_GUI】tkinter常用组件——文本类组件

文本时窗口中必不可少的一部分,tkinter模块中,有3种常用的文本类组件,通过这3种组件,可以在窗口中显示以及输入单行文本、多行文本、图片等。 Label标签组件 Label组件的基本使用 Label组件是窗口中比较常用的组件,…...

zdppy+onlyoffice+vue3解决文档加载和文档强制保存时弹出警告的问题

解决过程 第一次排查 最开始排查的是官方文档说的 https://api.onlyoffice.com/editors/troubleshooting#key 解决方案。参考的是官方的 https://github.com/ONLYOFFICE/document-server-integration/releases/latest/download/Python.Example.zip 基于Django的Python代码。 …...

C语言从头学31——与字符串变量相关的几个函数

strlen、strcpy、strcat、strcmp、sprintf这些函数都是与字符串相关的,除了sprintf是定义在stdio.h中外,其余几个都定义在string.h中,比较新的编译器版本stdio.h中已经含有string.h的内容,所以编程时不需要再包含string.h这个头文…...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

实现弹窗随键盘上移居中

实现弹窗随键盘上移的核心思路 在Android中&#xff0c;可以通过监听键盘的显示和隐藏事件&#xff0c;动态调整弹窗的位置。关键点在于获取键盘高度&#xff0c;并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf

FTP 客服管理系统 实现kefu123登录&#xff0c;不允许匿名访问&#xff0c;kefu只能访问/data/kefu目录&#xff0c;不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...

QT3D学习笔记——圆台、圆锥

类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体&#xff08;对象或容器&#xff09;QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质&#xff08;定义颜色、反光等&#xff09;QFirstPersonC…...

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

作为测试我们应该关注redis哪些方面

1、功能测试 数据结构操作&#xff1a;验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化&#xff1a;测试aof和aof持久化机制&#xff0c;确保数据在开启后正确恢复。 事务&#xff1a;检查事务的原子性和回滚机制。 发布订阅&#xff1a;确保消息正确传递。 2、性…...

Android写一个捕获全局异常的工具类

项目开发和实际运行过程中难免会遇到异常发生&#xff0c;系统提供了一个可以捕获全局异常的工具Uncaughtexceptionhandler&#xff0c;它是Thread的子类&#xff08;就是package java.lang;里线程的Thread&#xff09;。本文将利用它将设备信息、报错信息以及错误的发生时间都…...