第十二章 RabbitMQ之失败消息处理策略
目录
一、引言
二、RepublishMessageRecoverer 实现
2.1. 实现步骤
2.2. 实现代码
2.2.1. 异常交换机队列回收期配置类
2.2.2. 常规交换机队列配置类
2.2.3. 消费者代码
2.2.4. 消费者yml配置
2.2.5. 生产者代码
2.2.6. 生产者yml配置
2.2.7. 运行效果
一、引言
Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)
二、RepublishMessageRecoverer 实现
在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤
1. 将失败处理策略改为RepublishMessageRecoverer
2. 定义接收失败消息的交换机、队列及其绑定关系
3. 定义RepublishMessageRecoverer
2.2. 实现代码
2.2.1. 异常交换机队列回收期配置类
package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@BeanQueue errorQueue() {return new Queue("error.queue");}@BeanDirectExchange errorExchange() {return new DirectExchange("error.direct");}@BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
2.2.2. 常规交换机队列配置类
package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("simple.queue").build();}
}
2.2.3. 消费者代码
package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
// System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}
2.2.4. 消费者yml配置
# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
2.2.5. 生产者代码
package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}
2.2.6. 生产者yml配置
# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou
2.2.7. 运行效果
最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:


相关文章:
第十二章 RabbitMQ之失败消息处理策略
目录 一、引言 二、RepublishMessageRecoverer 实现 2.1. 实现步骤 2.2. 实现代码 2.2.1. 异常交换机队列回收期配置类 2.2.2. 常规交换机队列配置类 2.2.3. 消费者代码 2.2.4. 消费者yml配置 2.2.5. 生产者代码 2.2.6. 生产者yml配置 2.2.7. 运行效果 一、引言 …...
23年408数据结构
第一题: 解析: 第一点,我们要知道顺序存储的特点:优点就是随用随取,就是你想要查询第几个元素可以直接查询出来,时间复杂度就是O(1),缺点就是不适合删除和插入,因为每次删除和插入一…...
vue3ElementPlu表格合并多行
// 单元格合并逻辑 const objectSpanMethod ({ row, rowIndex, columnIndex }) > { const previousMachineModelUniqueId rowIndex > 0 ? tableData.value[rowIndex - 1].machineModel : null; const currentMachineModelUniqueId row.machineModel; // 合并“机型”…...
MySQL数据库 - 索引(上)
目录 1 简介 1.1 索引是什么 1.2 为什么要使用索引 2 索引应该选择哪种数据结构 2.1 HASH 2.2 二叉搜索树 2.3 N叉树(B树) 2.4 B树 3 MySQL的页 3.1 为什么要使用页 3.2 页文件头和页文件尾 3.3 页主体 3.4 页目录 4 B树在MySQL索引中的应…...
redis与springBoot整合
前提 要实现,使用Redis存储登录状态 需要一个完整的前端后端的项目 前端项目搭建 解压脚手架 安装依赖 配置请求代理 选做: 禁用EsLint语法检查 Vue Admin Template关闭eslint校验,lintOnSave:false设置无效解决办法_lintonsave: false-CSDN博客 …...
YoloV9改进策略:BackBone改进|CAFormer在YoloV9中的创新应用,显著提升目标检测性能
摘要 在目标检测领域,模型性能的提升一直是研究者和开发者们关注的重点。近期,我们尝试将CAFormer模块引入YoloV9模型中,以替换其原有的主干网络,这一创新性的改进带来了显著的性能提升。 CAFormer,作为MetaFormer框架下的一个变体,结合了深度可分离卷积和普通自注意力…...
消防应急物资仓库管理系统
集驰电子消防装备仓库管理系统(DW-S302系统)是一套成熟系统,依托3D技术、大数据、RFID技术、数据库技术、对装备器材进行统一管理,以RFID射频识别技术为核心,构建以物资综合管理为基础,智能分析定位为主要特色功能的装备器材库综合…...
【论文阅读】Semi-Supervised Few-shot Learning via Multi-Factor Clustering
通过多因素聚类的半监督小样本学习 引用:Ling J, Liao L, Yang M, et al. Semi-supervised few-shot learning via multi-factor clustering[C]//Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition. 2022: 14564-14573. 论文地址…...
第十三章 RabbitMQ之消息幂等性
目录 一、引言 二、消息幂等解决方案 2.1. 方案一 2.2. 方案二 一、引言 幂等是一个数学概念,用函数表达式来描述是这样的:f(x) f(f(x)) 。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。有些业务…...
tpcms-master.zip
网盘:https://pan.notestore.cn/s.html?id34https://pan.notestore.cn/s.html?id34...
Spring国际化和Validation
SpringBoot国际化和Validation融合 场景 在应用交互时,可能需要根据客户端得语言来返回不同的语言数据。前端通过参数、请求头等往后端传入locale相关得参数,后端获取参数,根据不同得locale来获取不同得语言得文本信息返回给前端。 实现原…...
②EtherCAT转ModbusTCP, EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关
EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关https://item.taobao.com/item.htm?ftt&id822721028899 协议转换通信网关 EtherCAT 转 Modbus TCP (接上一章) GW系列型号 配置说明 上载 网线连接电脑到模块上的 WEB 网页设置网口&#…...
【华为HCIP实战课程八】OSPF网络类型及报文类型详解,网络工程师
一、点到点网络类型 1、两台路由器 2、支持广播、组播 P2P(PPP、HDLC、帧中继子接口) 我们需要三个维度考虑 A、是否自动通过组播发现邻居 B、时间(Hello和Dead) C、DR和BDR----多点接入网络需要用到(广播和NBMA) 点到点是组播自动发现邻居,Hello 10S,Dead 40S…...
信息安全工程师(28)机房安全分析与防护
前言 机房安全分析与防护是一个复杂而细致的过程,涉及到物理安全、环境控制、电力供应、数据安全、设备管理、人员管理以及紧急预案等多个方面。 一、机房安全分析 1. 物理安全威胁 非法入侵:未经授权的人员可能通过门窗、通风口等进入机房,…...
大数据处理从零开始————9.MapReduce编程实践之信息过滤之学生成绩统计demo
1.项目目标 1.1 需求概述 现在我们要统计某学校学生的成绩信息,筛选出成绩在60分及以上的学生。 1.2 业务分析 如果我们想实现该需求,可以通过编写一个MapReduce程序,来处理包含学生信息的文本文件,每行包含【学生的姓名&#x…...
自动化测试 | 窗口截图
driver.get_screenshot_as_file 是 Selenium WebDriver 的一个方法,它允许你将当前浏览器窗口(或标签页)的截图保存为文件。这个方法对于自动化测试中的截图验证非常有用,因为它可以帮助你捕获测试执行过程中的页面状态。 以下是…...
初中数学网上考试系统的设计与实现(论文+源码)_kaic
初中数学网上考试系统的设计与实现 学生: 指导教师: 摘 要:科技在人类的历史长流中愈洗愈精,不仅包括人们日常的生活起居,甚至还包括了考试的变化。之前的考试需要大量的时间和精力,组织者还需要挑选并考查…...
关系运算(3)
关系代数 昨天讲完附加关系代数运算,今天讲扩展关系代数运算。 扩展代数运算 正如其名,这种运算定义了前面基本和附加都没有的运算。 去重运算 可以将关系R中跟查询条件相关但是形成了重复的元组去除,只保留查询结果(简洁&…...
tp6的系统是如何上架的
TP6(ThinkPHP6)的系统上架过程,通常指的是将基于ThinkPHP6框架开发的应用程序部署到生产环境,并使其可以通过互联网访问。以下是一个大致的上架流程,包括准备工作、部署步骤以及后续维护等方面: 一、准备工…...
Vue:开发小技巧
目录 1. Table表格偏移 1. Table表格偏移 通过设置自小的宽度进行控制 :min-width <el-table-column label"操作" align"center" class-name"small-padding fixed-width" fixed"right" min-width"150px"><templa…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分: 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...
高考志愿填报管理系统---开发介绍
高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发,采用现代化的Web技术,为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## 📋 系统概述 ### 🎯 系统定…...
数据结构第5章:树和二叉树完全指南(自整理详细图文笔记)
名人说:莫道桑榆晚,为霞尚满天。——刘禹锡(刘梦得,诗豪) 原创笔记:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 上一篇:《数据结构第4章 数组和广义表》…...
作为点的对象CenterNet论文阅读
摘要 检测器将图像中的物体表示为轴对齐的边界框。大多数成功的目标检测方法都会枚举几乎完整的潜在目标位置列表,并对每一个位置进行分类。这种做法既浪费又低效,并且需要额外的后处理。在本文中,我们采取了不同的方法。我们将物体建模为单…...
ai流式文字返回前端和php的处理办法
PHP后端 php端主要是用到ob_flush和flush,头改为流式。 基本代码 代码如下: <?php header(Content-Type:text/event-stream); header(Cache-Control:no-cache); header(Connection:keep-alive);function streamPostRequest($url,$data){$chcurl_…...
2. Web网络基础 - 协议端口
深入解析协议端口与netstat命令:网络工程师的实战指南 在网络通信中,协议端口是服务访问的门户。本文将全面解析端口概念,并通过netstat命令实战演示如何监控网络连接状态。 一、协议端口核心知识解析 1. 端口号的本质与分类 端口范围类型说…...
JSON解析崩溃原因及解决方案
问题记录: /************************************************| * 描述: 将ID124执行NFC操作-JSON解析为结构体* 函数名: cJSON_ID124_to_struct* 参数[ I]: *json_string 待解析的指针* 参数[II]: *wireless_rxd 结构体指针* 返回: 成功返回0 失…...
