RabbitMQ学习(九):延迟队列
一、延迟队列概念
延时队列中,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理。简单来说,延时队列就是用来存放需要在指定时间内被处理的 元素的队列。
其实延迟队列就是死信队列的一种。
二、延迟队列使用场景
订单在十分钟之内未支付则自动取消
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
用户注册成功后,如果三天内没有登陆则进行短信提醒
用户发起退款,如果三天内没有得到处理则通知相关运营人员
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点:需要在某个事件发生之后或者之前的指定时间点完成某一项任务。如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭。
看起来似乎只需要 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理就行。如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
三、RabbitMQ 中的 TTL
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间, 单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
四、队列TTL
4.1 代码架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

4.2 配置类
这期我们整合了springboot,将交换机和队列的声明放到了配置类中。
@Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";// 声明 xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}// 声明 yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 A ttl 为 10s 并绑定到对应的死信交换机@Bean("queueA")public Queue queueA(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}// 声明队列 A 绑定 X 交换机@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//声明队列 B ttl 为 40s 并绑定到对应的死信交换机@Bean("queueB")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queue1B).to(xExchange).with("XB");}//声明死信队列 QD@Bean("queueD")public Queue queueD(){return new Queue(DEAD_LETTER_QUEUE);}//声明死信队列 QD 绑定关系@Beanpublic Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
4.3 生产者
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);}
}
4.4 消费者
@Slf4j
@Component
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}
4.5 总结
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
五、延时队列优化
5.1 代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,而是通过生产者生产消息时里指定延迟时间。

5.2 配置类
@Component
public class MsgTtlQueueConfig {public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String QUEUE_C = "QC";//声明队列 C 死信交换机@Bean("queueC")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuecBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}
5.3 生产者
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列 C:{}", new Date(),ttlTime, message);
}
发起请求 : http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
5.4 总结
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消 息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
六、Rabbitmq 插件实现延迟队列
上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间 及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。
6.1 安装延时队列插件
在官网上下载 https://www.rabbitmq.com/community-plugins.html,下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
6.2 代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

6.3 配置类
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制,消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中。
@Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}//自定义交换机 我们在这里定义的是一个延迟交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();//自定义交换机的类型args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,args);}@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
6.4 生产者
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData ->{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});log.info("当前时间:{}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(),delayTime, message);
}
6.5 消费者
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
发起请求: http://localhost:8080/ttl/sendDelayMsg/comeon baby1/20000 http://localhost:8080/ttl/sendDelayMsg/comeon baby2/2000
6.6 总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。
相关文章:

RabbitMQ学习(九):延迟队列
一、延迟队列概念延时队列中,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理。简单来说,延时队列就是用来存放需要在指定时间内被处理的 元素的队列。其实延迟…...

TCP并发服务器(多进程与多线程)
欢迎关注博主 Mindtechnist 或加入【Linux C/C/Python社区】一起探讨和分享Linux C/C/Python/Shell编程、机器人技术、机器学习、机器视觉、嵌入式AI相关领域的知识和技术。 TCP并发服务器(多进程与多线程)1. 多进程并发服务器(1)…...
第1章 Memcached 教程
Memcached是一个自由开源的,高性能,分布式内存对象缓存系统。 Memcached是以LiveJournal旗下Danga Interactive公司的Brad Fitzpatric为首开发的一款软件。现在已成为mixi、hatena、Facebook、Vox、LiveJournal等众多服务中提高Web应用扩展性的重要因素…...

【2022.12.9】Lammps+Python 在计算g6(r)时遇到的问题
目录写在前面绘制g6( r )执行步骤【updated】如何检查图像的正确性:不是编程问题,而是数学问题的一个小bug废稿2则:写在前面 全部log: 【2022.11.16】LammpsPythonMATLAB在绘制维诺图时遇到的问题 绘制g6( r )执行步骤【updated…...

MySQL使用C语言连接
文章目录MySQL使用C语言连接引入库下载库文件在项目中使用库使用库连接数据库下发SQL请求获取查询结果MySQL使用C语言连接 引入库 要使用C语言连接MySQL,需要使用MySQL官网提供的库。 下载库文件 下载库文件 首先,进入MySQL官网,选择DEVEL…...

JavaScript随手笔记---比较两个数组差异
💌 所属专栏:【JavaScript随手笔记】 😀 作 者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! &#…...

【C++修炼之路】21.红黑树封装map和set
每一个不曾起舞的日子都是对生命的辜负 红黑树封装map和set前言一.改良红黑树的数据域结构1.1 改良后的结点1.2 改良后的类二. 封装的set和map2.1 set.h2.2 map.h三. 迭代器3.1 迭代器封装3.2 const迭代器四.完整代码实现4.1 RBTree.h4.2 set.h4.3 map.h4.4 Test.cpp前言 上一节…...
下载ojdbc14.jar的10.2.0.1.0版本的包
一、首先要有ojdbc14.jar包 没有的可以去下载一个,我的是从这里下载的ojdbc14.jar下载_ojdbc14.jar最新版下载[驱动包软件]-下载之家, 就是无奈关注了一个公众号,有的就不用下了。 二、找到maven的本地仓库的地址 我的地址在这里D:\apach…...

关于欧拉角你需要知道几个点
基础理解,参照:https://www.cnblogs.com/Estranged-Tech/p/16903025.html 欧拉角、万向节死锁(锁死)理解 一、欧拉角理解 举例讲解 欧拉角用三次独立的绕确定的轴旋转角度来表示姿态。如下图所示 经过三次旋转,旋…...

git ssh配置
ssh配置 执行以下命令进行配置 git config --global user.name “这里换上你的用户名” git config --global user.email “这里换上你的邮箱” 执行以下命令生成秘钥: ssh-keygen -t rsa -C “这里换上你的邮箱” 执行命令后需要进行3次或4次确认。直接全部回车就…...

Linux进程概念(三)
环境变量与进程地址空间环境变量什么是环境变量常见环境变量环境变量相关命令环境变量的全局属性PWDmain函数的三个参数进程地址空间什么是进程地址空间进程地址空间,页表,内存的关系为什么存在进程地址空间环境变量 什么是环境变量 我们所有写的程序都…...

新手福利——x64逆向基础
一、x64程序的内存和通用寄存器 随着游戏行业的发展,x32位的程序已经很难满足一些新兴游戏的需求了,因为32位内存的最大值为0xFFFFFFFF,这个值看似足够,但是当游戏对资源需求非常大,那么真正可以分配的内存就显得捉襟…...
虚幻c++中的细节之枚举类型(enum)
文章目录前言一、原生c的枚举类型关键字classint8 - 枚举的基础类型(underlying type)二、枚举类型的灵活运用位运算枚举循环遍历三、虚幻风格的枚举类型UENUMUMETATEnumAsByte总结前言 虚幻引擎中的代码部分实现了一套反射机制,为c代码带了…...
判断某个字符串在另一个字符串中的个数
/** * 用于判断字符串中字符的个数 * * param str1 原字符串 * param str2 需要判断的字符 * return 返回有几个 */ private int getCount(String str1, String str2) { //获取两个字符串的长度 int oneLength str1.length(); int toLength str2.length(); //定义两个整数&am…...

测试人员如何运用好OKR
在软件测试工作中是不是还不知道OKR是什么?又或者每次都很害怕写OKR?或者总觉得很迷茫,不知道目标是什么? OKR 与 KPI 的区别 去年公司从KPI换OKR之后,我也有一段抓瞎的过程,然后自己找了两本书看,一本是《OKR工作法》…...

CentOS7 Hive2.3.9 安装部署(mysql 8.0)
一、CentOS7安装MySQL数据库 查询载mariadb rpm -qa | grep mariadb卸载mariadb rpm -e --nodeps [查询出来的内容]安装wget为下载mysql准备 yum -y install wget在tools目录下执行以下命令,下载MySQL的repo源: wget -P /tools/ https://dev.mysql.…...
【PTA Advanced】1142 Maximal Clique(C++)
目录 题目 Input Specification: Output Specification: Sample Input: Sample Output: 思路 代码 题目 A clique is a subset of vertices of an undirected graph such that every two distinct vertices in the clique are adjacent. A maximal clique is a clique …...
1. MySQL在金融互联网行业的企业级安装部署
这里写目录标题 1. 版本介绍示例2.安装MySQL规范(建议二进制)2.1 安装方式2.2 安装用户2.3 目录规范3.二进制安装3.1 操作系统配置3.2 MySQL 5.7.33 安装部署2.3 MySQL8.0.27安装2.4 源码安装(了解 )3.多实例部署及注意事项3.1 多实例概念3.2 多实例安装3.3 多实例第二种方式…...

【C++修炼之路】19.AVL树
每一个不曾起舞的日子都是对生命的辜负 AVL树前言:一.AVL树的概念二.AVL树的结构2.1 AVL树节点的定义2.2 AVL树的结构2.3 AVL树的插入2.4 AVL树的验证2.5 AVL树的删除(了解)三.AVL树的旋转(重要)3.1 左单旋3.2 右单旋3.3 左右双旋3.4 右左双旋…...

项目管理工具dhtmlxGantt甘特图入门教程(十):服务器端数据集成(下)
这篇文章给大家讲解如何利用dhtmlxGantt在服务器端集成数据。 dhtmlxGantt是用于跨浏览器和跨平台应用程序的功能齐全的Gantt图表,可满足应用程序的所有需求,是完善的甘特图图表库 DhtmlxGantt正版试用下载(qun 764149912)http…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...
华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)
题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...
【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权
摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题:安全。文章将详细阐述认证(Authentication) 与授权(Authorization的核心概念,对比传统 Session-Cookie 与现代 JWT(JS…...

DAY 45 超大力王爱学Python
来自超大力王的友情提示:在用tensordoard的时候一定一定要用绝对位置,例如:tensorboard --logdir"D:\代码\archive (1)\runs\cifar10_mlp_experiment_2" 不然读取不了数据 知识点回顾: tensorboard的发展历史和原理tens…...
前端工具库lodash与lodash-es区别详解
lodash 和 lodash-es 是同一工具库的两个不同版本,核心功能完全一致,主要区别在于模块化格式和优化方式,适合不同的开发环境。以下是详细对比: 1. 模块化格式 lodash 使用 CommonJS 模块格式(require/module.exports&a…...

以太网PHY布局布线指南
1. 简介 对于以太网布局布线遵循以下准则很重要,因为这将有助于减少信号发射,最大程度地减少噪声,确保器件作用,最大程度地减少泄漏并提高信号质量。 2. PHY设计准则 2.1 DRC错误检查 首先检查DRC规则是否设置正确,然…...

Linux系统:进程间通信-匿名与命名管道
本节重点 匿名管道的概念与原理匿名管道的创建命名管道的概念与原理命名管道的创建两者的差异与联系命名管道实现EchoServer 一、管道 管道(Pipe)是一种进程间通信(IPC, Inter-Process Communication)机制,用于在不…...