RabbitMQ 延时消息实现
1. 实现方式
1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致
2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞需要额外安装 `rabbitmq_delayed_message_exchange` 插件才能解决此问题
- 导入Spring 集成RabbitMQ MAEVN
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.5.RELEASE</version>
</dependency>
2. 设置队列过期时间:延迟队列消息过期 + 死信队列
推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列
2.1. MQ配置信息
2.1.1. 自定义队列配置
…/bootstrap.yml
# rabbitmq自定义配置
rabbitmq:ttlExchange: medical_dev_ttl_topic_changettlKey: dev_ttlttlQueue: medical.dev.ttl.topic.queuedelayExpireTime: 600ttlQueueSize: 10000deadExchange: medical_dev_dead_topic_changedeadKey: dev_deaddeadQueue: medical.dev.dead.topic.queue
2.1.2. 读取自定义MQ配置信息
/*** amqp配置文件*/
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {/*** 延迟队列*/public String ttlExchange;public String ttlKey;public String ttlQueue;private Integer delayExpireTime;public Integer ttlQueueSize;/*** 死信队列*/public String deadExchange;public String deadKey;public String deadQueue;}
2.2. 配置文件自动生成队列
2.2.1. 延迟队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;/*** 延迟队列配置文件* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigTTL {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic TopicExchange ttlTopicExchange(){return new TopicExchange(myConfigProperties.getTtlExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue ttlTopicduanxinQueue(){HashMap<String, Object> args = new HashMap<>();// 给队列设置消息过期时间:毫秒值args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime() * 1000);// 设置队列最大长度args.put("x-max-length", myConfigProperties.getTtlQueueSize());// 设置死信队列交换机名称// 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列// 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange());// 设置死信队列路由keyargs.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey());return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args);}// 3: 绑定对用关系@Beanpublic Binding ttlTopicsmsBinding(){return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());}}
2.2.2. 死信队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;/*** 死信队列配置文件* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigDead {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic TopicExchange deadTopicExchange(){return new TopicExchange(myConfigProperties.getDeadExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue deadTopicduanxinQueue(){return new Queue(myConfigProperties.getDeadQueue(), true);}// 3: 绑定对用关系@Beanpublic Binding deadTopicsmsBinding(){return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());}}
2.3. 生产者推送消息
import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** RabbitMQ生产者推送消息类* * @author xiemingan*/
@Component
@Slf4j
public class RabbitmqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyConfigProperties myConfigProperties;/*** @param pushMessage 推送消息体*/public void pushTtlMessage(String pushMessage) {// 推送消息至交换机,并指定路由keyrabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);}}
2.4. 消费者处理消息
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** @author mingAn.xie*/
@Log4j2
@Component
public class RabbitmqConsumer {/*** 消费死信队列* @param message 消息体*/@RabbitListener(queues = "${rabbitmq.deadQueue}")public void pushMessages(Message message) {String body = new String(message.getBody()).trim();if (StringUtils.isEmpty(body)){return;}log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body);}}
3. 设置消息的过期时间
设置交换机类型为
x-delayed-type
,推送消息至交换机,直连队列消费
3.1. 安装插件 rabbitmq_delayed_message_exchange
前言:这里默认使用环境为
Liunx
系统Docker
安装RabbitMQ
具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件
安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本
插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 这里以最新版本 v3.13.0 举例
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez# 将插件复制进容器中: rabbitmq_xxxxxx
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins# 进入容器: rabbitmq_xxxxxx
docker exec -it rabbitmq_xxxxxx bash
cd plugins# 查询插件列表, 此处可看到插件的版本
rabbitmq-plugins list# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 交换机类型中出现
x-delayed-type
表示安装成功

3.2. MQ配置信息
3.2.1. 自定义队列配置
…/bootstrap.yml
#mq队列自定义配置
rabbitmq:saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchangesaveTaskTtlKey: ey240001_pro_save_task_ttlsaveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queuesaveTaskTtlQueueSize: 10000
3.2.2. 读取自定义MQ配置信息
/*** amqp配置文件** @author mingAn.xie*/
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {/*** 任务待办生成延时队列*/public String saveTaskTtlExchange;public String saveTaskTtlKey;public String saveTaskTtlQueue;public Integer saveTaskTtlQueueSize;}
3.3. 配置文件生成 x-delayed-type
交换机
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** x-delayed-type 交换机延迟队列配置* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigSaveTaskTtl {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic CustomExchange saveTaskTopicExchange() {Map<String, Object> args = new HashMap<>();// 设置延迟队列插件类型:按过期时间消费args.put("x-delayed-type", "direct");// 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), "x-delayed-message", true, false, args);}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue saveTaskTopicduanxinQueue() {return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false);}// 3: 绑定对用关系@Beanpublic Binding saveTaskTopicsmsBinding() {return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();}}
3.4. 生产者推送消息
import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 生产者推送消息类* * @author xiemingan*/
@Component
@Slf4j
public class RabbitmqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyConfigProperties myConfigProperties;/*** @param pushMessage 推送消息体* @param ttlTime 延时时间(毫秒值)*/public void pushTtlMessage(String pushMessage, long ttlTime) {ttlTime = ttlTime <= 0 ? 1000 : ttlTime;// 3.1.推送MQ延迟消息队列long finalTtlTime = ttlTime;MessagePostProcessor messagePostProcessor = message -> {// 设置延迟时间message.getMessageProperties().setDelay((int) finalTtlTime);return message;};rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);}}
3.5. 消费者处理消息
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** @author mingAn.xie*/
@Log4j2
@Component
public class RabbitmqConsumer {/*** 消费延时消息* @param message 消息体*/@RabbitListener(queues = "${rabbitmq.saveTaskTtlQueue}")public void pushMessages(Message message) {String body = new String(message.getBody()).trim();if (StringUtils.isEmpty(body)) {return;}log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body);}}
相关文章:

RabbitMQ 延时消息实现
1. 实现方式 1. 设置队列过期时间:延迟队列消息过期 死信队列,所有消息过期时间一致 2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞需要额外安装 rabbitmq_delayed_me…...
【Django】枚举类型数据
模型 在模型里主要增加两项内容: 枚举表字段增加choices class Snort(CoreModel):PAGE_TYPE_CHOICES [(1, 失陷主机检测), # 1是保存到数据库里的数据,失陷主机检测是显示在前端的(2, 远程漏洞攻击检测),(3, 可疑流量行为),(4, WEB检测),]page_type…...
java实现https连接总是要报no cipher suites in common
遇到“no cipher suites in common”这样的错误通常意味着客户端和服务器之间没有共同支持的加密套件(Cipher Suite)。这个问题可能由多个原因引起,包括但不限于SSL/TLS配置错误、Java安全策略限制、客户端或服务器不支持的加密算法等。解决这…...

[C++初阶] 爱上C++ : 与C++的第一次约会
🔥个人主页:guoguoqiang 🔥专栏:我与C的爱恋 本篇内容带大家浅浅的了解一下C中的命名空间。 在c中,名称(name)可以是符号常量、变量、函数、结构、枚举、类和对象等等。工程越大,名称…...

STM32技术打造:智能考勤打卡系统 | 刷卡式上下班签到自动化解决方案
文章目录 一、简易刷卡式打卡考勤系统(一)功能简介原理图设计程序设计 哔哩哔哩: https://www.bilibili.com/video/BV1NZ421Y79W/?spm_id_from333.999.0.0&vd_sourcee5082ef80535e952b2a4301746491be0 一、简易刷卡式打卡考勤系统 &…...

module ‘numpy‘ has no attribute ‘int‘
在 NumPy 中,如果遇到了错误提示 "module numpy has no attribute int",这通常意味着正在尝试以错误的方式使用 NumPy 的整数类型。从 NumPy 1.20 版本开始,numpy.int 已经不再是一个有效的属性,因为 NumPy 不再推荐使用…...

MFC(一)搭建空项目
安装MFC支持库 创建空白桌面程序 项目相关设置 复制以下代码 // mfc.h #pragma once #include <afxwin.h>class MyApp : public CWinApp { public:virtual BOOL InitInstance(); };class MyFrame : public CFrameWnd { public:MyFrame();// 消息映射机制DECLARE_…...

OKCC的API资源管理平台怎么用?
API资源管理平台,重点是“资源”管理平台,不是API接口管理平台。 天天讯通推出的API资源管理平台,类似昆石的VOS系统,区别是VOS是SIP资源管理系统,我们的API资源管理平台是API资源管理系统(AXB、AX、回拨AP…...
CentOS 7 安装python 3.7 需要必要的依赖。
在 CentOS 7 上部署 Python 3.7 可以通过源代码编译安装来实现。以下是大致的步骤: 安装必要的依赖: bashCopy Code sudo yum install gcc openssl-devel bzip2-devel libffi-devel 下载 Python 3.7 源代码并进行编译安装: bashCopy Code wg…...

美术馆设计方案优化布局与设施提升观众体验!
如今,美术馆不仅仅是作为展示艺术作品的平台,也是吸引公众参与和创造独特体验的数字艺术体验空间,因此许多传统美术馆在进行翻修改造时,都会更加注重用户体验,并在其中使用大量的多媒体互动,让参观者能够在…...
数据库基础原理
宏观 数据库的实现原理分为四个部分: 网络通信 网络协议 硬盘存储 内存分配 微观 硬盘存储 数据库是持久化的,而持久化如何实现的,我们不难想到磁盘可以持久化存储,所以数据库所有持久化的数据都是以文件形式存在磁盘中的…...

Pandas操作MultiIndex合并行列的Excel,写入读取以及写入多余行及Index列处理,插入行,修改某个单元格的值,多字段排序
Pandas操作MultiIndex合并行列的excel,写入读取以及写入多余行及Index列处理,多字段排序尽量保持原来的顺序 1. 效果图及问题2. 源码参考 今天是谁写Pandas的 复合索引MultiIndex,写的糊糊涂涂,晕晕乎乎。 是我呀… 记录下&#…...
工作总结5
1.taro框架使用map标签出现的错误 这个问题困扰很长时间,在频繁切换页面渲染的时候出现左边不显示,我理解的是变量没有到达map标签的属性上,那我就想是不是setState太慢了,然后又用了变量,本地缓存等,都没有…...

速通汇编(二)汇编mov、addsub指令
一,mov指令 mov指令的全称是move,从字面上去理解,作用是移动(比较确切的说是复制)数据,mov指令可以有以下几种形式 无论哪种形式,都是把右边的值移动到左边 mov 寄存器,数据&#…...
软考 - 系统架构设计师 - 构件组装技术
概念 构件组装是将库中的构件经修改后相互连接,或者将它们和当前开发项目中的软件元素进行连接,最终构成新的目标构件。 构件组装技术是基于构件的软件开发的核心技术,也是构件技术研究的重点和难点。构件组装的目的是利用现有的构件组装成新…...

2010年之前电脑ubuntu安装nvidia驱动黑屏处理
装好驱动 仿真fps直接到60Hz 陈旧设备 都是非常老旧的电脑,没钱换新电脑,就这么穷…… 电脑详细配置: 冲动 想装显卡驱动提升一下性能,结果……黑了 黑习惯了也无所谓,几分钟就能解决,关键还是太穷&…...

类与对象中C++
加油!!! 文章目录 前言 一、类的6个默认成员函数 编辑 二、构造函数 1.概念 三、析构函数 1.概念 2.特性 四、拷贝构造函数 1.概念 2.特征 拷贝构造函数典型调用场景 五、赋值运算符重载 1.运算符重载 2.赋值运算符重载 赋值运算符重载格式…...
k8s 集群重启报错:The connection to the server 192.168.92.26:6443 was refused
[rootk8s-master ~]# kubectl get node The connection to the server 192.168.92.26:6443 was refused - did you specify the right host or port?查到是kubelet进程没有启动 [rootk8s-master ~]# systemctl status kubelet ● kubelet.service - kubelet: The Kubernetes …...

国内好用的chatGPT和AI绘图工具
分享一个比较好用的AI 分享一个比较好用的AI,只是需要开通会员,目前官网的价格是:298,开通之后可以使用chatgpt4、AI绘画、图片融合等等!不开通的话是可以免费使用15次的,下面是一些介绍图片!链…...

蚂蚁庄园3.31今日答案春季美食“雷竹笋”之所以得名是因为出笋与打雷有关吗?
蚂蚁庄园是一款爱心公益游戏,用户可以通过喂养小鸡,产生鸡蛋,并通过捐赠鸡蛋参与公益项目。用户每日完成答题就可以领取鸡饲料,使用鸡饲料喂鸡之后,会可以获得鸡蛋,可以通过鸡蛋来进行爱心捐赠。其中&#…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...

有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...
安卓基础(aar)
重新设置java21的环境,临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的: MyApp/ ├── app/ …...
JavaScript基础-API 和 Web API
在学习JavaScript的过程中,理解API(应用程序接口)和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能,使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

莫兰迪高级灰总结计划简约商务通用PPT模版
莫兰迪高级灰总结计划简约商务通用PPT模版,莫兰迪调色板清新简约工作汇报PPT模版,莫兰迪时尚风极简设计PPT模版,大学生毕业论文答辩PPT模版,莫兰迪配色总结计划简约商务通用PPT模版,莫兰迪商务汇报PPT模版,…...

Golang——9、反射和文件操作
反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一:使用Read()读取文件2.3、方式二:bufio读取文件2.4、方式三:os.ReadFile读取2.5、写…...

c++第七天 继承与派生2
这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分:派生类构造函数与析构函数 当创建一个派生类对象时,基类成员是如何初始化的? 1.当派生类对象创建的时候,基类成员的初始化顺序 …...