当前位置: 首页 > news >正文

(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

文章目录

  • RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列
    • 消费端限流
    • 利用限流实现不公平分发
    • 消息存活时间
    • 优先级队列

消费端限流

在这里插入图片描述

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

1、生产者批量发送消息

@Test
public void testSendBatch() {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);}
}

2、消费端配置限流机制

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 1233456virtual-host: /listener:simple:# 限流机制必须开启手动签收acknowledge-mode: manual# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。prefetch: 5

3、消费者监听队列

@Component
public class QosConsumer{@RabbitListener(queues = "my_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {// 1.获取消息System.out.println(new String(message.getBody()));// 2.模拟业务处理Thread.sleep(3000);// 3.签收消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}
}

利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多

1、生产者批量发送消息

@Test
public void testSendBatch() {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);}
}

端配置不公平分发

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 1233456virtual-host: /listener:simple:# 限流机制必须开启手动签收acknowledge-mode: manual# 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发prefetch: 1

编写两个消费者

@Component
public class UnfairConsumer {// 消费者1@RabbitListener(queues = "my_queue")public void listenMessage1(Message message, Channel channel) throws Exception{//1.获取消息System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));//2. 处理业务逻辑Thread.sleep(500); // 消费者1处理快//3. 手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}// 消费者2@RabbitListener(queues = "my_queue")public void listenMessage2(Message message, Channel channel) throws Exception{//1.获取消息System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));//2. 处理业务逻辑Thread.sleep(3000);// 消费者2处理慢//3. 手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间

1、在创建队列时设置其存活时间:

@Configuration
public class RabbitConfig2 {private final String EXCHANGE_NAME="my_topic_exchange2";private final String QUEUE_NAME="my_queue2";// 1.创建交换机@Bean("bootExchange2")public Exchange getExchange2(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 2.创建队列@Bean("bootQueue2")public Queue getMessageQueue2(){return QueueBuilder.durable(QUEUE_NAME).ttl(10000) //队列的每条消息存活10s.build();}// 3.将队列绑定到交换机@Beanpublic Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();}
}

2、生产者批量生产消息,测试存活时间

@Test
public void testSendBatch2() throws InterruptedException {// 发送十条消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);}
}

设置单条消息存活时间

@Test
public void testSendMessage() {//设置消息属性MessageProperties messageProperties = new MessageProperties();//设置存活时间messageProperties.setExpiration("10000");// 创建消息对象Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);// 发送消息rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}

注意:

1 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。

2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration(“10000”);
// 3.创建消息对象
Message message = new Message((“send message…” +i).getBytes(),messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, message);
} else {
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, “sendmessage…” + i);
}
}
}
在以上案例中,i=5的消息才有过期时间,10s后消息并没有 马上被移除,但该消息已经不会被消费了,当它到达队列顶 端时会被移除。

优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

1、创建队列和交换机

@Configuration
public class RabbitConfig3 {private final String EXCHANGE_NAME="priority_exchange";private final String QUEUE_NAME="priority_queue";// 1.创建交换机@Bean(EXCHANGE_NAME)public Exchange priorityExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 2.创建队列@Bean(QUEUE_NAME)public Queue priorityQueue(){return QueueBuilder.durable(QUEUE_NAME)//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源.maxPriority(10).build();}// 3.将队列绑定到交换机@Beanpublic Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();}
}

2、编写生产者

@Test
public void testPriority() {for (int i = 0; i < 10; i++) {if (i == 5) {// i为5时消息的优先级较高MessageProperties messageProperties = new MessageProperties();messageProperties.setPriority(9);Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);} else {rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);}}
}

3、编写消费者

@Component
public class PriorityConsumer {@RabbitListener(queues = "priority_queue")public void listenMessage(Message message, Channel channel) throws Exception{//获取消息System.out.println(new String(message.getBody()));//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

相关文章:

(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

Lison <dreamlison163.com>, v1.0.0, 2023.06.23 RabbitMQ高级特性&#xff08;消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列 文章目录 RabbitMQ高级特性&#xff08;消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列消费端限流利用限流…...

Vue如何配置eslint

eslint官网: eslint.bootcss.com eslicate如何配置 1、选择新的配置&#xff1a; 2、选择三个必选项 3、再选择Css预处理器 4、之后选择处理器 5、选择是提交的时候就进行保存模式 6、放到独立的配置文件上去 7、最后一句是将自己的数据存为预设 8、配合console不要出现的规则…...

Elasticsearch查询文档

GET查询索引单个文档 GET /索引/_doc/ID GET /ffbf/_doc/123返回结果如下,查到了有数据"found" : true表示 {"_index" : "ffbf","_type" : "_doc","_id" : "123","_version" : 2...

面向对象编程:多态性的理论与实践

文章目录 1. 修饰词和访问权限2. 多态的概念3. 多态的使用现象4. 多态的问题与解决5. 多态的意义 在面向对象编程中&#xff0c;多态是一个重要的概念&#xff0c;它允许不同的对象以不同的方式响应相同的消息。本文将深入探讨多态的概念及其应用&#xff0c;以及在Java中如何实…...

linux:filezilla root密码登陆

问题&#xff1a; 如题 参考&#xff1a; 亚马逊服务器FileZilla登录失败解决办法_亚马逊云 ssh链接秘钥认证不了 ubuntu拒绝root用户ssh远程登录解决办法 总结&#xff1a; vi /etc/ssh/sshd_config&#xff0c;修改配置&#xff1a; PermitRootLogin yes PasswordAuthenticat…...

在nginx上部署nuxt项目

先安装Node.js 我安的18.17.0。 安装完成后&#xff0c;可以使用cmd&#xff0c;winr然cmd进入&#xff0c;测试是否安装成功。安装在哪个盘都可以测试。 测试 输入node -v 和 npm -v&#xff0c;&#xff08;中间有空格&#xff09;出现下图版本提示就是完成了NodeJS的安装…...

嵌入式linux通用spi驱动之spidev使用总结

Linux内核集成了spidev驱动&#xff0c;提供了SPI设备的用户空间API。支持用于半双工通信的read和write访问接口以及用于全双工通信和I/O配置的ioctl接口。使用时&#xff0c;只需将SPI从设备的compatible属性值添加到spidev区动的spidev dt ids[]数组中&#xff0c;即可将该SP…...

【Nodejs】Puppeteer\爬虫实践

puppeteer 文档:puppeteer.js中文文档|puppeteerjs中文网|puppeteer爬虫教程 Puppeteer本身依赖6.4以上的Node&#xff0c;但是为了异步超级好用的async/await&#xff0c;推荐使用7.6版本以上的Node。另外headless Chrome本身对服务器依赖的库的版本要求比较高&#xff0c;c…...

Windows Active Directory密码同步

大多数 IT 环境中&#xff0c;员工需要记住其默认 Windows Active Directory &#xff08;AD&#xff09; 帐户以外的帐户的单独凭据&#xff0c;最重要的是&#xff0c;每个密码还受不同的密码策略和到期日期的约束&#xff0c;为不同的帐户使用单独的密码会增加用户忘记密码和…...

安科瑞能源物联网以能源供应、能源管理、设备管理、能耗分析的能源流向为主线-安科瑞黄安南

摘要&#xff1a;随着科学技术的发展&#xff0c;我国的物联网技术有了很大进展。为了提升电力抄表服务的稳定性&#xff0c;保障电力抄表数据的可靠性&#xff0c;本文提出并实现了基于物联网的智能电力抄表服务平台&#xff0c;结合云计算、大数据等技术&#xff0c;提供电力…...

FPGA设计时序分析一、时序路径

目录 一、前言 二、时序路径 2.1 时序路径构成 2.2 时序路径分类 2.3 数据捕获 2.4 Fast corner/Slow corner 2.5 Vivado时序报告 三、参考资料 一、前言 时序路径字面容易简单地理解为时钟路径&#xff0c;事实时钟存在的意义是为了数据的处理、传输&#xff0c;因此严…...

spring复习:(52)注解方式下,ConfigurationClassPostProcessor是怎么被添加到容器的?

进入AnnotationConfigApplicationContext的构造方法&#xff1a; 进入AnnotatedBeanDefinitionReader的构造方法&#xff1a; 进入this(registry, getOrCreateEnvironment(registry));代码如下&#xff1a; 进入AnnotationConfigUtils.registerAnnotationConfigProcessors方…...

全国大学生数据统计与分析竞赛2021年【本科组】-B题:用户消费行为价值分析

目录 摘 要 1 任务背景与重述 1.1 任务背景 1.2 任务重述 2 任务分析 3 数据假设 4 任务求解 4.1 任务一&#xff1a;数据预处理 4.1.1 数据清洗 4.1.2 数据集成 4.1.3 数据变换 4.2 任务二&#xff1a;对用户城市分布情况与分布情况可视化分析 4.2.1 城市分布情况可视化分析 4…...

力扣1667. 修复表中的名字

表&#xff1a; Users ------------------------- | Column Name | Type | ------------------------- | user_id | int | | name | varchar | ------------------------- 在 SQL 中&#xff0c;user_id 是该表的主键。 该表包含用户的 ID 和名字。…...

【设计模式】详解观察者模式

文章目录 1、简介2、观察者模式简单实现抽象主题&#xff08;Subject&#xff09;具体主题&#xff08;ConcreteSubject&#xff09;抽象观察者&#xff08;Observer&#xff09;具体观察者&#xff08;ConcrereObserver&#xff09;测试&#xff1a; 观察者设计模式优缺点观察…...

用html+javascript打造公文一键排版系统8:附件及标题排版

最近工作有点忙&#xff0c;所 以没能及时完善公文一键排版系统&#xff0c;现在只好熬夜更新一下。 有时公文有包括附件&#xff0c;招照公文排版规范&#xff1a; 附件应当另面编排&#xff0c;并在版记之前&#xff0c;与公文正文一起装订。“附件”二字及附件顺序号用3号黑…...

微服务体系<1>

我们的微服务架构 我们的微服务架构和单体架构的区别 什么是微服务架构 微服务就是吧我们传统的单体服务分成 订单模块 库存模块 账户模块单体模块 是本地调用 从订单模块 调用到库存模块 再到账户模块 这三个模块都是调用的同一个数据库 这就是我们的单体架构微服务 就是…...

M5ATOMS3基础02传感器MPU6886

M5ATOMS3基础01按键 简洁版本 MPU6886是一款6轴IMU单元&#xff0c;具有3轴重力加速度计和3轴陀螺仪。它采用16位ADC&#xff0c;内置可编程数字滤波器和片上温度传感器&#xff0c;并通过I2C接口&#xff08;地址为0x68&#xff09;与上位机通信。MPU6886支持低功耗模式&#…...

vue 快速自定义分页el-pagination

vue 快速自定义分页el-pagination template <div style"text-align: center"><el-paginationbackground:current-page"pageObj.currentPage":page-size"pageObj.page":page-sizes"pageObj.pageSize"layout"total,prev,…...

0-虚拟机补充知识

虚拟机克隆 如果想要构建服务器集群&#xff0c;没有必要一台一台的去进行安装&#xff0c;只要通过克隆就可以。 快速获得多台服务器主要有两种方式&#xff0c;分别为&#xff1a;直接拷贝操作和vmware的克隆操作 直接拷贝 将之前安装虚拟机的所有文件进行拷贝&#xff0…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

如何在看板中有效管理突发紧急任务

在看板中有效管理突发紧急任务需要&#xff1a;设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP&#xff08;Work-in-Progress&#xff09;弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中&#xff0c;设立专门的紧急任务通道尤为重要&#xff0c;这能…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

c++第七天 继承与派生2

这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分&#xff1a;派生类构造函数与析构函数 当创建一个派生类对象时&#xff0c;基类成员是如何初始化的&#xff1f; 1.当派生类对象创建的时候&#xff0c;基类成员的初始化顺序 …...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...