RabbitMQ高级特性 - 消息分发(限流、负载均衡)
文章目录
- RabbitMQ 消息分发
- 概述
- 如何实现消费分发机制(限制每个队列消息数量)
- 使用场景
- 限流
- 背景
- 实现 demo
- 非公平发送(负载均衡)
- 背景
- 实现 demo
RabbitMQ 消息分发
概述
RabbitMQ 的队列在有多个消费者订阅时,默认会通过轮询的机制将消息分发给不同的消费者,但是有些消费者消费速度慢,有些消费者消费速度快,就会导致消费速度慢的消费者影响整个的任务的吞吐量下降.
例如,公司有1个正式员工和1个实习生,现在有 10 个任务分配平均给他们(各 5 个),而由于实习生干活比较慢,就会导致整个完成任务的吞吐量下降.
消息分发机制给 “正式工” 多分一些任务,给 “实习生” 少分一些任务.
如何实现消费分发机制(限制每个队列消息数量)
可以在配置文件中配置 prefetchCount(或者使用原生的 channel.basicQos(int prefetchCount)),来限制当前消息通道上(channel)的每一个消费所能保持的最大未确认消息的数量.
例如 prefetchCount 为 10,并且一个 channel 上有两个消费者,那么每个消费者都最多接收 10 条未确认的消息. 此时整个 channel 上未确认消息总数可能达到 20 条.
具体使用:例如配置 prefetch = 5,那么 RabbitMQ 就会为消费者计数. 发送一条消息计数+1,消费一条消息计数-1,当达到了上限5,mq队列 就不会再发送消息,直到消费者确认了某条消息(类似 TCP 中的华滑动窗口).
使用场景
限流
背景
假设,订单系统每秒最多处理 1000 请求,正常情况下,该订单系统可以满足日常使用.
但是在突发的秒杀场景下,请求瞬间增多,每秒 1w qps,这不得把订单系统打成筛子.
问题:mq 在中间的话,不是已经有削峰填谷的作用了么?为什么还要使用 mq 的 prefetch 限流机制?
尽管消息队列可以延缓高峰压力,但消费者的处理能力还是有限的(如果不配置 prefetch,消费者自身从队列中取消息的量是不可控的). 如果消费者一次性取走过多的消息,就可能会导致资源紧张. prefetch 限流就是用来控制每个消费者取消息的数量,确保消费者不会过载.
实现 demo
假设限制未确认消息上限为 5,发送消息数量为 20.
a)配置 prefetch 参数,设置应答方式为手动应答.
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5
b)配置交换机队列
@Configuration
class MQConfig {@Beanfun transQueue() = Queue(MQConst.TRANS_QUEUE)@Beanfun qosExchange() = DirectExchange(MQConst.QOS_EXCHANGE)@Beanfun qosQueue() = Queue(MQConst.QOS_QUEUE)@Beanfun qosBinding(): Binding = BindingBuilder.bind(qosQueue()).to(qosExchange()).with(MQConst.QOS_BINDING_KEY)}
c)接口(生产者)
@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate,
) {@RequestMapping("/qos")fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")}return "ok"}}
d)消费者
@Component
class QosListener {@RabbitListener(queues = [MQConst.QOS_QUEUE])fun handMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")// 这里不主动应答,模拟超长业务// channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}
e)效果如下:
可以观察到,消费者只接收到 5 个消息,但由于没有主动应答,队列 就不会给消费者发送新的消息.


Ps:此时如果直接关闭程序,这 5 个为应答的消息就会重回队列,成为 Ready 状态.
如下可以直接清理掉这些消息:
非公平发送(负载均衡)
背景
假设有两个消费者,mq 默认会按照轮询的策略将消息分发给消费者.
*但有一个中情况就比较尴尬:打个比方 一个是正式工,另一个是实习生,正式工就处理的很快,而实习生就很慢,就会造成整个任务的进度被拖慢. *
因此我们可以通过 负载均衡 的方式,让处理的快的消费者多处理一些,处理慢的消费者少处理一些.
具体的:只需要配置 prefetch,并开启自动应答即可. 这样一来,处理的快的消费者,自动应答的就更快,接收的消息也就更多.
实现 demo
a)配置文件
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 具体配置为多少,需要根据实际业务以及系统承受能力(压测)
b)生产者
@RequestMapping("/qos")fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")}return "ok"}
c)两个消费者
@Component
class QosListener {@RabbitListener(queues = [MQConst.QOS_QUEUE])fun fastHandMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")Thread.sleep(1000)println("正式工: 任务处理完成!")channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}@RabbitListener(queues = [MQConst.QOS_QUEUE])fun slowHandMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")Thread.sleep(2000)println("实习生: 任务处理完成!")channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}
d)效果如下:


相关文章:
RabbitMQ高级特性 - 消息分发(限流、负载均衡)
文章目录 RabbitMQ 消息分发概述如何实现消费分发机制(限制每个队列消息数量)使用场景限流背景实现 demo 非公平发送(负载均衡)背景实现 demo RabbitMQ 消息分发 概述 RabbitMQ 的队列在有多个消费者订阅时,默认会通过…...
信号处理——自相关和互相关分析
1.概括 在信号处理中,自相关和互相关是相关分析非常重要的概念,它们能分析一个信号或两个信号在时间维度的相似性,在振动测试分析、雷达测距和声发射探伤得到了广泛的应用。自相关分析的研究对象为一个信号,互相关分析的研究对象…...
如何解决部分设备分辨率不适配
1)如何解决部分设备分辨率不适配 2)Unity中如何实现草的LOD 3)使用了Play Asset Delivery提交版本被Google报错 4)如何计算弧线弹道的落地位置 这是第396篇UWA技术知识分享的推送,精选了UWA社区的热门话题,…...
C#插件 调用存储过程(输出参数类型)
存储过程 CREATE PROCEDURE [dbo].[GetSum]num1 INT,num2 INT,result INT OUTPUT AS BEGINselect result num1 num2 END C#代码 using Kingdee.BOS; using Kingdee.BOS.App.Data; using Kingdee.BOS.Core.Bill.PlugIn; using Kingdee.BOS.Util; using System; using System.…...
代码随想录算法训练营day32 | 509. 斐波那契数 、70. 爬楼梯 、746. 使用最小花费爬楼梯
碎碎念:开始动态规划了!加油! 参考:代码随想录 动态规划理论基础 动态规划常见类型: 动规基础类题目背包问题打家劫舍股票问题子序列问题 解决动态规划问题应该要思考清楚的: 动态规划五部曲࿱…...
【人工智能专栏】Learning Rate Decay 学习率衰减
Learning Rate Decay 学习率衰减 使用格式 optimizer = torch.optim.SGD(model.paraters(), lr=0.1, momentum=0.9, weight_decay=1e-4) scheduler = torch.optim...
浙大版《C语言程序设计(第3版)》题目集
练习4-11 统计素数并求和 本题要求统计给定整数M和N区间内素数的个数并对它们求和。 输入格式: 输入在一行中给出两个正整数M和N(1≤M≤N≤500)。 输出格式: 在一行中顺序输出M和N区间内素数的个数以及它们的和,数字间以空格分隔。 输入…...
【学习笔记】Day 2
一、进度概述 1、inversionnet_train_light 试运行——未成功 2、DL-FWI基础入门培训-1,2,以及作业1的完成——暂未完成作业 二、详情 1、inversionnet_train_light 试运行 在补充完相关依赖后,运行仍有报错 产生原因:这个代码在当…...
Java中的Map(如果想知道Java中有关Map的知识点,那么只看这一篇就足够了!)
前言:在Java编程语言中,集合框架(Collection Framework)提供了一系列用于存储和操作数据的接口和类。其中,Map和Set是两个非常重要的接口,分别用于存储键值对和无重复元素的集合。 ✨✨✨这里是秋刀鱼不做梦…...
裸金属服务器详解
在云计算飞速发展的今天,裸金属服务器(Bare Metal Server, BMS)作为一种兼具传统物理服务器性能和虚拟化服务优势的计算资源,正逐渐成为企业和个人用户的重要选择。今天我们就来了解下关于裸金属服务器的定义、核心特点以及其在各…...
等待唤醒机制两种实现方法-阻塞队列
桌子上有面条-》吃货执行 桌子上没面条-》生产者制造执行 1、消费者等待 消费者先抢到CPU执行权,发现桌子上没有面条,于是变成等待wait状态,并释放CPU执行权,此时的CPU肯定会被厨师抢到,初始开始做面条,…...
数组项相加和 – 如何将 JavaScript 数组中的数字相加
JavaScript 中的数组是一个对象,它允许您在单个变量名称下存储多个值的有序集合,并以多种方式操作这些值。 在本文中,您将学习如何使用几种不同的方法计算给定数组中所有数字的总和。 具体来说,使用以下方法得到数组中所有数字的总…...
C#和S7-1200PLC S7.NET通信
1、一步步建立一个C#项目 一步步建立一个C#项目(连续读取S7-1200PLC数据)_s7协议批量读取-CSDN博客文章浏览阅读1.7k次,点赞2次,收藏4次。这篇博客作为C#的基础系列,和大家分享如何一步步建立一个C#项目完成对S7-1200PLC数据的连续读取。首先创建一个窗体应用。_s7协议批量…...
常用命令git branch
Git Branch 命令总结 列出分支 git branch:显示本地分支,当前分支会被标记。git branch -r:显示远程分支。git branch -a:显示所有本地和远程分支。 创建分支 git branch <branch_name>:创建一个新分支但不自…...
Android 制作系统签名
一、切换目录 cd build/target/product/security二、执行命令 1)将使用.pk8生成platform.priv.pem (.pem即可,文件名可随意修改)openssl pkcs8 -in platform.pk8 -inform DER -outform PEM -out platform.pem -nocrypt2)生成.p12,此时需输入两次密码,并且要记住 -name后所设置…...
C语言第13篇
1.下面程序是计算n个数的平均值,请填空.______ #include<stdio.h> void main( ) { int i,n; float x,avg0.0; scanf("%d",&n); for(i0;i<n;i) { scanf("%f",&x); avgavg______; } avg________; printf("avg%f\n",avg); } A) …...
基于FPGA的数字信号处理(22)--进位保存加法器(Carry Save Adder, CSA)
目录 1、拆解多个数的加法 2、进位保存加法器 3、CSA的优点和缺点 4、CSA电路的实现 文章总目录点这里:《基于FPGA的数字信号处理》专栏的导航与说明 1、拆解多个数的加法 考虑3个4bits数相加,10 4 7 21 的过程是这样的: 其中的红色数…...
idea使用free流程,2024idea、2023idea都可以安装免费使用
1.先到官网下载,这里选择win系统的,点击下图的.exe https://www.jetbrains.com/idea/download/?sectionwindows 2.下载好后基本上就是一直点击“下一步”到直到安装好,安装好后先打开软件后关闭退出 3.下载配配套资料 链接: https://pan.ba…...
设计模式 之 —— 抽象工厂模式
目录 什么是抽象工厂模式? 定义 特点 抽象工厂模式(java代码示例) 首先定义第一个接口 实现第一个接口的类 定义第二个接口 实现第二个接口的类 * 创建抽象工厂类 创建扩展了 AbstractFactory 的工厂类 饮料工厂 食物工厂 * 创建一个…...
计量经济学(十六)--一文读懂和学会医学统计学中的四种检验方法
1. 统计学是什么? 统计学是应用数学的一个分支,主要通过利用概率论建立数学模型,收集所观察系统的数据,进行量化的分析、总结,并进而进行推断和预测,为相关决策提供依据和参考。它被广泛的应用在各门学科之上,从物理和社会科学到人文科学,甚至被用来工商业及政府的情报…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
windows系统MySQL安装文档
概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
Tauri2学习笔记
教程地址:https://www.bilibili.com/video/BV1Ca411N7mF?spm_id_from333.788.player.switch&vd_source707ec8983cc32e6e065d5496a7f79ee6 官方指引:https://tauri.app/zh-cn/start/ 目前Tauri2的教程视频不多,我按照Tauri1的教程来学习&…...
多模态学习路线(2)——DL基础系列
目录 前言 一、归一化 1. Layer Normalization (LN) 2. Batch Normalization (BN) 3. Instance Normalization (IN) 4. Group Normalization (GN) 5. Root Mean Square Normalization(RMSNorm) 二、激活函数 1. Sigmoid激活函数(二分类&…...
后端下载限速(redis记录实时并发,bucket4j动态限速)
✅ 使用 Redis 记录 所有用户的实时并发下载数✅ 使用 Bucket4j 实现 全局下载速率限制(动态)✅ 支持 动态调整限速策略✅ 下载接口安全、稳定、可监控 🧩 整体架构概览 模块功能Redis存储全局并发数和带宽令牌桶状态Bucket4j Redis分布式限…...
Redis——Cluster配置
目录 分片 一、分片的本质与核心价值 二、分片实现方案对比 三、分片算法详解 1. 范围分片(顺序分片) 2. 哈希分片 3. 虚拟槽分片(Redis Cluster 方案) 四、Redis Cluster 分片实践要点 五、经典问题解析 C…...
调试快捷键 pycharm vscode
目录 调试快捷键 pycharm vscode 修改快捷键 方法 1:通过菜单打开 方法 2:用快捷键打开 调试快捷键 pycharm Resume Program F9 Step Over F8 两个离的比较近,比较方便,比vscode的好。 vscode Continue F5 改为F9 S…...

