RabbitMQ 发布确认机制
发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段
发布确认模式
- 原理说明
- 实现方式
- 开启confirm(确认)模式
- 阻塞确认
- 异步确认
- 总结
原理说明
生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。
confirm模式下的信道所发送的消息都将被应带ack或者nack一次,不会出现一条消息即被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做出保证,消息被confirm是异步进行。

如上图所示为confirm模式下的消息发送过程,其中4和6为异步应答,也就是说4过程并不一定在5之前,也有可能是在下一条消息发送后才会进行上一条消息的应答。
RabbitMQ 事务和发送确认机制确保的是消息能够正确的发送至RabbitMQ的交换机,如果交换机没有匹配的队列,那么消息也会被丢失。和事务不同的是,发布确认机制是异步进行的,因此在性能上发布确认模式将更加优秀,需要注意的是:事务和确认机制是互斥的,不能共存。
事务机制和发布确认机制都存在以下注意点:
- 如果消息需要持久化并且存在队列,则在消息入队并且持久化后进行返回事务提交成功或者应答消息。
- 如果消息不需要持久化但是存在队列,则在消息入队后返回事务提交成功或者应答消息。
- 如果消息不可路由到队列中,则在路由失败后返回事务提交成功或者应答消息。
上文中一直强调的时发布确认针对发布发送到RabbitMQ中的交换机进行保证,但消息实际是否能入队发布确认机制并不能提供保证,因此还需要和mandatory参数配合使用。
实现方式
RabbitMQ的发布确认机制可以分为三种实现方式:阻塞等待确认、批量阻塞等待确认、异步确认。
阻塞等待确认:每当消息发送后,发送者都阻塞的等待应答消息。这种实现方式将无法体现发布确认模式的异步性能优势。
批量阻塞确认:批量阻塞确认类似于阻塞等待确认,区别在于批量阻塞确认并不会针对每条消息进行阻塞等待,他会针对一些消息进行统一阻塞等待应答消息。这种实现方式将同步和异步结合起来进行使用,对应答性能有一定的提升。
异步应答:实现一个监听器的方式接收应答消息,应答消息的处理逻辑不会影响消息的发送,消息的应答和消息发送是异步进行的,他们并不直接相互干扰。
上面对三种确认方式进行简单说明,下面将分别介绍发布确认机制的实现方式。
开启confirm(确认)模式
确认模式的开启是针对信道设置的,一旦信道进入了confirm模式,所有在该信道上面发布的消息都会被指派唯一的ID,RabbitMQ也将针对该信道发送的所有消息都进行应答。
RabbitMQ回传给生产者的确认消息中的deliverryTag包含了确认消息的序号,但在使用(批量)阻塞确认方式进行实现的时候该消息序号无意义。开启confirm模式仅需要以下代码进行实现即可:
channel.confirmSelect();
阻塞确认
阻塞确认的方式依赖于channel.waitForConfirms()方法,该方法如下所示:
/*** Wait until all messages published since the last call have been* either ack'd or nack'd by the broker. Note, when called on a* non-Confirm channel, waitForConfirms throws an IllegalStateException.* @return whether all the messages were ack'd (and none were nack'd)* @throws java.lang.IllegalStateException*/boolean waitForConfirms() throws InterruptedException;
自从上次调用该方法后直到所有发送的消息都被应答后返回所有消息的应答结果,如果所有发送的消息应答结果都是成功则返回true,一旦存在任何一条消息应答失败则返回false。
根据该方法的描述可知,可以通过该方法实现阻塞等待确认和批量阻塞确认两种方案,区别仅在于是发送一条消息调用一次该方法还是发送一批消息后调用一次这个方法。
阻塞等待确认的方式如下代码所示:
//发送消息
channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 如果发送失败则进行该条消息的重新发送
if(!channel.waitForConfirms()){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}
阻塞批量确认的方式如下代码所示:
// 存储未应带消息队列List<String> messages = new ArrayList<>();for (int i = 1; i < 20000 ; i++){String msg = String.valueOf(i);messages.add(msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());// 每发送十条消息进行一次确认if(i > 0 && i % 10 == 0 ){// 如果确认不通过则将消息重新发送if(!channel.waitForConfirms()){for (String e : messages) {channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,e.getBytes());}}else{// 如果确认成功则将这些消息从未应答队列中移除messages.clear();}}}
异步确认
客户端Channel提供了addConfirmListener方法,该可以添加ConfirmListener这个回调接口,该接口包含两个方法:handleAck和handleNack,分别用来处理饭hi的Ack和Nack,这两个方法都将返回一个参数deliveryTag(消息的唯一有序序号)和一个boolean型参数multiple,如果该参数为true表示自该消息之前的所有消息RabbitMQ服务都已经做出了应答。我们可以通过该值实现具体业务的发布确认。
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker. Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {void handleAck(long deliveryTag, boolean multiple)throws IOException;void handleNack(long deliveryTag, boolean multiple)throws IOException;
}
异步确认的方式实现起来比较复杂,在生产者端需要维护一个消息队列,如果消息应答成功则将该消息从队列中移除,如果消息应答失败则将该消息再重新发送或进行其他业务处理。该逻辑伪码如下所示:
// 存储未确认消息,其中key为消息序号,value为消息实体HashMap<Long,String> msgMap = new HashMap<>();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {msgMap.remove(deliveryTag);}/*** 如果消息应带结果为nack则重新发送该消息* @param deliveryTag* @param multiple* @throws IOException*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {String msg = msgMap.get(deliveryTag);if(msg != null){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}}});for (int i = 1; i < 20000 ; i++){String msg = String.valueOf(i);// 将消息序号和消息存储map中msgMap.put(channel.getNextPublishSeqNo(),msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}
上述代码使用了map存储消息序号和消息实体,这种存储方式应该会存在风险,由于监听器和消息发送过程是异步进行了,因此可能会存在线程安全的问题,HashMap是非线程安全的。
总结
发布确认模式是为我们解决消息自生产者发送到RabbitMQ交换机过程中消息丢失的问题的,这一场景需求我们也可以通过事务机制实现。发布确认模式和事务机制比较如下表所示:
| 比较 | 事务机制 | 发布确认机制 |
|---|---|---|
| 实现方式 | 通过AMQP协议层面实现 | 轻量级实现,采用RabbitMQ应答机制 |
| 命令详解 | Tx.Select Basic.Publish Tx.Commit Commit.OK | Basic.Publish Basic.Ack |
| 性能 | 同步,性能较慢 | 可异步实现也可同步实现,性能快,AMQP命令交互少 |
| 消息到达队列时机 | 事务提交后消息才会进入队列,消息入队存在滞后性 | 消息发送后就进入队列,发布确认模式不影响消息进入队列时机 |
| 事务提交成功或消息应答时机 | 消息被交换机处理完成后,或消息不可达 | 同事务 |
| 实现复杂度 | 简单 | 相对复杂 |
| 适合场景 | 批量发送消息,实现批量消息的原子性和一致性 | 确保消息发送到交换机 |
发布确认模式的具体实现可以划分为三种:阻塞等待、批量确认、异步确认,这三者的比较如下表所示:
| 比较内容 | 阻塞等待 | 批量等待 | 异步确认 |
|---|---|---|---|
| 性能 | 低 | 中 | 高 |
| 实现复杂度 | 低 | 中 | 高 |
| 确认范围 | 每条消息 | 批量消息 | 每条消息 |
| 是否可以精准确认每条消息 | 是 | 否 | 是 |
根据上述内容,我们在实现避免消息自生产者到交换机丢失的机制时建议使用发布确认模式的异步确认,因为异步确认性能最高,并且可以准确的得到被应答的消息的序号,有助于我们进行后续逻辑处理。
相关文章:
RabbitMQ 发布确认机制
发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段 发布确认模式 原理说明实现方式开启confirm(确认)模式阻塞确认异步确认 总结 原理说明 生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Co…...
微信小程序使用rich-text解析富文本字符串的时候,遇到image标签图片很大超过屏幕
场景: 使用uniapp开发微信小程序,解析富文本文章需求 用到的组件: u-view2.0的u-parse uniapp提供的rich-text 以上两种组件都是解析富文本的作用,一般用于富文本解析场景,比如解析文章内容,商品详情&am…...
使用IIS服务器部署Flask python Web项目
参考文章 ""D:\Program Files (x86)\Python310\python310.exe"|"D:\Program Files (x86)\Python310\lib\site-packages\wfastcgi.py"" can now be used as a FastCGI script processor参考文章 请求路径填写*,模块选择FastCgiModule&…...
sentinel核心流程源码解析
sentinel的处理槽(ProcessorSlot) 可以说,sentinel实现的各种功能就是由各处理槽完成的 ,ProcessorSlot定义了四个方法: 当进入该处理槽时触发该方法 处理完 entry方法之后触发该方法 退出该处理槽时触发该方法 exit方法处理完成时触发该方法 sentinel的…...
中睿天下Coremail | 2023年第二季度企业邮箱安全态势观察
今日,中睿天下联合Coremail邮件安全发布《2023第二季度企业邮箱安全性研究报告》,对2023第二季度和2023上半年的企业邮箱的安全风险进行了分析。 一 垃圾邮件同比下降16.38% 根据监测,2023年Q2垃圾邮件数量达到6.47亿封,环比下降…...
桶排序-1184:明明的随机数
【题目描述】 明明想在学校中请一些同学一起做一项问卷调查,为了实验的客观性,他先用计算机生成了N个1到1000之间的随机整数(N≤100),对于其中重复的数字,只保留一个,把其余相同的数去掉&#x…...
Spring Boot中整合Keycloak OpenID Connect(OIDC)
在Spring Boot中整合Keycloak OpenID Connect(OIDC)是一个常见的任务,用于实现身份验证和授权。Keycloak是一个开源的身份和访问管理解决方案,而OpenID Connect是构建在OAuth 2.0之上的认证和授权协议。下面是一个简单的步骤指南&…...
如何使用Mac终端给树莓派pico构建C/C++程序进行开发,以及遇到各种问题该怎么处理,不使用任何IDE或编辑器(例如VS Code)
写本文的原因是官方的教程已经过时了,如果你现在按照官方教程来在 Mac 上进行配置,那么会遇到一堆问题,比如我几乎把能踩的“雷”都踩了。所以这里记录了完整过程,以及各种错误的原因和处理方法,不然以后换 Mac 了或者…...
linux 关机和重启
关机和重启 关机和重启之前最好先数据数据同步一下 # 将数据由内存同步到硬盘sync 关机 #shutdown [选项] 时间#立即进入维护模式shutdown now#立即重启shutdown -r now#20:00 重新启动计算机shutdown -r 20:00& #立即关机shutdown -h now# 20:00 关闭计算机shutdown -h 20…...
Python(八十三)字符串的比较操作
❤️ 专栏简介:本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中,我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 :本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…...
Java面试
Java面试宝典是一本面向Java开发者的面试准备指南,旨在帮助准备参加Java相关职位面试的人们更好地准备和应对面试。以下是一些可能在Java面试中涉及的主题和问题,供您参考: Java基础知识: 什么是Java虚拟机(JVM&#x…...
基于java的voliate关键字详解
voliate关键字的作用: 一、内存可见性 基于缓存一致性协议,当用voliate关键字修饰的变量改动时,cpu会通知其他线程,缓存已被修改,需要更新缓存。这样每个线程都能获取到最新的变量值。 二、基于内存屏障的防止指令重排 用voli…...
企业计算机服务器中了360后缀勒索病毒怎么办,勒索病毒解密数据恢复
随着计算机技术的不断发展,企业的办公系统得到了很大提升,但是随之而来的网络安全威胁也不断增加,勒索病毒的攻击事件时有发生。近期,我们收到某地连锁超市的求助,企业的计算机服务器遭到了360后缀勒索病毒攻击&#x…...
W6100-EVB-PICO 做TCP Server进行回环测试(六)
前言 上一章我们用W6100-EVB-PICO开发板做TCP 客户端连接服务器进行数据回环测试,那么本章将用开发板做TCP服务器来进行数据回环测试。 TCP是什么?什么是TCP Server?能干什么? TCP (Transmission Control Protocol) 是一种面向连…...
前端小兔鲜儿2
day10-小兔鲜儿 01-banner-轮播图 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1FWNmPpj-1691944251554)(assets/1680344354682.png)] index.css HTML结构 <!-- banner --><div class"banner"><div class"wrappe…...
Pycharm 双击启动失败?
事故 双击 Pycharm 后,出现加载工程,我不想加载这个工程,就点击了弹出的 cancle 取消按钮。然后再到桌面双击 Pycharm 却发现无法启动了。哪怕以管理员权限运行也没用,就是不出界面。 原因未知 CtrlshiftESC 打开后台ÿ…...
spring 事务回滚失败异常
1 背景介绍 事务模板里抛异常,抛异常前的update操作成功,事务没有回滚成功,业务数据还是落db了。debug代码,发现GenericConnectionContext类中derivedConnectionMap是空的,导致回滚代码没有执行 2 解决方案 保证事务…...
Kafka 01——Kafka的安装及简单入门使用
Kafka 01——Kafka的安装及简单入门使用 1. 下载安装1.1 JDK的安装1.2 Zookeeper的安装1.2.1 关于Zookeeper版本的选择1.2.2 下载、安装Zookeeper 1.3 kafka的安装1.3.1 下载1.3.2 解压1.3.3 修改配置文件 2. 启动 kafka2.1 Kafka启动2.2 启动 kafka 遇到的问题2.2.1 问题12.2.…...
【爬虫】爬取旅行评论和评分
以马蜂窝“普达措国家公园”为例,其评论高达3000多条,但这3000多条并非是完全向用户展示的,向用户展示的只有5页,数了一下每页15条评论,也就是75条评论,有点太少了吧! 因此想了个办法尽可能多爬…...
C++ 泛型编程:函数模板
文章目录 前言一、什么是泛型编程二、函数模板三、函数模板的使用四、多参数函数模板五,示例代码:总结 前言 当需要编写通用的代码以处理不同类型的数据时,C 中的函数模板是一个很有用的工具。函数模板允许我们编写一个通用的函数定义&#…...
.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
海云安高敏捷信创白盒SCAP入选《中国网络安全细分领域产品名录》
近日,嘶吼安全产业研究院发布《中国网络安全细分领域产品名录》,海云安高敏捷信创白盒(SCAP)成功入选软件供应链安全领域产品名录。 在数字化转型加速的今天,网络安全已成为企业生存与发展的核心基石,为了解…...
