当前位置: 首页 > news >正文

RabbitMQ 发布确认机制

发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段

发布确认模式

  • 原理说明
  • 实现方式
    • 开启confirm(确认)模式
    • 阻塞确认
    • 异步确认
  • 总结

原理说明

  生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。
  confirm模式下的信道所发送的消息都将被应带ack或者nack一次,不会出现一条消息即被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做出保证,消息被confirm是异步进行。
在这里插入图片描述

  如上图所示为confirm模式下的消息发送过程,其中4和6为异步应答,也就是说4过程并不一定在5之前,也有可能是在下一条消息发送后才会进行上一条消息的应答。
  RabbitMQ 事务和发送确认机制确保的是消息能够正确的发送至RabbitMQ的交换机,如果交换机没有匹配的队列,那么消息也会被丢失。和事务不同的是,发布确认机制是异步进行的,因此在性能上发布确认模式将更加优秀,需要注意的是:事务和确认机制是互斥的,不能共存
  事务机制和发布确认机制都存在以下注意点:

  • 如果消息需要持久化并且存在队列,则在消息入队并且持久化后进行返回事务提交成功或者应答消息。
  • 如果消息不需要持久化但是存在队列,则在消息入队后返回事务提交成功或者应答消息。
  • 如果消息不可路由到队列中,则在路由失败后返回事务提交成功或者应答消息。

  上文中一直强调的时发布确认针对发布发送到RabbitMQ中的交换机进行保证,但消息实际是否能入队发布确认机制并不能提供保证,因此还需要和mandatory参数配合使用。

实现方式

  RabbitMQ的发布确认机制可以分为三种实现方式:阻塞等待确认、批量阻塞等待确认、异步确认。
阻塞等待确认:每当消息发送后,发送者都阻塞的等待应答消息。这种实现方式将无法体现发布确认模式的异步性能优势。
批量阻塞确认:批量阻塞确认类似于阻塞等待确认,区别在于批量阻塞确认并不会针对每条消息进行阻塞等待,他会针对一些消息进行统一阻塞等待应答消息。这种实现方式将同步和异步结合起来进行使用,对应答性能有一定的提升。
异步应答:实现一个监听器的方式接收应答消息,应答消息的处理逻辑不会影响消息的发送,消息的应答和消息发送是异步进行的,他们并不直接相互干扰。
上面对三种确认方式进行简单说明,下面将分别介绍发布确认机制的实现方式。

开启confirm(确认)模式

  确认模式的开启是针对信道设置的,一旦信道进入了confirm模式,所有在该信道上面发布的消息都会被指派唯一的ID,RabbitMQ也将针对该信道发送的所有消息都进行应答。
  RabbitMQ回传给生产者的确认消息中的deliverryTag包含了确认消息的序号,但在使用(批量)阻塞确认方式进行实现的时候该消息序号无意义。开启confirm模式仅需要以下代码进行实现即可:

channel.confirmSelect();

阻塞确认

  阻塞确认的方式依赖于channel.waitForConfirms()方法,该方法如下所示:

    /*** Wait until all messages published since the last call have been* either ack'd or nack'd by the broker.  Note, when called on a* non-Confirm channel, waitForConfirms throws an IllegalStateException.* @return whether all the messages were ack'd (and none were nack'd)* @throws java.lang.IllegalStateException*/boolean waitForConfirms() throws InterruptedException;

  自从上次调用该方法后直到所有发送的消息都被应答后返回所有消息的应答结果,如果所有发送的消息应答结果都是成功则返回true,一旦存在任何一条消息应答失败则返回false。
  根据该方法的描述可知,可以通过该方法实现阻塞等待确认和批量阻塞确认两种方案,区别仅在于是发送一条消息调用一次该方法还是发送一批消息后调用一次这个方法。
  阻塞等待确认的方式如下代码所示:

//发送消息
channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 如果发送失败则进行该条消息的重新发送
if(!channel.waitForConfirms()){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}

 阻塞批量确认的方式如下代码所示:

        // 存储未应带消息队列List<String> messages = new ArrayList<>();for (int i = 1; i < 20000 ;  i++){String msg = String.valueOf(i);messages.add(msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());// 每发送十条消息进行一次确认if(i > 0 && i % 10 == 0 ){// 如果确认不通过则将消息重新发送if(!channel.waitForConfirms()){for (String e : messages) {channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,e.getBytes());}}else{// 如果确认成功则将这些消息从未应答队列中移除messages.clear();}}}

异步确认

  客户端Channel提供了addConfirmListener方法,该可以添加ConfirmListener这个回调接口,该接口包含两个方法:handleAck和handleNack,分别用来处理饭hi的Ack和Nack,这两个方法都将返回一个参数deliveryTag(消息的唯一有序序号)和一个boolean型参数multiple,如果该参数为true表示自该消息之前的所有消息RabbitMQ服务都已经做出了应答。我们可以通过该值实现具体业务的发布确认。

/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker.  Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {void handleAck(long deliveryTag, boolean multiple)throws IOException;void handleNack(long deliveryTag, boolean multiple)throws IOException;
}

  异步确认的方式实现起来比较复杂,在生产者端需要维护一个消息队列,如果消息应答成功则将该消息从队列中移除,如果消息应答失败则将该消息再重新发送或进行其他业务处理。该逻辑伪码如下所示:

        // 存储未确认消息,其中key为消息序号,value为消息实体HashMap<Long,String> msgMap = new HashMap<>();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {msgMap.remove(deliveryTag);}/*** 如果消息应带结果为nack则重新发送该消息* @param deliveryTag* @param multiple* @throws IOException*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {String msg = msgMap.get(deliveryTag);if(msg != null){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}}});for (int i = 1; i < 20000 ;  i++){String msg = String.valueOf(i);// 将消息序号和消息存储map中msgMap.put(channel.getNextPublishSeqNo(),msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}

  上述代码使用了map存储消息序号和消息实体,这种存储方式应该会存在风险,由于监听器和消息发送过程是异步进行了,因此可能会存在线程安全的问题,HashMap是非线程安全的。

总结

  发布确认模式是为我们解决消息自生产者发送到RabbitMQ交换机过程中消息丢失的问题的,这一场景需求我们也可以通过事务机制实现。发布确认模式和事务机制比较如下表所示:

比较事务机制发布确认机制
实现方式通过AMQP协议层面实现轻量级实现,采用RabbitMQ应答机制
命令详解Tx.Select
Basic.Publish
Tx.Commit
Commit.OK
Basic.Publish
Basic.Ack
性能同步,性能较慢可异步实现也可同步实现,性能快,AMQP命令交互少
消息到达队列时机事务提交后消息才会进入队列,消息入队存在滞后性消息发送后就进入队列,发布确认模式不影响消息进入队列时机
事务提交成功或消息应答时机消息被交换机处理完成后,或消息不可达同事务
实现复杂度简单相对复杂
适合场景批量发送消息,实现批量消息的原子性和一致性确保消息发送到交换机

  发布确认模式的具体实现可以划分为三种:阻塞等待、批量确认、异步确认,这三者的比较如下表所示:

比较内容阻塞等待批量等待异步确认
性能
实现复杂度
确认范围每条消息批量消息每条消息
是否可以精准确认每条消息

  根据上述内容,我们在实现避免消息自生产者到交换机丢失的机制时建议使用发布确认模式的异步确认,因为异步确认性能最高,并且可以准确的得到被应答的消息的序号,有助于我们进行后续逻辑处理。

相关文章:

RabbitMQ 发布确认机制

发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段 发布确认模式 原理说明实现方式开启confirm&#xff08;确认&#xff09;模式阻塞确认异步确认 总结 原理说明 生产者通过调用channel.confirmSelect方法将信道设置为confirm模式&#xff0c;之后RabbitMQ会返回Co…...

微信小程序使用rich-text解析富文本字符串的时候,遇到image标签图片很大超过屏幕

场景&#xff1a; 使用uniapp开发微信小程序&#xff0c;解析富文本文章需求 用到的组件&#xff1a; u-view2.0的u-parse uniapp提供的rich-text 以上两种组件都是解析富文本的作用&#xff0c;一般用于富文本解析场景&#xff0c;比如解析文章内容&#xff0c;商品详情&am…...

使用IIS服务器部署Flask python Web项目

参考文章 ""D:\Program Files (x86)\Python310\python310.exe"|"D:\Program Files (x86)\Python310\lib\site-packages\wfastcgi.py"" can now be used as a FastCGI script processor参考文章 请求路径填写*&#xff0c;模块选择FastCgiModule&…...

sentinel核心流程源码解析

sentinel的处理槽(ProcessorSlot) 可以说&#xff0c;sentinel实现的各种功能就是由各处理槽完成的 ,ProcessorSlot定义了四个方法&#xff1a; 当进入该处理槽时触发该方法 处理完 entry方法之后触发该方法 退出该处理槽时触发该方法 exit方法处理完成时触发该方法 sentinel的…...

中睿天下Coremail | 2023年第二季度企业邮箱安全态势观察

今日&#xff0c;中睿天下联合Coremail邮件安全发布《2023第二季度企业邮箱安全性研究报告》&#xff0c;对2023第二季度和2023上半年的企业邮箱的安全风险进行了分析。 一 垃圾邮件同比下降16.38% 根据监测&#xff0c;2023年Q2垃圾邮件数量达到6.47亿封&#xff0c;环比下降…...

桶排序-1184:明明的随机数

【题目描述】 明明想在学校中请一些同学一起做一项问卷调查&#xff0c;为了实验的客观性&#xff0c;他先用计算机生成了N个1到1000之间的随机整数&#xff08;N≤100&#xff09;&#xff0c;对于其中重复的数字&#xff0c;只保留一个&#xff0c;把其余相同的数去掉&#x…...

Spring Boot中整合Keycloak OpenID Connect(OIDC)

在Spring Boot中整合Keycloak OpenID Connect&#xff08;OIDC&#xff09;是一个常见的任务&#xff0c;用于实现身份验证和授权。Keycloak是一个开源的身份和访问管理解决方案&#xff0c;而OpenID Connect是构建在OAuth 2.0之上的认证和授权协议。下面是一个简单的步骤指南&…...

如何使用Mac终端给树莓派pico构建C/C++程序进行开发,以及遇到各种问题该怎么处理,不使用任何IDE或编辑器(例如VS Code)

写本文的原因是官方的教程已经过时了&#xff0c;如果你现在按照官方教程来在 Mac 上进行配置&#xff0c;那么会遇到一堆问题&#xff0c;比如我几乎把能踩的“雷”都踩了。所以这里记录了完整过程&#xff0c;以及各种错误的原因和处理方法&#xff0c;不然以后换 Mac 了或者…...

linux 关机和重启

关机和重启 关机和重启之前最好先数据数据同步一下 # 将数据由内存同步到硬盘sync 关机 #shutdown [选项] 时间#立即进入维护模式shutdown now#立即重启shutdown -r now#20:00 重新启动计算机shutdown -r 20:00& #立即关机shutdown -h now# 20:00 关闭计算机shutdown -h 20…...

Python(八十三)字符串的比较操作

❤️ 专栏简介&#xff1a;本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中&#xff0c;我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 &#xff1a;本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…...

Java面试

Java面试宝典是一本面向Java开发者的面试准备指南&#xff0c;旨在帮助准备参加Java相关职位面试的人们更好地准备和应对面试。以下是一些可能在Java面试中涉及的主题和问题&#xff0c;供您参考&#xff1a; Java基础知识&#xff1a; 什么是Java虚拟机&#xff08;JVM&#x…...

基于java的voliate关键字详解

voliate关键字的作用: 一、内存可见性 基于缓存一致性协议&#xff0c;当用voliate关键字修饰的变量改动时&#xff0c;cpu会通知其他线程&#xff0c;缓存已被修改&#xff0c;需要更新缓存。这样每个线程都能获取到最新的变量值。 二、基于内存屏障的防止指令重排 用voli…...

企业计算机服务器中了360后缀勒索病毒怎么办,勒索病毒解密数据恢复

随着计算机技术的不断发展&#xff0c;企业的办公系统得到了很大提升&#xff0c;但是随之而来的网络安全威胁也不断增加&#xff0c;勒索病毒的攻击事件时有发生。近期&#xff0c;我们收到某地连锁超市的求助&#xff0c;企业的计算机服务器遭到了360后缀勒索病毒攻击&#x…...

W6100-EVB-PICO 做TCP Server进行回环测试(六)

前言 上一章我们用W6100-EVB-PICO开发板做TCP 客户端连接服务器进行数据回环测试&#xff0c;那么本章将用开发板做TCP服务器来进行数据回环测试。 TCP是什么&#xff1f;什么是TCP Server&#xff1f;能干什么&#xff1f; TCP (Transmission Control Protocol) 是一种面向连…...

前端小兔鲜儿2

day10-小兔鲜儿 01-banner-轮播图 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1FWNmPpj-1691944251554)(assets/1680344354682.png)] index.css HTML结构 <!-- banner --><div class"banner"><div class"wrappe…...

Pycharm 双击启动失败?

事故 双击 Pycharm 后&#xff0c;出现加载工程&#xff0c;我不想加载这个工程&#xff0c;就点击了弹出的 cancle 取消按钮。然后再到桌面双击 Pycharm 却发现无法启动了。哪怕以管理员权限运行也没用&#xff0c;就是不出界面。 原因未知 CtrlshiftESC 打开后台&#xff…...

spring 事务回滚失败异常

1 背景介绍 事务模板里抛异常&#xff0c;抛异常前的update操作成功&#xff0c;事务没有回滚成功&#xff0c;业务数据还是落db了。debug代码&#xff0c;发现GenericConnectionContext类中derivedConnectionMap是空的&#xff0c;导致回滚代码没有执行 2 解决方案 保证事务…...

Kafka 01——Kafka的安装及简单入门使用

Kafka 01——Kafka的安装及简单入门使用 1. 下载安装1.1 JDK的安装1.2 Zookeeper的安装1.2.1 关于Zookeeper版本的选择1.2.2 下载、安装Zookeeper 1.3 kafka的安装1.3.1 下载1.3.2 解压1.3.3 修改配置文件 2. 启动 kafka2.1 Kafka启动2.2 启动 kafka 遇到的问题2.2.1 问题12.2.…...

【爬虫】爬取旅行评论和评分

以马蜂窝“普达措国家公园”为例&#xff0c;其评论高达3000多条&#xff0c;但这3000多条并非是完全向用户展示的&#xff0c;向用户展示的只有5页&#xff0c;数了一下每页15条评论&#xff0c;也就是75条评论&#xff0c;有点太少了吧&#xff01; 因此想了个办法尽可能多爬…...

C++ 泛型编程:函数模板

文章目录 前言一、什么是泛型编程二、函数模板三、函数模板的使用四、多参数函数模板五&#xff0c;示例代码&#xff1a;总结 前言 当需要编写通用的代码以处理不同类型的数据时&#xff0c;C 中的函数模板是一个很有用的工具。函数模板允许我们编写一个通用的函数定义&#…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

如何在看板中有效管理突发紧急任务

在看板中有效管理突发紧急任务需要&#xff1a;设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP&#xff08;Work-in-Progress&#xff09;弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中&#xff0c;设立专门的紧急任务通道尤为重要&#xff0c;这能…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

代理篇12|深入理解 Vite中的Proxy接口代理配置

在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

服务器--宝塔命令

一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行&#xff01; sudo su - 1. CentOS 系统&#xff1a; yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...