RabbitMq-发布确认高级(避坑指南版)
在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢?
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 rabbitmq 重启期间生产者投递失败,导致消息丢失,需要手动处理和恢复。因此为了确保rabbitmq 的消息可靠投递,特别是在这样比较极端的情况,rabbitmq 集群不可用的时候,对无法投递的消息进行处理。
废话不说直接开始撸代码!!!在代码中解决实际问题~
一、代码架构分析:
接触到这里,对于一条完整的“rabbitmq消息”发布链的构成大家已经不陌生了。主要是由:“消息生产者”、“交换机”、“队列”、“消费者”四个方面构成,如图所示:

二、构造“配置类”代码:
声明交换机“confirm_exchange”、声明队列“confirm_queue”、通过routing-key对交换机和队列进行绑定。
package com.example.rabbitmq_demo.fabuquerengaoji;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @ClaseName: ConfirmConfig$* @Description:配置类 发布确认(高级)* @Author: wuhs* @Date: 2023/8/16$ 14:32$* 快捷键ctrl+shift+u 字母大小写转化*/
@Configuration
public class ConfirmConfig {// 交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";// 队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";// ROUTING-KEYpublic static final String CONFIRM_ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,@Qualifier("confirmQueue") Queue queue) {//一般使用在项目中使用@Qualifier来限定注入的Bean。return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);}
}
三、构建消费者代码:
通过@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)来监听队列,以此“充当”消费者。这一块也没啥好说的,直接上代码!
package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @ClaseName: Consumer$* @Description:消费者* @Author: wuhs* @Date: 2023/8/16$ 15:18$】*/
@Slf4j
@Component
public class Consumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void reciverConfirmMessage(Message message) {String msg = new String(message.getBody());log.info("接收到的队列confirm.queue消息:{}", msg);}
}
四、创建“回调”方法
在最开始我们说到“确保rabbitmq 的消息可靠投递”的概念,那么具体如何确保呢?如果我们在消费者每次消费成功、未消费成功交换机都能进行“回调”确认,是不是就能知道哪些消息消费成功、哪些没有消费成功呢?
在RabbitTemplate中有一个方法接口(ConfirmCallback),我们只需要实现这个接口并实现“confirm”方法,并将它注入进RabbitTemplate工具中即可创建“回调”。具体代码如下:
package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @ClaseName: MyCallBack$* @Description:* @Author: wuhs* @Date: 2023/8/16$ 16:17$*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注入rabbitTemplate.setConfirmCallback(this);}//交换机确认回调方法 @Overridepublic void confirm(CorrelationData correlationData, boolean ack, String reason) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {//发消息 交换机接收到了消息 回调log.info("交换机已经收到了ID为:{}的消息", id);} else {//发消息 交换机没有接收到了消息 回调log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);}}
}
confirm方法参数介绍:
* 1. correlationData 保存回调消息的ID及相关信息 * 2. 交换机是否收到了消息 ack=true(收到)、ack=false(未收到) * 3. reason 失败的原因
六、配置类声明:(application.yml)
在这里需要注意!!这也是最容易踩得坑,不知道有没有小伙伴遇没遇到,“publisher-confirm-type: correlated”也声明了,但是项目创建启动发布消息之后“没有成功回调”的情况,查看了很多的文章,很多博主只配置了publisher-confirm-type、但是并没有开启“confirm 确认机制”,所以会存在“误导”,导致一直找不到失败的原因~具体正确配置,看代码:
server:port: 8899spring:rabbitmq:host: 124.221.94.214port: 5672username: xgsmpassword: xgsm123# 发送者开启 confirm 确认机制publisher-confirms: truepublisher-confirm-type: correlated
publisher-confirm-type参数介绍:
publisher-confirm-type这个参数一共有三种配置方法:
# NONE:禁用发布确认,是默认值。
# CORRELATED:发布消息后,交换机会触发回调方法。
# SIMPLE:有两种效果:
1:和CORRELATED一样会触发回调方法
2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,
# 要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
七、创建Controller层(消息生产者)
这里演示三种情况。第一种为正常情况下,发送成功后的回调;第二种消息为发送失败、当交换机不存在则发送失败(模拟发送失败),所以将交换机名称修改即可
package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @ClaseName: ProducerController$* @Description:消息生产者* @Author: wuhs* @Date: 2023/8/16$ 14:58$*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;// @PathVariable主要作用:映射URL绑定的占位符@RequestMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message) {//正常发送CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);//发送失败-交换机不存在的情况CorrelationData correlationData2 = new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"2", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData2);log.info("发送的消息为:{}", message);}
}
测试结果:
如果是routing-key错误,这种情况会触发回调嘛?让我们验证一下;修改routing-key为“错误值”
CorrelationData correlationData3 = new CorrelationData("3");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"key3", message,correlationData3);
测试结果:

通过结果可以看出,消息发送成功了,而且也触发了“成功的回调”。但是我们知道的是,由于路由失败,这里消费者并没有对消息进行消费,这是为什么呢?那是因为,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。解决方式为:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。具体操作如下
1、application.yml文件中添加消息回退配置
# 发送者开启 return 确认机制publisher-returns: true
2、实现RabbitTemplate中的方法接口ReturnCallback,并实现“returnedMessage”方法,最后将类注入到RabbitTemplate的RabbitTemplate中,详细代码如下:
package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @ClaseName: MyCallBack$* @Description:* @Author: wuhs* @Date: 2023/8/16$ 16:17$*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注入rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* 1、发消息 交换机接收到了消息 回调* 1.1 correlationData 保存回调消息的ID及相关信息* 1.2 交换机收到消息 ack=true* 2、发消息 交换机接收失败了 回调* 2.1 correlationData 保存回调消息的ID及相关信息* 2.2 交换机接收到消息 ack=false* 2.3 reason 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String reason) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机已经收到了ID为:{}的消息", id);} else {log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);}}//可以在当消息传递的过长中不可达目的地时将消息返回给生产者// 只有不可待目的地的时候 才进行回退@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息{},被交换机{}退回,退回原因:{},路由key:{}", message, exchange, replyText, routingKey);}
}
测试结果:
2023-08-17 10:36:32.476 INFO 21108 --- [221.94.214:5672] c.e.r.fabuquerengaoji.MyCallBack : 消息(Body:'消息确认发布测试' MessageProperties [headers={spring_returned_message_correlation=3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),被交换机confirm_exchange退回,退回原因:NO_ROUTE,路由key:key1key3
问题解决!~
相关文章:
RabbitMq-发布确认高级(避坑指南版)
在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢? 在生产环…...
Blender增强现实3D模型制作指南【AR】
推荐:用 NSDT编辑器 快速搭建可编程3D场景 将静态和动画 3D 内容集成到移动增强现实 (AR) 体验中是增强用户沉浸感和参与度的高效方法。 然而,为 AR 创建 3D 对象可能相当艰巨,尤其是对于那些缺乏 3D 建模经验的人来说。 与添加视频或照片 AR…...
Java查看https证书过期时间(JKS,CERT)
在这里需要使用X.509 证书的抽象类 X509Certificate 。此类提供了一种访问 X.509 证书所有属性的标准方式。 这些证书被广泛使用以支持 Internet 安全系统中的身份验证和其他功能。常见的应用包括增强保密邮件 (PEM)、传输层安全 (SSL)、用于受信任软件发布的代码签名和安全电…...
关于vue,记录一次修饰符.stop和.once的使用,以及猜想。
内置指令 | Vue.js 在vue的api里,关于v-on有stop和once两个事件标签。 .stop - 调用 event.stopPropagation()。.once - 最多触发一次处理函数。 原有主要代码和页面效果 (无stop和once): ...<div class"div" click"di…...
解决git reset --soft HEAD^撤销commit时报错
今天在使用git回退功能的时候,遇到以下错误: 解决git reset --soft HEAD^撤销commit时报错 问题: 在进行完commit后,想要撤销该commit,于是使用了git reset --soft HEAD^命令,但是出现如下报错࿱…...
【BASH】回顾与知识点梳理(三十四)
【BASH】回顾与知识点梳理 三十四 三十四. 认识系统服务(二)34.1 systemctl 针对 service 类型的配置文件systemctl 配置文件相关目录简介systemctl 配置文件的设定项目简介[Unit] 部份[Service] 部份[Install] 部份 两个 vsftpd 运作的实例多重的重复设…...
Python可视化在量化交易中的应用(11)_Seaborn折线图
举个栗子,用seaborn绘制折线图。 Seaborn中折线图的绘制方法 在seaborn中,我们一般使用sns作为seaborn模块的别名,因此,在下文中,均以sns指代seaborn模块。 seaborn中绘制折线图使用的是sns.plot()函数: …...
无涯教程-TensorFlow - TensorBoard可视化
TensorFlow包含一个可视化工具,称为TensorBoard,它用于分析数据流图,还用于了解机器学习模型。 TensorBoard的重要功能包括查看有关垂直对齐的任何图形的参数和详细信息的不同类型统计的视图。 深度神经网络包括多达36,000个节点…...
[uni-app] uview封装Popup组件,处理props及v-model的传值问题
文章目录 需求及效果遇到的问题解决的办法偷懒的写法 需求及效果 uView(1.x版本)中, 有Pop弹出层的组件, 现在有个需求是,进行简单封装,有些通用的设置不想每次都写(比如 :mask-custom-style"{background: rgba(0, 0, 0, 0.7)}"这种) 然后内部内容交给插槽去自己随…...
【C++】int a;和int *p=new int;有什么区别?
2023年8月19日,周六早上 int a; 和 int *p new int; 之间有以下区别: 1. 内存分配方式:int a; 是在栈上分配内存,而 int *p new int; 是在堆上动态分配内存。 2. 生命周期:int a; 的生命周期与其所在的作用域相同&…...
redis事务管理
目录 一、redis事务定义 二、事务控制命令——Multi、Exec、discard 三、事务的错误处理 四、事务的冲突问题 悲观锁 乐观锁 WATCH unwatch 五、事务特性 单独的隔离操作 没有隔离级别的概念 不保证原子性 一、redis事务定义 Redis 事务是一个单独的隔离操作&…...
TPS_C++版本及功能支持备注
TPS_C版本及功能支持备注 相关参考链接C23:https://zh.cppreference.com/w/cpp/23 相关参考链接C20:https://zh.cppreference.com/w/cpp/20 相关参考链接C17:https://zh.cppreference.com/w/cpp/17 相关参考链接C14:https://zh.cp…...
同步jenkinsfile流水线(sync-job)
环境 变量:env(环境变量:sit/dev/simulation/prod/all),job(job-name/all)目录:/var/lib/jenkins/jenkinsfile environment.json: [roottest-01 jenkinsfile]# cat env…...
STM32单片机WIFI-APP智能温室大棚系统CO2土壤湿度空气温湿度补光
实践制作DIY- GC0161--智能温室大棚系统 基于STM32单片机设计---智能温室大棚系统 二、功能介绍: 电路组成:STM32F103CXT6最小系统LCD1602显示器DHT11空气温度湿度光敏电阻光强土壤湿度传感器SGP30二氧化碳传感器 1个继电器(空气加湿&#x…...
SpringBoot复习:(52)不再需要使用@EnableTransactionManagement的原因
在Spring项目中,要用事务,需要EnableTransactionManagement注解加Transactional注解。而在SpringBoot项目,有事务的自动配置类TransactionAutoConfiguration,代码如下: 可以在其内部类EnableTransactionManagementConfiguratio…...
HackNos 3靶场
配置 进入控制面板配置网卡 第一步:启动靶机时按下 shift 键, 进入以下界面 第二步:选择第二个选项,然后按下 e 键,进入编辑界面 将这里的ro修改为rw single init/bin/bash,然后按ctrlx,进入…...
【办公自动化】使用Python批量生成PPT版荣誉证书
🤵♂️ 个人主页:艾派森的个人主页 ✍🏻作者简介:Python学习者 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话, 欢迎评论 💬点赞Ǵ…...
【C++深入浅出】初识C++中篇(引用、内联函数)
目录 一. 前言 二. 引用 2.1 引用的概念 2.2 引用的使用 2.3 引用的特性 2.4 常引用 2.5 引用的使用场景 2.6 传值、传引用效率比较 2.7 引用和指针的区别 三. 内联函数 3.1 内联函数的概念 3.2 内联函数的特性 一. 前言 上期说道,C是在C的基础之上&…...
前端:VUE2中的父子传值
文章目录 一、背景什么是父子传值二、业务场景子传父1、在父页面中引入子页面2、子传父:父组件标识3、子传父:子组件标识 父传子父组件调用子组件中的方法 总结: 一、背景 最近做项目中需要使用到流工作,在这里流工作需要用到父子…...
【100天精通python】Day40:GUI界面编程_PyQt 从入门到实战(完)_网络编程与打包发布
目录 8 网络编程 8.1 使用PyQt 网络模块进行网络通信 服务器端示例 客户端示例 8.2 处理网络请求和响应 9 打包和发布 9.1 创建可执行文件或安装程序 9.2 解决依赖问题 9.3 发布 PyQt 应用到不同平台 9.3.1 发布到 Windows 9.3.2 发布到 macOS 9.3.3 发布到 Linux 9…...
利用AI写教材,掌握低查重方法,让你的教材脱颖而出!
许多教材编写者常常会有一种失落感:在花费大量心血完成了主体内容后,配套资源的不足却影响了整体的教学效果。针对课后练习的题型设计,常常缺乏创新的思路;想要制作直观的教学课件,却没有相应的技术能力;对…...
H5扫码功能实战:如何在微信和原生浏览器中实现二维码解析(附完整代码)
H5扫码功能实战:如何在微信和原生浏览器中实现二维码解析 移动互联网时代,二维码已成为连接线上线下最重要的入口之一。作为前端开发者,我们经常需要在H5页面中实现扫码功能,但不同环境下的兼容性问题往往让人头疼。本文将深入探讨…...
400字节的前端奇迹:TinyEditor如何重新定义微型代码编辑体验
400字节的前端奇迹:TinyEditor如何重新定义微型代码编辑体验 【免费下载链接】TinyEditor A functional HTML/CSS/JS editor in less than 400 bytes 项目地址: https://gitcode.com/gh_mirrors/ti/TinyEditor 在前端开发的世界里,我们常常被功能…...
手把手教你排查PCIe设备异常:从`Malformed TLP`错误看MPS/MRRS配置
深度解析PCIe设备异常:从Malformed TLP错误到MPS/MRRS调优实战 当你在嵌入式Linux系统中接入一块高性能FPGA加速卡时,突然在系统日志中发现Malformed TLP错误,设备性能骤降甚至完全无法工作——这种场景对任何嵌入式开发者都不陌生。PCIe总线…...
AMP实战:对抗运动先验在物理驱动角色控制中的风格化应用
1. AMP框架如何革新角色动作控制 想象一下你在玩一款开放世界游戏,主角需要从悬崖边缘精准跳到对面平台。传统动画系统可能会直接播放预设的跳跃动画,但物理引擎计算发现距离不够时,就会出现角色悬空滑行的诡异画面。这正是AMP(Ad…...
解锁高效无水印备份:抖音视频批量下载的完整指南
解锁高效无水印备份:抖音视频批量下载的完整指南 【免费下载链接】douyin-downloader 项目地址: https://gitcode.com/GitHub_Trending/do/douyin-downloader 直面内容管理痛点:三个真实用户的困境 场景一:学习资源的系统性流失 教…...
基于ROS的语音控制机器人(一):从零搭建多模态交互系统
1. 从零搭建ROS语音控制机器人的核心思路 第一次接触ROS机器人开发时,我被其分布式架构深深吸引。想象一下:你对着电脑说"前进",树莓派就能驱动小车移动;喊"打开摄像头",机器人立即开启视觉识别—…...
可视化拖拽组件库终极指南:响应式设计与适配方案完整解析
可视化拖拽组件库终极指南:响应式设计与适配方案完整解析 【免费下载链接】visual-drag-demo 一个低代码(可视化拖拽)教学项目 项目地址: https://gitcode.com/gh_mirrors/vi/visual-drag-demo 可视化拖拽组件库是现代低代码开发平台的…...
3步告别卡顿:用鸣潮工具箱实现流畅游戏体验
3步告别卡顿:用鸣潮工具箱实现流畅游戏体验 【免费下载链接】WaveTools 🧰鸣潮工具箱 项目地址: https://gitcode.com/gh_mirrors/wa/WaveTools 你的游戏还在卡顿吗?试试这个免费解决方案 你是否曾经在《鸣潮》的激烈战斗中遭遇突然的…...
Kubernetes 与 GitOps 最佳实践
Kubernetes 与 GitOps 最佳实践 一、前言 哥们,别整那些花里胡哨的。GitOps 是现代 Kubernetes 运维的重要趋势,今天直接上硬货,教你如何在 Kubernetes 中实现 GitOps 工作流。 二、GitOps 核心概念 概念描述优势声明式配置所有配置以声明式方…...
