当前位置: 首页 > article >正文

SpringBoot 系列之集成 RabbitMQ 实现高效流量控制

系列博客专栏:

  • JVM系列博客专栏
  • SpringBoot系列博客

Spring Boot 2.2.1 集成 RabbitMQ 实现高效流量控制

在分布式系统中,消息队列是实现异步通信、解耦服务的重要组件。RabbitMQ 作为一款成熟的开源消息队列,广泛应用于各类项目中。本文将结合 Spring Boot 2.2.1,详细介绍如何集成 RabbitMQ 并实现基于队列长度、内存和磁盘的流量控制,同时引入服务端限流配置,进一步提升系统的稳定性与可靠性。

一、RabbitMQ 流量控制的重要性

当消息产生速度过快,超过消息队列的处理能力时,可能会导致队列积压、系统性能下降甚至崩溃。通过流量控制,可以有效限制消息的流入速度,使系统能够在合理的负载下运行,保障服务的稳定性和可靠性。

二、Spring Boot 2.2.1 集成 RabbitMQ 基础配置

1. 引入依赖

pom.xml 文件中添加 Spring Boot AMQP 和 Web 依赖:

<dependencies><!-- Spring Boot Starter AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- JSON处理依赖 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- RabbitMQ测试依赖 --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置文件

application.yml 中配置 RabbitMQ 连接信息和相关参数:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /requested-heartbeat: 30connection-timeout: 10000publisher-confirms: truepublisher-returns: truelistener:simple:acknowledge-mode: autoprefetch: 50concurrency: 3max-concurrency: 10cache:channel:size: 50checkout-timeout: 30000connection:mode: CHANNELsize: 5# 自定义流量控制配置
app:flow-control:max-messages: 1000duration: 5000

3. RabbitMQ 配置类

创建 RabbitMQConfig 类,配置队列、交换机、绑定关系、消息转换器以及 RabbitTemplate:

package com.example.springboot.rabbitmq.configuration;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class RabbitMQConfig {public static final String QUEUE_NAME = "flow.control.queue";public static final String EXCHANGE_NAME = "flow.control.exchange";public static final String ROUTING_KEY = "flow.control.key";// 配置队列@Beanpublic Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();}// 配置交换机@Beanpublic DirectExchange exchange() {return new DirectExchange(EXCHANGE_NAME);}// 绑定队列和交换机@Beanpublic Binding binding(Queue queue, DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}// 配置消息转换器@Beanpublic Jackson2JsonMessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);// 设置mandatory标志,确保消息在无法路由时返回rabbitTemplate.setMandatory(true);// 设置发布确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息发送成功: {}",  correlationData);} else {log.warn("消息发送失败: {}",  cause);}});// 设置返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息被退回: {}", new String(message.getBody()));log.info("回复码: ", replyCode);log.info("回复文本: ", replyText);log.info("交换机: ", exchange);log.info("路由键: ", routingKey);});return rabbitTemplate;}// 配置监听器容器工厂@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter);factory.setConcurrentConsumers(3); // 设置并发消费者数量factory.setMaxConcurrentConsumers(10);factory.setPrefetchCount(50); // 设置 QoSfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认模式return factory;}
}

三、基于队列长度的流量控制

MessageProducer 类中实现基于队列长度的流量控制逻辑:

package com.example.demo.service;import com.example.demo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;private final AtomicInteger messageCount = new AtomicInteger(0);private static final int MAX_MESSAGES = 1000;private volatile boolean flowControlEnabled = false;public void sendMessage(String message) {if (flowControlEnabled) {System.out.println("流量控制已启用,暂停发送消息");return;}if (messageCount.get() >= MAX_MESSAGES) {System.out.println("达到最大消息数量,触发流量控制");enableFlowControl(5000);return;}String correlationId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(correlationId);rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,message,correlationData);messageCount.incrementAndGet();System.out.println("发送消息: " + message + ", 消息ID: " + correlationId);}public void enableFlowControl(long durationMillis) {flowControlEnabled = true;System.out.println("流量控制已启用,持续时间: " + durationMillis + "ms");new Thread(() -> {try {Thread.sleep(durationMillis);} catch (InterruptedException e) {Thread.currentThread().interrupt();}flowControlEnabled = false;messageCount.set(0);System.out.println("流量控制已禁用");}).start();}
}

除了用代码限制外,可以用maxLength设置,示例代码:

 // 配置队列@Beanpublic Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();}

四、x-max-length-bytes 参数详解

x-max-length-bytes 用于限制队列中消息的总字节数。在创建队列时,可以通过代码配置:

@Bean
public Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).maxLengthBytes(1024 * 1024 * 10) // 设置队列消息总字节数上限为10MB.build();
}

当队列中消息的总字节数达到设定的阈值时,后续新消息的处理策略由 x-overflow 参数决定:

  • drop-head:丢弃队列头部的消息,为新消息腾出空间。
  • reject-publish:拒绝接收新消息,并向生产者返回 Basic.Reject 响应。

五、基于内存和磁盘的流量控制

通过配置 RabbitMQ 服务器的内存和磁盘告警阈值,当服务器内存使用或磁盘空间达到阈值时,会自动触发流量控制。例如:

rabbitmqctl set_vm_memory_high_watermark 0.6

此命令将内存高水位线设置为系统内存的 60%。

六、服务端限流配置

1. 基于 Guava 的限流实现

添加 Guava 依赖:

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>28.2-jre</version>
</dependency>

使用 RateLimiter 进行限流:

package com.example.demo.service;import com.google.common.util.concurrent.RateLimiter;
import org.springframework.stereotype.Service;@Service
public class LimitedService {private final RateLimiter rateLimiter = RateLimiter.create(5);public void limitedMethod() {if (rateLimiter.tryAcquire()) {System.out.println("请求被处理");} else {System.out.println("请求被限流");}}
}

七、 消费端限流

默认情况下,如果不进行配置,RabbitMQ会尽可能快速地把队列中的消息发送到消费者。如果消息数量过多,可能会导致OOM或者影响其他进程的正常运行

1. 消费端限流示例

package com.example.springboot.rabbitmq.service;import com.example.springboot.rabbitmq.configuration.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;@Service
@Slf4j
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)@Retryable(value = {IOException.class}, maxAttempts = 3,backoff = @Backoff(delay = 2000, multiplier = 2))public void receiveMessage(Message message, Channel channel) throws IOException {try {if (channel == null || !channel.isOpen()) {log.warn("Channel is closed or null, unable to process message");return;}// 动态设置预取计数channel.basicQos(calculatePrefetchCount());String content = new String(message.getBody());log.info("接收到消息:{} ", content);// 模拟消息处理时间Thread.sleep(100);// 发送消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("消息处理完成");} catch (Exception e) {log.error("处理消息时发生错误: {}", e.getMessage(), e);if (channel != null && channel.isOpen()) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 失败后重新入队}}}// 根据系统负载动态计算预取计数private int calculatePrefetchCount() {double cpuLoad = getSystemCpuLoad();int basePrefetch = 10;return (int) Math.max(1, basePrefetch * (1 - cpuLoad));}// 获取当前系统 CPU 负载private double getSystemCpuLoad() {OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();return osBean.getSystemLoadAverage() / osBean.getAvailableProcessors();}}

八、总结

通过上述配置和代码示例,您可以实现对 RabbitMQ 的高效流量控制,从而提升系统的稳定性和可靠性。合理利用队列长度限制、内存和磁盘流量控制,以及服务端限流策略,可以帮助系统在高负载情况下保持良好的运行状态。

相关文章:

SpringBoot 系列之集成 RabbitMQ 实现高效流量控制

系列博客专栏&#xff1a; JVM系列博客专栏SpringBoot系列博客 Spring Boot 2.2.1 集成 RabbitMQ 实现高效流量控制 在分布式系统中&#xff0c;消息队列是实现异步通信、解耦服务的重要组件。RabbitMQ 作为一款成熟的开源消息队列&#xff0c;广泛应用于各类项目中。本文将…...

LLaMA-Factory和python版本的兼容性问题解决

引言 笔者今天在电脑上安装下LLaMA-Factory做下本地的模型调优。 从github上拉取代码git clone https://github.com/hiyouga/LLaMA-Factory.git. pycharm建立工程,按照官网指导如下: LLaMA-Factory 安装 在安装 LLaMA-Factory 之前&#xff0c;请确保您安装了下列依赖: 运行以…...

掌握子网划分:优化IP分配与管理

子网划分是通过调整子网掩码&#xff0c;将单一IP网络划分为多个逻辑子网的过程&#xff0c;其核心原理是借用主机位作为子网位以优化地址分配和管理。具体方法与原理如下&#xff1a; 一、子网划分基本原理 核心目的&#xff1a; 减少IP浪费&#xff1a;避免大块地址闲置&…...

Linux中shell编程表达式和数组讲解

一、表达式 1.1 测试表达式 样式1: test 条件表达式 样式2: [ 条件表达式 ] 注意&#xff1a;以上两种方法的作用完全一样&#xff0c;后者为常用。但后者需要注意方括号[、]与条件表达式之间至少有一个空格。test跟 [] 的意思一样条件成立&#xff0c;状态返回值是0条件不成…...

每日算法-250605

每日算法 - 20240605 525. 连续数组 题目描述 给定一个二进制数组 nums , 找到含有相同数量的 0 和 1 的最长连续子数组&#xff0c;并返回该子数组的长度。 思路 前缀和 哈希表 解题过程 核心思想是将问题巧妙地转换为寻找和为特定值的子数组问题。 转换问题&#xff1a;我…...

分布式锁-Redisson实现

目录 本地锁的局限性 Redisson解决分布式锁问题 在分布式环境下&#xff0c;分布式锁可以保证在多个节点上的并发操作时数据的一致性和互斥性。分布式锁有多种实现方案&#xff0c;最常用的两种方案是&#xff1a;zookeeper和redis&#xff0c;本文介绍redis实现分布式锁方案…...

HTTP 请求协议简单介绍

目录 常见的 HTTP 响应头字段 Java 示例代码&#xff1a;发送 HTTP 请求并处理响应 代码解释&#xff1a; 运行结果&#xff1a; 文件名&#xff1a; 总结&#xff1a; HTTP&#xff08;HyperText Transfer Protocol&#xff09;是用于客户端与服务器之间通信的协议。它定…...

C++学习-入门到精通【14】标准库算法

C学习-入门到精通【14】标准库算法 目录 C学习-入门到精通【14】标准库算法一、对迭代器的最低要求迭代器无效 二、算法1.fill、fill_n、generate和generate_n2.equal、mismatch和lexicographical_compare3.remove、remove_if、remove_copy和remove_copy_if4.replace、replace_…...

银行用户评分规则 深度学习

思考模型的实际应用场景。用户的核心疑问在于&#xff1a;在银行真实的评级系统中&#xff0c;基于规则的评级和基于模型的预测评级哪个更有价值&#xff1f;ta担心自己写的代码只是学术练习而没有实际意义。 从用户提到的银行评级规则来看&#xff08;AAAA到E的划分&#xff…...

HarmonyOS运动语音开发:如何让运动开始时的语音播报更温暖

##鸿蒙核心技术##运动开发##Core Speech Kit&#xff08;基础语音服务&#xff09;# 前言 在运动类应用中&#xff0c;语音播报功能不仅可以提升用户体验&#xff0c;还能让运动过程更加生动有趣。想象一下&#xff0c;当你准备开始运动时&#xff0c;一个温暖的声音提醒你“…...

# 从底层架构到应用实践:为何部分大模型在越狱攻击下失守?

从底层架构到应用实践&#xff1a;为何部分大模型在越狱攻击下失守&#xff1f; 引言 近期&#xff0c;我们对多个主流大语言模型&#xff08;LLM&#xff09;进行了安全性测试&#xff0c;使用了极具诱导性的越狱提示词&#xff0c;试图绕过其内容安全机制。测试结果显示&am…...

vscode使用系列之快速生成html模板

一.欢迎来到我的酒馆 vscode&#xff0c;yyds! 目录 一.欢迎来到我的酒馆二.vscode下载安装1.关于vscode你需要知道2.开始下载安装 三.vscode快速创建html模板 二.vscode下载安装 1.关于vscode你需要知道 Q&#xff1a;为什么使用vscode? A&#xff1a;使用vscode写…...

Thinkphp6软删除

方法一 从控制器层直接操作 删除 此操作不会直接删除数据 而是在delete_time字段更新删除时间 ->useSoftDelete(delete_time,get_datetime())->delete() 查询 这里的数据库字段需要设置为默认NULL 查询的时候仅查询未更新删除时间的数据 ->whereNull("dele…...

网页前端开发(基础进阶4--axios)

Ajax Ajax(异步的JavaScript和XML) 。 XML是可扩展标记语言&#xff0c;本质上是一种数据格式&#xff0c;可以用来存储复杂的数据结构。 可以通过Ajax给服务器发送请求&#xff0c;并获取服务器响应的数据。 Ajax采用异步交互&#xff1a;可以在不重新加载整个页面的情况下&am…...

软件安全:漏洞利用与渗透测试剖析、流程、方法、案例

在数字时代&#xff0c;软件已深度融入生活与工作的方方面面&#xff0c;从手机应用到企业核心系统&#xff0c;软件安全至关重要。而漏洞利用与渗透测试&#xff0c;作为软件安全领域中相互关联的两个关键环节&#xff0c;一个是黑客攻击的手段&#xff0c;一个是安全防护的方…...

Haproxy的基础配置

1、参考文档 官方文档&#xff1a;HAProxy version 2.2.22 - Configuration Manual 运维派配置手册&#xff1a;Haproxy-基础配置详解 - 运维派 Haproxy 的配置文件haproxy.cfg由两大部分组成&#xff0c;分别是global和proxies部分。 2、haproxy global 配置 global&…...

考研系列—操作系统:冲刺笔记(1-3章)

目录 第一章 计算机系统概述 1.基本概念 2.内核态和用户态 3.中断(外中断)、异常(内中断-与当前执行的) 4.系统调用 5.操作系统引导程序 2021年真题: 6.操作系统结构 大纲新增 (1)分层结构 (2)模块化 (3)外核 7.虚拟机 第二章 进程管理 1.画作业运行的顺序和甘…...

使用 Docker Compose 部署 Jenkins(LTS 版)持续集成环境

一、前言 Jenkins 是目前最流行的开源持续集成工具之一。本教程将手把手带你使用 Docker Compose 快速部署 Jenkins LTS&#xff08;长期支持版本&#xff09;&#xff0c;同时保留数据持久化、Docker 命令转发等功能&#xff0c;适合用于生产或本地开发测试环境。 二、环境准…...

Java调用大模型API实战指南

文章目录 前言调用大模型的流程概述和基本原理获取 DeepSeek 的 API keyJava 实现调用大模型 API 的Demo进阶扩展建议 前言 随着大语言模型&#xff08;如 OpenAI、DeepSeek、通义千问等&#xff09;的发展&#xff0c;我们可以很方便地用 API 接口调用这些强大的智能助手。在…...

C#中的路由事件(Routed Events)

路由事件的基本概念 路由事件是WPF中特有的事件系统&#xff0c;它允许事件在可视化树中"路由"传递&#xff0c;具有以下特点&#xff1a; 事件路由方向&#xff1a; 冒泡(Tunneling)&#xff1a;从事件源向根元素传递 隧道(Bubbling)&#xff1a;从根元素向事件源…...

[蓝桥杯]通电

通电 题目描述 2015 年&#xff0c;全中国实现了户户通电。作为一名电力建设者&#xff0c;小明正在帮助一带一路上的国家通电。 这一次&#xff0c;小明要帮助 nn 个村庄通电&#xff0c;其中 1 号村庄正好可以建立一个发电站&#xff0c;所发的电足够所有村庄使用。 现在…...

单片机0-10V电压输出电路分享

一、原理图 二、芯片介绍 GP8101是一个PWM信号转模拟信号转换器&#xff0c;相当于一个PWM信号输入&#xff0c;模拟信号输出的DAC。此 芯片可以将占空比为0%到100%的PWM信号线性转换成0-5V或者0-10V的模拟电压&#xff0c;并且输出电压 精度小于1%。GP8101M可以处理高频调制的…...

从零开始,搭建一个基于 Django 的 Web 项目

&#x1f3af; 主要步骤概述 1️⃣ 安装 Python 和 pip 2️⃣ 创建虚拟环境 3️⃣ 安装 Django 4️⃣ 创建 Django 项目 5️⃣ 运行开发服务器 6️⃣ 创建一个简单的应用&#xff08;app&#xff09; 7️⃣ 配置数据库并迁移 8️⃣ 创建超级用户&#xff08;admin&#xff09;…...

大模型模型部署和暴露接口

创建环境 激活案件 安装相关依赖 conda create -n fastApi python3.10 conda activate fastApi conda install -c conda-forge fastapi uvicorn transformers pytorch pip install safetensors sentencepiece protobuf 新建文件夹 mkdir App cd App touch main.py 复制代码…...

2025服装收银系统推荐:智能管理助力服装商家高效经营

在服装批发零售行业&#xff0c;一套高效的收银系统不仅能简化日常经营流程&#xff0c;还能通过数据分析帮助商家优化库存、提升销售。随着AI技术的普及&#xff0c;现代收银系统已不再局限于简单的记账功能&#xff0c;而是能提供智能选品、库存预警、精准营销等进阶服务。 …...

Microsoft Copilot Studio - 尝试一下Agent

1.简单介绍 Microsoft Copilot Studio以前的名字是Power Virtual Agent(简称PVA)。Power Virutal Agent是2019年出现的&#xff0c;是低代码平台Power Platform的一部分。当时Generative AI还没有出现&#xff0c;但是基于已有的Conversation AI技术&#xff0c;即Microsoft L…...

【Python 算法零基础 4.排序 ⑨ 堆排序】

目录 一、问题描述 二、算法对比 1.朴素算法 ① 数组模拟容器 ② 有序数组模拟容器 2.二叉堆 ① 二叉堆特点 ② 数组表示二叉树 ③ 堆 ④ 大顶堆 ⑤ 小顶堆 ⑥ 元素插入 ⑦ 获取堆顶 ⑧ 堆顶元素删除 三、代码分析 1.工具函数 2.调整大顶堆函数 Ⅰ、计算子节点索引 Ⅱ、找出最…...

Deepseek/cherry studio中的Latex公式复制到word中

需要将Deepseek/cherry studio中公式复制到word中&#xff0c;但是deepseek输出Latex公式&#xff0c;比如以下Latex代码段&#xff0c;需要通过Mathtype翻译才能在word中编辑。 $$\begin{aligned}H_1(k1) & H_1(k) \frac{1}{A_1} \left( Q_1 u_1(k) Q_{i1} - Q_2 u_2(k…...

测试设计技术全解析:黑盒与白盒测试的七种武器与覆盖率指标

在软件开发的生命周期中&#xff0c;测试设计技术扮演着至关重要的角色&#xff0c;它直接影响着产品质量和用户体验。测试设计技术主要分为黑盒测试技术和白盒测试技术两大类&#xff0c;它们各有优势和适用场景。黑盒测试技术侧重于从用户视角验证软件功能是否符合需求&#…...

AWS中国区IAM相关凭证自行管理策略(只读CodeCommit版)

目标 需要从CodeCommit读取代码。除了设置AWS托管策略&#xff1a;AWSCodeCommitReadOnly。还需要自定义策略&#xff0c;让用户能够自行管理IAM自己的相关凭证。 IAM自定义策略 {"Version": "2012-10-17","Statement": [{"Sid": &…...