RabbitMQ实现延迟消息发送——实战篇
在项目中,我们经常需要使用消息队列来实现延迟任务,本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送,由于是实战篇,所以不会讲太多理论的知识,还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章跟着练手哦~
需求背景
我这儿的一个需求背景大概是干部添加完活动后,由管理员进行审批,审批通过后,该活动id连同设置的过期时间会被放入消息队列中,等到活动结束时间到的时候,自动将活动的状态设置为已完成,这里华丽一个活动图,各位参考一下。
需求了解完之后我们就可以开始的写代码啦~(手动微笑)
相关知识点拓展
这里还是简单提一下MQ实现延迟队列的一个方法,一种是用插件,还有一种是使用死信队列,当然本文我们使用的就是通过死信队列来实现的。
当我们的一个正常消息因为设置了过期时间或者被消费者拒绝消费的时候,这条消息就会被放入死信队列中,然后死信队列再进行消费。
然后啰嗦一下,说一下MQ的交换机类型,以及死信交换机一般选用哪种:
1. Direct Exchange(直连交换机)
- 特点:
- 根据消息的 Routing Key 精确匹配队列的 Binding Key。
- 完全匹配时,消息才会被路由到对应的队列。
- 适用场景:
- 点对点消息传递,消息需要精确路由到特定队列。
- 示例:
- 消息的 Routing Key 为
order.created
,队列的 Binding Key 也为order.created
,则消息会被路由到该队列。
2. Fanout Exchange(扇出交换机)
- 特点:
- 将消息广播到所有绑定到该交换机的队列,忽略 Routing Key。
- 适用场景:
- 广播消息,消息需要发送到多个队列。
- 示例:
- 消息发送到 Fanout Exchange,所有绑定到该交换机的队列都会收到消息。
3. Topic Exchange(主题交换机)
- 特点:
- 根据消息的 Routing Key 和队列的 Binding Key 进行模式匹配。
- Binding Key 支持通配符:
*
:匹配一个单词。#
:匹配零个或多个单词。- 适用场景:
- 消息需要根据模式路由到多个队列。
- 示例:
- 消息的 Routing Key 为
order.created.us
,队列的 Binding Key 为order.created.*
,则消息会被路由到该队列。
4. Headers Exchange(头交换机)
- 特点:
- 根据消息的 Headers(键值对)匹配队列的 Binding Arguments。
- 忽略 Routing Key。
- 适用场景:
- 消息需要根据复杂的条件路由到队列。
- 示例:
- 消息的 Headers 包含
type=order
和region=us
,队列的 Binding Arguments 要求x-match=all
且type=order
,则消息会被路由到该队列。
5. Default Exchange(默认交换机)
- 特点:
- RabbitMQ 默认创建的交换机,类型为 Direct Exchange。
- 每个队列都会自动绑定到默认交换机,Binding Key 为队列名称。
- 适用场景:
- 默认情况下,消息可以直接发送到队列。
死信交换机适合使用哪种类型?
死信交换机(DLX, Dead Letter Exchange)的类型选择取决于你的业务需求。以下是常见的选择:
1. Direct Exchange
- 适用场景:
- 死信消息需要精确路由到特定的死信队列。
- 示例:
- 将死信消息路由到
dlx-queue
,用于统一处理所有死信消息。2. Topic Exchange
- 适用场景:
- 死信消息需要根据不同的 Routing Key 路由到不同的死信队列。
- 示例:
- 将死信消息根据业务类型(如
order.dead
、payment.dead
)路由到不同的死信队列。3. Fanout Exchange
- 适用场景:
- 死信消息需要广播到多个死信队列。
- 示例:
- 将死信消息同时发送到日志队列和报警队列。
推荐选择
- 大多数情况下,死信交换机使用 Direct Exchange,因为死信消息通常需要精确路由到一个死信队列,用于统一处理。
- 如果死信消息需要根据不同的条件路由到多个队列,可以使用 Topic Exchange。
代码部分
首先,我们需要定义一个死信交换机和死信队列,用来接收来自普通队列的消息。
// 创建死信交换机,处理延迟消息通知@Bean("dead_letter_exchange")public DirectExchange delayExchange(){return new DirectExchange("dead_letter_exchange",true,false);}
// 创建死信队列public Queue deadLetterQueue(){Queue queue = new Queue("dead_letter_queue", true);rabbitAdmin.declareQueue(queue);log.info("死信队列声明成功:" + queue.getName());return queue; }
然后,我们需要配置一个普通的消息队列和一个普通的交换机,这个消息队列需要设置对应的死信交换机和死信路由,同时我们这个普通队列需要接收一个过期时间,保证一到过期时间消息就会被发送到死信队列当中。
// 创建一个普通队列,接受一个过期时间,出列活动结束后,发送到死信队列public Queue normalQueue(Long expireTime){Map<String,Object> args = new HashMap<>();if (expireTime != null && expireTime > 0) { // 确保 TTL 是正数args.put("x-message-ttl", expireTime);}// 设置死信交换机args.put("x-dead-letter-exchange",deadLetterExchange);// 设置死信路由键args.put("x-dead-letter-routing-key","dead_letter_routing_key");Queue queue = new Queue("normal_queue", true, false, false, args);log.info("普通队列声明成功:" + queue.getName());return queue; }
// 创建一个普通交换机,处理活动结束自动设置活动状态为结束@Bean("activity_end_exchange")public DirectExchange activityEndExchange(){return new DirectExchange("activity_end_exchange");}
然后我们需要分别将死信交换机和死信队列,普通交换机和普通队列分别进行绑定。
// 将死信队列和死信交换机进行绑定public void bindDeadLetterRouting(){Queue queue=queueDeclareConfig.deadLetterQueue();Binding binding = BindingBuilder.bind(queue).to(deadLetterExchange).with("dead_letter_routing_key");rabbitAdmin.declareBinding(binding);log.info("死信队列绑定成功,死信队列名称----》" + queue.getName() + ",死信交换机名称----》" + deadLetterExchange.getName());}// 绑定活动结束交换机和普通队列public void bindActivityEndRouting(Long expireTime) {Queue queue = queueDeclareConfig.normalQueue(expireTime);Binding binding = BindingBuilder.bind(queue).to(activityEndExchange).with("activity_end_routing_key");rabbitAdmin.declareBinding(binding);}
当然,我们还需要配置生产者来发送消息到交换机里面
//活动结束后,发送消息到死信队列,自动设置活动结束状态public void sendActivityEndMessage(Long expireTime, Integer activityId) {rabbitMQBindRoutingConfig.bindDeadLetterRouting();rabbitMQBindRoutingConfig.bindActivityEndRouting(expireTime);try {// 将消息发送到普通队列,等待消息过期发送到死信交换机rabbitTemplate.convertAndSend("activity_end_exchange", "activity_end_routing_key", activityId, msg -> {msg.getMessageProperties().setExpiration(expireTime.toString());return msg;});} catch (Exception e) {log.error("发送消息失败------->" + activityId);throw new RuntimeException("发送消息失败---->" + activityId);}}
这里生产者的代码可以根据你的业务逻辑具体进行更改~
消费者逻辑也需要进行编写一下
// 使用MQ延迟队列,活动结束,修改活动状态@RabbitListener(queues = "dead_letter_queue")public void updatePlaceOccupyStatus(Message message, Channel channel){try {String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);Integer activityId = Integer.parseInt(messageBody);ActivityInfo activityInfo = baseMapper.selectById(activityId);LambdaUpdateWrapper<ActivityInfo> wrapper = new LambdaUpdateWrapper<>();wrapper.eq(ActivityInfo::getActivityId,activityId).set(ActivityInfo::getProgress,StatusConstant.FINISH);if(baseMapper.update(activityInfo,wrapper)>0){channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception e) {log.error("处理消息时发生错误:" + e.getMessage());try {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} catch (IOException ioException) {ioException.printStackTrace();}}
消费者这边需要注意的是如果你选择的提交类型不是自动提交的话,在处理完消息之后需要手动ack一下消息,不然消费的消息不会被认为已经消费,从而导致消息积压,也会在之后的消费中重复进行消费,因此你需要告诉生产者这条消息已经被消费了。
当然,如果在消费的过程中出现了什么问题,可以设置以下这行代码:
basicNack方法接收三个参数:
deliveryTag: 消息的标识符。
multiple: 是否对多个消息进行否定确认。
requeue: 是否将消息重新放入队列。
可以根据你的需求进行设定~
然后的然后,我们需要再application.yml当中进行配置相关信息:
rabbitmq:host: localhostport: 5672username: guestpassword: guest
# 确认消息发送到交换机上publisher-confirm-type: correlated# 消息发送到队列确认,失败回调publisher-returns: truelistener:direct:acknowledge-mode: manualretry:enabled: true
# 重试的时间间隔为1sinitial-interval: 1000ms
# 最大重试3次max-attempts: 3
# 最大的重试时间间隔为2smax-interval: 2000ms
# 每次重试时间间隔为1s,每次重试时间间隔倍数multiplier: 1.0#重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)default-requeue-rejected: falsesimple:default-requeue-rejected: falseacknowledge-mode: manual
# 最小消费者数量concurrency: 1
# 最大消费者数量max-concurrency: 10retry:enabled: trueinitial-interval: 1000msmax-attempts: 3max-interval: 2000msmultiplier: 1.0
上面给出了一个比较全的配置,你可以根据你的需求进行选择,但是需要注意的是default-requeue-rejected: false这一行配置一定要先配置,不然你的消息在普通队列中过期了,是不会发送到死信队列当中进行消费的~
到这儿,基本上所有的代码都写的差不多了,当然我们还需要再rabbitmq控制平台上分别建一个普通交换机和一个死信交换机,一个普通队列和一个私信队列,然后分别绑定就可以了。
注意的是,普通交换机也需要在平台上配置一次死信队列和死信路由:
到这儿,如果没有什么问题的话基本上已经可以直接运行了,所以我的这篇文章到这儿基本上也已经结束了,如果你有什么问题,可以评论区留言,我们相互学习~
相关文章:

RabbitMQ实现延迟消息发送——实战篇
在项目中,我们经常需要使用消息队列来实现延迟任务,本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送,由于是实战篇,所以不会讲太多理论的知识,还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章…...
Oracle 拉链式merge sort join 原理
Oracle 拉链式Merge Sort Join 的原理,我用一个生活中的比喻来解释。 --- 比喻场景:匹配快递包裹和收件人 1. 快递包裹清单 想象我们有一个快递公司送货的包裹清单,清单按照收件人的邮编(ZIP Code)排序: …...
QModbusTCPClient占用内存持续增长
最近使用QModbusTCPClient通信,需要频繁发送读写请求,发现软件占用内存一直在增减,经过不断咨询和尝试,终于解决了。 1.方案一(失败) 最开始以为是访问太频繁,导致创建reply的对象比delete re…...
代码中使用 Iterable<T> 作为方法参数的解释
/*** 根据课程 id 集合查询课程简单信息* param ids id 集合* return 课程简单信息的列表*/ GetMapping("/courses/simpleInfo/list") List<CourseSimpleInfoDTO> getSimpleInfoList(RequestParam("ids") Iterable<Long> ids); 一、代码解释&…...
Oracle数据库传统审计怎么用
Oracle数据库传统审计怎么用 审计功能开启与关闭By Session还是By AccessWhenever Successful数据库语句审计数据库对象审计查看审计策略和记录Oracle数据库审计功能分为传统审计(Traditional Auditing)和统一审计(Unified Auditing)。统一审计是从Oracle 12c版本开始引入的…...
leetcode-买卖股票问题
309. 买卖股票的最佳时机含冷冻期 - 力扣(LeetCode) 动态规划解题思路: 1、暴力递归(难点如何定义递归函数) 2、记忆化搜索-傻缓存法(根据暴力递归可变参数确定缓存数组维度) 3、严格表结构依…...

MYSQL学习笔记(三):分组、排序、分页查询
前言: 学习和使用数据库可以说是程序员必须具备能力,这里将更新关于MYSQL的使用讲解,大概应该会更新30篇,涵盖入门、进阶、高级(一些原理分析);这一篇是讲解分组、排序、分页查询,并且结合案例进行讲解;虽…...

上位机工作感想-2024年工作总结和来年计划
随着工作年限的增增长,发现自己越来越不喜欢在博客里面写一些掺杂自己感想的东西了,或许是逐渐被工作逼得“成熟”了吧。2024年,学到了很多东西,做了很多项目,也帮别人解决了很多问题,唯独没有涨工资。来这…...
【视觉惯性SLAM:十六、 ORB-SLAM3 中的多地图系统】
16.1 多地图的基本概念 多地图系统是机器人和计算机视觉领域中的一种关键技术,尤其在 SLAM 系统中具有重要意义。单一地图通常用于表示机器人或相机在环境中的位置和构建的空间结构,但单一地图在以下情况下可能无法满足需求: 大规模场景建图…...

【C++笔记】红黑树封装map和set深度剖析
【C笔记】红黑树封装map和set深度剖析 🔥个人主页:大白的编程日记 🔥专栏:C笔记 文章目录 【C笔记】红黑树封装map和set深度剖析前言一. 源码及框架分析1.1 源码框架分析 二. 模拟实现map和set2.1封装map和set 三.迭代器3.1思路…...

4.若依 BaseController
若依的BaseController是其他所有Controller的基类,一起来看下BaseController定义了什么 1. 定义请求返回内容的格式 code/msg/data 返回数据格式不是必须是AjaxResult,开发者可以自定义返回格式,注意与前端取值方式一致即可。 2. 获取调用…...
vue项目配置多语言
本文详细介绍如何在 Vue 项目中集成 vue-i18n 和 Element-UI ,实现多语言切换;首先通过 npm 安装 vue-i18n 和相关语言包,接着在配置文件中设置中文和英文的语言信息;最后在 main.js 中导入并挂载多语言实例,实现切换地…...
数据可视化大屏设计与实现
本文将带你一步步了解如何使用 ECharts 实现一个数据可视化大屏,并且如何动态加载天气数据展示。通过整合 HTML、CSS、JavaScript 以及后端接口请求,我们可以构建一个响应式的数据可视化页面。 1. 页面结构介绍 在此例中,整个页面分为几个主…...

PDF文件提取开源工具调研总结
概述 PDF是一种日常工作中广泛使用的跨平台文档格式,常常包含丰富的内容:包括文本、图表、表格、公式、图像。在现代信息处理工作流中发挥了重要的作用,尤其是RAG项目中,通过将非结构化数据转化为结构化和可访问的信息࿰…...

多监控m3u8视频流,怎么获取每个监控的封面图(纯前端)
文章目录 1.背景2.问题分析3.解决方案3.1解决思路3.2解决过程3.2.1 封装播放组件3.2.2 隐形的视频div3.2.3 截取封面图 3.3 结束 1.背景 有这样一个需求: 给你一个监控列表,每页展示多个监控(至少12个,m3u8格式)&…...

【机器学习实战入门项目】使用深度学习创建您自己的表情符号
深度学习项目入门——让你更接近数据科学的梦想 表情符号或头像是表示非语言暗示的方式。这些暗示已成为在线聊天、产品评论、品牌情感等的重要组成部分。这也促使数据科学领域越来越多的研究致力于表情驱动的故事讲述。 随着计算机视觉和深度学习的进步,现在可以…...
技术洞察:C++在后端开发中的前沿趋势与社会影响
文章目录 引言C在后端开发中的前沿趋势1. 高性能计算的需求2. 微服务架构的兴起3. 跨平台开发的便利性 跨领域技术融合与创新实践1. C与人工智能的结合2. C与区块链技术的融合 C对社会与人文的影响1. 提升生产力与创新能力2. 促进技术教育与人才培养3. 技术与人文的深度融合 结…...

【人工智能 | 大数据】基于人工智能的大数据分析方法
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈智能大数据分析 ⌋ ⌋ ⌋ 智能大数据分析是指利用先进的技术和算法对大规模数据进行深入分析和挖掘,以提取有价值的信息和洞察。它结合了大数据技术、人工智能(AI)、机器学习(ML&a…...

数字经济时代下的创新探索与实践:以“开源AI智能名片2+1链动模式S2B2C商城小程序源码”为核心
摘要:在数字经济蓬勃发展的今天,中国作为全球数字经济的领航者,正以前所未有的速度推进“数字中国”建设。本文旨在探讨“开源AI智能名片21链动模式S2B2C商城小程序源码”在数字经济背景下的应用潜力与实践价值,从多个维度分析其对…...
【English-Book】Go in Action目录页翻译中文
第8页 内容 前言 xi 序言 xiii 致谢 xiv 关于本书 xvi 关于封面插图 xix 1 介绍 Go 1 1.1 用 Go 解决现代编程挑战 2 开发速度 3 • 并发 3 • Go 的类型系统 5 内存管理 7 1.2 你好,Go 7 介绍 Go 玩具 8 1.3 总结 8 2 Go 快速入门 9 2.1 程序架构 10 2.2 主包 …...
Caliper 配置文件解析:config.yaml
Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

C++_哈希表
本篇文章是对C学习的哈希表部分的学习分享 相信一定会对你有所帮助~ 那咱们废话不多说,直接开始吧! 一、基础概念 1. 哈希核心思想: 哈希函数的作用:通过此函数建立一个Key与存储位置之间的映射关系。理想目标:实现…...
OCR MLLM Evaluation
为什么需要评测体系?——背景与矛盾 能干的事: 看清楚发票、身份证上的字(准确率>90%),速度飞快(眨眼间完成)。干不了的事: 碰到复杂表格(合并单元…...

Windows电脑能装鸿蒙吗_Windows电脑体验鸿蒙电脑操作系统教程
鸿蒙电脑版操作系统来了,很多小伙伴想体验鸿蒙电脑版操作系统,可惜,鸿蒙系统并不支持你正在使用的传统的电脑来安装。不过可以通过可以使用华为官方提供的虚拟机,来体验大家心心念念的鸿蒙系统啦!注意:虚拟…...

聚六亚甲基单胍盐酸盐市场深度解析:现状、挑战与机遇
根据 QYResearch 发布的市场报告显示,全球市场规模预计在 2031 年达到 9848 万美元,2025 - 2031 年期间年复合增长率(CAGR)为 3.7%。在竞争格局上,市场集中度较高,2024 年全球前十强厂商占据约 74.0% 的市场…...
2025.6.9总结(利与弊)
凡事都有两面性。在大厂上班也不例外。今天找开发定位问题,从一个接口人不断溯源到另一个 接口人。有时候,不知道是谁的责任填。将工作内容分的很细,每个人负责其中的一小块。我清楚的意识到,自己就是个可以随时替换的螺丝钉&…...

react菜单,动态绑定点击事件,菜单分离出去单独的js文件,Ant框架
1、菜单文件treeTop.js // 顶部菜单 import { AppstoreOutlined, SettingOutlined } from ant-design/icons; // 定义菜单项数据 const treeTop [{label: Docker管理,key: 1,icon: <AppstoreOutlined />,url:"/docker/index"},{label: 权限管理,key: 2,icon:…...
stm32进入Infinite_Loop原因(因为有系统中断函数未自定义实现)
这是系统中断服务程序的默认处理汇编函数,如果我们没有定义实现某个中断函数,那么当stm32产生了该中断时,就会默认跑这里来了,所以我们打开了什么中断,一定要记得实现对应的系统中断函数,否则会进来一直循环…...

【技巧】dify前端源代码修改第一弹-增加tab页
回到目录 【技巧】dify前端源代码修改第一弹-增加tab页 尝试修改dify的前端源代码,在知识库增加一个tab页"HELLO WORLD",完成后的效果如下 [gif01] 1. 前端代码进入调试模式 参考 【部署】win10的wsl环境下启动dify的web前端服务 启动调试…...