RabbitMQ消息可靠性问题及解决
说明:在RabbitMQ消息传递过程中,有以下问题:
-
消息没发到交换机
-
消息没发到队列
-
MQ宕机,消息在队列中丢失
-
消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除
针对以上问题,提供以下解决方案:
-
消息确认:确认消息是否发送到交换机、队列;
-
消息持久化:持久化消息,以防MQ宕机造成消息丢失;
-
消费者消息确认:确认消费者已正确消费消息,才把消息从队列中删除;
消息确认
可以使用Rabbit MQ提供的publisher confirm机制来避免消息发送到MQ过程丢失。具体实现是,publisher-confirm(发送者确定)、publisher-return(发送者回执),前者判断消息到交换机、后者判断交换机到队列
publisher-confirm(发送者确定)
-
消息成功投递到交换机,返回ack;
-
消息未投递到交换机,返回nack;
publisher-return(发送者回执)
- 消息投递到交换机,但没有到队列,返回ack,即失败原因;
在生产者端添加配置
spring:rabbitmq:# rabbitMQ相关配置host: 118.178.228.175port: 5672username: rootpassword: 123456virtual-host: /# 开启生产者确认,correlated为异步,simple为同步publisher-confirm-type: correlated# 开启publish-return功能,基于callback机制publisher-returns: true# 开启消息路由失败的策略,true是调用returnCallback方法,false是丢弃消息template:mandatory: true
publisher-return(发送者回执)代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 发送者回执实现*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 回执信息* @param message 信息对象* @param replyCode 回执码* @param replyText 回执内容* @param exchange 交换机* @param routingKey 路由键值*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息发送队列失败=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);}});}
}
publisher-confirm(发送者确定)代码
@Testpublic void sendExceptionMessage() {// 路由键值String routingKey = "exception";// 消息String message = "This is a exception message";// 给消息设置一个唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 编写confirmCallBack回调函数correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {if (confirm.isAck()) {// 消息发送交换机成功log.debug("消息送达至交换机成功");} else {// 消息发送交换机失败,打印消息log.error("消息未能送达至交换机,ID{},原因{}", correlationData.getId(), confirm.getReason());}}}, new FailureCallback() {// 消息发送交换机异常@Overridepublic void onFailure(Throwable ex) {log.error("消息发送交换机异常,ID:{},原因{}", correlationData.getId(), ex.getMessage());}});rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);}
测试,设置一个不存在的routingKey,被发送者确认(publisher-confirm)捕获到;
// 路由键值
String routingKey = "null";
设置一个不存在的路由,被发送者回执(publisher-return)捕获到;
rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);
消息持久化
消息持久化,是指把消息保存到磁盘中,在RabbitMQ宕机或者关机时,重启后,消息仍可以保存下来。消息依赖于交换机、队列,因此持久化消息,同时也需要持久化交换机、队列。
创建一个持久化的交换机、队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息持久化*/
@Configuration
public class DurableConfig {/*** 交换机持久化* @return*/@Beanpublic DirectExchange directExchange(){// 三个参数分别是:交换机名、是否持久化、没有队列与之绑定时是否自动删除return new DirectExchange("durable.direct",true,false);}/*** 队列持久化* @return*/@Beanpublic Queue durableQueue(){return QueueBuilder.durable("durable.queue").build();}/*** 交换机与队列绑定* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");}}
发送一个持久化的消息
/*** 发送持久化消息*/@Testpublic void sendDurableMessage() {String routingKey = "durable";CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))// 设置该消息未持久化消息.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);}
打开RabbitMQ管理平台,可以看到"delivery_mode: 2",表示该消息是持久化消息
(源码:MessageDeliveryMode类)
实际上,交换机、队列默认就是持久化的(durable: true),所以不用特意设置;
消费者消息确认
介绍
消费者消息确认,是为了确保消费者已经消费了消息,才让MQ把该消息删除;
可通过在消费者的配置文件中增加下面这行配置实现,备选项有以下三个:
-
none:关闭ack,表示不做处理,消息发给消费者之后就立即被删除;
-
auto:自动ack,表示由Spring检测代码是否出现异常,出现异常则保留消息,没有异常则删除消息;
-
manual:手动ack,可根据业务手动编写代码,返回ack;
spring:rabbitmq:listener:simple:# 设置消息确认模式acknowledge-mode: none
测试:none
可编写代码测试,下面是生产者代码,发送消息
/*** 发送普通消息*/@Testpublic void sendNoneMessage() {String directName = "none.direct";String routingKey = "none";String message = "This is a test message";rabbitTemplate.convertAndSend(directName, routingKey, message);}
消费者代码有问题,未能正常消费消息
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "none.queue"),exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),key = {"none"}))public void getNoneMessage(String normalMessage){System.out.println(1/0);System.out.println("normalMessage = " + normalMessage);}
测试结果,程序报错,消息也没能保留下来
测试:auto
更改设置为:auto,重试
但是消息未被删除
这种情况,在实际开发中是不能允许,可以通过更改消费失败的重试机制解决。
消费失败重试机制
方法一:设置retry
因为消息被消费失败,消息会一直循环重试,无限循环,导致mq的消息处理飙升,带来不必要的压力,这种情况可以通过在消费者端添加以下配置,限制失败重试的条件来解决:
spring:rabbitmq:listener:simple:retry:# 开启消费者失败重试enabled: true# 初次失败等待时长为1秒initial-interval: 1000# 失败的等待时长倍数,即后一次等待的时间是前一次等待时间的多少倍multiplier: 1# 最多重试次数max-attempts: 3# true 无状态 false 有状态 如果业务中包含事务 改为falsestateless: true
开启后,控制台可以发现,信息不回一直循环打印,而是打印数条后停止,日志信息中有提示“Retry Policy Exhausted”(重试策略已用尽)
这种通过配置的方式,并不会重试数次后仍保留消息,而是重试数次仍失败,随即丢弃消息,消息丢失,这在实际开发中也是不能被允许的。
方法二:路由存储消息
因此,可以通过下面这个方法,把消费失败的消息,通过交换机路由到另外的队列中存储起来,等业务代码被修复,再路由回来消费。
代码如下
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 错误消息队列*/
@Configuration
public class ErrorMessageQueueConfig {/*** 创建一个交换机,用于路由消费失败的消息* @return*/@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}/*** 创建一个队列,用于存储消费失败的消息* @return*/@Beanpublic Queue errorQueue(){return new Queue("error.queue");}/*** 绑定* @return*/@Beanpublic Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** 路由,当消费失败时,把消费失败的消息路由到此队列中,路由key为"error"* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}
可以看到,消息消费失败后并没有被丢失,而是路由到错误队列中存储了起来。因为错误队列没有设置RabbitListener,所以可以存储消息,等带代码问题被排查出来后,可以再针对该队列设置监听方法,消费这部分错误的消息。
另外,值得一提的是,消费者这边的控制台会报一个警告,提示路由密钥错误。我们可以理解,在RabbitMQ底层,会把消费失败了的消息,统一路由到一个地方去,而我们这种手动把消费失败的消息路由到自定义的队列中的方式,打破了这种“默认的规则”,所以报了一个这样的警告。这种警告是在可控范围内的。
总结
RabbitMQ发送消息,为了确保消息的可靠性,保证消息能被交换机、队列收到,消息能被正常消费,而不会因消费失败而丢失,提供了对应的一系列方法,并且最后还提供了两种消费失败重试方法,优化了消费过程,非常Nice。
相关文章:

RabbitMQ消息可靠性问题及解决
说明:在RabbitMQ消息传递过程中,有以下问题: 消息没发到交换机 消息没发到队列 MQ宕机,消息在队列中丢失 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除 …...

2023河南萌新联赛第(三)场:郑州大学(两个题目)
1.入门mex 重点 一些数字的mex是从0往上枚举,第一个没出现的数字。请你回答选最多k个数字,mex最大是多少 既然从0开始枚举,那么应该是最小,那么最大是什么? 经过自己的考虑,给出一个样例,0 1 1…...
学生管理系统-07打包与上线
一、项目架构 vue的项目必须要进行打包,并部署在nginx服务器上的 二、vue的打包 1、修改vue.cofing.js文件 在该文件中添加publicPath属性,值为./ const { defineConfig } require(vue/cli-service) module.exports defineConfig({transpileDepen…...

day31贪心算法 用最少数量的箭引爆气球 和无重叠区间
题目描述 题目分析: x轴向上射箭,12一支,重叠的需要一支,3-8一支,7-16一支 返回2; 就是让重叠的气球尽量在一起,局部最优;用一支弓箭,全局最优就是最少弓箭;…...
AMEYA360报道:手机直连卫星通信发展的三个阶段
卫星通信的发展从过去、现在与规划,可以分为三个阶段。手机卫星通信的第一个阶段中,较为典型的有铱星公司、海事卫星电话、天通卫星通信等,终端设备方面已经可以做到手持设备直接通过自带的天线与卫星进行通信。 包括铱星、天通卫星等&#x…...

redis中缓存雪崩,缓存穿透,缓存击穿的原因以及解决方案
一 redis的缓存雪崩 1.1 缓存雪崩 在redis中,新,旧数据交替时候,旧数据进行了删除,新数据没有更新过来,造成在高并发环境下,大量请求查询redis没有数据,直接查询mysql,造成mysql的…...

ChatGPT火热之下的冷思考
作为一款基于人工智能的自然语言处理(NLP)聊天机器人程序,ChatGPT通过大量来自互联网的文本进行训练,并使用深度学习和机器学习算法来理解用户的问题并提供准确的回答。并且,ChatGPT还内置了情感分析、关键字提取和实体识别等功能&am…...

查看docker容器启动参数
查看docker启动参数 1、查看docker容器的自启动策略2、查看docker容器的日志滚动清理策略 以下配置命令以redis容器为例 1、查看docker容器的自启动策略 docker inspect --format{{json .HostConfig.RestartPolicy}} redis输出的name是always 表示此容器是开机自启动的&#x…...
对Webpack的理解
Webpack是目前比较物流的前端构建工具,它基于入口,用不同的Loader来处理不同的文件 Webpack的核心概念 Entry:入口,Webpack执行构建的第一步将从Entry开始,可抽象成输入。告诉Webpack要使用哪个模块作为构建项目的起…...

使用wxPython和pillow开发拼图小游戏(四)
上一篇介绍了使用本地图片来初始化游戏的方法,通过前边三篇,该小游戏的主要内容差不多介绍完了,最后这一篇来介绍下游戏用时的计算、重置游戏和关闭窗口事件处理 游戏用时的计算 对于游戏用时的记录,看过前几篇的小伙伴可能也发现…...

XGBoost实例——皮马印第安人糖尿病预测和特征筛选
利用皮马印第安人糖尿病数据集来预测皮马印第安人的糖尿病,以下是数据集的信息: Pregnancies:怀孕次数Glucose:葡萄糖BloodPressure:血压 (mm Hg)SkinThickness:皮层厚度 (mm)Insulin:胰岛素 2…...

使用MQ发送对象错误
说明:使用RabbitMQ发送消息,消息是对象,出现下面这样的错误; 错误信息:Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of com.hmall.item.pojo.Item (no Cr…...
安装和卸载docker,详细教程
安装docker ############################################################################# 安装: 1、Docker要求CentOS系统的内核版本高于 3.10 ,通过 uname -r 命令查看你当前的内核版本是否支持安账docker 2、更新yum包:sudo yum -y up…...
RabbitMQ的确认机制
RabbitMQ的确认机制 生产者确认 public class ProductionMessageConfirm {public static void Send(){ConnectionFactory factory new ConnectionFactory();factory.HostName "localhost";//RabbitMQ服务在本地运行factory.UserName "guest";//用户名…...

java项目之人才公寓管理系统(ssm+mysql+jsp)
风定落花生,歌声逐流水,大家好我是风歌,混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的人才公寓管理系统。技术交流和部署相关看文章末尾! 开发环境: 后端: 开发语言:Java 框架&…...
git使用记录
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、常用git命令总结 前言 一、常用git命令 git --version # mkdir my-project cd my-project git status # 这一步显然没东西 git init # 创建 git status #…...

Spring MVC异步上传、跨服务器上传和文件下载
一、异步上传 之前的上传方案,在上传成功后都会跳转页面。而在实际开发中,很多情况下上传后不进行跳转,而是进行页面的局部刷新,比如:上传头像成功后将头像显示在网页中。这时候就需要使用异步文件上传。 1.1 JSP页面 …...

性能测试之并发用户数的估计
在计算并发用户数之前,需要先了解2个概念。 并发用户:指的是现实系统中同时操作业务的用户,在性能测试工具中一般称为虚拟用户。并发用户这些用户的最大特征是和服务器产生了交互,这种交互既可以是单向的传输数据,也可…...

【全方位解析】如何获取客户端/服务端真实 IP
一、应用场景 1.比如在投票系统开发中,为了防止刷票,我们需要限制每个 IP 地址只能投票一次 2.当网站受到诸如 DDoS(Distributed Denial of Service,分布式拒绝服务攻击)等攻击时,我们需要快速定位攻击者…...

Ceph简介和特性
Ceph是一个多版本存储系统,它把每一个待管理的数据流(例如一个文件) 切分为一到多个固定大小的对象数据,并以其为原子单元完成数据存取。 对象数据的底层存储服务是由多个主机 (host) 组成的存储集群,该集群也被称之为 RADOS (ReliableAutoma…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...

Yolov8 目标检测蒸馏学习记录
yolov8系列模型蒸馏基本流程,代码下载:这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中,**知识蒸馏(Knowledge Distillation)**被广泛应用,作为提升模型…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...

R 语言科研绘图第 55 期 --- 网络图-聚类
在发表科研论文的过程中,科研绘图是必不可少的,一张好看的图形会是文章很大的加分项。 为了便于使用,本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中,获取方式: R 语言科研绘图模板 --- sciRplothttps://mp.…...

Unity UGUI Button事件流程
场景结构 测试代码 public class TestBtn : MonoBehaviour {void Start(){var btn GetComponent<Button>();btn.onClick.AddListener(OnClick);}private void OnClick(){Debug.Log("666");}}当添加事件时 // 实例化一个ButtonClickedEvent的事件 [Formerl…...

Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...

【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅
目录 前言 操作系统与驱动程序 是什么,为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中,我们在使用电子设备时,我们所输入执行的每一条指令最终大多都会作用到硬件上,比如下载一款软件最终会下载到硬盘上&am…...