RocketMQ 5.0 无状态实时性消费详解
作者:绍舒
背景
RocketMQ 5.0 版本引入了 Proxy 模块、无状态 pop 消费机制和 gRPC 协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。
SimpleConsumer 客户端采用了无状态的 pop 机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也带来了一个新的挑战:当客户端数量较少且消息数量较少时,可能会出现消息消费延时的情况。
在当前的消息产品中,消费普遍使用了长轮询机制,即客户端向服务端发送一个超时时间相对较长的请求,该请求会一直挂起,除非队列中存在消息或该请求到达设定的长轮询时间。
然而,在引入 Proxy 之后,目前的长轮询机制出现了一个问题。客户端层面的长轮询和 Proxy 与 Broker 内部的长轮询之间互相耦合,也就是说,一次客户端对 Proxy 的长轮询只对应一次 Proxy 对 Broker 的长轮询。因此,在以下情况下会出现问题:当客户端数量较少且后端存在多个可用的 Broker 时,如果请求到达了没有消息的 Broker,就会触发长轮询挂起逻辑。此时,即使另一台 Broker 存在消息,由于请求挂在了另一个 Broker 上,也无法拉取到消息。这导致客户端无法实时接收到消息,即 false empty response。
这种情况可能导致以下现象:用户发送一条消息后,再次发起消费请求,但该请求却无法实时拉取到消息。这种情况对于消息传递的实时性和可靠性产生了不利影响。
AWS 的文档里也有描述此等现象,他们的解决方案是通过查询是所有的后端服务,减少 false empty response。

其他产品
在设计方案时,首先是需要目前存在的消息商业化产品是如何处理该问题的。
MNS 采取了以下策略,主要是将长轮询时间切割为多个短轮询时间片,以尽可能覆盖所有的 Broker。
首先,在长轮询时间内,会对后端的 Broker 进行多次请求。其次,当未超过短轮询配额时,优先使用短轮询消费请求来与 Broker 进行通信,否则将使用长轮询,其时间等于客户端的长轮询时间。此外,考虑到过多的短轮询可能会导致 CPU 和网络资源消耗过多的问题,因此在短轮询超过一定数量且剩余时间充足时,最后一次请求将转为长轮询。
然而,上述策略虽以尽可能轮询完所有的 Broker 为目标,但并不能解决所有问题。当轮询时间较短或 Broker 数量较多时,无法轮询完所有的 Broker。即使时间足够充足的情况下,也有可能出现时间错位的情况,即在短轮询请求结束后,才有消息在该 Broker 上就绪,导致无法及时取回该消息。
解法
技术方案
首先,需要明确该问题的范围和条件。该问题只会在客户端数量较少且请求较少的情况下出现。当客户端数量较多且具备充足的请求能力时,该问题不会出现。因此,理想情况是设计一个自适应的方案,能够在客户端数量较多时不引入额外成本来解决该问题。
为了解决该问题,关键在于将前端的客户端长轮询和后端的 Broker 长轮询解耦,并赋予 Proxy 感知后端消息个数的能力,使其能够优先选择有消息的 Broker,避免 false empty response。
考虑到 Pop 消费本身的无状态属性,期望设计方案的逻辑与 Pop 一致,而不在代理中引入额外的状态来处理该问题。
另外,简洁性是非常重要的,因此期望该方案能够保持简单可靠,不引入过多的复杂性。
- 为了解决该问题,本质上是要将前端的客户端长轮询和后端的 Broker 长轮询解耦开来,并赋予 Proxy 感知后端消息个数的能力,能够优先选择有消息的 Broker,避免 false empty response。
- 由于 Pop 消费本身的无状态属性,因此期望该方案的设计逻辑和 Pop 一致,而不在 Proxy 引入额外的状态来处理这个事情。
- Simplicity is ALL,因此期望这个方案简单可靠。
我们使用了 NOTIFICATION,可以获取到后端是否有尚未消费的消息。拥有了上述后端消息情况的信息,就能够更加智能地指导 Proxy 侧的消息拉取。
通过重构 NOTIFICATION,我们对其进行了一些改进,以更好地适应这个方案的要求。
pop with notify
一个客户端的请求可以被抽象为一个长轮询任务,该轮询任务由通知任务和请求任务组成。
通知任务的目的是获取 Broker 是否存在可消费的消息,对应的是 Notification 请求;而请求任务的目的是消费 Broker 上的消息,对应的是 Pop 请求。
首先,长轮询任务会执行一次 Pop 请求,以确保在消息积压的情况下能够高效处理。如果成功获取到消息,则会正常返回结果并结束任务。如果没有获取到消息,并且还有剩余的轮询时间,则会向每个 Broker 提交一个异步通知任务。
在任务通知返回时,如果不存在任何消息,长轮询任务将被标记为已完成状态。然而,如果相关的 Broker 存在消息,该结果将被添加到队列中,并且消费任务将被启动。该队列的目的在于缓存多个返回结果,以备将来的重试之需。对于单机代理而言,只要存在一个通知结果返回消息,Proxy 即可进行消息拉取操作。然而,在实际的分布式环境中,可能会存在多个代理,因此即使通知结果返回消息存在,也不能保证客户端能够成功拉取消息。因此,该队列的设计旨在避免发生这种情况。

消费任务会从上述队列中获取结果,若无结果,则直接返回。这是因为只有在通知任务返回该 Broker 存在消息时,消费任务才会被触发。因此,若消费任务无法获取结果,可推断其他并发的消费任务已经处理了该消息。
消费任务从队列获取到结果后,会进行加锁,以确保一个长轮询任务只有一个正在进行的消费任务,以避免额外的未被处理的消息。

如果获取到消息或长轮询时间结束,该任务会被标记完成并返回结果。但如果没有获取到消息(可能是其他客户端的并发操作),则会继续发起该路由所对应的异步通知任务,并尝试进行消费。
自适应切换
考虑到当请求较多时,无需采用 pop with notify 机制,可使用原先的 pop 长轮询 broker 方案,但是需要考虑的是,如何在两者之间进行自适应切换。目前是基于当前 Proxy 统计的 pop 请求数做判断,当请求数少于某一值时,则认为当前请求较少,使用 pop with notify;反之则使用 pop 长轮询。
由于上述方案基于的均为单机视角,因此当消费请求在 proxy 侧不均衡时,可能会导致判断条件结果有所偏差。
Metric
为了之后进一步调优长轮询和观察长轮询的效果,我们设计了一组 metric 指标,来记录并观测实时长轮询的表现和损耗。
- 客户端发起的长轮询次数 (is_long_polling)
- pop with notify 次数 (通过现有 rpc metric 统计)
- 首次 pop 请求命中消息次数 (未触发 notify) (is_short_polling_hit)
使用方式
在使用时需明确长轮询和短轮询的区分,可以参考 AWS 的定义,当轮询时间大于 0 时,长轮询生效。

可以看到需明确一个长轮询最小时间,因为长轮询时间过小时无意义,AWS 的最小值采取了 1 秒。

在目前版本的 Apache RocketMQ 服务端中,采用了最小 5 秒的限制,即需超过 5 秒才能触发长轮询,该值可在 ProxyConfig#grpcClientConsumerMinLongPollingTimeoutMillis 中配置或修改。
对于 SimpleConsumer 而言,可以通过 awaitDuration 字段来调整长轮询时间。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(consumerGroup)// set await duration for long-polling..setAwaitDuration(awaitDuration).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
总结
通过如上方案,我们成功设计了一套基于无状态消费方式的实时消费方案,在做到客户端无状态消费的同时,还能够避免 false empty response,保证消费的实时性,同时,相较于原先 PushConsumer 的长轮询方案,能够大量减少用户侧无效请求数量,降低网络开销。
RocketMQ 学习社区体验地址
RocketMQ 学习社区重磅上线!AI 互动,一秒了解 RocketMQ 功能源码。RocketMQ 学习社区是国内首个基于 AIGC 提供的知识服务社区,旨在成为 RocketMQ 学习路上的“贴身小二”。
PS:RocketMQ 社区以 RocketMQ 5.0 资料为主要训练内容,持续优化迭代中,回答内容均由人工智能模型生成,其准确性和完整性无法保证,且不代表 RocketMQ 学习社区的态度或观点。
立即体验 RocketMQ 学习社区(建议 PC 端体验完整功能):https://rocketmq-learning.com/
为了帮助用户更全面的了解 RocketMQ 5.0,同时收集更多反馈,**「寻找 RocketMQ 首席评测官」**活动惊喜上线!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WRUz4bGL-1690130818194)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/df4c1c35104e46c086cbd7f6bdc7c5aa~tplv-k3u1fbpfcp-zoom-1.image “image”)]
点击此处,即可报名参加!
相关文章:
RocketMQ 5.0 无状态实时性消费详解
作者:绍舒 背景 RocketMQ 5.0 版本引入了 Proxy 模块、无状态 pop 消费机制和 gRPC 协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。 SimpleConsumer 客户端采用了无状态的 pop 机制,彻底解决了在客户端发布…...
本地 IDC 中的 K8s 集群如何以 Serverless 方式使用云上计算资源
作者:庄宇 在前一篇文章《应对突发流量,如何快速为自建 K8s 添加云上弹性能力》中,我们介绍了如何为 IDC 中 K8s 集群添加云上节点,应对业务流量的增长,通过多级弹性调度,灵活使用云上资源,并通…...
MySQL - 安装、连接、简单介绍
1、安装 MySQL8.0 安装MySQL 8.0的步骤,以 Windows 为例: 1.1 下载MySQL Installer: 需要从MySQL官方网站下载MySQL Installer。在下载页面中,选择适用于Windows的MySQL Installer并下载。 1.2 运行MySQL Installer࿱…...
【算法】求欧拉函数(包括完整的证明以及代码模板,建议收藏)
求欧拉函数 前置知识 互质:互质是公约数只有1的两个整数,叫做互质整数。 欧拉函数定义 1 ∼ N − 1 1∼N-1 1∼N−1中与N互质的数的个数被称为欧拉函数,记为 ϕ ( N ) \phi(N) ϕ(N)。 若在算数基本定理中, N p 1 a 1 p 2 a 2 .…...
Ceph的应用
文章目录 一、创建 CephFS 文件系统 MDS 接口1)在管理节点创建 mds 服务2)查看各个节点的 mds 服务3)创建存储池,启用 ceph 文件系统4)查看mds状态,一个up,其余两个待命,目前的工作的…...
mac m1 触控栏TouchBar功能栏异常
电脑可能在高温下运行时间过长,导致TouchBar之前正常显示的调整屏幕亮度与调整声音等功能的按钮均丢失,然后看了一眼键盘设置,设置也是正常的,已勾选显示功能栏 下面请看 如何在MacBook Pro(macOS Monterey࿰…...
“奢侈品”价格的“快消品”,竹叶青这么想赚年轻人的“茶水钱”?
文 | 螳螂观察 作者 | 青月 或许是受养生焦虑的影响,这届年轻人似乎爱上了喝茶。 《抖音电商茶行业洞察报告》数据显示, 年轻客群已经成为了抖音电商茶行业的增长极,在茶叶、茶具、茶文化书籍等方面,18-30岁消费者是当之无愧消…...
【Matlab】基于随机森林算法的时间序列预测(Excel可直接替换数据)
【Matlab】基于随机森林算法的时间序列预测(Excel可直接替换数据) 1.模型原理2.数学公式3.文件结构4.Excel数据5.分块代码6.完整代码7.运行结果1.模型原理 基于随机森林算法的时间序列预测是一种利用随机森林模型来解决时间序列预测问题的方法。在传统的随机森林算法中,对于…...
vue 中断请求
1 背景:针对一些请求时间较长,组件销毁后即中断请求; 2 方法: data(){return {//用于取消请求abortController:new AbortController(), } }, created(){//请求接口this.groundAcquisition(); }, beforeDestroy(){//中断请求this.…...
Jwt(Json web token)——从Http协议到session+cookie到Token Jwt介绍 Jwt的应用:登陆验证的流程
目录 引出从Http协议到session&cookie到TokenHTTP协议session & cookiesessioncookie为什么需要session & cookie? JavaEE传统解决长连接方案问题:分布式不适用解决方案:令牌Token Jwt,Json web tokenjwt的结构Header加密算法Ba…...
Java使用 java.util.regex.Pattern 正则表达式校验参数值是否规范
场景: java中我们可以利用 Pattern 注解对某个入参进行规则校验,但有些特殊参数在接口入口处不方便校验,需要在代码中校验 一、使用 Pattern 注解校验 Pattern(regexp "^[a-zA-Z0-9]$", message "xxx号限输入字母、…...
HDFS基本操作命令
这里写目录标题 HDFS Shell CLI客户端说明常用命令hadoop fs -mkdir [-p] <path>hadoop fs -ls [-h] [-R] [<path>...]上传文件到指定目录下方法一:hadoop fs -put [-f] [-p] <localsrc>.....<dst>方法二:hadoop fs -moveFromLocal <loc…...
git 实操
首先有安装好的git,安装好后,会在任一目录下右键出现git bash和git gui两个选项 打开git bash,设置好全局变量,用户名和邮箱,设置方法为: git config -- global user.name "xxx" git config --global user.email "xxxxxx.com" 1.创建版本库 git init 命…...
Visual Studio Code Python 扩展中的包管理
排版:Alan Wang Python 凭借其简单的语法和强大的库,目前已成为最流行的编程语言之一,也是最适合那些刚接触编程的人们的语言。但是,随着项目复杂性和规模的增长,管理依赖项的复杂性也会增加。当新用户不断承接更成熟的…...
spring学习笔记九
数据源对象管理 1、加入pom坐标 <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.16</version></dependency><!-- https://mvnrepository.com/artifact/c3p0/c3p0 --><depe…...
java list stream 使用
1、实现List对象集合的简单去重(distinct()) List<User> list list.stream().distinct().collect(Collectors.toList()); 2、实现List集合的根据属性(name)去重 list list.stream().filter(o -> o.getName() ! …...
两个Ubuntu电脑用SSH远程连接
两个Ubuntu电脑用SSH远程连接 1.ssh客户端及服务端的安装: 打开终端后,只需要以下两个命令即可 sudo apt-get install openssh-clientsudo apt-get install openssh-server2.启动ssh服务,执行以下命令: sudo /etc/init.d/ssh …...
讲解 @ServletComponentScan注解
目录: 1、用法介绍2、实例讲解 1、介绍 在SpringBoot项目启动器中添加ServletComponentScan注解后,SpringBoot在启动时会扫描并注册所有带有WebServlet(控制器)、WebFilter(过滤器)、WebListener(监听器&a…...
20款奔驰S350商务型加装原厂前排座椅通风系统,夏天必备的功能
通风座椅的主动通风功能可以迅速将座椅表面温度降至适宜程度,从而确保最佳座椅舒适性。该功能启用后,车内空气透过打孔皮饰座套被吸入座椅内部,持续时间为 8 分钟。然后,风扇会自动改变旋转方向,将更凉爽的环境空气从座…...
Rust vs Go:常用语法对比(十一)
题目来自 Rust Vs Go: Which Language Is Better For Developing High-Performance Applications?[1] 202. Sum of squares Calculate the sum of squares s of data, an array of floating point values. 计算平方和 package mainimport ( "math")func main() { da…...
Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
【解密LSTM、GRU如何解决传统RNN梯度消失问题】
解密LSTM与GRU:如何让RNN变得更聪明? 在深度学习的世界里,循环神经网络(RNN)以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而,传统RNN存在的一个严重问题——梯度消失&#…...
汽车生产虚拟实训中的技能提升与生产优化
在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
FFmpeg:Windows系统小白安装及其使用
一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】,注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录(即exe所在文件夹)加入系统变量…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...
区块链技术概述
区块链技术是一种去中心化、分布式账本技术,通过密码学、共识机制和智能合约等核心组件,实现数据不可篡改、透明可追溯的系统。 一、核心技术 1. 去中心化 特点:数据存储在网络中的多个节点(计算机),而非…...
