RabbitMQ 延时消息实现
1. 实现方式
1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致
2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞需要额外安装 `rabbitmq_delayed_message_exchange` 插件才能解决此问题
- 导入Spring 集成RabbitMQ MAEVN
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.5.RELEASE</version>
</dependency>
2. 设置队列过期时间:延迟队列消息过期 + 死信队列
推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列
2.1. MQ配置信息
2.1.1. 自定义队列配置
…/bootstrap.yml
# rabbitmq自定义配置
rabbitmq:ttlExchange: medical_dev_ttl_topic_changettlKey: dev_ttlttlQueue: medical.dev.ttl.topic.queuedelayExpireTime: 600ttlQueueSize: 10000deadExchange: medical_dev_dead_topic_changedeadKey: dev_deaddeadQueue: medical.dev.dead.topic.queue
2.1.2. 读取自定义MQ配置信息
/*** amqp配置文件*/
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {/*** 延迟队列*/public String ttlExchange;public String ttlKey;public String ttlQueue;private Integer delayExpireTime;public Integer ttlQueueSize;/*** 死信队列*/public String deadExchange;public String deadKey;public String deadQueue;}
2.2. 配置文件自动生成队列
2.2.1. 延迟队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;/*** 延迟队列配置文件* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigTTL {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic TopicExchange ttlTopicExchange(){return new TopicExchange(myConfigProperties.getTtlExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue ttlTopicduanxinQueue(){HashMap<String, Object> args = new HashMap<>();// 给队列设置消息过期时间:毫秒值args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime() * 1000);// 设置队列最大长度args.put("x-max-length", myConfigProperties.getTtlQueueSize());// 设置死信队列交换机名称// 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列// 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange());// 设置死信队列路由keyargs.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey());return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args);}// 3: 绑定对用关系@Beanpublic Binding ttlTopicsmsBinding(){return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());}}
2.2.2. 死信队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;/*** 死信队列配置文件* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigDead {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic TopicExchange deadTopicExchange(){return new TopicExchange(myConfigProperties.getDeadExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue deadTopicduanxinQueue(){return new Queue(myConfigProperties.getDeadQueue(), true);}// 3: 绑定对用关系@Beanpublic Binding deadTopicsmsBinding(){return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());}}
2.3. 生产者推送消息
import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** RabbitMQ生产者推送消息类* * @author xiemingan*/
@Component
@Slf4j
public class RabbitmqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyConfigProperties myConfigProperties;/*** @param pushMessage 推送消息体*/public void pushTtlMessage(String pushMessage) {// 推送消息至交换机,并指定路由keyrabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);}}
2.4. 消费者处理消息
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** @author mingAn.xie*/
@Log4j2
@Component
public class RabbitmqConsumer {/*** 消费死信队列* @param message 消息体*/@RabbitListener(queues = "${rabbitmq.deadQueue}")public void pushMessages(Message message) {String body = new String(message.getBody()).trim();if (StringUtils.isEmpty(body)){return;}log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body);}}
3. 设置消息的过期时间
设置交换机类型为
x-delayed-type
,推送消息至交换机,直连队列消费
3.1. 安装插件 rabbitmq_delayed_message_exchange
前言:这里默认使用环境为
Liunx
系统Docker
安装RabbitMQ
具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件
安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本
插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 这里以最新版本 v3.13.0 举例
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez# 将插件复制进容器中: rabbitmq_xxxxxx
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins# 进入容器: rabbitmq_xxxxxx
docker exec -it rabbitmq_xxxxxx bash
cd plugins# 查询插件列表, 此处可看到插件的版本
rabbitmq-plugins list# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 交换机类型中出现
x-delayed-type
表示安装成功

3.2. MQ配置信息
3.2.1. 自定义队列配置
…/bootstrap.yml
#mq队列自定义配置
rabbitmq:saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchangesaveTaskTtlKey: ey240001_pro_save_task_ttlsaveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queuesaveTaskTtlQueueSize: 10000
3.2.2. 读取自定义MQ配置信息
/*** amqp配置文件** @author mingAn.xie*/
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {/*** 任务待办生成延时队列*/public String saveTaskTtlExchange;public String saveTaskTtlKey;public String saveTaskTtlQueue;public Integer saveTaskTtlQueueSize;}
3.3. 配置文件生成 x-delayed-type
交换机
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** x-delayed-type 交换机延迟队列配置* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigSaveTaskTtl {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic CustomExchange saveTaskTopicExchange() {Map<String, Object> args = new HashMap<>();// 设置延迟队列插件类型:按过期时间消费args.put("x-delayed-type", "direct");// 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), "x-delayed-message", true, false, args);}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue saveTaskTopicduanxinQueue() {return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false);}// 3: 绑定对用关系@Beanpublic Binding saveTaskTopicsmsBinding() {return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();}}
3.4. 生产者推送消息
import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 生产者推送消息类* * @author xiemingan*/
@Component
@Slf4j
public class RabbitmqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyConfigProperties myConfigProperties;/*** @param pushMessage 推送消息体* @param ttlTime 延时时间(毫秒值)*/public void pushTtlMessage(String pushMessage, long ttlTime) {ttlTime = ttlTime <= 0 ? 1000 : ttlTime;// 3.1.推送MQ延迟消息队列long finalTtlTime = ttlTime;MessagePostProcessor messagePostProcessor = message -> {// 设置延迟时间message.getMessageProperties().setDelay((int) finalTtlTime);return message;};rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);}}
3.5. 消费者处理消息
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** @author mingAn.xie*/
@Log4j2
@Component
public class RabbitmqConsumer {/*** 消费延时消息* @param message 消息体*/@RabbitListener(queues = "${rabbitmq.saveTaskTtlQueue}")public void pushMessages(Message message) {String body = new String(message.getBody()).trim();if (StringUtils.isEmpty(body)) {return;}log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body);}}
相关文章:

RabbitMQ 延时消息实现
1. 实现方式 1. 设置队列过期时间:延迟队列消息过期 死信队列,所有消息过期时间一致 2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞需要额外安装 rabbitmq_delayed_me…...
【Django】枚举类型数据
模型 在模型里主要增加两项内容: 枚举表字段增加choices class Snort(CoreModel):PAGE_TYPE_CHOICES [(1, 失陷主机检测), # 1是保存到数据库里的数据,失陷主机检测是显示在前端的(2, 远程漏洞攻击检测),(3, 可疑流量行为),(4, WEB检测),]page_type…...
java实现https连接总是要报no cipher suites in common
遇到“no cipher suites in common”这样的错误通常意味着客户端和服务器之间没有共同支持的加密套件(Cipher Suite)。这个问题可能由多个原因引起,包括但不限于SSL/TLS配置错误、Java安全策略限制、客户端或服务器不支持的加密算法等。解决这…...

[C++初阶] 爱上C++ : 与C++的第一次约会
🔥个人主页:guoguoqiang 🔥专栏:我与C的爱恋 本篇内容带大家浅浅的了解一下C中的命名空间。 在c中,名称(name)可以是符号常量、变量、函数、结构、枚举、类和对象等等。工程越大,名称…...

STM32技术打造:智能考勤打卡系统 | 刷卡式上下班签到自动化解决方案
文章目录 一、简易刷卡式打卡考勤系统(一)功能简介原理图设计程序设计 哔哩哔哩: https://www.bilibili.com/video/BV1NZ421Y79W/?spm_id_from333.999.0.0&vd_sourcee5082ef80535e952b2a4301746491be0 一、简易刷卡式打卡考勤系统 &…...

module ‘numpy‘ has no attribute ‘int‘
在 NumPy 中,如果遇到了错误提示 "module numpy has no attribute int",这通常意味着正在尝试以错误的方式使用 NumPy 的整数类型。从 NumPy 1.20 版本开始,numpy.int 已经不再是一个有效的属性,因为 NumPy 不再推荐使用…...

MFC(一)搭建空项目
安装MFC支持库 创建空白桌面程序 项目相关设置 复制以下代码 // mfc.h #pragma once #include <afxwin.h>class MyApp : public CWinApp { public:virtual BOOL InitInstance(); };class MyFrame : public CFrameWnd { public:MyFrame();// 消息映射机制DECLARE_…...

OKCC的API资源管理平台怎么用?
API资源管理平台,重点是“资源”管理平台,不是API接口管理平台。 天天讯通推出的API资源管理平台,类似昆石的VOS系统,区别是VOS是SIP资源管理系统,我们的API资源管理平台是API资源管理系统(AXB、AX、回拨AP…...
CentOS 7 安装python 3.7 需要必要的依赖。
在 CentOS 7 上部署 Python 3.7 可以通过源代码编译安装来实现。以下是大致的步骤: 安装必要的依赖: bashCopy Code sudo yum install gcc openssl-devel bzip2-devel libffi-devel 下载 Python 3.7 源代码并进行编译安装: bashCopy Code wg…...

美术馆设计方案优化布局与设施提升观众体验!
如今,美术馆不仅仅是作为展示艺术作品的平台,也是吸引公众参与和创造独特体验的数字艺术体验空间,因此许多传统美术馆在进行翻修改造时,都会更加注重用户体验,并在其中使用大量的多媒体互动,让参观者能够在…...
数据库基础原理
宏观 数据库的实现原理分为四个部分: 网络通信 网络协议 硬盘存储 内存分配 微观 硬盘存储 数据库是持久化的,而持久化如何实现的,我们不难想到磁盘可以持久化存储,所以数据库所有持久化的数据都是以文件形式存在磁盘中的…...

Pandas操作MultiIndex合并行列的Excel,写入读取以及写入多余行及Index列处理,插入行,修改某个单元格的值,多字段排序
Pandas操作MultiIndex合并行列的excel,写入读取以及写入多余行及Index列处理,多字段排序尽量保持原来的顺序 1. 效果图及问题2. 源码参考 今天是谁写Pandas的 复合索引MultiIndex,写的糊糊涂涂,晕晕乎乎。 是我呀… 记录下&#…...
工作总结5
1.taro框架使用map标签出现的错误 这个问题困扰很长时间,在频繁切换页面渲染的时候出现左边不显示,我理解的是变量没有到达map标签的属性上,那我就想是不是setState太慢了,然后又用了变量,本地缓存等,都没有…...

速通汇编(二)汇编mov、addsub指令
一,mov指令 mov指令的全称是move,从字面上去理解,作用是移动(比较确切的说是复制)数据,mov指令可以有以下几种形式 无论哪种形式,都是把右边的值移动到左边 mov 寄存器,数据&#…...
软考 - 系统架构设计师 - 构件组装技术
概念 构件组装是将库中的构件经修改后相互连接,或者将它们和当前开发项目中的软件元素进行连接,最终构成新的目标构件。 构件组装技术是基于构件的软件开发的核心技术,也是构件技术研究的重点和难点。构件组装的目的是利用现有的构件组装成新…...

2010年之前电脑ubuntu安装nvidia驱动黑屏处理
装好驱动 仿真fps直接到60Hz 陈旧设备 都是非常老旧的电脑,没钱换新电脑,就这么穷…… 电脑详细配置: 冲动 想装显卡驱动提升一下性能,结果……黑了 黑习惯了也无所谓,几分钟就能解决,关键还是太穷&…...

类与对象中C++
加油!!! 文章目录 前言 一、类的6个默认成员函数 编辑 二、构造函数 1.概念 三、析构函数 1.概念 2.特性 四、拷贝构造函数 1.概念 2.特征 拷贝构造函数典型调用场景 五、赋值运算符重载 1.运算符重载 2.赋值运算符重载 赋值运算符重载格式…...
k8s 集群重启报错:The connection to the server 192.168.92.26:6443 was refused
[rootk8s-master ~]# kubectl get node The connection to the server 192.168.92.26:6443 was refused - did you specify the right host or port?查到是kubelet进程没有启动 [rootk8s-master ~]# systemctl status kubelet ● kubelet.service - kubelet: The Kubernetes …...

国内好用的chatGPT和AI绘图工具
分享一个比较好用的AI 分享一个比较好用的AI,只是需要开通会员,目前官网的价格是:298,开通之后可以使用chatgpt4、AI绘画、图片融合等等!不开通的话是可以免费使用15次的,下面是一些介绍图片!链…...

蚂蚁庄园3.31今日答案春季美食“雷竹笋”之所以得名是因为出笋与打雷有关吗?
蚂蚁庄园是一款爱心公益游戏,用户可以通过喂养小鸡,产生鸡蛋,并通过捐赠鸡蛋参与公益项目。用户每日完成答题就可以领取鸡饲料,使用鸡饲料喂鸡之后,会可以获得鸡蛋,可以通过鸡蛋来进行爱心捐赠。其中&#…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...

Razor编程中@Html的方法使用大全
文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...

Qt的学习(一)
1.什么是Qt Qt特指用来进行桌面应用开发(电脑上写的程序)涉及到的一套技术Qt无法开发网页前端,也不能开发移动应用。 客户端开发的重要任务:编写和用户交互的界面。一般来说和用户交互的界面,有两种典型风格&…...

pgsql:还原数据库后出现重复序列导致“more than one owned sequence found“报错问题的解决
问题: pgsql数据库通过备份数据库文件进行还原时,如果表中有自增序列,还原后可能会出现重复的序列,此时若向表中插入新行时会出现“more than one owned sequence found”的报错提示。 点击菜单“其它”-》“序列”,…...