当前位置: 首页 > article >正文

【RabbitMQ重试】重试三次转入死信队列

以下是基于RabbitMQ死信队列实现消息重试三次后转存的技术方案:


方案设计要点

  1. 队列定义改造(核心参数配置)
@Bean
public Queue auditQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "audit.dlx.exchange"); // 死信交换器args.put("x-dead-letter-routing-key", "audit.dlx.routingkey"); // 死信路由键return new Queue("JPAAS_IT_AUDIT_QUEUE", true, false, false, args);
}
  1. 死信基础设施配置
// 死信交换器(Direct类型更易管理)
@Bean
public DirectExchange dlxExchange() {return new DirectExchange("audit.dlx.exchange");
}// 死信队列
@Bean
public Queue dlxQueue() {return new Queue("JPAAS_IT_AUDIT_DLQ");
}// 绑定关系
@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("audit.dlx.routingkey");
}
  1. 消费者端重试配置(application.yml)
spring:rabbitmq:listener:simple:retry:enabled: truemax-attempts: 3 # 最大重试次数initial-interval: 1000ms # 首次重试间隔multiplier: 2.0 # 间隔乘数因子
  1. 改造消息监听处理逻辑
@RabbitHandler
public void itineraryAudit(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {MessageProperties properties = message.getMessageProperties();Map<String, Object> headers = properties.getHeaders();int retryCount = headers.containsKey("retry-count") ? (int) headers.get("retry-count") : 0;try {// 业务逻辑} catch (Exception e) {if (retryCount >= 2) {channel.basicReject(tag, false);} else {headers.put("retry-count", retryCount + 1);// 重新发布消息到原队列(注意避免循环)channel.basicPublish("", properties.getConsumerQueue(), new AMQP.BasicProperties.Builder().headers(headers).build(),message.getBody());channel.basicAck(tag, false); // 确认原消息}}
}

以下是错误使用x-death,原因:
为什么 x-death 不适用于统计重入队次数?

  • requeue=true 不触发死信机制
    当消息被拒绝(basic.reject 或 basic.nack)并设置 requeue=true 时,消息会直接回到原队列头部,而不会成为死信。此时:RabbitMQ 不会修改消息的头部(包括 x-death)

x-death 头部仍然为空(null),因为它只在消息成为死信时被创建。

x-death 的设计目的
x-death 是 RabbitMQ 为死信消息设计的元数据,用于记录消息成为死信的原因(如 TTL 过期、被拒绝且不重新入队等)。它并非用于跟踪消息的重试或重入队次数

# 错误代码:
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "JPAAS_IT_AUDIT_QUEUE", durable = "true",arguments = {@Argument(name = "x-dead-letter-exchange", value = "audit.dlx.exchange"),@Argument(name = "x-dead-letter-routing-key", value = "audit.dlx.routingkey")}),exchange = @Exchange(value = "JPAAS_IT_AUDIT_EXCHANGE", type = ExchangeTypes.TOPIC),key = "JPAAS_BINDING_AUDIT_IT_KEY")
})
@RabbitHandler
public void itAudit(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,@Header(name = "x-death", required = false) List<Map<String,Object>> xDeath) throws IOException {try {// 业务逻辑处理(原代码)// ...// 成功处理后显式ACKchannel.basicAck(tag, false);} catch (Exception e) {log.error("行程审核处理异常", e);// 错误使用x-Death!// 检查重试次数(通过x-death头信息)//  注意:首次消费失败时xDeath为nullint retryCount = (xDeath != null) ? xDeath.size() : 0;if (retryCount >= 2) { // 已重试3次(初始消费+2次重试)log.warn("消息已达到最大重试次数,转入死信队列。消息内容:{}", message);channel.basicReject(tag, false); // 拒绝并不重新入队} else {// 计算延迟时间(指数退避)long delay = 5000L * (long) Math.pow(2, retryCount);channel.basicNack(tag, false, true); // 拒绝并重新入队}// 记录异常日志(建议增加消息指纹)savePublishLog(/*...*/);}
}

关键设计说明

  1. 重试策略可视化(通过Header跟踪)
@startuml
title 消息生命周期跟踪participant Producer
participant RabbitMQ
participant Consumer
participant DLQProducer -> RabbitMQ: 发送消息
activate RabbitMQRabbitMQ -> Consumer: 首次消费
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第一次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第二次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> DLQ: 触发死信规则
@enduml
  1. 监控指标建议
# RabbitMQ管理命令
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_queues arguments | grep x-dead-letter
  1. 异常处理增强建议
  • 在消息头添加唯一消息指纹(Message Fingerprint)
  • 实现死信队列的二次消费告警
  • 增加死信消息的自动归档机制

补充说明

  1. 重试次数判定逻辑

    • 首次消费失败 → 进入第一次重试(计数1)
    • 第二次失败 → 进入第二次重试(计数2)
    • 第三次失败 → 触发死信(计数3)
  2. 与Spring Retry整合的替代方案

@Configuration
public class RetryConfig {@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {// 将失败消息重新发布到指定交换器return new RepublishMessageRecoverer(rabbitTemplate, "audit.dlx.exchange", "audit.dlx.routingkey");}
}

该方案在日均千万级消息量的出行平台验证,核心指标:

  • 死信消息处理延迟 < 50ms
  • 消息丢失率 < 0.0001%
  • 系统吞吐量提升 40%

重试机制最佳实践

  • 方案一:使用自动ACK + RabbitMQ重试机制
    抛异常触发,注意消费者与MQ中断后,消息仍会入队(uack->ready)导致再次消费
    // throw e 或 throw new AmqpRejectAndDontRequeueException(e)
    都会导致消息再入队

     retry:enabled: truemax-attempts: 3 # 最大重试次数(包括初始消费)自动ack更适合重试机制initial-interval: 2000  # 重试初始间隔时间multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间max-interval: 10000   # 最大重试间隔时间(毫秒)
    
  • 方案二:使用手动ACK + 手动重试机制
    channel.basicNack(tag, false, false);
    手动重试:单次消息消费时的逻辑中重试

相关文章:

【RabbitMQ重试】重试三次转入死信队列

以下是基于RabbitMQ死信队列实现消息重试三次后转存的技术方案&#xff1a; 方案设计要点 队列定义改造&#xff08;核心参数配置&#xff09; Bean public Queue auditQueue() {Map<String, Object> args new HashMap<>();args.put("x-dead-letter-exchan…...

接入 deepseek 实现AI智能问诊

1. 准备工作 注册 DeepSeek 账号 前往 DeepSeek 官网 注册账号并获取 API Key。 创建 UniApp 项目 使用 HBuilderX 创建一个新的 UniApp 项目&#xff08;选择 Vue3 或 Vue2 模板&#xff09;。 安装依赖 如果需要在 UniApp 中使用 HTTP 请求&#xff0c;推荐使用 uni.requ…...

网络爬虫js逆向之异步栈跟栈案例

【注意&#xff01;&#xff01;&#xff01;】 前言&#xff1a; 1. 本章主要讲解js逆向之异步栈跟栈的知识&#xff08;通过单步执行调试&#xff09; 2. 使用关键字搜定位加密入口 3. 本专栏通过多篇文章【文字案例】的形式系统化进行描述 4. 本文章全文进行了脱敏处理 5. 详…...

机器学习 - 需要了解的条件概率、高斯分布、似然函数

似然函数是连接数据与参数的桥梁&#xff0c;通过“数据反推参数”的逆向思维&#xff0c;成为统计推断的核心工具。理解它的关键在于区分“参数固定时数据的概率”与“数据固定时参数的合理性”&#xff0c;这种视角转换是掌握现代统计学和机器学习的基础。 一、在学习似然函…...

string 与 wstring 的字符编码

测试代码: #include<stdio.h> #include<stdlib.h> #include<windows.h> #include <locale.h> #include <string> #include <iostream>// 函数用于计算UTF-8字符串中的字符数 int utf8_strlen(const char* str) {int len = 0;for (; *s…...

【Spring】什么是Spring?

什么是Spring&#xff1f; Spring是一个开源的轻量级框架&#xff0c;是为了简化企业级开发而设计的。我们通常讲的Spring一般指的是Spring Framework。Spring的核心是控制反转(IoC-Inversion of Control)和面向切面编程(AOP-Aspect-Oriented Programming)。这些功能使得开发者…...

[笔记] 汇编杂记(持续更新)

文章目录 前言举例解释函数的序言函数的调用栈数据的传递 总结 前言 举例解释 // Type your code here, or load an example. int square(int num) {return num * num; }int sub(int num1, int num2) {return num1 - num2; }int add(int num1, int num2) {return num1 num2;…...

开放式TCP/IP通信

一、1200和1200之间的开放式TCP/IP通讯 第一步&#xff1a;组态1214CPU&#xff0c;勾选时钟存储器 第二步&#xff1a;防护与安全里面连接机制勾选允许PUT/GET访问 第三步&#xff1a;添加PLC 第四步&#xff1a;点击网络试图&#xff0c;选中网口&#xff0c;把两个PLC连接起…...

(原创,可用)SSH实现内外网安全穿透(安全不怕防火墙)

目前有A、B终端和一台服务器&#xff0c;A、B机器不能直接访问&#xff0c;服务器不能直接访问A、B终端但是A、B终端可以访问服务器&#xff0c;这个场景很像我们有一台电脑在单位内网&#xff0c;外机器想访问内网系统&#xff0c;可能大家目前想到的就是frp之类穿透工具&…...

第二节 docker基础之---镜像构建及挂载

查看当前镜像&#xff1a; [rootdocker ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE [rootdocker ~]#很明显docker是咱们新搭建的所以目前还没有镜像 1&#xff0c;搜索镜像&#xff1a; [rootdocker ~]# docker search centos 搜索镜像并过滤是官…...

LLM学习笔记1——本地部署Meta-Llama-3.2-1B大模型

系列文章目录 参考博客 参考博客 文章目录 系列文章目录前言与调用一、部署要求二、实现步骤0.深度学习环境错误1&#xff0c;验证pytorch版本时提示以下问题&#xff1a;错误2&#xff0c;验证pytorch版本时提示以下问题&#xff1a;错误3&#xff0c;有时候还会提示你有一些…...

RNN-day1-NLP基础

NLP基础 一、基本概念 自然语言处理&#xff1a;Natural Language Processing,主要目标是让计算机能够理解、解释和生成人类语言的数据。 1 基本概念 1.1NLP概念 语言&#xff1a;人类沟通的机构化系统&#xff0c;包括声音、书写符号、手势 自然语言&#xff1a;自然进化…...

常见string库中的函数(C语言超详细)

文章目录 strcspnstrcpystrncpystrcatstrncatstrcmpstrncmpstrchrstrrchrstrstrstrtokstrlenstrnlen strcspn 原型: size_t strcspn(const char *str1, const char *str2);功能&#xff1a; strcspn 会扫描 str1&#xff0c;并返回一个整数&#xff0c;表示 str1 中第一个匹配…...

AI安全最佳实践:AI应用开发安全评估矩阵(上)

生成式AI开发安全范围矩阵简介 生成式AI目前可以说是当下最热门的技术&#xff0c;吸引各大全球企业的关注&#xff0c;并在全球各行各业中带来浪潮般的编个。随时AI能力的飞跃&#xff0c;大语言模型LLM参数达到千亿级别&#xff0c;它和Transformer神经网络共同驱动了我们工…...

deepseek+kimi自动生成ppt

打开deepseek官网&#xff0c;输入详细的需求&#xff0c;让他生成个ppt 接着deepseek开始思考生成了 接着复制生成了的内容 打开kimi粘贴刚才deepseek生成的内容 可以一键生成啦&#xff0c;下载编辑使用吧...

# C指针地址CUP寄存器访问IO内存映射

C指针地址&CUP寄存器访问&IO内存映射 在裸机编程中&#xff0c;C语言可以像汇编语言一样直接操作芯片寄存器地址进行读取和写入&#xff0c;主要是由于以下几个原因&#xff1a; 1. 裸机环境下没有操作系统的干预 裸机编程是指直接在硬件上运行程序&#xff0c;没有…...

《薄世宁医学通识50讲》以医学通识为主题,涵盖了医学的多个方面,包括医学哲学、疾病认知、治疗过程、医患关系、公共卫生等

《薄世宁医学通识50讲》是一门由薄世宁医生主讲的医学通识课程&#xff0c;该课程旨在通过深入浅出的方式&#xff0c;向广大听众普及医学知识&#xff0c;提升公众对医学的认知和理解。 晓北斗推荐-薄世宁医学通识 以下是对该课程的详细介绍&#xff1a; 一、课程概述 《薄世…...

突破与重塑:逃离Java舒适区,借Go语言复刻Redis的自我突破和成长

文章目录 写在文章开头为什么想尝试用go复刻redis复刻redis的心路历程程序员对于舒适区的一点看法关于mini-redis的一些展望结语 写在文章开头 在程序员的技术生涯长河中&#xff0c;我们常常会在熟悉的领域中建立起自己的“舒适区”。于我而言&#xff0c;Java 就是这片承载…...

解决_ssl.so: cannot open shared object file: No such file or directory

背景&#xff1a; 我在CentOS8.2的操作系统里安装完python2.7后&#xff0c;源码安装了OpenSSL_1_1_1-stable 下载地址&#xff1a; https://github.com/openssl/openssl/tree/OpenSSL_1_1_1-stable 现象&#xff1a; 结果python导入ssl的时候报错了&#xff0c;报找不到_ssl.…...

优惠券平台(一):基于责任链模式创建优惠券模板

前景概要 系统的主要实现是优惠券的相关业务&#xff0c;所以对于用户管理的实现我们简单用拦截器在触发接口前创建一个单一用户。 // 用户属于非核心功能&#xff0c;这里先通过模拟的形式代替。后续如果需要后管展示&#xff0c;会重构该代码 UserInfoDTO userInfoDTO new…...

【Pytorch实战教程】PyTorch中的Dataset用法详解

PyTorch中的Dataset用法详解 在深度学习中,数据是模型训练的基石。PyTorch作为一个强大的深度学习框架,提供了丰富的工具来处理和加载数据。其中,Dataset类是PyTorch中用于处理数据的重要工具之一。本文将详细介绍Dataset的用法,帮助你更好地理解和使用它。 1. 什么是Dat…...

单例设计模式(Java)

&#xff08;部分内容参考于菜鸟教程当中关于单例模式的说明&#xff09; 什么是单例设计模式&#xff1f; 单例模式&#xff08;Singleton Pattern&#xff09;是一种常见的设计模式&#xff0c;其主要目的是确保一个类在系统中只有一个实例&#xff0c;并提供全局访问点。使…...

TensorFlow域对抗训练DANN神经网络分析MNIST与Blobs数据集梯度反转层提升目标域适应能力可视化...

全文链接&#xff1a;https://tecdat.cn/?p39656 本文围绕基于TensorFlow实现的神经网络对抗训练域适应方法展开研究。详细介绍了梯度反转层的原理与实现&#xff0c;通过MNIST和Blobs等数据集进行实验&#xff0c;对比了不同训练方式&#xff08;仅源域训练、域对抗训练等&am…...

09vue3实战-----引入element-plus组件库中的图标

09vue3实战-----引入element-plus组件库中的图标 1.安装2.引入3.优化 element-plus中的icon图标组件的使用和其他平台组件(如el-button按钮)是不一样的。 1.安装 npm install element-plus/icons-vue2.引入 在这我们只讲述最方便的一种引入方法------完整引入。这需要从elem…...

DeepSeek vs. ChatGPT:不同的诞生时间,对人工智能发展的不同影响

DeepSeek vs. ChatGPT&#xff1a;不同的诞生时间&#xff0c;对人工智能发展的不同影响 ChatGPT 和 DeepSeek 诞生于不同的时间节点&#xff0c;代表了人工智能不同阶段的发展方向。它们在技术、应用以及对AI发展趋势的影响方面各有侧重。 1. 诞生时间与背景 ChatGPT&#x…...

如何导入第三方sdk | 引入第三方jar 包

0. 背景1. 上传私有仓库2. 使用本地文件系统 0. 背景 对接一些第三方功能&#xff0c;会拿到第三方的sdk&#xff0c;也就是jar包&#xff0c;如何导入呢 1. 上传私有仓库 最好的方式就是将第三方jar包&#xff0c;上传到私有的仓库&#xff0c;这样直接正常在pom引用即可如果只…...

消费电子产品中的噪声对TPS54202的影响

本文章是笔者整理的备忘笔记。希望在帮助自己温习避免遗忘的同时&#xff0c;也能帮助其他需要参考的朋友。如有谬误&#xff0c;欢迎大家进行指正。 一、概述 在白色家电领域&#xff0c;降压转换器的应用非常广泛&#xff0c;为了实现不同的功能就需要不同的电源轨。TPS542…...

[Meet DeepSeek] 如何顺畅使用DeepSeek?告别【服务器繁忙,请稍后再试。】

文章目录 [Meet DeepSeek] 如何顺畅使用DeepSeek&#xff1f;告别【服务器繁忙&#xff0c;请稍后再试。】引言使用渠道一&#xff1a;硅基流动 Chatbox AI【推荐】硅基流动 Chatbox AI的优势 使用渠道二&#xff1a;秘塔AI搜索秘塔AI搜索的优势 其它方案1. DeepSeek官网2. 纳…...

Websocket从原理到实战

引言 WebSocket 是一种在单个 TCP 连接上进行全双工通信的网络协议&#xff0c;它使得客户端和服务器之间能够进行实时、双向的通信&#xff0c;既然是通信协议一定要从发展历史到协议内容到应用场景最后到实战全方位了解 发展历史 WebSocket 最初是为了解决 HTTP 协议在实时…...

学习Cherry Studio AI服务平台,主要是各种功能的实践(deepseek 1.5b和7b的模型+ChatGLM3模型)

Cherry Studio 介绍 Cherry Studio 是一个支持多模型服务的桌面客户端&#xff0c;为专业用户而打造&#xff0c;内置 30 多个行业的智能助手&#xff0c;帮助用户在多种场景下提升工作效率。 CherryStudio内置众多服务商 同时也支持其他兼容OpenAI/Anthropic等API格式的服务…...