当前位置: 首页 > news >正文

RabbitMQ入门指南(九):消费者可靠性

专栏导航

RabbitMQ入门指南

从零开始了解大数据


目录

专栏导航

前言

一、消费者确认机制

二、失败重试机制

三、失败处理策略

四、业务幂等性

1.通过唯一标识符保证操作的幂等性

2.通过业务判断保证操作的幂等性

总结


前言

RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。


当RabbitMQ向消费者投递消息后,了解消费者的处理状态是非常重要的。因为消息的投递并不代表消费者一定能够正确地消费这些消息,可能会出现各种故障:

  • 网络故障:在消息投递过程中,如果RabbitMQ和消费者之间的网络连接出现故障,可能会导致消息无法正确投递给消费者。
  • 消费者宕机:如果消费者在接收消息后突然宕机,那么消息可能无法被正确处理。
  • 消费者处理异常:消费者在接收到消息后,由于处理不当或者出现异常情况,可能会导致消息处理失败。

以上情况都可能导致消息丢失,因此RabbitMQ需要知道消费者的处理状态,以便在消息处理失败时重新投递消息。

一、消费者确认机制

RabbitMQ的消费者确认机制Consumer Acknowledgement)是一种确保消息被成功处理的机制。当消费者处理消息结束后,需要向RabbitMQ发送一个回执,以告知消息的处理状态。这个机制对于确保消息的可靠传递非常重要,因为它可以防止消息在消费者端处理失败而没有被正确处理的情况。

回执有三种可选值:

  • ACK(确认):表示消费者成功处理了消息,RabbitMQ会从队列中删除该消息。
  • NACK(否定确认):表示消息处理失败,RabbitMQ需要再次投递该消息。
  • REJECT(拒绝):表示消息处理失败并且被拒绝,RabbitMQ会从队列中删除该消息。

在实际应用中,一般使用ACK或NACK两种方式。REJECT方式的使用相对较少,通常只在消息格式存在问题,即存在开发错误的情况下使用。因此大多数情况下需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ACK,处理失败时返回NACK。

在consumer服务的application.yml文件中添加配置修改Spring AMQP的ACK处理方式 :

spring:rabbitmq:listener:simple:acknowledge-mode: auto

RabbitMQ 支持三种不同的确认模式,这些模式通过acknowledge-mode属性进行配置:

  • none:关闭ACK。消费者接收到消息后不需要发送任何确认给发送者,发送者将继续发送下一条消息。在这种模式下,如果消费者处理消息失败,消息将会丢失,无法保证消息的可靠性。
  • manual:手动ACK。消费者接收到消息后需要手动发送确认给发送者,发送者才会继续发送下一条消息。在这种模式下,如果消费者处理消息失败,可以手动发送NACK给发送者,告诉发送者这条消息处理失败,以便发送者重新发送消息。这种模式可以保证消息的可靠性,但需要消费者手动处理确认和NACK。
  • auto:自动ACK。Spring AMQP提供了一种自动的消息确认机制。它利用AOP(面向切面编程)对消息处理逻辑做了环绕增强。当业务正常执行时,Spring AMQP会自动返回ACK。当业务出现异常时,根据异常判断返回不同结果:业务异常,自动返回NACK;消息处理或校验异常,自动返回REJECT。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 如果消费者持续出现异常,消息会不断地在队列中重新排队并重新发送,这可能会导致消息处理延迟和队列持续增长,给系统带来不必要的压力。

为了解决这个问题,Spring框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到MQ队列 。

在consumer服务的application.yml文件中添加配置:

spring:rabbitmq:listener:simple:retry:enabled: trueinitial-interval: 1000msmultiplier: 1max-attempts: 3stateless: true
enabled: true开启消费者失败重试
initial-interval: 1000ms初始的等待时长
multiplier: 1每次重试的等待时长是上次等待时长的倍数
max-attempts: 3最大重试次数
stateless: truetrue表示重试是无状态的,即每次重试都是独立的,不会考虑之前的重试状态。如果业务中包含事务,需要改为false。

通过这样的配置,当消费者出现异常时,消息会在本地进行重试,而不是无限期地重新排队发送。在达到最大重试次数后,SpringAMQP会抛出AmqpRejectAndDontRequeueException异常,并将消息从队列中删除。这意味着最后一次处理消息的结果是失败的,并且消息不会被重新排队发送给消费者。

这种失败重试机制可以有效地减少消息处理的延迟和队列的增长,提高系统的稳定性和可用性。当然,在使用失败重试机制时,也需要考虑到业务逻辑和异常处理的合理性,避免因过度重试而导致的问题。

三、失败处理策略

在失败重试机制中,当达到最大重试次数后,消息会被直接丢弃。尽管这在某些场景中可能是可接受的,但对于那些对消息可靠性要求极高的业务来说,这显然是一个潜在的风险点。

Spring AMQP为此提供了强大的支持,允许开发人员自定义重试次数耗尽后的消息处理策略。这个策略是由MessageRecovery接口来定义的,它有三种不同的实现方式:

  • RejectAndDontRequeueRecoverer:当重试次数耗尽后,直接拒绝消息,并丢弃该消息。这是默认的处理方式。
  • ImmediateRequeueMessageRecoverer:当重试次数耗尽后,返回一个NACK给生产者,使消息重新入队,以便再次发送。
  • RepublishMessageRecoverer:当重试次数耗尽后,可以将失败的消息投递到一个指定的交换机和队列中,这个交换机和队列专门用来存放异常的消息。

在处理策略中,一种比较合适的方式是使用RepublishMessageRecoverer。当消息失败后,它会被投递到一个特定的、专门用于存放异常消息的队列中。这个队列可以由人工进行集中处理,使得开发人员可以更精细地处理和诊断问题。

在consumer服务中定义处理失败消息的交换机和队列:

    @Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}

定义一个RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue() {return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

通过这样的配置,当消息在尝试多次重试后仍然失败时,它们会被自动投递到定义的异常消息队列中。这样就可以集中处理这些异常消息,进行进一步的诊断或处理。这种策略为开发人员在处理复杂分布式系统中的消息问题提供了一种更加专业和灵活的方式。

四、业务幂等性

在计算机科学和软件开发中,幂等性是一个重要的概念。简单来说,如果一个操作或函数不论执行一次还是多次,其结果都是相同的,那么称这个操作或函数是幂等的。在业务处理中,幂等性尤其关键。它可以保证系统的稳定性,确保在某些异常情况下,多次执行某个业务操作不会对业务状态产生不一致的结果。幂等性的重要性在于它能够避免因重复执行操作而产生的数据不一致、状态冲突等问题。在涉及金融交易、库存管理、用户认证等关键领域,幂等性是确保系统稳定和数据准确的重要前提。

1.通过唯一标识符保证操作的幂等性

为每个操作生成唯一的标识符(如ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP的MessageConverter自带了MessageID的功能,只要开启这个功能即可。

Jackson的消息转换器示例:

@Bean
public MessageConverter messageConverter(){// 定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送:

    @Testvoid testSendMessage2Queue() {String queueName = "demo.queue";String msg = "Idempotent Test";rabbitTemplate.convertAndSend(queueName, msg);}

运行测试用例,查看结果:

2.通过业务判断保证操作的幂等性

业务判断,是一种基于业务逻辑和状态的检查,以确定是否对重复的请求或消息进行处理。在多种业务场景中,这一策略的思路各有不同。

比如在支付订单的案例中,业务逻辑主要是支付并将订单状态从“未支付”修改为“已支付”(需要防止重复支付)。因此,在执行这一业务时,可以判断订单的状态是否为“未支付”。若状态不是“未支付”,则说明该订单已经被处理过,无需重复处理。与基于唯一标识符的方案相比,业务判断方案无需对原有数据库进行改造,因此更为推荐。

以支付修改订单的业务为例:

    @Overridepublic void markOrderPaySuccess(Long orderId) {// 查询订单Order order = getById(orderId);// 判断订单状态,订单不存在或者订单状态不是1,放弃处理if (order == null || order.getStatus() != 1) {return;}// 尝试更新订单order.setStatus(2);order.setPayTime(LocalDateTime.now());orderService.updateById(order);}

以上代码示例判断和更新是两步动作 ,极小概率下可能存在线程安全问题,所以可以进行以下修改:

    @Overridepublic void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1orderService.lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();}
 

总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容,希望对大家有所帮助。

相关文章:

RabbitMQ入门指南(九):消费者可靠性

专栏导航 RabbitMQ入门指南 从零开始了解大数据 目录 专栏导航 前言 一、消费者确认机制 二、失败重试机制 三、失败处理策略 四、业务幂等性 1.通过唯一标识符保证操作的幂等性 2.通过业务判断保证操作的幂等性 总结 前言 RabbitMQ是一个高效、可靠的开源消息队列系…...

MySQL的聚合函数、MySQL的联合查询、MySQL的左连接右连接内连接

MySQL的聚合函数 MySQL聚合函数是在数据库中对数据进行聚合操作的函数。它们将多行数据作为输入,并返回单个值作为结果。 常用的MySQL聚合函数包括: COUNT:计算符合条件的行数。SUM:对指定列的数值进行求和操作。AVG&#xff1…...

RKNN Toolkit Lite2 一键安装和测试,sh脚本

RKNN Toolkit Lite2 安装和测试教程 本教程旨在指导用户如何使用提供的shell脚本来安装和测试RKNN Toolkit Lite2,适用于需要在Linux系统上部署和测试AI模型的开发者。 简介 RKNN Toolkit Lite2是一个高效的AI模型转换和推理工具包,专为Rockchip NPU设…...

探索中国制造API接口:解锁无限商机,引领制造业数字化转型

一、概述 中国制造API接口是一种应用程序接口,专门为中国制造行业提供数据和服务。通过使用API接口,开发者可以轻松地获取中国制造的商品信息、供应商数据、生产能力等,从而为他们的应用程序或网站提供更加丰富的内容和功能。 二、API接口的…...

CentOS上安装MySQL 8.0的详细教程

CentOS上安装MySQL 8.0的详细教程 大家好,我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我将为大家分享一篇关于在CentOS上安装MySQL 8.0的详细教程。MySQL是一个强大…...

[RISCV] 为android14添加一个新的riscv device

本篇博客将基于android-14-r18添加Sifive unmatched板子的支持。 Setup build envoronment Establishing a build environment $ sudo apt install git-core gnupg flex bison build-essential zip curl zlib1g-dev libc6-dev-i386 libncurses5 x11proto-core-dev libx11-de…...

【Fastadmin】通用排序weigh不执行model模型的事件

在model模型类支持的before_delete、after_delete、before_write、after_write、before_update、after_update、before_insert、after_insert事件行为中,我们可以快捷的做很多操作,如删除缓存、逻辑判断等 但是在fastadmin的通用排序weigh拖动中无法触发…...

logstash收集华为、H3C、Cisco交换机日志

网络设备配置 将 syslog-ip 替换成服务器的IP地址。 Huawei info-center loghost source interface info-center loghost syslog-ip local-time facility local6 H3C info-center loghost source interface info-center loghost syslog-ip facility local5 Aruba logging arm …...

云上荆楚丨云轴科技ZStack成功实践精选(湖北)

湖北自古以来有九省通衢的美称,地处长江中游,富有荆楚之美誉,灵秀之蕴意。2022年湖北数字经济强省三年行动计划正式印发,计划到“十四五”末,数字经济核心产业增加值力争达到7000亿元,占GDP的比重超过12%。…...

C语言字符串处理提取时间(ffmpeg返回的时间字符串)

【1】需求 需求:有一个 “00:01:33.90” 这样格式的时间字符串,需要将这个字符串的时间值提取打印出来(提取时、分、秒、毫秒)。 这个时间字符串从哪里来的? 是ffmpeg返回的时间,也就是视频的总时间。 下…...

NC(65)元数据增加字段

以报销单主表er_bxzb表为例,增加15个字段字段以及两个其他业务所需字段 1、先在er_bxzb增加字段 增加字段 alter table er_bxzb add no_invoice char(1) default(N);alter table er_bxzb add is_enabled_taxation_cloud char(1) default(N);alter table er_bxzb a…...

SParC数据集介绍

导语 SParC是Text-to-SQL领域的一个多轮查询数据集。本篇博客将对该数据集论文和数据格式进行简要介绍。 SParC数据集概述 SParC是一个跨领域的多轮Text-to-SQL数据集。它包含有4298个问题轮次,大约有12k的自然语言问句到SQL标注的Question-SQL对。这些问题来自于…...

OpenGL 绘制Mesh数据(Qt)

文章目录 一、简介二、实现代码三、实现效果一、简介 Mesh数据的结构主要就是点与三角面片,因此本质上仍然是对三角面片进行绘制。这里我们借助VCG这个库实现对Mesh数据的读取,这个库相对简单轻巧,很方便使用。 二、实现代码 由于修改的部分很多,我们逐一进行解释一下: --…...

9.传统的轨道画线算法()

轨道画线分为以下步骤: 1.读取摄像头图片 2.图片灰度处理,截取轨道区域的图片 3.中值滤波处理,并区域取均值后做期望差的绝对值。本人通过一些轨道图片实验,用这种方法二值化得到的效果比caany算子等方法的效果好 4.二值化后再…...

F (1164) : B DS二叉排序树_有效的二叉排序树

Description 给你一个二叉树,判断其是否是一个有效的二叉排序树。 有效的二叉排序树定义如下: 1. 结点的左子树只包含小于当前结点的数。 2. 结点的右子树只包含大于当前结点的数。 3. 所有左子树和右子树自身必须也是二叉排序树。 Input 第一行输…...

结合el-upload修改支持上传图片、视频并预览

结合element plus的el-upload标签&#xff0c;实现上传图片和视频&#xff0c;并支持在线预览和放大 1、html部分 <el-form-item label"活动照片、视频"><el-uploadv-model:file-list"state.photoList":action"state.uploadUrl"accept…...

1.SQL - 概述

1. SQL语句分类 • 数据定义语言&#xff1a;简称DDL(Data Definition Language)&#xff0c;用来定义数据库对象&#xff1a;数据库&#xff0c;表&#xff0c;列等。关键字&#xff1a;create&#xff0c;alter&#xff0c;drop等 • 数据操作语言&#xff1a;简称DML(Data …...

GaussDB数据库表创建行访问控制策略

目录 一、前言 二、GaussDB中的行访问控制 1、CREATE ROW LEVEL SECURITY POLICY语法 2、ALTER ROW LEVEL SECURITY POLICY语法 3、ROW LEVEL SECURITY策略与适配SQL语法关系 三、GaussDB中的行访问控制策略示例 1、实现GaussDB行访问控制的一般步骤 2、行访问控制策略…...

提升设备巡检效率的关键:易点易动设备管理系统的应用

随着互联网技术的发展,智慧设备管理已成为各行各业提升运营效率的重要选择。相比传统的手动巡检方式,采用设备管理系统可以实现物联网技术给企业带来更高效的运营方式。其中,易点易动作为一款成熟的设备管理系统,其广泛应用于提升设备巡检效率这一领域发挥了很好的作用。 采用易…...

【C++】STL 容器 - list 双向链表容器 ① ( 容器特点 | 容器操作时间复杂度 | 构造函数 )

文章目录 一、 list 双向链表容器简介1、容器特点2、容器操作时间复杂度3、遍历访问5、头文件 二、 list 双向链表容器 构造函数1、默认无参构造函数2、创建包含 n 个相同元素的 list 双向链表3、使用初始化列表构造 list 双向链表4、使用另外一个 list 容器 构造 list 双向链表…...

[C/C++]数据结构 希尔排序

&#x1f966;前言: 希尔排序也称 “缩小增量排序”&#xff0c;它也是一种插入类排序的方法,在学习希尔排序之前我们首先了解一下直接插入排序. 一: &#x1f6a9;直接插入排序 1.1 &#x1f31f;排序思路 直接插入排序的基本原理是将一条记录插入到已排好的有序表中&#x…...

SQL进阶:子查询

一般情况下,我们都是直接对表进行查询,但有时候,想要的数据可能通过一次select 获取不到,需要嵌套select,这样就形成了子查询。 子查询可以位于查询语句的任意位置,主要的注意点在于用于不同的位置,和不同的关键字一起使用时,需要注意返回的列的数量和行的数量。 位于…...

5、IDEA集成Git

IDEA集成Git 1. 配置Git忽略文件2. 定位Git程序3. 初始化本地库、添加暂存区、提交到本地库4. 切换版本5. 创建分支和切换分支6. 合并分支7. 解决冲突 1. 配置Git忽略文件 问题1&#xff1a;为什么要忽略他们&#xff1f; 与项目的实际功能无关&#xff0c;不参与服务器上部署…...

oracle数据库sqlplus登录卡顿

问题描述 新安装了一套oracle 11.2.0.1 版本的数据库服务器&#xff0c;出现了在服务器本地通过sqlplus / as sysdba登录的时候很快&#xff0c;但是通过监听登录的时候就非常的慢&#xff0c;卡顿&#xff0c;大概需要1分钟多的时间才能登进数据库。 之前安装了好几套oracle …...

【C#】Visual Studio 2022 远程调试配置教程

在某些特殊的情况下&#xff0c;开发机和调试机可能不是同一台设备&#xff0c;此时就需要远程调试了。 开发机配置 首先需要确保两台机器在同一局域网下。 创建共享文件夹 随便找个地方新建一个文件夹&#xff0c;用来放编译结果。例如我这里是 D:\DebuggingWorkspace\。 …...

LSTM的记忆能力实验

长短期记忆网络&#xff08;Long Short-Term Memory Network&#xff0c;LSTM&#xff09;是一种可以有效缓解长程依赖问题的循环神经网络&#xff0e;LSTM 的特点是引入了一个新的内部状态&#xff08;Internal State) 和门控机制&#xff08;Gating Mechanism&#xff09;&am…...

Unity之ShaderGraph如何实现瓶装水效果

前言 有一个场景在做效果时,有一个水瓶放到桌子上的设定,但是模型只做了个水瓶,里面是空的,所以我就想办法,如何做出来瓶中装睡的效果,最好是能跟随瓶子有液体流动的效果。 如下图所示: 水面实现 水面效果 液体颜色设置 因为液体有边缘颜色和内里面颜色,所以要分开…...

【python与机器学习3】感知机和门电路:与门,或门,非门等

目录 1 电子和程序里的与门&#xff0c;非门&#xff0c;或门&#xff0c;与非门 &#xff0c;或非门&#xff0c;异或门 1.1 基础电路 1.2 所有的电路情况 1.3 电路的符号 1.4 各种电路对应的实际电路图 2 各种具体的电路 2.1 与门&#xff08;and gate&#xff09; 2…...

关键字:extends关键字

在 Java 中&#xff0c;extends 是一个关键字&#xff0c;用于表示继承关系。当一个类使用 extends 关键字时&#xff0c;它表示该类是一个子类&#xff0c;并且继承了父类的属性和方法。 以下是 extends 关键字的解析&#xff1a; 语法&#xff1a; 描述&#xff1a; ChildC…...

KEPServerEX 6 之【外篇-1】PTC-ThingWorx服务端软件安装 Tomcat10本地安装

本文目标: 安装 Java 和 Apache Tomcat ,为ThingWorx安装做基础。 ----------------------------------------------------------------------- 安装重点 --------------------------------------------------------------------- 1. 安装 Java 11 / JDK 11 添加系…...