RabbitMQ实现延迟消息发送——实战篇
在项目中,我们经常需要使用消息队列来实现延迟任务,本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送,由于是实战篇,所以不会讲太多理论的知识,还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章跟着练手哦~
需求背景
我这儿的一个需求背景大概是干部添加完活动后,由管理员进行审批,审批通过后,该活动id连同设置的过期时间会被放入消息队列中,等到活动结束时间到的时候,自动将活动的状态设置为已完成,这里华丽一个活动图,各位参考一下。
需求了解完之后我们就可以开始的写代码啦~(手动微笑)
相关知识点拓展
这里还是简单提一下MQ实现延迟队列的一个方法,一种是用插件,还有一种是使用死信队列,当然本文我们使用的就是通过死信队列来实现的。
当我们的一个正常消息因为设置了过期时间或者被消费者拒绝消费的时候,这条消息就会被放入死信队列中,然后死信队列再进行消费。
然后啰嗦一下,说一下MQ的交换机类型,以及死信交换机一般选用哪种:
1. Direct Exchange(直连交换机)
- 特点:
- 根据消息的 Routing Key 精确匹配队列的 Binding Key。
- 完全匹配时,消息才会被路由到对应的队列。
- 适用场景:
- 点对点消息传递,消息需要精确路由到特定队列。
- 示例:
- 消息的 Routing Key 为
order.created
,队列的 Binding Key 也为order.created
,则消息会被路由到该队列。
2. Fanout Exchange(扇出交换机)
- 特点:
- 将消息广播到所有绑定到该交换机的队列,忽略 Routing Key。
- 适用场景:
- 广播消息,消息需要发送到多个队列。
- 示例:
- 消息发送到 Fanout Exchange,所有绑定到该交换机的队列都会收到消息。
3. Topic Exchange(主题交换机)
- 特点:
- 根据消息的 Routing Key 和队列的 Binding Key 进行模式匹配。
- Binding Key 支持通配符:
*
:匹配一个单词。#
:匹配零个或多个单词。- 适用场景:
- 消息需要根据模式路由到多个队列。
- 示例:
- 消息的 Routing Key 为
order.created.us
,队列的 Binding Key 为order.created.*
,则消息会被路由到该队列。
4. Headers Exchange(头交换机)
- 特点:
- 根据消息的 Headers(键值对)匹配队列的 Binding Arguments。
- 忽略 Routing Key。
- 适用场景:
- 消息需要根据复杂的条件路由到队列。
- 示例:
- 消息的 Headers 包含
type=order
和region=us
,队列的 Binding Arguments 要求x-match=all
且type=order
,则消息会被路由到该队列。
5. Default Exchange(默认交换机)
- 特点:
- RabbitMQ 默认创建的交换机,类型为 Direct Exchange。
- 每个队列都会自动绑定到默认交换机,Binding Key 为队列名称。
- 适用场景:
- 默认情况下,消息可以直接发送到队列。
死信交换机适合使用哪种类型?
死信交换机(DLX, Dead Letter Exchange)的类型选择取决于你的业务需求。以下是常见的选择:
1. Direct Exchange
- 适用场景:
- 死信消息需要精确路由到特定的死信队列。
- 示例:
- 将死信消息路由到
dlx-queue
,用于统一处理所有死信消息。2. Topic Exchange
- 适用场景:
- 死信消息需要根据不同的 Routing Key 路由到不同的死信队列。
- 示例:
- 将死信消息根据业务类型(如
order.dead
、payment.dead
)路由到不同的死信队列。3. Fanout Exchange
- 适用场景:
- 死信消息需要广播到多个死信队列。
- 示例:
- 将死信消息同时发送到日志队列和报警队列。
推荐选择
- 大多数情况下,死信交换机使用 Direct Exchange,因为死信消息通常需要精确路由到一个死信队列,用于统一处理。
- 如果死信消息需要根据不同的条件路由到多个队列,可以使用 Topic Exchange。
代码部分
首先,我们需要定义一个死信交换机和死信队列,用来接收来自普通队列的消息。
// 创建死信交换机,处理延迟消息通知@Bean("dead_letter_exchange")public DirectExchange delayExchange(){return new DirectExchange("dead_letter_exchange",true,false);}
// 创建死信队列public Queue deadLetterQueue(){Queue queue = new Queue("dead_letter_queue", true);rabbitAdmin.declareQueue(queue);log.info("死信队列声明成功:" + queue.getName());return queue; }
然后,我们需要配置一个普通的消息队列和一个普通的交换机,这个消息队列需要设置对应的死信交换机和死信路由,同时我们这个普通队列需要接收一个过期时间,保证一到过期时间消息就会被发送到死信队列当中。
// 创建一个普通队列,接受一个过期时间,出列活动结束后,发送到死信队列public Queue normalQueue(Long expireTime){Map<String,Object> args = new HashMap<>();if (expireTime != null && expireTime > 0) { // 确保 TTL 是正数args.put("x-message-ttl", expireTime);}// 设置死信交换机args.put("x-dead-letter-exchange",deadLetterExchange);// 设置死信路由键args.put("x-dead-letter-routing-key","dead_letter_routing_key");Queue queue = new Queue("normal_queue", true, false, false, args);log.info("普通队列声明成功:" + queue.getName());return queue; }
// 创建一个普通交换机,处理活动结束自动设置活动状态为结束@Bean("activity_end_exchange")public DirectExchange activityEndExchange(){return new DirectExchange("activity_end_exchange");}
然后我们需要分别将死信交换机和死信队列,普通交换机和普通队列分别进行绑定。
// 将死信队列和死信交换机进行绑定public void bindDeadLetterRouting(){Queue queue=queueDeclareConfig.deadLetterQueue();Binding binding = BindingBuilder.bind(queue).to(deadLetterExchange).with("dead_letter_routing_key");rabbitAdmin.declareBinding(binding);log.info("死信队列绑定成功,死信队列名称----》" + queue.getName() + ",死信交换机名称----》" + deadLetterExchange.getName());}// 绑定活动结束交换机和普通队列public void bindActivityEndRouting(Long expireTime) {Queue queue = queueDeclareConfig.normalQueue(expireTime);Binding binding = BindingBuilder.bind(queue).to(activityEndExchange).with("activity_end_routing_key");rabbitAdmin.declareBinding(binding);}
当然,我们还需要配置生产者来发送消息到交换机里面
//活动结束后,发送消息到死信队列,自动设置活动结束状态public void sendActivityEndMessage(Long expireTime, Integer activityId) {rabbitMQBindRoutingConfig.bindDeadLetterRouting();rabbitMQBindRoutingConfig.bindActivityEndRouting(expireTime);try {// 将消息发送到普通队列,等待消息过期发送到死信交换机rabbitTemplate.convertAndSend("activity_end_exchange", "activity_end_routing_key", activityId, msg -> {msg.getMessageProperties().setExpiration(expireTime.toString());return msg;});} catch (Exception e) {log.error("发送消息失败------->" + activityId);throw new RuntimeException("发送消息失败---->" + activityId);}}
这里生产者的代码可以根据你的业务逻辑具体进行更改~
消费者逻辑也需要进行编写一下
// 使用MQ延迟队列,活动结束,修改活动状态@RabbitListener(queues = "dead_letter_queue")public void updatePlaceOccupyStatus(Message message, Channel channel){try {String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);Integer activityId = Integer.parseInt(messageBody);ActivityInfo activityInfo = baseMapper.selectById(activityId);LambdaUpdateWrapper<ActivityInfo> wrapper = new LambdaUpdateWrapper<>();wrapper.eq(ActivityInfo::getActivityId,activityId).set(ActivityInfo::getProgress,StatusConstant.FINISH);if(baseMapper.update(activityInfo,wrapper)>0){channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception e) {log.error("处理消息时发生错误:" + e.getMessage());try {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} catch (IOException ioException) {ioException.printStackTrace();}}
消费者这边需要注意的是如果你选择的提交类型不是自动提交的话,在处理完消息之后需要手动ack一下消息,不然消费的消息不会被认为已经消费,从而导致消息积压,也会在之后的消费中重复进行消费,因此你需要告诉生产者这条消息已经被消费了。
当然,如果在消费的过程中出现了什么问题,可以设置以下这行代码:
basicNack方法接收三个参数:
deliveryTag: 消息的标识符。
multiple: 是否对多个消息进行否定确认。
requeue: 是否将消息重新放入队列。
可以根据你的需求进行设定~
然后的然后,我们需要再application.yml当中进行配置相关信息:
rabbitmq:host: localhostport: 5672username: guestpassword: guest
# 确认消息发送到交换机上publisher-confirm-type: correlated# 消息发送到队列确认,失败回调publisher-returns: truelistener:direct:acknowledge-mode: manualretry:enabled: true
# 重试的时间间隔为1sinitial-interval: 1000ms
# 最大重试3次max-attempts: 3
# 最大的重试时间间隔为2smax-interval: 2000ms
# 每次重试时间间隔为1s,每次重试时间间隔倍数multiplier: 1.0#重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)default-requeue-rejected: falsesimple:default-requeue-rejected: falseacknowledge-mode: manual
# 最小消费者数量concurrency: 1
# 最大消费者数量max-concurrency: 10retry:enabled: trueinitial-interval: 1000msmax-attempts: 3max-interval: 2000msmultiplier: 1.0
上面给出了一个比较全的配置,你可以根据你的需求进行选择,但是需要注意的是default-requeue-rejected: false这一行配置一定要先配置,不然你的消息在普通队列中过期了,是不会发送到死信队列当中进行消费的~
到这儿,基本上所有的代码都写的差不多了,当然我们还需要再rabbitmq控制平台上分别建一个普通交换机和一个死信交换机,一个普通队列和一个私信队列,然后分别绑定就可以了。
注意的是,普通交换机也需要在平台上配置一次死信队列和死信路由:
到这儿,如果没有什么问题的话基本上已经可以直接运行了,所以我的这篇文章到这儿基本上也已经结束了,如果你有什么问题,可以评论区留言,我们相互学习~
相关文章:

RabbitMQ实现延迟消息发送——实战篇
在项目中,我们经常需要使用消息队列来实现延迟任务,本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送,由于是实战篇,所以不会讲太多理论的知识,还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章…...
Oracle 拉链式merge sort join 原理
Oracle 拉链式Merge Sort Join 的原理,我用一个生活中的比喻来解释。 --- 比喻场景:匹配快递包裹和收件人 1. 快递包裹清单 想象我们有一个快递公司送货的包裹清单,清单按照收件人的邮编(ZIP Code)排序: …...
QModbusTCPClient占用内存持续增长
最近使用QModbusTCPClient通信,需要频繁发送读写请求,发现软件占用内存一直在增减,经过不断咨询和尝试,终于解决了。 1.方案一(失败) 最开始以为是访问太频繁,导致创建reply的对象比delete re…...
代码中使用 Iterable<T> 作为方法参数的解释
/*** 根据课程 id 集合查询课程简单信息* param ids id 集合* return 课程简单信息的列表*/ GetMapping("/courses/simpleInfo/list") List<CourseSimpleInfoDTO> getSimpleInfoList(RequestParam("ids") Iterable<Long> ids); 一、代码解释&…...
Oracle数据库传统审计怎么用
Oracle数据库传统审计怎么用 审计功能开启与关闭By Session还是By AccessWhenever Successful数据库语句审计数据库对象审计查看审计策略和记录Oracle数据库审计功能分为传统审计(Traditional Auditing)和统一审计(Unified Auditing)。统一审计是从Oracle 12c版本开始引入的…...
leetcode-买卖股票问题
309. 买卖股票的最佳时机含冷冻期 - 力扣(LeetCode) 动态规划解题思路: 1、暴力递归(难点如何定义递归函数) 2、记忆化搜索-傻缓存法(根据暴力递归可变参数确定缓存数组维度) 3、严格表结构依…...

MYSQL学习笔记(三):分组、排序、分页查询
前言: 学习和使用数据库可以说是程序员必须具备能力,这里将更新关于MYSQL的使用讲解,大概应该会更新30篇,涵盖入门、进阶、高级(一些原理分析);这一篇是讲解分组、排序、分页查询,并且结合案例进行讲解;虽…...

上位机工作感想-2024年工作总结和来年计划
随着工作年限的增增长,发现自己越来越不喜欢在博客里面写一些掺杂自己感想的东西了,或许是逐渐被工作逼得“成熟”了吧。2024年,学到了很多东西,做了很多项目,也帮别人解决了很多问题,唯独没有涨工资。来这…...
【视觉惯性SLAM:十六、 ORB-SLAM3 中的多地图系统】
16.1 多地图的基本概念 多地图系统是机器人和计算机视觉领域中的一种关键技术,尤其在 SLAM 系统中具有重要意义。单一地图通常用于表示机器人或相机在环境中的位置和构建的空间结构,但单一地图在以下情况下可能无法满足需求: 大规模场景建图…...

【C++笔记】红黑树封装map和set深度剖析
【C笔记】红黑树封装map和set深度剖析 🔥个人主页:大白的编程日记 🔥专栏:C笔记 文章目录 【C笔记】红黑树封装map和set深度剖析前言一. 源码及框架分析1.1 源码框架分析 二. 模拟实现map和set2.1封装map和set 三.迭代器3.1思路…...

4.若依 BaseController
若依的BaseController是其他所有Controller的基类,一起来看下BaseController定义了什么 1. 定义请求返回内容的格式 code/msg/data 返回数据格式不是必须是AjaxResult,开发者可以自定义返回格式,注意与前端取值方式一致即可。 2. 获取调用…...
vue项目配置多语言
本文详细介绍如何在 Vue 项目中集成 vue-i18n 和 Element-UI ,实现多语言切换;首先通过 npm 安装 vue-i18n 和相关语言包,接着在配置文件中设置中文和英文的语言信息;最后在 main.js 中导入并挂载多语言实例,实现切换地…...
数据可视化大屏设计与实现
本文将带你一步步了解如何使用 ECharts 实现一个数据可视化大屏,并且如何动态加载天气数据展示。通过整合 HTML、CSS、JavaScript 以及后端接口请求,我们可以构建一个响应式的数据可视化页面。 1. 页面结构介绍 在此例中,整个页面分为几个主…...

PDF文件提取开源工具调研总结
概述 PDF是一种日常工作中广泛使用的跨平台文档格式,常常包含丰富的内容:包括文本、图表、表格、公式、图像。在现代信息处理工作流中发挥了重要的作用,尤其是RAG项目中,通过将非结构化数据转化为结构化和可访问的信息࿰…...

多监控m3u8视频流,怎么获取每个监控的封面图(纯前端)
文章目录 1.背景2.问题分析3.解决方案3.1解决思路3.2解决过程3.2.1 封装播放组件3.2.2 隐形的视频div3.2.3 截取封面图 3.3 结束 1.背景 有这样一个需求: 给你一个监控列表,每页展示多个监控(至少12个,m3u8格式)&…...

【机器学习实战入门项目】使用深度学习创建您自己的表情符号
深度学习项目入门——让你更接近数据科学的梦想 表情符号或头像是表示非语言暗示的方式。这些暗示已成为在线聊天、产品评论、品牌情感等的重要组成部分。这也促使数据科学领域越来越多的研究致力于表情驱动的故事讲述。 随着计算机视觉和深度学习的进步,现在可以…...
技术洞察:C++在后端开发中的前沿趋势与社会影响
文章目录 引言C在后端开发中的前沿趋势1. 高性能计算的需求2. 微服务架构的兴起3. 跨平台开发的便利性 跨领域技术融合与创新实践1. C与人工智能的结合2. C与区块链技术的融合 C对社会与人文的影响1. 提升生产力与创新能力2. 促进技术教育与人才培养3. 技术与人文的深度融合 结…...

【人工智能 | 大数据】基于人工智能的大数据分析方法
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈智能大数据分析 ⌋ ⌋ ⌋ 智能大数据分析是指利用先进的技术和算法对大规模数据进行深入分析和挖掘,以提取有价值的信息和洞察。它结合了大数据技术、人工智能(AI)、机器学习(ML&a…...

数字经济时代下的创新探索与实践:以“开源AI智能名片2+1链动模式S2B2C商城小程序源码”为核心
摘要:在数字经济蓬勃发展的今天,中国作为全球数字经济的领航者,正以前所未有的速度推进“数字中国”建设。本文旨在探讨“开源AI智能名片21链动模式S2B2C商城小程序源码”在数字经济背景下的应用潜力与实践价值,从多个维度分析其对…...
【English-Book】Go in Action目录页翻译中文
第8页 内容 前言 xi 序言 xiii 致谢 xiv 关于本书 xvi 关于封面插图 xix 1 介绍 Go 1 1.1 用 Go 解决现代编程挑战 2 开发速度 3 • 并发 3 • Go 的类型系统 5 内存管理 7 1.2 你好,Go 7 介绍 Go 玩具 8 1.3 总结 8 2 Go 快速入门 9 2.1 程序架构 10 2.2 主包 …...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...

selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
scikit-learn机器学习
# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...