当前位置: 首页 > news >正文

【RabbitMQ】消息堆积、推拉模式

消息堆积

原因

消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因:

消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。

消费者处理能力不足:消费者消费消息的速度跟不上消息生产的速度,也会导致消息在队列中积压。可能的原因有:

  • 消费端业务逻辑复杂、耗时长。
  • 消费端代码性能低。
  • 系统资源限制,如CPU、内存、磁盘等也会限制消费者处理消息的速率。
  • 异常处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认。

网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压。

RabbitMQ服务器配置偏低

解决方案

消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃。因此,及时发现消息积压并解决对于维护系统稳定性至关重要。

遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。通常从以下几个方面考虑:

提高消费者效率

  • 增加消费者实例数量,比如新增机器。
  • 优化业务逻辑,比如使用多线程来处理业务。
  • 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者。
  • 消息发生异常时,设置合适的重试策略,或者转入到死信队列。

限制生产者效率:比如流量控制、限流算法等。

  • 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送效率。
  • 限流:使用限流工具,为消息发送效率设置一个上限。
  • 设置过期时间:如果消息过期未被消费,可以配置死信队列,以避免消息丢失,同时减少对主队列的压力。

资源与配置优化:比如省级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等。

推拉模式

概述

RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull)。

推模式:消息中间件主动将消息推送给消费者(对消息的获取更加实时,适合对数据实时性要求较高的业务,例如实时数据处理:监控系统、报表系统等)。

拉模式:消费者主动从消息中间件拉取消息(消费端可以按照自己的处理速度来消费,避免消息积压,适合需要流量控制,或者需要大量计算资源的任务,拉取模式允许消费者准备好之后再请求消息,避免资源浪费)。

RabbitMQ主要就是基于推模式工作的,例如最开始谈到的几种工作模式,都是基于推模式来进行实现。它的设计核心是让消息队列中的消费者接收到由生产者发送的消息,使用channel.basicConsume方式订阅队列,MQ就会把消息推送到订阅该队列的消费者。如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方式来进行消费消息。

SpringBoot方式

@Configuration
public class PushAndPull {@Bean("pushQueue")public Queue pushQueue() {return QueueBuilder.durable(Constants.PUSH_QUEUE).build();}@Bean("pushExchange")public Exchange pushExchange() {return ExchangeBuilder.directExchange(Constants.PUSH_EXCHANGE).durable(true).build();}@Bean("pushQueueBind")public Binding pushQueueBind(@Qualifier("pushExchange") Exchange exchange,@Qualifier("pushQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("push").noargs();}@Bean("pullQueue")public Queue pullQueue() {return QueueBuilder.durable(Constants.PULL_QUEUE).build();}@Bean("pullExchange")public Exchange pullExchange() {return ExchangeBuilder.directExchange(Constants.PULL_EXCHANGE).durable(true).build();}@Bean("pullQueueBind")public Binding pullQueueBind(@Qualifier("pullExchange") Exchange exchange,@Qualifier("pullQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("pull").noargs();}
}
@Slf4j
@RestController
@RequestMapping("/pushAndPull")
public class PushAndPullController {@Resourceprivate RabbitTemplate rabbitTemplate;// 推模式@RequestMapping("/push")public void push() {this.rabbitTemplate.convertAndSend(Constants.PUSH_EXCHANGE, "push", "hello push");System.out.println("推模式消息发送成功!");}// 拉模式@RequestMapping("/pull")public void pull() {this.rabbitTemplate.convertAndSend(Constants.PULL_EXCHANGE, "pull", "hello pull");System.out.println("拉模式消息发送成功!");}}
@Configuration
@RestController
public class PushAndPullListener {// 推模式@RabbitListener(queues = Constants.PUSH_QUEUE)public void push(String message) {System.out.println("推模式: " + message);}@Resourceprivate RabbitTemplate rabbitTemplate;// 拉模式@RequestMapping("/pullConsumer")public void pull(String message) {Message receive = this.rabbitTemplate.receive(Constants.PULL_QUEUE);System.out.println("拉模式: " + receive);}}

SDK方式

// 推模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 发送消息channel.basicPublish(Constants.PUSH_EXCHANGE, "push", null, "推模式".getBytes());System.out.println("推模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 推模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("推模式消费者接收到消息:" + new String(body));}};channel.basicConsume(Constants.PUSH_QUEUE, true, consumer);// TODO 释放资源}}
// 拉模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 发送消息channel.basicPublish(Constants.PULL_EXCHANGE, "pull", null, "拉模式".getBytes());System.out.println("拉模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 拉模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 接收消息GetResponse getResponse = channel.basicGet(Constants.PULL_QUEUE, true);if (getResponse != null) {System.out.println("拉模式收到消息:" + new String(getResponse.getBody()));}// TODO 释放资源}}

相关文章:

【RabbitMQ】消息堆积、推拉模式

消息堆积 原因 消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因: 消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息&…...

MySQL常用SQL语句(持续更新中)

文章目录 数据库相关表相关索引相关添加索引 编码相关系统变量相关 收录一些经常用到的sql 数据库相关 建数据库 CREATE DATABASE [IF NOT EXISTS] <数据库名> [[DEFAULT] CHARACTER SET <字符集名>] [[DEFAULT] COLLATE <校对规则名>];例如&#xff1a; C…...

【更新】红色文化之红色博物馆数据集(经纬度+地址)

数据简介&#xff1a;红色博物馆作为国家红色文化传承与爱国主义教育的重要基地&#xff0c;遍布全国各地&#xff0c;承载着丰富的革命历史与文化记忆。本数据说明旨在汇总并分析全国范围内具有代表性的红色博物馆的基本信息&#xff0c;包括其地址、特色及教育意义&#xff0…...

Python项目Flask框架整合Redis

一、在配置文件中创建Redis连接信息 二、 实现Redis配置类 import redis from config.config import REDIS_HOST, REDIS_PORT, REDIS_PASSWD, REDIS_DB, EXPIRE_TIMEclass RedisDb():def __init__(self, REDIS_HOST, REDIS_PORT, REDIS_DB, EXPIRE_TIME, REDIS_PASSWD):# 建立…...

完整网络模型训练(一)

文章目录 一、网络模型的搭建二、网络模型正确性检验三、创建网络函数 一、网络模型的搭建 以CIFAR10数据集作为训练例子 准备数据集&#xff1a; #因为CIFAR10是属于PRL的数据集&#xff0c;所以需要转化成tensor数据集 train_data torchvision.datasets.CIFAR10(root&quo…...

高效便捷,体验不一样的韩语翻译神器

嘿&#xff0c;大家好啊&#xff01;今天想跟大家聊聊我用过的几款翻译神器&#xff0c;特别是它们在翻译韩语时的那些小感受。作为一个偶尔需要啃啃韩语资料或者跟韩国朋友聊天的普通人&#xff0c;我真心觉得这些翻译工具简直就是我的救星&#xff01; 一、福昕在线翻译 网址…...

Markdown笔记管理工具Haptic

什么是 Haptic &#xff1f; Haptic 是一个新的本地优先、注重隐私的开源 Markdown 笔记管理工具。它简约、轻量、高效&#xff0c;旨在提供您所需的一切&#xff0c;而不包含多余的功能。 目前官方提供了 docker 和 Mac 客户端。 Haptic 仍在积极开发中。以下是未来计划的一些…...

网络原理-传输层UDP

上集回顾&#xff1a; 上一篇博客中讲述了应用层如何自定义协议&#xff1a;确定传输信息&#xff0c;确定数据格式 应用层也有一些现成的协议&#xff1a;HTTP协议 这一篇博客中来讲述传输层协议 传输层 socket api都是传输层协议提供的&#xff08;操作系统内核实现的了…...

C++中,如何使你设计的迭代器被标准算法库所支持。

iterator&#xff08;读写迭代器&#xff09; const_iterator&#xff08;只读迭代器&#xff09; reverse_iterator&#xff08;反向读写迭代器&#xff09; const_reverse_iterator&#xff08;反向只读迭代器&#xff09; 以经常介绍的_DList类为例&#xff0c;它的迭代…...

Java NIO 全面详解:掌握 `Path` 和 `Files` 的一切

在 Java 7 中引入的 NIO (New I/O) 为文件系统和流的操作带来了强大的能力&#xff0c;其中 Path 和 Files 是核心部分。Path 作为对文件路径的抽象&#xff0c;提供了灵活的方式处理文件系统中的路径&#xff1b;Files 则通过一系列静态方法&#xff0c;使得文件的读写、复制、…...

bluez免提协议hands-free介绍,全到无法想象,bluez hfp ag介绍

零. 前言 由于Bluez的介绍文档有限,以及对Linux 系统/驱动概念、D-Bus 通信和蓝牙协议都有要求,加上网络上其实没有一个完整的介绍Bluez系列的文档,所以不管是蓝牙初学者还是蓝牙从业人员,都有不小的难度,学习曲线也相对较陡,所以我有了这个想法,专门对Bluez做一个系统…...

关于区块链的安全和隐私

背景 区块链技术在近年来发展迅速&#xff0c;被认为是安全计算的突破&#xff0c;但其安全和隐私问题在不同应用中的部署仍处于争论焦点。 目的 对区块链的安全和隐私进行全面综述&#xff0c;帮助读者深入了解区块链的相关概念、属性、技术和系统。 结构 首先介绍区块链…...

特征工程——一门提高机器学习性能的艺术

当前围绕人工智能(AI)和机器学习(ML)展开的许多讨论以模型为中心&#xff0c;聚焦于 ML和深度学习(DL)的最新进展。这种模型优先的方法往往对用于训练这些模型的数据关注不足&#xff0c;甚至完全忽视。类似MLOps的领域正迅速发展&#xff0c;通过系统性地训练和利用ML模型&…...

Paper解读:工作场所人机协作的团队形成:促进组织变革的目标编程模型

人工智能&#xff08;AI&#xff09;具有降低运营成本、提高效率和改善客户体验的潜力。 因此&#xff0c;在组织中组建项目团队至关重要&#xff0c;这样他们就会在决策过程中欢迎人工智能。 当前的技术革命要求公司快速变革&#xff0c;并增加了对团队在促进创新采用方面的作…...

图文深入理解Oracle Network配置管理(一)

List item 本篇图文深入介绍Oracle Network配置管理。 Oracle Network概述 Oracle Net 服务 Oracle Net 监听程序 <oracle_home>/network/admin/listener.ora <oracle_home>/network/admin/sqlnet.ora建立网络连接 要建立客户机或中间层连接&#xff0c;Oracle…...

leetcode-链表篇3

leetcode-61 给你一个链表的头节点 head &#xff0c;旋转链表&#xff0c;将链表每个节点向右移动 k 个位置。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], k 2 输出&#xff1a;[4,5,1,2,3]示例 2&#xff1a; 输入&#xff1a;head [0,1,2], k 4 输出&#x…...

RAG(Retrieval Augmented Generation)及衍生框架:CRAG、Self-RAG与HyDe的深入探讨

近年来&#xff0c;随着大型语言模型&#xff08;LLMs&#xff09;的迅猛发展&#xff0c;我们在寻求更精确、更可靠的语言生成能力上取得了显著进展。其中&#xff0c;检索增强生成&#xff08;Retrieval-Augmented Generation&#xff09;作为一种创新方法&#xff0c;极大地…...

C语言介绍

什么是C语言 C programing language 能干什么 Hello world&#xff1f; 如何学C语言 no reading no learning...

损失函数篇 | YOLOv10 更换损失函数之 MPDIoU | 《2023 一种用于高效准确的边界框回归的损失函数》

论文地址:https://arxiv.org/pdf/2307.07662v1.pdf 边界框回归(Bounding Box Regression,BBR)在目标检测和实例分割中得到了广泛应用,是目标定位的重要步骤。然而,对于边界框回归的大多数现有损失函数来说,当预测的边界框与真值边界框具有相同的长宽比,但宽度和高度的…...

WMware安装WMware Tools(Linux~Ubuntu)

1、这里终端里面输入sudo apt upgrade用于更新最新的包 sudo apt upgrade 2、安装 open-vm-tools-desktop 包&#xff0c; Ps&#xff1a;这里是以为我已经安装好了。 udo apt install open-vm-tools-desktop -y3、最后重启就大功告成了 reboot 4、测试是否成功&#xff1a…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...