当前位置: 首页 > news >正文

Rocket面试(五)Rocketmq发生流量控制的情况有哪些?

在使用rocketmq过程中总能看见一下异常

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

这是因为Rocketmq出发了流量控制。

 

触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控,Consumer流控

1.Broker流控 

Rocketmq默认采取的是异步刷盘方式,Producer把消息发送到broker后,Broker会把消息暂放在Page Cache中刷盘线程定时的把数据刷到磁盘中

1.1 broker busy 

Broker是开启快速失败的,处理逻辑类是BrokerFastFailure,这个类中有一个定时任务用来清理过期的请求,每 10 ms 执行一次,代码如下:

public void start() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {cleanExpiredRequest();}}}, 1000, 10, TimeUnit.MILLISECONDS);
}

 1.1.1 Page Cache繁忙

清理过期请求之前会先判断一下Page Cache是否繁忙,如果繁忙就会给Producer返回一个系统繁忙的状态码(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),那怎么判断Page Cache繁忙呢?

当Broker收到消息后,会放到Page Cache中,这个过程,首先会取一个CommitLog写入锁,如果持有锁的时间超过1s就认为Page Cache繁忙具体代码见 DefaultMessageStore 类 isOSPageCacheBusy 方法。

1.1.2清理过期请求

清理过期请求时,如果请求线程创建的时间与当前系统时间的间隔大于200ms.然后给 Producer 返回一个系统繁忙的状态码(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")

1.2 system busy

这个异常在 NettyRemotingAbstract#processRequestCommand 方法.

1.拒绝请求

如果 NettyRequestProcessor 拒绝了请求,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")

那什么情况下请求会被拒绝呢?看下面这段代码:

//SendMessageProcessor类
public boolean rejectRequest() {return this.brokerController.getMessageStore().isOSPageCacheBusy() ||this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}

从代码中可以看到,请求被拒绝的情况有两种可能,一个是 Page Cache 繁忙,另一个是 TransientStorePoolDeficient。跟踪 isTransientStorePoolDeficient 方法,发现判断依据是在开启 transientStorePoolEnable 配置的情况下,是否还有可用的 ByteBuffer。

注意:在开启 transientStorePoolEnable 的情况下,写入消息时会先写入堆外内存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盘。而读取消息是从 Page Cache,这样可以实现读写分离,避免读写都在 Page Cache 带来的问题

2.线程拒绝

Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000,可以通过参数 sendThreadPoolQueueCapacity 进行配置),线程池拒绝后会抛出异常 RejectedExecutionException,程序捕获到异常后,会判断是不是单向请求(OnewayRPC),如果不是,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[OVERLOAD]system busy, start flow control for a while")

判断 OnewayRPC 的代码如下,flag = 2 或者 3 时是单向请求:

public boolean isOnewayRPC() {int bits = 1 << RPC_ONEWAY;return (this.flag & bits) == bits;
}

 1.3消息重试

Broker 发生流量控制的情况下,返回给 Producer 系统繁忙的状态码(code=2),Producer 收到这个状态码是不会进行重试的。下面是会进行重试的响应码:

//DefaultMQProducer类
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(ResponseCode.TOPIC_NOT_EXIST,ResponseCode.SERVICE_NOT_AVAILABLE,ResponseCode.SYSTEM_ERROR,ResponseCode.NO_PERMISSION,ResponseCode.NO_BUYER_ID,ResponseCode.NOT_IN_CURRENT_UNIT
));

2.Consumer流控

        DefaultMQPushConsumerImpl 类中有 Consumer 流控的逻辑 。

2.1 缓存消息数量超过阈值

ProcessQueue 保存的消息数量超过阈值(默认 1000,可以配置),源码如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}

2.2缓存消息大小超过阈值

ProcessQueue 保存的消息大小超过阈值(默认 100M,可以配置),源码如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}

2.3缓存消息跨度超过阈值

对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值(默认 2000,可以配置)。源代码如下:

if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}
}

2.4获取锁失败

对于顺序消费的情况,ProcessQueue加锁失败,也会延迟拉取,这个延迟时间默认是 3s,可以配置。

3.总结

本文介绍了 RocketMQ 发生流量控制的 8 个场景,其中 Broker 4 个场景,Consumer 4 个场景。Broker 的流量控制,本质是对 Producer 的流量控制,最好的解决方法就是给 Broker 扩容,增加 Broker 写入能力。而对于 Consumer 端的流量控制,需要解决 Consumer 端消费慢的问题,比如有第三方接口响应慢或者有慢 SQL。

Broker4种场景:

page cache 繁忙:获取commitlog写入锁超过1s 

清理过期请求 如果请求过期请求的时间到当前系统时间超过了200ms 

请求拒绝  一种是page cache繁忙 一种是 transientStorePoolEnable模式看是否可用buffer

线程池拒绝 Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000

Consumer4种场景

消息数量超过阈值1000

消息大小超过阈值100m

缓存消息跨度超过阈值 对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值2000

ProcessQueue加锁失败 也会延迟加载

相关文章:

Rocket面试(五)Rocketmq发生流量控制的情况有哪些?

在使用rocketmq过程中总能看见一下异常 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5这是因为Rocketmq出发了流量控制。 触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控&#xff0c;Consu…...

Tableau招聘信息数据可视化

获取的招聘信息数据为某招聘网站发布的大数据及数据分析相关岗位&#xff0c;对其他计算机相关岗位的招聘信息数据分析也有一定的参考价值。因为所获取的招聘信息数据数量只有1万左右&#xff0c;实际的招聘信息数量肯定不止1万&#xff0c;所以可能会与实际信息有一定的误差。…...

游戏服务器开发指南(八):合理应对异常

大家好&#xff01;我是长三月&#xff0c;一位在游戏行业工作多年的老程序员&#xff0c;专注于分享服务器开发相关的文章。 本文是通用程序设计主题下的第二篇。这个主题主要探讨如何编写高效、健壮、易读的游戏业务代码&#xff0c;每篇从一个小点切入。本次讨论的要点是&a…...

【g】聚类算法之K-means算法

聚类算法是一种无监督学习方法&#xff0c;它将相似的数据样本划分为一组&#xff0c;同时将不相似的数据样本划分为另一组。这个过程由计算机自动完成&#xff0c;不需要任何人为的干预。 K-means算法是一种经典的聚类算法&#xff0c;它的主要思想是把数据集分成k个簇&#…...

scala内建控制结构

一、条件表达式 &#xff08;一&#xff09;语法格式 - if (条件) 值1 else 值2&#xff08;二&#xff09;执行情况 条件为真&#xff0c;结果是值1&#xff1b;条件为假&#xff0c;结果是值2。如果if和else的返回结果同为某种类型&#xff0c;那么条件表达式结果也是那种类…...

Linux SSH命令实战教程,提升你的服务器管理基本功!

前言 大家好&#xff0c;又见面了&#xff0c;我是沐风晓月&#xff0c;本文是专栏【linux基本功-基础命令实战】的第62篇文章。 专栏地址&#xff1a;[linux基本功-基础命令专栏] &#xff0c; 此专栏是沐风晓月对Linux常用命令的汇总&#xff0c;希望能够加深自己的印象&am…...

【Python】Python进阶系列教程-- Python3 CGI编程(二)

文章目录 前言什么是CGI网页浏览CGI架构图Web服务器支持及配置第一个CGI程序HTTP头部CGI环境变量GET和POST方法使用GET方法传输数据简单的表单实例&#xff1a;GET方法使用POST方法传递数据通过CGI程序传递checkbox数据通过CGI程序传递Radio数据通过CGI程序传递 Textarea 数据通…...

do..while、while、for循环反汇编剖析

1、循环语句重要特征提取 循环语句最重要的特点就是执行的过程中会往上跳&#xff01;&#xff01;&#xff01; 箭头往上跳的一般都是循环语句&#xff0c;比如下面的for循环&#xff1a; 2、do..while语句反汇编 #include<iostream> using namespace std; #pragma …...

【代码随想录】刷题Day53

1.最长公共子序列 1143. 最长公共子序列 和之前的一道题目的区别就是这个子序列不需要每个字符相邻。那么条件就变成两种了&#xff0c;一种是当前的字符相同&#xff0c;一种是不同。相同跟之前的条件一样&#xff1b;不同则需要继承上次比较的较大值。if (text1[i - 1] tex…...

MySQL 索引及查询优化总结

一个简单的对比测试 前面的案例中&#xff0c;c2c_zwdb.t_file_count表只有一个自增id&#xff0c;FFileName字段未加索引的sql执行情况如下&#xff1a; 在上图中&#xff0c;typeall&#xff0c;keynull&#xff0c;rows33777。该sql未使用索引&#xff0c;是一个效率非常低…...

什么是AJAX?

AJAX是一种基于Web的技术&#xff0c;它允许Web应用程序在不刷新整个页面的情况下与服务器进行交互。通过AJAX&#xff0c;Web应用程序可以使用JavaScript向服务器发送异步请求并在不干扰用户的情况下更新页面的部分内容。 AJAX是Asynchronous JavaScript and XML的缩写。尽管…...

报表生成器FastReport .Net用户指南:显示数据列、HTML标签

FastReport .Net是一款全功能的Windows Forms、ASP.NET和MVC报表分析解决方案&#xff0c;使用FastReport .NET可以创建独立于应用程序的.NET报表&#xff0c;同时FastReport .Net支持中文、英语等14种语言&#xff0c;可以让你的产品保证真正的国际性。 FastReport.NET官方版…...

bootstrap-dialog弹框,去掉遮盖层,可移动

1.去掉遮盖层的设置data-backdrop"false" <div class"modal fade" id"modal" aria-modal"true" role"dialog" data-backdrop"false" style"width:50%"><div class"modal-dialog modal-l…...

7. user-Agent破解反爬机制

文章目录 1. 为什么要设置反爬机制2. 服务器如何区分浏览器访问和爬虫访问3. 反爬虫机制4. User-Agent是什么5. 如何查询网页的User-Agent6. user-agent信息解析7. 爬虫程序user-agent和浏览器user-agent的区别8. 代码查看爬虫程序的user-agent9. 在代码中加入请求头信息 1. 为…...

3.Nginx+Tomcat负载均衡和动静分离群集

文章目录 NginxTomcat负载均衡和动静分离群集Nginx作用实验七层反向代理nginx动静分离四层反向代理负载均衡 NginxTomcat负载均衡和动静分离群集 Nginx是-款非常优秀的HTTP服务器软件 支持高达50 000个并发连接数的响应拥有强大的静态资源处理能力运行稳定内存、CPU等系统资源…...

数据结构与算法之树结构

目录 为什么要使用树结构树结构基本概念树的种类树的存储与表示常见的一些树的应用场景为什么要使用树结构 线性结构中不论是数组还是链表,他们都存在着诟病;比如查找某个数必须从头开始查,消耗较多的时间。使用树结构,在插入和查找的性能上相对都会比线性结构要好 树结构…...

【python】 用来将对象持久化的 pickle 模块

pickle 模块可以对一个 Python 对象的二进制进行序列化和反序列化。说白了&#xff0c;就是它能够实现任意对象与二进制直接的相互转化&#xff0c;也可以实现对象与文本之间的相互转化。 比如&#xff0c;我程序里有一个 python 对象&#xff0c;我想把它存到磁盘里&#xff…...

【博客654】prometheus配置抓取保护以防止压力过载

prometheus抓取保护配置以防止压力过载 场景 担心您的应用程序指标可能突然激增&#xff0c;以及指标突然激增导致prometheus压力过载 就像生活中的许多事情一样&#xff0c;标签要有节制。当带有用户 ID 或电子邮件地址的标签被添加到指标时&#xff0c;虽然它不太可能结束…...

Backtrader官方中文文档:第十三章Observers观察者

本文档参考backtrader官方文档,是官方文档的完整中文翻译,可作为backtrader中文教程、backtrader中文参考手册、backtrader中文开发手册、backtrader入门资料使用。 本章包含 backtrader 官方Observers章节全部内容,入口 : https://backtrader.com/docu/observers-and-sta…...

算法leetcode|54. 螺旋矩阵(rust重拳出击)

文章目录 54. 螺旋矩阵&#xff1a;样例 1&#xff1a;样例 2&#xff1a;提示&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a;每次循环移动一步&#xff1a;每次循环完成一个顺时针&#xff1a…...

单片机软件架构师使用Taotoken多模型对比分析内存分配策略

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 单片机软件架构师使用Taotoken多模型对比分析内存分配策略 在嵌入式软件开发中&#xff0c;内存分配策略的选择直接影响着系统的实…...

Zynq/ZynqMP PL端以太网实战:手把手教你用GMII to RGMII IP和EMIO打通网络(附KSZ9031 PHY驱动修改)

Zynq/ZynqMP PL端以太网实战&#xff1a;从硬件配置到驱动适配全流程解析 在嵌入式系统开发中&#xff0c;以太网通信是许多项目的核心需求。当我们需要在Zynq或ZynqMP平台上实现PL端以太网功能时&#xff0c;往往会遇到硬件IP配置和PHY驱动适配两大挑战。本文将带你完整走通从…...

xhs签名验证机制详解:如何绕过小红书反爬虫系统的终极指南

xhs签名验证机制详解&#xff1a;如何绕过小红书反爬虫系统的终极指南 【免费下载链接】xhs 基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/ 项目地址: https://gitcode.com/gh_mirrors/xh/xhs 在小红书数据爬取领域&#xff0c;xhs签名验证机制是开…...

MiGPT终极指南:如何将小爱音箱改造成AI语音助手

MiGPT终极指南&#xff1a;如何将小爱音箱改造成AI语音助手 【免费下载链接】mi-gpt &#x1f3e0; 将小爱音箱接入 ChatGPT 和豆包&#xff0c;改造成你的专属语音助手。 项目地址: https://gitcode.com/GitHub_Trending/mi/mi-gpt 在智能家居日益普及的今天&#xff0…...

MTKClient终极指南:解锁联发科设备的完整刷机与调试解决方案

MTKClient终极指南&#xff1a;解锁联发科设备的完整刷机与调试解决方案 【免费下载链接】mtkclient MTK reverse engineering and flash tool 项目地址: https://gitcode.com/gh_mirrors/mt/mtkclient 你是否曾经遇到过联发科设备变砖无法启动的困境&#xff1f;或者想…...

仅剩72小时可获取的2026终极对比手册(含Prompt工程调优参数表、国产信创环境适配补丁包、等保2.0三级适配验证清单):ChatGPT与Gemini,你选错一个就多花237万年运维成本

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;ChatGPT与Gemini 2026年全面对比的基准定义与评估范式 为确保跨模型评估的科学性与可复现性&#xff0c;2026年主流AI基准已统一采用**多维动态评估范式&#xff08;MDEP&#xff09;**&#xff0c;该范…...

西门子S7-300/400老系统改造:用DP/DP Coupler打通新旧产线数据(附Step7组态避坑点)

西门子S7-300/400老系统改造&#xff1a;用DP/DP Coupler打通新旧产线数据&#xff08;附Step7组态避坑点&#xff09; 在工业自动化领域&#xff0c;老旧产线升级改造往往面临新旧设备通讯协议不兼容的难题。当传统S7-300系统需要与现代化S7-400或带PN接口的PLC进行数据交互时…...

基于区块链与IPFS的视频版权存证系统之区块链部分设计

本节对视频版权存证系统的区块链部分做一个简单的介绍,包括目录结构、文件作用、设计思路。 购买专栏前请认真阅读:《基于区块链与IPFS的视频版权存证系统》专栏简介 一、区块链部分文件目录简介 ├── bin //保存了二进制文件方便启动网络 │ ├── configtxgen //生成…...

FMCP协议:构建创作者统一文件管理中枢,打破应用孤岛

1. 项目概述&#xff1a;一个为创作者而生的文件管理中枢如果你是一位内容创作者&#xff0c;无论是视频剪辑师、摄影师、平面设计师&#xff0c;还是播客制作人&#xff0c;你的工作流里一定少不了与海量文件打交道。原始素材、工程文件、渲染输出、版本迭代……这些文件散落在…...

Vexip UI暗黑主题实现:CSS变量与主题切换完全指南 [特殊字符]

Vexip UI暗黑主题实现&#xff1a;CSS变量与主题切换完全指南 &#x1f3a8; 【免费下载链接】vexip-ui A Vue 3 UI library, highly customizability, full TypeScript, performance pretty good. 项目地址: https://gitcode.com/gh_mirrors/ve/vexip-ui 想要为你的Vue…...