当前位置: 首页 > article >正文

高级特性实战:死信队列、延迟队列与优先级队列(二)

三、延迟队列:实现任务定时执行

3.1 延迟队列概念解析

延迟队列(Delay Queue),是一种特殊的队列,它的独特之处在于队列中的元素(消息)并不会立即被处理,而是会在指定的延迟时间过后,才会被消费者取出并进行处理。简单来说,延迟队列就像是一个时间控制的消息容器,能够精确地按照设定的时间规则来释放消息,实现任务的定时执行。

延迟队列中的每个元素都关联了一个延迟时间,这个时间决定了元素在队列中的等待时长。在延迟时间未到达之前,元素会一直处于队列中等待,不会被消费。只有当延迟时间到期,元素才会被视为 “就绪” 状态,被投递到消费者进行处理 。这种特性使得延迟队列在很多对时间有精确要求的场景中发挥着重要作用,比如电商平台中的限时优惠活动、物流系统中的货物配送时间管理等。

3.2 延迟队列应用场景

延迟队列在实际业务中有广泛的应用场景,以下是一些常见的例子:

  • 定时任务执行:在很多系统中,都需要执行一些定时任务,如每天凌晨执行数据备份、每周一发送周报邮件等。使用延迟队列,可以将这些定时任务封装成消息,设置好延迟时间,放入延迟队列中。当延迟时间到达时,任务消息就会被取出执行,实现了定时任务的自动化处理,避免了使用复杂的定时任务框架。
  • 电商订单未支付超时处理:在电商购物流程中,当用户下单后,如果在规定的时间内(比如 30 分钟)未完成支付,订单需要被自动取消。利用延迟队列,在用户下单后,将订单信息作为消息放入延迟队列,并设置延迟时间为 30 分钟。30 分钟后,消息从延迟队列中被取出,系统可以检查订单状态,如果仍未支付,则自动取消订单,释放库存,保证了库存的有效管理和订单的正常流转。
  • 用户注册后未激活提醒:当用户注册新账号后,可能需要在一定时间内进行激活操作。如果用户在规定时间(如 24 小时)内未激活,系统可以通过延迟队列发送提醒邮件或短信。将用户注册信息和激活提醒任务封装成消息,设置 24 小时的延迟时间放入延迟队列。24 小时后,消息被取出,系统自动发送提醒通知,提高了用户激活率。

3.3 延迟队列实现方式

3.3.1 使用 RabbitMQ 的 TTL 和死信队列实现延迟队列

RabbitMQ 本身并没有直接提供延迟队列的功能,但我们可以巧妙地利用它的两个特性:消息的过期时间(Time-To-Live,TTL)和死信队列(Dead Letter Queue)来实现延迟队列。

其实现原理如下:首先,我们创建一个普通队列,并为该队列设置一个死信交换机(Dead Letter Exchange)和死信路由键(Dead Letter Routing Key)。同时,我们可以为发送到该队列的消息设置 TTL,或者直接为队列设置 TTL。当消息在队列中停留的时间超过了 TTL 值时,消息就会成为死信。由于我们之前配置了死信交换机和路由键,这些死信会被发送到指定的死信队列中。而消费者只需要监听死信队列,就可以获取到这些延迟处理的消息,从而实现了延迟队列的功能。

下面是一个使用 RabbitMQ 的 Java 客户端实现延迟队列的示例代码:

首先,引入 RabbitMQ 的 Java 客户端依赖:

 

<dependency>

<groupId>com.rabbitmq</groupId>

<artifactId>amqp-client</artifactId>

<version>5.14.2</version>

</dependency>

然后,编写配置类,声明队列、交换机和绑定关系:

 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class RabbitMQConfig {

public static final String DELAY_EXCHANGE = "delay_exchange";

public static final String DELAY_QUEUE = "delay_queue";

public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";

public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";

public static final String ROUTING_KEY = "routing_key";

@Bean

public Queue delayQueue() {

Map<String, Object> args = new HashMap<>();

args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);

args.put("x-dead-letter-routing-key", ROUTING_KEY);

// 设置队列的TTL为10秒

args.put("x-message-ttl", 10000);

return QueueBuilder.durable(DELAY_QUEUE).withArguments(args).build();

}

@Bean

public DirectExchange delayExchange() {

return new DirectExchange(DELAY_EXCHANGE);

}

@Bean

public Binding delayBinding() {

return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(ROUTING_KEY);

}

@Bean

public Queue deadLetterQueue() {

return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();

}

@Bean

public DirectExchange deadLetterExchange() {

return new DirectExchange(DEAD_LETTER_EXCHANGE);

}

@Bean

public Binding deadLetterBinding() {

return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);

}

}

接着,编写生产者代码,向延迟队列发送消息:

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

@Component

public class Producer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String message) {

rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, RabbitMQConfig.ROUTING_KEY, message);

System.out.println("Sent message: " + message);

}

}

最后,编写消费者代码,从死信队列中接收延迟处理的消息:

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class Consumer {

@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)

public void receive(String message) {

System.out.println("Received message: " + message);

}

}

在上述代码中,我们通过配置delayQueue的x-message-ttl参数设置了队列的 TTL 为 10 秒。生产者将消息发送到delay_exchange,经过 10 秒延迟后,消息进入dead_letter_queue,消费者从dead_letter_queue中接收并处理消息,实现了延迟队列的功能。

3.3.2 基于 JDK 的 DelayQueue 实现延迟队列

JDK 提供了DelayQueue类,它是一个无界的阻塞队列,用于存放实现了Delayed接口的对象。DelayQueue中的元素只有在其延迟时间到期时才能从队列中取出,非常适合用于实现延迟队列。

Delayed接口继承自Comparable接口,实现Delayed接口需要实现getDelay和compareTo方法。getDelay方法用于获取元素的剩余延迟时间,compareTo方法用于比较元素的延迟时间,以确定队列中元素的顺序。

以下是一个基于DelayQueue实现延迟队列的简单示例:

 

import java.util.concurrent.DelayQueue;

import java.util.concurrent.Delayed;

import java.util.concurrent.TimeUnit;

public class DelayQueueExample {

public static void main(String[] args) {

DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();

// 添加延迟任务,延迟3秒执行

delayQueue.offer(new DelayedTask(3000, "Task 1"));

// 添加延迟任务,延迟5秒执行

delayQueue.offer(new DelayedTask(5000, "Task 2"));

new Thread(() -> {

while (true) {

try {

DelayedTask task = delayQueue.take();

System.out.println("Executing task: " + task.getMessage() + " at " + System.currentTimeMillis());

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}).start();

}

static class DelayedTask implements Delayed {

private final long delayTime;

private final String message;

private final long expire;

public DelayedTask(long delayTime, String message) {

this.delayTime = delayTime;

this.message = message;

this.expire = System.currentTimeMillis() + delayTime;

}

@Override

public long getDelay(TimeUnit unit) {

return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

}

@Override

public int compareTo(Delayed other) {

long diff = this.expire - ((DelayedTask) other).expire;

if (diff == 0) {

return 0;

} else if (diff < 0) {

return -1;

} else {

return 1;

}

}

public String getMessage() {

return message;

}

}

}

在这个示例中,DelayedTask类实现了Delayed接口,并重写了getDelay和compareTo方法。DelayQueueExample类中创建了一个DelayQueue,并向其中添加了两个延迟任务。主线程启动一个新线程,不断从DelayQueue中取出到期的任务并执行,实现了简单的延迟队列功能。

3.3.3 利用 Redis 的有序集合 ZSet 实现延迟队列

Redis 的有序集合(Sorted Set,简称 ZSet)是一种非常适合实现延迟队列的数据结构。在 ZSet 中,每个元素都关联一个分数(score),集合会根据分数对元素进行排序。我们可以利用这一特性,将消息的执行时间作为分数,消息内容作为元素,实现延迟队列。

实现原理如下:生产者将消息及其延迟时间(转换为时间戳作为分数)添加到 ZSet 中。消费者通过不断轮询 ZSet,获取当前时间戳之前的元素(即延迟时间已到的消息),并进行处理。处理完成后,将该元素从 ZSet 中删除。

以下是一个使用 Redis 的 Java 客户端 Jedis 实现延迟队列的简单示例代码:

首先,引入 Jedis 依赖:

 

<dependency>

<groupId>redis.clients</groupId>

<artifactId>jedis</artifactId>

<version>3.8.0</version>

</dependency>

然后,编写生产者代码,向 ZSet 中添加延迟消息:

 

import redis.clients.jedis.Jedis;

public class RedisProducer {

private static final String DELAY_QUEUE_KEY = "delay_queue";

public static void main(String[] args) {

try (Jedis jedis = new Jedis("localhost", 6379)) {

// 发送延迟消息,延迟5秒

long delayTime = System.currentTimeMillis() + 5000;

jedis.zadd(DELAY_QUEUE_KEY, delayTime, "Message 1");

System.out.println("Sent message to Redis delay queue.");

}

}

}

接着,编写消费者代码,从 ZSet 中获取并处理延迟消息:

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.Tuple;

import java.util.Set;

public class RedisConsumer {

private static final String DELAY_QUEUE_KEY = "delay_queue";

public static void main(String[] args) {

new Thread(() -> {

try (Jedis jedis = new Jedis("localhost", 6379)) {

while (true) {

long currentTime = System.currentTimeMillis();

// 获取当前时间之前的消息

Set<Tuple> messages = jedis.zrangeByScoreWithScores(DELAY_QUEUE_KEY, 0, currentTime);

for (Tuple message : messages) {

System.out.println("Received message: " + message.getElement());

// 处理完消息后,从ZSet中删除

jedis.zrem(DELAY_QUEUE_KEY, message.getElement());

}

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}).start();

}

}

在上述代码中,RedisProducer将带有延迟时间的消息添加到 Redis 的 ZSet 中,RedisConsumer通过不断轮询 ZSet,获取并处理延迟时间已到的消息,实现了基于 Redis ZSet 的延迟队列功能。

相关文章:

高级特性实战:死信队列、延迟队列与优先级队列(二)

三、延迟队列&#xff1a;实现任务定时执行 3.1 延迟队列概念解析 延迟队列&#xff08;Delay Queue&#xff09;&#xff0c;是一种特殊的队列&#xff0c;它的独特之处在于队列中的元素&#xff08;消息&#xff09;并不会立即被处理&#xff0c;而是会在指定的延迟时间过后…...

VR 电缆故障测试系统:技术革新​

VR 电缆故障测试系统&#xff0c;作为电力领域的创新科技成果&#xff0c;融合了虚拟现实技术、三维建模、实时交互等前沿技术&#xff0c;为电缆故障测试带来了全新的解决方案。它的工作原理犹如一位经验丰富的侦探&#xff0c;通过层层线索&#xff0c;精准地锁定电缆故障的位…...

Rocky Linux上安装Go

使用官方二进制包安装 1. 下载 Go 官方二进制包 cd /tmp wget https://go.dev/dl/go1.22.3.linux-amd64.tar.gz2. 解压并安装到 /usr/local sudo rm -rf /usr/local/go # 如果之前有旧版本先删除 sudo tar -C /usr/local -xzf go1.22.3.linux-amd64.tar.gz3. 设置环境变量…...

深度学习论文: FastVLM: Efficient Vision Encoding for Vision Language Models

深度学习论文: FastVLM: Efficient Vision Encoding for Vision Language Models FastVLM: Efficient Vision Encoding for Vision Language Models PDF: https://www.arxiv.org/abs/2412.13303 PyTorch代码: https://github.com/shanglianlm0525/CvPytorch PyTorch代码: https…...

白杨SEO:做AI搜索优化的DeepSeek、豆包、Kimi、百度文心一言、腾讯元宝、通义、智谱、天工等AI生成内容信息采集主要来自哪?占比是多少?

大家好&#xff0c;我是白杨SEO&#xff0c;专注SEO十年以上&#xff0c;全网SEO流量实战派&#xff0c;AI搜索优化研究者。 在开始写之前&#xff0c;先说个抱歉。 上周在上海客户以及线下聚会AI搜索优化分享说各大AI模型的联网搜索是关闭的&#xff0c;最开始上来确实是的。…...

显示docker桌面,vnc远程连接docker

目录 相关概念&#xff1a; 实现步骤&#xff1a; 1.启动docker容器 2.安装x11 3.Docker 容器中安装一个完整的图形桌面&#xff08;XFCE&#xff09;和 VNC 远程桌面服务器&#xff08;TightVNC&#xff09; 4.配置vncservice 5.本地安装VNC Viewer连接VNC Viewer下载地…...

Web 端顶级视效实现:山海鲸端渲染底层原理与发布模式详解

大家好&#xff0c;欢迎大家回到山海鲸的渲染模式系列教程。昨天&#xff0c;我们看了一下山海鲸支持的3种渲染模式的整体概览。今天&#xff0c;我们就来看一下山海鲸支持的最基础的渲染模式&#xff0c;也就是端渲染的渲染设置。 1. 山海鲸的端渲染 我们说到端渲染&#xf…...

腾讯云国际站性能调优

全球化业务扩张中&#xff0c;云端性能直接决定用户体验与商业成败。腾讯云国际站通过资源适配、网络优化与存储革新&#xff0c;为企业提供全链路调优方案。 ​​资源精准适配​​ 实例选型需与业务场景深度耦合&#xff0c;计算优化型实例加速AI训练效率3倍&#xff0c;内存…...

深入解析操作系统内核与用户空间以及内核态与用户态转换

用户空间和内核空间的划分是现代操作系统的基础&#xff0c;对应用程序网络模型的设计和优化有着深远的影响。 内核空间与用户空间的分工 现代操作系统为了保证系统的稳定性和安全性&#xff0c;将虚拟内存空间划分为用户空间和内核空间。 一、用户空间 用户空间是用户程序…...

每日一题洛谷P8662 [蓝桥杯 2018 省 AB] 全球变暖c++

P8662 [蓝桥杯 2018 省 AB] 全球变暖 - 洛谷 (luogu.com.cn) DFS #include<iostream> using namespace std; int n, res; char a[1005][1005]; bool vis[1005][1005]; bool flag; int dx[4] { 0,0,1,-1 }; int dy[4] { 1,-1,0,0 }; void dfs(int x, int y) {vis[x][y]…...

【JVM】初识JVM 从字节码文件到类的生命周期

初识JVM JVM&#xff08;Java Virtual Machine&#xff09;即 Java 虚拟机&#xff0c;是 Java 技术的核心组件之一。JVM的本质就是运行在计算机上的一个程序&#xff0c;通过软件模拟实现了一台抽象的计算机的功能。JVM是Java程序的运行环境&#xff0c;负责加载字节码文件&a…...

多级体验体系构建:基于开源AI智能客服与AI智能名片的S2B2C商城小程序体验升级路径研究

摘要&#xff1a;在体验经济时代&#xff0c;传统企业单一的总部体验模式难以覆盖全链路用户需求。本文针对B端与C端体验深度差异&#xff0c;提出“一级总部体验—二级区域体验—三级终端体验”的分层架构&#xff0c;并引入“开源AI智能客服”与“AI智能名片”技术&#xff0…...

每日算法 -【Swift 算法】字符串转整数算法题详解:myAtoi 实现与正则表达式对比

Swift 字符串转整数算法题详解&#xff1a;myAtoi 实现与正则表达式对比 &#x1f9e9; 题目背景 LeetCode 上的经典算法题 8. String to Integer (atoi) 是一道考察字符串解析与边界处理的题目。这道题虽看似简单&#xff0c;但处理细节相当复杂。我们将使用 Swift 语言实现…...

记录一个难崩的bug

1.后端配置了 Filter 过滤器&#xff0c;如果再配置了Configuration ,那么会出现冲突吗&#xff1f; 过滤器与Configuration类本身无直接冲突&#xff0c;但需注意注册机制、执行顺序和依赖管理。通过显式控制过滤器的注册方式和优先级&#xff0c;结合Spring Security的链式配…...

Git切换历史版本及Gitee云绑定

1、git介绍 Git是目前世界上最先进的分布式版本控制系统 Linux <- BitKeeper&#xff08;不是开源的&#xff0c;但免费的&#xff0c;后来要收费&#xff09; Linus Torvalds(林纳斯托瓦兹) 两周时间吧&#xff0c;弄了个 Git&#xff1b;大约一个月就把Linux代码从BitK…...

智能外呼系统中 NLP 意图理解的工作原理与技术实现

智能外呼系统通过整合语音识别&#xff08;ASR&#xff09;、自然语言处理&#xff08;NLP&#xff09;和语音合成&#xff08;TTS&#xff09;等技术&#xff0c;实现了自动化的电话交互。其中&#xff0c;NLP 意图理解是核心模块&#xff0c;负责解析用户话语中的语义和意图&…...

服务器的IP是什么东西?

一、什么是服务器的IP地址&#xff1f; 服务器的IP地址是互联网协议&#xff08;Internet Protocol&#xff09;的缩写&#xff0c;是服务器在网络中的唯一数字标识符。它类似于现实生活中的门牌号&#xff0c;用于标识服务器在网络中的位置&#xff0c;使其他设备能够通过它与…...

[问题解决]:Unable to find image ‘containrrr/watchtower:latest‘ locally

一&#xff0c;问题 在使用docker安装部署新应用的时候&#xff0c;报错&#xff1a;Unable to find image containrrr/watchtower:latest locally 分析认为是当前docker的资源库里找不到这个软件的镜像&#xff0c;需要配置一个包含这个软件镜像的新的资源库。 二&#xff0…...

【文件上传】阿里云对象存储服务实现文件上传

一、基础 上传到本地&#xff1a; package org.example.controller;import lombok.extern.slf4j.Slf4j; import org.example.pojo.Result; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; imp…...

IPv6代理如何引领下一代网络未来

随着互联网技术的不断发展&#xff0c;IPv6逐渐成为下一代网络协议的核心&#xff0c;替代IPv4已是大势所趋。IPv6代理作为IPv6网络环境下的重要工具&#xff0c;为用户提供了更高效、更安全的网络解决方案。 IPv6代理的定义 IPv6代理是在IPv6网络环境中为处理IPv4转换和其他网…...

Linux——数据链路层

1. 认识以太网 认知&#xff1a;以太网是用于局域网数据通信的协议标准&#xff0c;定义了同一局域网内通过电缆/无线怎么在设备之间传输数据帧。 注&#xff1a;整个网络世界可以具象看出由许许多多的局域网组成&#xff0c; • 家庭中的设备A and 家庭中的设备B and 家庭路由…...

ubuntu 22.04 安装下载

ubuntu 22.04下载安装及相关配置_ubuntu22.04下载-CSDN博客...

深度学习面试八股简略速览

在准备深度学习面试时&#xff0c;你可能会感到有些不知所措。毕竟&#xff0c;深度学习是一个庞大且不断发展的领域&#xff0c;涉及众多复杂的技术和概念。但别担心&#xff0c;本文将为你提供一份全面的指南&#xff0c;从基础理论到实际应用&#xff0c;帮助你在面试中脱颖…...

【深度学习-pytorch篇】1. Pytorch矩阵操作与DataSet创建

Pytorch矩阵操作与DataSet创建 1. Python 环境配置 1.1 安装 Anaconda 推荐使用 Anaconda 来管理 Python 环境&#xff0c;访问官网下载安装&#xff1a; https://www.anaconda.com/download/success 1.2 安装 PyTorch 请根据自己的系统平台&#xff08;Windows/Linux/ma…...

游戏引擎学习第310天:利用网格划分完成排序加速优化

回顾并为今天的内容做个铺垫 昨天我们完成了一个用于排序的空间划分系统&#xff0c;但还没有机会真正利用它。昨天的工作刚好在结束时才完成&#xff0c;所以今天我们打算正式使用这个空间划分来加速排序。 现在我们在渲染代码中&#xff0c;可以看到在代码底部隐藏着一个“…...

数据结构 - 树的遍历

一、二叉树的遍历 对于二叉树&#xff0c;常用的遍历方式包括&#xff1a;先序遍历、中序遍历、后序遍历和层次遍历 。 1、先序遍历&#xff08;PreOrder&#xff09; 先序遍历的操作过程如下&#xff1a; 若二叉树为空&#xff0c;则什么也不做&#xff1b;否则&#xff0…...

时序模型介绍

一.整体介绍 1.单变量 vs 多变量时序数据 单变量就是只根据时间预测&#xff0c;多变量还要考虑用户 2.为什么不能用机器学习预测&#xff1a; a.时间不是影响标签的关键因素 b.时间与标签之间的联系过于弱/过于复杂&#xff0c;因此时序模型依赖于时间与时间的相关性来进行预…...

Java面试实战:从Spring到大数据的全栈挑战

Java面试实战&#xff1a;从Spring到大数据的全栈挑战 在某家知名互联网大厂&#xff0c;严肃的面试官正在面试一位名叫谢飞机的程序员。谢飞机以其搞笑的回答和对Java技术栈的独特见解而闻名。 第一轮&#xff1a;Spring与微服务的探索 面试官&#xff1a;“请你谈谈Spring…...

解决idea与springboot版本问题

遇到以下问题&#xff1a; 1、springboot3.2.0与jdk1.8 提示这个包org.springframework.web.bind.annotation不存在&#xff0c;但是pom已经引入了spring-boot-starter-web 2、Error:Cannot determine path to tools.jar library for 17 (D:/jdk17) 3、Error:(3, 28) java: …...

【第4章 图像与视频】4.4 离屏 canvas

文章目录 前言为什么要使用 offscreenCanvas为什么要使用 OffscreenCanvas如何使用 OffscreenCanvas第一种使用方式第二种使用方式 计算时长超过多长时间适合用Web Worker 前言 在 Canvas 开发中&#xff0c;我们经常需要处理复杂的图形和动画&#xff0c;这些操作可能会影响页…...