当前位置: 首页 > news >正文

RabbitMQ高级特性 - 消费者消息确认机制

文章目录

  • RabbitMQ 消息确认机制
    • 背景
    • 消费者消息确认机制
      • 概述
      • 手动确认(RabbitMQ 原生 SDK)
      • 手动确认(Spring-AMQP 封装 RabbitMQ SDK)
        • AcknowledgeMode.NONE
        • AcknowledgeMode.AUTO(默认)
        • AcknowledgeMode.MANUAL
        • MANUAL 可能会引发的问题

RabbitMQ 消息确认机制


背景

在这里插入图片描述

上图中可以看出,从生产者发送消息消费者接收到消息并正确处理,这些里路线都可能会出现问题,那么为了保证这些消息最后能被正确处理,RabbitMQ 就提供了消息确认机制.

消费者消息确认机制

概述

在这里插入图片描述
为了保证消息从 队列 到 消费者正确消费,那么就引入了消费者消息确认机制.

a)消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种(以下讲到的方法和参数来自于 RabbitMQ 原生的 SDK,非 Spring 提供).

  • 自动确认:当 autoAck = true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后不管消费者是否真正的消费这些消息,都会从内存中删除.(适合对消息可靠性要求不高的场景).
  • 手动确认:当 autoAck = false 时,RabbitMQ 会等待消费者显示的调用 Basic.Ack 命令(波安排时间哦且确认消息),然后才会从 内存或磁盘 中删除消息.(适合对消息可靠性要求高的场景).

Ps:可靠性高了,性能也就下降了,所以请综合考虑.

b)对于 MQ队列 中的消息,在 MQ管理平台上可以看到以下两种类别:
在这里插入图片描述
Ready:队列已经准备好消息,随时准备发送给消费者 的消息数量(只要消费者来要,就立刻发送).
Unacked:消息已经发送给消费者,但是消费者没有返回消息确认 的消息数量(消息确认包括 ack肯定确认nack否定确认

手动确认(RabbitMQ 原生 SDK)

消费者在收到消息之后,可以选择确认,也可以选择拒绝或者跳过,RabbitMQ因此提供了不同的确认应答方式,消费者客户端可通过调用 channel 的相关方法实现.

a)肯定确认:消费者已经接收到消息,并且成功处理消息,可以将其丢弃了.

Channel.basicAck(long deliveryTag, boolean multiple)
  • deliveryTag:消息的唯一标识(单调递增的 long),特点如下:
    • deliveryTag 是每个 Channel 通道独立维护,所以每个通道上的都是唯一的(生产者 和 Broker 建立一个 channel 会生成一个 deliverTag,消费者 和 Broker 建立一个 channel 会生成一个 deliverTag,这俩 deliverTag 是不同的).
    • 当消费者 ack确认 一条消息时,必须使用对应的通道上 deliveryTag 进行确认.
  • multiple:是否批量确认. 如果值为 true,那么就会一次性 ack确认 所有小于或等于指定的 deliveryTag 的消息,大大减少了网络开销
    • 假设 deliveryTag = 8,multiple = true:那么 deliveryTag <= 8 的消息都会被确认.
    • 假设 deliveryTag = 8,multiple = false:只确认 8.

Ps:deliveryTag 确保了消息传递的可靠性和顺序性.

b)否定确认(单个):用来拒绝这个消息. 被拒绝的消息如何处理,具体要看 requeue 参数.

Channel.basicReject(long deliveryTag, boolean requeue)
  • requeue:标识拒绝后,这条消息如何处理.
    • requeue = true:消息会重新存入队列,将来会发送给下一个订阅的消费者.
    • requeue = false:消息会从队列中移除,因此不会发送给消费者.

c)否定确认(批量):Channel.basicReject 只能拒绝一条消息,如果要批量拒绝消息,就可以使用 Channel.basicNack.

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

multiple:参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息.

d)MQ 的管理平台上也提供了几种确认方式.
在这里插入图片描述

手动确认(Spring-AMQP 封装 RabbitMQ SDK)

Spring-AMQP 对消息确认提供了三种策略:

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}

这里根 RabbitMQ 原生 SDK 是有些不同的.

AcknowledgeMode.NONE

不管消费者是否成功处理了消息,RabbitMQ 都会自动确认消息,然后从 队列 中移除消息.

a)配置手动确认

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: none

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...println("业务逻辑处理完成")}}

在这里插入图片描述

d)效果演示
触发接口之后,回到 MQ 管理平台,可以看到队列中消息已经被删除.
在这里插入图片描述

AcknowledgeMode.AUTO(默认)

分为以下情况:

  • 消费者处理消息过程中没有抛出异常,则自动确认消息,然后从 队列 中移除消息.
  • 消费者处理消息过程中抛出异常,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: auto

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者(正常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...println("业务逻辑处理完成")}}

效果如下:
在这里插入图片描述
在这里插入图片描述
d)消费者(异常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...val a = 1 / 0println("业务逻辑处理完成")}}

效果如下:
消息未被确认,会不断重返队列,进行重试,因此 IDEA 中会循环报错输出.
在这里插入图片描述

AcknowledgeMode.MANUAL

分为以下情况:

  • 消费者在处理完消息后显示调用 basicAck 方法 来确认消息,然后从 队列 中移除消息.
  • 消费者在处理完消息后显示调用 basicNack 方法 来否定确认消息,是否从队列中移除消息需要看 requeue 参数的值
    • requeue = true:重返队列,不断重试.
    • requeue = false:丢弃消息.
  • 消费者在处理完消息后什么都不做,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者(异常处理消息,requeue = true)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...val a = 1 / 0println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, true) //requeue: true}}}

由于消息处理异常,发送 nack,并且 requeue = true,因此消息会重返队列,不断重试.
在这里插入图片描述
在这里插入图片描述

d)消费者(异常处理消息,requeue = false)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...val a = 1 / 0println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, false) //requeue: false}}}

由于消息处理异常,发送 nack,并且 requeue = false,因此消息不会重返队列,消息被丢弃.
在这里插入图片描述
d)消费者(正常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, false) //requeue: false}}}

消息被正常处理,返回 ack.
在这里插入图片描述

MANUAL 可能会引发的问题

在这里插入图片描述
如果这里捕获的不是 Exception 异常,那么消费者处理消息的时候,可能会引发一些不会被捕获的异常,就会导致没有返回 nack.
也就意味着,没有进行确认应答,那么 mq管理平台 上就会显示 Unacked 数值 +1.

Ps:具体还是需要根据业务场景而定

在这里插入图片描述

相关文章:

RabbitMQ高级特性 - 消费者消息确认机制

文章目录 RabbitMQ 消息确认机制背景消费者消息确认机制概述手动确认&#xff08;RabbitMQ 原生 SDK&#xff09;手动确认&#xff08;Spring-AMQP 封装 RabbitMQ SDK&#xff09;AcknowledgeMode.NONEAcknowledgeMode.AUTO&#xff08;默认&#xff09;AcknowledgeMode.MANUAL…...

PermX-htb

0x01 立足 信息收集 端口扫描 nmap -sSCV -Pn 10.10.11.23 正常开启22和80端口 访问web页面 并没有看到有攻击点 这个页面可先记录一会儿有需要的话可以尝试xss获取cookie 域名扫描 ffuf -w 1.txt -u http://permx.htb/ -H Host:FUZZ.permx.htb 这里用的ffuf扫描工具 扫出了…...

解密RCE漏洞:原理剖析、复现与代码审计实战

在网络安全领域&#xff0c;远程代码执行&#xff08;RCE&#xff09;漏洞因其严重性和破坏力而备受关注。RCE漏洞允许攻击者在目标系统上执行任意代码&#xff0c;从而掌控整个系统&#xff0c;带来极大的安全风险。理解RCE漏洞的工作原理&#xff0c;并掌握其复现与代码审计技…...

打造智能家居:用React、Node.js和WebSocket构建ESP32设备控制面板(代码说明)

一、项目概述 在物联网&#xff08;IoT&#xff09;时代&#xff0c;智能设备的远程控制变得越来越重要。本文介绍了一个构建智能设备控制面板的项目&#xff0c;允许用户通过 Web 应用来控制多个 ESP32 设备。用户可以通过该面板查看设备列表&#xff0c;实时了解设备状态&am…...

计网:从输入URL到网页显示期间发生了什么

1、URL包含的信息 我们输入的url中包含着一些信息&#xff1a; http&#xff1a;表示的此次我们使用的什么协议/www.baidu.com&#xff1a;表示的是我们想要访问的服务器名称&#xff0c;也就是域名dir3/home.html&#xff1a;表示我们所要访问的资源 2、通过DNS解析URL获得I…...

龚宇引以为傲的“爆款制造营”,爱奇艺怕是要爽约了

文&#xff1a;互联网江湖 作者&#xff1a;刘致呈 人们经常用人红戏不红&#xff0c;来形容毯星&#xff0c;综艺上咋咋呼呼&#xff0c;一提都知道&#xff0c;可问及代表作&#xff0c;不好意思&#xff0c;这个真没有。 今年的爱奇艺&#xff0c;貌似也迎来了这一宿命。 …...

org.springframework.web.client.HttpClientErrorException$NotFound异常

springCloud报错信息&#xff1a;org.springframework.web.client.HttpClientErrorException$NotFound: 404 null第一点&#xff1a; 第二点&#xff1a;没有httpclient工具类 注入RestTmeplate类时&#xff0c;改类需要RestController或ResponseBody...

在开关电源转换器中充分利用碳化硅器件的性能优势

在过去的几十年中&#xff0c;半导体行业已经采取了许多措施来改善基于硅 MOSFET &#xff08;parasitic parameters&#xff09;&#xff0c;以满足开关转换器(开关电源)设计人员的需求。行业效率標準以及市场对效率技术需求的双重作用&#xff0c;导致了对于可用于构建更高效…...

QObject::connect: Cannot queue arguments of type ‘QList<QString>‘

QObject::connect: Cannot queue arguments of type ‘QList’ QObject::connect: Cannot queue arguments of type QList<QString> (Make sure QList<QString> is registered using qRegisterMetaType().)使用信号和槽时&#xff0c;QList无法当做参数被传递&…...

基于K8S部署安装Jenkins

基于K8S部署安装Jenkins 1.Jenkins Kubernetes 清单文件2.Kubernetes Jenkins 部署1&#xff1a;为 Jenkins 创建 Namespace。 最好将所有DevOps工具分类为与其他应用程序分开的命名空间。2&#xff1a;创建“serviceAccount.yaml”文件并复制以下管理员服务帐户清单。1. kubec…...

24-8-4-读书笔记(十三)-《莎士比亚全集》(第一卷(续)) [英] 威廉·莎士比亚 [译]朱生豪

文章目录 《莎士比亚全集》(第一卷(续))目录阅读笔记记录总结《莎士比亚全集》(第一卷(续)) 《莎士比亚全集》朱生豪的经典译本,非常值得花时间去读一读,莎氏的巨作有其独特的韵味,与莫里哀、契诃夫、曹禺等其他国家的剧作家有其鲜明的特点,这既是源于其所处的时代…...

linux nicstat

nicstat 是一个用于监控和报告网络接口统计信息的工具。它可以提供关于网络接口的详细性能数据&#xff0c;包括传输速率、错误率、丢包率等。nicstat 对于诊断网络性能问题和优化网络配置非常有用。 安装 nicstat nicstat 可能不在所有Linux发行版的默认软件库中&#xff0c…...

程序员如何积累人脉?光靠技术不行了~

从事技术的人&#xff0c;还没被社会“塑造”前&#xff0c;总会有一个“固有思维”&#xff0c;就是这个世界大概率是“由代码和逻辑主宰的世界”&#xff0c;人脉积累并不在考虑范围内&#xff0c;而我们也常被误解为只懂得与机器对话的technician。 事实上&#xff0c;游戏…...

初识增强现实(AR)

初识增强现实&#xff08;AR&#xff09; 笔记来源&#xff1a; 1.2023年中国增强现实&#xff08;AR&#xff09;行业研究报告 2.wiki/Augmented reality 3.In-Depth Review of Augmented Reality: Tracking Technologies, Development Tools, AR Displays, Collaborative AR…...

开关电源起振是什么看了就知道

接触开关电源的朋友都知道&#xff0c;含有电源管理芯片的开关电源有输入&#xff0c;没输出时常说是不是电路没起振&#xff0c;到底这句话是什么意思呢&#xff1f;什么是“起振”先不做 的解释&#xff0c;简单打个比方&#xff0c;大家就容易懂了&#xff0c;就好像抢救心…...

Modbus_Ascii协议

设备必须要有RTU协议&#xff01;这是Modbus协议上规定的&#xff0c;且默认模式必须是RTU&#xff0c;ASCII作为选项。&#xff08;也就是说&#xff0c;一般的设备只有RTU这个协议&#xff0c;ASCII一般很少&#xff09;所以说&#xff0c;一般学习Modbus协议&#xff0c;只需…...

树莓派在功能和成本之间的 “惊人平衡 “支持了全球数字标牌的成功故事!

树莓派的“功能和成本之间的惊人平衡”支撑全球数字标牌成功故事 数字标牌已经成为一个数十亿美元的行业。Yodeck很快预测到了其中的潜力&#xff1a;他们需要硬件来支持他们可靠、具有成本效益和易于管理的服务&#xff0c;而不会影响性能。事实证明&#xff0c;树莓派 4 证明…...

C++ 学习记录

文章目录 继承重载和重写区别重载重写 参考文献 继承 继承顾名思义就是对长辈本有的东西进行获取与使用&#xff0c;即两个以及两个类以上的关系在获取与使用时会存在一些情况&#xff1a; public&#xff1a;长辈对外公开的自身所有物&#xff0c;最终都会是后代的protected&…...

C#中的TCP和UDP

TcpClient TCP客户端 UDP客户端 tcp和udp的区别 TCP&#xff08;传输控制协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;是两种在网络通信中常用的传输层协议&#xff0c;它们在C#或任何其他编程语言中都具有相似的特性。下面是TCP和UDP的主要区别&#xff1a;…...

Spring中使用嵌套事务及事务保存点

嵌套事务及事务保存点 Spring中的嵌套事务与事务保存点1. 什么是嵌套事务&#xff1f;2. 为什么使用嵌套事务&#xff1f;3. 如何在Spring中使用嵌套事务&#xff1f;4. 使用事务保存点5. 总结 Spring框架提供了强大的事务管理功能&#xff0c;包括对嵌套事务的支持。在Spring中…...

SFT、RLHF、DPO、IFT —— LLM 微调的进化之路

TL;DR • SFT、RLHF 和 DPO 都是先估计 LLMs 本身的偏好&#xff0c;再与人类的偏好进行对齐&#xff1b; • SFT 只通过 LLMs 生成的下一个单词进行估计&#xff0c;而 RLHF 和 DPO 通过 LLMs 生成的完整句子进行估计&#xff0c;显然后者的估计会更准确&#xff1b; • 虽然…...

【数据结构】LinkedList与链表

目录 链表 1、链表的概念及结构 2、LinkedList的使用 2、1什么是LinkedList 2、2LinkedList的使用 3、LinkedList的遍历 4、LinkedList的模拟实现 5、ArrayList和LinkedList的区别 上篇已经熟悉了ArrayList的使用&#xff0c;ArrayList底层使用数组来存储元素。由于其底层…...

《LeetCode热题100》---<5.①普通数组篇五道>

本篇博客讲解LeetCode热题100道普通数组篇中的五道题 第一道&#xff1a;最大子数组和&#xff08;中等&#xff09; 第二道&#xff1a;合并区间&#xff08;中等&#xff09; 第一道&#xff1a;最大子数组和&#xff08;中等&#xff09; 法一&#xff1a;贪心算法 class So…...

根据id查找树形结构中匹配数据与上级所有数据

背后 在用户管理业务开发过程中&#xff0c;通常需要查询出用户管理的菜单数据和当前菜单的所有上级数据。为了方便后续的cv工作&#xff0c;我打算把这种方法记录下来&#xff0c;以备不时之需. 代码实现细节 Data public class MenuDTO {Schema(description "菜单id&qu…...

探索亚马逊Amazon S3:无缝存储管理与极速数据传输的奥秘

亚马逊云科技中Amazon S3&#xff0c;因其设计简单与高度可靠&#xff0c;允许用户通过互联网存储和检索任意数量的数据&#xff0c;并能够自动扩展以满足各种规模的需求&#xff0c;使得Amazon S3成为了许多云计算应用和网站的核心存储基础设施之一&#xff0c;Amazon S3提供的…...

Linux_监测CPU和内存

通过TOP持续获取进程的CPU和内存消耗&#xff0c;并写入到表格 # 配置进程名 processvm-agent # 配置次数 number100 # 配置间隔时间 time5 # csv结果文件 filecm_$(date %s).csv echo "%CPU,%MEM">${file} pid$(ps -aux | grep ${process} | awk -F {OFS"…...

OpenCV经典案例:01 答题卡识别

目录 透视变换矫正 选项识别匹配 QT 界面设计 引言&#xff1a;随着信息化的发展&#xff0c;计算机阅卷已经成为一种常规操作。在大型考试中&#xff0c;客观题基本不再 需要人工阅卷。本项目旨在开发一个基于OpenCV的高效答题卡识别系统&#xff0c;通过先进的图像处理和模…...

进程的管理与控制详解:创建、终止、阻塞等待与非阻塞等待

目录 一、进程创建 1、实例 2、fork函数详解 (1)fork函数模板 (2). fork() 函数的工作原理 (3). fork() 返回值和错误处理 3、如何理解进程创建过程 二、进程终止 1、终止是在做什么&#xff1f; 2、进程终止&#xff0c;有三种情况 3、进程如何终止&#xff1f; 三…...

【从零开始一步步学习VSOA开发】开发环境搭建

开发环境搭建 开发 VSOA 首先需要搭建开发环境&#xff0c;这里讲解 Windows 下 C/C 开发环境搭建方法。 下载 IDE 并申请授权码 SylixOS 的开发和部署需要 RealEvo-IDE 的支持&#xff0c;因此您需要先获取 RealEvo-IDE 的安装包和注册码。 RealEvo-IDE 分为体验版和商业版…...

一篇文章让你用我的世界中的红石搞懂什么是ALU!

目录 1.一些在开始的约定 2.七大逻辑门电路 1、 与门 2、 或门 3、 非门 5、 或非门 6、 异或门 7、 同或门 3.半加器 4.全加器 5.ALU 1.一些在开始的约定 相同的概念&#xff1a;相同的概念&#xff1a;高电平低电平逻辑真逻辑假 开关的开 开关的关 灯的亮 灯…...