RabbitMQ 高级特性——延迟队列
文章目录
- 前言
- 延迟队列
- 延迟队列的概念
- TTL + 死信队列模拟延迟队列
- 设置队列的 TTL
- 设置消息的 TTL
- 延迟队列插件
- 安装并且启动插件服务
- 使用插件实现延迟功能
前言
前面我们学习了 TTL 和死信队列,当队列中的消息达到了过期时间之后,那么这个消息就会被死信交换机投递到死信队列中,然后由专门的消费者进行消费,这篇文章将为大家介绍 RabbitMQ 的另一高级特性——延迟队列。
延迟队列
延迟队列的概念
延迟队列通常指的是一种队列,其中的消息在发送后不会立即被消费,而是会等待一段时间(即延迟时间)后才变得可消费。
RabbitMQ 本身不直接提供名为“延迟队列”的内置功能,但我们可以利用 RabbitMQ 的一些特性和机制来模拟或实现延迟队列的行为。就比如 TTL + 死信队列方式模拟出延迟队列的功能。
TTL + 死信队列模拟延迟队列
我们的消费者不去消费正常队列中的消息,而是去消费死信队列中的消息,为什么这样能实现延迟对立的功能呢?当生产者生产消息投递给正常交换机之后,交换机会将这个消息根据 routing key 和 binding key 路由到指定的队列中,因为这个正常队列没有消费者可以消费消息,所以到达了了这个队列中的消息,就会存储在这个队列中,因为这个队列设置了 TTL,所以消息到达了过期时间之后就会被投递给死信交换机 DLX,然后死信交换机就会将这个消息投递给 DLQ,然后消费者从这个 DLQ 中去消费消息,这样消费者就会在生产者生产的消息到达队列之后不会立即去消费,而是会等待一段时间,这样就通过 TTL + 死信队列的方式模拟出了延迟队列的功能。
设置队列的 TTL
那么我们看看通过代码如何实现:
public class Constants {public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String DL_EXCHANGE = "dl.exchange";public static final String DL_QUEUE = "dl.queue";
}
声明交换机、队列和绑定关系:
@Configuration
public class DLConfig {@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(1000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}@Bean("DLExchange")public Exchange DLExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("DLQueue")public Queue DLQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("DLBinding")public Binding DLBinding(@Qualifier("DLExchange") Exchange exchange,@Qualifier("DLQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl").noargs();}
}
生产者代码:
@RequestMapping("producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("delay")public String delay() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay,发送时间:" + new Date());return "消息发送成功";}
}
消费者:
@Component
public class DelayListener {@RabbitListener(queues = Constants.DL_QUEUE)public void listener(String message, Channel channel) {System.out.println("时间:" + new Date() + ",接收到消息:" + message + channel);}
}
可以看到,通过 TTL + 死信队列的方式是可以实现延迟队列的功能的,虽然可能从消息发送的时间到消费者消费到这个消息的时间可能会比预想的时间多一点(中间消息传输花费了一点时间)。
这是一次发送一条消息的情况,如果我们一次发多条信息看看什么效果:
@RequestMapping("delay")
public String delay() throws InterruptedException {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay1,发送时间:" + new Date());Thread.sleep(1000);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay2,发送时间:" + new Date());Thread.sleep(1000);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay3,发送时间:" + new Date());return "消息发送成功";
}
当为队列设置 TTL 的时候,消息会按照进队列的先后顺序进行消费,这是为队列设置 TTL 的情况,为队列设置 TTL 意味着进入该队列中的所有消息都具有过期时间,且过期时间如果没有单独设置消息的过期时间的话,那么该队列中的消息的过期时间都是相同的,那么如果我们想要每个消息的过期时间都不相同的话就需要单独给消息设置过期时间,接下来我们看看给消息单独设置过期时间的情况。
设置消息的 TTL
public static final String NORMAL_QUEUE2 = "normal.queue2";
创建出一个没有设置过期时间的普通队列:
@Bean("normalQueue2")
public Queue normalQueue2() {return QueueBuilder.durable(Constants.NORMAL_QUEUE2).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();
}@Bean("normalBinding2")
public Binding normalBinding2(@Qualifier("normalExchange") Exchange exchange,@Qualifier("normalQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal2").noargs();
}
设置消息的过期时间:
@RequestMapping("delay2")
public String delay2(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("10000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("20000");return message;});return "消息发送成功";
}
消费者的代码就不需要改动:
为消息设置 TTL 就可以时间每个消息的过期时间都不相同,但是这里会有一个问题,就是如果先投递的消息的过期时间如果晚于后面投递的消息的过期时间就会出现问题:
@RequestMapping("delay2")
public String delay2(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("20000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息发送成功";
}
可以发现设置了过期时间为 10s 的消息也是过了 20s 之后才进入到死信队列然后才被消费。
消息过期之后,不一定会被马上丢弃。因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列。
此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。因为我们的普通队列并没有消费者消费其中的消息,我们都知道对消息设置过期时间,只有在消息被快要使用的之前,才会判断它是否过期,但是这里由于队列没有消费者,所以消息也就不会被使用到,那么 RabbitMQ 就只会检查队首的消息是否过期,这样就导致了队首的消息没有过期,那么之后的消息也不会被检查到过期投递到死信队列中。
所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
延迟队列插件
虽然 RabbitMQ 没有直接实现延迟队列,但是 RabbitMQ 提供了一个延迟的插件来实现延迟的功能。
具体的可以看看官方文档 https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq#delaying-messages
安装并且启动插件服务
也可以点击这里:
我们下载的插件的版本需要和 RabbitMQ 的版本兼容:
版本对应关系:
下载完成插件之后,可以在这里查看如何配置:https://www.rabbitmq.com/docs/installing-plugins
我们将下载完成的文件放在 RabbitMQ 的插件目录下:
我们使用的是 Linux,所以需要将插件放在 /usr/lib/rabbitmq/plugins
或者 /usr/lib/rabbitmq/lib/rabbitmq_server-version/plugins
文件下,如果没有这个路径可以自己创建:
下载完成插件并且将其放在指定目录下之后,我们就可以启动插件了:
使用 rabbitmq-plugins list
查看 rabbitmq 的插件:
如果我们之前启动了一个错误版本的插件,可以使用 rabbitmq-plugins disbale 插件名称
来禁用插件:
使用 rabbitmq-plugins enable 插件名称
来启动插件服务:
如何验证插件已经安装并且启动成功,我们在 rabbitmq 的管理页面新建一个交换机的时候,看看交换机的类型是否有 x-delayed-message
这个类型:
我们刷新一下管理页面,然后在新建交换机的时候查看交换机的类型:
这就说明我们的延迟插件启动成功了。
使用插件实现延迟功能
安装并且启动延迟插件之后,我们来看看代码如何实现一个延迟功能的队列:
public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";
声明交换机、对立和绑定关系:
@Configuration
public class DelayConfig {@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();}
}
生产者代码:
@RequestMapping("delay3")
public String delay3(){rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setDelay(20000);return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setDelay(10000);return message;});return "消息发送成功";
}
消费者代码:
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void listener2(String message, Channel channel) {System.out.println("时间:" + new Date() + ",接收到消息:" + message + channel);
}
启动项目之后,会创建一个 x-delay-message 类型的交换机:
使用延迟插件的话,就可以使得队列中的消息可以按照延迟时间到达消费者。
相关文章:

RabbitMQ 高级特性——延迟队列
文章目录 前言延迟队列延迟队列的概念TTL 死信队列模拟延迟队列设置队列的 TTL设置消息的 TTL 延迟队列插件安装并且启动插件服务使用插件实现延迟功能 前言 前面我们学习了 TTL 和死信队列,当队列中的消息达到了过期时间之后,那么这个消息就会被死信交…...

EAC(Estimate at Completion)和ETC(Estimate to Complete)
EAC 预计完工成本ETC 预计尚需成本Estimate at CompletionEstimate to Complete完成预估完工时尚需成本估算 EAC ETC ACETC EAC – AC 预测项目总成本,包含了到目前为止实际发生的成本(AC)和预计将发生的成本。如果EAC大于BAC…...

【React】状态管理之Zustand
🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 💫个人格言: "如无必要,勿增实体" 文章目录 状态管理之Zustand引言1. Zustand 的核心特点1.1 简单直观的 API1.2 无需 Provi…...

Vue3打包自动生成版本JSON文件,添加系统版本检查,实现系统自动更新提示
实现该功能一共有三步。废话不多说,直接上代码!!! 第一步:打包时自动生成版本信息的js文件,versionUpdate.js import fs from fs; import path from path; import { ElMessageBox } from element-plus; i…...
海量数据有限内存系列问题解决方案
1. 排序问题 有限数据充足内存:内存中有十万整数,对所有数据进行排序。 内部排序即可 单节点海量数据有限内存:某台机器有一个文件,文件中包含六十亿整数,一个整数一行,可用内存1G,对所有数据…...

FFmpeg 4.3 音视频-多路H265监控录放C++开发十四,总结编码过程,从摄像头获得数据后,转成AVFrame,然后再次转成AVPacket,
也就是将摄像头采集到的YUV 的数据换成 AVFrame,然后再次转成 AVPacket,那么这AVPakcet数据要怎么办呢?分为三种情况: 一种是将AVPacket存储成h264文件,由于h264编码器在将avframe变成avpacket的时候就是按照h264的格…...

内容占位符:Kinetic Loader HTML+CSS 使用CSS制作三角形原理
内容占位符 前言 随着我们对HTML和CSS3的学习逐渐深入,相信大家都已经掌握了网页制作的基础知识,包括如何使用HTML标记构建网页结构,以及如何运用CSS样式美化页面。为了进一步巩固和熟练这些技能,今天我们一起来完成一个有趣且实…...

麒麟nginx配置
一、配置负载均衡 配置麒麟的yum源 vim /etc/yum.repos.d/kylin_aarch64.repo Copy 删除原来内容,写入如下yum源 [ks10-adv-os] name Kylin Linux Advanced Server 10 - Os baseurl http://update.cs2c.com.cn:8080/NS/V10/V10SP2/os/adv/lic/base/aarch64/ …...

如何在 Ubuntu 上安装 Emby 媒体服务器
Emby 是一个开源的媒体服务器解决方案,它能让你整理、流媒体播放和分享你的个人媒体收藏,包括电影、音乐、电视节目和照片。Emby 帮你集中多媒体内容,让你无论在家还是在外都能轻松访问。它还支持转码,让你能够播放各种格式的内容…...

Mac上详细配置java开发环境和软件(更新中)
文章目录 概要JDK的配置JDK下载安装配置JDK环境变量文件 Idea的安装Mysql安装和配置Navicat Premium16.1安装安装Vscode安装和配置Maven配置本地仓库配置阿里云私服Idea集成Maven 概要 这里使用的是M3型片 14.6版本的Mac 用到的资源放在网盘 链接: https://pan.baidu.com/s/17…...

jmeter常用配置元件介绍总结之定时器
系列文章目录 安装jmeter jmeter常用配置元件介绍总结之定时器 5.定时器5.1.固定定时器5.2.统一随机定时器5.3.Precise Throughput Timer5.4.Constant Throughput Timer5.5.Synchronizing Timer5.6.泊松随机定时器5.7.高斯随机定时器 5.定时器 5.1.固定定时器 固定定时器Cons…...

Spring——提前编译
提前编译:AOT AOT概述 JIT与AOT的区别 JIT和AOT 这个名词是指两种不同的编译方式,这两种编译方式的主要区别在于是否在“运行时”进行编译 (1)JIT, Just-in-time,动态(即时)编译,边运行边编译࿱…...

乐理的学习(音程)
二度,三度,六度,七度的大n度都是直接的音名到音名,如#A到#G的,这样为大n度 而这个基础上向内收,收半音为小n度,在小n度再收,为减n度 在大n度的基础上再向外扩半音,为增…...

【网络】数据链路层协议——以太网,ARP协议
> 作者:დ旧言~ > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:了解什么是以太网协议和ARP协议。 > 毒鸡汤:有些事情,总是不明白,所以我不会坚持。早安! > 专栏选自…...

Linux分区、挂载、配额、逻辑卷、RAID、系统综合状态查看
分区与挂载 fdisk fdisk 命令是一个用于磁盘分区管理的命令行工具,可以用来创建、删除、调整分区等操作。常用的 fdisk 命令选项包括: fdisk -l:列出系统中的所有磁盘分区信息。 fdisk /dev/sdX:打开指定磁盘进行分区操作。 n&…...

3D Gaussian Splatting 代码层理解之Part1
2023 年初,来自蔚蓝海岸大学和 马克斯普朗克学会的作者发表了一篇题为“用于实时现场渲染的 3D 高斯泼溅”的论文。该论文提出了实时神经渲染的重大进步,超越了NeRF等以前方法的实用性。高斯泼溅不仅减少了延迟,而且达到或超过了 NeRF 的渲染质量,在神经渲染领域掀起了一场…...
Qt小知识-Q_GLOBAL_STATIC
你还在为创建全局静态对象烦恼嘛,它来了!它来了! qt5提供了两个宏定义Q_GLOBAL_STATIC和Q_GLOBAL_STATIC_WITH_ARGS来实现。可以创建一个全局静态对象,对象在第一次使用时初始化自身,这意味着它不会增加应用程序或库的…...
【SpringBoot】使用过滤器进行XSS防御
在Spring Boot中,我们可以使用注解的方式来进行XSS防御。注解是一种轻量级的防御手段,它可以在方法或字段级别对输入进行校验,从而防止XSS攻击。 而想对全局的请求都进行XSS防御可以使用servlet中的过滤器或者spring mvc中的拦截器ÿ…...

创建vue插件,发布npm
开发步骤:1.创建一个vue项目,2.开发一个组件。 3.注册成插件。 4.vite和package.json配置。5.发布到npm 1.创建一个vue项目 npm create vuelatest 生成了vue项目之后,得到了以下结构。 在src下创建个plugins目录。用于存放开发的…...

【Android Compose原创组件】可拖动滚动条的完美实现
项目背景 我在使用安卓Compose开发自己的【JK管理器】的过程中,很多地方都需要使用滚动条,在Github上也有实现的比较好,但是大多都是基于View(我要的是Compose啊)。 在研究Android 官方示例项目 nowinandroid 中&…...

龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...

TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...

springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...

基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...

STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)
UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...

数据结构:递归的种类(Types of Recursion)
目录 尾递归(Tail Recursion) 什么是 Loop(循环)? 复杂度分析 头递归(Head Recursion) 树形递归(Tree Recursion) 线性递归(Linear Recursion)…...