当前位置: 首页 > news >正文

面试题---深入源码理解MQ长轮询优化机制

引言

在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。

一、MQ基础概念与业务场景

1.1 MQ基础概念

MQ(Message Queue)即消息队列,是一种应用程序对应用程序的通信方法。它通过在发送方和接收方之间引入一个中间层,实现异步、解耦的消息传递。常见的MQ产品有ActiveMQ、RabbitMQ、Kafka、RocketMQ等。

1.2 业务场景

延时消息

延时消息指的是消息在发送到MQ后,并不会立即被消费者消费,而是等待一段指定的时间后才被投递给消费者。这种机制广泛应用于以下场景:

  • 订单超时处理:用户下单后,如果长时间未支付,系统自动取消订单。
  • 短信验证码:用户注册或登录时,发送验证码短信,验证码在一定时间内有效。
  • 任务调度:在指定时间后执行某项任务,如定时清理日志、备份数据等。
定时消息

定时消息与延时消息类似,但更加灵活。它允许用户指定消息在将来的某个具体时间点被投递给消费者。定时消息适用于以下场景:

  • 定时通知:在指定时间点发送通知消息,如每日工作报告、定时提醒等。
  • 周期性任务:按照固定的时间间隔执行任务,如每小时数据汇总、每日系统维护等。

二、MQ长轮询机制原理

2.1 轮询与长轮询

轮询

轮询是一种客户端与服务器之间实时通信的技术手段。客户端定期发送请求来查询服务器是否有新数据或事件,并将响应返回给客户端。轮询的优点是简单易实现,适用于各种浏览器和服务器。然而,轮询也存在明显的缺点:会产生大量的无效请求,浪费带宽和服务器资源,产生不必要的网络流量和延迟。

长轮询

长轮询是对轮询的一种改进。在长轮询中,客户端发送一个HTTP请求给服务器,并保持连接打开。如果服务器没有新数据,则不会立即返回响应,而是将请求挂起,直到有新数据到达或超时。这种方式显著减少了无效的网络请求,提高了数据更新的实时性。

2.2 长轮询机制在MQ中的应用

在MQ系统中,长轮询机制主要用于优化消费者拉取消息的过程。传统的轮询方式下,消费者需要定期向Broker发送拉取请求,即使Broker没有新消息也会返回空响应。这种方式会导致大量的无效请求和资源浪费。而长轮询机制则允许消费者在没有新消息时保持连接挂起状态,直到有新消息到达或超时后再返回响应。这样,消费者可以实时地获取新消息,同时减少了无效请求和资源浪费。

三、RocketMQ长轮询机制源码分析

3.1 RocketMQ概述

RocketMQ是一款分布式消息中间件,由阿里巴巴开源。它支持高吞吐、低延迟的消息传递,并提供了丰富的消息过滤、顺序消息、事务消息等高级功能。RocketMQ中的消费者拉取消息时,就采用了长轮询机制来优化性能。

3.2 PullMessageService组件

在RocketMQ中,PullMessageService组件负责处理消费者的拉取请求。它是一个后台线程服务,会不断地从pullRequestQueue中取出PullRequest对象,并向Broker发送拉取请求。

java复制代码
public void run() {
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}
}

3.3 PullRequest对象

PullRequest对象表示一个拉取请求,它包含了消费者的消息队列、拉取偏移量、挂起时间等信息。当PullMessageServicepullRequestQueue中取出PullRequest对象后,会调用pullMessage方法向Broker发送拉取请求。

java复制代码
public void pullMessage(final PullRequest pullRequest) {
// ... 省略部分代码 ...
try {
this.executePullRequestImmediately(pullRequest);} catch (Exception e) {
// ... 省略异常处理代码 ...}
}

3.4 长轮询实现细节

executePullRequestImmediately方法中,RocketMQ会根据是否启用长轮询机制来决定拉取策略。如果启用了长轮询(longPollingEnable=true),则会根据消费者设置的挂起超时时间(brokerSuspendMaxTimeMillis)来决定重试时间。

java复制代码
private void executePullRequestImmediately(final PullRequest pullRequest) {
// ... 省略部分代码 ...
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
// 长轮询逻辑
final long beginLockTimestamp = System.currentTimeMillis();
// ... 省略加锁和超时处理代码 ...
this.pullMessage(pullRequest);
// ... 省略部分代码 ...} else {
// 短轮询逻辑
// ... 省略短轮询处理代码 ...}
}

在长轮询逻辑中,RocketMQ会调用pullMessage方法向Broker发送拉取请求。如果Broker没有新消息,则会将请求挂起一段时间(默认为5秒),直到有新消息到达或超时后再返回响应。

3.5 PullRequestHoldService与ReputMessageService

RocketMQ中的长轮询机制由PullRequestHoldServiceReputMessageService两个线程共同实现。

  • PullRequestHoldService:每隔一定时间(默认为5秒)检查pullRequestTable中的挂起请求,如果有新消息到达则触发拉取操作,否则继续挂起。
  • ReputMessageService:负责处理消息存储中的新消息到达事件。每当有新消息到达时,它会调用PullRequestHoldService中的相关方法尝试拉取消息。

这两个线程的协作确保了消费者在没有新消息时不会频繁发送拉取请求,从而减少了无效请求和资源浪费。

四、Java模拟实现长轮询功能

4.1 模拟场景

为了演示长轮询机制的实现原理,我们可以模拟一个简单的场景:客户端向服务器订阅某个频道的消息,服务器在有新消息到达时推送给客户端。客户端使用长轮询机制来保持与服务器的连接并实时获取新消息。

4.2 服务器端实现

服务器端使用Spring Boot框架来创建一个简单的Web服务,并使用DeferredResult来实现长轮询功能。

java复制代码
@RestController
@RequestMapping("/im")
public class IMController {
private final ConcurrentHashMap<String, DeferredResult<String>> clientMap = new ConcurrentHashMap<>();
private final List<String> messageQueue = new CopyOnWriteArrayList<>();
@GetMapping("/subscribe")
public DeferredResult<String> subscribe(@RequestParam String channel) {DeferredResult<String> deferredResult = new DeferredResult<>(10000L); // 设置超时时间为10秒clientMap.put(channel, deferredResult);
return deferredResult;}
@PostMapping("/send")
public String send(@RequestParam String channel, @RequestParam String message) {messageQueue.add(channel + ":" + message);notifyClients(channel);
return "Message sent";}
private void notifyClients(String channel) {DeferredResult<String> deferredResult = clientMap.get(channel);
if (deferredResult != null) {
String message = messageQueue.poll();
if (message != null) {deferredResult.setResult(message);clientMap.remove(channel);} else {
// 如果没有新消息,则重新放入队列等待下一次检查clientMap.put(channel, deferredResult);}}}
}

在上面的代码中,subscribe方法用于处理客户端的订阅请求,并返回一个DeferredResult对象。该对象会在有新消息到达时被设置结果并返回给客户端。send方法用于处理消息发送请求,并将消息添加到消息队列中。notifyClients方法负责检查消息队列并通知等待中的客户端。

4.3 客户端实现

客户端使用JavaScript的fetch API来发送长轮询请求。

javascript复制代码
function subscribe(channel) {
fetch(`/im/subscribe?channel=${channel}`).then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');}
return response.text();}).then(message => {
console.log(`Received message: ${message}`);
// 收到消息后再次发起订阅请求以保持长轮询
setTimeout(() => subscribe(channel), 1000);}).catch(error => {
console.error('There was a problem with the fetch operation:', error);
// 请求失败或超时后重新发起订阅请求
setTimeout(() => subscribe(channel), 5000);});
}
// 示例:订阅"testChannel"频道
subscribe('testChannel');

在上面的代码中,subscribe函数用于发送订阅请求并保持长轮询连接。当收到服务器返回的消息时,会打印消息内容并再次发起订阅请求以保持连接。如果请求失败或超时,则会在一段时间后重新发起订阅请求。

五、总结与展望

本文深入探讨了MQ系统中长轮询机制的原理及其在RocketMQ中的实现细节。通过源码分析和Java模拟实现,我们了解了长轮询机制如何优化消费者拉取消息的过程,减少无效请求和资源浪费。未来,随着分布式系统的不断发展和消息中间件的不断演进,长轮询机制将继续发挥其重要作用,为消息传递提供更加高效、可靠的解决方案。

同时,我们也应该看到长轮询机制并不是万能的。在实际应用中,我们需要根据具体的业务场景和需求来选择合适的消息传递模式和优化策略。例如,在对于实时性要求极高的场景下,我们可以考虑使用WebSocket等更高级的技术来实现全双工通信。而在对于消息顺序和一致性要求较高的场景下,则需要结合其他机制(如分布式事务、消息重试等)来确保消息的可靠传递。

总之,MQ系统中的长轮询机制是一种重要的优化手段,它能够帮助我们更好地实现消息的异步传递和实时更新。在未来的发展中,我们将继续探索和优化这一机制,为分布式系统的消息传递提供更加高效、可靠的解决方案。

相关文章:

面试题---深入源码理解MQ长轮询优化机制

引言 在分布式系统中&#xff0c;消息队列&#xff08;MQ&#xff09;作为一种重要的中间件&#xff0c;广泛应用于解耦、异步处理、流量削峰等场景。其中&#xff0c;延时消息和定时消息作为MQ的高级功能&#xff0c;能够进一步满足复杂的业务需求。为了实现这些功能&#xf…...

stable diffusion生成模型

1、stable diffusion Stable Diffusion 是一种扩散模型,基于对图像的逐步去噪过程训练和生成。它的核心包括以下几个步骤: 扩散过程(Diffusion Process)在训练时,向真实图像逐步添加噪声,最终将其变为纯随机噪声。这是一个正向过程,目的是学习如何将复杂的图像分解成随…...

分治法的魅力:高效解决复杂问题的利器

文章目录 分治法 (Divide and Conquer) 综合解析一、基本原理二、应用场景及详细分析1. 排序算法快速排序 (Quicksort)归并排序 (Mergesort) 2. 大整数运算大整数乘法 3. 几何问题最近点对问题 4. 字符串匹配KMP算法的优化版 三、优点四、局限性五、分治法与动态规划的对比六、…...

Spring IOC实战指南:从零到一的构建过程

Spring 优点&#xff1a; 方便解耦&#xff0c;简化开发。将所有对象创建和依赖关系维护交给 Spring 管理(IOC 的作用)AOP 切面编程的支持。方便的实现对程序进行权限的拦截、运行监控等功能(可扩展性)声明式事务的支持。只需通过配置就可以完成对事务的管理&#xff0c;无需手…...

3.langchain中的prompt模板 (few shot examples in chat models)

本教程将介绍如何使用LangChain库和智谱清言的 GLM-4-Plus 模型来理解和推理一个自定义的运算符&#xff08;例如使用鹦鹉表情符号&#x1f99c;&#xff09;。我们将通过一系列示例来训练模型&#xff0c;使其能够理解和推断该运算符的含义。 环境准备 首先&#xff0c;确保…...

量子感知机

神经网络类似于人类大脑&#xff0c;是模拟生物神经网络进行信息处理的一种数学模型。它能解决分类、回归等问题&#xff0c;是机器学习的重要组成部分。量子神经网络是将量子理论与神经网络相结合而产生的一种新型计算模式。1995年美国路易斯安那州立大学KAK教授首次提出了量子…...

VM虚拟机装MAC后无法联网,如何解决?

✨在vm虚拟机上&#xff0c;给虚拟机MacOS设置网络适配器。选择NAT模式用于共享主机的IP地址 ✨在MacOS设置中设置网络 以太网 使用DHCP ✨回到本地电脑上&#xff0c;打开 服务&#xff0c;找到VMware DHCP和VMware NAT&#xff0c;把这两个服务打开&#xff0c;专一般问题就…...

IDEA 基本设置

设置主题 设置字体 设置编码格式 改变字体大小 开启 按住 ctrl 滚轮 改变字体大小。 开启自动编译...

Chrome 浏览器 131 版本新特性

Chrome 浏览器 131 版本新特性 一、Chrome 浏览器 131 版本更新 1. 在 iOS 上使用 Google Lens 搜索 自 Chrome 126 版本以来&#xff0c;用户可以通过 Google Lens 搜索屏幕上看到的任何图片或文字。 要使用此功能&#xff0c;请访问网站&#xff0c;并点击聚焦时出现在地…...

使用php和Xunsearch提升音乐网站的歌曲搜索效果

文章精选推荐 1 JetBrains Ai assistant 编程工具让你的工作效率翻倍 2 Extra Icons&#xff1a;JetBrains IDE的图标增强神器 3 IDEA插件推荐-SequenceDiagram&#xff0c;自动生成时序图 4 BashSupport Pro 这个ides插件主要是用来干嘛的 &#xff1f; 5 IDEA必装的插件&…...

计算机毕设-基于springboot的高校网上缴费综合务系统视频的设计与实现(附源码+lw+ppt+开题报告)

博主介绍&#xff1a;✌多个项目实战经验、多个大型网购商城开发经验、在某机构指导学员上千名、专注于本行业领域✌ 技术范围&#xff1a;Java实战项目、Python实战项目、微信小程序/安卓实战项目、爬虫大数据实战项目、Nodejs实战项目、PHP实战项目、.NET实战项目、Golang实战…...

STL关联式容器之map

map的特性是&#xff0c;所有元素都会根据元素的键值自动被排序。map的所有元素都是pair&#xff0c;同时拥有实值(value)和键值(key)。pair的第一元素被视为键值&#xff0c;第二元素被视为实值。map不允许两个元素拥有相同的键值。下面是<stl_pair.h>中pair的定义 tem…...

【HarmonyOS】鸿蒙应用唤起系统相机拍照

【HarmonyOS】鸿蒙应用唤起系统相机拍照 方案一&#xff1a; 官方推荐的方式&#xff0c;使用CameraPicker来调用安全相机进行拍照。 let pathDir getContext().filesDir;let fileName ${new Date().getTime()}let filePath pathDir /${fileName}.tmpfileIo.createRandomA…...

Linux系统使用valgrind分析C++程序内存资源使用情况

内存占用是我们开发的时候需要重点关注的一个问题&#xff0c;我们可以人工根据代码推理出一个消耗内存较大的函数&#xff0c;也可以推理出大概会消耗多少内存&#xff0c;但是这种方法不仅麻烦&#xff0c;而且得到的只是推理的数据&#xff0c;而不是实际的数据。 我们可以…...

Java基础夯实——2.7 线程上下文切换

线程上下文切换&#xff08;Thread Context Switching&#xff09;是操作系统在多线程环境中&#xff0c;切换CPU从执行一个线程的上下文到另一个线程的上下文的过程。这种切换是实现多线程并发执行的核心机制之一。 1 上下文: 线程的上下文指线程在某一时刻的执行状态,如&am…...

死锁相关习题 10道 附详解

2022 设系统中有三种类型的资源(A,B,C)和五个进程(P1,P2,P3,P4,P5)&#xff0c;A资源的数量是17&#xff0c;B资源的数量是6&#xff0c;C资源的数量是19。在T0时刻系统的状态&#xff1a; 最大资源需求量已分配资源量A&#xff0c;B&#xff0c;CA&#xff0c;B&#xff0c;…...

VisionPro 机器视觉案例 之 彩色保险丝个数统计

第十四篇 机器视觉案例 之 彩色保险丝颜色识别个数统计 文章目录 第十四篇 机器视觉案例 之 彩色保险丝颜色识别个数统计1.案例要求2.实现思路2.1 方法一 颜色分离工具CogColorSegmenterTool将每一种颜色分离出来&#xff0c;得到对应的单独图像&#xff0c;使用斑点工具CogBlo…...

go-zero(七) RPC服务和ETCD

go-zero 实现 RPC 服务 在实际的开发中&#xff0c;我们是通过RPC来传递数据的&#xff0c;下面我将通过一个简单的示例&#xff0c;说明如何使用go-zero框架和 Protocol Buffers 定义 RPC 服务。 一、生成 RPC项目 在这个教程中&#xff0c;我们根据user.api文件&#xff0…...

Jenkins + gitee 自动触发项目拉取部署(Webhook配置)

目录 前言 Generic Webhook Trigger 插件 下载插件 ​编辑 配置WebHook 生成tocken 总结 前言 前文简单介绍了Jenkins环境搭建&#xff0c;本文主要来介绍一下如何使用 WebHook 触发自动拉取构建项目&#xff1b; Generic Webhook Trigger 插件 实现代码推送后&#xff0c;触…...

043 商品详情

文章目录 详情页数据表结构voSkuItemVo.javaSkuItemSaleAttrVo.javaAttrValueAndSkuIdVo.javaSpuAttrGroupVo.javaGroupAttrParamVo.java pom.xmlSkuSaleAttrValueDao.xmlSkuSaleAttrValueDao.javaAttrGroupDao.xmlAttrGroupServiceImpl.javaSkuInfoServiceImpl.javaSkuSaleAtt…...

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

Android15默认授权浮窗权限

我们经常有那种需求&#xff0c;客户需要定制的apk集成在ROM中&#xff0c;并且默认授予其【显示在其他应用的上层】权限&#xff0c;也就是我们常说的浮窗权限&#xff0c;那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...

Fabric V2.5 通用溯源系统——增加图片上传与下载功能

fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

Python 训练营打卡 Day 47

注意力热力图可视化 在day 46代码的基础上&#xff0c;对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...

Vue 模板语句的数据来源

&#x1f9e9; Vue 模板语句的数据来源&#xff1a;全方位解析 Vue 模板&#xff08;<template> 部分&#xff09;中的表达式、指令绑定&#xff08;如 v-bind, v-on&#xff09;和插值&#xff08;{{ }}&#xff09;都在一个特定的作用域内求值。这个作用域由当前 组件…...

ubuntu22.04 安装docker 和docker-compose

首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...

react菜单,动态绑定点击事件,菜单分离出去单独的js文件,Ant框架

1、菜单文件treeTop.js // 顶部菜单 import { AppstoreOutlined, SettingOutlined } from ant-design/icons; // 定义菜单项数据 const treeTop [{label: Docker管理,key: 1,icon: <AppstoreOutlined />,url:"/docker/index"},{label: 权限管理,key: 2,icon:…...