当前位置: 首页 > news >正文

Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

目录

  • 一、序言
  • 二、死信交换机和消息TTL实现延迟消息
    • 1、死信队列介绍
    • 2、代码示例
      • (1) 死信交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
    • 3、测试用例
  • 三、延迟消息交换机实现延迟消息
    • 1、安装延时消息插件
    • 2、代码示例
      • (1) 延时消息交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
    • 3、测试用例
  • 四、两种实现方式优缺点
  • 1、延时消息插件
  • 2、TLL&死信交换机

一、序言

业务开发中有很多延时操作的场景,比如最常见的超时订单自动关闭延时异步处理,我们常用的实现方式有:

  • 定时任务轮询(有延时)。
  • 借助Redission的延时队列
  • Redis的key过期事件通知机制(需开启key过期事件通知,对Redis有性能损耗)。
  • RocketMQ中定时消息推送(支持的时间间隔固定,不支持自定义)。
  • RabbitMQ中的死信队列和延迟消息交换机

其中用的最多的也是借助Redisson实现的数据结构延迟队列RabbitMQ中的死信队列来实现,今天我们通过RabbitMQ死信队列和延迟消息交换机(新特性)来实现延时消息推送。


二、死信交换机和消息TTL实现延迟消息

1、死信队列介绍

这种方式主要通过结合消息过期和私信交换机来实现延迟消息推送,首先先了解下哪些消息会进入死信队列:

  • 被消费者nack(negatively acknowleged)的消息。
  • TTL过期后未被消费的消息。
  • 超过队列长度限制后被丢弃的消息。

备注:更多信息请参考RabbitMQ中的 Dead Letter Exchange。

2、代码示例

(1) 死信交换机配置

@Configuration
protected static class DeadLetterExchangeConfig {@Beanpublic Queue deadLetterQueue(){return QueueBuilder.durable("dead-letter-queue").build();}@Beanpublic DirectExchange deadLetterExchange() {return ExchangeBuilder.directExchange("dead-letter-exchange").build();}@Beanpublic Binding bindQueueToDeadLetterExchange(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");}@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").deadLetterExchange("dead-letter-exchange").deadLetterRoutingKey("dead-letter-routing-key").build();}}

(2) 消息生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendMsgToDeadLetterExchange(String body, int timeoutInMillSeconds) {log.info("开始发送消息到dead letter exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(String.valueOf(timeoutInMillSeconds)).build();Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("normal-queue", message);}}

(3) 消息消费者

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "dead-letter-queue")public void handleMsgFromDeadLetterQueue(String msg) {log.info("Message received from dead-letter-queue, message body: {}, current time:{}", msg, LocalDateTime.now());}
}

3、测试用例

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/dead-letter")public ResponseEntity<String> sendMsgToDeadLetterExchange(String body, int timeout) {rabbitMqProducer.sendMsgToDeadLetterExchange(body, timeout);return ResponseEntity.ok("消息发送到死信交换机成功");}}

浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

2023-11-26 11:50:33.041  INFO 19152 --- [nio-8080-exec-7] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到dead letter exchange 消息体:hello, 消息延迟:5000ms, 当前时间:2023-11-26T11:50:33.041
2023-11-26 11:50:38.054  INFO 19152 --- [ntContainer#4-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from dead-letter-queue, message body: hello, current time:2023-11-26T11:50:38.054

三、延迟消息交换机实现延迟消息

上面通过消息TTL和死信交换机实现延迟消息的解决方案是由一个叫James Carr的人提出来的,后来RabbitMQ提供了一个开箱即用的解决方案,通过延时消息插件来实现。

该插件以前被当做是试验性产品,但是现在已经可以投产使用了。(PS:2015年4月16号就已经有该插件文档)

Spring AMQP中,同样提供了对该延时消息插件的支持,并且在RabbitMQ 3.6.0版本就已经测试通过。

1、安装延时消息插件

该延时消息插件为社区插件,因此需要自己手动下载安装的RabbMQ版本对应的插件,下载地址:RabbitMQ延时消息插件releases。

我安装的RabbitMQ版本为3.9.9,3.9.0版本的插件对所有3.9.x版本的RabbitMQ都支持。

在这里插入图片描述
下载完后把.ez结尾的插件复制RabbitMQ的插件目录下,插件目录为/usr/lib/rabbitmq/plugins

在这里插入图片描述
通过命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装该插件,通过命令rabbitmq-plugins list查看插件列表,可以看到该延时消息插件已经成功安装。

在这里插入图片描述

2、代码示例

(1) 延时消息交换机配置

@Configuration
protected static class DelayedMsgExchangePluginConfig {@Beanpublic Queue delayedQueue() {return QueueBuilder.durable("delayed-queue").build();}@Beanpublic DirectExchange delayedExchange() {return ExchangeBuilder.directExchange("delayed-exchange").delayed().build();}@Beanpublic Binding bindDelayedQueueToDelayedChange(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed-routing-key");}
}

备注:延时交换机的类型可以为DirectExchage、TopicExcahgeFanoutExchange,这些都支持。

(2) 消息生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendDelayedMsg(String body, int timeoutInMillSeconds) {log.info("开始发送消息到delayed-exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(timeoutInMillSeconds);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("delayed-exchange", "delayed-routing-key", message);}
}

(3) 消息消费者

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "delayed-queue")public void handleMsgFromDelayedQueue(String msg) {log.info("Message received from delayed-queue, message body: {}, current time:{}", msg, LocalDateTime.now());}
}

3、测试用例

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/delayed")public ResponseEntity<String> sendMsgToHeadersExchange(String body, int timeout) {rabbitMqProducer.sendDelayedMsg(body, timeout);return ResponseEntity.ok("消息发送到延迟交换机成功");}}

浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

2023-11-26 13:02:07.816  INFO 26524 --- [nio-8080-exec-3] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到delayed-exchange 消息体:Hello, 消息延迟:5000ms, 当前时间:2023-11-26T13:02:07.816
2023-11-26 13:02:12.830  INFO 26524 --- [ntContainer#5-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from delayed-queue, message body: Hello, current time:2023-11-26T13:02:12.829

四、两种实现方式优缺点

1、延时消息插件

  • 优点:配置更加简单,少配置1个队列,且语义更明确,容易定位消息出入口。
  • 缺点:延时消息插件对RabbitMQ版本有要求,延时消息交换机

2、TLL&死信交换机

  • 优点:基本适用于所有RabbitMQ版本。
  • 缺点:配置相对来说复杂一些,还有就是我们最开始提到的,不只是TTL过期的消息才会进入死信队列,还有超出队列限制nack的消息也会进入死信队列,触发的条件没那么纯粹。

在这里插入图片描述

相关文章:

Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

目录 一、序言二、死信交换机和消息TTL实现延迟消息1、死信队列介绍2、代码示例(1) 死信交换机配置(2) 消息生产者(3) 消息消费者 3、测试用例 三、延迟消息交换机实现延迟消息1、安装延时消息插件2、代码示例(1) 延时消息交换机配置(2) 消息生产者(3) 消息消费者 3、测试用例 …...

免费SSL证书有效期只有90天?太短?

随着网络安全问题日益受到重视&#xff0c;SSL证书成为了网站安全的必需品。然而&#xff0c;在许多情况下&#xff0c;免费提供的SSL证书往往只有90天的有效期&#xff0c;这种期限对于很多用户来说显得过于短暂。 首先&#xff0c;我们要理解为什么 SSL 证书的有效期设定为90…...

Java小游戏 王者荣耀

GameFrame类 所需图片&#xff1a; package 王者荣耀;import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.KeyAdapter; import java.awt.event.KeyEvent; import java.io.File; import java.util.ArrayLis…...

Python:diskcache实现基于文件的数据缓存

diskcache是一个基于Sqlite文件的数据缓存 文档 https://grantjenks.com/docs/diskcache/https://github.com/grantjenks/python-diskcachehttps://pypi.org/project/diskcache/ 示例 from diskcache import Cache# 指定文件夹 cache Cache(./cache)# 存 cache.set(name, …...

微信小程序 - 一篇带你解读小程序强制更新(冷/热启动)

在小程序开发中&#xff0c;我们会不可避免的涉及到小程序新版本迭代的问题&#xff0c;因为小程序的更新机制是异步的&#xff0c;新版本发布后并不会立刻应用到所有的现有用户&#xff0c;部分用户用的可能还是原来的旧版本&#xff0c;但如果是急需修复的 bug 或其他急需上线…...

关于接口测试自动化的总结与思考!

序 近期看到阿里云性能测试 PTS 接口测试开启免费公测&#xff0c;本着以和大家交流如何实现高效的接口测试为出发点&#xff0c;本文包含了我在接口测试领域的一些方法和心得&#xff0c;希望大家一起讨论和分享&#xff0c;内容包括但不仅限于&#xff1a; 服务端接口测试介…...

如何用低代码的思路设计文字描边渐变组件

前言 文字特效设计一直是困扰 Web 前端 Css 世界多年的问题, 比如如何用纯 Css 实现文字描边, 渐变, 阴影等, 由于受限于浏览器兼容性的问题, 我们不得不使用其他替代方案来实现. 平时工作中我们使用 PS 等设计工具能很容易的实现文字渐变等特效, 但是随着可视化技术的成熟, 我…...

Linux 网络通信

(一)套接字Socket概念 Socket 中文意思是“插座”&#xff0c;在 Linux 环境下&#xff0c;用于表示进程 x 间网络通信的特殊文件 类型。本质为内核借助缓冲区形成的伪文件。 既然是文件&#xff0c;那么理所当然的&#xff0c;我们可以使用文件描述符引用套接字。Linux 系统…...

借力互联网,民营医院探索互联网医疗服务的发展方向

民营医院互联网医疗服务是指利用互联网技术和平台&#xff0c;为患者提供更加便捷、高效的医疗服务。在当前数字化时代&#xff0c;互联网医疗服务正逐渐成为医疗行业的新趋势&#xff0c;也为民营医院开拓了更广阔的发展空间。下面将围绕这一主题进行讨论&#xff1a; 首先&a…...

office tool plus工具破解word、visio等软件步骤

第一步&#xff1a;下载工具 破解需要用到office tool plus软件 office tool plus软件下载地址&#xff1a;Office Tool Plus 官方网站 - 一键部署 Office 选择其中一个下载到本地&#xff08;本人选择的是第一个的云图小镇下载方式&#xff09; 第二步&#xff1a;启动工具 …...

python之pyqt专栏5-信号与槽1

在上一篇文章&#xff0c;我们了解到如果想要用代码改变QLabel的文本内容&#xff0c;可以调用QLabel类的text()函数。 但是现在有个这样的需求&#xff0c;界面中有一个Button与一个Label&#xff0c;当点击Button时&#xff0c;将Label的内容改变为“Hello world&#xff01;…...

【JMeter】不同场景下的接口请求

场景1: 上传文件接口即Content-Type=multipart/form-data 步骤: 1. 接口url,method以及path正常填写 2.文件上传content-type是multipart/form-data,所以可以勾选【use multipart/form-data】,如果还有其他请求头信息可以添加一个请求头元件 3.请求参…...

十八数字文化受邀参加版博会“区块链+版权”创新应用试点研讨会

2023年11月23日至25日&#xff0c;以“版权新时代 赋能新发展”为主题的第九届中国国际版权博览会在成都市中国西部国际博览城和天府国际会议中心举办。版博会是我国版权领域唯一的综合性、国际性、国家级版权专业博览会&#xff0c;本届版博会由国家版权局主办&#xff0c;四川…...

Centos 7 离线安装(tar) NodeJS 16 和 Vue

目录 一、下载Nodejs二、安装Nodejs2.1、创建安装目录2.2、上传安装包(无网络) or 直接下载(有网络)2.3、解压缩2.4、配置环境变量2.5、创建软连接2.6、更换镜像源2.7、验证是否安装成功 三、安装Vue四、卸载Nodejs 一、下载Nodejs Nodejs&#xff1a;https://nodejs.org/en/ …...

卸载软件最最最彻底的工具——Uninstall Tool

卸载软件最最最彻底的工具——Uninstall Tool Uninstall Tool 是一款功能强大的专业卸载工具。针对一些普通卸载不彻底的问题&#xff0c;它可以做到最优&#xff0c;比如Matlab等软件的卸载难的问题也可以较好地解决。 它比 Windows 自带的“添加/删除程序”功能快 3 倍&…...

2022年MathorCup大数据竞赛B题北京移动用户体验影响因素研究求解全过程文档及程序

2022年MathorCup高校数学建模挑战赛—大数据竞赛 B题 北京移动用户体验影响因素研究 原题再现&#xff1a; 移动通信技术飞速发展&#xff0c;给人们带来了极大便利&#xff0c;人们也越来越离不开移动通信技术带来的各种便捷。随着网络不断的建设&#xff0c;网络覆盖越来越…...

C#,《小白学程序》第二十课:大数的加法(BigInteger Add)

大数的&#xff08;加减乘除&#xff09;四则运算、阶乘运算。 乘法计算包括小学生算法、Karatsuba和Toom-Cook3算法。 重复了部分 19 课的代码。 1 文本格式 using System; using System.Linq; using System.Text; using System.Collections.Generic; /// <summary>…...

通用功能——git 攻略

摘要 本文主要介绍git常用命令的使用方法&#xff0c;同时介绍一些常见问题的处理方法&#xff0c;持续更新中… git命令通用选项 大多数git命令都适用的选项列表如下&#xff1a; -v, --verbose show hash and subject, give twice for upstream branch -q, --quie…...

LemMinX-Maven:帮助在eclipse中更方便地编辑maven的pom文件

LemMinX-Maven&#xff1a;https://github.com/eclipse/lemminx-maven LemMinX-Maven可以帮助我们在eclipse中更方便地编辑maven工程的pom.xml文件&#xff0c;例如补全、提示等。不用单独安装&#xff0c;因为在安装maven eclipse插件的时候已经自动安装了&#xff1a; 例…...

CAD与 PDM系统如何协同工作的?

在产品研发中&#xff0c;CAD&#xff08;计算机辅助设计&#xff09;和PDM&#xff08;产品数据管理&#xff09;是两个核心的工具&#xff0c;它们在产品从设计到制造的整个生命周期中发挥着重要的作用。虽然这两个工具在功能上有所不同&#xff0c;但它们在使用上却有着密切…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话&#xff1a; “利润不是赚出来的&#xff0c;是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业&#xff0c;很多企业看着销售不错&#xff0c;账上却没钱、利润也不见了&#xff0c;一翻库存才发现&#xff1a; 一堆卖不动的旧货…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问&#xff08;基础概念问题&#xff09; 1. 请解释Spring框架的核心容器是什么&#xff1f;它在Spring中起到什么作用&#xff1f; Spring框架的核心容器是IoC容器&#…...