当前位置: 首页 > news >正文

RabbitMQ发布确认高级篇(RabbitMQ Release Confirmation Advanced Edition)

系统学习消息队列——RabbitMQ的发布确认高级篇

简介

‌RabbitMQ是一个开源的消息代理软件,实现了‌高级消息队列协议(AMQP)‌,主要用于在分布式系统中进行消息传递。RabbitMQ由‌‌Erlang语言编写,具有高性能、健壮性和可伸缩性,适用于各种规模的企业应用‌。

基本概念和功能

RabbitMQ作为一个消息中间件,主要功能包括接收和转发消息,支持“生产者-消费者模型”。生产者不断向消息队列中写入消息,而消费者则从队列中读取或订阅消息。RabbitMQ支持多种消息传递模式,如普通模式、工作模式、发布/订阅模式等,以满足不同的应用场景需求‌。

架构和关键组件

RabbitMQ的架构基于生产者-消费者模型,通过队列实现消息的存储和转发。队列具有先进先出(FIFO)的特性,并且可以设置持久化、独占、自动删除等属性。RabbitMQ还引入了交换机和路由键等概念,以实现更灵活和复杂的消息路由和分发机制‌。

应用场景和优势

RabbitMQ广泛应用于需要高并发处理、流量削峰、系统解耦和提高可靠性的场景。其优势包括:

  • ‌高性能‌:RabbitMQ能够处理高并发请求,确保系统的稳定运行。
  • ‌高可靠性‌:通过消息的持久化存储和故障恢复机制,确保消息不会丢失。
  • ‌灵活性‌:支持多种消息传递模式和路由规则,满足复杂的应用需求

1.消息发布确认的方案

2.消息的回退

3.备份交换机

1.消息发布确认的方案
在前面的文章中,系统学习消息队列——RabbitMQ的消息发布确认,我们一定程度上学习了消息的发布确认的基础,但是在生产环境中,由于RabbitMq的重启,RabbitMQ在重启过程中投递失败,导致消息丢失,需要手动处理和恢复。那么我们该如何保证当RabbitMQ不可用的时候,消息的稳定投递呢?

我们采取下面的方案:

我们将要发送消息做一个持久化,发送消息的时候,我们持久化一份到数据库或者缓存中,当发送消息失败的时候,我们进行一次重新发送。所以在发送消息的时候,我们要进行代码业务逻辑的处理:

yml:

server:
port:11000
spring:
rabbitmq:
host:127.0.0.1
port:5672
username:guest
password:guest
publisher-confirm-type:correlated

publisher-confirm-type这个参数一共有三种配置方法:

NONE:
禁用发布确认,是默认值。
CORRELATED:
发布消息后,交换机会触发回调方法。
SIMPLE:
有两种效果:
1:和CORRELATED一样会触发回调方法
2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

回调方法类:

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机是否收到消息的回调方法* CorrelationData 消息相关数据* ack 交换机是否收到消息* cause 交换机未收到消息的原因*/
@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", correlationData.getId());} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", correlationData.getId(), cause);}}}

队列配置类:

@Configuration
publicclassConfirmQueueConfig {publicstatic final StringCONFIRM_EXCHANGE_NAME = "confirm.exchange";
publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";@Autowired
privateMyCallBack myCallBack;
@Autowired
privateRabbitTemplate rabbitTemplate;//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
publicvoidinit() {rabbitTemplate.setConfirmCallback(myCallBack);}//声明业务 Exchange
@Bean("confirmExchange")
publicDirectExchangeconfirmExchange(){
returnnewDirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列
@Bean("confirmQueue")
publicQueueconfirmQueue(){
returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系
@Bean
publicBindingqueueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
returnBindingBuilder.bind(queue).to(exchange).with("key1");}}

生产者:

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Autowired
private RabbitTemplate rabbitTemplate;@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message) {
//指定消息 id 为 1CorrelationData correlationData1 = new CorrelationData("1");
//这个key1是有交换机的key,会发送成功String routingKey = "key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
//这个交换机不存在,会发送失败CorrelationData correlationData2 = new CorrelationData("2");rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME+"1", routingKey, message + routingKey, correlationData2);CorrelationData correlationData3 = new CorrelationData("3");
//这个key2是没有交换机的key,会发送失败routingKey = "key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData3);log.info("发送消息内容:{}", message);}
}

消费者:

@Component
@Slf4j
publicclassConfirmConsumer {publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)
publicvoidreceiveMsg(Message message){
String msg=newString(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}}

我们发送信息:
http://localhost:11000/confir...可以啊

我们发送三条消息:
一条是有交换机有队列的消息
二条是没有交换机的消息
三条是有交换机没有队列的消息

结果如下:

我们可以看出:
第一条消息正常消费
第二条消息找不到交换机,抛异常了
第三条消息绑定键找不到队列,这条消息直接被抛弃了

2.消息的回退

我们发现第三条消息的反馈并不是很好,在仅仅开启了生产者确认机制的情况下,交换机收到消息后,会直接给生产者发送确认消息,如果该消息不可路由,那么消息会直接被抛弃,此时生产者是不知道这条消息被丢弃的。所以我们这里要引入消息的回退机制,如果消息不能路由到队列,就会有一个通知,通过设置mandatory参数可以将不可抵达队列的消息返回给生产者。

回调处理逻辑:

@Component
@Slf4j
publicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机是否收到消息的回调方法* CorrelationData 消息相关数据* ack 交换机是否收到消息* cause 交换机未收到消息的原因*/
@Override
publicvoidconfirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {log.info("交换机已经收到 id 为:{}的消息", correlationData.getId());} else {log.info("交换机还未收到 id 为:{}消息,由于原因:{}", correlationData.getId(), cause);}}@Override
publicvoidreturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}",
newString(message.getBody()),exchange,replyText,routingKey);}}

修改一下前面那个配置类的方法:

//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(myCallBack);
/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);}

继续发送消息:http://localhost:11000/confir...可以啊

我们发现,交换机路由不到的队列,也会有反馈了:

3.备份交换机

有了前面那个mandatory参数和回退消息,我们对于无法投递到目的地的消息,可以进行处理了。但是我们在处理这些日志的时候,顶多就是打印了一下日志,然后触发报警,接着手动进行处理。通过日志收集这些无法到达路由的消息非常不优雅,而且手动复制日志非常容易出错。而且mandatory参数设置,还得增加配置类,增加了复杂性。

如果我们不想丢失消息,又不想增加配置类,该怎么做呢?在前面学习死信队列的时候系统学习消息队列——RabbitMQ的死信队列,我们可以为队列设置死信交换机来处理那些失败的消息。

RabbitMQ中有备份交换机这种存在,它就像死信交换机一样,可以用来处理那些路由不到的消息,当交换机接收到一份不可路由的消息的时候,我们就会把这条消息转发到备份交换机中,由备份交换机进行统一处理。

@Configuration
publicclassConfirmQueueConfig {publicstatic final StringCONFIRM_EXCHANGE_NAME = "confirm.exchange";
publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";
publicstatic final StringBACKUP_EXCHANGE_NAME = "backup.exchange";
publicstatic final StringBACKUP_QUEUE_NAME = "backup.queue";
publicstatic final StringWARNING_QUEUE_NAME = "warning.queue";// 声明确认队列
@Bean("confirmQueue")
publicQueueconfirmQueue(){
returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}
//声明确认队列绑定关系
@Bean
publicBindingqueueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
returnBindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange
@Bean("backupExchange")
publicFanoutExchangebackupExchange(){
returnnewFanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机
@Bean("confirmExchange")
publicDirectExchangeconfirmExchange(){
//设置该交换机的备份交换机
ExchangeBuilder exchangeBuilder =
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); 
return (DirectExchange)exchangeBuilder.build();}
// 声明警告队列
@Bean("warningQueue")
publicQueuewarningQueue(){
returnQueueBuilder.durable(WARNING_QUEUE_NAME).build();}
// 声明报警队列绑定关系
@Bean
publicBindingwarningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchangebackupExchange){
returnBindingBuilder.bind(queue).to(backupExchange);}
// 声明备份队列
@Bean("backQueue")
publicQueuebackQueue(){
returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build();}
// 声明备份队列绑定关系
@Bean
publicBindingbackupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
returnBindingBuilder.bind(queue).to(backupExchange);}}

我们发现,不可路由的消息被发现后,就被送到了报警的备份队列里面。

而且这种配置的优先级,比mandatory参数更高。

相关文章:

RabbitMQ发布确认高级篇(RabbitMQ Release Confirmation Advanced Edition)

系统学习消息队列——RabbitMQ的发布确认高级篇 简介 ‌RabbitMQ是一个开源的消息代理软件,实现了‌高级消息队列协议(AMQP)‌,主要用于在分布式系统中进行消息传递。RabbitMQ由‌‌Erlang语言编写,具有高性能、健壮…...

福建省乡镇界面数据arcgis格式shp乡镇名称和编码无偏移坐标内容测评

【标题解析】 标题"最新福建省乡镇界面数据arcgis格式shp乡镇名称和编码无偏移坐标"揭示了几个关键信息。这是关于福建省乡镇级别的地理数据,它包含乡镇的边界信息。这些数据是以ArcGIS兼容的SHP(Shapefile)格式存储的,…...

Kafka 消费者

Kafka消费者主要负责消费(读取和处理)由生产者发布的消息。 1 消费者入门 消费组将具有相同group.id的消费者实例组织成组。它们共同读取一个或多个主题的消息。每个消费者都有一个对应的消费组。 消息发布到主题后,只会被投递给订阅它的每…...

人形机器人当前现状与挑战:从技术突破到未来发展

近年来,人形机器人(Humanoid Robots)作为人工智能和机器人领域的一大热门话题,吸引了全球科技公司和研究机构的广泛关注。尤其是在日本、美国、欧洲等技术领先的地区,人形机器人的研究与发展日益繁荣,从早期…...

6 网络编程

基本概念扫盲 为什么需要计算机网络 如下图所示,A、B、C三个不同地域的主机要想进行通信不是凭空就可以通信的,而是需要基于互联网进行互相连接、通信。 为什么需要协议 如下图所示,红和蓝是联合攻打绿,它们以烽火为信号出动攻打绿,那么这时候就需要一个约定,比如红先…...

智能边缘计算:开启智能新时代

什么是智能边缘计算? 在当今数字化浪潮中,边缘计算已成为一个热门词汇。简单来说,边缘计算是一种分布式计算架构,它将数据处理和存储更靠近数据源的位置,而不是集中于远程数据中心。通过这种方式,边缘计算…...

AI投资分析:用于股票评级的大型语言模型(LLMs)

“AI in Investment Analysis: LLMs for Equity Stock Ratings” 论文地址:https://arxiv.org/pdf/2411.00856 摘要 投资分析作为金融服务领域的重要组成部分,LLMs(大型语言模型)为股票评级带来了改进的潜力。传统的股票评级方式…...

初始SpringBoot:详解特性和结构

??JAVA码农探花: ?? 推荐专栏:《SSM笔记》《SpringBoot笔记》 ??学无止境,不骄不躁,知行合一 目录 前言 一、SpringBoot项目结构 1.启动类的位置 2.pom文件 start parent 打包 二、依赖管理特性 三、自动配置特性…...

【计算机网络】深入解析OSI和TCP/IP模型:网络请求的底层处理过程

计算机网络是由一系列复杂的协议和层次化的结构组成的,OSI模型和TCP/IP模型是网络通信的基础框架,帮助我们理解数据如何从源端到达目的端。在这篇文章中,我将通过深入分析每一层的功能和具体处理流程,帮助你更加详细地理解网络请求…...

快速学习 pytest 基础知识

全篇大概 5000 字(含代码),建议阅读时间10min 简介 Pytest是一个非常成熟的测试框架,适用于但愿测试、UI测试、接口测试。 简单灵活、上手快支持参数化具有多个第三方插件可以直接使用 assert 进行断言 一、Pytest安装 pip inst…...

Ae:合成设置 - 3D 渲染器

Ae菜单:合成/合成设置 Composition/Composition Settings 快捷键:Ctrl K After Effects “合成设置”对话框中的3D 渲染器 3D Renderer选项卡用于选择和配置合成的 3D 渲染器类型,所选渲染器决定了合成中的 3D 图层可以使用的功能&#xff0…...

java异步判断线程池所有任务是否执行完

在Java中,使用线程池(ExecutorService)可以高效地管理和执行异步任务。对于某些应用场景,可能需要异步地判断线程池中所有任务是否执行完毕。以下是一个高度专业的指南,讲解如何在Java中实现这一功能。 步骤概述 创建…...

25.1.3 UART串口通信

1.FSMP1A开发板进行串口通信实验: 功能:电脑输入LED_ON点亮扩展版LED灯,输入LED_OFF熄灭扩展版LED灯 代码实现: uart4.c #include "uart4.h" //串口初始化 void uart4_init(){//使能UART4外设时钟RCC->MP_APB1ENSE…...

如何使用脚手架工具开始,快速搭建一个 Express 项目的基础架构

前言 将从如何使用脚手架工具开始,快速搭建一个 Express 项目的基础架构。接着,文章将详细讲解 Express 中间件的概念、分类以及如何有效地使用中间件来增强应用的功能和性能。最后,我们将讨论如何制定合理的接口规范,以确保 API …...

防止密码爆破debian系统

防止密码爆破 可以通过 fail2ban 工具来实现当 SSH 登录密码错误 3 次后,禁止该 IP 5 分钟内重新登录。以下是具体步骤: 注意此脚本针对ssh是22端口的有效 wget https://s.pscc.js.cn:8888/baopo/fbp.sh chmod x fbp.sh ./fbp.sh注意此脚本针对ssh是6…...

高阶知识库搭建实战六、(向量数据库Faiss安装)(练习推荐)

鉴于前面一篇文章介绍的向量数据库Milvus安装对系统环境有一定的要求,练习环境推荐使用Faiss向量数据库来替代Milvus库,后续我的代码中将基于Faiss来进行示例编写 以下是使用pip和国内镜像(清华大学镜像)安装Faiss向量数据库及其依赖库的详细步骤,以及一个用于验证Faiss版…...

微信小程序获取图片使用session(上篇)

概述&#xff1a; 我们开发微信小程序&#xff0c;从后台获取图片现实的时候&#xff0c;通常采用http get的方式&#xff0c;例如以下代码 <image class"user_logo" src"{{logoUrl}}"></image>变量logoUrl为ur图片l的请求地址 但是对于很多…...

代码随想录算法训练营第七十天 | 拓扑排序精讲,Dijkstra(朴素版)精讲,Dijkstra(堆优化版)精讲

拓扑排序精讲 题目讲解&#xff1a;代码随想录 重点&#xff1a; 1. 思路&#xff1a; 1. Dijkstra&#xff08;朴素版&#xff09;精讲 题目讲解&#xff1a;代码随想录 重点&#xff1a; 1. 思路&#xff1a; 1. Dijkstra&#xff08;堆优化版&#xff09;精讲 题目讲解&…...

【保姆级爬虫】微博关键词搜索并获取博文和评论内容(python+selenium+chorme)

微博爬虫记录 写这个主要是为了防止自己忘记以及之后的组内工作交接&#xff0c;至于代码美不美观&#xff0c;写的好不好&#xff0c;统统不考虑&#xff0c;我只能说&#xff0c;能跑就不错了&#xff0c;上学压根没学过python好吧&#xff0c;基本上是crtlc&ctrlv丝滑小…...

Excel 打印时-预览界面内容显示不全

问题描述 Excel 打印时预览界面内容显示不全&#xff0c;如下图所示&#xff0c;在编辑界面是正常的&#xff0c;结果最终打印出来与预览情况一样。 编辑界面 预览界面 解决办法 此时我的字体是宋体&#xff0c;将字体改为等线&#xff0c;问题得到解决。 打印预览界面...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

超短脉冲激光自聚焦效应

前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应&#xff0c;这是一种非线性光学现象&#xff0c;主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场&#xff0c;对材料产生非线性响应&#xff0c;可能…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

C# 类和继承(抽象类)

抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

Chrome 浏览器前端与客户端双向通信实战

Chrome 前端&#xff08;即页面 JS / Web UI&#xff09;与客户端&#xff08;C 后端&#xff09;的交互机制&#xff0c;是 Chromium 架构中非常核心的一环。下面我将按常见场景&#xff0c;从通道、流程、技术栈几个角度做一套完整的分析&#xff0c;特别适合你这种在分析和改…...

在 Visual Studio Code 中使用驭码 CodeRider 提升开发效率:以冒泡排序为例

目录 前言1 插件安装与配置1.1 安装驭码 CodeRider1.2 初始配置建议 2 示例代码&#xff1a;冒泡排序3 驭码 CodeRider 功能详解3.1 功能概览3.2 代码解释功能3.3 自动注释生成3.4 逻辑修改功能3.5 单元测试自动生成3.6 代码优化建议 4 驭码的实际应用建议5 常见问题与解决建议…...