RabbitMQ死信队列延迟交换机
RabbitMQ死信队列&延迟交换机
1.什么是死信
死信&死信队列 |
---|
![]() |
死信队列的应用:
- 基于死信队列在队列消息已满的情况下,消息也不会丢失
- 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间
2. 实现死信队列
2.1 准备Exchange&Queue
package com.llp.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 死信队列配置*/
@Configuration
public class DeadLetterConfig {public static final String NORMAL_EXCHANGE = "normal-exchange";public static final String NORMAL_QUEUE = "normal-queue";public static final String NORMAL_ROUTING_KEY = "normal.#";public static final String DEAD_EXCHANGE = "dead-exchange";public static final String DEAD_QUEUE = "dead-queue";public static final String DEAD_ROUTING_KEY = "dead.#";@Beanpublic Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();}@Beanpublic Queue normalQueue(){//普通队列,绑定死信队列return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();}@Beanpublic Binding normalBinding(Queue normalQueue,Exchange normalExchange){return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();}@Beanpublic Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();}@Beanpublic Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}@Beanpublic Binding deadBinding(Queue deadQueue,Exchange deadExchange){return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}
}
2.2 实现效果
-
基于消费者进行reject或者nack实现死信效果
package com.llp.rabbitmq.topic;import com.llp.rabbitmq.config.DeadLetterConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component public class DeadListener {@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)public void consume(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到normal队列的消息:" + msg);//设置消息决绝消费,不需要重新放入到队列中channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);//或者//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);} }
-
消息的生存时间
-
给消息设置生存时间
@Test public void publishExpire(){String msg = "dead letter expire";rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");return message;}}); }
-
给队列设置消息的生存时间
@Bean public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").ttl(10000).build(); }
-
-
设置Queue中的消息最大长度
@Bean public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").maxLength(1).build(); }
只要Queue中已经有一个消息,如果再次发送一个消息,这个消息会变为死信!
3.延迟交换机
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。
将下载的文件上传到linux服务器并使用如下指令,将文件方到rabbitmq容器的plugins目录下
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 9da57c5038ba:/opt/rabbitmq/plugins
在rabbitmq容器的/opt/rabbitmq/sbin目录下执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启容器生效
docker restart 9da57c5038ba
可以看到添加插件后多了一个延迟交换机的选项
-
构建延迟交换机
package com.llp.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;/*** 延迟队列*/ @Configuration public class DelayedConfig {public static final String DELAYED_EXCHANGE = "delayed-exchange";public static final String DELAYED_QUEUE = "delayed-queue";public static final String DELAYED_ROUTING_KEY = "delayed.#";@Beanpublic Exchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","topic");Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);return exchange;}@Beanpublic Queue delayedQueue(){return QueueBuilder.durable(DELAYED_QUEUE).build();}@Beanpublic Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();} }
-
发送消息
package com.llp.rabbitmq;import com.llp.rabbitmq.config.DelayedConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest public class DelayedPublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void publish(){rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息指定多少时间被消费,单位毫秒message.getMessageProperties().setDelay(30000);return message;}});} }
**延迟交换机存在的问题:**在延迟推送消息的过程中rabbitmq重启了、或者说服务器宕机了就会导致消息丢失
相关文章:

RabbitMQ死信队列延迟交换机
RabbitMQ死信队列&延迟交换机 1.什么是死信 死信&死信队列 死信队列的应用: 基于死信队列在队列消息已满的情况下,消息也不会丢失实现延迟消费的效果。比如:下订单时,有15分钟的付款时间 2. 实现死信队列 2.1 准备E…...

武忠祥老师每日一题||不定积分基础训练(六)
解法一: 求出 f ( x ) , 进而对 f ( x ) 进行积分。 求出f(x),进而对f(x)进行积分。 求出f(x),进而对f(x)进行积分。 令 ln x t , 原式 f ( t ) ln ( 1 e t ) e t 令\ln xt,原式f(t)\frac{\ln (1e^t)}{e^t} 令lnxt,原式f(t)etln(1et) 则 ∫ f ( x ) d…...
C语言结构体详解
结构体是C语言中的一种高级数据类型,它可以将不同的数据类型组合在一起,形成一个自定义的数据类型。结构体为程序员提供了一种组织数据的方式,它为程序开发带来了极大的灵活性和扩展性。 C语言中的结构体定义如下: struct 结构体…...
非盲去模糊简单介绍
文章目录 非盲去模糊简单介绍基于频域的方法1. Wiener滤波器2. 逆滤波器和半正定滤波器 基于空域的方法1. 均值滤波器2. 高斯滤波器3. 双边滤波器 基于偏微分的方法1. 非线性扩散滤波2. 全变分模型3. Laplacian正则化模型 振铃效应应用总结 非盲去模糊简单介绍 非盲去模糊是一…...

C语言动态内存管理与文件操作:打造高效通讯录
本篇博客会讲解如何使用C语言实现一个通讯录。实现通讯录的过程中,会大量用到C语言的知识点,包括但不限于:函数、自定义类型、指针、动态内存管理、文件操作,这些知识点在我的其他博客中都有讲解过,欢迎大家阅读&#…...

2001-2021年全国30省就业人数数据
2001-2021年全国30省就业人数数据/各省就业人数数据 1、时间:2001-2021年 2、范围:包括30个省市不含西藏 3、指标:就业人数 4、来源:各省NJ、社会统计NJ 5、缺失情况说明:无缺失 6、指标说明: 就业人…...

自然语言处理知识抽取(pkuseg、DDParser安装及使用)
一、分词简介 1.基本概念 分词是自然语言处理中的一个重要步骤,它可以帮助我们将文本分成一个个词语,以便更好地理解和分析文本。在计算机视觉、语音识别、机器翻译等领域,分词都扮演着重要的角色。 目前,常用的分词库包括 jie…...
Linux内核面试知识总结
Linux启动过程 1、主机加电自检,加载BIOS硬件信息 2、读取MBR引导文件 3、引导linux内核 4、启动第一个进程init(进程号永远为1) 5、进度相应的运行级别 6、运行终端,输入用户名和密码 linux系统缺省的运行级别 关机、单机…...

深度学习模型压缩与优化加速
1. 简介 深度学习(Deep Learning)因其计算复杂度或参数冗余,在一些场景和设备上限制了相应的模型部署,需要借助模型压缩、系统优化加速、异构计算等方法突破瓶颈,即分别在算法模型、计算图或算子优化以及硬件加速等层…...

Kali 更换源(超详细,附国内优质镜像源地址)
1.进入管理员下的控制台。 2. 输入密码后点击“授权”。 3.在控制台内输入下面的内容。 vim /etc/apt/sources.list 4.敲击回车后会进入下面的页面。 5.来到这个页面后的第一部是按键盘上的“i”键,左下角出现“插入”后说明操作正确。 6.使用“#”将原本的源给注释…...

Java版工程项目管理系统平台+java版企业工程系统源码+助力工程企业实现数字化管理
Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下: 首页 工作台:待办工作、消息通知、预警信息,点击可进入相应的列表 项目进度图表:选择(总体或单个)项目显示1…...

搜索引擎测试报告
文章目录 一、项目背景二、项目功能三、测试目的四、测试环境五、测试计划1、功能测试2、自动化测试 六、测试结果 一、项目背景 java官方文档是我们在学习java语言中不可或缺的权威资料。相比于各种网站的Java资料,官方文档无论是语言表达还是组织方式都要更加全面…...

4年的测试工程师,你遇到过自身瓶颈期吗?又是怎样度过的?
从毕业到现在已经快4年啦,一直软件测试行业混迹。我不是牛人,但是自我感觉还算是个合格的测试工程师,有必要写下自己将近4年来的经历,给自我以提示,给刚入行的朋友提供点参考。 貌似这一点适应的行业最广,…...

【Python零基础学习入门篇④】——第四节:Python的列表、元组、集合和字典
⬇️⬇️⬇️⬇️⬇️⬇️ ⭐⭐⭐Hello,大家好呀我是陈童学哦,一个普通大一在校生,请大家多多关照呀嘿嘿😁😊😘 🌟🌟🌟技术这条路固然很艰辛,但既已选择&…...

3.6 cache存储器
学习步骤: 我会采取以下几个步骤来学习Cache存储器: 确定学习目标:Cache存储器作为一种高速缓存存储器,通常用于提高计算机系统的运行效率。因此,我需要明确学习Cache存储器的目的,包括了解其原理、结构和…...

Ubuntu零基础安装
Ubuntu零基础安装 首先我们需要安装VM,再安装ubuntu。 1、安装VM 进入VM官网 VM官网地址 选择下载试用版 下载Windows版本 下载完成后,点击安装包进行安装 至此就安装完毕了。 桌面会出现VM的图标。 点击打开,弹出如下画面: …...
热门的常用 API 大全分享
天气/环境 空气质量查询: 查询国内3400个城市的整点观测,获取指定城市的整点观测空气质量。未来7天生活指数:支持国内3400个城市以及国际4万个城市的天气指数数据,包括晨练、洗车、穿衣(12项,有详细说明&a…...

利用粒子群算法设计无线传感器网络中的最优安全路由模型(Matlab代码实现)
目录 💥1 概述 📚2 运行结果 🎉3 参考文献 👨💻4 Matlab代码 💥1 概述 无线传感器网络(WSN)由数十个、数百个甚至数千个自主传感器组成。这些传感器以无线方式嵌入环境中&…...

2023年华东杯数学建模B 题 期货价格相关性问题-思路解析
题目背景: 许多金融标的都有其内在的关联,如何从量价数据找到这种关联是一个有趣的 问题。例如在万得的“煤焦钢矿”板块中,有螺纹钢、铁矿石、不锈钢、热轧卷板、 硅铁、焦煤、焦炭、锰硅、线材 9 个品种。这些品种有些是上下游关系&…...

SAP UI5 之Controls (控件) 笔记三
文章目录 官网 Walkthrough学习-Controls控件1.0.1 在index.html中使用class id 属性控制页面展示的属性1.0.2 我们在index.js文件中引入 text文本控制1.0.3打开浏览器查看结果 官网 Walkthrough学习-Controls控件 Controls控件 在前面展示在浏览器中的Hello World 是在Html …...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...

ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...

Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...

网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)
题目 做法 启动靶机,点进去 点进去 查看URL,有 ?fileflag.php说明存在文件包含,原理是php://filter 协议 当它与包含函数结合时,php://filter流会被当作php文件执行。 用php://filter加编码,能让PHP把文件内容…...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...