RabbitMQ高级特性 - 消息分发(限流、负载均衡)
文章目录
- RabbitMQ 消息分发
- 概述
- 如何实现消费分发机制(限制每个队列消息数量)
- 使用场景
- 限流
- 背景
- 实现 demo
- 非公平发送(负载均衡)
- 背景
- 实现 demo
RabbitMQ 消息分发
概述
RabbitMQ 的队列在有多个消费者订阅时,默认会通过轮询
的机制将消息分发给不同的消费者,但是有些消费者消费速度慢,有些消费者消费速度快,就会导致消费速度慢的消费者影响整个的任务的吞吐量下降
.
例如,公司有1个正式员工和1个实习生,现在有 10 个任务分配平均给他们(各 5 个),而由于实习生干活比较慢,就会导致整个完成任务的吞吐量下降.
消息分发机制给 “正式工” 多分一些任务,给 “实习生” 少分一些任务.
如何实现消费分发机制(限制每个队列消息数量)
可以在配置文件中配置 prefetchCount
(或者使用原生的 channel.basicQos(int prefetchCount)
),来限制当前消息通道上(channel)的每一个消费所能保持的最大未确认消息的数量.
例如 prefetchCount 为 10,并且一个 channel 上有两个消费者,那么每个消费者都最多接收 10 条未确认的消息. 此时整个 channel 上未确认消息总数可能达到 20 条.
具体使用:例如配置 prefetch = 5
,那么 RabbitMQ 就会为消费者计数. 发送一条消息计数+1,消费一条消息计数-1,当达到了上限5,mq队列 就不会再发送消息,直到消费者确认了某条消息(类似 TCP 中的华滑动窗口).
使用场景
限流
背景
假设,订单系统每秒最多处理 1000 请求,正常情况下,该订单系统可以满足日常使用.
但是在突发的秒杀场景下,请求瞬间增多,每秒 1w qps,这不得把订单系统打成筛子.
问题:mq 在中间的话,不是已经有削峰填谷的作用了么?为什么还要使用 mq 的 prefetch 限流机制?
尽管消息队列可以延缓高峰压力,但消费者的处理能力还是有限的(如果不配置 prefetch,消费者自身从队列中取消息的量是不可控的)
. 如果消费者一次性取走过多的消息,就可能会导致资源紧张. prefetch 限流就是用来控制每个消费者取消息的数量,确保消费者不会过载.
实现 demo
假设限制未确认消息上限为 5,发送消息数量为 20.
a)配置 prefetch 参数,设置应答方式为手动应答.
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5
b)配置交换机队列
@Configuration
class MQConfig {@Beanfun transQueue() = Queue(MQConst.TRANS_QUEUE)@Beanfun qosExchange() = DirectExchange(MQConst.QOS_EXCHANGE)@Beanfun qosQueue() = Queue(MQConst.QOS_QUEUE)@Beanfun qosBinding(): Binding = BindingBuilder.bind(qosQueue()).to(qosExchange()).with(MQConst.QOS_BINDING_KEY)}
c)接口(生产者)
@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate,
) {@RequestMapping("/qos")fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")}return "ok"}}
d)消费者
@Component
class QosListener {@RabbitListener(queues = [MQConst.QOS_QUEUE])fun handMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")// 这里不主动应答,模拟超长业务// channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}
e)效果如下:
可以观察到,消费者只接收到 5 个消息,但由于没有主动应答,队列 就不会给消费者发送新的消息.
Ps:此时如果直接关闭程序,这 5 个为应答的消息就会重回队列,成为 Ready 状态.
如下可以直接清理掉这些消息:
非公平发送(负载均衡)
背景
假设有两个消费者,mq 默认会按照轮询的策略将消息分发给消费者.
*但有一个中情况就比较尴尬:打个比方 一个是正式工,另一个是实习生,正式工就处理的很快,而实习生就很慢,就会造成整个任务的进度被拖慢. *
因此我们可以通过 负载均衡 的方式,让处理的快的消费者多处理一些,处理慢的消费者少处理一些.
具体的:只需要配置 prefetch,并开启自动应答即可.
这样一来,处理的快的消费者,自动应答的就更快,接收的消息也就更多.
实现 demo
a)配置文件
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 具体配置为多少,需要根据实际业务以及系统承受能力(压测)
b)生产者
@RequestMapping("/qos")fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")}return "ok"}
c)两个消费者
@Component
class QosListener {@RabbitListener(queues = [MQConst.QOS_QUEUE])fun fastHandMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")Thread.sleep(1000)println("正式工: 任务处理完成!")channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}@RabbitListener(queues = [MQConst.QOS_QUEUE])fun slowHandMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")Thread.sleep(2000)println("实习生: 任务处理完成!")channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}
d)效果如下:
相关文章:

RabbitMQ高级特性 - 消息分发(限流、负载均衡)
文章目录 RabbitMQ 消息分发概述如何实现消费分发机制(限制每个队列消息数量)使用场景限流背景实现 demo 非公平发送(负载均衡)背景实现 demo RabbitMQ 消息分发 概述 RabbitMQ 的队列在有多个消费者订阅时,默认会通过…...

信号处理——自相关和互相关分析
1.概括 在信号处理中,自相关和互相关是相关分析非常重要的概念,它们能分析一个信号或两个信号在时间维度的相似性,在振动测试分析、雷达测距和声发射探伤得到了广泛的应用。自相关分析的研究对象为一个信号,互相关分析的研究对象…...

如何解决部分设备分辨率不适配
1)如何解决部分设备分辨率不适配 2)Unity中如何实现草的LOD 3)使用了Play Asset Delivery提交版本被Google报错 4)如何计算弧线弹道的落地位置 这是第396篇UWA技术知识分享的推送,精选了UWA社区的热门话题,…...

C#插件 调用存储过程(输出参数类型)
存储过程 CREATE PROCEDURE [dbo].[GetSum]num1 INT,num2 INT,result INT OUTPUT AS BEGINselect result num1 num2 END C#代码 using Kingdee.BOS; using Kingdee.BOS.App.Data; using Kingdee.BOS.Core.Bill.PlugIn; using Kingdee.BOS.Util; using System; using System.…...

代码随想录算法训练营day32 | 509. 斐波那契数 、70. 爬楼梯 、746. 使用最小花费爬楼梯
碎碎念:开始动态规划了!加油! 参考:代码随想录 动态规划理论基础 动态规划常见类型: 动规基础类题目背包问题打家劫舍股票问题子序列问题 解决动态规划问题应该要思考清楚的: 动态规划五部曲࿱…...
【人工智能专栏】Learning Rate Decay 学习率衰减
Learning Rate Decay 学习率衰减 使用格式 optimizer = torch.optim.SGD(model.paraters(), lr=0.1, momentum=0.9, weight_decay=1e-4) scheduler = torch.optim...
浙大版《C语言程序设计(第3版)》题目集
练习4-11 统计素数并求和 本题要求统计给定整数M和N区间内素数的个数并对它们求和。 输入格式: 输入在一行中给出两个正整数M和N(1≤M≤N≤500)。 输出格式: 在一行中顺序输出M和N区间内素数的个数以及它们的和,数字间以空格分隔。 输入…...

【学习笔记】Day 2
一、进度概述 1、inversionnet_train_light 试运行——未成功 2、DL-FWI基础入门培训-1,2,以及作业1的完成——暂未完成作业 二、详情 1、inversionnet_train_light 试运行 在补充完相关依赖后,运行仍有报错 产生原因:这个代码在当…...

Java中的Map(如果想知道Java中有关Map的知识点,那么只看这一篇就足够了!)
前言:在Java编程语言中,集合框架(Collection Framework)提供了一系列用于存储和操作数据的接口和类。其中,Map和Set是两个非常重要的接口,分别用于存储键值对和无重复元素的集合。 ✨✨✨这里是秋刀鱼不做梦…...
裸金属服务器详解
在云计算飞速发展的今天,裸金属服务器(Bare Metal Server, BMS)作为一种兼具传统物理服务器性能和虚拟化服务优势的计算资源,正逐渐成为企业和个人用户的重要选择。今天我们就来了解下关于裸金属服务器的定义、核心特点以及其在各…...

等待唤醒机制两种实现方法-阻塞队列
桌子上有面条-》吃货执行 桌子上没面条-》生产者制造执行 1、消费者等待 消费者先抢到CPU执行权,发现桌子上没有面条,于是变成等待wait状态,并释放CPU执行权,此时的CPU肯定会被厨师抢到,初始开始做面条,…...
数组项相加和 – 如何将 JavaScript 数组中的数字相加
JavaScript 中的数组是一个对象,它允许您在单个变量名称下存储多个值的有序集合,并以多种方式操作这些值。 在本文中,您将学习如何使用几种不同的方法计算给定数组中所有数字的总和。 具体来说,使用以下方法得到数组中所有数字的总…...
C#和S7-1200PLC S7.NET通信
1、一步步建立一个C#项目 一步步建立一个C#项目(连续读取S7-1200PLC数据)_s7协议批量读取-CSDN博客文章浏览阅读1.7k次,点赞2次,收藏4次。这篇博客作为C#的基础系列,和大家分享如何一步步建立一个C#项目完成对S7-1200PLC数据的连续读取。首先创建一个窗体应用。_s7协议批量…...
常用命令git branch
Git Branch 命令总结 列出分支 git branch:显示本地分支,当前分支会被标记。git branch -r:显示远程分支。git branch -a:显示所有本地和远程分支。 创建分支 git branch <branch_name>:创建一个新分支但不自…...
Android 制作系统签名
一、切换目录 cd build/target/product/security二、执行命令 1)将使用.pk8生成platform.priv.pem (.pem即可,文件名可随意修改)openssl pkcs8 -in platform.pk8 -inform DER -outform PEM -out platform.pem -nocrypt2)生成.p12,此时需输入两次密码,并且要记住 -name后所设置…...

C语言第13篇
1.下面程序是计算n个数的平均值,请填空.______ #include<stdio.h> void main( ) { int i,n; float x,avg0.0; scanf("%d",&n); for(i0;i<n;i) { scanf("%f",&x); avgavg______; } avg________; printf("avg%f\n",avg); } A) …...

基于FPGA的数字信号处理(22)--进位保存加法器(Carry Save Adder, CSA)
目录 1、拆解多个数的加法 2、进位保存加法器 3、CSA的优点和缺点 4、CSA电路的实现 文章总目录点这里:《基于FPGA的数字信号处理》专栏的导航与说明 1、拆解多个数的加法 考虑3个4bits数相加,10 4 7 21 的过程是这样的: 其中的红色数…...

idea使用free流程,2024idea、2023idea都可以安装免费使用
1.先到官网下载,这里选择win系统的,点击下图的.exe https://www.jetbrains.com/idea/download/?sectionwindows 2.下载好后基本上就是一直点击“下一步”到直到安装好,安装好后先打开软件后关闭退出 3.下载配配套资料 链接: https://pan.ba…...

设计模式 之 —— 抽象工厂模式
目录 什么是抽象工厂模式? 定义 特点 抽象工厂模式(java代码示例) 首先定义第一个接口 实现第一个接口的类 定义第二个接口 实现第二个接口的类 * 创建抽象工厂类 创建扩展了 AbstractFactory 的工厂类 饮料工厂 食物工厂 * 创建一个…...

计量经济学(十六)--一文读懂和学会医学统计学中的四种检验方法
1. 统计学是什么? 统计学是应用数学的一个分支,主要通过利用概率论建立数学模型,收集所观察系统的数据,进行量化的分析、总结,并进而进行推断和预测,为相关决策提供依据和参考。它被广泛的应用在各门学科之上,从物理和社会科学到人文科学,甚至被用来工商业及政府的情报…...

业务系统对接大模型的基础方案:架构设计与关键步骤
业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...

《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...

srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...
Angular微前端架构:Module Federation + ngx-build-plus (Webpack)
以下是一个完整的 Angular 微前端示例,其中使用的是 Module Federation 和 npx-build-plus 实现了主应用(Shell)与子应用(Remote)的集成。 🛠️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...

算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...

云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
Leetcode33( 搜索旋转排序数组)
题目表述 整数数组 nums 按升序排列,数组中的值 互不相同 。 在传递给函数之前,nums 在预先未知的某个下标 k(0 < k < nums.length)上进行了 旋转,使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...

Unity VR/MR开发-VR开发与传统3D开发的差异
视频讲解链接:【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...

GraphRAG优化新思路-开源的ROGRAG框架
目前的如微软开源的GraphRAG的工作流程都较为复杂,难以孤立地评估各个组件的贡献,传统的检索方法在处理复杂推理任务时可能不够有效,特别是在需要理解实体间关系或多跳知识的情况下。先说结论,看完后感觉这个框架性能上不会比Grap…...