当前位置: 首页 > news >正文

RabbitMQ-保证消息可靠性

RabbitMQ-保证消息可靠性

  • 1.消息可靠性
    • 1.1.生产者消息确认
      • 1.1.1.修改配置
      • 1.1.2.定义Return回调
      • 1.1.3.定义ConfirmCallback
    • 1.2.消息持久化
      • 1.2.1.交换机持久化
      • 1.2.2.队列持久化
      • 1.2.3.消息持久化
    • 1.3.消费者消息确认
      • 1.3.1.演示none模式
      • 1.3.2.演示auto模式
    • 1.4.消费失败重试机制
      • 1.4.1.本地重试
      • 1.4.2.失败策略
    • 1.5.总结

1.消息可靠性

消息从发送,到消费者接收,会经理多个过程:

在这里插入图片描述

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

下面操作通过案例来演示每一个步骤。

项目结构如下:

在这里插入图片描述

1.1.生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

在这里插入图片描述

注意:

在这里插入图片描述

1.1.1.修改配置

首先,在publisher服务中的application.yml文件,添加下面的内容:

spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

1.1.2.定义Return回调

每个RabbitTemplate只能配置一个R

eturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

package cn.zqd.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}

1.1.3.定义ConfirmCallback

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

在publisher服务的cn.zqd.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:

public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);
}

1.2.消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

1.2.1.交换机持久化

RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}

事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。

可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:

在这里插入图片描述

1.2.2.队列持久化

RabbitMQ中队列默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。

可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:

在这里插入图片描述

1.2.3.消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java代码指定:

在这里插入图片描述

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。

1.3.消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

1.3.1.演示none模式

修改consumer服务的application.yml文件,添加下面内容:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {log.info("消费者接收到simple.queue的消息:【{}】", msg);// 模拟异常System.out.println(1 / 0);log.debug("消息处理完成!");
}

测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。

1.3.2.演示auto模式

再次把确认机制修改为auto:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):

在这里插入图片描述

抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:

在这里插入图片描述

1.4.消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

在这里插入图片描述

怎么办呢?

1.4.1.本地重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

1.4.2.失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码:

package cn.zqd.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

1.5.总结

如何确保RabbitMQ消息的可靠性?(面试题)

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

相关文章:

RabbitMQ-保证消息可靠性

RabbitMQ-保证消息可靠性 1.消息可靠性1.1.生产者消息确认1.1.1.修改配置1.1.2.定义Return回调1.1.3.定义ConfirmCallback 1.2.消息持久化1.2.1.交换机持久化1.2.2.队列持久化1.2.3.消息持久化 1.3.消费者消息确认1.3.1.演示none模式1.3.2.演示auto模式 1.4.消费失败重试机制1.…...

Python教程——Python本地环境安装

文章目录 简介安装Python下载安装验证安装结果 手动添加环境变量安装问题 简介 python官网:https://www.python.org/ Python Windows下载地址:https://www.python.org/downloads/windows/ Python 官方文档:https://www.python.org/doc/ Pytho…...

“智慧交通”转型升级+创新发展策略

随着“互联网交通”的应用创新推陈出新,传统轨道交通行业客户服务中心已难以满足乘客对便捷高效的客户服务需求;节假日人流量激增,客户服务人手不足,交通、站点堵塞、信息更新不及时等问题是常态。因此,“智慧城市”交…...

华为OD机试 - 开放日活动、取出尽量少的球(Python)

题目描述 某部门开展Family Day开放日活动,其中有个从桶里取球的游戏,游戏规则如下: 有N个容量一样的小桶等距排开, 且每个小桶都默认装了数量不等的小球, 每个小桶装的小球数量记录在数组 bucketBallNums 中, 游戏开始时,要求所有桶的小球总数不能超过SUM, 如果…...

一些关于单链表的操作

思维导图: 一, 链表 1.1节点的结构 链表是啥样的啊?顾名思义链表就是一种用链子链接起来的表。那这种表是怎么样的啊? 这样的呗: 现在,我们知道了链表的形状了。那我们该如何用编程语言来形成这一种形状…...

CTF-PHP反序列化漏洞2-利用魔法函数

作者:Eason_LYC 悲观者预言失败,十言九中。 乐观者创造奇迹,一次即可。 一个人的价值,在于他所拥有的。可以不学无术,但不能一无所有! 技术领域:WEB安全、网络攻防 关注WEB安全、网络攻防。我的…...

Doris(23):Doris的函数—字符串函数

1 append_trailing_char_if_absent(VARCHAR str, VARCHAR trailing_char) 如果s字符串非空并且末尾不包含c字符,则将c字符附加到末尾。 trailing_char只包含一个字符,如果包含多个字符,将返回NULL select append_trailing_char_if_absent(a,c);select append_trailing_cha…...

01-Shiro550漏洞流程

1. 漏洞原理 Apache Shiro框架提供了记住密码的功能(RememberMe),用户登录成功后会生成经过加密并编码的cookie。在服务端对rememberMe的cookie值,先base64解码然后AES解密再反序列化,就导致了反序列化RCE漏洞。 那么…...

《程序员面试金典(第6版)》面试题 16.08. 整数的英语表示

题目描述 给定一个整数,打印该整数的英文描述。 示例 1: 输入: 123输出: “One Hundred Twenty Three” 示例 2: 输入: 12345输出: “Twelve Thousand Three Hundred Forty Five” 示例 3: 输入: 1234567输出: “One Million Two Hundred Thirty Four Thousand…...

ChatGPT技术原理 第四章:Transformer模型

目录 4.1 什么是Transformer 4.2 Transformer结构详解 4.3 Self-Attention机制 4.4 Multi-Head Attention机制 4.1 什么是Transformer...

基于redis和threadlocal实现登录状态校验和拦截

1.流程图 单机节点下的登录状态校验 分布式节点下的登录状态校验 2.代码实现 实现步骤分为如下几步 实现WebMvcConfigurer接口,添加拦截器定义拦截器,需要配置两个interceptor,第一个用于刷新token,写threadlocal&#xff…...

14-6-进程间通信-信号量

前面学习了pipe,fifo,共享内存,信号。 本章将讲述信号量。 一、什么是信号量/信号量集? 1.什么是信号量 信号量是一个计数器。信号量用于实现进程间的同步和互斥。而可以取多个正整数的信号量被称为通用信号量。 对信号量的使用场景的解读 房间&#…...

《中国教育报》投稿邮箱编辑部征稿

《中国教育报》国家教育部主管,中国教育报刊社主办的以教育新闻为主的全国性日报。是迄今为止中国最具权威和最有影响力的教育新闻媒体。中国教育报刊社是中华人民共和国教育部直属的新闻出版机构。2018年获得第三届全国“百强报纸”。2019年入选“新媒体影响力指数…...

Photoshop如何使用绘画和图像修饰之实例演示?

文章目录 0.引言1.给图像添加渐变色效果2.快速创建一副素描画3.清除图像中多余的景物4.快速融合两张图像5.调整图像光影6.人像面部瑕疵修除7.美化眼睛 0.引言 因科研等多场景需要进行绘图处理,笔者对PS进行了学习,本文通过《Photoshop2021入门教程》及其…...

【C++】布隆过滤器

文章目录 布隆过滤器提出布隆过滤器概念布隆过滤器应用场景设计思路:布隆过滤器的插入布隆过滤器的查找布隆过滤器删除BloomFilter.h布隆过滤器优点布隆过滤器缺陷 布隆过滤器提出 我们在使用新闻客户端看新闻时,它会给我们不停地推荐新的内容,它每次推荐时要去重,去掉那些已经…...

功能齐全的 ESP32 智能手表,具有多个表盘、心率传感器硬件设计

相关设计资料下载ESP32 智能手表带心率、指南针设计资料(包含Arduino源码+原理图+Gerber+3D文件).zip 介绍 我们调查了智能手表项目的不同方面,并学会了集成和测试每个单独的部分。在本文中,我们将使用所学知识,结合使用硬件和软件组件,从头开始创建我们自己的智能手表。在…...

微服务不是本地部署的最佳选择,不妨试试模块化单体

微服务仅适用于成熟产品 关于从头开始使用微服务,马丁・福勒(Martin Fowler)总结道: 1. 几乎所有成功的微服务都是从一个过于庞大而不得不拆分的单体应用开始的。 2. 几乎所有从头开始以微服务构建的系统,最后都会因…...

解读Toolformer

【引子】读论文Toolformer: Language Models Can Teach Themselves to Use Tools,https://arxiv.org/pdf/2302.04761.pdf,再阅读了几篇关于Toolformer的网络热文,于是“无知者无畏”,开始自不量力地试图解读Toolformer。 大语言模…...

FCOS3D Fully Convolutional One-Stage Monocular 3D Object Detection 论文学习

论文地址:Fully Convolutional One-Stage Monocular 3D Object Detection Github地址:Fully Convolutional One-Stage Monocular 3D Object Detection 1. 解决了什么问题? 单目 3D 目标检测由于成本很低,对于自动驾驶任务非常重…...

Xpath学习笔记

Xpath原理:先将HTML文档转为XML文档,再用xpath查找HTML节点或元素 什么是xml? 1、xml指可扩展标记语言 2、xml是一种标记原因,类似于html 3、xml的设计宗旨是传输数据,而非显示数据 4、xml标签需要我们自己自定义 5、x…...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...

Spark 之 入门讲解详细版(1)

1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案

这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...

MySQL 知识小结(一)

一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...