当前位置: 首页 > news >正文

学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?
真的好用吗?鲜有人提的 RabbitMQ-RPC模式



在这里插入图片描述
前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是延迟队列?

延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。

二、延迟队列的实现

延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。

在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。

1. x-delayed-message插件

x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件

首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下
在这里插入图片描述
然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可
然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");

不难发现,此时其实是交换机在做延迟,
在这里插入图片描述

当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.headers(new HashMap<String, Object>(){{put("x-delay", 5000);}});
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());

2. TTL + 死信队列

此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:
在这里插入图片描述

首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
arguments.put("x-message-ttl", 5000);channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");

在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:

channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

三、手写延时队列

当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列

1. 时间轮概念

在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中
在这里插入图片描述
时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务
在这里插入图片描述

2. JAVA演示

我们先使用JUC相关内容实现一个时间轮

import java.util.*;
import java.util.concurrent.*;class TimeWheel {private int size;private int currentIndex;private List<BlockingQueue<Task>> slots;private Executor executor;public TimeWheel(int size, Executor executor) {this.size = size;this.slots = new ArrayList<>(size);for (int i = 0; i < size; i++) {slots.add(new LinkedBlockingQueue<>());}this.executor = executor;}public void addTask(Task task) {int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;slots.get(expireIndex).add(task);}public void start() {new Thread(() -> {while (true) {currentIndex = (currentIndex + 1) % size;BlockingQueue<Task> currentSlot = slots.get(currentIndex);List<Task> tasks = new ArrayList<>();currentSlot.drainTo(tasks);for (Task task : tasks) {executor.execute(task);}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}class Task implements Runnable {private long delay; // 延迟时间,单位毫秒private Runnable task; // 任务public Task(long delay, Runnable task) {this.delay = delay;this.task = task;}public long getDelay() {return delay;}@Overridepublic void run() {task.run();}
}

我们可以使用main方法来尝试验证这个时间轮效果:

    public static void main(String[] args) {TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));// 添加任务,延迟5秒执行timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));// 添加任务,延迟10秒执行timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));// 启动时间轮timeWheel.start();}

在这里插入图片描述

当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。

四、应用场景与注意事项

1. 应用场景

  1. 红包预告
    在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包
    在这里插入图片描述

  2. 订单系统
    在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。

  3. 优惠券系统
    在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。

2. 注意事项

  1. 延迟队列不要使用太多
    使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。

  2. 延迟队列可能会导致消息丢失
    在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。

  3. 设置合适的延迟时间
    在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。

总结

RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。

相关文章:

学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录 手把手教你&#xff0c;本地RabbitMQ服务搭建&#xff08;windows&#xff09; 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用&#xff0c;怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉&#xff1f; RabbitMQ 消费模式该如何选择 死信是什么…...

ChatGPT会取代搜索引擎吗?BingChat、GoogleBard与ChatGPT区别

目前暂时不会&#xff0c;ChatGPT为代表的聊天机器人很可能会直接集成到搜索中&#xff0c;而不是取代它。微软已经通过Bing Chat和Bing做到了这一点&#xff0c;它将“聊天”选项卡直接放入Bing搜索的菜单中。Google、百度也分别开始尝试通过其AI生成技术将Google Bard、文心一…...

多个QLabel中文字左右对其问题研究

众所周知&#xff0c;关于QLabel 中的文字对其方式&#xff0c;官方提供多种&#xff0c;具体可参考 AlignmentFlag&#xff0c;这里就不详细列举了。 实际开发中有这样一个需求&#xff1a;多个lab中&#xff0c;文字显示不同&#xff0c;长度不一&#xff0c;但想要实现视觉…...

链式二叉树统计结点个数的方法和bug

方法一&#xff1a; 分治&#xff1a;分而治之 int BTreeSize1(BTNode* root) {if (root NULL) return 0;else return BTreeSize(root->left)BTreeSize(root->right)1; } 方法二&#xff1a; 遍历计数&#xff1a;设置一个计数器&#xff0c;对二叉树正常访问&#…...

C语言-报错集锦-03-malloc(): memory corruption: 0x0000000001496d90 ***

一、报错信息 [2023-8]--[ Debug ]--Push Data To StAccessPath OK. [2023-8]--[ Debug ]--Judge Vertex(0) Is Not Accessed. [2023-8]--[ Debug ]--Judge Vertex(2) Is Accessed. [2023-8]--[ Debug ]--Judge Vertex(3) Is Not Accessed. [2023-8]--[ Debug ]--Judge Vertex…...

现代C++中的从头开始深度学习:【5/8】卷积

一、说明 在上一个故事中&#xff0c;我们介绍了机器学习的一些最相关的编码方面&#xff0c;例如 functional 规划、矢量化和线性代数规划。 现在&#xff0c;让我们通过使用 2D 卷积实现实际编码深度学习模型来开始我们的道路。让我们开始吧。 二、关于本系列 我们将学习如何…...

以太网帧格式与吞吐量计算

以太网帧结构 帧大小的定义 以太网单个最大帧 6&#xff08;目的MAC地址&#xff09; 6&#xff08;源MAC地址&#xff09; 2&#xff08;帧类型&#xff09; 1500{IP数据包[IP头&#xff08;20&#xff09;DATA&#xff08;1480&#xff09;]} 4&#xff08;CRC校验&#xff…...

vue中install方法

1&#xff1a;语法 vue提供install可供我们开发新的插件及全局注册组件等 install方法第一个参数是vue的构造器&#xff0c;第二个参数是可选的选项对象 export default {install(Vue,option){组件指令混入挂载vue原型} }2&#xff1a;注册组件 一&#xff1a;注册单个组件 1…...

Flutter:文件读取—— video_player、chewie、image_picker、file_picker

前言 简单学习一下几个比较好用的文件读取库 video_player 简介 用于视频播放 官方文档 https://pub-web.flutter-io.cn/packages/video_player 安装 flutter pub add video_player加载网络视频 class _MyHomePageState extends State<MyHomePage> {// 控制器late…...

vim的使用

vim文本编辑器 vim介绍命令模式光标移动选中内容复制内容粘贴内容删除撤销/恢复字符转换 编辑模式末行模式保存/退出查找行号显示文件切换 扩展 vim介绍 vim是Linux自带的文本编辑器&#xff0c;具有命令模式、编辑模式、末行模式三种模式。 模式间的切换&#xff1a; 命令模…...

马氏杆法检查斜视

使用 检查水平向斜视时&#xff0c;使用水平向马氏杆检查;重直向斜视时&#xff0c;使用重直问马氏杆;检查旋转斜视时&#xff0c;使用双马氏杆. 检查水平向斜视 双眼屈光不正全矫 双眼同时打开&#xff0c;右眼前加水平向马氏杆&#xff0c;左眼前不加 双眼同时观察点光源&…...

Mac电脑怎么使用“磁盘工具”修复磁盘

我们可以使用“磁盘工具”的“急救”功能来查找和修复磁盘错误。 “磁盘工具”可以查找和修复与 Mac 磁盘的格式及目录结构有关的错误。使用 Mac 时&#xff0c;错误可能会导致意外行为&#xff0c;而重大错误甚至可能会导致 Mac 彻底无法启动。 继续之前&#xff0c;请确保您…...

c++画出分割图像,水平线和垂直线

1、pca 找到图像某个区域的垂直线&#xff0c;并画出来 // 1、 斑块的框 血管二值化图&#xff0c;pca 找到垂直血管壁的直线, 还是根据斑块找主轴方向吧// Step 1: 提取斑块左右范围内的血管像素点坐标&#xff0c;std::vector<cv::Point> points;for (int y 0; y <…...

Python 程序设计入门(015)—— enumerate() 函数的用法

Python 程序设计入门&#xff08;015&#xff09;—— enumerate() 函数的用法 目录 Python 程序设计入门&#xff08;015&#xff09;—— enumerate() 函数的用法一、enumerate() 函数的语法二、为可迭代对象创建索引三、将字符串、列表等转换为字典1、将字符串转换为字典2、…...

__dict__属性

__dict__ 是 Python 中的一个特殊属性&#xff0c;通常存在于大多数 Python 对象中&#xff0c;用于存储该对象的可变属性。 以下是关于 __dict__ 的一些关键点和详细信息&#xff1a; 存储属性&#xff1a;对于大多数自定义的 Python 对象&#xff0c;__dict__ 属性包含了这个…...

k8s之Pod控制器

目录 一、Pod控制器及其功用二、pod控制器的多种类型2.1 pod容器中的有状态和无状态的区别 三、Deployment 控制器四、SatefulSet 控制器4.1 StatefulSet由以下几个部分组成4.2 为什么要有headless&#xff1f;4.3 为什么要有volumeClaimTemplate&#xff1f;4.4 滚动更新4.5 扩…...

逆元(求乘法逆元的几种方法)

目录 逆元 加法逆元 乘法逆元 如何求 快速幂 扩展欧几里得 O(n)求1到n的乘法逆元 逆元 数学中&#xff0c;逆元素&#xff08;英语&#xff1a;Inverse element&#xff09;推广了加法中的加法逆元和乘法中的倒数。直观地说&#xff0c;它是一个可以取消另一给定元素运…...

没点本事,还真做不好数字化转型

数字化转型逐渐成为企业业务增长的利器 然而&#xff0c;在此过程中 企业最应该注重哪些&#xff1f; 效率&#xff1f;质量&#xff1f; 但还有一个至关重要的点不容忽视 那就是安全 有一家硬核企业通过技术与狠活 硬生生提升了应用安全性 保障了产业与数字化的安全融合…...

windows 10 远程桌面配置

1. 修改远程桌面端口&#xff08;3389&#xff09; 打开注册表&#xff08;winr&#xff09;, 输入regedit 找到配置项【计算机\HKEY_LOCAL_MACHINE\SYSTEM\ControlSet001\Control\Terminal Server\Wds\rdpwd\Tds\tcp】 &#xff0c; 可以通过搜索“Wds”快速定位。 修改端口配…...

OpenStreetMap 上基于A*搜索算法的C ++路线规划项目

引言 在现代的地理信息系统&#xff08;GIS&#xff09;中&#xff0c;路线规划是一个重要的组成部分。它涉及到从一个地点到另一个地点的最优路径的确定。在这篇文章中&#xff0c;我们将探讨如何在OpenStreetMap数据上实现一个基于A*搜索算法的C路线规划项目。 OpenStreetM…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

测试微信模版消息推送

进入“开发接口管理”--“公众平台测试账号”&#xff0c;无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息&#xff1a; 关注测试号&#xff1a;扫二维码关注测试号。 发送模版消息&#xff1a; import requests da…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)

本期内容并不是很难&#xff0c;相信大家会学的很愉快&#xff0c;当然对于有后端基础的朋友来说&#xff0c;本期内容更加容易了解&#xff0c;当然没有基础的也别担心&#xff0c;本期内容会详细解释有关内容 本期用到的软件&#xff1a;yakit&#xff08;因为经过之前好多期…...

PAN/FPN

import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来&#xff0c;在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂&#xff0c;网络攻击、数据泄露等事件频发&#xff0c;现行法律已难以完全适应新的风险挑战。 2025年3月28日&#xff0c;国家网信办会同相关部门起草了《网络安全…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台

淘宝扭蛋机小程序系统的开发&#xff0c;旨在打造一个互动性强的购物平台&#xff0c;让用户在购物的同时&#xff0c;能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机&#xff0c;实现旋转、抽拉等动作&#xff0c;增…...