延迟队列的理解与使用
目录
一、场景引入
二、延迟队列的三种场景
1、死信队列+TTL对队列进行延迟
2、创建通用延时消息+死信队列 对消息延迟
3、使用rabbitmq的延时队列插件
x-delayed-message使用
父pom文件
pom文件
配置文件
config
生产者
消费者
结果
一、场景引入
我们知道可以通过TTL来对队列进行设置过期时间;通过后置处理器MessagePostProcessor对消息进行设置过期时间;
那么根据TTL及MessagePostProcessor机制可以处理关于延迟方面的问题。
比如:秒杀之后,给30分钟时间进行支付,如果30分钟后,没有支付,订单取消。
比如:餐厅订座,A用户早上8点预定的某家餐厅下午6点的座位,B用户中午12点预定的下午5点的座位;根据场景我们需要的时先让B用户进行消费,然后A用户再消费;这时TTL和MessagePostProcessor延迟就已经不能满足订餐的场景了;
因为TTL是对队列进行延迟,MessagePostProcessor是对消息进行延迟,但是MessagePostProcessor对消息延迟是不能根据订座的时间去排序消费的;
/*** 比如当我们发送第一个消息时延迟的时间时50s,而发送的第二个消息延迟时间是30s,虽然延迟30s的消息比延迟50s发送的晚* 但按照我们设想的情况,延迟30s的消息应该先消费;可是实际情况却不是这样,而是延迟50s的消息到达时间后 30s的才能消费!(队列先进先出)* 那这样此方式的不足就出现了!* 场景:* A用户和B用户预定餐厅,A用户先开始预定的,预定的是下午6点。B用户比A用户预定操作晚一些,但是B用户预定的时间是下午5点。通过此场景* 我们希望的是B用户先进行用餐(因为他预定的吃饭时间比A早一些,需要先安排吃饭。不能说A用户没到6点B用户预定5点的吃不了。),根据此* 场景 之前的队列延迟还是消息延迟都不能满足场景需求了,这样就需要另一种延迟方式进行解决了! ->使用rabbitmq的延时队列插件*//*** 注意:* TTL是对队列进行延迟,只要是在此队列中的消息都会按照TTL设置时间进行延迟;* MessagePostProcessor是对消息进行延迟;** 如果我们不仅使用了消息延迟,而且还使用了队列延迟,那么延迟的时间就会以小的时间为准!* 理解:* 如果a消息设置的消息延迟是30s,b消息设置的延迟时间是90s,队列设置的延迟是60s。那么a消息最终的延迟是30s(a的消息延迟与队列延迟* 比对以延迟时间小的为准!),b消息最终延迟的时间是60s(b的消息延迟与队列延迟比对以延迟的时间小的为准!)*/
二、延迟队列的三种场景
1、死信队列+TTL对队列进行延迟
给队列设置TTL过期时间,此队列可不用绑定消费者,时间到后把消息投递到死信队列中,由死信队列的消费者进行消费,这样就能达到延迟消费的作用
@Beanpublic Queue directQueueLong(){return QueueBuilder.durable("业务队列名称").deadLetterExchange("死信交换机名称").deadLetterRoutingKey("死信队列 RoutingKey").ttl(20000) // 消息停留时间//.maxLength(500).build();}
监听死信队列,即可处理超时的消息队列
缺点:
上述实现方式中,ttl延时队列中所有的消息超时时间都是一样的,如果不同消息想设置不一样的超时时间,就需要建立多个不同超时时间的消息队列,比较麻烦,且不利于维护。
2、创建通用延时消息+死信队列 对消息延迟
这样的方式可对每一个消息设置指定的过期时间,不用像TTL那样给队列设置过期时间(若队列设置的过期时间达到后,其队列中的消息均会被删除或别的处理),但是由于队列是先进先出,若先投递的消息设置的过期时间是40s,后投递的消息过期时间是30s,那么设置过期时间为30s的并不会到30s时就投递到死信队列中,而是等40s到期后才会一起被投递。
rabbitTemplate.convertAndSend("交换机名称", "RoutingKey","对象",
message => {message.getMessageProperties().setExpiration(String.valueOf(5000))// 设置消息的持久化属性//这样发送的消息就会被持久化到 RabbitMQ 中,即使 RabbitMQ 重启,消息也不会丢失。 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});
缺点:
该种方式可以创建一个承载不同超时时间消息的消息队列,但是这种方式有一个问题,如果消息队列中排在前面的消息没有到超时时间,即使后面的消息到了超时时间,先到超时时间的消息也不会进入死信队列,而是先检查排在最前面的消息队列是否到了超时时间,如果到了超时时间才会继续检查后面的消息。
3、使用rabbitmq的延时队列插件
使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队
1、下载延迟插件
在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。
下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
2、安装插件并启用
下载完成后直接把插件放在 /home/rabbitmq 目录,然后拷贝到容器内plugins目录下(my-rabbit是容器的name,也可以使用容器id)
docker cp /home/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins
进入 Docker 容器
docker exec -it rabbit /bin/bash
在plugins内启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器
exit
重启 RabbitMQ
docker restart my-rabbit
貌似不重启也能生效!
结果:
就多了一个x-delayed-message交换机
x-delayed-message使用
父pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.1</version>
<!-- <version>2.2.5.RELEASE</version>--><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq</name><properties><java.version>8</java.version><hutool.version>5.8.3</hutool.version><lombok.version>1.18.24</lombok.version></properties><description>spring-boot-rabbitmq</description><packaging>pom</packaging><modules><module>direct-exchange</module><module>fanout-exchange</module><module>topic-exchange</module><module>game-exchange</module><module>dead-letter-queue</module><module>delay-queue</module><module>delay-queue2</module></modules><dependencyManagement><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies></dependencyManagement></project>
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><relativePath>../pom.xml </relativePath></parent><artifactId>delay-queue2</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
配置文件
logging.level.com.chensir = debug
server.port=8086
#host
spring.rabbitmq.host=121.40.100.66
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#每个消费者每次可最大处理的nack消息数量
spring.rabbitmq.listener.simple.prefetch=1
#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#监听重试是否可用
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#最大重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
#第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.multiplier=2
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false
config
package com.chensir.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic CustomExchange customExchange(){Map<String,Object> args = new HashMap<>();//延迟时以direct直连方式绑定args.put("x-delayed-type","direct");// name:交换机名称 type:类型 固定值x-delayed-messagereturn new CustomExchange("chen-x-delayedExchange","x-delayed-message",true,false,args);}@Beanpublic Queue directQueueLong(){return QueueBuilder.durable("chen-DirectQueue").build();}@Beanpublic Binding binding(){return BindingBuilder.bind(directQueueLong()).to(customExchange()).with("direct123").noargs();}
}
生产者
package com.chensir.provider;import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
public class DirectProvider {@Resourceprivate RabbitTemplate rabbitTemplate;public void send(){OrderIngOk orderIngOk = new OrderIngOk();orderIngOk.setOrderNo("202207149687-1");orderIngOk.setId(1);orderIngOk.setUserName("倪海杉前-延迟40秒");rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk,m->{m.getMessageProperties().setDelay(40*1000); //设置延迟时间,对延迟交换机有效// m.getMessageProperties().setExpiration(String.valueOf(30*1000)); 设置过期时间,对队列有效return m;});OrderIngOk orderIngOk2 = new OrderIngOk();orderIngOk2.setOrderNo("202207149687-2");orderIngOk2.setId(2);orderIngOk2.setUserName("倪海杉后-延迟30秒");rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk2,m->{m.getMessageProperties().setDelay(30*1000);return m;});log.debug("消息生产完成");}}
消费者
package com.chensir.consumer;import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class DirectConsumer {@RabbitHandler@RabbitListener(queues = "chen-DirectQueue" )public void process(OrderIngOk orderIngOk) throws IOException {try {// 处理业务开始log.debug("接受到消息,并正常处理结束,{}", JSONUtil.toJsonStr(orderIngOk));// 处理业务结束} catch (Exception ex){throw ex;}}
}
结果
2023-08-29 18:27:45.983 DEBUG 15568 --- [ main] com.chensir.provider.DirectProvider : 消息生产完成
2023-08-29 18:28:16.143 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer : 接受到消息,并正常处理结束,{"id":2,"OrderNo":"202207149687-2","userName":"倪海杉后-延迟30秒"}
2023-08-29 18:28:26.057 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer : 接受到消息,并正常处理结束,{"id":1,"OrderNo":"202207149687-1","userName":"倪海杉前-延迟40秒"}
可见 延迟40s的是先发送的,但是最终结果是先消费延迟30s的。这样就能达到我们订座的场景需求了。
相关文章:

延迟队列的理解与使用
目录 一、场景引入 二、延迟队列的三种场景 1、死信队列TTL对队列进行延迟 2、创建通用延时消息死信队列 对消息延迟 3、使用rabbitmq的延时队列插件 x-delayed-message使用 父pom文件 pom文件 配置文件 config 生产者 消费者 结果 一、场景引入 我们知道可以通过TT…...

jQuery成功之路——jQuery的DOM操作简单易懂
jQuery的DOM操作 1.jQuery操作内容 jQuery操作内容 1. text() 获取或修改文本内容 类似于 dom.innerText 2. html() 获取或修改html内容 类似 dom.innerHTML 注意: 1. text() 是获取设置所有 2. html() 是获取第一个,设置所有 <!DOCTYPE html> <html lang"zh…...
C++ 学习系列 -- 智能指针 make_shared 与 make_unique
一 make_shared 1.1 make_shared 是什么? c 11 中 引入了智能指针 shared_ptr,以及一个模板函数 make_shared 来生成一个制定类型的 shared_ptr。 1.2 引入 make_shared ,解决了什么问题? make_shared的引入,主…...

贝叶斯神经网络 - 捕捉现实世界的不确定性
贝叶斯神经网络 - 捕捉现实世界的不确定性 Bayesian Neural Networks 生活本质上是不确定性和概率性的,贝叶斯神经网络 (BNN) 旨在捕获和量化这种不确定性 在许多现实世界的应用中,仅仅做出预测是不够的;您还想知道您对该预测的信心有多大。例…...
games101作业1
题目 给定三维下三个点 v0(2.0, 0.0, −2.0), v1(0.0, 2.0, −2.0), v2(−2.0, 0.0, −2.0), 你需要将这三个点的坐标变换为屏幕坐标并在屏幕上绘制出对应的线框三角形 (在代码框架中,我们已经提供了 draw_triangle 函数,所以你只需要去构建变换矩阵即可…...

LeetCode 面试题 02.08. 环路检测
文章目录 一、题目二、C# 题解 一、题目 给定一个链表,如果它是有环链表,实现一个算法返回环路的开头节点。若环不存在,请返回 null。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了…...

【Linux】线程安全-生产者消费者模型
文章目录 生产者消费者模型123规则应用场景优点忙闲不均生产者和消费者解耦支持高并发 代码模拟 生产者消费者模型 123规则 1个线程安全的队列:只要保证先进先出特性的数据结构都可以称为队列 这个队列要保证互斥(就是保证当前只有一个线程对队列进行操…...
优化(2) 2023/09/03
今天重新温习了下clean abap,以前只是偶尔打开看几眼。今天把有些自己不熟悉的地方,重点研究了下。有几个点可以在以后工作使用。这几点可能并不能提升程序效率,但会大大提高代码可读性和代码的可扩展性: 用insert XXX into tabl…...
Swap and Reverse 题解
Swap and Reverse 题面翻译 题目描述 本题共有 t t t 组数据。 给定一个长度为 n n n 的字符串 s s s 和一个整数 k k k, s s s 只包含小写字母,你可以进行若干次操作(可以是零次),具体操作如下: 选…...

单元测试:优雅编写Kotlin单元测试
一、MockK简介 MockK是一款功能强大、易于使用的Kotlin mocking框架。在编写单元测试时,MockK能够帮助我们简化代码、提高测试覆盖率,并改善测试的可维护性。除了基本用法外,MockK还提供了许多额外的功能和灵活的用法,让我们能够…...

深度学习入门教学——卷积神经网络CNN
目录 一、CNN简介 一、输入层 二、卷积层 三、池化层 四、全连接层 一、CNN简介 1、应用领域 检测任务 分类与检索 超分辨率重构 2、卷积网络与传统网咯的区别 传统神经网络和卷积神经网络都是用来提取特征的。神经网络: 可以将其看作是一个二维的。卷积神经…...
【MySQL】MySQL系统变量(system variables)列表(mysqld --verbose --help的结果例)
文章目录 【MySQL】MySQL系统变量(system variables)列表(mysqld --verbose --help的结果例)mysqld --verbose --help的结果例参考 【免责声明】文章仅供学习交流,观点代表个人,与任何公司无关。 编辑|SQL和…...

Python学习之四 数据输入与输出
(一) 脚本编程 前面的章节,组要学习了一些简单的Python编程,使用的是交互式解释器,本章节将开始进行脚本编程。可以使用多种编辑器或者IDE完成编码,主要使用vim。 参考前续小节的写法,我们给a、b分别赋值3和5。 在终端运行程序后发现,没有任何输出。这就是本次我们将要…...

VBA技术资料MF51:VBA_在Excel中突出显示唯一值
【分享成果,随喜正能量】世间万物,因果循环不休,你的善心善行,都可能成为你的善缘善果。每天忆佛念佛,每天都在佛菩萨的加持下生活,自然吉祥如意,法喜充满。 。 我给VBA的定义:VBA是…...

Mqtt学习笔记--交叉编译移植(1)
简述 Mqtt目前在物联网行业的应用比较多,mqtt属于应用层的一个中间件,这个中间件实现消息的订阅发布机制。网上介绍Mqtt的实现原来的比较多,这里不细介绍。 其实在我们之前的产品中,自己也开发的有类似的中间件,除了具…...

Gateway的服务网关
Gateway服务网关 Gateway网关是我们服务的守门神,所有微服务的统一入口。 网关的核心功能特性: 请求路由 权限控制 限流 架构如下: gateway使用 引入依赖 创建gateway服务,引入依赖 <!--网关--> <dependency>…...

信息化发展18
存储技术 1 、存储分类 2 、常用存储模式的技术与应用对比: ( 1 ) 存储虚拟化( Storage Virtualization ) 是“ 云存储” 的核心技术之一。 它带给人们直接的好处是提高了存储利用率, 降低了存储成本, 简…...

TypeScript学习 + 贪吃蛇项目
TypeSCript简介 TypeScript是JavaScript的超集。它对JS进行了扩展,向JS中引入了类型的概念,并添加了许多新的特性。TS代码需要通过编译器编译为JS,然后再交由JS解析器执行。TS完全兼容JS,换言之,任何的JS代码都可以直…...
YOLO-NAS详细教程-介绍如何进行物体检测
对象检测是计算机视觉中的一项核心任务,可以检测和分类图像中的边界框。自从深度学习首次取得突破以来,它就以极快的速度获得普及和普及,并推动了医疗领域、监控、智能购物等众多公司的发展。考虑到它最终满足了两个基本需求,这一点也就不足为奇了端到端方式:找到所有当前…...
容器没有命令时,如何查看进程、容器executable file not found in $PATH: unknown
前言 当容器没有ps -ef命令时,可以通过以下的命令来查看容器的进程。 docker container top查看容器运行的进程(该命令很有用) #docker container top 命令用于查看容器运行的进程 #当容器里面没有ps -ef命令时,使用docker con…...

【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

转转集团旗下首家二手多品类循环仓店“超级转转”开业
6月9日,国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解,“超级…...

【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...

蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在 GPU 上对图像执行 均值漂移滤波(Mean Shift Filtering),用于图像分割或平滑处理。 该函数将输入图像中的…...

GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...

渗透实战PortSwigger靶场:lab13存储型DOM XSS详解
进来是需要留言的,先用做简单的 html 标签测试 发现面的</h1>不见了 数据包中找到了一个loadCommentsWithVulnerableEscapeHtml.js 他是把用户输入的<>进行 html 编码,输入的<>当成字符串处理回显到页面中,看来只是把用户输…...