Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略
Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略
大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!
在后端开发中,延迟队列(Delayed Queue)是一种非常实用的设计,能够帮助我们在指定时间后处理某些任务。无论是订单超时处理、定时消息通知,还是需要延迟执行的任务,延迟队列都能为我们提供高效的解决方案。常见的实现延迟队列的策略有很多,其中Redis和RabbitMQ是两种流行的方案。本文将从这两种策略的角度探讨如何在Java后端中实现延迟队列。
一、什么是延迟队列?
延迟队列的基本原理是在消息被放入队列后,不会立即被消费,而是需要等到指定的时间后,消费者才能消费这些消息。延迟队列的典型应用场景包括:
- 延迟发送消息(如邮件或通知)
- 定时任务执行(如定期清理、自动过期)
- 超时订单的处理
下面,我们将分别介绍如何利用Redis和RabbitMQ实现延迟队列,并提供对应的Java代码示例。
二、基于Redis的延迟队列实现
Redis可以通过其Sorted Set(有序集合)和TTL机制来实现延迟队列。有序集合中的每个元素都有一个关联的分数,分数用于排序。我们可以将消息存入有序集合,并将当前时间戳加上延迟时间作为分数,这样我们就可以使用ZRANGEBYSCORE命令获取到期的消息。
Redis延迟队列的Java实现
package cn.juwatech.redis;import redis.clients.jedis.Jedis;import java.util.Set;public class RedisDelayQueue {private static final String DELAY_QUEUE_KEY = "delay_queue";private Jedis jedis;public RedisDelayQueue() {this.jedis = new Jedis("localhost", 6379);}// 添加任务到延迟队列public void addTask(String taskId, long delay) {long score = System.currentTimeMillis() + delay;jedis.zadd(DELAY_QUEUE_KEY, score, taskId);}// 轮询获取到期的任务public void pollTasks() {while (true) {long currentTime = System.currentTimeMillis();// 获取延迟时间已到的任务Set<String> tasks = jedis.zrangeByScore(DELAY_QUEUE_KEY, 0, currentTime);for (String task : tasks) {// 处理任务System.out.println("处理任务: " + task);// 移除已处理任务jedis.zrem(DELAY_QUEUE_KEY, task);}try {Thread.sleep(1000); // 每秒轮询一次} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}public static void main(String[] args) {RedisDelayQueue delayQueue = new RedisDelayQueue();delayQueue.addTask("task1", 5000); // 延迟5秒delayQueue.addTask("task2", 10000); // 延迟10秒// 启动轮询线程处理任务new Thread(delayQueue::pollTasks).start();}
}
代码解析:
- 我们使用Redis的有序集合来存储任务,每个任务有一个时间戳作为分数。
addTask方法将任务ID与延迟后的时间戳一同存入Redis。pollTasks方法定期从Redis中查询当前时间已到期的任务并处理。
Redis的延迟队列方案具有简单、轻量的优势,但由于需要轮询来检测任务是否到期,因此在高并发场景下可能存在性能瓶颈。
三、基于RabbitMQ的延迟队列实现
RabbitMQ提供了更专业的消息队列功能,并且可以通过插件的方式直接支持延迟队列。使用RabbitMQ的延迟队列有两种常见方式:一是基于TTL(Time-To-Live)和DLX(Dead Letter Exchange),二是使用RabbitMQ的延迟消息插件。
RabbitMQ延迟队列的Java实现
package cn.juwatech.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class RabbitMQDelayQueue {private static final String EXCHANGE_NAME = "delay_exchange";private static final String QUEUE_NAME = "delay_queue";private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";private Connection connection;private Channel channel;public RabbitMQDelayQueue() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");this.connection = factory.newConnection();this.channel = connection.createChannel();// 声明死信交换机和队列Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// 声明延迟队列,指定TTLchannel.queueDeclare(QUEUE_NAME, true, false, false, args);channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delay");}// 发送延迟消息public void sendDelayedMessage(String message, long delay) throws IOException {Map<String, Object> headers = new HashMap<>();headers.put("x-delay", delay);AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).expiration(String.valueOf(delay)) // TTL.build();channel.basicPublish(EXCHANGE_NAME, "delay", properties, message.getBytes());System.out.println("发送延迟消息: " + message + " 延迟: " + delay + " 毫秒");}// 消费消息public void consumeMessage() throws IOException {channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> {String body = new String(message.getBody());System.out.println("接收到消息: " + body);}, consumerTag -> {});}public static void main(String[] args) throws IOException, TimeoutException {RabbitMQDelayQueue delayQueue = new RabbitMQDelayQueue();delayQueue.sendDelayedMessage("task1", 5000); // 延迟5秒delayQueue.sendDelayedMessage("task2", 10000); // 延迟10秒// 启动消费端delayQueue.consumeMessage();}
}
代码解析:
- 交换机和队列声明:我们声明了一个死信交换机,用于接收延迟消息。
- 发送延迟消息:在发送消息时,我们设置了TTL(消息存活时间),消息会在指定时间后转发到死信交换机,并最终到达目标队列。
- 消费消息:消费者会从延迟队列中接收到消息,并进行处理。
RabbitMQ的延迟队列方案更加专业,适用于高并发、分布式环境下的消息延迟处理。而且,通过使用RabbitMQ的原生插件,我们可以轻松管理延迟消息的精度和性能。
四、Redis与RabbitMQ延迟队列的对比
| 特性 | Redis | RabbitMQ |
|---|---|---|
| 实现复杂度 | 简单,通过Sorted Set实现 | 较复杂,需要配置TTL和DLX机制 |
| 性能 | 适合中小型任务,性能取决于轮询效率 | 高并发场景表现优异,专业队列系统 |
| 延迟精度 | 受轮询间隔影响 | 延迟精度高,TTL直接控制 |
| 可扩展性 | 难以扩展,需依赖分布式锁等机制 | 天然支持分布式、消息队列 |
| 可靠性 | 数据持久化机制简单 | 提供强大的消息持久化与确认机制 |
五、应用场景分析
- Redis延迟队列更适合任务量不大、处理相对简单的场景,例如订单超时提醒、限时优惠处理等。
- RabbitMQ延迟队列适合需要处理高并发、大规模任务调度的场景,如电商订单、支付系统中的延时扣款和分布式任务调度等。
结语
在Java后端开发中,延迟队列是实现定时任务和延迟消息处理的有效手段。通过Redis和RabbitMQ这两种不同的技术栈,我们可以灵活选择适合自己业务场景的延迟队列方案。Redis简单易用,适合小型任务;Rabbit
MQ功能强大,能够处理复杂的分布式延迟任务。通过合理的选择和配置,我们可以提升系统的性能与可扩展性。
本文著作权归聚娃科技微赚淘客系统开发者团队,转载请注明出处!
相关文章:
Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略
Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在后端开发中,延迟队列(Delayed Queue)…...
Linux中使用cp命令的 -f 选项,但还是提醒覆盖的问题
问题: linux 在执行cp的命令的时候,就算是执行 cp -f 也还是会提醒是否要进行替换。 问题原因: 查看别名,alias命令,看到cp的别名为cp -i,那就是说cp本身就是自带覆盖提醒,就算我们加上-f 的…...
互联网技术的持续演进:从现在到未来
互联网技术的持续演进:从现在到未来 在过去的十年里,互联网技术发生了飞速变化。无论是大数据、人工智能,还是5G网络和物联网,每一种技术的突破都在改变我们的生活方式和工作模式。作为现代社会的核心驱动力,互联网技…...
vscode安装ESLint与Vetur插件后自动修复代码不生效
vscode安装ESLint与Vetur插件后自动修复代码不生效 1、安装ESLint 和 Vuter 2、运行结果 2.1、代码保存时代码中的分号;能被检测出来,但是不会自动修复 2.2、手动运行ESLint 修复命令(在终端中执行 npx eslint . --fix)可以修复问题 3、解决办法 在.vscode目录下setti…...
2848、与车相交的点
2848、[简单] 与车相交的点 1、题目描述 给你一个下标从 0 开始的二维整数数组 nums 表示汽车停放在数轴上的坐标。对于任意下标 i,nums[i] [starti, endi] ,其中 starti 是第 i 辆车的起点,endi 是第 i 辆车的终点。 返回数轴上被车 任意…...
基于k8s手动部署rabbitmq集群(Manually Deploying RabbitMQ Cluster Based on k8s)
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…...
mybatis 配置文件完成增删改查(四) :多条件 动态sql查询
文章目录 就是你在接收数据时,有的查询条件不写,也能从查到相应的stauts也可能为空恒等式标签 代替where关键字 就是你在接收数据时,有的查询条件不写,也能从查到相应的 注意是写字段名 还是 属性名 companyName不写也能查出满足…...
先楫HPM6750 Windows下VSCode开发环境配置
用的是EVKmini,ft2232作为调试器jtag接口调试 启动start_gui.exe 以hello_world为例,更改一下build path,可以generate并使用gcc compile 最后会得到这些 点击start_gui里面的命令行,用命令行启动vscode 新建.vscode文件夹&…...
【JavaScript】LeetCode:41-45
文章目录 41 排序链表42 合并k个升序链表43 LRU缓存44 二叉树的中序遍历45 二叉树的最大深度 41 排序链表 递归 归并排序找到链表中心点,从中心点将链表一分为二。奇数个节点找中心点,偶数个节点找中心左边的点作为中心点。快慢指针找中心点,…...
数据结构(Day18)
一、周学习内容 1、9.18 数据结构(Day15)-CSDN博客 2、9.19 数据结构(Day16)-CSDN博客 3、9.20 链表 目的 插入删除不需要移动任何节点(元素)。 不需要预估存储空间大小,长度动态增长或减小。…...
error: ‘InsertAtTop‘ was not declared in this scope
Qt编译错误记录: 报错:error: ‘InsertAtTop’ was not declared in this scope ui->comboBoxJob->setInsertPolicy(InsertAtTop);这行代码在Qt中编译就会报这个错误,原因是输入参数需要加类名限定,改为: ui-…...
MySQL缓冲池详解
Buffer Pool 本文参考开源项目:小林coding在线文档; 01-缓冲池概述 在MySQL查询数据的时候,是通过存储引擎去磁盘做IO来获取数据库中的数据,这样每次查询一条数据都要去做一次或者多次磁盘的IO,无疑是非常慢的。…...
【我的 PWN 学习手札】tcache stash with fastbin double free —— tcache key 绕过
参考看雪课程:PWN 探索篇 前言 tcache key 的引入使得 tcache dup 利用出现了困难。除了简单利用 UAF 覆写 key 或者House Of Karui 之外,还可以利用 ptmalloc 中的其他机制进行绕过。 一、Tcache Stash with Fastbin Double Free 之前是 double free …...
How can I stream a response from LangChain‘s OpenAI using Flask API?
题意:怎样在 Flask API 中使用 LangChain 的 OpenAI 模型流式传输响应 问题背景: I am using Python Flask app for chat over data. In the console I am getting streamable response directly from the OpenAI since I can enable streming with a f…...
什么是慢充优惠话费充值api?如何选择平台
一、话费充值api的定义 话费充值api是一种能够让开发者将话费充值功能集成到自己的平台的接口。通过接入话费充值api接口,就能够实现话费充值平台的搭建,从而为用户提供话费充值服务,这一接口主要适用于对话费充值有长期稳定需求的企业或者商…...
【MySQL 03】表的操作
目录 1.在数据库内创建表 2.表的查询 3.表的插入 往数据库中插入数据 4.表的修改 5.删除表 1.在数据库内创建表 create table 表名(字段1 字段1类型); 这样我们就创建好了一张表,我们可以进入hellosql目录下进行查看:所以在数据库内建立表…...
3、论文阅读:EnYOLO:一种基于图像增强的水下目标区域自适应实时检测框架
图像增强和目标检测的结合 前言介绍相关工作UIE 水下图像增强UOD 水下目标检测UDA 水下域自适应方法介绍训练过程推理过程网络概述多阶段训练策略Burn-In Stage(预热阶段)Mutual-Learning Stage(相互学习阶段)Domain-Adaptation Stage(领域适应阶段)多阶段训练策略算法介…...
MYSQL面试知识点手册
第一部分:MySQL 基础知识 1.1 MySQL 简介 MySQL 是世界上最流行的开源关系型数据库管理系统之一,它以性能卓越、稳定可靠和易用性而闻名。MySQL 主要应用在 Web 开发、大型互联网公司、企业级应用等场景,且广泛用于构建高并发、高可用的数据…...
排序算法的分析和应用
自己设计一个长度不小于10的乱序数组,用希尔排序,自己设定希尔排序参数 画出每一轮希尔排序的状态 自己设计一个长度不小于10的乱序数组,用堆排序,最终要生成升序数组,画出建堆后的状态 画出每一轮堆排序的状态 自…...
iptables限制网速
1、使用hashlimit来限速 #从eth0网卡进入INPUT链数据,使用模块hashlimit 限制网速为100kb/s或2mb/s,超过限制的数据包会被DROP。OUTPUT链同理,mode为srcip,有4个mode选项: srcip(默认匹配每个源地址IP,配置指定源地址…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
逻辑回归:给不确定性划界的分类大师
想象你是一名医生。面对患者的检查报告(肿瘤大小、血液指标),你需要做出一个**决定性判断**:恶性还是良性?这种“非黑即白”的抉择,正是**逻辑回归(Logistic Regression)** 的战场&a…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...
数据库正常,但后端收不到数据原因及解决
从代码和日志来看,后端SQL查询确实返回了数据,但最终user对象却为null。这表明查询结果没有正确映射到User对象上。 在前后端分离,并且ai辅助开发的时候,很容易出现前后端变量名不一致情况,还不报错,只是单…...
触发DMA传输错误中断问题排查
在STM32项目中,集成BLE模块后触发DMA传输错误中断(DMA2_Stream1_IRQHandler进入错误流程),但单独运行BLE模块时正常,表明问题可能源于原有线程与BLE模块的交互冲突。以下是逐步排查与解决方案: 一、问题根源…...
