当前位置: 首页 > article >正文

RabbitMQ核心机制——延迟队列

一、 什么是延迟队列?

    消息发送之后,不想让消费者马上收到消息,而是等待特定时间后消费者才能拿到这条消息进行消费。


二、 如何实现延迟队列

    RabbitMQ并没有直接支持延迟队列这一功能,如果需要实现延迟队列,有两种方法可以实现:

1> TTL + 死信队列:给普通队列或消息设置TTL,但没有消费者监听普通队列,消息过期后通过死信交换机路由到死信队列,死信队列的消费者获取消息,就达到了延迟的效果,如下图:

2> 插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 通过这个链接下载好插件,安装即可

下载好插件之后(注意插件的版本要与RabbitMQ版本对应),通过下列命令安装插件:

#进入下列目录,这是附加目录,如果没有就自己创建一个
cd /usr/lib/rabbitmq/plugins#查看插件列表
rabbitmq-plugins list#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange#重启服务
service rabbitmq-server restart

如果在管理界面的交换机——>新建交换机看到下图这个交换机,就代表安装好了:

准备工作完成,接下来看如何通过这两种方式实现延迟队列。


三、 基于 TTL + 死信队列 实现

 准备工作:

(1)声明队列、交换机、及绑定关系

    @Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dl");}

(2)生产者代码

    @RequestMapping("/delay")public String delay(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test...");System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}

(3)消费者代码

@Component
public class DelayListener {@RabbitListener(queues = Constants.DL_QUEUE)public void messageHandler(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[delay.queue] %tc 接收到消息: %s, deliveryTag: %d \n",new Date(),new String(message.getBody(),"UTF-8"),deliveryTag);//业务处理}
}

 

3.1 设置队列TTL + 死信队列

上面的代码就是设置 队列的TTL + 死信队列,这里直接测试:

结果预测:由于上面给队列设置的TTL为10s,因此发送消息10s后消息就因该被消费

可以看到,确实达到了延迟效果,消息发送后10消费者才接收到消息


3.2 设置消息TTL + 死性队列(不推荐)

     前面学习死信队列时,我们知道,如果队列前面的消息比后面的消息过期时间长,那么后面的消息必须等待前面的消息被判定为过期才能继续判定后面的消息是否过期,如果使用 设置消息的TTL + 死信队列 来实现延迟队列是否会出现问题?不妨一试

一、修改normal队列声明,修改生产者代码

    @Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}
    @RequestMapping("/delay")public String delay(){MessagePostProcessor messagePostProcessor1 = message -> {message.getMessageProperties().setExpiration("10000");return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setExpiration("30000");return message;};rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 10s...",messagePostProcessor1);System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}

二、运行程序,测试

预期结果:10s后收到第一条消息,再过20s手收到另一条消息

可以看到,两条消息都在30s后才被消费者接收,显然不符合期望

     可以看到,通过设置 消息的TTL + 死信队列 来实现延迟效果是可能会出现问题的,在实际应用中,推荐使用 队列TTL + 死信队列 或 插件 来实现延迟队列,而不是 消息TTL + 死信队列 来实现。


四、 通过插件实现

    通过插件实现延迟队列非常简单,只需要在声明交换机时通过delayed方法指定这是一个延迟交换机即可。

一、声明队列、交换机及绑定关系

    @Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();//通过delayed方法声明这是一个延迟交换机}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("delay");}

二、消费者代码

@Component
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void messageHandler(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[delay.queue] %tc 接收到消息: %s, deliveryTag: %d \n",new Date(),new String(message.getBody(),"UTF-8"),deliveryTag);//业务处理}
}

三、生产者代码

    @RequestMapping("/delay")public String delay(){MessagePostProcessor messagePostProcessor1 = message -> {message.getMessageProperties().setDelayLong(10000l);return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setDelayLong(30000l);return message;};rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 10s...",messagePostProcessor1);System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}

四、运行程序,测试

预期结果,10s后收到第一条消息,再过20s收到第二条消息

符合预期结果

相关文章:

RabbitMQ核心机制——延迟队列

一、 什么是延迟队列? 消息发送之后,不想让消费者马上收到消息,而是等待特定时间后消费者才能拿到这条消息进行消费。 二、 如何实现延迟队列 RabbitMQ并没有直接支持延迟队列这一功能,如果需要实现延迟队列,有两种方法…...

华为云Flexus+DeepSeek征文|Flexus云服务器Dify-LLM资源部署极致体验Agent

前引:重磅来袭!本次以DeepSeek-V3/R1商用大模型和Dify-LLM应用平台一键部署为核心,专为新手打造“开箱即用”的AI开发体验。无论你是想快速搭建企业级AI应用,还是探索大模型落地的无限可能,只需跟随小编实现三步走&…...

IP 网段

以下是关于 IP 网段 的详细解析,涵盖基本概念、表示方法、划分规则及实际应用场景: 一、网段核心概念 1. 什么是网段? 网段指一个逻辑划分的 IP 地址范围,属于同一子网的设备可以直接通信(无需经过路由器&#xff09…...

芋道框架 - 接口设置匿名访问

芋道框架 - 接口设置匿名访问 在芋道(yudao)框架中,有些接口需要开放给外部访问,例如文件上传、验证码获取等。要实现接口的匿名访问,主要有两个步骤: 1. 在接口上添加 PermitAll 在需要匿名访问的接口方…...

熔盐核裂变反应堆:第四代核能技术的重要突破

在核能技术的演进历程中,第四代核能系统被寄予厚望,代表着更高的效率、更强的安全性和更广泛的资源适应能力。近年来,熔盐反应堆技术作为其中最具挑战性和变革性的方案之一,逐步走向工程化与实用化阶段。其中,液态燃料…...

鸿蒙OSUniApp 实现的日期选择器与时间选择器组件#三方框架 #Uniapp

UniApp 实现的日期选择器与时间选择器组件 在移动应用开发中,日期选择器和时间选择器是表单、预约、日程、打卡等场景中不可或缺的基础组件。一个好用的日期/时间选择器不仅能提升用户体验,还能有效减少输入错误。随着 HarmonyOS(鸿蒙&#…...

Linux云计算训练营笔记day16(Linux周期性计划任务、Python)

Linux云计算训练营笔记day16(Linux周期性计划任务、Python) 目录 Linux云计算训练营笔记day16(Linux周期性计划任务、Python)1.Linux 周期性计划任务2.Python 1.Linux 周期性计划任务 周期性计划任务 用途: 按照设置的时间间隔,为…...

对比Redis与向量数据库(如Milvus)在AI中的应用

对比Redis与向量数据库(如Milvus)在AI中的应用 在AI架构中,缓存系统的设计直接影响响应速度、资源成本以及推理路径是否高效。而面对不同的AI业务诉求,选用什么类型的缓存系统、如何搭配,往往是系统架构设计中必须深入…...

【Elasticsearch入门到落地】13、DSL查询详解:分类、语法与实战场景

接上篇《12、索引库删除判断以及文档增删改查》 上一篇我们讲解了如何判断索引库是否存在并删除它,以及如何对索引库中的文档进行增删改查操作。本篇我们进入ElasticSearch的DSL语法的详解。 Elasticsearch(ES)作为强大的分布式搜索引擎&…...

[欠拟合过拟合]机器学习-part10

7.欠拟合过拟合 7.1欠拟合 欠拟合是指模型在训练数据上表现不佳,同时在新的未见过的数据上也表现不佳。这通常发生在模型过于简单,或者是训练的次数不够,无法捕捉数据中的复杂模式时。欠拟合模型的表现特征如下: 训练误差较高。 …...

【windwos】文本编辑器Notepad++ 替代品Notepad--

一、N和N--对比介绍 曾经备受推崇的「Notepad」曾是Windows上的经典代码编辑器。然而,作者的一些政治言论已经让它被广大中国用户抛弃。 一个名为「Notepad--」的新编辑器,也是开源免费,功能和实用性也在尽可能接近。与此同时,「N…...

Linux基本指令篇 —— clear指令

clear 是 Linux 和 Unix 系统中用于清空终端屏幕的常用命令。它的作用是移除当前终端窗口中的所有可见内容,提供一个干净的界面,类似于“刷新”终端。以下是关于 clear 的详细解析: 目录 1. 基本用法 2. 实现原理 3. 常见场景 场景 1&…...

Anaconda 的基础教程,从入门到精通

以下是Anaconda的基础教程,从入门到精通: 一、Anaconda简介 Anaconda是一个用于科学计算的Python/R发行版,集成了1000+常用数据科学包,提供环境管理功能,解决包依赖冲突问题。适合数据科学、机器学习、深度学习等领域。 二、安装与配置 1. 下载与安装 官网下载:Anaco…...

阿里云DDoS防护:万一被“黑”了,如何更换IP地址?

阿里云DDoS防护:万一被“黑”了,如何更换IP地址“绝地反击”? 各位站长、运维老铁、业务负责人们,大家好!在如今这个网络世界,最让人提心吊胆的,莫过于遭遇**DDoS攻击(分布式拒绝服…...

机器学习笔记【Week2】

一、多变量线性回归(Multivariate Linear Regression) 为什么需要多变量? 现实问题中,一个目标可能受多个因素影响,比如预测房价时: x 1 x_1 x1​:面积 x 2 x_2 x2​:卧室数量 x 3…...

饭卡管理系统(接口文档)

一、管理员端口 1、学生列表查询 id(隐藏)姓名性别头像用户名卡号账号余额按钮1小马男http……小马2577293893#C12320.0删除 编辑2小飞男http……小飞#C123 删除 编辑 基本信息 请求路径:/information 请求方式:GET 接口描述&…...

【小白量化智能体】应用2:编写通达信绘图指标及生成Python绘图程序

【小白量化智能体】应用2:编写通达信绘图指标及生成Python绘图程序 【小白量化智能体】是指能够自主或半自主地通过与环境的交互来实现目标或任务的计算实体。智能体技术是一个百科全书,又融合了人工智能、计算机科学、心理学和经济学等多个领域的知识&a…...

C++23 std::start_lifetime_as:用于隐式生存期类型的显式生存期管理函数 (P2590R2)

文章目录 一、C23简介二、std::start_lifetime_as 基本概念函数原型模板参数参数返回值注意事项示例代码 三、std::start_lifetime_as 的作用1. 避免复杂的拷贝操作2. 保持对象表示不变3. 简化代码逻辑 四、std::start_lifetime_as 的使用场景1. 内存池管理2. 类型双关&#xf…...

总结:进程和线程的联系和区别

前言:通过学习javaEE初阶中的多线程章节后加上我自己的理解,想来总结一下线程和进程的联系和区别. 一来是能更好地复习知识,二来是为了记录我的学习路程,相信未来的我回首不会忘记这段难忘的经历. 1.进程 先来谈谈进程:进程是操作系统中资源分配的基本单位. 1)进程的执行方…...

Innodb底层原理与Mysql日志机制深入刨析

MySQL的内部组件结构 大体来说,MySQL 可以分为 Server 层和存储引擎层两部分。 Server层 主要包括连接器、查询缓存、分析器、优化器、执行器等,涵盖 MySQL 的大多数核心服务功能,以及所有的内置函数(如日期、时间、数学和加密函数等),所有跨存储引擎的功能都在这一层实…...

JMeter-SSE响应数据自动化

结构图 背景: 需要写一个JMeter脚本来进行自动化测试,主要是通过接口调用一些东西,同时要对响应的数据进行处理,包括不限于错误信息的输出。 1.SSE(摘录) SSE(Server-Sent Events)是一种基于HTTP协议、允许…...

泛型(1)

1.泛型的理解和好处 使用传统方法的问题分析 (1)不能对加入到集合ArrayList中的数据类型进行约束 (2)遍历的时候,需要进行类型装换,如果集合中的数量较大,对效率有影响. 使用泛型的好处 (1)使用泛型添加 (检查元素的类型,提高了安全性.) (2)减少了类型转换的次数,提高效率…...

esp8266 点灯科技远程控制继电器

手机端安装点灯科技app 打开 Arduino IDE 编辑&#xff1a; #define BLINKER_WIFI #include <Blinker.h> char auth[] "点灯科技 key"; char ssid[] "wifi ID"; char pswd[] "WiFi key"; // 新建组件对象 BlinkerButton Button1(&q…...

MMA: Multi-Modal Adapter for Vision-Language Models论文解读

abstract 预训练视觉语言模型&#xff08;VLMs&#xff09;已成为各种下游任务中迁移学习的优秀基础模型。然而&#xff0c;针对少样本泛化任务对VLMs进行微调时&#xff0c;面临着“判别性—泛化性”困境&#xff0c;即需要保留通用知识&#xff0c;同时对任务特定知识进行微…...

Java中Map集合的遍历方式详解

Java中Map集合的遍历方式详解 一、Map 集合概述二、Map 遍历的基础方式1. **使用 KeySet 迭代器遍历**2. **使用 KeySet 的 for-each 循环** 三、EntrySet 遍历&#xff1a;高效的键值对访问1. **EntrySet 迭代器遍历**2. **EntrySet 的 for-each 循环** 四、Java 8 引入的 Lam…...

使用 Cannonballs 进行实用导体粗糙度建模

在 GB/s 制度下&#xff0c;导体损耗的精确建模是高速串行链路设计成功的前提。未能对粗糙度效果进行建模可能会毁了您的一天。例如&#xff0c;图 1 显示了与测量数据相比&#xff0c;无粗糙度的 40 英寸印刷电路板 &#xff08;PCB&#xff09; 走线的模拟总损耗。总损耗是电…...

Spring Boot 注解 @ConditionalOnMissingBean是什么

一句话总结&#xff1a; ConditionalOnMissingBean 是 Spring Boot 提供的一个 条件注解&#xff08;Conditional Annotation&#xff09;&#xff0c;意思是&#xff1a; 只有当 Spring 容器中 不存在 某个 Bean 时&#xff0c;当前的 Bean 或配置才会被加载。 这是一种典型的…...

国外常用支付流程简易说明(无代码)

一、Xendit Xendit 是一家总部位于印度尼西亚的支付解决方案提供商&#xff0c;业务覆盖东南亚多个国家。它允许企业接受信用卡以及多种本地支付方式&#xff1a; 1、如果需要&#xff0c;创建一个 Xendit 帐户并登录Xendit 仪表板。 2、在页面左上角查看您的账户模式。使用…...

(先发再改)测试流程标准文档

Revision Record 修订记录 序号 修改日期 修改章节 修改描述 拟制 审批 修订版本 1 20250520 初稿 v1.0 目录 1. 文档概述... 7 1.1 文档目的... 7 1.1.1 标准化质量保障流程... 7 1.1.2.…...

亚马逊SP-API开发实战:商品数据获取与操作

一、API接入准备 开发者注册&#xff1a; 登录亚马逊开发者中心申请SP-API权限 完成MWS迁移&#xff08;如适用&#xff09; 认证配置&#xff1a; # OAuth2.0认证示例 import requests auth_url "https://api.amazon.com/auth/o2/token" params { "…...