RabbitMQ高级特性--消息确认机制
目录
一、消息确认
1.消息确认机制
2.手动确认方法
二、代码示例
1. AcknowledgeMode.NONE
1.1 配置文件
1.2 生产者
1.3 消费者
1.4 运行程序
2.AcknowledgeMode.AUTO
3.AcknowledgeMode.MANUAL
一、消息确认
1.消息确认机制
生产者发送消息之后,到达消费端之后,可能会有以下情况:
1. 消息处理成功;
2. 消息处理异常。

RabbitMQ向消费者发送消息后,就会把这条消息删除掉,那么第二种情况就会造成消息丢失。
那么如何确保消息端已经被成功接收了并且被正确处理了呢?
为了确保消息从队列可靠的到达消费者,RabbitMQ提供了消息确认机制(Messageacknowledment)。
消费者在订阅队列时,可以指定autoAck参数,根据这个参数,消息确认机制分为以下两种:
自动确认:当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的接收到消息,自动确认模式适用于对于消息可靠性要求不高的场景。
手动确认:当autoAck等于false时,RabbitMQ会等待消费者显式的调用BasicAck命令,回复确认信号后才从内存(或者磁盘)中删除,这种方式适用于对消息可靠性要求较高的场景。
自动确认代码示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
当autoAck参数置为false,对于RabbitMQ服务器来说,队列中的消息分为了两个部分:
一是等待发送给消费者的消息;二是已经发送给消费者,但是还没收到消费者确认信号的消息。
如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会重新安排这条消息进入队列,等待投递给下一个消费者,当然也有可能是原来的那个消费者。

从RabbitMQ的Web管理平台上也可以看到当前队列中Ready状态和Unacked状态的消息数。

Ready:等待投递给消费者的消息数。
Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数。
2.手动确认方法
消费者在收到消息后,可以选择确认,也可以选择跳过或者直接拒绝确认,RabbitMQ也提供了不同的确认方法,消费者客户端可以调用与其对应的channel的相关方法,共有以下三种:
肯定确认: Channel.basicAck(long deliveryTag, boolean multiple);
RabbitMQ 已经知道该消息并且成功的处理消息,可以将其丢弃。
参数说明:
deliveryTag:消息的唯一标识,它是一个单调递增的64位的长整型值,deliveryTag是每个信道(Channel)独立维护的,所以在每个信道上都是唯一的,当消费者确认(ack)一条消息时,必须使用对应的信道进行确认。
multiple:是否批量确认,在某些情况下,为了减少网络流量,可以对一系列连续的deliveryTag进行批量确认,值为true则会一次性ack所以小于等于指定deliveryTag的消息,值为false,则只确认当前deliveryTag的消息。

否定确认: Channel.basicReject(long deliveryTag, boolean requeue); 参数说明:
deliveryTag:参考上文。
requeue:表示拒绝后,这条消息该如何处理,如果值为true那么,则RabbitMQ会将这条消息重新入队,重新发送给下一个订阅的消费者,值为false,则RabbitMQ会把这条消息从队列中移除,不会再发送给消费者。
否定确认: Channel.basicNack(long deliveryTag, boolean multiple,
boolean requeue); 参数说明:
参考上文
multiple参数设置为true则表⽰拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
二、代码示例
我们基于SpringBoot来演示消息的确认机制,使用方式和方法与RabbitMQ Java Client有一定差异,
Spring AMQP对消息确认提供了三种策略:
public enum AcknowledgeMode {NONE,MANUAL,AUTO;
} 1. AcknowledgeMode.NONE
1.1 配置文件
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: none
1.2 生产者
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}
/*
以下为消费端⼿动应答代码⽰例配置
*/
@Bean("ackExchange")
public Exchange ackExchange(){return
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
;
}
//2. 队列
@Bean("ackQueue")
public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,
@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
}
import com.xiaowu.rabbitmq.constant.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack",
"consumer ack test...");return "发送成功!";}
}
1.3 消费者
import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");}
}
1.4 运行程序
启动生产者可以从RabbitMQ Web管理界面看到如下:

再启动消费者,控制台输出:
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
//....
管理界面:

可以看到消息处理失败但是消息已经从管理界面移除。
2.AcknowledgeMode.AUTO
将配置文件修改为:
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: auto
再次启动程序,控制台不断输出错误信息:
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 4
2024-04-29T17:07:09.254+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
3.AcknowledgeMode.MANUAL
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: manual
消费者手动确认逻辑:
import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//⼿动设置⼀个异常, 来测试异常拒绝机制
// int num = 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
则直接丢弃channel.basicNack(deliveryTag, true,true);}}
}
这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0。
异常时拒绝:
import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//⼿动设置⼀个异常, 来测试异常拒绝机制int num = 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
则直接丢弃channel.basicNack(deliveryTag, true,true);}}
}
接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 2
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 3
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 4
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 5
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 6
处理业务逻辑
管理页面上unacked也是1:

相关文章:
RabbitMQ高级特性--消息确认机制
目录 一、消息确认 1.消息确认机制 2.手动确认方法 二、代码示例 1. AcknowledgeMode.NONE 1.1 配置文件 1.2 生产者 1.3 消费者 1.4 运行程序 2.AcknowledgeMode.AUTO 3.AcknowledgeMode.MANUAL 一、消息确认 1.消息确认机制 生产者发送消息之后,到达消…...
C++复试笔记(一)
Setw 是C中用于设置输出字段宽度的函数。当使用 setw(3) 时,它会设置紧接着的输出字段的最小宽度为3个字符。如果字段内容长度小于3,则会在左侧填充空格以达到指定宽度;如果内容长度大于或等于3,则全部内容将被输出,…...
K8s 1.27.1 实战系列(四)验证集群及应用部署测试
一、验证集群可用性 1、检查节点 kubectl get nodes ------------------------------------------------------ NAME STATUS ROLES AGE VERSION k8s-master Ready control-plane 3h48m v1.27.1 k8s-node1 Ready <none> …...
基于Spring Boot的健美操评分管理系统设计与实现(LW+源码+讲解)
专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…...
H5页面在移动端自动横屏
首先需要再head标签添加这样一段代码 <meta name="viewport" content="width=device-width,height=device-width,initial-scale=1.0,user-scalable=no">因为需求是为了满足WEB端和手机端都可以查看整体效果 但由于UI没有设计移动端的样式 所以我想说…...
【从0到1搞懂大模型】神经网络的实现:数据策略、模型调优与评估体系(3)
一、数据集的划分 (1)按一定比例划分为训练集和测试集 我们通常取8-2、7-3、6-4、5-5比例切分,直接将数据随机划分为训练集和测试集,然后使用训练集来生成模型,再用测试集来测试模型的正确率和误差,以验证…...
从0到1入门RabbitMQ
一、同步调用 优势:时效性强,等待到结果后才返回 缺点: 拓展性差性能下降级联失败问题 二、异步调用 优势: 耦合度低,拓展性强异步调用,无需等待,性能好故障隔离,下游服务故障不影响…...
MySQL数据库复杂的增删改查操作
在前面的文章中,我们主要学习了数据库的基础知识以及基本的增删改查的操作。接下去将以一个比较实际的公司数据库为例子,进行讲解一些较为复杂且现时需求的例子。 基础知识: 一文清晰梳理Mysql 数据库基础知识_字段变动如何梳理清楚-CSDN博…...
点云软件VeloView开发环境搭建与编译
官方编译说明 LidarView / LidarView-Superbuild GitLab 我的编译过程: 安装vs2019,windows sdk,qt5.14.2(没安装到5.15.7),git,cmake3.31,python3.7.9,ninja下载放到…...
本地YARN集群部署
请先完成HDFS的前置部署,部署方式可查看:本地部署HDFS集群https://blog.csdn.net/m0_73641796/article/details/145998092?spm1001.2014.3001.5502 部署说明 组件配置文件启动进程备注Hadoop HDFS需修改 需启动: NameNode作为主节点 DataNode作为从节点 Secondary…...
STM32常见外设的驱动示例和代码解析
以下是针对STM32常见外设的驱动示例和代码解析,基于HAL库实现,适用于大多数STM32系列(如F1/F4/H7等),可根据具体型号调整引脚和时钟配置。 1. GPIO驱动 应用场景:控制LED、按键检测、继电器开关等。 示例代码: // 初始化LED(推挽输出) void LED_Init(void) {GPIO_In…...
使用数据库和缓存的时候,是如何解决数据不一致的问题的?
1.缓存更新策略 1.1. 缓存旁路模式(Cache Aside) 在应用里负责管理缓存,读取时先查缓存,如果命中了则返回缓存,如果未命中就查询数据库,然后返回缓存,返回缓存的同时把数据给写入缓存中。更新…...
VS Code C++ 开发环境配置
VS Code 是当前非常流行的开发工具. 本文讲述如何配置 VS Code 作为 C开发环境. 本文将按照如下步骤来介绍如何配置 VS Code 作为 C开发环境. 安装编译器安装插件配置工作区 第一个步骤的具体操作会因为系统不同或者方案不同而有不同的选择. 环境要求 首先需要立即 VS Code…...
使用OpenCV和MediaPipe库——实现人体姿态检测
目录 准备工作如何在Windows系统中安装OpenCV和MediaPipe库? 安装Python 安装OpenCV 安装MediaPipe 验证安装 代码逻辑 整体代码 效果展示 准备工作如何在Windows系统中安装OpenCV和MediaPipe库? 安装Python 可以通过命令行运行python --versio…...
JWT的学习
1、HTTP无状态及解决方案 HTTP一种是无状态的协议,每次请求都是一次独立的请求,一次交互之后就是陌生人。 以CSDN为例,先登录一次,然后浏览器退出,这个时候在进入CSDN,按理说服务器是不知道你已经登陆了&…...
elasticsearch是哪家的
Elasticsearch:数据搜索与分析的领航者 在当今这个信息爆炸的时代,快速且准确地处理海量数据成为了众多企业和组织追求的目标。而Elasticsearch正是在这个背景下脱颖而出的一款强大的开源搜索引擎。它是由位于美国加利福尼亚州的Elastic公司所开发和维护…...
《A++ 敏捷开发》- 18 软件需求
需求并不是关于需求 (Requirements are not really about requirements) 大家去公共图书馆寄存物品,以前都是扫二维码开箱,有些图书馆升级了使用指纹识别。 “是否新方法比以前好?”我问年轻的开发人员。 “当然用指纹识别好。新技术&#x…...
计算机网络:计算机网络的组成和功能
计算机网络的组成: 计算机网络的工作方式: 计算机网络的逻辑功能; 总结: 计算机网络的功能: 1.数据通信 2.资源共享 3.分布式处理:计算机网络的分布式处理是指将计算任务分散到网络中的多个节点(计算机或设备&…...
Upload-Labs-Linux 1-20
前端校验绕过:pass 01 两种思路:1.通过抓包,修改后缀 2.前端禁用js绕过前端后缀检验 首先写一个木马,改为图片格式GIF89a<?php eval($_POST[cmd])?>抓包之后改为PHP格式: 使用蚁剑连接木马,第一次尝…...
Compose笔记(八)--权限
这一节主要了解一下Compose中权限的申请,其中主要用到accompanist-permissions这个权限库,它是一个简化的Android Compose 中权限管理的库,如下使用: 栗子: 依赖添加 dependencies {implementation("com.google.accompani…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...
Spring是如何解决Bean的循环依赖:三级缓存机制
1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间互相持有对方引用,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...
