当前位置: 首页 > news >正文

延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求

实现延时队列,可以通过RabbitMQ的死信队列(Dead-letter queue)特性,“死信队列”是当消息过期,或者队列达到最大长度时,未消费的消息会被加入到死信队列。然后,我们可以对死信队列中的消息进行消费,完成类似“延时”的效果。

下面的示例代码演示了如何在Spring Boot中使用RabbitMQ设置一个订单,然后在15分钟后自动取消。

1. 添加 RabbitMQ 依赖:

在你的pom.xml中加入Spring Boot对RabbitMQ的Starter:

xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置队列、交换器、绑定和容器

我们创建一个配置类,定义一个正常的队列和一个死信队列,以及相应的交换机和队列的绑定:

java

@Configuration
public class RabbitMQConfig {/* 正常队列配置 */@Beanpublic Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dead_exchange");  // 队列消息过期后发送的交换器args.put("x-dead-letter-routing-key", "dead"); // 队列消息过期后发送的路由键return new Queue("order_queue", true, false, false, args);}@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");}/* 死信队列配置 */@Beanpublic Queue deadQueue() {return new Queue("dead_queue");}@Beanpublic DirectExchange deadExchange() {return new DirectExchange("dead_exchange");}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");}
}

3. 发送延时消息

创建订单时,我们发送一个延时消息到队列:

java

@Autowired
private RabbitTemplate rabbitTemplate;public void createOrder(String orderId) {//订单创建业务...rabbitTemplate.convertAndSend("order_exchange", "order", orderId, message -> {message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000)); // 15分钟return message;});
}

4. 消费死信队列中的消息

然后,我们需要消费死信队列中的消息,进行订单取消的操作:

java

@RabbitListener(queues = "dead_queue")
public void processDeadLetter(String orderId) {// 订单取消业务...
}
  1. 在RabbitMQ中设置消息的过期时间
    RabbitMQ允许你在两个级别上设置消息的过期时间:队列级别和消息级别。一旦消息过期,它将从队列中删除。
  • 队列级别:你可以在声明队列时通过"x-message-ttl"参数设置过期时间(以毫秒为单位)。这会影响队列中的所有消息。

java

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 设置60s过期时间
Channel channel = ...;
channel.queueDeclare("my_queue", false, false, false, args); 
  • 消息级别:你也可以在发布消息时单独为每条消息设置过期时间。如果队列级别的TTL和消息级别的TTL都被设置了,那么较小的那个值会被应用。

java

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000").build(); // 设置60s过期时间
Channel channel = ...;
channel.basicPublish("my_exchange", "my_routing_key", props, messageBodyBytes);

5、RabbitMQ中的死信队列如何工作?

“死信队列”用于接收不可路由的消息,或者由于一些原因不能正确处理的消息。这样能防止原始队列堵塞,也可以进一步处理这些消息。

当以下情况发生时,消息会被投递到死信队列:

  • 消息被拒绝 (basic.reject / basic.nack),并且requeue=false。
  • 在队列中排队的时间过长(超过设置的TTL)。
  • 队列长度溢出(超过设置的最大长度)。

在声明队列时,可以通过"x-dead-letter-exchange"和"x-dead-letter-routing-key"来设置死信交换器和路由键。

 6、如何确保消费者在消费消息时不会发生重复消费的情况?

确保消费者在消费消息时不发生重复消费,一般可以通过以下方式实现:

  • 消息确认机制:RabbitMQ提供了消息确认机制ACK,消费者处理完消息后,返回一个ACK信号,告诉RabbitMQ可以将该消息从队列中删除。如果在处理消息的过程中消费者出现异常(如断开连接),RabbitMQ将不会删除队列中的消息,并且会再次尝试发送该消息给消费者。
  • 幂等操作:在消费者端,确保消息处理逻辑是幂等的,也就是说,无论消息被处理一次还是多次,结果都是相同的。例如,对于一个扣款操作,无论这个操作执行多少次,账户的余额都应该只被扣除一次。
  • 分布式锁:在处理消息之前,消费者首先尝试获取一个分布式锁。只有获取到锁的消费者才能处理消息。处理完毕后释放锁。避免了同一消息被多个消费者处理的情况。

7、设置RabbitMQ中消息的优先级
在RabbitMQ中,对消息的优先级的支持是通过队列来实现的。在声明队列的时候,可以通过x-max-priority参数来指定队列支持的最大优先级。然后在发布消息的时候,可以通过basicPropertiespriority字段来指定消息的优先级。如下:

java

// 声明队列时设置最大优先级
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my_queue", false, false, false, args);// 发布消息时设置优先级
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", "my_queue", props, "Hello world".getBytes());

注意,优先级较高的消息将会优先被消费,但是并不保证完全按照优先级顺序消费。

8、RabbitMQ中如何处理消费者异常断开连接的情况?
当RabbitMQ检测到消费者(如一个TCP连接)异常断开,例如因为消费者主机崩溃或因为网络问题,它将关闭该消费者的连接,并将消费者未确认的任何消息重新放入队列。如果你希望一个消息在消费者断开连接时不被再次放入队列,你可以设置该消费者的autoAck参数为true(也就是无需显示确认)。但一般情况下,我们推荐消费者在正确处理消息后发送一个确认应答(basicAck)。

相关文章:

延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求

实现延时队列&#xff0c;可以通过RabbitMQ的死信队列&#xff08;Dead-letter queue&#xff09;特性&#xff0c;“死信队列”是当消息过期&#xff0c;或者队列达到最大长度时&#xff0c;未消费的消息会被加入到死信队列。然后&#xff0c;我们可以对死信队列中的消息进行消…...

关于在Ubuntu上配置mysql踩的一些坑

最近准备换工作了&#xff0c;回顾了下学校时期做的那个webserver&#xff0c;又在linux下mysql踩了一些坑&#xff0c;特此记录下来 程序编译错误mysql.h: No such file or directory 云服务器缺少mysql必要的运行组件&#xff0c;安装&#xff1a; sudo apt-get install l…...

JSBridge原理 - 前端H5与客户端Native交互

1. 概述&#xff1a; 在混合应用开发中&#xff0c;一种常见且成熟的技术方案是将原生应用与 WebView 结合&#xff0c;使得复杂的业务逻辑可以通过网页技术实现。实现这种类型的混合应用时&#xff0c;就需要解决H5与Native之间的双向通信。JSBridge 是一种在混合应用中实现 …...

【Java EE】Spring请求如何传递参数详解

文章目录 &#x1f38d;传递单个参数&#x1f334;传递多个参数&#x1f340;传递对象&#x1f384;后端参数重命名&#xff08;后端参数映射&#xff09;&#x1f332;传递数组&#x1f38d;传递集合&#x1f334;传递JSON数据&#x1f338;JSON概念&#x1f338;JSON的语法&a…...

菜鸟笔记-Numpy常用函数用法汇总

NumPy&#xff08;Numerical Python的简称&#xff09;是Python中用于处理数组和矩阵的库&#xff0c;提供了大量的数学函数来操作这些数组。通过前面的学习&#xff0c;慢慢也能发现一些规律&#xff0c;以下是NumPy的一些常用函数及其用法汇总&#xff1a; 数组创建 numpy.a…...

tensorflow.js 如何使用opencv.js通过面部特征点估算脸部姿态并绘制示意图

文章目录 前言一、实现步骤1. 获取所需特征点的索引2. 使用opencv.js 计算俯仰角、水平角和翻滚角cv.solvePnP介绍cv.solvePnP原理运行代码查看效果 3.绘制姿态示意直线添加canvas元素计算姿态直线坐标并绘制 总结 前言 在计算机视觉领域&#xff0c;估算脸部姿态是一项具有挑…...

Linux命令-dpkg-divert命令(Debian Linux中创建并管理一个转向列表)

说明 dpkg-divert命令 是Debian Linux中创建并管理一个转向&#xff08;diversion&#xff09;列表&#xff0c;其使得安装文件的默认位置失效的工具。 语法 dpkg-divert(选项)(参数)选项 --add&#xff1a;添加一个转移文件&#xff1b; --remove&#xff1a;删除一个转移…...

flex: 1 是哪些属性的缩写?

flex&#xff1a;1是哪些属性的缩写? flex&#xff1a;1 是 flex-grow: 1, flex-shrink: 1,flex-basis: 0% 的缩写&#xff1b; 解释下flex-grow flex-grow是将剩余的空间&#xff0c;根据flex-grow的值平分&#xff0c;然后加到flex-basis上 <!doctype html> <htm…...

python基于opencv实现数籽粒

千粒重是一个重要的农艺性状&#xff0c;通过对其的测量和研究&#xff0c;我们可以更好地理解作物的生长状况&#xff0c;优化农业生产&#xff0c;提高作物产量和品质。但数籽粒数目是一个很繁琐和痛苦的过程&#xff0c;我们现在用一个简单的python程序来数水稻籽粒。代码的…...

OpenCV图像处理——基于OpenCV的ORB算法实现目标追踪

概述 ORB&#xff08;Oriented FAST and Rotated BRIEF&#xff09;算法是高效的关键点检测和描述方法。它结合了FAST&#xff08;Features from Accelerated Segment Test&#xff09;算法的快速关键点检测能力和BRIEF&#xff08;Binary Robust Independent Elementary Feat…...

13.JavaWeb XML:构建结构化数据的重要工具

目录 导语&#xff1a; 一、XML概念 &#xff08;1&#xff09;可拓展 &#xff08;2&#xff09;功能-存储数据 &#xff08;3&#xff09;xml与html的区别 二、XML内容 三、XML用途 四、案例&#xff1a;使用XML构建在线书店的书籍数据库 结语&#xff1a; 导语&…...

鸿蒙OS实战开发:【多设备自适应服务卡片】

介绍 服务卡片的布局和使用&#xff0c;其中卡片内容显示使用了一次开发&#xff0c;多端部署的能力实现多设备自适应。 用到了卡片扩展模块接口&#xff0c;[ohos.app.form.FormExtensionAbility] 。 卡片信息和状态等相关类型和枚举接口&#xff0c;[ohos.app.form.formInf…...

深度学习基础之一:机器学习

文章目录 深度学习基本概念(Basic concepts of deep learning)机器学习典型任务机器学习分类 模型训练的基本概念基本名词机器学习任务流程模型训练详细流程正、反向传播学习率Batch size激活函数激活函数 sigmoid 损失函数MSE & M交叉熵损失 优化器优化器 — 梯度下降优化…...

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果 一、简单介绍 二、简单指定视频某片段重复播放…...

ARXML处理 - C#的解析代码(二)

参数类 参数容器&#xff08;ECUCPARAMCONFCONTAINERDEF&#xff09;的PARAMETERS集合类由以下参数类实例构成。 枚举参数&#xff08;ECUCENUMERATIONPARAMDEF &#xff09; 配置一个下拉选项&#xff0c;如PORT中一个pin可以配置SPI, CAN, PWM /// <remarks/>[Syste…...

关于华为即将举行的鸿蒙春季沟通会的新闻报道

华为计划在4月11日举办此次活动&#xff0c;届时将推出与车和PC类相关的新产品。尽管备受期待的华为P70系列设备的发布尚未得到官方确认&#xff0c;但已有多家媒体对此进行了报道。 文章中还提到了智界S7的新款可能在4月11日上市&#xff0c;并进行多项新功能升级。智界S7是去…...

MySQL视图及如何导入导出

1.视图 MySQL 视图&#xff08;View&#xff09;是一种虚拟存在的表&#xff0c;同真实表一样&#xff0c;视图也由列和行构成&#xff0c;但视图并不实际存在于数据库中。行和列的数据来自于定义视图的查询中所使用的表&#xff0c;并且还是在使用视图时动态生成的&#xff0…...

文心一言上线声音定制功能;通义千问开源模型;openAI又侵权?

文心一言上线定制专属声音功能 百度旗下 AI 聊天机器人文心一言上线新功能&#xff0c;用户录音一句话&#xff0c;即可定制声音。 使用这项功能需要使用文心一言 App。在创建智能体中&#xff0c;点击创建自己的声音&#xff0c;朗读系统提示的一句话&#xff0c;等候几秒钟时…...

课时89:流程控制_函数进阶_函数变量

2.1.4 综合案例 这一节&#xff0c;我们从 信息采集、环境部署、小结 三个方面来学习。 信息采集 脚本实践-采集系统负载信息 查看脚本内容 [rootlocalhost ~]# cat function_systemctl_load.sh #!/bin/bash # 功能&#xff1a;采集系统负载信息 # 版本&#xff1a;v0.3 # …...

Linux命令-dpkg-preconfigure命令(Debian Linux中软件包安装之前询问问题)

说明 dpkg-preconfigure命令 用于在Debian Linux中软件包安装之前询问问题。 语法 dpkg-preconfigure(选项)(参数)选项 -f&#xff1a;选择使用的前端&#xff1b; -p&#xff1a;感兴趣的最低的优先级问题&#xff1b; --apt&#xff1a;在apt模式下运行。参数 软件包&am…...

魔兽争霸3终极优化指南:WarcraftHelper专业级性能提升方案

魔兽争霸3终极优化指南&#xff1a;WarcraftHelper专业级性能提升方案 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 还在为魔兽争霸3在现代硬件上的…...

与拼写检查 JSON 路径的差异:为何保密检查走 Markdown 摘要链:开源免费的WPS AI 软件 察元AI文档助手

与拼写检查 JSON 路径的差异:为何保密检查走 Markdown 摘要链 摘要 本文围绕标题所述主题,结合本仓库当前源码行进行说明。仅供技术理解与内部培训,不构成定密、法务或密码测评结论。文中代码块均摘自本地仓库对应路径与行号。 正文 0. 结论先行 结论先行:保密检查由内…...

计算机生成全息技术参数敏感性分析与优化策略

1. 计算机生成全息技术中的参数敏感性研究在光学工程领域&#xff0c;计算机生成全息&#xff08;Computer-Generated Holography, CGH&#xff09;技术正经历着从传统迭代算法到神经网络方法的范式转变。这项技术的核心挑战在于如何高效准确地重建目标光场——这本质上是一个相…...

切削液防锈成分消耗机理、三类防锈剂参数与补加管控实测

一、防锈成分消耗核心机理物理消耗&#xff1a;工件表面携带&#xff08;占比 35%&#xff09;、切屑比表面积吸附&#xff08;占比 40%&#xff09;&#xff1b;化学消耗&#xff1a;金属界面化学吸附&#xff08;15%&#xff09;、高温裂解&#xff08;5%&#xff09;、细菌降…...

杰理之把音量调到最高后暂停蓝牙音乐,再按播放后,音量会变小问题处理参考【篇】

由于苹果手机音量等级只有16级&#xff0c;当近端耳机音量调超过16级后&#xff08;比如20级&#xff09;...

Taotoken助力初创团队以可控成本构建AI应用原型

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 Taotoken助力初创团队以可控成本构建AI应用原型 对于资源有限的初创团队而言&#xff0c;快速验证AI功能是产品创新的关键一步&…...

智能代理模式在渗透测试中的应用:pentestagent架构解析与实战

1. 项目概述与核心价值最近在安全测试和红队演练的圈子里&#xff0c;一个名为GH05TCREW/pentestagent的项目开始被频繁提及。乍一看这个名字&#xff0c;你可能会觉得它又是一个普通的渗透测试工具集&#xff0c;但当你真正深入去研究和使用它时&#xff0c;会发现它的设计理念…...

TouchGFX SPI屏移植避坑全记录:从下载算法到分散加载.sct文件

TouchGFX SPI屏移植实战&#xff1a;破解下载算法与分散加载的三大技术难点 当一块240x320的SPI接口屏幕在STM32F412RET6上流畅渲染出60帧的TouchGFX界面时&#xff0c;我盯着示波器上稳定的时序信号长舒一口气——这已经是本周第三次重写W25Q64的下载算法。与官方文档描述的&…...

FakeLocation终极指南:三分钟掌握Android应用级虚拟定位黑科技

FakeLocation终极指南&#xff1a;三分钟掌握Android应用级虚拟定位黑科技 【免费下载链接】FakeLocation Xposed module to mock locations per app. 项目地址: https://gitcode.com/gh_mirrors/fak/FakeLocation 你是否曾想过在手机上"瞬间移动"到世界任何角…...

3大创新突破:APK Installer如何重新定义Windows上的Android应用体验

3大创新突破&#xff1a;APK Installer如何重新定义Windows上的Android应用体验 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer 在当今跨平台应用需求日益增长的背景下…...