当前位置: 首页 > news >正文

第十二章 RabbitMQ之失败消息处理策略

目录

一、引言

二、RepublishMessageRecoverer 实现

2.1. 实现步骤

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

2.2.2. 常规交换机队列配置类 

2.2.3. 消费者代码

2.2.4. 消费者yml配置 

2.2.5. 生产者代码 

2.2.6. 生产者yml配置 

 2.2.7. 运行效果


一、引言

Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

二、RepublishMessageRecoverer 实现

在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤

1. 将失败处理策略改为RepublishMessageRecoverer

2. 定义接收失败消息的交换机、队列及其绑定关系

3. 定义RepublishMessageRecoverer

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@BeanQueue errorQueue() {return new Queue("error.queue");}@BeanDirectExchange errorExchange() {return new DirectExchange("error.direct");}@BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

2.2.2. 常规交换机队列配置类 

package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("simple.queue").build();}
}

2.2.3. 消费者代码

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}

2.2.4. 消费者yml配置 

# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2.2.5. 生产者代码 

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}

2.2.6. 生产者yml配置 

# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou

 2.2.7. 运行效果

最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:

相关文章:

第十二章 RabbitMQ之失败消息处理策略

目录 一、引言 二、RepublishMessageRecoverer 实现 2.1. 实现步骤 2.2. 实现代码 2.2.1. 异常交换机队列回收期配置类 2.2.2. 常规交换机队列配置类 2.2.3. 消费者代码 2.2.4. 消费者yml配置 2.2.5. 生产者代码 2.2.6. 生产者yml配置 2.2.7. 运行效果 一、引言 …...

23年408数据结构

第一题: 解析: 第一点,我们要知道顺序存储的特点:优点就是随用随取,就是你想要查询第几个元素可以直接查询出来,时间复杂度就是O(1),缺点就是不适合删除和插入,因为每次删除和插入一…...

vue3ElementPlu表格合并多行

// 单元格合并逻辑 const objectSpanMethod ({ row, rowIndex, columnIndex }) > { const previousMachineModelUniqueId rowIndex > 0 ? tableData.value[rowIndex - 1].machineModel : null; const currentMachineModelUniqueId row.machineModel; // 合并“机型”…...

MySQL数据库 - 索引(上)

目录 1 简介 1.1 索引是什么 1.2 为什么要使用索引 2 索引应该选择哪种数据结构 2.1 HASH 2.2 二叉搜索树 2.3 N叉树(B树) 2.4 B树 3 MySQL的页 3.1 为什么要使用页 3.2 页文件头和页文件尾 3.3 页主体 3.4 页目录 4 B树在MySQL索引中的应…...

redis与springBoot整合

前提 要实现,使用Redis存储登录状态 需要一个完整的前端后端的项目 前端项目搭建 解压脚手架 安装依赖 配置请求代理 选做: 禁用EsLint语法检查 Vue Admin Template关闭eslint校验,lintOnSave:false设置无效解决办法_lintonsave: false-CSDN博客 …...

YoloV9改进策略:BackBone改进|CAFormer在YoloV9中的创新应用,显著提升目标检测性能

摘要 在目标检测领域,模型性能的提升一直是研究者和开发者们关注的重点。近期,我们尝试将CAFormer模块引入YoloV9模型中,以替换其原有的主干网络,这一创新性的改进带来了显著的性能提升。 CAFormer,作为MetaFormer框架下的一个变体,结合了深度可分离卷积和普通自注意力…...

消防应急物资仓库管理系统

集驰电子消防装备仓库管理系统(DW-S302系统)是一套成熟系统,依托3D技术、大数据、RFID技术、数据库技术、对装备器材进行统一管理,以RFID射频识别技术为核心,构建以物资综合管理为基础,智能分析定位为主要特色功能的装备器材库综合…...

【论文阅读】Semi-Supervised Few-shot Learning via Multi-Factor Clustering

通过多因素聚类的半监督小样本学习 引用:Ling J, Liao L, Yang M, et al. Semi-supervised few-shot learning via multi-factor clustering[C]//Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition. 2022: 14564-14573. 论文地址…...

第十三章 RabbitMQ之消息幂等性

目录 一、引言 二、消息幂等解决方案 2.1. 方案一 2.2. 方案二 一、引言 幂等是一个数学概念,用函数表达式来描述是这样的:f(x) f(f(x)) 。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。有些业务…...

tpcms-master.zip

网盘:https://pan.notestore.cn/s.html?id34https://pan.notestore.cn/s.html?id34...

Spring国际化和Validation

SpringBoot国际化和Validation融合 场景 在应用交互时,可能需要根据客户端得语言来返回不同的语言数据。前端通过参数、请求头等往后端传入locale相关得参数,后端获取参数,根据不同得locale来获取不同得语言得文本信息返回给前端。 实现原…...

②EtherCAT转ModbusTCP, EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关

EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关https://item.taobao.com/item.htm?ftt&id822721028899 协议转换通信网关 EtherCAT 转 Modbus TCP (接上一章) GW系列型号 配置说明 上载 网线连接电脑到模块上的 WEB 网页设置网口&#…...

【华为HCIP实战课程八】OSPF网络类型及报文类型详解,网络工程师

一、点到点网络类型 1、两台路由器 2、支持广播、组播 P2P(PPP、HDLC、帧中继子接口) 我们需要三个维度考虑 A、是否自动通过组播发现邻居 B、时间(Hello和Dead) C、DR和BDR----多点接入网络需要用到(广播和NBMA) 点到点是组播自动发现邻居,Hello 10S,Dead 40S…...

信息安全工程师(28)机房安全分析与防护

前言 机房安全分析与防护是一个复杂而细致的过程,涉及到物理安全、环境控制、电力供应、数据安全、设备管理、人员管理以及紧急预案等多个方面。 一、机房安全分析 1. 物理安全威胁 非法入侵:未经授权的人员可能通过门窗、通风口等进入机房,…...

大数据处理从零开始————9.MapReduce编程实践之信息过滤之学生成绩统计demo

1.项目目标 1.1 需求概述 现在我们要统计某学校学生的成绩信息,筛选出成绩在60分及以上的学生。 1.2 业务分析 如果我们想实现该需求,可以通过编写一个MapReduce程序,来处理包含学生信息的文本文件,每行包含【学生的姓名&#x…...

自动化测试 | 窗口截图

driver.get_screenshot_as_file 是 Selenium WebDriver 的一个方法,它允许你将当前浏览器窗口(或标签页)的截图保存为文件。这个方法对于自动化测试中的截图验证非常有用,因为它可以帮助你捕获测试执行过程中的页面状态。 以下是…...

初中数学网上考试系统的设计与实现(论文+源码)_kaic

初中数学网上考试系统的设计与实现 学生: 指导教师: 摘 要:科技在人类的历史长流中愈洗愈精,不仅包括人们日常的生活起居,甚至还包括了考试的变化。之前的考试需要大量的时间和精力,组织者还需要挑选并考查…...

关系运算(3)

关系代数 昨天讲完附加关系代数运算,今天讲扩展关系代数运算。 扩展代数运算 正如其名,这种运算定义了前面基本和附加都没有的运算。 去重运算 可以将关系R中跟查询条件相关但是形成了重复的元组去除,只保留查询结果(简洁&…...

tp6的系统是如何上架的

TP6(ThinkPHP6)的系统上架过程,通常指的是将基于ThinkPHP6框架开发的应用程序部署到生产环境,并使其可以通过互联网访问。以下是一个大致的上架流程,包括准备工作、部署步骤以及后续维护等方面: 一、准备工…...

Vue:开发小技巧

目录 1. Table表格偏移 1. Table表格偏移 通过设置自小的宽度进行控制 :min-width <el-table-column label"操作" align"center" class-name"small-padding fixed-width" fixed"right" min-width"150px"><templa…...

力扣之1369.获取最近第二次的活动

题目&#xff1a; sql建表语句 Create table If Not Exists UserActivity (username varchar(30), activity varchar(30), startDate date, endDate date); Truncate table UserActivity; insert into UserActivity (username, activity, startDate, endDate) values (Alic…...

Python 和 Jupyter Kernel 版本不一致

使用jupyter notebook时明明已经安装了包&#xff0c;但是导入时提示&#xff1a; ModuleNotFoundError: No module named ptitprince 1、检查安装环境 !pip show ptitprince Name: ptitprince Version: 0.2.7 Summary: A Python implementation of Rainclouds, originally…...

Android常用布局

目录 布局文件中常见的属性 1. 基本布局属性 1&#xff09;android:layout_width 2&#xff09;android:layout_height 3&#xff09;android:layout_margin 4&#xff09;android:padding 2. 线性布局 (LinearLayout) 属性 1&#xff09;android:orientation 2&#xff09;and…...

初级网络工程师之从入门到入狱(五)

本文是我在学习过程中记录学习的点点滴滴&#xff0c;目的是为了学完之后巩固一下顺便也和大家分享一下&#xff0c;日后忘记了也可以方便快速的复习。 网络工程师从入门到入狱 前言一、链路聚合1.1、手动进行链路聚合1.1.1、 拓扑图&#xff1a;1.1.2、 LSW11.1.3、 LSW2 1.2、…...

JavaScript轮播图实现

这个代码创建了一个简单的轮播图&#xff0c;可以通过点击左右箭头或自动播放来切换图片。 <!DOCTYPE html> <html><head><meta charset"utf-8" /><title>js轮播图练习</title><style>.box {width: 60vw;height: 500px;m…...

【LLM开源项目】LLMs-开发框架-Langchain-Tutorials-Basics-v2.0

【1】使用LCEL构建简单的LLM应用程序(Build a Simple LLM Application with LCEL) https://python.langchain.com/docs/tutorials/llm_chain/ 如何使用LangChain构建简单的LLM应用程序。功能&#xff1a;将把文本从英语翻译成另一种语言。 实现&#xff1a;LLM调用加上一些提…...

Python 爬取天气预报并进行可视化分析

今天&#xff0c;我们就来学习如何使用 Python 爬取天气预报数据&#xff0c;并用数据可视化的方式将未来几天的天气信息一目了然地展示出来。 在本文中&#xff0c;我们将分三步完成这一任务&#xff1a; 使用 Python 爬取天气数据数据解析与处理用可视化展示天气趋势 让我…...

最左侧冗余覆盖子串

题目描述 给定两个字符串 s1 和 s2 和正整数 k&#xff0c;其中 s1 长度为 n1&#xff0c;s2 长度为 n2。 在 s2 中选一个子串&#xff0c;若满足下面条件&#xff0c;则称 s2 以长度 k 冗余覆盖 s1 该子串长度为 n1 k 该子串中包含 s1 中全部字母 该子串每个字母出现次数…...

性能测试-JMeter(2)

JMeter JMeter断言响应断言JSON断言断言持续时间 JMeter关联正则表达式提取器正则表达式正则表达式提取器 XPath提取器JSON提取器 JMeter属性JMeter录制脚本 JMeter断言 断言&#xff1a;让程序自动判断预期结果和实际结果是否一致 提示&#xff1a; -Jmeter在请求的返回层面有…...

芯课堂 | Synwit_UI_Creator(μgui)平台之图像处理篇

今天小编给大家介绍的是UI_Creator&#xff08;μgui&#xff09;平台下关于图像处理的选项。 UI_Creator&#xff08;μgui&#xff09;平台图片类控件有图像控件和分级图像控件&#xff0c;均包含以下选项&#xff1a; 1、消除水波纹&#xff1a; 由于16位真彩色&#xff08…...