Rocket面试(五)Rocketmq发生流量控制的情况有哪些?
在使用rocketmq过程中总能看见一下异常
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5
这是因为Rocketmq出发了流量控制。
触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控,Consumer流控
1.Broker流控
Rocketmq默认采取的是异步刷盘方式,Producer把消息发送到broker后,Broker会把消息暂放在Page Cache中刷盘线程定时的把数据刷到磁盘中

1.1 broker busy
Broker是开启快速失败的,处理逻辑类是BrokerFastFailure,这个类中有一个定时任务用来清理过期的请求,每 10 ms 执行一次,代码如下:
public void start() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {cleanExpiredRequest();}}}, 1000, 10, TimeUnit.MILLISECONDS);
}
1.1.1 Page Cache繁忙
清理过期请求之前会先判断一下Page Cache是否繁忙,如果繁忙就会给Producer返回一个系统繁忙的状态码(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),那怎么判断Page Cache繁忙呢?
当Broker收到消息后,会放到Page Cache中,这个过程,首先会取一个CommitLog写入锁,如果持有锁的时间超过1s就认为Page Cache繁忙具体代码见 DefaultMessageStore 类 isOSPageCacheBusy 方法。
1.1.2清理过期请求
清理过期请求时,如果请求线程创建的时间与当前系统时间的间隔大于200ms.然后给 Producer 返回一个系统繁忙的状态码(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")
1.2 system busy
这个异常在 NettyRemotingAbstract#processRequestCommand 方法.
1.拒绝请求
如果 NettyRequestProcessor 拒绝了请求,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")。
那什么情况下请求会被拒绝呢?看下面这段代码:
//SendMessageProcessor类
public boolean rejectRequest() {return this.brokerController.getMessageStore().isOSPageCacheBusy() ||this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
从代码中可以看到,请求被拒绝的情况有两种可能,一个是 Page Cache 繁忙,另一个是 TransientStorePoolDeficient。跟踪 isTransientStorePoolDeficient 方法,发现判断依据是在开启 transientStorePoolEnable 配置的情况下,是否还有可用的 ByteBuffer。
注意:在开启 transientStorePoolEnable 的情况下,写入消息时会先写入堆外内存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盘。而读取消息是从 Page Cache,这样可以实现读写分离,避免读写都在 Page Cache 带来的问题

2.线程拒绝
Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000,可以通过参数 sendThreadPoolQueueCapacity 进行配置),线程池拒绝后会抛出异常 RejectedExecutionException,程序捕获到异常后,会判断是不是单向请求(OnewayRPC),如果不是,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[OVERLOAD]system busy, start flow control for a while")
判断 OnewayRPC 的代码如下,flag = 2 或者 3 时是单向请求:
public boolean isOnewayRPC() {int bits = 1 << RPC_ONEWAY;return (this.flag & bits) == bits;
}
1.3消息重试
Broker 发生流量控制的情况下,返回给 Producer 系统繁忙的状态码(code=2),Producer 收到这个状态码是不会进行重试的。下面是会进行重试的响应码:
//DefaultMQProducer类
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(ResponseCode.TOPIC_NOT_EXIST,ResponseCode.SERVICE_NOT_AVAILABLE,ResponseCode.SYSTEM_ERROR,ResponseCode.NO_PERMISSION,ResponseCode.NO_BUYER_ID,ResponseCode.NOT_IN_CURRENT_UNIT
));
2.Consumer流控
DefaultMQPushConsumerImpl 类中有 Consumer 流控的逻辑 。
2.1 缓存消息数量超过阈值
ProcessQueue 保存的消息数量超过阈值(默认 1000,可以配置),源码如下:
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}
2.2缓存消息大小超过阈值
ProcessQueue 保存的消息大小超过阈值(默认 100M,可以配置),源码如下:
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}
2.3缓存消息跨度超过阈值
对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值(默认 2000,可以配置)。源代码如下:
if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}
}
2.4获取锁失败
对于顺序消费的情况,ProcessQueue加锁失败,也会延迟拉取,这个延迟时间默认是 3s,可以配置。
3.总结
本文介绍了 RocketMQ 发生流量控制的 8 个场景,其中 Broker 4 个场景,Consumer 4 个场景。Broker 的流量控制,本质是对 Producer 的流量控制,最好的解决方法就是给 Broker 扩容,增加 Broker 写入能力。而对于 Consumer 端的流量控制,需要解决 Consumer 端消费慢的问题,比如有第三方接口响应慢或者有慢 SQL。
Broker4种场景:
page cache 繁忙:获取commitlog写入锁超过1s
清理过期请求 如果请求过期请求的时间到当前系统时间超过了200ms
请求拒绝 一种是page cache繁忙 一种是 transientStorePoolEnable模式看是否可用buffer
线程池拒绝 Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000
Consumer4种场景
消息数量超过阈值1000
消息大小超过阈值100m
缓存消息跨度超过阈值 对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值2000
ProcessQueue加锁失败 也会延迟加载
相关文章:
Rocket面试(五)Rocketmq发生流量控制的情况有哪些?
在使用rocketmq过程中总能看见一下异常 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5这是因为Rocketmq出发了流量控制。 触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控,Consu…...
Tableau招聘信息数据可视化
获取的招聘信息数据为某招聘网站发布的大数据及数据分析相关岗位,对其他计算机相关岗位的招聘信息数据分析也有一定的参考价值。因为所获取的招聘信息数据数量只有1万左右,实际的招聘信息数量肯定不止1万,所以可能会与实际信息有一定的误差。…...
游戏服务器开发指南(八):合理应对异常
大家好!我是长三月,一位在游戏行业工作多年的老程序员,专注于分享服务器开发相关的文章。 本文是通用程序设计主题下的第二篇。这个主题主要探讨如何编写高效、健壮、易读的游戏业务代码,每篇从一个小点切入。本次讨论的要点是&a…...
【g】聚类算法之K-means算法
聚类算法是一种无监督学习方法,它将相似的数据样本划分为一组,同时将不相似的数据样本划分为另一组。这个过程由计算机自动完成,不需要任何人为的干预。 K-means算法是一种经典的聚类算法,它的主要思想是把数据集分成k个簇&#…...
scala内建控制结构
一、条件表达式 (一)语法格式 - if (条件) 值1 else 值2(二)执行情况 条件为真,结果是值1;条件为假,结果是值2。如果if和else的返回结果同为某种类型,那么条件表达式结果也是那种类…...
Linux SSH命令实战教程,提升你的服务器管理基本功!
前言 大家好,又见面了,我是沐风晓月,本文是专栏【linux基本功-基础命令实战】的第62篇文章。 专栏地址:[linux基本功-基础命令专栏] , 此专栏是沐风晓月对Linux常用命令的汇总,希望能够加深自己的印象&am…...
【Python】Python进阶系列教程-- Python3 CGI编程(二)
文章目录 前言什么是CGI网页浏览CGI架构图Web服务器支持及配置第一个CGI程序HTTP头部CGI环境变量GET和POST方法使用GET方法传输数据简单的表单实例:GET方法使用POST方法传递数据通过CGI程序传递checkbox数据通过CGI程序传递Radio数据通过CGI程序传递 Textarea 数据通…...
do..while、while、for循环反汇编剖析
1、循环语句重要特征提取 循环语句最重要的特点就是执行的过程中会往上跳!!! 箭头往上跳的一般都是循环语句,比如下面的for循环: 2、do..while语句反汇编 #include<iostream> using namespace std; #pragma …...
【代码随想录】刷题Day53
1.最长公共子序列 1143. 最长公共子序列 和之前的一道题目的区别就是这个子序列不需要每个字符相邻。那么条件就变成两种了,一种是当前的字符相同,一种是不同。相同跟之前的条件一样;不同则需要继承上次比较的较大值。if (text1[i - 1] tex…...
MySQL 索引及查询优化总结
一个简单的对比测试 前面的案例中,c2c_zwdb.t_file_count表只有一个自增id,FFileName字段未加索引的sql执行情况如下: 在上图中,typeall,keynull,rows33777。该sql未使用索引,是一个效率非常低…...
什么是AJAX?
AJAX是一种基于Web的技术,它允许Web应用程序在不刷新整个页面的情况下与服务器进行交互。通过AJAX,Web应用程序可以使用JavaScript向服务器发送异步请求并在不干扰用户的情况下更新页面的部分内容。 AJAX是Asynchronous JavaScript and XML的缩写。尽管…...
报表生成器FastReport .Net用户指南:显示数据列、HTML标签
FastReport .Net是一款全功能的Windows Forms、ASP.NET和MVC报表分析解决方案,使用FastReport .NET可以创建独立于应用程序的.NET报表,同时FastReport .Net支持中文、英语等14种语言,可以让你的产品保证真正的国际性。 FastReport.NET官方版…...
bootstrap-dialog弹框,去掉遮盖层,可移动
1.去掉遮盖层的设置data-backdrop"false" <div class"modal fade" id"modal" aria-modal"true" role"dialog" data-backdrop"false" style"width:50%"><div class"modal-dialog modal-l…...
7. user-Agent破解反爬机制
文章目录 1. 为什么要设置反爬机制2. 服务器如何区分浏览器访问和爬虫访问3. 反爬虫机制4. User-Agent是什么5. 如何查询网页的User-Agent6. user-agent信息解析7. 爬虫程序user-agent和浏览器user-agent的区别8. 代码查看爬虫程序的user-agent9. 在代码中加入请求头信息 1. 为…...
3.Nginx+Tomcat负载均衡和动静分离群集
文章目录 NginxTomcat负载均衡和动静分离群集Nginx作用实验七层反向代理nginx动静分离四层反向代理负载均衡 NginxTomcat负载均衡和动静分离群集 Nginx是-款非常优秀的HTTP服务器软件 支持高达50 000个并发连接数的响应拥有强大的静态资源处理能力运行稳定内存、CPU等系统资源…...
数据结构与算法之树结构
目录 为什么要使用树结构树结构基本概念树的种类树的存储与表示常见的一些树的应用场景为什么要使用树结构 线性结构中不论是数组还是链表,他们都存在着诟病;比如查找某个数必须从头开始查,消耗较多的时间。使用树结构,在插入和查找的性能上相对都会比线性结构要好 树结构…...
【python】 用来将对象持久化的 pickle 模块
pickle 模块可以对一个 Python 对象的二进制进行序列化和反序列化。说白了,就是它能够实现任意对象与二进制直接的相互转化,也可以实现对象与文本之间的相互转化。 比如,我程序里有一个 python 对象,我想把它存到磁盘里ÿ…...
【博客654】prometheus配置抓取保护以防止压力过载
prometheus抓取保护配置以防止压力过载 场景 担心您的应用程序指标可能突然激增,以及指标突然激增导致prometheus压力过载 就像生活中的许多事情一样,标签要有节制。当带有用户 ID 或电子邮件地址的标签被添加到指标时,虽然它不太可能结束…...
Backtrader官方中文文档:第十三章Observers观察者
本文档参考backtrader官方文档,是官方文档的完整中文翻译,可作为backtrader中文教程、backtrader中文参考手册、backtrader中文开发手册、backtrader入门资料使用。 本章包含 backtrader 官方Observers章节全部内容,入口 : https://backtrader.com/docu/observers-and-sta…...
算法leetcode|54. 螺旋矩阵(rust重拳出击)
文章目录 54. 螺旋矩阵:样例 1:样例 2:提示: 分析:题解:rust:go:c:python:java:每次循环移动一步:每次循环完成一个顺时针:…...
UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...
接口测试中缓存处理策略
在接口测试中,缓存处理策略是一个关键环节,直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性,避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明: 一、缓存处理的核…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...
GitHub 趋势日报 (2025年06月06日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...
LRU 缓存机制详解与实现(Java版) + 力扣解决
📌 LRU 缓存机制详解与实现(Java版) 一、📖 问题背景 在日常开发中,我们经常会使用 缓存(Cache) 来提升性能。但由于内存有限,缓存不可能无限增长,于是需要策略决定&am…...
【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅
目录 前言 操作系统与驱动程序 是什么,为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中,我们在使用电子设备时,我们所输入执行的每一条指令最终大多都会作用到硬件上,比如下载一款软件最终会下载到硬盘上&am…...
【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...
热烈祝贺埃文科技正式加入可信数据空间发展联盟
2025年4月29日,在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上,可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞,强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...
