RabbitMQ 高级特性——延迟队列

文章目录
- 前言
- 延迟队列
- 延迟队列的概念
- TTL + 死信队列模拟延迟队列
- 设置队列的 TTL
- 设置消息的 TTL
- 延迟队列插件
- 安装并且启动插件服务
- 使用插件实现延迟功能
前言
前面我们学习了 TTL 和死信队列,当队列中的消息达到了过期时间之后,那么这个消息就会被死信交换机投递到死信队列中,然后由专门的消费者进行消费,这篇文章将为大家介绍 RabbitMQ 的另一高级特性——延迟队列。
延迟队列
延迟队列的概念
延迟队列通常指的是一种队列,其中的消息在发送后不会立即被消费,而是会等待一段时间(即延迟时间)后才变得可消费。
RabbitMQ 本身不直接提供名为“延迟队列”的内置功能,但我们可以利用 RabbitMQ 的一些特性和机制来模拟或实现延迟队列的行为。就比如 TTL + 死信队列方式模拟出延迟队列的功能。
TTL + 死信队列模拟延迟队列

我们的消费者不去消费正常队列中的消息,而是去消费死信队列中的消息,为什么这样能实现延迟对立的功能呢?当生产者生产消息投递给正常交换机之后,交换机会将这个消息根据 routing key 和 binding key 路由到指定的队列中,因为这个正常队列没有消费者可以消费消息,所以到达了了这个队列中的消息,就会存储在这个队列中,因为这个队列设置了 TTL,所以消息到达了过期时间之后就会被投递给死信交换机 DLX,然后死信交换机就会将这个消息投递给 DLQ,然后消费者从这个 DLQ 中去消费消息,这样消费者就会在生产者生产的消息到达队列之后不会立即去消费,而是会等待一段时间,这样就通过 TTL + 死信队列的方式模拟出了延迟队列的功能。
设置队列的 TTL
那么我们看看通过代码如何实现:
public class Constants {public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String DL_EXCHANGE = "dl.exchange";public static final String DL_QUEUE = "dl.queue";
}
声明交换机、队列和绑定关系:
@Configuration
public class DLConfig {@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(1000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}@Bean("DLExchange")public Exchange DLExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("DLQueue")public Queue DLQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("DLBinding")public Binding DLBinding(@Qualifier("DLExchange") Exchange exchange,@Qualifier("DLQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl").noargs();}
}
生产者代码:
@RequestMapping("producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("delay")public String delay() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay,发送时间:" + new Date());return "消息发送成功";}
}
消费者:
@Component
public class DelayListener {@RabbitListener(queues = Constants.DL_QUEUE)public void listener(String message, Channel channel) {System.out.println("时间:" + new Date() + ",接收到消息:" + message + channel);}
}

可以看到,通过 TTL + 死信队列的方式是可以实现延迟队列的功能的,虽然可能从消息发送的时间到消费者消费到这个消息的时间可能会比预想的时间多一点(中间消息传输花费了一点时间)。
这是一次发送一条消息的情况,如果我们一次发多条信息看看什么效果:
@RequestMapping("delay")
public String delay() throws InterruptedException {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay1,发送时间:" + new Date());Thread.sleep(1000);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay2,发送时间:" + new Date());Thread.sleep(1000);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay3,发送时间:" + new Date());return "消息发送成功";
}

当为队列设置 TTL 的时候,消息会按照进队列的先后顺序进行消费,这是为队列设置 TTL 的情况,为队列设置 TTL 意味着进入该队列中的所有消息都具有过期时间,且过期时间如果没有单独设置消息的过期时间的话,那么该队列中的消息的过期时间都是相同的,那么如果我们想要每个消息的过期时间都不相同的话就需要单独给消息设置过期时间,接下来我们看看给消息单独设置过期时间的情况。
设置消息的 TTL
public static final String NORMAL_QUEUE2 = "normal.queue2";
创建出一个没有设置过期时间的普通队列:
@Bean("normalQueue2")
public Queue normalQueue2() {return QueueBuilder.durable(Constants.NORMAL_QUEUE2).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();
}@Bean("normalBinding2")
public Binding normalBinding2(@Qualifier("normalExchange") Exchange exchange,@Qualifier("normalQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal2").noargs();
}
设置消息的过期时间:
@RequestMapping("delay2")
public String delay2(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("10000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("20000");return message;});return "消息发送成功";
}
消费者的代码就不需要改动:

为消息设置 TTL 就可以时间每个消息的过期时间都不相同,但是这里会有一个问题,就是如果先投递的消息的过期时间如果晚于后面投递的消息的过期时间就会出现问题:
@RequestMapping("delay2")
public String delay2(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("20000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息发送成功";
}

可以发现设置了过期时间为 10s 的消息也是过了 20s 之后才进入到死信队列然后才被消费。
消息过期之后,不一定会被马上丢弃。因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列。
此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。因为我们的普通队列并没有消费者消费其中的消息,我们都知道对消息设置过期时间,只有在消息被快要使用的之前,才会判断它是否过期,但是这里由于队列没有消费者,所以消息也就不会被使用到,那么 RabbitMQ 就只会检查队首的消息是否过期,这样就导致了队首的消息没有过期,那么之后的消息也不会被检查到过期投递到死信队列中。
所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
延迟队列插件
虽然 RabbitMQ 没有直接实现延迟队列,但是 RabbitMQ 提供了一个延迟的插件来实现延迟的功能。
具体的可以看看官方文档 https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq#delaying-messages
安装并且启动插件服务



也可以点击这里:



我们下载的插件的版本需要和 RabbitMQ 的版本兼容:

版本对应关系:

下载完成插件之后,可以在这里查看如何配置:https://www.rabbitmq.com/docs/installing-plugins

我们将下载完成的文件放在 RabbitMQ 的插件目录下:
我们使用的是 Linux,所以需要将插件放在 /usr/lib/rabbitmq/plugins 或者 /usr/lib/rabbitmq/lib/rabbitmq_server-version/plugins文件下,如果没有这个路径可以自己创建:

下载完成插件并且将其放在指定目录下之后,我们就可以启动插件了:
使用 rabbitmq-plugins list 查看 rabbitmq 的插件:

如果我们之前启动了一个错误版本的插件,可以使用 rabbitmq-plugins disbale 插件名称 来禁用插件:

使用 rabbitmq-plugins enable 插件名称 来启动插件服务:

如何验证插件已经安装并且启动成功,我们在 rabbitmq 的管理页面新建一个交换机的时候,看看交换机的类型是否有 x-delayed-message 这个类型:
我们刷新一下管理页面,然后在新建交换机的时候查看交换机的类型:

这就说明我们的延迟插件启动成功了。
使用插件实现延迟功能
安装并且启动延迟插件之后,我们来看看代码如何实现一个延迟功能的队列:
public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";
声明交换机、对立和绑定关系:
@Configuration
public class DelayConfig {@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();}
}
生产者代码:
@RequestMapping("delay3")
public String delay3(){rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setDelay(20000);return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setDelay(10000);return message;});return "消息发送成功";
}
消费者代码:
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void listener2(String message, Channel channel) {System.out.println("时间:" + new Date() + ",接收到消息:" + message + channel);
}
启动项目之后,会创建一个 x-delay-message 类型的交换机:


使用延迟插件的话,就可以使得队列中的消息可以按照延迟时间到达消费者。
相关文章:
RabbitMQ 高级特性——延迟队列
文章目录 前言延迟队列延迟队列的概念TTL 死信队列模拟延迟队列设置队列的 TTL设置消息的 TTL 延迟队列插件安装并且启动插件服务使用插件实现延迟功能 前言 前面我们学习了 TTL 和死信队列,当队列中的消息达到了过期时间之后,那么这个消息就会被死信交…...
EAC(Estimate at Completion)和ETC(Estimate to Complete)
EAC 预计完工成本ETC 预计尚需成本Estimate at CompletionEstimate to Complete完成预估完工时尚需成本估算 EAC ETC ACETC EAC – AC 预测项目总成本,包含了到目前为止实际发生的成本(AC)和预计将发生的成本。如果EAC大于BAC…...
【React】状态管理之Zustand
🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 💫个人格言: "如无必要,勿增实体" 文章目录 状态管理之Zustand引言1. Zustand 的核心特点1.1 简单直观的 API1.2 无需 Provi…...
Vue3打包自动生成版本JSON文件,添加系统版本检查,实现系统自动更新提示
实现该功能一共有三步。废话不多说,直接上代码!!! 第一步:打包时自动生成版本信息的js文件,versionUpdate.js import fs from fs; import path from path; import { ElMessageBox } from element-plus; i…...
海量数据有限内存系列问题解决方案
1. 排序问题 有限数据充足内存:内存中有十万整数,对所有数据进行排序。 内部排序即可 单节点海量数据有限内存:某台机器有一个文件,文件中包含六十亿整数,一个整数一行,可用内存1G,对所有数据…...
FFmpeg 4.3 音视频-多路H265监控录放C++开发十四,总结编码过程,从摄像头获得数据后,转成AVFrame,然后再次转成AVPacket,
也就是将摄像头采集到的YUV 的数据换成 AVFrame,然后再次转成 AVPacket,那么这AVPakcet数据要怎么办呢?分为三种情况: 一种是将AVPacket存储成h264文件,由于h264编码器在将avframe变成avpacket的时候就是按照h264的格…...
内容占位符:Kinetic Loader HTML+CSS 使用CSS制作三角形原理
内容占位符 前言 随着我们对HTML和CSS3的学习逐渐深入,相信大家都已经掌握了网页制作的基础知识,包括如何使用HTML标记构建网页结构,以及如何运用CSS样式美化页面。为了进一步巩固和熟练这些技能,今天我们一起来完成一个有趣且实…...
麒麟nginx配置
一、配置负载均衡 配置麒麟的yum源 vim /etc/yum.repos.d/kylin_aarch64.repo Copy 删除原来内容,写入如下yum源 [ks10-adv-os] name Kylin Linux Advanced Server 10 - Os baseurl http://update.cs2c.com.cn:8080/NS/V10/V10SP2/os/adv/lic/base/aarch64/ …...
如何在 Ubuntu 上安装 Emby 媒体服务器
Emby 是一个开源的媒体服务器解决方案,它能让你整理、流媒体播放和分享你的个人媒体收藏,包括电影、音乐、电视节目和照片。Emby 帮你集中多媒体内容,让你无论在家还是在外都能轻松访问。它还支持转码,让你能够播放各种格式的内容…...
Mac上详细配置java开发环境和软件(更新中)
文章目录 概要JDK的配置JDK下载安装配置JDK环境变量文件 Idea的安装Mysql安装和配置Navicat Premium16.1安装安装Vscode安装和配置Maven配置本地仓库配置阿里云私服Idea集成Maven 概要 这里使用的是M3型片 14.6版本的Mac 用到的资源放在网盘 链接: https://pan.baidu.com/s/17…...
jmeter常用配置元件介绍总结之定时器
系列文章目录 安装jmeter jmeter常用配置元件介绍总结之定时器 5.定时器5.1.固定定时器5.2.统一随机定时器5.3.Precise Throughput Timer5.4.Constant Throughput Timer5.5.Synchronizing Timer5.6.泊松随机定时器5.7.高斯随机定时器 5.定时器 5.1.固定定时器 固定定时器Cons…...
Spring——提前编译
提前编译:AOT AOT概述 JIT与AOT的区别 JIT和AOT 这个名词是指两种不同的编译方式,这两种编译方式的主要区别在于是否在“运行时”进行编译 (1)JIT, Just-in-time,动态(即时)编译,边运行边编译࿱…...
乐理的学习(音程)
二度,三度,六度,七度的大n度都是直接的音名到音名,如#A到#G的,这样为大n度 而这个基础上向内收,收半音为小n度,在小n度再收,为减n度 在大n度的基础上再向外扩半音,为增…...
【网络】数据链路层协议——以太网,ARP协议
> 作者:დ旧言~ > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:了解什么是以太网协议和ARP协议。 > 毒鸡汤:有些事情,总是不明白,所以我不会坚持。早安! > 专栏选自…...
Linux分区、挂载、配额、逻辑卷、RAID、系统综合状态查看
分区与挂载 fdisk fdisk 命令是一个用于磁盘分区管理的命令行工具,可以用来创建、删除、调整分区等操作。常用的 fdisk 命令选项包括: fdisk -l:列出系统中的所有磁盘分区信息。 fdisk /dev/sdX:打开指定磁盘进行分区操作。 n&…...
3D Gaussian Splatting 代码层理解之Part1
2023 年初,来自蔚蓝海岸大学和 马克斯普朗克学会的作者发表了一篇题为“用于实时现场渲染的 3D 高斯泼溅”的论文。该论文提出了实时神经渲染的重大进步,超越了NeRF等以前方法的实用性。高斯泼溅不仅减少了延迟,而且达到或超过了 NeRF 的渲染质量,在神经渲染领域掀起了一场…...
Qt小知识-Q_GLOBAL_STATIC
你还在为创建全局静态对象烦恼嘛,它来了!它来了! qt5提供了两个宏定义Q_GLOBAL_STATIC和Q_GLOBAL_STATIC_WITH_ARGS来实现。可以创建一个全局静态对象,对象在第一次使用时初始化自身,这意味着它不会增加应用程序或库的…...
【SpringBoot】使用过滤器进行XSS防御
在Spring Boot中,我们可以使用注解的方式来进行XSS防御。注解是一种轻量级的防御手段,它可以在方法或字段级别对输入进行校验,从而防止XSS攻击。 而想对全局的请求都进行XSS防御可以使用servlet中的过滤器或者spring mvc中的拦截器ÿ…...
创建vue插件,发布npm
开发步骤:1.创建一个vue项目,2.开发一个组件。 3.注册成插件。 4.vite和package.json配置。5.发布到npm 1.创建一个vue项目 npm create vuelatest 生成了vue项目之后,得到了以下结构。 在src下创建个plugins目录。用于存放开发的…...
【Android Compose原创组件】可拖动滚动条的完美实现
项目背景 我在使用安卓Compose开发自己的【JK管理器】的过程中,很多地方都需要使用滚动条,在Github上也有实现的比较好,但是大多都是基于View(我要的是Compose啊)。 在研究Android 官方示例项目 nowinandroid 中&…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
Ubuntu系统多网卡多相机IP设置方法
目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机,交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息,系统版本:Ubuntu22.04.5 LTS;内核版本…...
GraphQL 实战篇:Apollo Client 配置与缓存
GraphQL 实战篇:Apollo Client 配置与缓存 上一篇:GraphQL 入门篇:基础查询语法 依旧和上一篇的笔记一样,主实操,没啥过多的细节讲解,代码具体在: https://github.com/GoldenaArcher/graphql…...
