学会RabbitMQ的延迟队列,提高消息处理效率
系列文章目录
手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?
真的好用吗?鲜有人提的 RabbitMQ-RPC模式
如何利用RabbitMQ的延迟队列提高消息处理效率
- 系列文章目录
- 一、什么是延迟队列?
- 二、延迟队列的实现
- 1. x-delayed-message插件
- 2. TTL + 死信队列
- 三、手写延时队列
- 1. 时间轮概念
- 2. JAVA演示
- 四、应用场景与注意事项
- 1. 应用场景
- 2. 注意事项
- 总结

前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列
📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待
提示:以下是本篇文章正文内容,下面案例可供参考
一、什么是延迟队列?
延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。
二、延迟队列的实现
延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。
在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。
1. x-delayed-message插件
x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件
首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下

然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可
然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");
不难发现,此时其实是交换机在做延迟,

当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.headers(new HashMap<String, Object>(){{put("x-delay", 5000);}});
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());
2. TTL + 死信队列
此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:

首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
arguments.put("x-message-ttl", 5000);channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:
channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
三、手写延时队列
当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列
1. 时间轮概念
在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中

时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务

2. JAVA演示
我们先使用JUC相关内容实现一个时间轮
import java.util.*;
import java.util.concurrent.*;class TimeWheel {private int size;private int currentIndex;private List<BlockingQueue<Task>> slots;private Executor executor;public TimeWheel(int size, Executor executor) {this.size = size;this.slots = new ArrayList<>(size);for (int i = 0; i < size; i++) {slots.add(new LinkedBlockingQueue<>());}this.executor = executor;}public void addTask(Task task) {int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;slots.get(expireIndex).add(task);}public void start() {new Thread(() -> {while (true) {currentIndex = (currentIndex + 1) % size;BlockingQueue<Task> currentSlot = slots.get(currentIndex);List<Task> tasks = new ArrayList<>();currentSlot.drainTo(tasks);for (Task task : tasks) {executor.execute(task);}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}class Task implements Runnable {private long delay; // 延迟时间,单位毫秒private Runnable task; // 任务public Task(long delay, Runnable task) {this.delay = delay;this.task = task;}public long getDelay() {return delay;}@Overridepublic void run() {task.run();}
}
我们可以使用main方法来尝试验证这个时间轮效果:
public static void main(String[] args) {TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));// 添加任务,延迟5秒执行timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));// 添加任务,延迟10秒执行timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));// 启动时间轮timeWheel.start();}

当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。
四、应用场景与注意事项
1. 应用场景
-
红包预告
在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包

-
订单系统
在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。 -
优惠券系统
在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。
2. 注意事项
-
延迟队列不要使用太多
使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。 -
延迟队列可能会导致消息丢失
在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。 -
设置合适的延迟时间
在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。
总结
RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。
相关文章:
学会RabbitMQ的延迟队列,提高消息处理效率
系列文章目录 手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉? RabbitMQ 消费模式该如何选择 死信是什么…...
ChatGPT会取代搜索引擎吗?BingChat、GoogleBard与ChatGPT区别
目前暂时不会,ChatGPT为代表的聊天机器人很可能会直接集成到搜索中,而不是取代它。微软已经通过Bing Chat和Bing做到了这一点,它将“聊天”选项卡直接放入Bing搜索的菜单中。Google、百度也分别开始尝试通过其AI生成技术将Google Bard、文心一…...
多个QLabel中文字左右对其问题研究
众所周知,关于QLabel 中的文字对其方式,官方提供多种,具体可参考 AlignmentFlag,这里就不详细列举了。 实际开发中有这样一个需求:多个lab中,文字显示不同,长度不一,但想要实现视觉…...
链式二叉树统计结点个数的方法和bug
方法一: 分治:分而治之 int BTreeSize1(BTNode* root) {if (root NULL) return 0;else return BTreeSize(root->left)BTreeSize(root->right)1; } 方法二: 遍历计数:设置一个计数器,对二叉树正常访问&#…...
C语言-报错集锦-03-malloc(): memory corruption: 0x0000000001496d90 ***
一、报错信息 [2023-8]--[ Debug ]--Push Data To StAccessPath OK. [2023-8]--[ Debug ]--Judge Vertex(0) Is Not Accessed. [2023-8]--[ Debug ]--Judge Vertex(2) Is Accessed. [2023-8]--[ Debug ]--Judge Vertex(3) Is Not Accessed. [2023-8]--[ Debug ]--Judge Vertex…...
现代C++中的从头开始深度学习:【5/8】卷积
一、说明 在上一个故事中,我们介绍了机器学习的一些最相关的编码方面,例如 functional 规划、矢量化和线性代数规划。 现在,让我们通过使用 2D 卷积实现实际编码深度学习模型来开始我们的道路。让我们开始吧。 二、关于本系列 我们将学习如何…...
以太网帧格式与吞吐量计算
以太网帧结构 帧大小的定义 以太网单个最大帧 6(目的MAC地址) 6(源MAC地址) 2(帧类型) 1500{IP数据包[IP头(20)DATA(1480)]} 4(CRC校验ÿ…...
vue中install方法
1:语法 vue提供install可供我们开发新的插件及全局注册组件等 install方法第一个参数是vue的构造器,第二个参数是可选的选项对象 export default {install(Vue,option){组件指令混入挂载vue原型} }2:注册组件 一:注册单个组件 1…...
Flutter:文件读取—— video_player、chewie、image_picker、file_picker
前言 简单学习一下几个比较好用的文件读取库 video_player 简介 用于视频播放 官方文档 https://pub-web.flutter-io.cn/packages/video_player 安装 flutter pub add video_player加载网络视频 class _MyHomePageState extends State<MyHomePage> {// 控制器late…...
vim的使用
vim文本编辑器 vim介绍命令模式光标移动选中内容复制内容粘贴内容删除撤销/恢复字符转换 编辑模式末行模式保存/退出查找行号显示文件切换 扩展 vim介绍 vim是Linux自带的文本编辑器,具有命令模式、编辑模式、末行模式三种模式。 模式间的切换: 命令模…...
马氏杆法检查斜视
使用 检查水平向斜视时,使用水平向马氏杆检查;重直向斜视时,使用重直问马氏杆;检查旋转斜视时,使用双马氏杆. 检查水平向斜视 双眼屈光不正全矫 双眼同时打开,右眼前加水平向马氏杆,左眼前不加 双眼同时观察点光源&…...
Mac电脑怎么使用“磁盘工具”修复磁盘
我们可以使用“磁盘工具”的“急救”功能来查找和修复磁盘错误。 “磁盘工具”可以查找和修复与 Mac 磁盘的格式及目录结构有关的错误。使用 Mac 时,错误可能会导致意外行为,而重大错误甚至可能会导致 Mac 彻底无法启动。 继续之前,请确保您…...
c++画出分割图像,水平线和垂直线
1、pca 找到图像某个区域的垂直线,并画出来 // 1、 斑块的框 血管二值化图,pca 找到垂直血管壁的直线, 还是根据斑块找主轴方向吧// Step 1: 提取斑块左右范围内的血管像素点坐标,std::vector<cv::Point> points;for (int y 0; y <…...
Python 程序设计入门(015)—— enumerate() 函数的用法
Python 程序设计入门(015)—— enumerate() 函数的用法 目录 Python 程序设计入门(015)—— enumerate() 函数的用法一、enumerate() 函数的语法二、为可迭代对象创建索引三、将字符串、列表等转换为字典1、将字符串转换为字典2、…...
__dict__属性
__dict__ 是 Python 中的一个特殊属性,通常存在于大多数 Python 对象中,用于存储该对象的可变属性。 以下是关于 __dict__ 的一些关键点和详细信息: 存储属性:对于大多数自定义的 Python 对象,__dict__ 属性包含了这个…...
k8s之Pod控制器
目录 一、Pod控制器及其功用二、pod控制器的多种类型2.1 pod容器中的有状态和无状态的区别 三、Deployment 控制器四、SatefulSet 控制器4.1 StatefulSet由以下几个部分组成4.2 为什么要有headless?4.3 为什么要有volumeClaimTemplate?4.4 滚动更新4.5 扩…...
逆元(求乘法逆元的几种方法)
目录 逆元 加法逆元 乘法逆元 如何求 快速幂 扩展欧几里得 O(n)求1到n的乘法逆元 逆元 数学中,逆元素(英语:Inverse element)推广了加法中的加法逆元和乘法中的倒数。直观地说,它是一个可以取消另一给定元素运…...
没点本事,还真做不好数字化转型
数字化转型逐渐成为企业业务增长的利器 然而,在此过程中 企业最应该注重哪些? 效率?质量? 但还有一个至关重要的点不容忽视 那就是安全 有一家硬核企业通过技术与狠活 硬生生提升了应用安全性 保障了产业与数字化的安全融合…...
windows 10 远程桌面配置
1. 修改远程桌面端口(3389) 打开注册表(winr), 输入regedit 找到配置项【计算机\HKEY_LOCAL_MACHINE\SYSTEM\ControlSet001\Control\Terminal Server\Wds\rdpwd\Tds\tcp】 , 可以通过搜索“Wds”快速定位。 修改端口配…...
OpenStreetMap 上基于A*搜索算法的C ++路线规划项目
引言 在现代的地理信息系统(GIS)中,路线规划是一个重要的组成部分。它涉及到从一个地点到另一个地点的最优路径的确定。在这篇文章中,我们将探讨如何在OpenStreetMap数据上实现一个基于A*搜索算法的C路线规划项目。 OpenStreetM…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
从面试角度回答Android中ContentProvider启动原理
Android中ContentProvider原理的面试角度解析,分为已启动和未启动两种场景: 一、ContentProvider已启动的情况 1. 核心流程 触发条件:当其他组件(如Activity、Service)通过ContentR…...
ubuntu22.04有线网络无法连接,图标也没了
今天突然无法有线网络无法连接任何设备,并且图标都没了 错误案例 往上一顿搜索,试了很多博客都不行,比如 Ubuntu22.04右上角网络图标消失 最后解决的办法 下载网卡驱动,重新安装 操作步骤 查看自己网卡的型号 lspci | gre…...
Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践
前言:本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中,跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南,你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案,并结合内网…...
算术操作符与类型转换:从基础到精通
目录 前言:从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符:、-、*、/、% 赋值操作符:和复合赋值 单⽬操作符:、--、、- 前言:从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...
