RabbitMQ高级特性--消息确认机制
目录
一、消息确认
1.消息确认机制
2.手动确认方法
二、代码示例
1. AcknowledgeMode.NONE
1.1 配置文件
1.2 生产者
1.3 消费者
1.4 运行程序
2.AcknowledgeMode.AUTO
3.AcknowledgeMode.MANUAL
一、消息确认
1.消息确认机制
生产者发送消息之后,到达消费端之后,可能会有以下情况:
1. 消息处理成功;
2. 消息处理异常。

RabbitMQ向消费者发送消息后,就会把这条消息删除掉,那么第二种情况就会造成消息丢失。
那么如何确保消息端已经被成功接收了并且被正确处理了呢?
为了确保消息从队列可靠的到达消费者,RabbitMQ提供了消息确认机制(Messageacknowledment)。
消费者在订阅队列时,可以指定autoAck参数,根据这个参数,消息确认机制分为以下两种:
自动确认:当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的接收到消息,自动确认模式适用于对于消息可靠性要求不高的场景。
手动确认:当autoAck等于false时,RabbitMQ会等待消费者显式的调用BasicAck命令,回复确认信号后才从内存(或者磁盘)中删除,这种方式适用于对消息可靠性要求较高的场景。
自动确认代码示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
当autoAck参数置为false,对于RabbitMQ服务器来说,队列中的消息分为了两个部分:
一是等待发送给消费者的消息;二是已经发送给消费者,但是还没收到消费者确认信号的消息。
如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会重新安排这条消息进入队列,等待投递给下一个消费者,当然也有可能是原来的那个消费者。

从RabbitMQ的Web管理平台上也可以看到当前队列中Ready状态和Unacked状态的消息数。

Ready:等待投递给消费者的消息数。
Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数。
2.手动确认方法
消费者在收到消息后,可以选择确认,也可以选择跳过或者直接拒绝确认,RabbitMQ也提供了不同的确认方法,消费者客户端可以调用与其对应的channel的相关方法,共有以下三种:
肯定确认: Channel.basicAck(long deliveryTag, boolean multiple);
RabbitMQ 已经知道该消息并且成功的处理消息,可以将其丢弃。
参数说明:
deliveryTag:消息的唯一标识,它是一个单调递增的64位的长整型值,deliveryTag是每个信道(Channel)独立维护的,所以在每个信道上都是唯一的,当消费者确认(ack)一条消息时,必须使用对应的信道进行确认。
multiple:是否批量确认,在某些情况下,为了减少网络流量,可以对一系列连续的deliveryTag进行批量确认,值为true则会一次性ack所以小于等于指定deliveryTag的消息,值为false,则只确认当前deliveryTag的消息。

否定确认: Channel.basicReject(long deliveryTag, boolean requeue); 参数说明:
deliveryTag:参考上文。
requeue:表示拒绝后,这条消息该如何处理,如果值为true那么,则RabbitMQ会将这条消息重新入队,重新发送给下一个订阅的消费者,值为false,则RabbitMQ会把这条消息从队列中移除,不会再发送给消费者。
否定确认: Channel.basicNack(long deliveryTag, boolean multiple,
boolean requeue); 参数说明:
参考上文
multiple参数设置为true则表⽰拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
二、代码示例
我们基于SpringBoot来演示消息的确认机制,使用方式和方法与RabbitMQ Java Client有一定差异,
Spring AMQP对消息确认提供了三种策略:
public enum AcknowledgeMode {NONE,MANUAL,AUTO;
} 1. AcknowledgeMode.NONE
1.1 配置文件
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: none
1.2 生产者
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}
/*
以下为消费端⼿动应答代码⽰例配置
*/
@Bean("ackExchange")
public Exchange ackExchange(){return
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
;
}
//2. 队列
@Bean("ackQueue")
public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,
@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
}
import com.xiaowu.rabbitmq.constant.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack",
"consumer ack test...");return "发送成功!";}
}
1.3 消费者
import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");}
}
1.4 运行程序
启动生产者可以从RabbitMQ Web管理界面看到如下:

再启动消费者,控制台输出:
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
//....
管理界面:

可以看到消息处理失败但是消息已经从管理界面移除。
2.AcknowledgeMode.AUTO
将配置文件修改为:
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: auto
再次启动程序,控制台不断输出错误信息:
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 4
2024-04-29T17:07:09.254+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
3.AcknowledgeMode.MANUAL
spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: manual
消费者手动确认逻辑:
import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//⼿动设置⼀个异常, 来测试异常拒绝机制
// int num = 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
则直接丢弃channel.basicNack(deliveryTag, true,true);}}
}
这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0。
异常时拒绝:
import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//⼿动设置⼀个异常, 来测试异常拒绝机制int num = 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
则直接丢弃channel.basicNack(deliveryTag, true,true);}}
}
接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 2
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 3
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 4
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 5
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 6
处理业务逻辑
管理页面上unacked也是1:

相关文章:
RabbitMQ高级特性--消息确认机制
目录 一、消息确认 1.消息确认机制 2.手动确认方法 二、代码示例 1. AcknowledgeMode.NONE 1.1 配置文件 1.2 生产者 1.3 消费者 1.4 运行程序 2.AcknowledgeMode.AUTO 3.AcknowledgeMode.MANUAL 一、消息确认 1.消息确认机制 生产者发送消息之后,到达消…...
Java EE 进阶:Spring IoCDI
IOC的简单介绍 什么是Spring?Spring是一个开源的框架,让我们的开发更加的简单,我们可以用一句更加具体的话来概括Spring,就是Spring是一个包含众多工具方法的IOC容器。 简单介绍一下IOC,我们之前说过通过ReqestContr…...
deepseek为什么要开源
一、生态位的抢占与锁定:以 JDK 版本为例 在软件开发的世界里,生态位的抢占和先入为主的效应十分显著。就拿 Java 开发中的 JDK 版本来说,目前大多数开发者仍在广泛使用 JDK8。尽管 JDK17 和 JDK21 已经推出,且具备更多先进特性…...
Java数据结构第二十期:解构排序算法的艺术与科学(二)
专栏:Java数据结构秘籍 个人主页:手握风云 目录 一、常见排序算法的实现 1.1. 直接选择排序 1.2. 堆排序 1.3. 冒泡排序 1.4. 快速排序 一、常见排序算法的实现 1.1. 直接选择排序 每⼀次从待排序的数据元素中选出最小的⼀个元素,存放在…...
【算法day5】最长回文子串——马拉车算法
最长回文子串 给你一个字符串 s,找到 s 中最长的 回文 子串。 https://leetcode.cn/problems/longest-palindromic-substring/description/ 算法思路: class Solution { public:string longestPalindrome(string s) {int s_len s.size();string tmp …...
《如何排查Linux系统平均负载过高》
【系统平均负载导读】何为系统平均负载?假设一台云服务主机,突然之间响应用户请求的时间变长了,那么这个时候应该如何去排查?带着这个问题,我们对“平均负载”展开深入的探讨和研究。 何为Linux系统的平均负载…...
基于DeepSeek实现PDF嵌入SVG图片无损放大
1. PDF中效果图 2. 询问Deepseek进行代码书写,不断优化后结果 /*** SVG工具类,用于生成价格趋势的SVG图表*/ public class SvgUtils {// SVG画布尺寸private static final int WIDTH 800;private static final int HEIGHT 500;private static final i…...
整型的不同类型和溢出
一、整数的不同类型 不同编程语言中的整数类型主要通过以下两个维度区分: 1. 存储大小 字节数:决定整数能表示的范围(如 1字节8位)。 常见类型: byte(1字节)、short(2字节&#x…...
使用express创建服务器保存数据到mysql
创建数据库和表结构 CREATE DATABASE collect;USE collect;CREATE TABLE info (id int(11) NOT NULL AUTO_INCREMENT,create_date bigint(20) DEFAULT NULL COMMENT 时间,type varchar(20) DEFAULT NULL COMMENT 数据分类,text_value text COMMENT 内容,PRIMARY KEY (id) ) EN…...
小程序 wxml 语法 —— 41列表渲染 - 进阶用法
这一节讲解列表渲染的两个进阶用法: 如果需要对默认的变量名和下标进行修改,可以使用 wx:for-item 和 wx:for-item: 使用 wx:for-item 可以指定数组当前元素的变量名使用 wx:for-index 可以指定数组当前下标的变量名 将 wx:for 用在 标签上&…...
python语言总结(持续更新)
本文主要是总结各函数,简单的函数不会给予示例,如果在平日遇到一些新类型将会添加 基础知识 输入与输出 print([要输出的内容])输出函数 input([提示内容]如果输入提示内容会在交互界面显示,用以提示用户)输入函数 注释 # 单行注释符&…...
正则表达式简述
普通字符 普通字符代表它们自身,用于精确匹配字符串中的字符。例如,a 匹配字符 a,1 匹配数字 1。元字符 元字符是具有特殊含义的字符,用于匹配特定类型的字符或字符串模式。 常用元字符 . :匹配除换行符以外的任意单个…...
FPGA学习篇——Verilog学习5(reg,wire区分及模块例化)
1 何时用reg,何时用wire? 这个我找了一些网上的各种资料,大概说一下自己的理解,可能还不太到位... wire相当于一根线,是实时传输的那种,而reg是一个寄存器,是可以存储数据的,需要立…...
Redis 数据持久化之AOF
AOF(Append Only File) 以日志的形式来记录每个写操作,将Redis执行过的所有写指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换…...
【芯片验证】verificationguide上的74道SystemVerilog面试题
诧异啊,像我这种没事在网上各处捡东西吃的人为什么之前一直没有用过verificationguide这个网站呢?总不能是大家都已经看过就留下我不知道吧。前几天在论坛上和朋友谈论验证面试题时才搜到这个网站的,感觉挺有意思: .: Verification Guide :.verificationguide.com/https…...
C++进阶知识10 封装unordered_map和unordered_set
封装unordered_map和unordered_set 1. 模拟实现unordered_map和unordered_set1.1 实现出复⽤哈希表的框架,并⽀持insert 1. 模拟实现unordered_map和unordered_set 1.1 实现出复⽤哈希表的框架,并⽀持insert • 参考源码框架,unordered_map…...
大白话JavaScript数据类型判断方法的原理与实践
大白话JavaScript数据类型判断方法的原理与实践 答题思路 明确 JavaScript 数据类型:JavaScript 数据类型分为基本数据类型(如 Number、String、Boolean、Null、Undefined、Symbol)和引用数据类型(如 Object、Array、Function 等…...
Java后端高频面经——计算机网络
TCP/IP四层模型?输入一个网址后发生了什么,以百度为例?(美团) (1)四层模型 应用层:支持 HTTP、SMTP 等最终用户进程传输层:处理主机到主机的通信(TCP、UDP&am…...
面试题(二)--Object中的常见方法
Object Java的Object是所有Java类的父类,所有的Java类直接或者间接的继承了Object类,Object类位于java.lang包中(编译时自动导入),主要提供了11种方法。 /*** native 方法,用于返回当前运行时对象的 Class…...
运行OpenManus项目(使用Conda)
部署本项目需要具备一定的基础:Linux基础、需要安装好Anaconda/Miniforge(Python可以不装好,直接新建虚拟环境的时候装好即可),如果不装Anaconda或者Miniforge,只装过Python,需要确保Python是3.…...
如何在 Windows 10 启用卓越性能模式及不同电源计划对比
在使用 powercfg -duplicatescheme 命令启用 “卓越性能模式”(即 Ultimate Performance 模式)之前,有几个前提条件需要注意: 前提条件: 系统版本要求:卓越性能模式 仅在 Windows 10 专业版 或更高版本&a…...
设备管理系统功能与.NET+VUE(IVIEW)技术实现
在现代工业和商业环境中,设备管理系统(Equipment Management System,简称EMS)是确保设备高效运行和维护的关键工具。本文采用多租户设计的设备管理系统,基于.NET后端和VUE前端(使用IVIEW UI框架)…...
分布式光伏发电的发展现状与前景
分布式光伏发电的发展现状与前景 1、分布式光伏发电的背景2、分布式光伏发电的分类2.1、集中式光伏发电2.1.1、特点、原则2.1.2、优点2.1.3、缺点 2.2、分布式光伏发电2.2.1、特点、原则2.2.2、优点2.2.3、缺点 2.3、对比 3、分布式光伏发电的现状4、分布式光伏发电的应用场景4…...
数据类设计_图片类设计之2_无规则图类设计(前端架构基础)
前言 学的东西多了,要想办法用出来.C和C是偏向底层的语言,直接与数据打交道.尝试做一些和数据方面相关的内容 引入 接续上一篇数据类设计_图片类设计之1_矩阵类设计(前端架构基础)-CSDN博客,讨论非规则图类型的设计 无规则图的简单定义 前面的矩阵类,有明显的特征:长,宽,行和…...
aws(学习笔记第三十二课) 深入使用cdk(API Gateway + event bridge)
文章目录 aws(学习笔记第三十二课) 深入使用cdk学习内容:1. 使用aws API Gatewaylambda1.1. 以前的练习1.2. 使用cdk创建API Gateway lambda1.3. 确认cdk创建API Gateway lambda 2. 使用event bridge练习producer和consumer2.1. 代码链接2.2. 开始练习2.3. 代码部…...
计算机视觉算法实战——老虎个体识别(主页有源码)
✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ 1. 领域介绍 老虎个体识别是计算机视觉中的一个重要应用领域,旨在通过分析老虎的独特条纹图案,自动识别和区…...
Qt添加MySql数据库驱动
文章目录 一. 安装MySql二.编译mysql动态链接库 Qt版本:5.14.2 MySql版本:8.0.41 一. 安装MySql 参考这里进行安装:https://blog.csdn.net/qq_30150579/article/details/146042922 将mysql安装目录里的bin,include和lib拷贝出来…...
蓝桥杯备考:图论初解
1:图的定义 我们学了线性表和树的结构,那什么是图呢? 线性表是一个串一个是一对一的结构 树是一对多的,每个结点可以有多个孩子,但只能有一个父亲 而我们今天学的图!就是多对多的结构了 V表示的是图的顶点集…...
【每日学点HarmonyOS Next知识】输入框自动获取焦点、JS桥实现方式、Popup设置全屏蒙版、鼠标事件适配、Web跨域
1、HarmonyOS TextInput或TextArea如何自动获取焦点? 可以使用 focusControl.requestFocus 对需要获取焦点的组件设置焦点,具体可以参考文档: https://developer.huawei.com/consumer/cn/doc/harmonyos-references-V5/ts-universal-attribut…...
网络空间安全(19)CSRF攻防
一、简介 跨站请求伪造(Cross-Site Request Forgery,简称CSRF)是一种网络攻击方式。攻击者通过诱导受害者访问恶意页面,利用受害者在被攻击网站已经获取的注册凭证(如Cookie),绕过后台的用户验证…...
