RocketMQ 5.0 无状态实时性消费详解
作者:绍舒
背景
RocketMQ 5.0 版本引入了 Proxy 模块、无状态 pop 消费机制和 gRPC 协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。
SimpleConsumer 客户端采用了无状态的 pop 机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也带来了一个新的挑战:当客户端数量较少且消息数量较少时,可能会出现消息消费延时的情况。
在当前的消息产品中,消费普遍使用了长轮询机制,即客户端向服务端发送一个超时时间相对较长的请求,该请求会一直挂起,除非队列中存在消息或该请求到达设定的长轮询时间。
然而,在引入 Proxy 之后,目前的长轮询机制出现了一个问题。客户端层面的长轮询和 Proxy 与 Broker 内部的长轮询之间互相耦合,也就是说,一次客户端对 Proxy 的长轮询只对应一次 Proxy 对 Broker 的长轮询。因此,在以下情况下会出现问题:当客户端数量较少且后端存在多个可用的 Broker 时,如果请求到达了没有消息的 Broker,就会触发长轮询挂起逻辑。此时,即使另一台 Broker 存在消息,由于请求挂在了另一个 Broker 上,也无法拉取到消息。这导致客户端无法实时接收到消息,即 false empty response。
这种情况可能导致以下现象:用户发送一条消息后,再次发起消费请求,但该请求却无法实时拉取到消息。这种情况对于消息传递的实时性和可靠性产生了不利影响。
AWS 的文档里也有描述此等现象,他们的解决方案是通过查询是所有的后端服务,减少 false empty response。

其他产品
在设计方案时,首先是需要目前存在的消息商业化产品是如何处理该问题的。
MNS 采取了以下策略,主要是将长轮询时间切割为多个短轮询时间片,以尽可能覆盖所有的 Broker。
首先,在长轮询时间内,会对后端的 Broker 进行多次请求。其次,当未超过短轮询配额时,优先使用短轮询消费请求来与 Broker 进行通信,否则将使用长轮询,其时间等于客户端的长轮询时间。此外,考虑到过多的短轮询可能会导致 CPU 和网络资源消耗过多的问题,因此在短轮询超过一定数量且剩余时间充足时,最后一次请求将转为长轮询。
然而,上述策略虽以尽可能轮询完所有的 Broker 为目标,但并不能解决所有问题。当轮询时间较短或 Broker 数量较多时,无法轮询完所有的 Broker。即使时间足够充足的情况下,也有可能出现时间错位的情况,即在短轮询请求结束后,才有消息在该 Broker 上就绪,导致无法及时取回该消息。
解法
技术方案
首先,需要明确该问题的范围和条件。该问题只会在客户端数量较少且请求较少的情况下出现。当客户端数量较多且具备充足的请求能力时,该问题不会出现。因此,理想情况是设计一个自适应的方案,能够在客户端数量较多时不引入额外成本来解决该问题。
为了解决该问题,关键在于将前端的客户端长轮询和后端的 Broker 长轮询解耦,并赋予 Proxy 感知后端消息个数的能力,使其能够优先选择有消息的 Broker,避免 false empty response。
考虑到 Pop 消费本身的无状态属性,期望设计方案的逻辑与 Pop 一致,而不在代理中引入额外的状态来处理该问题。
另外,简洁性是非常重要的,因此期望该方案能够保持简单可靠,不引入过多的复杂性。
- 为了解决该问题,本质上是要将前端的客户端长轮询和后端的 Broker 长轮询解耦开来,并赋予 Proxy 感知后端消息个数的能力,能够优先选择有消息的 Broker,避免 false empty response。
- 由于 Pop 消费本身的无状态属性,因此期望该方案的设计逻辑和 Pop 一致,而不在 Proxy 引入额外的状态来处理这个事情。
- Simplicity is ALL,因此期望这个方案简单可靠。
我们使用了 NOTIFICATION,可以获取到后端是否有尚未消费的消息。拥有了上述后端消息情况的信息,就能够更加智能地指导 Proxy 侧的消息拉取。
通过重构 NOTIFICATION,我们对其进行了一些改进,以更好地适应这个方案的要求。
pop with notify
一个客户端的请求可以被抽象为一个长轮询任务,该轮询任务由通知任务和请求任务组成。
通知任务的目的是获取 Broker 是否存在可消费的消息,对应的是 Notification 请求;而请求任务的目的是消费 Broker 上的消息,对应的是 Pop 请求。
首先,长轮询任务会执行一次 Pop 请求,以确保在消息积压的情况下能够高效处理。如果成功获取到消息,则会正常返回结果并结束任务。如果没有获取到消息,并且还有剩余的轮询时间,则会向每个 Broker 提交一个异步通知任务。
在任务通知返回时,如果不存在任何消息,长轮询任务将被标记为已完成状态。然而,如果相关的 Broker 存在消息,该结果将被添加到队列中,并且消费任务将被启动。该队列的目的在于缓存多个返回结果,以备将来的重试之需。对于单机代理而言,只要存在一个通知结果返回消息,Proxy 即可进行消息拉取操作。然而,在实际的分布式环境中,可能会存在多个代理,因此即使通知结果返回消息存在,也不能保证客户端能够成功拉取消息。因此,该队列的设计旨在避免发生这种情况。

消费任务会从上述队列中获取结果,若无结果,则直接返回。这是因为只有在通知任务返回该 Broker 存在消息时,消费任务才会被触发。因此,若消费任务无法获取结果,可推断其他并发的消费任务已经处理了该消息。
消费任务从队列获取到结果后,会进行加锁,以确保一个长轮询任务只有一个正在进行的消费任务,以避免额外的未被处理的消息。

如果获取到消息或长轮询时间结束,该任务会被标记完成并返回结果。但如果没有获取到消息(可能是其他客户端的并发操作),则会继续发起该路由所对应的异步通知任务,并尝试进行消费。
自适应切换
考虑到当请求较多时,无需采用 pop with notify 机制,可使用原先的 pop 长轮询 broker 方案,但是需要考虑的是,如何在两者之间进行自适应切换。目前是基于当前 Proxy 统计的 pop 请求数做判断,当请求数少于某一值时,则认为当前请求较少,使用 pop with notify;反之则使用 pop 长轮询。
由于上述方案基于的均为单机视角,因此当消费请求在 proxy 侧不均衡时,可能会导致判断条件结果有所偏差。
Metric
为了之后进一步调优长轮询和观察长轮询的效果,我们设计了一组 metric 指标,来记录并观测实时长轮询的表现和损耗。
- 客户端发起的长轮询次数 (is_long_polling)
- pop with notify 次数 (通过现有 rpc metric 统计)
- 首次 pop 请求命中消息次数 (未触发 notify) (is_short_polling_hit)
使用方式
在使用时需明确长轮询和短轮询的区分,可以参考 AWS 的定义,当轮询时间大于 0 时,长轮询生效。

可以看到需明确一个长轮询最小时间,因为长轮询时间过小时无意义,AWS 的最小值采取了 1 秒。

在目前版本的 Apache RocketMQ 服务端中,采用了最小 5 秒的限制,即需超过 5 秒才能触发长轮询,该值可在 ProxyConfig#grpcClientConsumerMinLongPollingTimeoutMillis 中配置或修改。
对于 SimpleConsumer 而言,可以通过 awaitDuration 字段来调整长轮询时间。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(consumerGroup)// set await duration for long-polling..setAwaitDuration(awaitDuration).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
总结
通过如上方案,我们成功设计了一套基于无状态消费方式的实时消费方案,在做到客户端无状态消费的同时,还能够避免 false empty response,保证消费的实时性,同时,相较于原先 PushConsumer 的长轮询方案,能够大量减少用户侧无效请求数量,降低网络开销。
RocketMQ 学习社区体验地址
RocketMQ 学习社区重磅上线!AI 互动,一秒了解 RocketMQ 功能源码。RocketMQ 学习社区是国内首个基于 AIGC 提供的知识服务社区,旨在成为 RocketMQ 学习路上的“贴身小二”。
PS:RocketMQ 社区以 RocketMQ 5.0 资料为主要训练内容,持续优化迭代中,回答内容均由人工智能模型生成,其准确性和完整性无法保证,且不代表 RocketMQ 学习社区的态度或观点。
立即体验 RocketMQ 学习社区(建议 PC 端体验完整功能):https://rocketmq-learning.com/
为了帮助用户更全面的了解 RocketMQ 5.0,同时收集更多反馈,**「寻找 RocketMQ 首席评测官」**活动惊喜上线!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WRUz4bGL-1690130818194)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/df4c1c35104e46c086cbd7f6bdc7c5aa~tplv-k3u1fbpfcp-zoom-1.image “image”)]
点击此处,即可报名参加!
相关文章:
RocketMQ 5.0 无状态实时性消费详解
作者:绍舒 背景 RocketMQ 5.0 版本引入了 Proxy 模块、无状态 pop 消费机制和 gRPC 协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。 SimpleConsumer 客户端采用了无状态的 pop 机制,彻底解决了在客户端发布…...
本地 IDC 中的 K8s 集群如何以 Serverless 方式使用云上计算资源
作者:庄宇 在前一篇文章《应对突发流量,如何快速为自建 K8s 添加云上弹性能力》中,我们介绍了如何为 IDC 中 K8s 集群添加云上节点,应对业务流量的增长,通过多级弹性调度,灵活使用云上资源,并通…...
MySQL - 安装、连接、简单介绍
1、安装 MySQL8.0 安装MySQL 8.0的步骤,以 Windows 为例: 1.1 下载MySQL Installer: 需要从MySQL官方网站下载MySQL Installer。在下载页面中,选择适用于Windows的MySQL Installer并下载。 1.2 运行MySQL Installer࿱…...
【算法】求欧拉函数(包括完整的证明以及代码模板,建议收藏)
求欧拉函数 前置知识 互质:互质是公约数只有1的两个整数,叫做互质整数。 欧拉函数定义 1 ∼ N − 1 1∼N-1 1∼N−1中与N互质的数的个数被称为欧拉函数,记为 ϕ ( N ) \phi(N) ϕ(N)。 若在算数基本定理中, N p 1 a 1 p 2 a 2 .…...
Ceph的应用
文章目录 一、创建 CephFS 文件系统 MDS 接口1)在管理节点创建 mds 服务2)查看各个节点的 mds 服务3)创建存储池,启用 ceph 文件系统4)查看mds状态,一个up,其余两个待命,目前的工作的…...
mac m1 触控栏TouchBar功能栏异常
电脑可能在高温下运行时间过长,导致TouchBar之前正常显示的调整屏幕亮度与调整声音等功能的按钮均丢失,然后看了一眼键盘设置,设置也是正常的,已勾选显示功能栏 下面请看 如何在MacBook Pro(macOS Monterey࿰…...
“奢侈品”价格的“快消品”,竹叶青这么想赚年轻人的“茶水钱”?
文 | 螳螂观察 作者 | 青月 或许是受养生焦虑的影响,这届年轻人似乎爱上了喝茶。 《抖音电商茶行业洞察报告》数据显示, 年轻客群已经成为了抖音电商茶行业的增长极,在茶叶、茶具、茶文化书籍等方面,18-30岁消费者是当之无愧消…...
【Matlab】基于随机森林算法的时间序列预测(Excel可直接替换数据)
【Matlab】基于随机森林算法的时间序列预测(Excel可直接替换数据) 1.模型原理2.数学公式3.文件结构4.Excel数据5.分块代码6.完整代码7.运行结果1.模型原理 基于随机森林算法的时间序列预测是一种利用随机森林模型来解决时间序列预测问题的方法。在传统的随机森林算法中,对于…...
vue 中断请求
1 背景:针对一些请求时间较长,组件销毁后即中断请求; 2 方法: data(){return {//用于取消请求abortController:new AbortController(), } }, created(){//请求接口this.groundAcquisition(); }, beforeDestroy(){//中断请求this.…...
Jwt(Json web token)——从Http协议到session+cookie到Token Jwt介绍 Jwt的应用:登陆验证的流程
目录 引出从Http协议到session&cookie到TokenHTTP协议session & cookiesessioncookie为什么需要session & cookie? JavaEE传统解决长连接方案问题:分布式不适用解决方案:令牌Token Jwt,Json web tokenjwt的结构Header加密算法Ba…...
Java使用 java.util.regex.Pattern 正则表达式校验参数值是否规范
场景: java中我们可以利用 Pattern 注解对某个入参进行规则校验,但有些特殊参数在接口入口处不方便校验,需要在代码中校验 一、使用 Pattern 注解校验 Pattern(regexp "^[a-zA-Z0-9]$", message "xxx号限输入字母、…...
HDFS基本操作命令
这里写目录标题 HDFS Shell CLI客户端说明常用命令hadoop fs -mkdir [-p] <path>hadoop fs -ls [-h] [-R] [<path>...]上传文件到指定目录下方法一:hadoop fs -put [-f] [-p] <localsrc>.....<dst>方法二:hadoop fs -moveFromLocal <loc…...
git 实操
首先有安装好的git,安装好后,会在任一目录下右键出现git bash和git gui两个选项 打开git bash,设置好全局变量,用户名和邮箱,设置方法为: git config -- global user.name "xxx" git config --global user.email "xxxxxx.com" 1.创建版本库 git init 命…...
Visual Studio Code Python 扩展中的包管理
排版:Alan Wang Python 凭借其简单的语法和强大的库,目前已成为最流行的编程语言之一,也是最适合那些刚接触编程的人们的语言。但是,随着项目复杂性和规模的增长,管理依赖项的复杂性也会增加。当新用户不断承接更成熟的…...
spring学习笔记九
数据源对象管理 1、加入pom坐标 <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.16</version></dependency><!-- https://mvnrepository.com/artifact/c3p0/c3p0 --><depe…...
java list stream 使用
1、实现List对象集合的简单去重(distinct()) List<User> list list.stream().distinct().collect(Collectors.toList()); 2、实现List集合的根据属性(name)去重 list list.stream().filter(o -> o.getName() ! …...
两个Ubuntu电脑用SSH远程连接
两个Ubuntu电脑用SSH远程连接 1.ssh客户端及服务端的安装: 打开终端后,只需要以下两个命令即可 sudo apt-get install openssh-clientsudo apt-get install openssh-server2.启动ssh服务,执行以下命令: sudo /etc/init.d/ssh …...
讲解 @ServletComponentScan注解
目录: 1、用法介绍2、实例讲解 1、介绍 在SpringBoot项目启动器中添加ServletComponentScan注解后,SpringBoot在启动时会扫描并注册所有带有WebServlet(控制器)、WebFilter(过滤器)、WebListener(监听器&a…...
20款奔驰S350商务型加装原厂前排座椅通风系统,夏天必备的功能
通风座椅的主动通风功能可以迅速将座椅表面温度降至适宜程度,从而确保最佳座椅舒适性。该功能启用后,车内空气透过打孔皮饰座套被吸入座椅内部,持续时间为 8 分钟。然后,风扇会自动改变旋转方向,将更凉爽的环境空气从座…...
Rust vs Go:常用语法对比(十一)
题目来自 Rust Vs Go: Which Language Is Better For Developing High-Performance Applications?[1] 202. Sum of squares Calculate the sum of squares s of data, an array of floating point values. 计算平方和 package mainimport ( "math")func main() { da…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
