当前位置: 首页 > news >正文

第十二章 RabbitMQ之失败消息处理策略

目录

一、引言

二、RepublishMessageRecoverer 实现

2.1. 实现步骤

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

2.2.2. 常规交换机队列配置类 

2.2.3. 消费者代码

2.2.4. 消费者yml配置 

2.2.5. 生产者代码 

2.2.6. 生产者yml配置 

 2.2.7. 运行效果


一、引言

Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

二、RepublishMessageRecoverer 实现

在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤

1. 将失败处理策略改为RepublishMessageRecoverer

2. 定义接收失败消息的交换机、队列及其绑定关系

3. 定义RepublishMessageRecoverer

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@BeanQueue errorQueue() {return new Queue("error.queue");}@BeanDirectExchange errorExchange() {return new DirectExchange("error.direct");}@BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

2.2.2. 常规交换机队列配置类 

package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("simple.queue").build();}
}

2.2.3. 消费者代码

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}

2.2.4. 消费者yml配置 

# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2.2.5. 生产者代码 

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}

2.2.6. 生产者yml配置 

# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou

 2.2.7. 运行效果

最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:

相关文章:

第十二章 RabbitMQ之失败消息处理策略

目录 一、引言 二、RepublishMessageRecoverer 实现 2.1. 实现步骤 2.2. 实现代码 2.2.1. 异常交换机队列回收期配置类 2.2.2. 常规交换机队列配置类 2.2.3. 消费者代码 2.2.4. 消费者yml配置 2.2.5. 生产者代码 2.2.6. 生产者yml配置 2.2.7. 运行效果 一、引言 …...

23年408数据结构

第一题: 解析: 第一点,我们要知道顺序存储的特点:优点就是随用随取,就是你想要查询第几个元素可以直接查询出来,时间复杂度就是O(1),缺点就是不适合删除和插入,因为每次删除和插入一…...

vue3ElementPlu表格合并多行

// 单元格合并逻辑 const objectSpanMethod ({ row, rowIndex, columnIndex }) > { const previousMachineModelUniqueId rowIndex > 0 ? tableData.value[rowIndex - 1].machineModel : null; const currentMachineModelUniqueId row.machineModel; // 合并“机型”…...

MySQL数据库 - 索引(上)

目录 1 简介 1.1 索引是什么 1.2 为什么要使用索引 2 索引应该选择哪种数据结构 2.1 HASH 2.2 二叉搜索树 2.3 N叉树(B树) 2.4 B树 3 MySQL的页 3.1 为什么要使用页 3.2 页文件头和页文件尾 3.3 页主体 3.4 页目录 4 B树在MySQL索引中的应…...

redis与springBoot整合

前提 要实现,使用Redis存储登录状态 需要一个完整的前端后端的项目 前端项目搭建 解压脚手架 安装依赖 配置请求代理 选做: 禁用EsLint语法检查 Vue Admin Template关闭eslint校验,lintOnSave:false设置无效解决办法_lintonsave: false-CSDN博客 …...

YoloV9改进策略:BackBone改进|CAFormer在YoloV9中的创新应用,显著提升目标检测性能

摘要 在目标检测领域,模型性能的提升一直是研究者和开发者们关注的重点。近期,我们尝试将CAFormer模块引入YoloV9模型中,以替换其原有的主干网络,这一创新性的改进带来了显著的性能提升。 CAFormer,作为MetaFormer框架下的一个变体,结合了深度可分离卷积和普通自注意力…...

消防应急物资仓库管理系统

集驰电子消防装备仓库管理系统(DW-S302系统)是一套成熟系统,依托3D技术、大数据、RFID技术、数据库技术、对装备器材进行统一管理,以RFID射频识别技术为核心,构建以物资综合管理为基础,智能分析定位为主要特色功能的装备器材库综合…...

【论文阅读】Semi-Supervised Few-shot Learning via Multi-Factor Clustering

通过多因素聚类的半监督小样本学习 引用:Ling J, Liao L, Yang M, et al. Semi-supervised few-shot learning via multi-factor clustering[C]//Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition. 2022: 14564-14573. 论文地址…...

第十三章 RabbitMQ之消息幂等性

目录 一、引言 二、消息幂等解决方案 2.1. 方案一 2.2. 方案二 一、引言 幂等是一个数学概念,用函数表达式来描述是这样的:f(x) f(f(x)) 。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。有些业务…...

tpcms-master.zip

网盘:https://pan.notestore.cn/s.html?id34https://pan.notestore.cn/s.html?id34...

Spring国际化和Validation

SpringBoot国际化和Validation融合 场景 在应用交互时,可能需要根据客户端得语言来返回不同的语言数据。前端通过参数、请求头等往后端传入locale相关得参数,后端获取参数,根据不同得locale来获取不同得语言得文本信息返回给前端。 实现原…...

②EtherCAT转ModbusTCP, EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关

EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关https://item.taobao.com/item.htm?ftt&id822721028899 协议转换通信网关 EtherCAT 转 Modbus TCP (接上一章) GW系列型号 配置说明 上载 网线连接电脑到模块上的 WEB 网页设置网口&#…...

【华为HCIP实战课程八】OSPF网络类型及报文类型详解,网络工程师

一、点到点网络类型 1、两台路由器 2、支持广播、组播 P2P(PPP、HDLC、帧中继子接口) 我们需要三个维度考虑 A、是否自动通过组播发现邻居 B、时间(Hello和Dead) C、DR和BDR----多点接入网络需要用到(广播和NBMA) 点到点是组播自动发现邻居,Hello 10S,Dead 40S…...

信息安全工程师(28)机房安全分析与防护

前言 机房安全分析与防护是一个复杂而细致的过程,涉及到物理安全、环境控制、电力供应、数据安全、设备管理、人员管理以及紧急预案等多个方面。 一、机房安全分析 1. 物理安全威胁 非法入侵:未经授权的人员可能通过门窗、通风口等进入机房,…...

大数据处理从零开始————9.MapReduce编程实践之信息过滤之学生成绩统计demo

1.项目目标 1.1 需求概述 现在我们要统计某学校学生的成绩信息,筛选出成绩在60分及以上的学生。 1.2 业务分析 如果我们想实现该需求,可以通过编写一个MapReduce程序,来处理包含学生信息的文本文件,每行包含【学生的姓名&#x…...

自动化测试 | 窗口截图

driver.get_screenshot_as_file 是 Selenium WebDriver 的一个方法,它允许你将当前浏览器窗口(或标签页)的截图保存为文件。这个方法对于自动化测试中的截图验证非常有用,因为它可以帮助你捕获测试执行过程中的页面状态。 以下是…...

初中数学网上考试系统的设计与实现(论文+源码)_kaic

初中数学网上考试系统的设计与实现 学生: 指导教师: 摘 要:科技在人类的历史长流中愈洗愈精,不仅包括人们日常的生活起居,甚至还包括了考试的变化。之前的考试需要大量的时间和精力,组织者还需要挑选并考查…...

关系运算(3)

关系代数 昨天讲完附加关系代数运算,今天讲扩展关系代数运算。 扩展代数运算 正如其名,这种运算定义了前面基本和附加都没有的运算。 去重运算 可以将关系R中跟查询条件相关但是形成了重复的元组去除,只保留查询结果(简洁&…...

tp6的系统是如何上架的

TP6(ThinkPHP6)的系统上架过程,通常指的是将基于ThinkPHP6框架开发的应用程序部署到生产环境,并使其可以通过互联网访问。以下是一个大致的上架流程,包括准备工作、部署步骤以及后续维护等方面: 一、准备工…...

Vue:开发小技巧

目录 1. Table表格偏移 1. Table表格偏移 通过设置自小的宽度进行控制 :min-width <el-table-column label"操作" align"center" class-name"small-padding fixed-width" fixed"right" min-width"150px"><templa…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者

抖音增长新引擎&#xff1a;品融电商&#xff0c;一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中&#xff0c;品牌如何破浪前行&#xff1f;自建团队成本高、效果难控&#xff1b;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

ABAP设计模式之---“简单设计原则(Simple Design)”

“Simple Design”&#xff08;简单设计&#xff09;是软件开发中的一个重要理念&#xff0c;倡导以最简单的方式实现软件功能&#xff0c;以确保代码清晰易懂、易维护&#xff0c;并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计&#xff0c;遵循“让事情保…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版

7种色调职场工作汇报PPT&#xff0c;橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版&#xff1a;职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...

AI病理诊断七剑下天山,医疗未来触手可及

一、病理诊断困局&#xff1a;刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断"&#xff0c;医生需通过显微镜观察组织切片&#xff0c;在细胞迷宫中捕捉癌变信号。某省病理质控报告显示&#xff0c;基层医院误诊率达12%-15%&#xff0c;专家会诊…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...