第十二章 RabbitMQ之失败消息处理策略
目录
一、引言
二、RepublishMessageRecoverer 实现
2.1. 实现步骤
2.2. 实现代码
2.2.1. 异常交换机队列回收期配置类
2.2.2. 常规交换机队列配置类
2.2.3. 消费者代码
2.2.4. 消费者yml配置
2.2.5. 生产者代码
2.2.6. 生产者yml配置
2.2.7. 运行效果
一、引言
Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)
二、RepublishMessageRecoverer 实现
在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤
1. 将失败处理策略改为RepublishMessageRecoverer
2. 定义接收失败消息的交换机、队列及其绑定关系
3. 定义RepublishMessageRecoverer
2.2. 实现代码
2.2.1. 异常交换机队列回收期配置类
package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@BeanQueue errorQueue() {return new Queue("error.queue");}@BeanDirectExchange errorExchange() {return new DirectExchange("error.direct");}@BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
2.2.2. 常规交换机队列配置类
package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("simple.queue").build();}
}
2.2.3. 消费者代码
package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
// System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}
2.2.4. 消费者yml配置
# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
2.2.5. 生产者代码
package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}
2.2.6. 生产者yml配置
# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou
2.2.7. 运行效果
最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:


相关文章:
第十二章 RabbitMQ之失败消息处理策略
目录 一、引言 二、RepublishMessageRecoverer 实现 2.1. 实现步骤 2.2. 实现代码 2.2.1. 异常交换机队列回收期配置类 2.2.2. 常规交换机队列配置类 2.2.3. 消费者代码 2.2.4. 消费者yml配置 2.2.5. 生产者代码 2.2.6. 生产者yml配置 2.2.7. 运行效果 一、引言 …...
23年408数据结构
第一题: 解析: 第一点,我们要知道顺序存储的特点:优点就是随用随取,就是你想要查询第几个元素可以直接查询出来,时间复杂度就是O(1),缺点就是不适合删除和插入,因为每次删除和插入一…...
vue3ElementPlu表格合并多行
// 单元格合并逻辑 const objectSpanMethod ({ row, rowIndex, columnIndex }) > { const previousMachineModelUniqueId rowIndex > 0 ? tableData.value[rowIndex - 1].machineModel : null; const currentMachineModelUniqueId row.machineModel; // 合并“机型”…...
MySQL数据库 - 索引(上)
目录 1 简介 1.1 索引是什么 1.2 为什么要使用索引 2 索引应该选择哪种数据结构 2.1 HASH 2.2 二叉搜索树 2.3 N叉树(B树) 2.4 B树 3 MySQL的页 3.1 为什么要使用页 3.2 页文件头和页文件尾 3.3 页主体 3.4 页目录 4 B树在MySQL索引中的应…...
redis与springBoot整合
前提 要实现,使用Redis存储登录状态 需要一个完整的前端后端的项目 前端项目搭建 解压脚手架 安装依赖 配置请求代理 选做: 禁用EsLint语法检查 Vue Admin Template关闭eslint校验,lintOnSave:false设置无效解决办法_lintonsave: false-CSDN博客 …...
YoloV9改进策略:BackBone改进|CAFormer在YoloV9中的创新应用,显著提升目标检测性能
摘要 在目标检测领域,模型性能的提升一直是研究者和开发者们关注的重点。近期,我们尝试将CAFormer模块引入YoloV9模型中,以替换其原有的主干网络,这一创新性的改进带来了显著的性能提升。 CAFormer,作为MetaFormer框架下的一个变体,结合了深度可分离卷积和普通自注意力…...
消防应急物资仓库管理系统
集驰电子消防装备仓库管理系统(DW-S302系统)是一套成熟系统,依托3D技术、大数据、RFID技术、数据库技术、对装备器材进行统一管理,以RFID射频识别技术为核心,构建以物资综合管理为基础,智能分析定位为主要特色功能的装备器材库综合…...
【论文阅读】Semi-Supervised Few-shot Learning via Multi-Factor Clustering
通过多因素聚类的半监督小样本学习 引用:Ling J, Liao L, Yang M, et al. Semi-supervised few-shot learning via multi-factor clustering[C]//Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition. 2022: 14564-14573. 论文地址…...
第十三章 RabbitMQ之消息幂等性
目录 一、引言 二、消息幂等解决方案 2.1. 方案一 2.2. 方案二 一、引言 幂等是一个数学概念,用函数表达式来描述是这样的:f(x) f(f(x)) 。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。有些业务…...
tpcms-master.zip
网盘:https://pan.notestore.cn/s.html?id34https://pan.notestore.cn/s.html?id34...
Spring国际化和Validation
SpringBoot国际化和Validation融合 场景 在应用交互时,可能需要根据客户端得语言来返回不同的语言数据。前端通过参数、请求头等往后端传入locale相关得参数,后端获取参数,根据不同得locale来获取不同得语言得文本信息返回给前端。 实现原…...
②EtherCAT转ModbusTCP, EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关
EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关https://item.taobao.com/item.htm?ftt&id822721028899 协议转换通信网关 EtherCAT 转 Modbus TCP (接上一章) GW系列型号 配置说明 上载 网线连接电脑到模块上的 WEB 网页设置网口&#…...
【华为HCIP实战课程八】OSPF网络类型及报文类型详解,网络工程师
一、点到点网络类型 1、两台路由器 2、支持广播、组播 P2P(PPP、HDLC、帧中继子接口) 我们需要三个维度考虑 A、是否自动通过组播发现邻居 B、时间(Hello和Dead) C、DR和BDR----多点接入网络需要用到(广播和NBMA) 点到点是组播自动发现邻居,Hello 10S,Dead 40S…...
信息安全工程师(28)机房安全分析与防护
前言 机房安全分析与防护是一个复杂而细致的过程,涉及到物理安全、环境控制、电力供应、数据安全、设备管理、人员管理以及紧急预案等多个方面。 一、机房安全分析 1. 物理安全威胁 非法入侵:未经授权的人员可能通过门窗、通风口等进入机房,…...
大数据处理从零开始————9.MapReduce编程实践之信息过滤之学生成绩统计demo
1.项目目标 1.1 需求概述 现在我们要统计某学校学生的成绩信息,筛选出成绩在60分及以上的学生。 1.2 业务分析 如果我们想实现该需求,可以通过编写一个MapReduce程序,来处理包含学生信息的文本文件,每行包含【学生的姓名&#x…...
自动化测试 | 窗口截图
driver.get_screenshot_as_file 是 Selenium WebDriver 的一个方法,它允许你将当前浏览器窗口(或标签页)的截图保存为文件。这个方法对于自动化测试中的截图验证非常有用,因为它可以帮助你捕获测试执行过程中的页面状态。 以下是…...
初中数学网上考试系统的设计与实现(论文+源码)_kaic
初中数学网上考试系统的设计与实现 学生: 指导教师: 摘 要:科技在人类的历史长流中愈洗愈精,不仅包括人们日常的生活起居,甚至还包括了考试的变化。之前的考试需要大量的时间和精力,组织者还需要挑选并考查…...
关系运算(3)
关系代数 昨天讲完附加关系代数运算,今天讲扩展关系代数运算。 扩展代数运算 正如其名,这种运算定义了前面基本和附加都没有的运算。 去重运算 可以将关系R中跟查询条件相关但是形成了重复的元组去除,只保留查询结果(简洁&…...
tp6的系统是如何上架的
TP6(ThinkPHP6)的系统上架过程,通常指的是将基于ThinkPHP6框架开发的应用程序部署到生产环境,并使其可以通过互联网访问。以下是一个大致的上架流程,包括准备工作、部署步骤以及后续维护等方面: 一、准备工…...
Vue:开发小技巧
目录 1. Table表格偏移 1. Table表格偏移 通过设置自小的宽度进行控制 :min-width <el-table-column label"操作" align"center" class-name"small-padding fixed-width" fixed"right" min-width"150px"><templa…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
AI语音助手的Python实现
引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...
永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...
微服务通信安全:深入解析mTLS的原理与实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言:微服务时代的通信安全挑战 随着云原生和微服务架构的普及,服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...
LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》
🧠 LangChain 中 TextSplitter 的使用详解:从基础到进阶(附代码) 一、前言 在处理大规模文本数据时,特别是在构建知识库或进行大模型训练与推理时,文本切分(Text Splitting) 是一个…...
Xcode 16 集成 cocoapods 报错
基于 Xcode 16 新建工程项目,集成 cocoapods 执行 pod init 报错 ### Error RuntimeError - PBXGroup attempted to initialize an object with unknown ISA PBXFileSystemSynchronizedRootGroup from attributes: {"isa">"PBXFileSystemSynchro…...
电脑桌面太单调,用Python写一个桌面小宠物应用。
下面是一个使用Python创建的简单桌面小宠物应用。这个小宠物会在桌面上游荡,可以响应鼠标点击,并且有简单的动画效果。 import tkinter as tk import random import time from PIL import Image, ImageTk import os import sysclass DesktopPet:def __i…...
