【RocketMQ】源码详解:生产者启动与消息发送流程
消息发送
生产者启动
入口 : org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
生产者在调用send()方法发送消息之前,需要调用start进行启动, 生产者启动过程中会启动一些服务和线程
启动过程中会启动MQClientInstance, 这个实例是针对一个项目的全部生产者消费者, 而不是单个的生产者或消费者
MQClientInstance内部会启动一些服务和定时任务,如netty服务、内部生产者服务等
启动方法最后,则会发送心跳包给broker
生产者启动: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查配置,主要是生产者组名this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 创建 MQClientInstance 实例, 消费者启动时也有这一步(对于每个客户端来说, 只有一个客户端实例(一个项目有多个生产者、消费者))this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 将当前生产者注册到MQClientInstance中的producerTableboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException();}// 自动创建topic的配置this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {/** 启动 MQClientInstance* netty服务、各种定时任务、拉取消息服务、rebalanceService服务*/mQClientFactory.start();}log.info("the producer [{}] start OK);this.serviceState = ServiceState.RUNNING;break;// ...省略default:break;}// 发送心跳信息给所有broker。this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 启动扫描 超时请求 的定时任务,this.startScheduledTask();}
客户端实例启动: org.apache.rocketmq.client.impl.factory.MQClientInstance#start
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// netty服务this.mQClientAPIImpl.start();// 启动各种定时任务this.startScheduledTask();// 拉取消息服务,针对消费者this.pullMessageService.start();// 重平衡服务,针对消费者this.rebalanceService.start();this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}
消息发送流程
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
调用任一发送方法后,会一路调用到sendDefaultImpl方法
首先会检查消费者状态和消息的格式是否正确
之后会进入一个循环来发送消息,同步消息的循环次数为3次,即可以重试两次,其余消息只发送一次
在循环中,首先会按照轮询的方法选择一个queue进行发送,若发送出现异常则退出当前循环进入下一次循环(若开启故障延迟还会更新broker的故障表,设置隔离时间,隔离时间根据 MQFaultStrategy类中的latencyMax和notAvailableDuration数组进行判断,如其中超时在0.55s - 1s内则隔离30s)
在重新获取queue时,若开启故障延迟,在选择时则会选择【不在故障列表中,或者在故障列表但是时间已经过了其下一次可用的时间点的可用broker】,以实现高可用。若未开启故障延迟,则会传入上一次选择的broker,在这次选择时避开,选择方式也是轮询。
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 检查生产者状态this.makeSureStateOK();/*** 检查消息格式是否合法:* 1. msg是否为null* 2. topic 是否为空、长度是否大于127、字符串是否有非法字符、是否是系统topic(比如延时topic)* 3. 消息体 是否为空、大小是否大于4MB*/Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取topic路由信息(存在哪些broker上), 首先获取本地缓存的,若没有则获取nameServer的TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 计算最大发送次数,同步模式为3,即默认允许重试2次,可更改重试次数// 其他模式为1,即不允许重试,不可更改。int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {/*** 如果mq为空则说明第一次进入,则不存在lastBrokerName* 否则,说明为循环进入,则上一次发送失败,则获取上一次失败的brokerName*/String lastBrokerName = null == mq ? null : mq.getBrokerName();/*** 选择一个queue** selectOneMessageQueue方法内,可选故障转移为开启, 需要sendLatencyFaultEnable设置为true* 开启:* 对于请求响应较慢的broker,可以在一段时间内将其状态置为不可用(下方catch中有调用的updateFaultItem方法)* 消息队列选择时,会过滤掉mq认为不可用的broker,以此来避免不断向宕机的broker发送消息* 选取一个延迟较短的broker,实现消息发送高可用。* 不开启:* 则传入lastBrokerName,即不会再次选择上次发送失败的broker**/MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// 这里调用并传入false,是为了在发送时间超过550ms时,把broker置为故障,// 隔离时间根据 MQFaultStrategy类中的latencyMax和notAvailableDuration数组进行判断,如其中超时在0.55s - 1s内则隔离30sthis.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();// 异常传入为true,表示隔离时间采用默认的30sthis.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;}// ... 省略代码
选择queue : org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 判断是否启用故障延迟机制,默认不启用if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 轮询获取到一个MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果该broker不在故障列表中,或者在故障列表但是时间已经过了其下一次可用的时间点,则为可用,直接返回if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}// 到这里说明全部不正常// 没有选出无故障的mq,那么从故障集合中随机选择一个final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 如果写队列数大于0,那么选择该brokerint writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {// 如果写队列数小于0,那么移除该brokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}// 上面都没有返回,则采用轮询的方式选择return tpInfo.selectOneMessageQueue();}// 默认不启用return tpInfo.selectOneMessageQueue(lastBrokerName);
}
更新延时表: org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
判断延时时间: org.apache.rocketmq.client.latency.MQFaultStrategy#computeNotAvailableDuration
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {// 若isolation为true则默认延时30s,否则调用方法根据超时时间来获取延时时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);//更新故障记录表this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30 * 1000L, 60 * 1000L, 120 * 1000L/* 2min */, 180000L/* 3min */, 600000L/* 10min */};private long computeNotAvailableDuration(final long currentLatency) {/*** 根据latencyMax和notAvailableDuration的下标一一对应,若超时时间大于等于notAvailableDuration,则延时latencyMax对应下标的时间* 小于0.55s : 0s* [0.55s,1s) : 30s* [1s,2s) : 60s* ....省略*/for (int i = latencyMax.length - 1; i >= 0; i--) {if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}
相关文章:
【RocketMQ】源码详解:生产者启动与消息发送流程
消息发送 生产者启动 入口 : org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean) 生产者在调用send()方法发送消息之前,需要调用start进行启动, 生产者启动过程中会启动一些服务和线程 启动过程中会启动MQClientInstance, 这个实例是针对一个项…...

信息安全(一)
思维导图 一、AES加解密 1.概述 1.1 概念 AES: 高级加密标准(Advanced Encryption Standard)是一种对称加密的区块加密标准。 (1)替代DES的新一代分组加密算法 (2)支持三种长度密钥&#x…...

企业多会场视频直播(主会场、分会场直播)实例效果
阿酷TONY 2023-2-16 长沙 活动直播做多会场切换功能(主会场、分会场、会场一、会场二、会场三自由切换) 企业多会场视频直播(主会场、分会场直播)实例效果 特点:支持PC端,也支持移动端观看,会…...
线性代数速览(一)行列式
文章目录行列式🌻 行列式的定义🌼 行列式的性质🌷 一些定理🥀 行列式的计算🌺 克莱姆法则行列式 行列式的本质,就是一个数值。 🌻 行列式的定义 有三种定义:1、按行展开ÿ…...

恭喜山东翰林“智慧园区管理系统”获易知微可视化设计大赛二等奖
数字化经济发展是全球经济发展的重中之重,“数字孪生(Digital Twin)”这一词汇正在成为学术界和产业界的一个热点。数字孪生作为近年来的新兴技术,其与国民经济各产业融合不断深化,推动着各大产业数字化、网络化、智能…...
gulp简单使用
gulp gulp的核心理念是task runner 可以定义自己的一系列任务 等待任务被执行 基于文件stream的构建流 我们可以使用gulp的插件体系来完成某些任务 webpack的核心理念是module bundler webpack是一个模块化的打包工具 可以使用各种各样的loader来加载不同的模块 可以使用各种…...

ce认证机构如何选择?
CE认证想必大家都已经有所了解,它是产品进入欧盟销售的通行证,那么我们在办理CE认证时该怎么进行选择?带大家了解一下CE认证机构,以及该怎么去进行选择? 以下信息由证果果编辑整理,更多认证机构信息请到证果果网站查看。找机构…...

全网招募P图高手!阿里巴巴持续训练鉴假AI
P过的证件如何鉴定为真?三千万网友都晒出了与梅西的合影?图像编辑技术的普及让人人都能P图,但也带来“假图”识别难题,甚至是欺诈问题。 为此,阿里安全联合华中科技大学国家防伪工程中心、国际文档分析识别方向的唯一顶…...

webrtc QOS笔记一 Neteq直方图算法浅读
webrtc QOS笔记一 Neteq直方图算法浅读 文章目录webrtc QOS笔记一 Neteq直方图算法浅读Histogram Algorithm获取目标延迟遗忘因子曲线Histogram Algorithm DelayManager::Update()->Histogram::Add() 会根据计算的iat_packet(inter arrival times, 实际包间间隔 / 打包时长…...
细分和切入点
本文重点介绍做SEO网站细分和切入点的方法:当我们的行业和关键词竞争性比较大的时候,我们可以考虑对行业或者产品做细分,从而找到切入点。可以按照以下三个方面进行细分。1、按城市细分例如:A:餐饮培训,当前…...

iOS创建Universal Link
iOS 9之前,一直使用的是URL Schemes技术来从外部对App进行跳转,但是iOS系统中进行URL Schemes跳转的时候如果没有安装App,会提示无法打开页面的提示。 iOS 9之后起可以使用Universal Links技术进行跳转页面,这是一种体验更加完美的…...

RuoYi-Vue搭建(若依)
项目简介 RuoYi-Vue基于SpringBootVue前后端分离的Java快速开发框架1.前端采用Vue、Element UI2.后端采用Spring Boot、Spring Security、Redis & Jwt3.权限认证使用Jwt,支持多终端认证系统4.支持加载动态权限菜单,多方式轻松权限控制5.高效率开发&a…...

进程组和用处
进程组:一个或多个进程的集合,进程组id是一个正整数。组长进程:进程组id 进程id组长进程可以创建一个进程组,创建该进程组的进程,终止了,只要进程组有一个进程存在,进程组就存在,与…...

Nacos集群+Nginx负载均衡
搭建Nacos集群 注意: 3个或3个以上Nacos节点才能构成集群。要求服务器内存分配最好大于6G以上(如果不够则需修改nacos启动脚本中的默认内存配置)根据nacos自带的mysql建库脚本建立对应数据库(/conf/nacos-mysql.sql)如果是三台服…...
TypeScript 学习之类型兼容
TypeScript 的类型兼容性是基于结构子类型的。 结构类型是一种只使用其成员来描述类型的方式。 interface Named {name: string; }class Person {name: string; }let p: Named; p new Person();// 赋值成功,因为都是结构类型,只要Person 类型的包含 Nam…...

Linux软件管理RPM
目录 前言 RPM软件管理程序:rpm RPM默认安装的路径 PRM讲解前准备工作 RPM安装(install) RPM查询(query) RPM卸载(erase) RPM升级与更新(upgrade/freshen) RPM重…...
01背包问题
背包问题的递归解决过程如下: 第一步明确思路 在解决问题之前,为描述方便,首先定义一些变量:Vi表示第 i 个物品的价值,Wi表示第 i 个物品的体积,定义V(i,j):当前背包容量 j,前 i 个…...

14_FreeRTOS二值信号量
目录 信号量的简介 队列与信号量的对比 二值信号量 二值信号量相关API函数 实验源码 信号量的简介 信号量是一种解决同步问题的机制,可以实现对共享资源的有序访问。 假设有一个人需要在停车场停车 1.首先判断停车场是否还有空车位(判断信号量是否有资源) 2.停车场正好…...

JavaScript随手笔记---轮播图(点击切换)
💌 所属专栏:【JavaScript随手笔记】 😀 作 者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! &#…...
机器人学 markdown数学公式常用语法
参考链接1 本文包含了markdown常用的数学公式,按照目录可查询选用 初始类 行内数学公式均用两个符号包裹行间数学公式均用两个符号包裹 行间数学公式均用两个符号包裹行间数学公式均用两个符号包裹,用于表示重要的、需在行间单独列出的公式 $行内数学…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...

无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...

优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

10-Oracle 23 ai Vector Search 概述和参数
一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI,使用客户端或是内部自己搭建集成大模型的终端,加速与大型语言模型(LLM)的结合,同时使用检索增强生成(Retrieval Augmented Generation &#…...

Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...

脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)
一、OpenBCI_GUI 项目概述 (一)项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台,其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言,首次接触 OpenBCI 设备时,往…...