RabbitMQ消息可靠性问题及解决
说明:在RabbitMQ消息传递过程中,有以下问题:
-
消息没发到交换机
-
消息没发到队列
-
MQ宕机,消息在队列中丢失
-
消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除
针对以上问题,提供以下解决方案:
-
消息确认:确认消息是否发送到交换机、队列;
-
消息持久化:持久化消息,以防MQ宕机造成消息丢失;
-
消费者消息确认:确认消费者已正确消费消息,才把消息从队列中删除;
消息确认
可以使用Rabbit MQ提供的publisher confirm机制来避免消息发送到MQ过程丢失。具体实现是,publisher-confirm(发送者确定)、publisher-return(发送者回执),前者判断消息到交换机、后者判断交换机到队列
publisher-confirm(发送者确定)
-
消息成功投递到交换机,返回ack;
-
消息未投递到交换机,返回nack;
publisher-return(发送者回执)
- 消息投递到交换机,但没有到队列,返回ack,即失败原因;
在生产者端添加配置
spring:rabbitmq:# rabbitMQ相关配置host: 118.178.228.175port: 5672username: rootpassword: 123456virtual-host: /# 开启生产者确认,correlated为异步,simple为同步publisher-confirm-type: correlated# 开启publish-return功能,基于callback机制publisher-returns: true# 开启消息路由失败的策略,true是调用returnCallback方法,false是丢弃消息template:mandatory: true
publisher-return(发送者回执)代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 发送者回执实现*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 回执信息* @param message 信息对象* @param replyCode 回执码* @param replyText 回执内容* @param exchange 交换机* @param routingKey 路由键值*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息发送队列失败=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);}});}
}
publisher-confirm(发送者确定)代码
@Testpublic void sendExceptionMessage() {// 路由键值String routingKey = "exception";// 消息String message = "This is a exception message";// 给消息设置一个唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 编写confirmCallBack回调函数correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {if (confirm.isAck()) {// 消息发送交换机成功log.debug("消息送达至交换机成功");} else {// 消息发送交换机失败,打印消息log.error("消息未能送达至交换机,ID{},原因{}", correlationData.getId(), confirm.getReason());}}}, new FailureCallback() {// 消息发送交换机异常@Overridepublic void onFailure(Throwable ex) {log.error("消息发送交换机异常,ID:{},原因{}", correlationData.getId(), ex.getMessage());}});rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);}
测试,设置一个不存在的routingKey,被发送者确认(publisher-confirm)捕获到;
// 路由键值
String routingKey = "null";
设置一个不存在的路由,被发送者回执(publisher-return)捕获到;
rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);
消息持久化
消息持久化,是指把消息保存到磁盘中,在RabbitMQ宕机或者关机时,重启后,消息仍可以保存下来。消息依赖于交换机、队列,因此持久化消息,同时也需要持久化交换机、队列。
创建一个持久化的交换机、队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息持久化*/
@Configuration
public class DurableConfig {/*** 交换机持久化* @return*/@Beanpublic DirectExchange directExchange(){// 三个参数分别是:交换机名、是否持久化、没有队列与之绑定时是否自动删除return new DirectExchange("durable.direct",true,false);}/*** 队列持久化* @return*/@Beanpublic Queue durableQueue(){return QueueBuilder.durable("durable.queue").build();}/*** 交换机与队列绑定* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");}}
发送一个持久化的消息
/*** 发送持久化消息*/@Testpublic void sendDurableMessage() {String routingKey = "durable";CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))// 设置该消息未持久化消息.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);}
打开RabbitMQ管理平台,可以看到"delivery_mode: 2",表示该消息是持久化消息
(源码:MessageDeliveryMode类)
实际上,交换机、队列默认就是持久化的(durable: true),所以不用特意设置;
消费者消息确认
介绍
消费者消息确认,是为了确保消费者已经消费了消息,才让MQ把该消息删除;
可通过在消费者的配置文件中增加下面这行配置实现,备选项有以下三个:
-
none:关闭ack,表示不做处理,消息发给消费者之后就立即被删除;
-
auto:自动ack,表示由Spring检测代码是否出现异常,出现异常则保留消息,没有异常则删除消息;
-
manual:手动ack,可根据业务手动编写代码,返回ack;
spring:rabbitmq:listener:simple:# 设置消息确认模式acknowledge-mode: none
测试:none
可编写代码测试,下面是生产者代码,发送消息
/*** 发送普通消息*/@Testpublic void sendNoneMessage() {String directName = "none.direct";String routingKey = "none";String message = "This is a test message";rabbitTemplate.convertAndSend(directName, routingKey, message);}
消费者代码有问题,未能正常消费消息
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "none.queue"),exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),key = {"none"}))public void getNoneMessage(String normalMessage){System.out.println(1/0);System.out.println("normalMessage = " + normalMessage);}
测试结果,程序报错,消息也没能保留下来
测试:auto
更改设置为:auto,重试
但是消息未被删除
这种情况,在实际开发中是不能允许,可以通过更改消费失败的重试机制解决。
消费失败重试机制
方法一:设置retry
因为消息被消费失败,消息会一直循环重试,无限循环,导致mq的消息处理飙升,带来不必要的压力,这种情况可以通过在消费者端添加以下配置,限制失败重试的条件来解决:
spring:rabbitmq:listener:simple:retry:# 开启消费者失败重试enabled: true# 初次失败等待时长为1秒initial-interval: 1000# 失败的等待时长倍数,即后一次等待的时间是前一次等待时间的多少倍multiplier: 1# 最多重试次数max-attempts: 3# true 无状态 false 有状态 如果业务中包含事务 改为falsestateless: true
开启后,控制台可以发现,信息不回一直循环打印,而是打印数条后停止,日志信息中有提示“Retry Policy Exhausted”(重试策略已用尽)
这种通过配置的方式,并不会重试数次后仍保留消息,而是重试数次仍失败,随即丢弃消息,消息丢失,这在实际开发中也是不能被允许的。
方法二:路由存储消息
因此,可以通过下面这个方法,把消费失败的消息,通过交换机路由到另外的队列中存储起来,等业务代码被修复,再路由回来消费。
代码如下
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 错误消息队列*/
@Configuration
public class ErrorMessageQueueConfig {/*** 创建一个交换机,用于路由消费失败的消息* @return*/@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}/*** 创建一个队列,用于存储消费失败的消息* @return*/@Beanpublic Queue errorQueue(){return new Queue("error.queue");}/*** 绑定* @return*/@Beanpublic Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** 路由,当消费失败时,把消费失败的消息路由到此队列中,路由key为"error"* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}
可以看到,消息消费失败后并没有被丢失,而是路由到错误队列中存储了起来。因为错误队列没有设置RabbitListener,所以可以存储消息,等带代码问题被排查出来后,可以再针对该队列设置监听方法,消费这部分错误的消息。
另外,值得一提的是,消费者这边的控制台会报一个警告,提示路由密钥错误。我们可以理解,在RabbitMQ底层,会把消费失败了的消息,统一路由到一个地方去,而我们这种手动把消费失败的消息路由到自定义的队列中的方式,打破了这种“默认的规则”,所以报了一个这样的警告。这种警告是在可控范围内的。
总结
RabbitMQ发送消息,为了确保消息的可靠性,保证消息能被交换机、队列收到,消息能被正常消费,而不会因消费失败而丢失,提供了对应的一系列方法,并且最后还提供了两种消费失败重试方法,优化了消费过程,非常Nice。
相关文章:

RabbitMQ消息可靠性问题及解决
说明:在RabbitMQ消息传递过程中,有以下问题: 消息没发到交换机 消息没发到队列 MQ宕机,消息在队列中丢失 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除 …...

2023河南萌新联赛第(三)场:郑州大学(两个题目)
1.入门mex 重点 一些数字的mex是从0往上枚举,第一个没出现的数字。请你回答选最多k个数字,mex最大是多少 既然从0开始枚举,那么应该是最小,那么最大是什么? 经过自己的考虑,给出一个样例,0 1 1…...

学生管理系统-07打包与上线
一、项目架构 vue的项目必须要进行打包,并部署在nginx服务器上的 二、vue的打包 1、修改vue.cofing.js文件 在该文件中添加publicPath属性,值为./ const { defineConfig } require(vue/cli-service) module.exports defineConfig({transpileDepen…...

day31贪心算法 用最少数量的箭引爆气球 和无重叠区间
题目描述 题目分析: x轴向上射箭,12一支,重叠的需要一支,3-8一支,7-16一支 返回2; 就是让重叠的气球尽量在一起,局部最优;用一支弓箭,全局最优就是最少弓箭;…...

AMEYA360报道:手机直连卫星通信发展的三个阶段
卫星通信的发展从过去、现在与规划,可以分为三个阶段。手机卫星通信的第一个阶段中,较为典型的有铱星公司、海事卫星电话、天通卫星通信等,终端设备方面已经可以做到手持设备直接通过自带的天线与卫星进行通信。 包括铱星、天通卫星等&#x…...

redis中缓存雪崩,缓存穿透,缓存击穿的原因以及解决方案
一 redis的缓存雪崩 1.1 缓存雪崩 在redis中,新,旧数据交替时候,旧数据进行了删除,新数据没有更新过来,造成在高并发环境下,大量请求查询redis没有数据,直接查询mysql,造成mysql的…...

ChatGPT火热之下的冷思考
作为一款基于人工智能的自然语言处理(NLP)聊天机器人程序,ChatGPT通过大量来自互联网的文本进行训练,并使用深度学习和机器学习算法来理解用户的问题并提供准确的回答。并且,ChatGPT还内置了情感分析、关键字提取和实体识别等功能&am…...

查看docker容器启动参数
查看docker启动参数 1、查看docker容器的自启动策略2、查看docker容器的日志滚动清理策略 以下配置命令以redis容器为例 1、查看docker容器的自启动策略 docker inspect --format{{json .HostConfig.RestartPolicy}} redis输出的name是always 表示此容器是开机自启动的&#x…...

对Webpack的理解
Webpack是目前比较物流的前端构建工具,它基于入口,用不同的Loader来处理不同的文件 Webpack的核心概念 Entry:入口,Webpack执行构建的第一步将从Entry开始,可抽象成输入。告诉Webpack要使用哪个模块作为构建项目的起…...

使用wxPython和pillow开发拼图小游戏(四)
上一篇介绍了使用本地图片来初始化游戏的方法,通过前边三篇,该小游戏的主要内容差不多介绍完了,最后这一篇来介绍下游戏用时的计算、重置游戏和关闭窗口事件处理 游戏用时的计算 对于游戏用时的记录,看过前几篇的小伙伴可能也发现…...

XGBoost实例——皮马印第安人糖尿病预测和特征筛选
利用皮马印第安人糖尿病数据集来预测皮马印第安人的糖尿病,以下是数据集的信息: Pregnancies:怀孕次数Glucose:葡萄糖BloodPressure:血压 (mm Hg)SkinThickness:皮层厚度 (mm)Insulin:胰岛素 2…...

使用MQ发送对象错误
说明:使用RabbitMQ发送消息,消息是对象,出现下面这样的错误; 错误信息:Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of com.hmall.item.pojo.Item (no Cr…...

安装和卸载docker,详细教程
安装docker ############################################################################# 安装: 1、Docker要求CentOS系统的内核版本高于 3.10 ,通过 uname -r 命令查看你当前的内核版本是否支持安账docker 2、更新yum包:sudo yum -y up…...

RabbitMQ的确认机制
RabbitMQ的确认机制 生产者确认 public class ProductionMessageConfirm {public static void Send(){ConnectionFactory factory new ConnectionFactory();factory.HostName "localhost";//RabbitMQ服务在本地运行factory.UserName "guest";//用户名…...

java项目之人才公寓管理系统(ssm+mysql+jsp)
风定落花生,歌声逐流水,大家好我是风歌,混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的人才公寓管理系统。技术交流和部署相关看文章末尾! 开发环境: 后端: 开发语言:Java 框架&…...

git使用记录
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、常用git命令总结 前言 一、常用git命令 git --version # mkdir my-project cd my-project git status # 这一步显然没东西 git init # 创建 git status #…...

Spring MVC异步上传、跨服务器上传和文件下载
一、异步上传 之前的上传方案,在上传成功后都会跳转页面。而在实际开发中,很多情况下上传后不进行跳转,而是进行页面的局部刷新,比如:上传头像成功后将头像显示在网页中。这时候就需要使用异步文件上传。 1.1 JSP页面 …...

性能测试之并发用户数的估计
在计算并发用户数之前,需要先了解2个概念。 并发用户:指的是现实系统中同时操作业务的用户,在性能测试工具中一般称为虚拟用户。并发用户这些用户的最大特征是和服务器产生了交互,这种交互既可以是单向的传输数据,也可…...

【全方位解析】如何获取客户端/服务端真实 IP
一、应用场景 1.比如在投票系统开发中,为了防止刷票,我们需要限制每个 IP 地址只能投票一次 2.当网站受到诸如 DDoS(Distributed Denial of Service,分布式拒绝服务攻击)等攻击时,我们需要快速定位攻击者…...

Ceph简介和特性
Ceph是一个多版本存储系统,它把每一个待管理的数据流(例如一个文件) 切分为一到多个固定大小的对象数据,并以其为原子单元完成数据存取。 对象数据的底层存储服务是由多个主机 (host) 组成的存储集群,该集群也被称之为 RADOS (ReliableAutoma…...

Python基本语法之符号使用
好久没有和小伙伴们更新python了,我对于此感到抱歉以后有时间尽量多更新 目录 一. 标识符 A.定义: B.使用特点 C.Python标识符,进一步探讨以下几个方面的详细内容: 1. 规则和约定: 2. 有效的标识符示例࿱…...

前端vue部署到nginx并且配置https安全证书全流程
说明一下: 本人原本使用的是docker安装nginx通过挂载实现部署,但是出现了很多bug(例如部署安全证书后还是无法访问),所以困扰了很久,最后改为本地安装nginx,最终在不懈的努力下终于按照好了&…...

三子棋(超详解+完整码源)
三子棋 前言一,游戏规则二,所需文件三,创建菜单四,游戏核心内容实现1.棋盘初始化1.棋盘展示3.玩家下棋4.电脑下棋5.游戏胜负判断6.game()函数内部具体实现 四,游戏运行实操 前言 C语言实现三子棋…...

【算法提高:动态规划】1.2 最长上升子序列模型(TODO:最长公共上升子序列)
文章目录 题目列表1017. 怪盗基德的滑翔翼1014. 登山482. 合唱队形1012. 友好城市(⭐排序后 最长上升子序列模型)1016. 最大上升子序列和1010. 拦截导弹解法1——最长递减子序列 贪心解法2——最长递减子序列 最长递增子序列(⭐贪心结论&am…...

会不会好奇ai绘画生成器?ai创作的灵感从何而来?
在这个宁静的公园里,阳光透过树叶的缝隙洒在的地面上,微风轻拂着艺术家的发丝,带来一丝清凉。坐在长椅上的他,手中紧握着一支触控画笔,目光凝视着眼前的美景。旁边一台智能绘画助手正在悄悄发光,它似乎能够…...

【Ajax】笔记-JQuery发送请求与通用方法
Get请求 语法格式: $.get(url, [data], [callback], [type]) url:请求的 URL 地址。data:请求携带的参数。callback:载入成功时回调函数。type:设置返回内容格式,xml, html, script, json, text, _default。 准备三个按钮分别测试Get 、Post、通用型方…...

视频的音频提取怎么做?这样提取很简单
提取视频中的音频通常在需要从视频中独立使用音频或需要对音频进行编辑时使用。例如,当我们需要将音频上传到音乐流媒体平台或将其用于播客或其他音频项目时,就可能需要从视频中提取音频。问题是该怎么提取呢?教给大家几种简单的提取方法&…...

几百本常用计算机开发语言电子书链接
GitHub - XiangLinPro/IT_book: 本项目收藏这些年来看过或者听过的一些不错的常用的上千本书籍,没准你想找的书就在这里呢,包含了互联网行业大多数书籍和面试经验题目等等。有人工智能系列(常用深度学习框架TensorFlow、pytorch、keras。NLP、…...

Docker Compose 解析:定义和管理多容器应用,从多角度探索其优势和应用场景
🌷🍁 博主 libin9iOak带您 Go to New World.✨🍁 🦄 个人主页——libin9iOak的博客🎐 🐳 《面试题大全》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~ἳ…...

Linux系列---【CentOS 7通过MSTSC连接远程桌面】
安装对应的yum源 yum list lightdm xorgxrdp xrdp 可以看到这些软件都在epel中,如果没有的话,请先安装对应的yum源。命令如下: yum install -y epel-release 确认yum源没有问题之后,我们就可以进行安装了。 安装lightdm xorgxrdp…...