延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求
实现延时队列,可以通过RabbitMQ的死信队列(Dead-letter queue)特性,“死信队列”是当消息过期,或者队列达到最大长度时,未消费的消息会被加入到死信队列。然后,我们可以对死信队列中的消息进行消费,完成类似“延时”的效果。
下面的示例代码演示了如何在Spring Boot中使用RabbitMQ设置一个订单,然后在15分钟后自动取消。
1. 添加 RabbitMQ 依赖:
在你的pom.xml
中加入Spring Boot对RabbitMQ的Starter:
xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置队列、交换器、绑定和容器
我们创建一个配置类,定义一个正常的队列和一个死信队列,以及相应的交换机和队列的绑定:
java
@Configuration
public class RabbitMQConfig {/* 正常队列配置 */@Beanpublic Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dead_exchange"); // 队列消息过期后发送的交换器args.put("x-dead-letter-routing-key", "dead"); // 队列消息过期后发送的路由键return new Queue("order_queue", true, false, false, args);}@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");}/* 死信队列配置 */@Beanpublic Queue deadQueue() {return new Queue("dead_queue");}@Beanpublic DirectExchange deadExchange() {return new DirectExchange("dead_exchange");}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");}
}
3. 发送延时消息
创建订单时,我们发送一个延时消息到队列:
java
@Autowired
private RabbitTemplate rabbitTemplate;public void createOrder(String orderId) {//订单创建业务...rabbitTemplate.convertAndSend("order_exchange", "order", orderId, message -> {message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000)); // 15分钟return message;});
}
4. 消费死信队列中的消息
然后,我们需要消费死信队列中的消息,进行订单取消的操作:
java
@RabbitListener(queues = "dead_queue")
public void processDeadLetter(String orderId) {// 订单取消业务...
}
- 在RabbitMQ中设置消息的过期时间
RabbitMQ允许你在两个级别上设置消息的过期时间:队列级别和消息级别。一旦消息过期,它将从队列中删除。
- 队列级别:你可以在声明队列时通过"x-message-ttl"参数设置过期时间(以毫秒为单位)。这会影响队列中的所有消息。
java
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 设置60s过期时间
Channel channel = ...;
channel.queueDeclare("my_queue", false, false, false, args);
- 消息级别:你也可以在发布消息时单独为每条消息设置过期时间。如果队列级别的TTL和消息级别的TTL都被设置了,那么较小的那个值会被应用。
java
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000").build(); // 设置60s过期时间
Channel channel = ...;
channel.basicPublish("my_exchange", "my_routing_key", props, messageBodyBytes);
5、RabbitMQ中的死信队列如何工作?
“死信队列”用于接收不可路由的消息,或者由于一些原因不能正确处理的消息。这样能防止原始队列堵塞,也可以进一步处理这些消息。
当以下情况发生时,消息会被投递到死信队列:
- 消息被拒绝 (basic.reject / basic.nack),并且requeue=false。
- 在队列中排队的时间过长(超过设置的TTL)。
- 队列长度溢出(超过设置的最大长度)。
在声明队列时,可以通过"x-dead-letter-exchange"和"x-dead-letter-routing-key"来设置死信交换器和路由键。
6、如何确保消费者在消费消息时不会发生重复消费的情况?
确保消费者在消费消息时不发生重复消费,一般可以通过以下方式实现:
- 消息确认机制:RabbitMQ提供了消息确认机制ACK,消费者处理完消息后,返回一个ACK信号,告诉RabbitMQ可以将该消息从队列中删除。如果在处理消息的过程中消费者出现异常(如断开连接),RabbitMQ将不会删除队列中的消息,并且会再次尝试发送该消息给消费者。
- 幂等操作:在消费者端,确保消息处理逻辑是幂等的,也就是说,无论消息被处理一次还是多次,结果都是相同的。例如,对于一个扣款操作,无论这个操作执行多少次,账户的余额都应该只被扣除一次。
- 分布式锁:在处理消息之前,消费者首先尝试获取一个分布式锁。只有获取到锁的消费者才能处理消息。处理完毕后释放锁。避免了同一消息被多个消费者处理的情况。
7、设置RabbitMQ中消息的优先级
在RabbitMQ中,对消息的优先级的支持是通过队列来实现的。在声明队列的时候,可以通过x-max-priority
参数来指定队列支持的最大优先级。然后在发布消息的时候,可以通过basicProperties
的priority
字段来指定消息的优先级。如下:
java
// 声明队列时设置最大优先级
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my_queue", false, false, false, args);// 发布消息时设置优先级
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", "my_queue", props, "Hello world".getBytes());
注意,优先级较高的消息将会优先被消费,但是并不保证完全按照优先级顺序消费。
8、RabbitMQ中如何处理消费者异常断开连接的情况?
当RabbitMQ检测到消费者(如一个TCP连接)异常断开,例如因为消费者主机崩溃或因为网络问题,它将关闭该消费者的连接,并将消费者未确认的任何消息重新放入队列。如果你希望一个消息在消费者断开连接时不被再次放入队列,你可以设置该消费者的autoAck
参数为true
(也就是无需显示确认)。但一般情况下,我们推荐消费者在正确处理消息后发送一个确认应答(basicAck
)。
相关文章:
延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求
实现延时队列,可以通过RabbitMQ的死信队列(Dead-letter queue)特性,“死信队列”是当消息过期,或者队列达到最大长度时,未消费的消息会被加入到死信队列。然后,我们可以对死信队列中的消息进行消…...
关于在Ubuntu上配置mysql踩的一些坑
最近准备换工作了,回顾了下学校时期做的那个webserver,又在linux下mysql踩了一些坑,特此记录下来 程序编译错误mysql.h: No such file or directory 云服务器缺少mysql必要的运行组件,安装: sudo apt-get install l…...

JSBridge原理 - 前端H5与客户端Native交互
1. 概述: 在混合应用开发中,一种常见且成熟的技术方案是将原生应用与 WebView 结合,使得复杂的业务逻辑可以通过网页技术实现。实现这种类型的混合应用时,就需要解决H5与Native之间的双向通信。JSBridge 是一种在混合应用中实现 …...

【Java EE】Spring请求如何传递参数详解
文章目录 🎍传递单个参数🌴传递多个参数🍀传递对象🎄后端参数重命名(后端参数映射)🌲传递数组🎍传递集合🌴传递JSON数据🌸JSON概念🌸JSON的语法&a…...
菜鸟笔记-Numpy常用函数用法汇总
NumPy(Numerical Python的简称)是Python中用于处理数组和矩阵的库,提供了大量的数学函数来操作这些数组。通过前面的学习,慢慢也能发现一些规律,以下是NumPy的一些常用函数及其用法汇总: 数组创建 numpy.a…...

tensorflow.js 如何使用opencv.js通过面部特征点估算脸部姿态并绘制示意图
文章目录 前言一、实现步骤1. 获取所需特征点的索引2. 使用opencv.js 计算俯仰角、水平角和翻滚角cv.solvePnP介绍cv.solvePnP原理运行代码查看效果 3.绘制姿态示意直线添加canvas元素计算姿态直线坐标并绘制 总结 前言 在计算机视觉领域,估算脸部姿态是一项具有挑…...
Linux命令-dpkg-divert命令(Debian Linux中创建并管理一个转向列表)
说明 dpkg-divert命令 是Debian Linux中创建并管理一个转向(diversion)列表,其使得安装文件的默认位置失效的工具。 语法 dpkg-divert(选项)(参数)选项 --add:添加一个转移文件; --remove:删除一个转移…...
flex: 1 是哪些属性的缩写?
flex:1是哪些属性的缩写? flex:1 是 flex-grow: 1, flex-shrink: 1,flex-basis: 0% 的缩写; 解释下flex-grow flex-grow是将剩余的空间,根据flex-grow的值平分,然后加到flex-basis上 <!doctype html> <htm…...

python基于opencv实现数籽粒
千粒重是一个重要的农艺性状,通过对其的测量和研究,我们可以更好地理解作物的生长状况,优化农业生产,提高作物产量和品质。但数籽粒数目是一个很繁琐和痛苦的过程,我们现在用一个简单的python程序来数水稻籽粒。代码的…...

OpenCV图像处理——基于OpenCV的ORB算法实现目标追踪
概述 ORB(Oriented FAST and Rotated BRIEF)算法是高效的关键点检测和描述方法。它结合了FAST(Features from Accelerated Segment Test)算法的快速关键点检测能力和BRIEF(Binary Robust Independent Elementary Feat…...
13.JavaWeb XML:构建结构化数据的重要工具
目录 导语: 一、XML概念 (1)可拓展 (2)功能-存储数据 (3)xml与html的区别 二、XML内容 三、XML用途 四、案例:使用XML构建在线书店的书籍数据库 结语: 导语&…...

鸿蒙OS实战开发:【多设备自适应服务卡片】
介绍 服务卡片的布局和使用,其中卡片内容显示使用了一次开发,多端部署的能力实现多设备自适应。 用到了卡片扩展模块接口,[ohos.app.form.FormExtensionAbility] 。 卡片信息和状态等相关类型和枚举接口,[ohos.app.form.formInf…...

深度学习基础之一:机器学习
文章目录 深度学习基本概念(Basic concepts of deep learning)机器学习典型任务机器学习分类 模型训练的基本概念基本名词机器学习任务流程模型训练详细流程正、反向传播学习率Batch size激活函数激活函数 sigmoid 损失函数MSE & M交叉熵损失 优化器优化器 — 梯度下降优化…...

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果
Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果 一、简单介绍 二、简单指定视频某片段重复播放…...
ARXML处理 - C#的解析代码(二)
参数类 参数容器(ECUCPARAMCONFCONTAINERDEF)的PARAMETERS集合类由以下参数类实例构成。 枚举参数(ECUCENUMERATIONPARAMDEF ) 配置一个下拉选项,如PORT中一个pin可以配置SPI, CAN, PWM /// <remarks/>[Syste…...
关于华为即将举行的鸿蒙春季沟通会的新闻报道
华为计划在4月11日举办此次活动,届时将推出与车和PC类相关的新产品。尽管备受期待的华为P70系列设备的发布尚未得到官方确认,但已有多家媒体对此进行了报道。 文章中还提到了智界S7的新款可能在4月11日上市,并进行多项新功能升级。智界S7是去…...
MySQL视图及如何导入导出
1.视图 MySQL 视图(View)是一种虚拟存在的表,同真实表一样,视图也由列和行构成,但视图并不实际存在于数据库中。行和列的数据来自于定义视图的查询中所使用的表,并且还是在使用视图时动态生成的࿰…...

文心一言上线声音定制功能;通义千问开源模型;openAI又侵权?
文心一言上线定制专属声音功能 百度旗下 AI 聊天机器人文心一言上线新功能,用户录音一句话,即可定制声音。 使用这项功能需要使用文心一言 App。在创建智能体中,点击创建自己的声音,朗读系统提示的一句话,等候几秒钟时…...
课时89:流程控制_函数进阶_函数变量
2.1.4 综合案例 这一节,我们从 信息采集、环境部署、小结 三个方面来学习。 信息采集 脚本实践-采集系统负载信息 查看脚本内容 [rootlocalhost ~]# cat function_systemctl_load.sh #!/bin/bash # 功能:采集系统负载信息 # 版本:v0.3 # …...
Linux命令-dpkg-preconfigure命令(Debian Linux中软件包安装之前询问问题)
说明 dpkg-preconfigure命令 用于在Debian Linux中软件包安装之前询问问题。 语法 dpkg-preconfigure(选项)(参数)选项 -f:选择使用的前端; -p:感兴趣的最低的优先级问题; --apt:在apt模式下运行。参数 软件包&am…...

手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案
在大数据时代,海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构,在处理大规模数据抓取任务时展现出强大的能力。然而,随着业务规模的不断扩大和数据抓取需求的日益复杂,传统…...
土建施工员考试:建筑施工技术重点知识有哪些?
《管理实务》是土建施工员考试中侧重实操应用与管理能力的科目,核心考查施工组织、质量安全、进度成本等现场管理要点。以下是结合考试大纲与高频考点整理的重点内容,附学习方向和应试技巧: 一、施工组织与进度管理 核心目标: 规…...