【RabbitMQ】消息堆积、推拉模式
消息堆积
原因
消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因:
消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。
消费者处理能力不足:消费者消费消息的速度跟不上消息生产的速度,也会导致消息在队列中积压。可能的原因有:
- 消费端业务逻辑复杂、耗时长。
- 消费端代码性能低。
- 系统资源限制,如CPU、内存、磁盘等也会限制消费者处理消息的速率。
- 异常处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认。
网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压。
RabbitMQ服务器配置偏低
解决方案
消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃。因此,及时发现消息积压并解决对于维护系统稳定性至关重要。
遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。通常从以下几个方面考虑:
提高消费者效率
- 增加消费者实例数量,比如新增机器。
- 优化业务逻辑,比如使用多线程来处理业务。
- 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者。
- 消息发生异常时,设置合适的重试策略,或者转入到死信队列。
限制生产者效率:比如流量控制、限流算法等。
- 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送效率。
- 限流:使用限流工具,为消息发送效率设置一个上限。
- 设置过期时间:如果消息过期未被消费,可以配置死信队列,以避免消息丢失,同时减少对主队列的压力。
资源与配置优化:比如省级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等。
推拉模式
概述
RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull)。
推模式:消息中间件主动将消息推送给消费者(对消息的获取更加实时,适合对数据实时性要求较高的业务,例如实时数据处理:监控系统、报表系统等)。
拉模式:消费者主动从消息中间件拉取消息(消费端可以按照自己的处理速度来消费,避免消息积压,适合需要流量控制,或者需要大量计算资源的任务,拉取模式允许消费者准备好之后再请求消息,避免资源浪费)。
RabbitMQ主要就是基于推模式工作的,例如最开始谈到的几种工作模式,都是基于推模式来进行实现。它的设计核心是让消息队列中的消费者接收到由生产者发送的消息,使用channel.basicConsume方式订阅队列,MQ就会把消息推送到订阅该队列的消费者。如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方式来进行消费消息。
SpringBoot方式
@Configuration
public class PushAndPull {@Bean("pushQueue")public Queue pushQueue() {return QueueBuilder.durable(Constants.PUSH_QUEUE).build();}@Bean("pushExchange")public Exchange pushExchange() {return ExchangeBuilder.directExchange(Constants.PUSH_EXCHANGE).durable(true).build();}@Bean("pushQueueBind")public Binding pushQueueBind(@Qualifier("pushExchange") Exchange exchange,@Qualifier("pushQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("push").noargs();}@Bean("pullQueue")public Queue pullQueue() {return QueueBuilder.durable(Constants.PULL_QUEUE).build();}@Bean("pullExchange")public Exchange pullExchange() {return ExchangeBuilder.directExchange(Constants.PULL_EXCHANGE).durable(true).build();}@Bean("pullQueueBind")public Binding pullQueueBind(@Qualifier("pullExchange") Exchange exchange,@Qualifier("pullQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("pull").noargs();}
}
@Slf4j
@RestController
@RequestMapping("/pushAndPull")
public class PushAndPullController {@Resourceprivate RabbitTemplate rabbitTemplate;// 推模式@RequestMapping("/push")public void push() {this.rabbitTemplate.convertAndSend(Constants.PUSH_EXCHANGE, "push", "hello push");System.out.println("推模式消息发送成功!");}// 拉模式@RequestMapping("/pull")public void pull() {this.rabbitTemplate.convertAndSend(Constants.PULL_EXCHANGE, "pull", "hello pull");System.out.println("拉模式消息发送成功!");}}
@Configuration
@RestController
public class PushAndPullListener {// 推模式@RabbitListener(queues = Constants.PUSH_QUEUE)public void push(String message) {System.out.println("推模式: " + message);}@Resourceprivate RabbitTemplate rabbitTemplate;// 拉模式@RequestMapping("/pullConsumer")public void pull(String message) {Message receive = this.rabbitTemplate.receive(Constants.PULL_QUEUE);System.out.println("拉模式: " + receive);}}
SDK方式
// 推模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 发送消息channel.basicPublish(Constants.PUSH_EXCHANGE, "push", null, "推模式".getBytes());System.out.println("推模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 推模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("推模式消费者接收到消息:" + new String(body));}};channel.basicConsume(Constants.PUSH_QUEUE, true, consumer);// TODO 释放资源}}
// 拉模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 发送消息channel.basicPublish(Constants.PULL_EXCHANGE, "pull", null, "拉模式".getBytes());System.out.println("拉模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 拉模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 接收消息GetResponse getResponse = channel.basicGet(Constants.PULL_QUEUE, true);if (getResponse != null) {System.out.println("拉模式收到消息:" + new String(getResponse.getBody()));}// TODO 释放资源}}相关文章:
【RabbitMQ】消息堆积、推拉模式
消息堆积 原因 消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因: 消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息&…...
MySQL常用SQL语句(持续更新中)
文章目录 数据库相关表相关索引相关添加索引 编码相关系统变量相关 收录一些经常用到的sql 数据库相关 建数据库 CREATE DATABASE [IF NOT EXISTS] <数据库名> [[DEFAULT] CHARACTER SET <字符集名>] [[DEFAULT] COLLATE <校对规则名>];例如: C…...
【更新】红色文化之红色博物馆数据集(经纬度+地址)
数据简介:红色博物馆作为国家红色文化传承与爱国主义教育的重要基地,遍布全国各地,承载着丰富的革命历史与文化记忆。本数据说明旨在汇总并分析全国范围内具有代表性的红色博物馆的基本信息,包括其地址、特色及教育意义࿰…...
Python项目Flask框架整合Redis
一、在配置文件中创建Redis连接信息 二、 实现Redis配置类 import redis from config.config import REDIS_HOST, REDIS_PORT, REDIS_PASSWD, REDIS_DB, EXPIRE_TIMEclass RedisDb():def __init__(self, REDIS_HOST, REDIS_PORT, REDIS_DB, EXPIRE_TIME, REDIS_PASSWD):# 建立…...
完整网络模型训练(一)
文章目录 一、网络模型的搭建二、网络模型正确性检验三、创建网络函数 一、网络模型的搭建 以CIFAR10数据集作为训练例子 准备数据集: #因为CIFAR10是属于PRL的数据集,所以需要转化成tensor数据集 train_data torchvision.datasets.CIFAR10(root&quo…...
高效便捷,体验不一样的韩语翻译神器
嘿,大家好啊!今天想跟大家聊聊我用过的几款翻译神器,特别是它们在翻译韩语时的那些小感受。作为一个偶尔需要啃啃韩语资料或者跟韩国朋友聊天的普通人,我真心觉得这些翻译工具简直就是我的救星! 一、福昕在线翻译 网址…...
Markdown笔记管理工具Haptic
什么是 Haptic ? Haptic 是一个新的本地优先、注重隐私的开源 Markdown 笔记管理工具。它简约、轻量、高效,旨在提供您所需的一切,而不包含多余的功能。 目前官方提供了 docker 和 Mac 客户端。 Haptic 仍在积极开发中。以下是未来计划的一些…...
网络原理-传输层UDP
上集回顾: 上一篇博客中讲述了应用层如何自定义协议:确定传输信息,确定数据格式 应用层也有一些现成的协议:HTTP协议 这一篇博客中来讲述传输层协议 传输层 socket api都是传输层协议提供的(操作系统内核实现的了…...
C++中,如何使你设计的迭代器被标准算法库所支持。
iterator(读写迭代器) const_iterator(只读迭代器) reverse_iterator(反向读写迭代器) const_reverse_iterator(反向只读迭代器) 以经常介绍的_DList类为例,它的迭代…...
Java NIO 全面详解:掌握 `Path` 和 `Files` 的一切
在 Java 7 中引入的 NIO (New I/O) 为文件系统和流的操作带来了强大的能力,其中 Path 和 Files 是核心部分。Path 作为对文件路径的抽象,提供了灵活的方式处理文件系统中的路径;Files 则通过一系列静态方法,使得文件的读写、复制、…...
bluez免提协议hands-free介绍,全到无法想象,bluez hfp ag介绍
零. 前言 由于Bluez的介绍文档有限,以及对Linux 系统/驱动概念、D-Bus 通信和蓝牙协议都有要求,加上网络上其实没有一个完整的介绍Bluez系列的文档,所以不管是蓝牙初学者还是蓝牙从业人员,都有不小的难度,学习曲线也相对较陡,所以我有了这个想法,专门对Bluez做一个系统…...
关于区块链的安全和隐私
背景 区块链技术在近年来发展迅速,被认为是安全计算的突破,但其安全和隐私问题在不同应用中的部署仍处于争论焦点。 目的 对区块链的安全和隐私进行全面综述,帮助读者深入了解区块链的相关概念、属性、技术和系统。 结构 首先介绍区块链…...
特征工程——一门提高机器学习性能的艺术
当前围绕人工智能(AI)和机器学习(ML)展开的许多讨论以模型为中心,聚焦于 ML和深度学习(DL)的最新进展。这种模型优先的方法往往对用于训练这些模型的数据关注不足,甚至完全忽视。类似MLOps的领域正迅速发展,通过系统性地训练和利用ML模型&…...
Paper解读:工作场所人机协作的团队形成:促进组织变革的目标编程模型
人工智能(AI)具有降低运营成本、提高效率和改善客户体验的潜力。 因此,在组织中组建项目团队至关重要,这样他们就会在决策过程中欢迎人工智能。 当前的技术革命要求公司快速变革,并增加了对团队在促进创新采用方面的作…...
图文深入理解Oracle Network配置管理(一)
List item 本篇图文深入介绍Oracle Network配置管理。 Oracle Network概述 Oracle Net 服务 Oracle Net 监听程序 <oracle_home>/network/admin/listener.ora <oracle_home>/network/admin/sqlnet.ora建立网络连接 要建立客户机或中间层连接,Oracle…...
leetcode-链表篇3
leetcode-61 给你一个链表的头节点 head ,旋转链表,将链表每个节点向右移动 k 个位置。 示例 1: 输入:head [1,2,3,4,5], k 2 输出:[4,5,1,2,3]示例 2: 输入:head [0,1,2], k 4 输出&#x…...
RAG(Retrieval Augmented Generation)及衍生框架:CRAG、Self-RAG与HyDe的深入探讨
近年来,随着大型语言模型(LLMs)的迅猛发展,我们在寻求更精确、更可靠的语言生成能力上取得了显著进展。其中,检索增强生成(Retrieval-Augmented Generation)作为一种创新方法,极大地…...
C语言介绍
什么是C语言 C programing language 能干什么 Hello world? 如何学C语言 no reading no learning...
损失函数篇 | YOLOv10 更换损失函数之 MPDIoU | 《2023 一种用于高效准确的边界框回归的损失函数》
论文地址:https://arxiv.org/pdf/2307.07662v1.pdf 边界框回归(Bounding Box Regression,BBR)在目标检测和实例分割中得到了广泛应用,是目标定位的重要步骤。然而,对于边界框回归的大多数现有损失函数来说,当预测的边界框与真值边界框具有相同的长宽比,但宽度和高度的…...
WMware安装WMware Tools(Linux~Ubuntu)
1、这里终端里面输入sudo apt upgrade用于更新最新的包 sudo apt upgrade 2、安装 open-vm-tools-desktop 包, Ps:这里是以为我已经安装好了。 udo apt install open-vm-tools-desktop -y3、最后重启就大功告成了 reboot 4、测试是否成功:…...
eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
