当前位置: 首页 > news >正文

RabbitMq-发布确认高级(避坑指南版)

在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢?

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 rabbitmq 重启期间生产者投递失败,导致消息丢失,需要手动处理和恢复。因此为了确保rabbitmq 的消息可靠投递,特别是在这样比较极端的情况,rabbitmq 集群不可用的时候,对无法投递的消息进行处理。

废话不说直接开始撸代码!!!在代码中解决实际问题~

一、代码架构分析:

        接触到这里,对于一条完整的“rabbitmq消息”发布链的构成大家已经不陌生了。主要是由:“消息生产者”、“交换机”、“队列”、“消费者”四个方面构成,如图所示:

二、构造“配置类”代码: 

声明交换机“confirm_exchange”、声明队列“confirm_queue”、通过routing-key对交换机和队列进行绑定。

package com.example.rabbitmq_demo.fabuquerengaoji;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @ClaseName: ConfirmConfig$* @Description:配置类 发布确认(高级)* @Author: wuhs* @Date: 2023/8/16$ 14:32$* 快捷键ctrl+shift+u  字母大小写转化*/
@Configuration
public class ConfirmConfig {// 交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";// 队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";// ROUTING-KEYpublic static final String CONFIRM_ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,@Qualifier("confirmQueue") Queue queue) {//一般使用在项目中使用@Qualifier来限定注入的Bean。return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);}
}

三、构建消费者代码:

通过@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)来监听队列,以此“充当”消费者。这一块也没啥好说的,直接上代码!

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @ClaseName: Consumer$* @Description:消费者* @Author: wuhs* @Date: 2023/8/16$ 15:18$】*/
@Slf4j
@Component
public class Consumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void reciverConfirmMessage(Message message) {String msg = new String(message.getBody());log.info("接收到的队列confirm.queue消息:{}", msg);}
}

四、创建“回调”方法

        在最开始我们说到“确保rabbitmq 的消息可靠投递”的概念,那么具体如何确保呢?如果我们在消费者每次消费成功、未消费成功交换机都能进行“回调”确认,是不是就能知道哪些消息消费成功、哪些没有消费成功呢?

        在RabbitTemplate中有一个方法接口(ConfirmCallback),我们只需要实现这个接口并实现“confirm”方法,并将它注入进RabbitTemplate工具中即可创建“回调”。具体代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @ClaseName: MyCallBack$* @Description:* @Author: wuhs* @Date: 2023/8/16$ 16:17$*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注入rabbitTemplate.setConfirmCallback(this);}//交换机确认回调方法  @Overridepublic void confirm(CorrelationData correlationData, boolean ack, String reason) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {//发消息 交换机接收到了消息 回调log.info("交换机已经收到了ID为:{}的消息", id);} else {//发消息 交换机没有接收到了消息 回调log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);}}
}

confirm方法参数介绍:

* 1. correlationData 保存回调消息的ID及相关信息
* 2. 交换机是否收到了消息 ack=true(收到)、ack=false(未收到)
* 3. reason 失败的原因

 六、配置类声明:(application.yml)

        在这里需要注意!!这也是最容易踩得坑,不知道有没有小伙伴遇没遇到,“publisher-confirm-type: correlated”也声明了,但是项目创建启动发布消息之后“没有成功回调”的情况,查看了很多的文章,很多博主只配置了publisher-confirm-type、但是并没有开启“confirm 确认机制”,所以会存在“误导”,导致一直找不到失败的原因~具体正确配置,看代码:

server:port: 8899spring:rabbitmq:host: 124.221.94.214port: 5672username: xgsmpassword: xgsm123# 发送者开启 confirm 确认机制publisher-confirms: truepublisher-confirm-type: correlated

 publisher-confirm-type参数介绍:

publisher-confirm-type这个参数一共有三种配置方法:

# NONE:禁用发布确认,是默认值。

# CORRELATED:发布消息后,交换机会触发回调方法。

# SIMPLE:有两种效果:

1:和CORRELATED一样会触发回调方法

2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,

# 要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

 七、创建Controller层(消息生产者)

        这里演示三种情况。第一种为正常情况下,发送成功后的回调;第二种消息为发送失败、当交换机不存在则发送失败(模拟发送失败),所以将交换机名称修改即可

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @ClaseName: ProducerController$* @Description:消息生产者* @Author: wuhs* @Date: 2023/8/16$ 14:58$*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;// @PathVariable主要作用:映射URL绑定的占位符@RequestMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message) {//正常发送CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);//发送失败-交换机不存在的情况CorrelationData correlationData2 = new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"2", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData2);log.info("发送的消息为:{}", message);}
}

测试结果: 

 如果是routing-key错误,这种情况会触发回调嘛?让我们验证一下;修改routing-key为“错误值”

  CorrelationData correlationData3 = new CorrelationData("3");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"key3", message,correlationData3);

测试结果:

         通过结果可以看出,消息发送成功了,而且也触发了“成功的回调”。但是我们知道的是,由于路由失败,这里消费者并没有对消息进行消费,这是为什么呢?那是因为,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。解决方式为:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。具体操作如下

1、application.yml文件中添加消息回退配置

# 发送者开启 return 确认机制publisher-returns: true

 2、实现RabbitTemplate中的方法接口ReturnCallback,并实现“returnedMessage”方法,最后将类注入到RabbitTemplate的RabbitTemplate中,详细代码如下:

package com.example.rabbitmq_demo.fabuquerengaoji;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @ClaseName: MyCallBack$* @Description:* @Author: wuhs* @Date: 2023/8/16$ 16:17$*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注入rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* 1、发消息 交换机接收到了消息 回调* 1.1 correlationData 保存回调消息的ID及相关信息* 1.2 交换机收到消息 ack=true* 2、发消息 交换机接收失败了 回调* 2.1 correlationData 保存回调消息的ID及相关信息* 2.2 交换机接收到消息 ack=false* 2.3 reason 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String reason) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机已经收到了ID为:{}的消息", id);} else {log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);}}//可以在当消息传递的过长中不可达目的地时将消息返回给生产者// 只有不可待目的地的时候 才进行回退@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息{},被交换机{}退回,退回原因:{},路由key:{}", message, exchange, replyText, routingKey);}
}

测试结果:

2023-08-17 10:36:32.476  INFO 21108 --- [221.94.214:5672] c.e.r.fabuquerengaoji.MyCallBack : 消息(Body:'消息确认发布测试' MessageProperties [headers={spring_returned_message_correlation=3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),被交换机confirm_exchange退回,退回原因:NO_ROUTE,路由key:key1key3

问题解决!~ 

相关文章:

RabbitMq-发布确认高级(避坑指南版)

在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢? 在生产环…...

Blender增强现实3D模型制作指南【AR】

推荐:用 NSDT编辑器 快速搭建可编程3D场景 将静态和动画 3D 内容集成到移动增强现实 (AR) 体验中是增强用户沉浸感和参与度的高效方法。 然而,为 AR 创建 3D 对象可能相当艰巨,尤其是对于那些缺乏 3D 建模经验的人来说。 与添加视频或照片 AR…...

Java查看https证书过期时间(JKS,CERT)

在这里需要使用X.509 证书的抽象类 X509Certificate 。此类提供了一种访问 X.509 证书所有属性的标准方式。 这些证书被广泛使用以支持 Internet 安全系统中的身份验证和其他功能。常见的应用包括增强保密邮件 (PEM)、传输层安全 (SSL)、用于受信任软件发布的代码签名和安全电…...

关于vue,记录一次修饰符.stop和.once的使用,以及猜想。

内置指令 | Vue.js 在vue的api里&#xff0c;关于v-on有stop和once两个事件标签。 .stop - 调用 event.stopPropagation()。.once - 最多触发一次处理函数。 原有主要代码和页面效果 &#xff08;无stop和once&#xff09;: ...<div class"div" click"di…...

解决git reset --soft HEAD^撤销commit时报错

今天在使用git回退功能的时候&#xff0c;遇到以下错误&#xff1a; 解决git reset --soft HEAD^撤销commit时报错 问题&#xff1a; 在进行完commit后&#xff0c;想要撤销该commit&#xff0c;于是使用了git reset --soft HEAD^命令&#xff0c;但是出现如下报错&#xff1…...

【BASH】回顾与知识点梳理(三十四)

【BASH】回顾与知识点梳理 三十四 三十四. 认识系统服务&#xff08;二&#xff09;34.1 systemctl 针对 service 类型的配置文件systemctl 配置文件相关目录简介systemctl 配置文件的设定项目简介[Unit] 部份[Service] 部份[Install] 部份 两个 vsftpd 运作的实例多重的重复设…...

Python可视化在量化交易中的应用(11)_Seaborn折线图

举个栗子&#xff0c;用seaborn绘制折线图。 Seaborn中折线图的绘制方法 在seaborn中&#xff0c;我们一般使用sns作为seaborn模块的别名&#xff0c;因此&#xff0c;在下文中&#xff0c;均以sns指代seaborn模块。 seaborn中绘制折线图使用的是sns.plot()函数&#xff1a; …...

无涯教程-TensorFlow - TensorBoard可视化

TensorFlow包含一个可视化工具&#xff0c;称为TensorBoard&#xff0c;它用于分析数据流图&#xff0c;还用于了解机器学习模型。 TensorBoard的重要功能包括查看有关垂直对齐的任何图形的参数和详细信息的不同类型统计的视图。 深度神经网络包括多达36&#xff0c;000个节点…...

[uni-app] uview封装Popup组件,处理props及v-model的传值问题

文章目录 需求及效果遇到的问题解决的办法偷懒的写法 需求及效果 uView(1.x版本)中, 有Pop弹出层的组件, 现在有个需求是,进行简单封装,有些通用的设置不想每次都写(比如 :mask-custom-style"{background: rgba(0, 0, 0, 0.7)}"这种) 然后内部内容交给插槽去自己随…...

【C++】int a;和int *p=new int;有什么区别?

2023年8月19日&#xff0c;周六早上 int a; 和 int *p new int; 之间有以下区别&#xff1a; 1. 内存分配方式&#xff1a;int a; 是在栈上分配内存&#xff0c;而 int *p new int; 是在堆上动态分配内存。 2. 生命周期&#xff1a;int a; 的生命周期与其所在的作用域相同&…...

redis事务管理

目录 一、redis事务定义 二、事务控制命令——Multi、Exec、discard 三、事务的错误处理 四、事务的冲突问题 悲观锁 乐观锁 WATCH unwatch 五、事务特性 单独的隔离操作 没有隔离级别的概念 不保证原子性 一、redis事务定义 Redis 事务是一个单独的隔离操作&…...

TPS_C++版本及功能支持备注

TPS_C版本及功能支持备注 相关参考链接C23&#xff1a;https://zh.cppreference.com/w/cpp/23 相关参考链接C20&#xff1a;https://zh.cppreference.com/w/cpp/20 相关参考链接C17&#xff1a;https://zh.cppreference.com/w/cpp/17 相关参考链接C14&#xff1a;https://zh.cp…...

同步jenkinsfile流水线(sync-job)

环境 变量&#xff1a;env&#xff08;环境变量&#xff1a;sit/dev/simulation/prod/all&#xff09;&#xff0c;job&#xff08;job-name/all&#xff09;目录&#xff1a;/var/lib/jenkins/jenkinsfile environment.json&#xff1a; [roottest-01 jenkinsfile]# cat env…...

STM32单片机WIFI-APP智能温室大棚系统CO2土壤湿度空气温湿度补光

实践制作DIY- GC0161--智能温室大棚系统 基于STM32单片机设计---智能温室大棚系统 二、功能介绍&#xff1a; 电路组成&#xff1a;STM32F103CXT6最小系统LCD1602显示器DHT11空气温度湿度光敏电阻光强土壤湿度传感器SGP30二氧化碳传感器 1个继电器&#xff08;空气加湿&#x…...

SpringBoot复习:(52)不再需要使用@EnableTransactionManagement的原因

在Spring项目中&#xff0c;要用事务&#xff0c;需要EnableTransactionManagement注解加Transactional注解。而在SpringBoot项目&#xff0c;有事务的自动配置类TransactionAutoConfiguration,代码如下&#xff1a; 可以在其内部类EnableTransactionManagementConfiguratio…...

HackNos 3靶场

配置 进入控制面板配置网卡 第一步&#xff1a;启动靶机时按下 shift 键&#xff0c; 进入以下界面 第二步&#xff1a;选择第二个选项&#xff0c;然后按下 e 键&#xff0c;进入编辑界面 将这里的ro修改为rw single init/bin/bash&#xff0c;然后按ctrlx&#xff0c;进入…...

【办公自动化】使用Python批量生成PPT版荣誉证书

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…...

【C++深入浅出】初识C++中篇(引用、内联函数)

目录 一. 前言 二. 引用 2.1 引用的概念 2.2 引用的使用 2.3 引用的特性 2.4 常引用 2.5 引用的使用场景 2.6 传值、传引用效率比较 2.7 引用和指针的区别 三. 内联函数 3.1 内联函数的概念 3.2 内联函数的特性 一. 前言 上期说道&#xff0c;C是在C的基础之上&…...

前端:VUE2中的父子传值

文章目录 一、背景什么是父子传值二、业务场景子传父1、在父页面中引入子页面2、子传父&#xff1a;父组件标识3、子传父&#xff1a;子组件标识 父传子父组件调用子组件中的方法 总结&#xff1a; 一、背景 最近做项目中需要使用到流工作&#xff0c;在这里流工作需要用到父子…...

【100天精通python】Day40:GUI界面编程_PyQt 从入门到实战(完)_网络编程与打包发布

目录 8 网络编程 8.1 使用PyQt 网络模块进行网络通信 服务器端示例 客户端示例 8.2 处理网络请求和响应 9 打包和发布 9.1 创建可执行文件或安装程序 9.2 解决依赖问题 9.3 发布 PyQt 应用到不同平台 9.3.1 发布到 Windows 9.3.2 发布到 macOS 9.3.3 发布到 Linux 9…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写&#xff0c;中文译为后进先出。这是一种数据结构的工作原则&#xff0c;类似于一摞盘子或一叠书本&#xff1a; 最后放进去的元素最先出来 -想象往筒状容器里放盘子&#xff1a; &#xff08;1&#xff09;你放进的最后一个盘子&#xff08…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)

🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...

(一)单例模式

一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...

Python 实现 Web 静态服务器(HTTP 协议)

目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1&#xff09;下载安装包2&#xff09;配置环境变量3&#xff09;安装镜像4&#xff09;node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1&#xff09;使用 http-server2&#xff09;详解 …...

关于easyexcel动态下拉选问题处理

前些日子突然碰到一个问题&#xff0c;说是客户的导入文件模版想支持部分导入内容的下拉选&#xff0c;于是我就找了easyexcel官网寻找解决方案&#xff0c;并没有找到合适的方案&#xff0c;没办法只能自己动手并分享出来&#xff0c;针对Java生成Excel下拉菜单时因选项过多导…...

消防一体化安全管控平台:构建消防“一张图”和APP统一管理

在城市的某个角落&#xff0c;一场突如其来的火灾打破了平静。熊熊烈火迅速蔓延&#xff0c;滚滚浓烟弥漫开来&#xff0c;周围群众的生命财产安全受到严重威胁。就在这千钧一发之际&#xff0c;消防救援队伍迅速行动&#xff0c;而豪越科技消防一体化安全管控平台构建的消防“…...