当前位置: 首页 > news >正文

【RocketMQ系列十四】RocketMQ中消息堆积如何处理

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. 消息堆积
    • 2. 消息堆积出现的原因
    • 3. 如何解决消息堆积

1. 消息堆积

消息堆积顾名思义就是消息队列中堆积了大量未被处理的消息,主要发生在高并发的场景下,生产者发送消息的速率远大于消费者组消息的速度。在物联网的AIOT场景中比较常见。

在RocketMQ的Console上可以查看某个Topic上消息堆积的情况。

消息堆积

这里有个延迟就表示目前堆积的消息数。

2. 消息堆积出现的原因

消息堆积的本质原因还是消费者消费消息的速度赶不上生产者发送消息的速度。可能的情况有:

  1. 第一种情况: 新上线的消费者的消费逻辑存在Bug,导致消息不能被正常消费。这种场景主要存在于代码逻辑不严谨导致某些消息消费失败,或者消费超时,从而导致消息被大量堆积。

  2. 第二种情况:消费者实例宕机或者由于网络的原因不能连上Broker集群。这种情况主要是消费者实例可能是单节点或者机房网络不好的情况。

  3. 第三种情况:生产者短时间内大量发送消息到Broker端,消费者的消费能力不足。消费者消费消息往往是一些比较耗时的IO操作,比如操作数据库,调用其他服务。这导致消费者的消费速率远低于生产者发送速率。这种情况也是消息堆积的常见场景。

3. 如何解决消息堆积

  1. 解决第一种情况:对需要上线的消费者进行严格的测试,确保每种消息的场景都能覆盖到。另外,在上线的时候采用灰度发布,先灰度小范围的用户进行使用,确认没有问题了,在全量放开所有用户使用。

  2. 解决第二种情况:在上线消费者实例时需要,采用多实例,异地多活的方式,确保极端的情况下都能有消费者能够正常消费消息。

  3. 解决第三种情况:这种情况的解决本质上是如何提高消费者的消费速率。主要可以从如下方面解决:

    1. 同一个消费者组下,增加消费者实例。比如Topic中有8个队列,那么可以将消费者数量最多增加到8个。那么有同学会问为啥只增加到8个,我增加到9个,乃至10个行不行?答案是你可以增加10个消费者,但是多余的2个消费者是分不到Queue的。这是因为 在RocketMQ中某个topic下的某个队列只能被同一消费者组中的某个消费者消费。 如果消费者数量少于Queue的数量,那么有可能会出现消费不均的情况。

    2. 提高单个消费者的消费并行线程。RocketMQ 支持批量消费消息,可以通过修改DefaultMQPushConsumer 消费者类的consumeThreadMin(最少消费线程数),以及consumeThreadMax(最大消费线程数)来提高单个消费者的消费能力。

    3. 批量消费消息:

      某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量。建议使用5.x SDK的SimpleConsumer,每次接口调用设置批次大小,一次性拉取消费多条消息。

    下面就让我们来看个例子:

    生产者:使用的是DefaultMQProducer;

    	//4.创建消息StopWatch stopWatch = new StopWatch();stopWatch.start();for (int i = 0; i < 20000; i++) {// 创建消息,指定topic,以及消息体Message message = new Message("heap_topic", ("消息堆积测试" + i).getBytes());//5.发送消息SendResult send = defaultMQProducer.send(message);System.out.println(send);}stopWatch.stop();System.out.println("生产者发送2万条消息用时="+stopWatch.getTotalTimeSeconds()+"秒");
    

    消费者:使用的是DefaultMQPushConsumer;

    	// 4.创建一个回调函数consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.println("本批次收到的消息数="+msgs.size());// 5.处理消息for (MessageExt msg : msgs) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("当前时间="+System.currentTimeMillis()+" 收到的消息内容:" + new String(msg.getBody()));}// 返回消费成功的对象return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});
    

    生产者329秒内发送了2万条消息,平均60条,

    image-20231014152350841

    而消费者消费一条消息需要一秒,所以生产者发送完消息之后,两个消费者还在消费。

    image-20231014144541572

image-20231014152042570

这里消费者使用的是DefaultMQPushConsumer消费者 每批次Broker端会向消费者推送32条消息,通过pullBatchSize字段设置,而消费者,每次消费1条消息,通过consumeMessageBatchMaxSize字段设置。

image-20231014153721132

当然,官方推荐使用SimpleConsumer进行批量消费消息。

	//每批次拉取16条消息int maxMessageNum = 16;// Set message invisible duration after it is received.Duration invisibleDuration = Duration.ofSeconds(15);// Receive message, multi-threading is more recommended.do {final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);log.info("Received {} message(s)", messages.size());for (MessageView message : messages) {final MessageId messageId = message.getMessageId();try {consumer.ack(message);log.info("Message is acknowledged successfully, messageId={}", messageId);} catch (Throwable t) {log.error("Message is failed to be acknowledged, messageId={}", messageId, t);}}} while (true);

官方的代码示例

相关文章:

【RocketMQ系列十四】RocketMQ中消息堆积如何处理

您好&#xff0c;我是码农飞哥&#xff08;wei158556&#xff09;&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f4aa;&#x1f3fb; 1. Python基础专栏&#xff0c;基础知识一网打尽&#xff0c;9.9元买不了吃亏&#xff0c;买不了上当。 Python从入门到精…...

Ubuntu - sudo apt update 报错源问题解决方案

sudo apt update 报错…lease’ does not have a Release file. 反正就是觉得是网络的问题 尝试添加国内清华源、阿里源 不行 尝试DNS 为8.8.8.8&#xff0c;114.114.114.114 还是不行 解决方案&#xff1a;设置里面让 Ubuntu 找到适合自己的源 1、Settings -> About…...

MSQL系列(八) Mysql实战-SQL存储引擎

Mysql实战-SQL存储引擎 前面我们讲解了索引的存储结构&#xff0c;BTree的索引结构&#xff0c;我们一般都知道Mysql的存储引擎有两种&#xff0c;MyISAM和InnoDB,今天我们来详细讲解下Mysql的存储引擎 文章目录 Mysql实战-SQL存储引擎1.存储引擎2.MyISAM的特点3. InnoDB的特…...

vue3 + fastapi 实现选择目录所有文件自定义上传到服务器

文章目录 ⭐前言&#x1f496; 技术栈选择 ⭐前端页面搭建&#x1f496; 调整请求content-type传递formData ⭐后端接口实现&#x1f496; swagger文档测试接口 ⭐前后端实现效果&#x1f496; 上传单个文件&#x1f496; 上传目录文件 ⭐总结⭐结束 ⭐前言 大家好&#xff0c…...

html 常见兼容性问题

目录 前言: 用法: 代码: 1. 盒模型差异: 2. 表格布局问题: 3. 浏览器前缀问题: 4. 字体渲染问题: 理解: 讨论: 前言: 在Web开发中&#xff0c;兼容性问题是常见的挑战之一。不同的浏览器和设备可能以不同的方式解释和呈现HTML&#xff0c;导致网页在某些环境下出现问题…...

PCL 点云投影到圆柱(C++详细过程版)

目录 一、算法原理1、圆柱方程2、投影原理二、代码实现三、结果展示1、原始点云2、投影结果四、参考链接一、算法原理 1、圆柱方程 圆柱方程可以表示为: ( x − x...

以太网链路聚合与交换机堆叠,集群

目录 以太网链路聚合 一.链路聚合的基本概念 二.链路聚合的配置 1.手工模式 2.LACP模式 系统优先级 接口优先级 最大活动接口数 活动链路选举 负载分担 负载分担模式 三.典型使用场景 交换机之间 交换机和服务器之间 交换机和堆叠系统 防火墙双机热备心跳线 四…...

5G RedCap工业智能网关

5G RedCap工业智能网关是当前工业智能化发展领域的重要技术之一。随着物联网和工业互联网的迅速发展&#xff0c;企业对于实时数据传输和高速通信需求越来越迫切。在这种背景下&#xff0c;5G RedCap工业智能网关以其卓越的性能和功能&#xff0c;成为众多企业的首选。 5G RedC…...

STM32-ADC实验

AD转换包括采样阶段和转换阶段。在采样阶段才对通道数据进行&#xff1b;在转换阶段只是将采集的数据进行转换为数字量输出&#xff0c;此刻通道数据变化不会改变转换结果。 实验1&#xff1a;单ADC单通道中断 硬件原理图 由于PC1接到电位器上&#xff0c;所以我们实验选择PC1…...

05、Python -- 爬取ts文件格式视频思路

目录 第一步&#xff1a;爬取一段5秒视频找url代码结果 第二步&#xff1a;下载整个视频的所有片段代码&#xff1a;结果&#xff1a; 第三步&#xff1a;合成视频安装模块代码&#xff1a;结果 简洁代码代码&#xff1a;结果&#xff1a; 最终代码简洁前代码简洁后代码 思路&a…...

【QT】其他常用控件2

新建项目 lineEdit 什么都不显示&#xff08;linux password&#xff09; password textEdit和plainTextEdit spinBox和doubleSpinBox timeEdit、dateEdit、dateTimeEdit label 显示图案&#xff0c;导入资源&#xff1a;【QT】资源文件导入_复制其他项目中的文件到qt项目中_St…...

django报错--Not Found The requested URL was not found on the server.

这个问题通常是由于服务器配置或代码错误导致的。以下是解决这个问题的一些建议和步骤&#xff1a; 首先&#xff0c;请确保你的URL拼写正确。确认URL中的路径和文件名都是正确的&#xff0c;并且没有任何拼写错误。如果你是从浏览器中复制粘贴URL&#xff0c;请确保没有任何额…...

VLOOKUP函数的使用方法

VLOOKUP是一个查找函数&#xff0c;给定一个查找的目标&#xff0c;它就能从指定的查找区域中查找返回想要查找到的值。它的基本语法为&#xff1a; VLOOKUP&#xff08;查找目标&#xff0c;查找范围&#xff0c;返回值的列数&#xff0c;精确OR模糊查找)下面以一个实例来介绍…...

关于前端如何下载后端接口返回content-type为application/octet-stream的文件

关于前端如何下载后端接口返回response-type为application/octet-stream的文件 问题描述 后端接口定义为直接返回一个文件&#xff0c;如果带认证信息可以直接通过浏览器url下载&#xff0c;但是接口需要传headers认证信息&#xff0c;url上又不支持传相关信息 解决 前端…...

报错:SSL routines:ssl3_get_record:wrong version number

一、问题描述 前后端联调的时候&#xff0c;连接后端本地服务器&#xff0c;接口一直pending调不通&#xff0c;控制台还报以下错误&#xff1a; 立马随手搜索了一下解决方案&#xff0c;但是emmm&#xff0c;不符合前端的实际情况&#xff1a; 二、解决方法&#xff1a; 实际…...

Flask后端开发(一)-基础知识和前期准备

目录 1.背景介绍1.1. 项目背景1.2. 项目难点1.3. 项目环境 2. flask后端开发实现的功能3. flask部署和前后端对接3.1. flask运行配置和服务器部署3.2. flask前后端传参 4. 后端测试工具4.1. 工具介绍4.2. 工具使用 后记 1.背景介绍 1.1. 项目背景 就是前几个月临时接手了一个…...

基于SSM的幼儿园管理系统

基于SSM的幼儿园管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringSpringMVCMyBatis工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 登录界面 管理员界面 摘要 基于SSM&#xff08;Spring、Spring MVC、MyBatis&#…...

互联网Java工程师面试题·Spring篇·第三弹

目录 ​编辑 4、注解 4.1、什么是基于注解的容器配置 4.2、如何在 spring 中启动注解装配&#xff1f; 4.3、Component, Controller, Repository,Service 有何区别&#xff1f; 4.4、Required 注解有什么用&#xff1f; 4.5、Autowired 注解有什么用&#xff1f; 4.6、…...

前端(二十三)——轮询和长轮询

&#x1f62b;博主&#xff1a;小猫娃来啦 &#x1f62b;文章核心&#xff1a;实现客户端与服务器实时通信的技术手段 文章目录 前言轮询技术轮询的概念轮询的实现原理轮询的优缺点轮询的使用场景 长轮询技术长轮询的概念长轮询的实现原理长轮询的优缺点长轮询的使用场景 轮询与…...

uniapp把文件中的内复制到另一个文件中

使用的是Html 5的plus.io.resolveLocalFileSystemURL方法&#xff0c;文档&#xff1a;HTML5 API Reference var soursePath file:///storage/emulated/0/a/;//用于读取var removePath file:///storage/emulated/0/w/;//用于移除w这个文件夹var targetPath file:///storage/…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

Java毕业设计:WML信息查询与后端信息发布系统开发

JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发&#xff0c;实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构&#xff0c;服务器端使用Java Servlet处理请求&#xff0c;数据库采用MySQL存储信息&#xff0…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)

RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发&#xff0c;后来由Pivotal Software Inc.&#xff08;现为VMware子公司&#xff09;接管。RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;用 Erlang 语言编写。广泛应用于各种分布…...

django blank 与 null的区别

1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是&#xff0c;要注意以下几点&#xff1a; Django的表单验证与null无关&#xff1a;null参数控制的是数据库层面字段是否可以为NULL&#xff0c;而blank参数控制的是Django表单验证时字…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制

目录 节点的功能承载层&#xff08;GATT/Adv&#xff09;局限性&#xff1a; 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能&#xff0c;如 Configuration …...

【FTP】ftp文件传输会丢包吗?批量几百个文件传输,有一些文件没有传输完整,如何解决?

FTP&#xff08;File Transfer Protocol&#xff09;本身是一个基于 TCP 的协议&#xff0c;理论上不会丢包。但 FTP 文件传输过程中仍可能出现文件不完整、丢失或损坏的情况&#xff0c;主要原因包括&#xff1a; ✅ 一、FTP传输可能“丢包”或文件不完整的原因 原因描述网络…...

绕过 Xcode?使用 Appuploader和主流工具实现 iOS 上架自动化

iOS 应用的发布流程一直是开发链路中最“苹果味”的环节&#xff1a;强依赖 Xcode、必须使用 macOS、各种证书和描述文件配置……对很多跨平台开发者来说&#xff0c;这一套流程并不友好。 特别是当你的项目主要在 Windows 或 Linux 下开发&#xff08;例如 Flutter、React Na…...

《信号与系统》第 6 章 信号与系统的时域和频域特性

目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...