当前位置: 首页 > news >正文

延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求

实现延时队列,可以通过RabbitMQ的死信队列(Dead-letter queue)特性,“死信队列”是当消息过期,或者队列达到最大长度时,未消费的消息会被加入到死信队列。然后,我们可以对死信队列中的消息进行消费,完成类似“延时”的效果。

下面的示例代码演示了如何在Spring Boot中使用RabbitMQ设置一个订单,然后在15分钟后自动取消。

1. 添加 RabbitMQ 依赖:

在你的pom.xml中加入Spring Boot对RabbitMQ的Starter:

xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置队列、交换器、绑定和容器

我们创建一个配置类,定义一个正常的队列和一个死信队列,以及相应的交换机和队列的绑定:

java

@Configuration
public class RabbitMQConfig {/* 正常队列配置 */@Beanpublic Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dead_exchange");  // 队列消息过期后发送的交换器args.put("x-dead-letter-routing-key", "dead"); // 队列消息过期后发送的路由键return new Queue("order_queue", true, false, false, args);}@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");}/* 死信队列配置 */@Beanpublic Queue deadQueue() {return new Queue("dead_queue");}@Beanpublic DirectExchange deadExchange() {return new DirectExchange("dead_exchange");}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");}
}

3. 发送延时消息

创建订单时,我们发送一个延时消息到队列:

java

@Autowired
private RabbitTemplate rabbitTemplate;public void createOrder(String orderId) {//订单创建业务...rabbitTemplate.convertAndSend("order_exchange", "order", orderId, message -> {message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000)); // 15分钟return message;});
}

4. 消费死信队列中的消息

然后,我们需要消费死信队列中的消息,进行订单取消的操作:

java

@RabbitListener(queues = "dead_queue")
public void processDeadLetter(String orderId) {// 订单取消业务...
}
  1. 在RabbitMQ中设置消息的过期时间
    RabbitMQ允许你在两个级别上设置消息的过期时间:队列级别和消息级别。一旦消息过期,它将从队列中删除。
  • 队列级别:你可以在声明队列时通过"x-message-ttl"参数设置过期时间(以毫秒为单位)。这会影响队列中的所有消息。

java

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 设置60s过期时间
Channel channel = ...;
channel.queueDeclare("my_queue", false, false, false, args); 
  • 消息级别:你也可以在发布消息时单独为每条消息设置过期时间。如果队列级别的TTL和消息级别的TTL都被设置了,那么较小的那个值会被应用。

java

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000").build(); // 设置60s过期时间
Channel channel = ...;
channel.basicPublish("my_exchange", "my_routing_key", props, messageBodyBytes);

5、RabbitMQ中的死信队列如何工作?

“死信队列”用于接收不可路由的消息,或者由于一些原因不能正确处理的消息。这样能防止原始队列堵塞,也可以进一步处理这些消息。

当以下情况发生时,消息会被投递到死信队列:

  • 消息被拒绝 (basic.reject / basic.nack),并且requeue=false。
  • 在队列中排队的时间过长(超过设置的TTL)。
  • 队列长度溢出(超过设置的最大长度)。

在声明队列时,可以通过"x-dead-letter-exchange"和"x-dead-letter-routing-key"来设置死信交换器和路由键。

 6、如何确保消费者在消费消息时不会发生重复消费的情况?

确保消费者在消费消息时不发生重复消费,一般可以通过以下方式实现:

  • 消息确认机制:RabbitMQ提供了消息确认机制ACK,消费者处理完消息后,返回一个ACK信号,告诉RabbitMQ可以将该消息从队列中删除。如果在处理消息的过程中消费者出现异常(如断开连接),RabbitMQ将不会删除队列中的消息,并且会再次尝试发送该消息给消费者。
  • 幂等操作:在消费者端,确保消息处理逻辑是幂等的,也就是说,无论消息被处理一次还是多次,结果都是相同的。例如,对于一个扣款操作,无论这个操作执行多少次,账户的余额都应该只被扣除一次。
  • 分布式锁:在处理消息之前,消费者首先尝试获取一个分布式锁。只有获取到锁的消费者才能处理消息。处理完毕后释放锁。避免了同一消息被多个消费者处理的情况。

7、设置RabbitMQ中消息的优先级
在RabbitMQ中,对消息的优先级的支持是通过队列来实现的。在声明队列的时候,可以通过x-max-priority参数来指定队列支持的最大优先级。然后在发布消息的时候,可以通过basicPropertiespriority字段来指定消息的优先级。如下:

java

// 声明队列时设置最大优先级
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my_queue", false, false, false, args);// 发布消息时设置优先级
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", "my_queue", props, "Hello world".getBytes());

注意,优先级较高的消息将会优先被消费,但是并不保证完全按照优先级顺序消费。

8、RabbitMQ中如何处理消费者异常断开连接的情况?
当RabbitMQ检测到消费者(如一个TCP连接)异常断开,例如因为消费者主机崩溃或因为网络问题,它将关闭该消费者的连接,并将消费者未确认的任何消息重新放入队列。如果你希望一个消息在消费者断开连接时不被再次放入队列,你可以设置该消费者的autoAck参数为true(也就是无需显示确认)。但一般情况下,我们推荐消费者在正确处理消息后发送一个确认应答(basicAck)。

相关文章:

延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求

实现延时队列&#xff0c;可以通过RabbitMQ的死信队列&#xff08;Dead-letter queue&#xff09;特性&#xff0c;“死信队列”是当消息过期&#xff0c;或者队列达到最大长度时&#xff0c;未消费的消息会被加入到死信队列。然后&#xff0c;我们可以对死信队列中的消息进行消…...

关于在Ubuntu上配置mysql踩的一些坑

最近准备换工作了&#xff0c;回顾了下学校时期做的那个webserver&#xff0c;又在linux下mysql踩了一些坑&#xff0c;特此记录下来 程序编译错误mysql.h: No such file or directory 云服务器缺少mysql必要的运行组件&#xff0c;安装&#xff1a; sudo apt-get install l…...

JSBridge原理 - 前端H5与客户端Native交互

1. 概述&#xff1a; 在混合应用开发中&#xff0c;一种常见且成熟的技术方案是将原生应用与 WebView 结合&#xff0c;使得复杂的业务逻辑可以通过网页技术实现。实现这种类型的混合应用时&#xff0c;就需要解决H5与Native之间的双向通信。JSBridge 是一种在混合应用中实现 …...

【Java EE】Spring请求如何传递参数详解

文章目录 &#x1f38d;传递单个参数&#x1f334;传递多个参数&#x1f340;传递对象&#x1f384;后端参数重命名&#xff08;后端参数映射&#xff09;&#x1f332;传递数组&#x1f38d;传递集合&#x1f334;传递JSON数据&#x1f338;JSON概念&#x1f338;JSON的语法&a…...

菜鸟笔记-Numpy常用函数用法汇总

NumPy&#xff08;Numerical Python的简称&#xff09;是Python中用于处理数组和矩阵的库&#xff0c;提供了大量的数学函数来操作这些数组。通过前面的学习&#xff0c;慢慢也能发现一些规律&#xff0c;以下是NumPy的一些常用函数及其用法汇总&#xff1a; 数组创建 numpy.a…...

tensorflow.js 如何使用opencv.js通过面部特征点估算脸部姿态并绘制示意图

文章目录 前言一、实现步骤1. 获取所需特征点的索引2. 使用opencv.js 计算俯仰角、水平角和翻滚角cv.solvePnP介绍cv.solvePnP原理运行代码查看效果 3.绘制姿态示意直线添加canvas元素计算姿态直线坐标并绘制 总结 前言 在计算机视觉领域&#xff0c;估算脸部姿态是一项具有挑…...

Linux命令-dpkg-divert命令(Debian Linux中创建并管理一个转向列表)

说明 dpkg-divert命令 是Debian Linux中创建并管理一个转向&#xff08;diversion&#xff09;列表&#xff0c;其使得安装文件的默认位置失效的工具。 语法 dpkg-divert(选项)(参数)选项 --add&#xff1a;添加一个转移文件&#xff1b; --remove&#xff1a;删除一个转移…...

flex: 1 是哪些属性的缩写?

flex&#xff1a;1是哪些属性的缩写? flex&#xff1a;1 是 flex-grow: 1, flex-shrink: 1,flex-basis: 0% 的缩写&#xff1b; 解释下flex-grow flex-grow是将剩余的空间&#xff0c;根据flex-grow的值平分&#xff0c;然后加到flex-basis上 <!doctype html> <htm…...

python基于opencv实现数籽粒

千粒重是一个重要的农艺性状&#xff0c;通过对其的测量和研究&#xff0c;我们可以更好地理解作物的生长状况&#xff0c;优化农业生产&#xff0c;提高作物产量和品质。但数籽粒数目是一个很繁琐和痛苦的过程&#xff0c;我们现在用一个简单的python程序来数水稻籽粒。代码的…...

OpenCV图像处理——基于OpenCV的ORB算法实现目标追踪

概述 ORB&#xff08;Oriented FAST and Rotated BRIEF&#xff09;算法是高效的关键点检测和描述方法。它结合了FAST&#xff08;Features from Accelerated Segment Test&#xff09;算法的快速关键点检测能力和BRIEF&#xff08;Binary Robust Independent Elementary Feat…...

13.JavaWeb XML:构建结构化数据的重要工具

目录 导语&#xff1a; 一、XML概念 &#xff08;1&#xff09;可拓展 &#xff08;2&#xff09;功能-存储数据 &#xff08;3&#xff09;xml与html的区别 二、XML内容 三、XML用途 四、案例&#xff1a;使用XML构建在线书店的书籍数据库 结语&#xff1a; 导语&…...

鸿蒙OS实战开发:【多设备自适应服务卡片】

介绍 服务卡片的布局和使用&#xff0c;其中卡片内容显示使用了一次开发&#xff0c;多端部署的能力实现多设备自适应。 用到了卡片扩展模块接口&#xff0c;[ohos.app.form.FormExtensionAbility] 。 卡片信息和状态等相关类型和枚举接口&#xff0c;[ohos.app.form.formInf…...

深度学习基础之一:机器学习

文章目录 深度学习基本概念(Basic concepts of deep learning)机器学习典型任务机器学习分类 模型训练的基本概念基本名词机器学习任务流程模型训练详细流程正、反向传播学习率Batch size激活函数激活函数 sigmoid 损失函数MSE & M交叉熵损失 优化器优化器 — 梯度下降优化…...

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果 一、简单介绍 二、简单指定视频某片段重复播放…...

ARXML处理 - C#的解析代码(二)

参数类 参数容器&#xff08;ECUCPARAMCONFCONTAINERDEF&#xff09;的PARAMETERS集合类由以下参数类实例构成。 枚举参数&#xff08;ECUCENUMERATIONPARAMDEF &#xff09; 配置一个下拉选项&#xff0c;如PORT中一个pin可以配置SPI, CAN, PWM /// <remarks/>[Syste…...

关于华为即将举行的鸿蒙春季沟通会的新闻报道

华为计划在4月11日举办此次活动&#xff0c;届时将推出与车和PC类相关的新产品。尽管备受期待的华为P70系列设备的发布尚未得到官方确认&#xff0c;但已有多家媒体对此进行了报道。 文章中还提到了智界S7的新款可能在4月11日上市&#xff0c;并进行多项新功能升级。智界S7是去…...

MySQL视图及如何导入导出

1.视图 MySQL 视图&#xff08;View&#xff09;是一种虚拟存在的表&#xff0c;同真实表一样&#xff0c;视图也由列和行构成&#xff0c;但视图并不实际存在于数据库中。行和列的数据来自于定义视图的查询中所使用的表&#xff0c;并且还是在使用视图时动态生成的&#xff0…...

文心一言上线声音定制功能;通义千问开源模型;openAI又侵权?

文心一言上线定制专属声音功能 百度旗下 AI 聊天机器人文心一言上线新功能&#xff0c;用户录音一句话&#xff0c;即可定制声音。 使用这项功能需要使用文心一言 App。在创建智能体中&#xff0c;点击创建自己的声音&#xff0c;朗读系统提示的一句话&#xff0c;等候几秒钟时…...

课时89:流程控制_函数进阶_函数变量

2.1.4 综合案例 这一节&#xff0c;我们从 信息采集、环境部署、小结 三个方面来学习。 信息采集 脚本实践-采集系统负载信息 查看脚本内容 [rootlocalhost ~]# cat function_systemctl_load.sh #!/bin/bash # 功能&#xff1a;采集系统负载信息 # 版本&#xff1a;v0.3 # …...

Linux命令-dpkg-preconfigure命令(Debian Linux中软件包安装之前询问问题)

说明 dpkg-preconfigure命令 用于在Debian Linux中软件包安装之前询问问题。 语法 dpkg-preconfigure(选项)(参数)选项 -f&#xff1a;选择使用的前端&#xff1b; -p&#xff1a;感兴趣的最低的优先级问题&#xff1b; --apt&#xff1a;在apt模式下运行。参数 软件包&am…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

【决胜公务员考试】求职OMG——见面课测验1

2025最新版&#xff01;&#xff01;&#xff01;6.8截至答题&#xff0c;大家注意呀&#xff01; 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:&#xff08; B &#xff09; A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

JS手写代码篇----使用Promise封装AJAX请求

15、使用Promise封装AJAX请求 promise就有reject和resolve了&#xff0c;就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

人工智能--安全大模型训练计划:基于Fine-tuning + LLM Agent

安全大模型训练计划&#xff1a;基于Fine-tuning LLM Agent 1. 构建高质量安全数据集 目标&#xff1a;为安全大模型创建高质量、去偏、符合伦理的训练数据集&#xff0c;涵盖安全相关任务&#xff08;如有害内容检测、隐私保护、道德推理等&#xff09;。 1.1 数据收集 描…...

鸿蒙(HarmonyOS5)实现跳一跳小游戏

下面我将介绍如何使用鸿蒙的ArkUI框架&#xff0c;实现一个简单的跳一跳小游戏。 1. 项目结构 src/main/ets/ ├── MainAbility │ ├── pages │ │ ├── Index.ets // 主页面 │ │ └── GamePage.ets // 游戏页面 │ └── model │ …...