面试题---深入源码理解MQ长轮询优化机制
引言
在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
一、MQ基础概念与业务场景
1.1 MQ基础概念
MQ(Message Queue)即消息队列,是一种应用程序对应用程序的通信方法。它通过在发送方和接收方之间引入一个中间层,实现异步、解耦的消息传递。常见的MQ产品有ActiveMQ、RabbitMQ、Kafka、RocketMQ等。
1.2 业务场景
延时消息
延时消息指的是消息在发送到MQ后,并不会立即被消费者消费,而是等待一段指定的时间后才被投递给消费者。这种机制广泛应用于以下场景:
- 订单超时处理:用户下单后,如果长时间未支付,系统自动取消订单。
- 短信验证码:用户注册或登录时,发送验证码短信,验证码在一定时间内有效。
- 任务调度:在指定时间后执行某项任务,如定时清理日志、备份数据等。
定时消息
定时消息与延时消息类似,但更加灵活。它允许用户指定消息在将来的某个具体时间点被投递给消费者。定时消息适用于以下场景:
- 定时通知:在指定时间点发送通知消息,如每日工作报告、定时提醒等。
- 周期性任务:按照固定的时间间隔执行任务,如每小时数据汇总、每日系统维护等。
二、MQ长轮询机制原理
2.1 轮询与长轮询
轮询
轮询是一种客户端与服务器之间实时通信的技术手段。客户端定期发送请求来查询服务器是否有新数据或事件,并将响应返回给客户端。轮询的优点是简单易实现,适用于各种浏览器和服务器。然而,轮询也存在明显的缺点:会产生大量的无效请求,浪费带宽和服务器资源,产生不必要的网络流量和延迟。
长轮询
长轮询是对轮询的一种改进。在长轮询中,客户端发送一个HTTP请求给服务器,并保持连接打开。如果服务器没有新数据,则不会立即返回响应,而是将请求挂起,直到有新数据到达或超时。这种方式显著减少了无效的网络请求,提高了数据更新的实时性。
2.2 长轮询机制在MQ中的应用
在MQ系统中,长轮询机制主要用于优化消费者拉取消息的过程。传统的轮询方式下,消费者需要定期向Broker发送拉取请求,即使Broker没有新消息也会返回空响应。这种方式会导致大量的无效请求和资源浪费。而长轮询机制则允许消费者在没有新消息时保持连接挂起状态,直到有新消息到达或超时后再返回响应。这样,消费者可以实时地获取新消息,同时减少了无效请求和资源浪费。
三、RocketMQ长轮询机制源码分析
3.1 RocketMQ概述
RocketMQ是一款分布式消息中间件,由阿里巴巴开源。它支持高吞吐、低延迟的消息传递,并提供了丰富的消息过滤、顺序消息、事务消息等高级功能。RocketMQ中的消费者拉取消息时,就采用了长轮询机制来优化性能。
3.2 PullMessageService组件
在RocketMQ中,PullMessageService
组件负责处理消费者的拉取请求。它是一个后台线程服务,会不断地从pullRequestQueue
中取出PullRequest
对象,并向Broker发送拉取请求。
java复制代码
public void run() {
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}
}
3.3 PullRequest对象
PullRequest
对象表示一个拉取请求,它包含了消费者的消息队列、拉取偏移量、挂起时间等信息。当PullMessageService
从pullRequestQueue
中取出PullRequest
对象后,会调用pullMessage
方法向Broker发送拉取请求。
java复制代码
public void pullMessage(final PullRequest pullRequest) {
// ... 省略部分代码 ...
try {
this.executePullRequestImmediately(pullRequest);} catch (Exception e) {
// ... 省略异常处理代码 ...}
}
3.4 长轮询实现细节
在executePullRequestImmediately
方法中,RocketMQ会根据是否启用长轮询机制来决定拉取策略。如果启用了长轮询(longPollingEnable=true
),则会根据消费者设置的挂起超时时间(brokerSuspendMaxTimeMillis
)来决定重试时间。
java复制代码
private void executePullRequestImmediately(final PullRequest pullRequest) {
// ... 省略部分代码 ...
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
// 长轮询逻辑
final long beginLockTimestamp = System.currentTimeMillis();
// ... 省略加锁和超时处理代码 ...
this.pullMessage(pullRequest);
// ... 省略部分代码 ...} else {
// 短轮询逻辑
// ... 省略短轮询处理代码 ...}
}
在长轮询逻辑中,RocketMQ会调用pullMessage
方法向Broker发送拉取请求。如果Broker没有新消息,则会将请求挂起一段时间(默认为5秒),直到有新消息到达或超时后再返回响应。
3.5 PullRequestHoldService与ReputMessageService
RocketMQ中的长轮询机制由PullRequestHoldService
和ReputMessageService
两个线程共同实现。
- PullRequestHoldService:每隔一定时间(默认为5秒)检查
pullRequestTable
中的挂起请求,如果有新消息到达则触发拉取操作,否则继续挂起。 - ReputMessageService:负责处理消息存储中的新消息到达事件。每当有新消息到达时,它会调用
PullRequestHoldService
中的相关方法尝试拉取消息。
这两个线程的协作确保了消费者在没有新消息时不会频繁发送拉取请求,从而减少了无效请求和资源浪费。
四、Java模拟实现长轮询功能
4.1 模拟场景
为了演示长轮询机制的实现原理,我们可以模拟一个简单的场景:客户端向服务器订阅某个频道的消息,服务器在有新消息到达时推送给客户端。客户端使用长轮询机制来保持与服务器的连接并实时获取新消息。
4.2 服务器端实现
服务器端使用Spring Boot框架来创建一个简单的Web服务,并使用DeferredResult
来实现长轮询功能。
java复制代码
@RestController
@RequestMapping("/im")
public class IMController {
private final ConcurrentHashMap<String, DeferredResult<String>> clientMap = new ConcurrentHashMap<>();
private final List<String> messageQueue = new CopyOnWriteArrayList<>();
@GetMapping("/subscribe")
public DeferredResult<String> subscribe(@RequestParam String channel) {DeferredResult<String> deferredResult = new DeferredResult<>(10000L); // 设置超时时间为10秒clientMap.put(channel, deferredResult);
return deferredResult;}
@PostMapping("/send")
public String send(@RequestParam String channel, @RequestParam String message) {messageQueue.add(channel + ":" + message);notifyClients(channel);
return "Message sent";}
private void notifyClients(String channel) {DeferredResult<String> deferredResult = clientMap.get(channel);
if (deferredResult != null) {
String message = messageQueue.poll();
if (message != null) {deferredResult.setResult(message);clientMap.remove(channel);} else {
// 如果没有新消息,则重新放入队列等待下一次检查clientMap.put(channel, deferredResult);}}}
}
在上面的代码中,subscribe
方法用于处理客户端的订阅请求,并返回一个DeferredResult
对象。该对象会在有新消息到达时被设置结果并返回给客户端。send
方法用于处理消息发送请求,并将消息添加到消息队列中。notifyClients
方法负责检查消息队列并通知等待中的客户端。
4.3 客户端实现
客户端使用JavaScript的fetch
API来发送长轮询请求。
javascript复制代码
function subscribe(channel) {
fetch(`/im/subscribe?channel=${channel}`).then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');}
return response.text();}).then(message => {
console.log(`Received message: ${message}`);
// 收到消息后再次发起订阅请求以保持长轮询
setTimeout(() => subscribe(channel), 1000);}).catch(error => {
console.error('There was a problem with the fetch operation:', error);
// 请求失败或超时后重新发起订阅请求
setTimeout(() => subscribe(channel), 5000);});
}
// 示例:订阅"testChannel"频道
subscribe('testChannel');
在上面的代码中,subscribe
函数用于发送订阅请求并保持长轮询连接。当收到服务器返回的消息时,会打印消息内容并再次发起订阅请求以保持连接。如果请求失败或超时,则会在一段时间后重新发起订阅请求。
五、总结与展望
本文深入探讨了MQ系统中长轮询机制的原理及其在RocketMQ中的实现细节。通过源码分析和Java模拟实现,我们了解了长轮询机制如何优化消费者拉取消息的过程,减少无效请求和资源浪费。未来,随着分布式系统的不断发展和消息中间件的不断演进,长轮询机制将继续发挥其重要作用,为消息传递提供更加高效、可靠的解决方案。
同时,我们也应该看到长轮询机制并不是万能的。在实际应用中,我们需要根据具体的业务场景和需求来选择合适的消息传递模式和优化策略。例如,在对于实时性要求极高的场景下,我们可以考虑使用WebSocket等更高级的技术来实现全双工通信。而在对于消息顺序和一致性要求较高的场景下,则需要结合其他机制(如分布式事务、消息重试等)来确保消息的可靠传递。
总之,MQ系统中的长轮询机制是一种重要的优化手段,它能够帮助我们更好地实现消息的异步传递和实时更新。在未来的发展中,我们将继续探索和优化这一机制,为分布式系统的消息传递提供更加高效、可靠的解决方案。
相关文章:
面试题---深入源码理解MQ长轮询优化机制
引言 在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能…...
stable diffusion生成模型
1、stable diffusion Stable Diffusion 是一种扩散模型,基于对图像的逐步去噪过程训练和生成。它的核心包括以下几个步骤: 扩散过程(Diffusion Process)在训练时,向真实图像逐步添加噪声,最终将其变为纯随机噪声。这是一个正向过程,目的是学习如何将复杂的图像分解成随…...
分治法的魅力:高效解决复杂问题的利器
文章目录 分治法 (Divide and Conquer) 综合解析一、基本原理二、应用场景及详细分析1. 排序算法快速排序 (Quicksort)归并排序 (Mergesort) 2. 大整数运算大整数乘法 3. 几何问题最近点对问题 4. 字符串匹配KMP算法的优化版 三、优点四、局限性五、分治法与动态规划的对比六、…...

Spring IOC实战指南:从零到一的构建过程
Spring 优点: 方便解耦,简化开发。将所有对象创建和依赖关系维护交给 Spring 管理(IOC 的作用)AOP 切面编程的支持。方便的实现对程序进行权限的拦截、运行监控等功能(可扩展性)声明式事务的支持。只需通过配置就可以完成对事务的管理,无需手…...
3.langchain中的prompt模板 (few shot examples in chat models)
本教程将介绍如何使用LangChain库和智谱清言的 GLM-4-Plus 模型来理解和推理一个自定义的运算符(例如使用鹦鹉表情符号🦜)。我们将通过一系列示例来训练模型,使其能够理解和推断该运算符的含义。 环境准备 首先,确保…...
量子感知机
神经网络类似于人类大脑,是模拟生物神经网络进行信息处理的一种数学模型。它能解决分类、回归等问题,是机器学习的重要组成部分。量子神经网络是将量子理论与神经网络相结合而产生的一种新型计算模式。1995年美国路易斯安那州立大学KAK教授首次提出了量子…...

VM虚拟机装MAC后无法联网,如何解决?
✨在vm虚拟机上,给虚拟机MacOS设置网络适配器。选择NAT模式用于共享主机的IP地址 ✨在MacOS设置中设置网络 以太网 使用DHCP ✨回到本地电脑上,打开 服务,找到VMware DHCP和VMware NAT,把这两个服务打开,专一般问题就…...

IDEA 基本设置
设置主题 设置字体 设置编码格式 改变字体大小 开启 按住 ctrl 滚轮 改变字体大小。 开启自动编译...

Chrome 浏览器 131 版本新特性
Chrome 浏览器 131 版本新特性 一、Chrome 浏览器 131 版本更新 1. 在 iOS 上使用 Google Lens 搜索 自 Chrome 126 版本以来,用户可以通过 Google Lens 搜索屏幕上看到的任何图片或文字。 要使用此功能,请访问网站,并点击聚焦时出现在地…...
使用php和Xunsearch提升音乐网站的歌曲搜索效果
文章精选推荐 1 JetBrains Ai assistant 编程工具让你的工作效率翻倍 2 Extra Icons:JetBrains IDE的图标增强神器 3 IDEA插件推荐-SequenceDiagram,自动生成时序图 4 BashSupport Pro 这个ides插件主要是用来干嘛的 ? 5 IDEA必装的插件&…...

计算机毕设-基于springboot的高校网上缴费综合务系统视频的设计与实现(附源码+lw+ppt+开题报告)
博主介绍:✌多个项目实战经验、多个大型网购商城开发经验、在某机构指导学员上千名、专注于本行业领域✌ 技术范围:Java实战项目、Python实战项目、微信小程序/安卓实战项目、爬虫大数据实战项目、Nodejs实战项目、PHP实战项目、.NET实战项目、Golang实战…...

STL关联式容器之map
map的特性是,所有元素都会根据元素的键值自动被排序。map的所有元素都是pair,同时拥有实值(value)和键值(key)。pair的第一元素被视为键值,第二元素被视为实值。map不允许两个元素拥有相同的键值。下面是<stl_pair.h>中pair的定义 tem…...

【HarmonyOS】鸿蒙应用唤起系统相机拍照
【HarmonyOS】鸿蒙应用唤起系统相机拍照 方案一: 官方推荐的方式,使用CameraPicker来调用安全相机进行拍照。 let pathDir getContext().filesDir;let fileName ${new Date().getTime()}let filePath pathDir /${fileName}.tmpfileIo.createRandomA…...

Linux系统使用valgrind分析C++程序内存资源使用情况
内存占用是我们开发的时候需要重点关注的一个问题,我们可以人工根据代码推理出一个消耗内存较大的函数,也可以推理出大概会消耗多少内存,但是这种方法不仅麻烦,而且得到的只是推理的数据,而不是实际的数据。 我们可以…...
Java基础夯实——2.7 线程上下文切换
线程上下文切换(Thread Context Switching)是操作系统在多线程环境中,切换CPU从执行一个线程的上下文到另一个线程的上下文的过程。这种切换是实现多线程并发执行的核心机制之一。 1 上下文: 线程的上下文指线程在某一时刻的执行状态,如&am…...
死锁相关习题 10道 附详解
2022 设系统中有三种类型的资源(A,B,C)和五个进程(P1,P2,P3,P4,P5),A资源的数量是17,B资源的数量是6,C资源的数量是19。在T0时刻系统的状态: 最大资源需求量已分配资源量A,B,CA,B,…...

VisionPro 机器视觉案例 之 彩色保险丝个数统计
第十四篇 机器视觉案例 之 彩色保险丝颜色识别个数统计 文章目录 第十四篇 机器视觉案例 之 彩色保险丝颜色识别个数统计1.案例要求2.实现思路2.1 方法一 颜色分离工具CogColorSegmenterTool将每一种颜色分离出来,得到对应的单独图像,使用斑点工具CogBlo…...

go-zero(七) RPC服务和ETCD
go-zero 实现 RPC 服务 在实际的开发中,我们是通过RPC来传递数据的,下面我将通过一个简单的示例,说明如何使用go-zero框架和 Protocol Buffers 定义 RPC 服务。 一、生成 RPC项目 在这个教程中,我们根据user.api文件࿰…...

Jenkins + gitee 自动触发项目拉取部署(Webhook配置)
目录 前言 Generic Webhook Trigger 插件 下载插件 编辑 配置WebHook 生成tocken 总结 前言 前文简单介绍了Jenkins环境搭建,本文主要来介绍一下如何使用 WebHook 触发自动拉取构建项目; Generic Webhook Trigger 插件 实现代码推送后,触…...

043 商品详情
文章目录 详情页数据表结构voSkuItemVo.javaSkuItemSaleAttrVo.javaAttrValueAndSkuIdVo.javaSpuAttrGroupVo.javaGroupAttrParamVo.java pom.xmlSkuSaleAttrValueDao.xmlSkuSaleAttrValueDao.javaAttrGroupDao.xmlAttrGroupServiceImpl.javaSkuInfoServiceImpl.javaSkuSaleAtt…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...

vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

高频面试之3Zookeeper
高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个?3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制(过半机制࿰…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...

【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...

Golang——9、反射和文件操作
反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一:使用Read()读取文件2.3、方式二:bufio读取文件2.4、方式三:os.ReadFile读取2.5、写…...