当前位置: 首页 > news >正文

学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?
真的好用吗?鲜有人提的 RabbitMQ-RPC模式



在这里插入图片描述
前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是延迟队列?

延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。

二、延迟队列的实现

延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。

在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。

1. x-delayed-message插件

x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件

首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下
在这里插入图片描述
然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可
然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");

不难发现,此时其实是交换机在做延迟,
在这里插入图片描述

当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.headers(new HashMap<String, Object>(){{put("x-delay", 5000);}});
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());

2. TTL + 死信队列

此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:
在这里插入图片描述

首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
arguments.put("x-message-ttl", 5000);channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");

在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:

channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

三、手写延时队列

当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列

1. 时间轮概念

在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中
在这里插入图片描述
时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务
在这里插入图片描述

2. JAVA演示

我们先使用JUC相关内容实现一个时间轮

import java.util.*;
import java.util.concurrent.*;class TimeWheel {private int size;private int currentIndex;private List<BlockingQueue<Task>> slots;private Executor executor;public TimeWheel(int size, Executor executor) {this.size = size;this.slots = new ArrayList<>(size);for (int i = 0; i < size; i++) {slots.add(new LinkedBlockingQueue<>());}this.executor = executor;}public void addTask(Task task) {int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;slots.get(expireIndex).add(task);}public void start() {new Thread(() -> {while (true) {currentIndex = (currentIndex + 1) % size;BlockingQueue<Task> currentSlot = slots.get(currentIndex);List<Task> tasks = new ArrayList<>();currentSlot.drainTo(tasks);for (Task task : tasks) {executor.execute(task);}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}class Task implements Runnable {private long delay; // 延迟时间,单位毫秒private Runnable task; // 任务public Task(long delay, Runnable task) {this.delay = delay;this.task = task;}public long getDelay() {return delay;}@Overridepublic void run() {task.run();}
}

我们可以使用main方法来尝试验证这个时间轮效果:

    public static void main(String[] args) {TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));// 添加任务,延迟5秒执行timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));// 添加任务,延迟10秒执行timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));// 启动时间轮timeWheel.start();}

在这里插入图片描述

当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。

四、应用场景与注意事项

1. 应用场景

  1. 红包预告
    在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包
    在这里插入图片描述

  2. 订单系统
    在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。

  3. 优惠券系统
    在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。

2. 注意事项

  1. 延迟队列不要使用太多
    使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。

  2. 延迟队列可能会导致消息丢失
    在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。

  3. 设置合适的延迟时间
    在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。

总结

RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。

相关文章:

学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录 手把手教你&#xff0c;本地RabbitMQ服务搭建&#xff08;windows&#xff09; 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用&#xff0c;怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉&#xff1f; RabbitMQ 消费模式该如何选择 死信是什么…...

ChatGPT会取代搜索引擎吗?BingChat、GoogleBard与ChatGPT区别

目前暂时不会&#xff0c;ChatGPT为代表的聊天机器人很可能会直接集成到搜索中&#xff0c;而不是取代它。微软已经通过Bing Chat和Bing做到了这一点&#xff0c;它将“聊天”选项卡直接放入Bing搜索的菜单中。Google、百度也分别开始尝试通过其AI生成技术将Google Bard、文心一…...

多个QLabel中文字左右对其问题研究

众所周知&#xff0c;关于QLabel 中的文字对其方式&#xff0c;官方提供多种&#xff0c;具体可参考 AlignmentFlag&#xff0c;这里就不详细列举了。 实际开发中有这样一个需求&#xff1a;多个lab中&#xff0c;文字显示不同&#xff0c;长度不一&#xff0c;但想要实现视觉…...

链式二叉树统计结点个数的方法和bug

方法一&#xff1a; 分治&#xff1a;分而治之 int BTreeSize1(BTNode* root) {if (root NULL) return 0;else return BTreeSize(root->left)BTreeSize(root->right)1; } 方法二&#xff1a; 遍历计数&#xff1a;设置一个计数器&#xff0c;对二叉树正常访问&#…...

C语言-报错集锦-03-malloc(): memory corruption: 0x0000000001496d90 ***

一、报错信息 [2023-8]--[ Debug ]--Push Data To StAccessPath OK. [2023-8]--[ Debug ]--Judge Vertex(0) Is Not Accessed. [2023-8]--[ Debug ]--Judge Vertex(2) Is Accessed. [2023-8]--[ Debug ]--Judge Vertex(3) Is Not Accessed. [2023-8]--[ Debug ]--Judge Vertex…...

现代C++中的从头开始深度学习:【5/8】卷积

一、说明 在上一个故事中&#xff0c;我们介绍了机器学习的一些最相关的编码方面&#xff0c;例如 functional 规划、矢量化和线性代数规划。 现在&#xff0c;让我们通过使用 2D 卷积实现实际编码深度学习模型来开始我们的道路。让我们开始吧。 二、关于本系列 我们将学习如何…...

以太网帧格式与吞吐量计算

以太网帧结构 帧大小的定义 以太网单个最大帧 6&#xff08;目的MAC地址&#xff09; 6&#xff08;源MAC地址&#xff09; 2&#xff08;帧类型&#xff09; 1500{IP数据包[IP头&#xff08;20&#xff09;DATA&#xff08;1480&#xff09;]} 4&#xff08;CRC校验&#xff…...

vue中install方法

1&#xff1a;语法 vue提供install可供我们开发新的插件及全局注册组件等 install方法第一个参数是vue的构造器&#xff0c;第二个参数是可选的选项对象 export default {install(Vue,option){组件指令混入挂载vue原型} }2&#xff1a;注册组件 一&#xff1a;注册单个组件 1…...

Flutter:文件读取—— video_player、chewie、image_picker、file_picker

前言 简单学习一下几个比较好用的文件读取库 video_player 简介 用于视频播放 官方文档 https://pub-web.flutter-io.cn/packages/video_player 安装 flutter pub add video_player加载网络视频 class _MyHomePageState extends State<MyHomePage> {// 控制器late…...

vim的使用

vim文本编辑器 vim介绍命令模式光标移动选中内容复制内容粘贴内容删除撤销/恢复字符转换 编辑模式末行模式保存/退出查找行号显示文件切换 扩展 vim介绍 vim是Linux自带的文本编辑器&#xff0c;具有命令模式、编辑模式、末行模式三种模式。 模式间的切换&#xff1a; 命令模…...

马氏杆法检查斜视

使用 检查水平向斜视时&#xff0c;使用水平向马氏杆检查;重直向斜视时&#xff0c;使用重直问马氏杆;检查旋转斜视时&#xff0c;使用双马氏杆. 检查水平向斜视 双眼屈光不正全矫 双眼同时打开&#xff0c;右眼前加水平向马氏杆&#xff0c;左眼前不加 双眼同时观察点光源&…...

Mac电脑怎么使用“磁盘工具”修复磁盘

我们可以使用“磁盘工具”的“急救”功能来查找和修复磁盘错误。 “磁盘工具”可以查找和修复与 Mac 磁盘的格式及目录结构有关的错误。使用 Mac 时&#xff0c;错误可能会导致意外行为&#xff0c;而重大错误甚至可能会导致 Mac 彻底无法启动。 继续之前&#xff0c;请确保您…...

c++画出分割图像,水平线和垂直线

1、pca 找到图像某个区域的垂直线&#xff0c;并画出来 // 1、 斑块的框 血管二值化图&#xff0c;pca 找到垂直血管壁的直线, 还是根据斑块找主轴方向吧// Step 1: 提取斑块左右范围内的血管像素点坐标&#xff0c;std::vector<cv::Point> points;for (int y 0; y <…...

Python 程序设计入门(015)—— enumerate() 函数的用法

Python 程序设计入门&#xff08;015&#xff09;—— enumerate() 函数的用法 目录 Python 程序设计入门&#xff08;015&#xff09;—— enumerate() 函数的用法一、enumerate() 函数的语法二、为可迭代对象创建索引三、将字符串、列表等转换为字典1、将字符串转换为字典2、…...

__dict__属性

__dict__ 是 Python 中的一个特殊属性&#xff0c;通常存在于大多数 Python 对象中&#xff0c;用于存储该对象的可变属性。 以下是关于 __dict__ 的一些关键点和详细信息&#xff1a; 存储属性&#xff1a;对于大多数自定义的 Python 对象&#xff0c;__dict__ 属性包含了这个…...

k8s之Pod控制器

目录 一、Pod控制器及其功用二、pod控制器的多种类型2.1 pod容器中的有状态和无状态的区别 三、Deployment 控制器四、SatefulSet 控制器4.1 StatefulSet由以下几个部分组成4.2 为什么要有headless&#xff1f;4.3 为什么要有volumeClaimTemplate&#xff1f;4.4 滚动更新4.5 扩…...

逆元(求乘法逆元的几种方法)

目录 逆元 加法逆元 乘法逆元 如何求 快速幂 扩展欧几里得 O(n)求1到n的乘法逆元 逆元 数学中&#xff0c;逆元素&#xff08;英语&#xff1a;Inverse element&#xff09;推广了加法中的加法逆元和乘法中的倒数。直观地说&#xff0c;它是一个可以取消另一给定元素运…...

没点本事,还真做不好数字化转型

数字化转型逐渐成为企业业务增长的利器 然而&#xff0c;在此过程中 企业最应该注重哪些&#xff1f; 效率&#xff1f;质量&#xff1f; 但还有一个至关重要的点不容忽视 那就是安全 有一家硬核企业通过技术与狠活 硬生生提升了应用安全性 保障了产业与数字化的安全融合…...

windows 10 远程桌面配置

1. 修改远程桌面端口&#xff08;3389&#xff09; 打开注册表&#xff08;winr&#xff09;, 输入regedit 找到配置项【计算机\HKEY_LOCAL_MACHINE\SYSTEM\ControlSet001\Control\Terminal Server\Wds\rdpwd\Tds\tcp】 &#xff0c; 可以通过搜索“Wds”快速定位。 修改端口配…...

OpenStreetMap 上基于A*搜索算法的C ++路线规划项目

引言 在现代的地理信息系统&#xff08;GIS&#xff09;中&#xff0c;路线规划是一个重要的组成部分。它涉及到从一个地点到另一个地点的最优路径的确定。在这篇文章中&#xff0c;我们将探讨如何在OpenStreetMap数据上实现一个基于A*搜索算法的C路线规划项目。 OpenStreetM…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)

目录 1.TCP的连接管理机制&#xff08;1&#xff09;三次握手①握手过程②对握手过程的理解 &#xff08;2&#xff09;四次挥手&#xff08;3&#xff09;握手和挥手的触发&#xff08;4&#xff09;状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)

设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile&#xff0c;新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

【网络安全】开源系统getshell漏洞挖掘

审计过程&#xff1a; 在入口文件admin/index.php中&#xff1a; 用户可以通过m,c,a等参数控制加载的文件和方法&#xff0c;在app/system/entrance.php中存在重点代码&#xff1a; 当M_TYPE system并且M_MODULE include时&#xff0c;会设置常量PATH_OWN_FILE为PATH_APP.M_T…...

sshd代码修改banner

sshd服务连接之后会收到字符串&#xff1a; SSH-2.0-OpenSSH_9.5 容易被hacker识别此服务为sshd服务。 是否可以通过修改此banner达到让人无法识别此服务的目的呢&#xff1f; 不能。因为这是写的SSH的协议中的。 也就是协议规定了banner必须这么写。 SSH- 开头&#xff0c…...

【免费数据】2005-2019年我国272个地级市的旅游竞争力多指标数据(33个指标)

旅游业是一个城市的重要产业构成。旅游竞争力是一个城市竞争力的重要构成部分。一个城市的旅游竞争力反映了其在旅游市场竞争中的比较优势。 今日我们分享的是2005-2019年我国272个地级市的旅游竞争力多指标数据&#xff01;该数据集源自2025年4月发表于《地理学报》的论文成果…...

大数据驱动企业决策智能化的路径与实践

&#x1f4dd;个人主页&#x1f339;&#xff1a;慌ZHANG-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 一、引言&#xff1a;数据驱动的企业竞争力重构 在这个瞬息万变的商业时代&#xff0c;“快者胜”的竞争逻辑愈发明显。企业如何在复杂环…...

简单介绍C++中 string与wstring

在C中&#xff0c;string和wstring是两种用于处理不同字符编码的字符串类型&#xff0c;分别基于char和wchar_t字符类型。以下是它们的详细说明和对比&#xff1a; 1. 基础定义 string 类型&#xff1a;std::string 字符类型&#xff1a;char&#xff08;通常为8位&#xff09…...

【题解-洛谷】P10480 可达性统计

题目&#xff1a;P10480 可达性统计 题目描述 给定一张 N N N 个点 M M M 条边的有向无环图&#xff0c;分别统计从每个点出发能够到达的点的数量。 输入格式 第一行两个整数 N , M N,M N,M&#xff0c;接下来 M M M 行每行两个整数 x , y x,y x,y&#xff0c;表示从 …...