RabbitMQ 发布确认机制
发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段
发布确认模式
- 原理说明
- 实现方式
- 开启confirm(确认)模式
- 阻塞确认
- 异步确认
- 总结
原理说明
生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。
confirm模式下的信道所发送的消息都将被应带ack或者nack一次,不会出现一条消息即被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做出保证,消息被confirm是异步进行。
如上图所示为confirm模式下的消息发送过程,其中4和6为异步应答,也就是说4过程并不一定在5之前,也有可能是在下一条消息发送后才会进行上一条消息的应答。
RabbitMQ 事务和发送确认机制确保的是消息能够正确的发送至RabbitMQ的交换机,如果交换机没有匹配的队列,那么消息也会被丢失。和事务不同的是,发布确认机制是异步进行的,因此在性能上发布确认模式将更加优秀,需要注意的是:事务和确认机制是互斥的,不能共存。
事务机制和发布确认机制都存在以下注意点:
- 如果消息需要持久化并且存在队列,则在消息入队并且持久化后进行返回事务提交成功或者应答消息。
- 如果消息不需要持久化但是存在队列,则在消息入队后返回事务提交成功或者应答消息。
- 如果消息不可路由到队列中,则在路由失败后返回事务提交成功或者应答消息。
上文中一直强调的时发布确认针对发布发送到RabbitMQ中的交换机进行保证,但消息实际是否能入队发布确认机制并不能提供保证,因此还需要和mandatory参数配合使用。
实现方式
RabbitMQ的发布确认机制可以分为三种实现方式:阻塞等待确认、批量阻塞等待确认、异步确认。
阻塞等待确认:每当消息发送后,发送者都阻塞的等待应答消息。这种实现方式将无法体现发布确认模式的异步性能优势。
批量阻塞确认:批量阻塞确认类似于阻塞等待确认,区别在于批量阻塞确认并不会针对每条消息进行阻塞等待,他会针对一些消息进行统一阻塞等待应答消息。这种实现方式将同步和异步结合起来进行使用,对应答性能有一定的提升。
异步应答:实现一个监听器的方式接收应答消息,应答消息的处理逻辑不会影响消息的发送,消息的应答和消息发送是异步进行的,他们并不直接相互干扰。
上面对三种确认方式进行简单说明,下面将分别介绍发布确认机制的实现方式。
开启confirm(确认)模式
确认模式的开启是针对信道设置的,一旦信道进入了confirm模式,所有在该信道上面发布的消息都会被指派唯一的ID,RabbitMQ也将针对该信道发送的所有消息都进行应答。
RabbitMQ回传给生产者的确认消息中的deliverryTag包含了确认消息的序号,但在使用(批量)阻塞确认方式进行实现的时候该消息序号无意义。开启confirm模式仅需要以下代码进行实现即可:
channel.confirmSelect();
阻塞确认
阻塞确认的方式依赖于channel.waitForConfirms()方法,该方法如下所示:
/*** Wait until all messages published since the last call have been* either ack'd or nack'd by the broker. Note, when called on a* non-Confirm channel, waitForConfirms throws an IllegalStateException.* @return whether all the messages were ack'd (and none were nack'd)* @throws java.lang.IllegalStateException*/boolean waitForConfirms() throws InterruptedException;
自从上次调用该方法后直到所有发送的消息都被应答后返回所有消息的应答结果,如果所有发送的消息应答结果都是成功则返回true,一旦存在任何一条消息应答失败则返回false。
根据该方法的描述可知,可以通过该方法实现阻塞等待确认和批量阻塞确认两种方案,区别仅在于是发送一条消息调用一次该方法还是发送一批消息后调用一次这个方法。
阻塞等待确认的方式如下代码所示:
//发送消息
channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 如果发送失败则进行该条消息的重新发送
if(!channel.waitForConfirms()){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}
阻塞批量确认的方式如下代码所示:
// 存储未应带消息队列List<String> messages = new ArrayList<>();for (int i = 1; i < 20000 ; i++){String msg = String.valueOf(i);messages.add(msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());// 每发送十条消息进行一次确认if(i > 0 && i % 10 == 0 ){// 如果确认不通过则将消息重新发送if(!channel.waitForConfirms()){for (String e : messages) {channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,e.getBytes());}}else{// 如果确认成功则将这些消息从未应答队列中移除messages.clear();}}}
异步确认
客户端Channel提供了addConfirmListener方法,该可以添加ConfirmListener这个回调接口,该接口包含两个方法:handleAck和handleNack,分别用来处理饭hi的Ack和Nack,这两个方法都将返回一个参数deliveryTag(消息的唯一有序序号)和一个boolean型参数multiple,如果该参数为true表示自该消息之前的所有消息RabbitMQ服务都已经做出了应答。我们可以通过该值实现具体业务的发布确认。
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker. Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {void handleAck(long deliveryTag, boolean multiple)throws IOException;void handleNack(long deliveryTag, boolean multiple)throws IOException;
}
异步确认的方式实现起来比较复杂,在生产者端需要维护一个消息队列,如果消息应答成功则将该消息从队列中移除,如果消息应答失败则将该消息再重新发送或进行其他业务处理。该逻辑伪码如下所示:
// 存储未确认消息,其中key为消息序号,value为消息实体HashMap<Long,String> msgMap = new HashMap<>();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {msgMap.remove(deliveryTag);}/*** 如果消息应带结果为nack则重新发送该消息* @param deliveryTag* @param multiple* @throws IOException*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {String msg = msgMap.get(deliveryTag);if(msg != null){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}}});for (int i = 1; i < 20000 ; i++){String msg = String.valueOf(i);// 将消息序号和消息存储map中msgMap.put(channel.getNextPublishSeqNo(),msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}
上述代码使用了map存储消息序号和消息实体,这种存储方式应该会存在风险,由于监听器和消息发送过程是异步进行了,因此可能会存在线程安全的问题,HashMap是非线程安全的。
总结
发布确认模式是为我们解决消息自生产者发送到RabbitMQ交换机过程中消息丢失的问题的,这一场景需求我们也可以通过事务机制实现。发布确认模式和事务机制比较如下表所示:
比较 | 事务机制 | 发布确认机制 |
---|---|---|
实现方式 | 通过AMQP协议层面实现 | 轻量级实现,采用RabbitMQ应答机制 |
命令详解 | Tx.Select Basic.Publish Tx.Commit Commit.OK | Basic.Publish Basic.Ack |
性能 | 同步,性能较慢 | 可异步实现也可同步实现,性能快,AMQP命令交互少 |
消息到达队列时机 | 事务提交后消息才会进入队列,消息入队存在滞后性 | 消息发送后就进入队列,发布确认模式不影响消息进入队列时机 |
事务提交成功或消息应答时机 | 消息被交换机处理完成后,或消息不可达 | 同事务 |
实现复杂度 | 简单 | 相对复杂 |
适合场景 | 批量发送消息,实现批量消息的原子性和一致性 | 确保消息发送到交换机 |
发布确认模式的具体实现可以划分为三种:阻塞等待、批量确认、异步确认,这三者的比较如下表所示:
比较内容 | 阻塞等待 | 批量等待 | 异步确认 |
---|---|---|---|
性能 | 低 | 中 | 高 |
实现复杂度 | 低 | 中 | 高 |
确认范围 | 每条消息 | 批量消息 | 每条消息 |
是否可以精准确认每条消息 | 是 | 否 | 是 |
根据上述内容,我们在实现避免消息自生产者到交换机丢失的机制时建议使用发布确认模式的异步确认,因为异步确认性能最高,并且可以准确的得到被应答的消息的序号,有助于我们进行后续逻辑处理。
相关文章:

RabbitMQ 发布确认机制
发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段 发布确认模式 原理说明实现方式开启confirm(确认)模式阻塞确认异步确认 总结 原理说明 生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Co…...
微信小程序使用rich-text解析富文本字符串的时候,遇到image标签图片很大超过屏幕
场景: 使用uniapp开发微信小程序,解析富文本文章需求 用到的组件: u-view2.0的u-parse uniapp提供的rich-text 以上两种组件都是解析富文本的作用,一般用于富文本解析场景,比如解析文章内容,商品详情&am…...

使用IIS服务器部署Flask python Web项目
参考文章 ""D:\Program Files (x86)\Python310\python310.exe"|"D:\Program Files (x86)\Python310\lib\site-packages\wfastcgi.py"" can now be used as a FastCGI script processor参考文章 请求路径填写*,模块选择FastCgiModule&…...

sentinel核心流程源码解析
sentinel的处理槽(ProcessorSlot) 可以说,sentinel实现的各种功能就是由各处理槽完成的 ,ProcessorSlot定义了四个方法: 当进入该处理槽时触发该方法 处理完 entry方法之后触发该方法 退出该处理槽时触发该方法 exit方法处理完成时触发该方法 sentinel的…...

中睿天下Coremail | 2023年第二季度企业邮箱安全态势观察
今日,中睿天下联合Coremail邮件安全发布《2023第二季度企业邮箱安全性研究报告》,对2023第二季度和2023上半年的企业邮箱的安全风险进行了分析。 一 垃圾邮件同比下降16.38% 根据监测,2023年Q2垃圾邮件数量达到6.47亿封,环比下降…...
桶排序-1184:明明的随机数
【题目描述】 明明想在学校中请一些同学一起做一项问卷调查,为了实验的客观性,他先用计算机生成了N个1到1000之间的随机整数(N≤100),对于其中重复的数字,只保留一个,把其余相同的数去掉&#x…...
Spring Boot中整合Keycloak OpenID Connect(OIDC)
在Spring Boot中整合Keycloak OpenID Connect(OIDC)是一个常见的任务,用于实现身份验证和授权。Keycloak是一个开源的身份和访问管理解决方案,而OpenID Connect是构建在OAuth 2.0之上的认证和授权协议。下面是一个简单的步骤指南&…...

如何使用Mac终端给树莓派pico构建C/C++程序进行开发,以及遇到各种问题该怎么处理,不使用任何IDE或编辑器(例如VS Code)
写本文的原因是官方的教程已经过时了,如果你现在按照官方教程来在 Mac 上进行配置,那么会遇到一堆问题,比如我几乎把能踩的“雷”都踩了。所以这里记录了完整过程,以及各种错误的原因和处理方法,不然以后换 Mac 了或者…...
linux 关机和重启
关机和重启 关机和重启之前最好先数据数据同步一下 # 将数据由内存同步到硬盘sync 关机 #shutdown [选项] 时间#立即进入维护模式shutdown now#立即重启shutdown -r now#20:00 重新启动计算机shutdown -r 20:00& #立即关机shutdown -h now# 20:00 关闭计算机shutdown -h 20…...

Python(八十三)字符串的比较操作
❤️ 专栏简介:本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中,我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 :本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…...
Java面试
Java面试宝典是一本面向Java开发者的面试准备指南,旨在帮助准备参加Java相关职位面试的人们更好地准备和应对面试。以下是一些可能在Java面试中涉及的主题和问题,供您参考: Java基础知识: 什么是Java虚拟机(JVM&#x…...
基于java的voliate关键字详解
voliate关键字的作用: 一、内存可见性 基于缓存一致性协议,当用voliate关键字修饰的变量改动时,cpu会通知其他线程,缓存已被修改,需要更新缓存。这样每个线程都能获取到最新的变量值。 二、基于内存屏障的防止指令重排 用voli…...

企业计算机服务器中了360后缀勒索病毒怎么办,勒索病毒解密数据恢复
随着计算机技术的不断发展,企业的办公系统得到了很大提升,但是随之而来的网络安全威胁也不断增加,勒索病毒的攻击事件时有发生。近期,我们收到某地连锁超市的求助,企业的计算机服务器遭到了360后缀勒索病毒攻击&#x…...

W6100-EVB-PICO 做TCP Server进行回环测试(六)
前言 上一章我们用W6100-EVB-PICO开发板做TCP 客户端连接服务器进行数据回环测试,那么本章将用开发板做TCP服务器来进行数据回环测试。 TCP是什么?什么是TCP Server?能干什么? TCP (Transmission Control Protocol) 是一种面向连…...
前端小兔鲜儿2
day10-小兔鲜儿 01-banner-轮播图 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1FWNmPpj-1691944251554)(assets/1680344354682.png)] index.css HTML结构 <!-- banner --><div class"banner"><div class"wrappe…...

Pycharm 双击启动失败?
事故 双击 Pycharm 后,出现加载工程,我不想加载这个工程,就点击了弹出的 cancle 取消按钮。然后再到桌面双击 Pycharm 却发现无法启动了。哪怕以管理员权限运行也没用,就是不出界面。 原因未知 CtrlshiftESC 打开后台ÿ…...
spring 事务回滚失败异常
1 背景介绍 事务模板里抛异常,抛异常前的update操作成功,事务没有回滚成功,业务数据还是落db了。debug代码,发现GenericConnectionContext类中derivedConnectionMap是空的,导致回滚代码没有执行 2 解决方案 保证事务…...

Kafka 01——Kafka的安装及简单入门使用
Kafka 01——Kafka的安装及简单入门使用 1. 下载安装1.1 JDK的安装1.2 Zookeeper的安装1.2.1 关于Zookeeper版本的选择1.2.2 下载、安装Zookeeper 1.3 kafka的安装1.3.1 下载1.3.2 解压1.3.3 修改配置文件 2. 启动 kafka2.1 Kafka启动2.2 启动 kafka 遇到的问题2.2.1 问题12.2.…...

【爬虫】爬取旅行评论和评分
以马蜂窝“普达措国家公园”为例,其评论高达3000多条,但这3000多条并非是完全向用户展示的,向用户展示的只有5页,数了一下每页15条评论,也就是75条评论,有点太少了吧! 因此想了个办法尽可能多爬…...

C++ 泛型编程:函数模板
文章目录 前言一、什么是泛型编程二、函数模板三、函数模板的使用四、多参数函数模板五,示例代码:总结 前言 当需要编写通用的代码以处理不同类型的数据时,C 中的函数模板是一个很有用的工具。函数模板允许我们编写一个通用的函数定义&#…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】
1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件(System Property Definition File),用于声明和管理 Bluetooth 模块相…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

ServerTrust 并非唯一
NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...

自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...