RabbitMQ高级特性 - 消息分发(限流、负载均衡)
文章目录
- RabbitMQ 消息分发
- 概述
- 如何实现消费分发机制(限制每个队列消息数量)
- 使用场景
- 限流
- 背景
- 实现 demo
- 非公平发送(负载均衡)
- 背景
- 实现 demo
RabbitMQ 消息分发
概述
RabbitMQ 的队列在有多个消费者订阅时,默认会通过轮询的机制将消息分发给不同的消费者,但是有些消费者消费速度慢,有些消费者消费速度快,就会导致消费速度慢的消费者影响整个的任务的吞吐量下降.
例如,公司有1个正式员工和1个实习生,现在有 10 个任务分配平均给他们(各 5 个),而由于实习生干活比较慢,就会导致整个完成任务的吞吐量下降.
消息分发机制给 “正式工” 多分一些任务,给 “实习生” 少分一些任务.
如何实现消费分发机制(限制每个队列消息数量)
可以在配置文件中配置 prefetchCount(或者使用原生的 channel.basicQos(int prefetchCount)),来限制当前消息通道上(channel)的每一个消费所能保持的最大未确认消息的数量.
例如 prefetchCount 为 10,并且一个 channel 上有两个消费者,那么每个消费者都最多接收 10 条未确认的消息. 此时整个 channel 上未确认消息总数可能达到 20 条.
具体使用:例如配置 prefetch = 5,那么 RabbitMQ 就会为消费者计数. 发送一条消息计数+1,消费一条消息计数-1,当达到了上限5,mq队列 就不会再发送消息,直到消费者确认了某条消息(类似 TCP 中的华滑动窗口).
使用场景
限流
背景
假设,订单系统每秒最多处理 1000 请求,正常情况下,该订单系统可以满足日常使用.
但是在突发的秒杀场景下,请求瞬间增多,每秒 1w qps,这不得把订单系统打成筛子.
问题:mq 在中间的话,不是已经有削峰填谷的作用了么?为什么还要使用 mq 的 prefetch 限流机制?
尽管消息队列可以延缓高峰压力,但消费者的处理能力还是有限的(如果不配置 prefetch,消费者自身从队列中取消息的量是不可控的). 如果消费者一次性取走过多的消息,就可能会导致资源紧张. prefetch 限流就是用来控制每个消费者取消息的数量,确保消费者不会过载.
实现 demo
假设限制未确认消息上限为 5,发送消息数量为 20.
a)配置 prefetch 参数,设置应答方式为手动应答.
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5
b)配置交换机队列
@Configuration
class MQConfig {@Beanfun transQueue() = Queue(MQConst.TRANS_QUEUE)@Beanfun qosExchange() = DirectExchange(MQConst.QOS_EXCHANGE)@Beanfun qosQueue() = Queue(MQConst.QOS_QUEUE)@Beanfun qosBinding(): Binding = BindingBuilder.bind(qosQueue()).to(qosExchange()).with(MQConst.QOS_BINDING_KEY)}
c)接口(生产者)
@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate,
) {@RequestMapping("/qos")fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")}return "ok"}}
d)消费者
@Component
class QosListener {@RabbitListener(queues = [MQConst.QOS_QUEUE])fun handMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")// 这里不主动应答,模拟超长业务// channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}
e)效果如下:
可以观察到,消费者只接收到 5 个消息,但由于没有主动应答,队列 就不会给消费者发送新的消息.


Ps:此时如果直接关闭程序,这 5 个为应答的消息就会重回队列,成为 Ready 状态.
如下可以直接清理掉这些消息:
非公平发送(负载均衡)
背景
假设有两个消费者,mq 默认会按照轮询的策略将消息分发给消费者.
*但有一个中情况就比较尴尬:打个比方 一个是正式工,另一个是实习生,正式工就处理的很快,而实习生就很慢,就会造成整个任务的进度被拖慢. *
因此我们可以通过 负载均衡 的方式,让处理的快的消费者多处理一些,处理慢的消费者少处理一些.
具体的:只需要配置 prefetch,并开启自动应答即可. 这样一来,处理的快的消费者,自动应答的就更快,接收的消息也就更多.
实现 demo
a)配置文件
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 具体配置为多少,需要根据实际业务以及系统承受能力(压测)
b)生产者
@RequestMapping("/qos")fun qos(): String {for (i in 1..20) {rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")}return "ok"}
c)两个消费者
@Component
class QosListener {@RabbitListener(queues = [MQConst.QOS_QUEUE])fun fastHandMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")Thread.sleep(1000)println("正式工: 任务处理完成!")channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}@RabbitListener(queues = [MQConst.QOS_QUEUE])fun slowHandMessage(message: Message,channel: Channel) {val deliverTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")Thread.sleep(2000)println("实习生: 任务处理完成!")channel.basicAck(deliverTag, false)} catch (e: Exception) {channel.basicNack(deliverTag, false, true)}}}
d)效果如下:


相关文章:
RabbitMQ高级特性 - 消息分发(限流、负载均衡)
文章目录 RabbitMQ 消息分发概述如何实现消费分发机制(限制每个队列消息数量)使用场景限流背景实现 demo 非公平发送(负载均衡)背景实现 demo RabbitMQ 消息分发 概述 RabbitMQ 的队列在有多个消费者订阅时,默认会通过…...
信号处理——自相关和互相关分析
1.概括 在信号处理中,自相关和互相关是相关分析非常重要的概念,它们能分析一个信号或两个信号在时间维度的相似性,在振动测试分析、雷达测距和声发射探伤得到了广泛的应用。自相关分析的研究对象为一个信号,互相关分析的研究对象…...
如何解决部分设备分辨率不适配
1)如何解决部分设备分辨率不适配 2)Unity中如何实现草的LOD 3)使用了Play Asset Delivery提交版本被Google报错 4)如何计算弧线弹道的落地位置 这是第396篇UWA技术知识分享的推送,精选了UWA社区的热门话题,…...
C#插件 调用存储过程(输出参数类型)
存储过程 CREATE PROCEDURE [dbo].[GetSum]num1 INT,num2 INT,result INT OUTPUT AS BEGINselect result num1 num2 END C#代码 using Kingdee.BOS; using Kingdee.BOS.App.Data; using Kingdee.BOS.Core.Bill.PlugIn; using Kingdee.BOS.Util; using System; using System.…...
代码随想录算法训练营day32 | 509. 斐波那契数 、70. 爬楼梯 、746. 使用最小花费爬楼梯
碎碎念:开始动态规划了!加油! 参考:代码随想录 动态规划理论基础 动态规划常见类型: 动规基础类题目背包问题打家劫舍股票问题子序列问题 解决动态规划问题应该要思考清楚的: 动态规划五部曲࿱…...
【人工智能专栏】Learning Rate Decay 学习率衰减
Learning Rate Decay 学习率衰减 使用格式 optimizer = torch.optim.SGD(model.paraters(), lr=0.1, momentum=0.9, weight_decay=1e-4) scheduler = torch.optim...
浙大版《C语言程序设计(第3版)》题目集
练习4-11 统计素数并求和 本题要求统计给定整数M和N区间内素数的个数并对它们求和。 输入格式: 输入在一行中给出两个正整数M和N(1≤M≤N≤500)。 输出格式: 在一行中顺序输出M和N区间内素数的个数以及它们的和,数字间以空格分隔。 输入…...
【学习笔记】Day 2
一、进度概述 1、inversionnet_train_light 试运行——未成功 2、DL-FWI基础入门培训-1,2,以及作业1的完成——暂未完成作业 二、详情 1、inversionnet_train_light 试运行 在补充完相关依赖后,运行仍有报错 产生原因:这个代码在当…...
Java中的Map(如果想知道Java中有关Map的知识点,那么只看这一篇就足够了!)
前言:在Java编程语言中,集合框架(Collection Framework)提供了一系列用于存储和操作数据的接口和类。其中,Map和Set是两个非常重要的接口,分别用于存储键值对和无重复元素的集合。 ✨✨✨这里是秋刀鱼不做梦…...
裸金属服务器详解
在云计算飞速发展的今天,裸金属服务器(Bare Metal Server, BMS)作为一种兼具传统物理服务器性能和虚拟化服务优势的计算资源,正逐渐成为企业和个人用户的重要选择。今天我们就来了解下关于裸金属服务器的定义、核心特点以及其在各…...
等待唤醒机制两种实现方法-阻塞队列
桌子上有面条-》吃货执行 桌子上没面条-》生产者制造执行 1、消费者等待 消费者先抢到CPU执行权,发现桌子上没有面条,于是变成等待wait状态,并释放CPU执行权,此时的CPU肯定会被厨师抢到,初始开始做面条,…...
数组项相加和 – 如何将 JavaScript 数组中的数字相加
JavaScript 中的数组是一个对象,它允许您在单个变量名称下存储多个值的有序集合,并以多种方式操作这些值。 在本文中,您将学习如何使用几种不同的方法计算给定数组中所有数字的总和。 具体来说,使用以下方法得到数组中所有数字的总…...
C#和S7-1200PLC S7.NET通信
1、一步步建立一个C#项目 一步步建立一个C#项目(连续读取S7-1200PLC数据)_s7协议批量读取-CSDN博客文章浏览阅读1.7k次,点赞2次,收藏4次。这篇博客作为C#的基础系列,和大家分享如何一步步建立一个C#项目完成对S7-1200PLC数据的连续读取。首先创建一个窗体应用。_s7协议批量…...
常用命令git branch
Git Branch 命令总结 列出分支 git branch:显示本地分支,当前分支会被标记。git branch -r:显示远程分支。git branch -a:显示所有本地和远程分支。 创建分支 git branch <branch_name>:创建一个新分支但不自…...
Android 制作系统签名
一、切换目录 cd build/target/product/security二、执行命令 1)将使用.pk8生成platform.priv.pem (.pem即可,文件名可随意修改)openssl pkcs8 -in platform.pk8 -inform DER -outform PEM -out platform.pem -nocrypt2)生成.p12,此时需输入两次密码,并且要记住 -name后所设置…...
C语言第13篇
1.下面程序是计算n个数的平均值,请填空.______ #include<stdio.h> void main( ) { int i,n; float x,avg0.0; scanf("%d",&n); for(i0;i<n;i) { scanf("%f",&x); avgavg______; } avg________; printf("avg%f\n",avg); } A) …...
基于FPGA的数字信号处理(22)--进位保存加法器(Carry Save Adder, CSA)
目录 1、拆解多个数的加法 2、进位保存加法器 3、CSA的优点和缺点 4、CSA电路的实现 文章总目录点这里:《基于FPGA的数字信号处理》专栏的导航与说明 1、拆解多个数的加法 考虑3个4bits数相加,10 4 7 21 的过程是这样的: 其中的红色数…...
idea使用free流程,2024idea、2023idea都可以安装免费使用
1.先到官网下载,这里选择win系统的,点击下图的.exe https://www.jetbrains.com/idea/download/?sectionwindows 2.下载好后基本上就是一直点击“下一步”到直到安装好,安装好后先打开软件后关闭退出 3.下载配配套资料 链接: https://pan.ba…...
设计模式 之 —— 抽象工厂模式
目录 什么是抽象工厂模式? 定义 特点 抽象工厂模式(java代码示例) 首先定义第一个接口 实现第一个接口的类 定义第二个接口 实现第二个接口的类 * 创建抽象工厂类 创建扩展了 AbstractFactory 的工厂类 饮料工厂 食物工厂 * 创建一个…...
计量经济学(十六)--一文读懂和学会医学统计学中的四种检验方法
1. 统计学是什么? 统计学是应用数学的一个分支,主要通过利用概率论建立数学模型,收集所观察系统的数据,进行量化的分析、总结,并进而进行推断和预测,为相关决策提供依据和参考。它被广泛的应用在各门学科之上,从物理和社会科学到人文科学,甚至被用来工商业及政府的情报…...
wordpress后台更新后 前端没变化的解决方法
使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
通过Wrangler CLI在worker中创建数据库和表
官方使用文档:Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后,会在本地和远程创建数据库: npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库: 现在,您的Cloudfla…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist
现象: android studio报错: [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决: 不要动CMakeLists.…...
关于uniapp展示PDF的解决方案
在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项: 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库: npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...
goreplay
1.github地址 https://github.com/buger/goreplay 2.简单介绍 GoReplay 是一个开源的网络监控工具,可以记录用户的实时流量并将其用于镜像、负载测试、监控和详细分析。 3.出现背景 随着应用程序的增长,测试它所需的工作量也会呈指数级增长。GoRepl…...

