【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式
这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。
目录
一、消息队列
1.1、发布确认模式
1.2、案例代码
(1)引入依赖
(2)编写生产者【消息确认--单条确认】
(3)编写生产者【消息确认--批量确认】
(4)编写生产者【消息确认--异步确认】
一、消息队列
1.1、发布确认模式
RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???
RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。

发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。
1.2、案例代码
(1)引入依赖
<!-- 引入 RabbitMQ 依赖 -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>
(2)编写生产者【消息确认--单条确认】
- 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
- 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
- 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class Producer {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 等待RabbitMQ返回ACK标识boolean wait = channel.waitForConfirms();System.out.println("RabbitMQ是否接收成功: " + wait);if (!wait) {// 消息发送失败,则可以重新发送channel.basicPublish(exchangeName, routingKey, null, message.getBytes());}}} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}
(3)编写生产者【消息确认--批量确认】
- 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
- 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
- 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
- 所以,可以通过捕获IOException异常来判断消息是否发送成功。
- 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerBatch {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息int batchSize = 3;int count = 0;for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 批量确认if (count == batchSize) {// 等待RabbitMQ返回ACK标识channel.waitForConfirmsOrDie();count = 0;}count++;}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}
(4)编写生产者【消息确认--异步确认】
- 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
- 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerAsync {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_confirm_2023";channel.exchangeDeclare(exchangeName, "direct");// TODO 一定要先调用监听接口,在发送消息channel.addConfirmListener(new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag + ",批量发送多条消息multiple=" + multiple);}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收失败啦.....");}});for (int i = 0; i < 10; i++) {// 6、发送消息String message = "这是发布确认模式,发送的消息数据";channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}
到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。
综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。
相关文章:
【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式
这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。 目录 一、消息队列 1.1、发布确认模式 1.2、案例代码 (1)引入依赖 (2)编写生产者【消息确认--单条确认】 (3…...
【华为OD机试模拟题】用 C++ 实现 - IPv4 地址转换成整数(2023.Q1)
最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 去重求和(2023.Q1) 文章目录 最近更新的博客使用说明IPv4 地址转换成整数题目输入输出示例一输入输出说明示例一输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,...
闭包与高阶函数
文中内容均来自于曾探《JavaScript设计模式与开发实践》的学习笔记。闭包作用域变量的作用域,就是指变量的有效范围。局部变量、全局变量。变量的搜索是从内到外而非从外到内的。变量的生命周期对于全局变量莱索,全局变量的生命周期是永久的,…...
人工智能轨道交通行业周刊-第35期(2023.2.20-2.26)
本期关键词:重庆智慧轨道、智能运维主机、标准轨距、地方铁路公报、景深、机器视觉应用 1 整理涉及公众号名单 1.1 行业类 RT轨道交通人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通Rai…...
快慢指针判断链表是否有环
快慢指针判断链表是否有环 单链表有可能存在环,有些情况下要判断一个单链表是否有环。数组的有个快慢指针的方法,其实单链表和数组有相似的地方,可以使用快慢指针的方法。具体做法如下: 首先创建两个指针,它们初始时…...
《MongoDB入门教程》第26篇 聚合统计之$max/$min表达式
本文将会介绍两个 MongoDB 表达式,返回一组数据中最大值的 $max 表达式,以及返回一组数据中最小值的 $min 表达式。 $max 表达式 $max 表达式用于返回一组数据中的最大值,语法如下: { $max: <expression> }$max 表达式在…...
FPGA纯verilog解码SDI视频 纯逻辑资源实现 提供2套工程源码和技术支持
目录1、前言2、硬件电路解析SDI摄像头Gv8601a单端转差GTX解串SDI解码VGA时序恢复YUV转RGB图像输出FDMA图像缓存HDMI输出3、工程1详解:无缓存输出4、工程2详解:缓存3帧输出5、上板调试验证并演示6、福利:工程代码的获取1、前言 FPGA实现SDI视…...
JVM篇之垃圾回收
一.如何判断对象可以回收 1.引用计数法 只要一个对象被其他变量所引用,就让它的计数加1,被引用了两次就让它的计数变成2,当这个变量的计数变成0时,就可以被垃圾回收; 弊端:当出现如下图的情况࿰…...
尝试用程序计算Π(3.141592653......)
文章目录1. π\piπ2. 用微积分来计算π\piπ2.1 原理2.2 代码2.3 结果2.4 分析1. π\piπ π\piπ的重要性或者地位不用多说,有时候还是很好奇,精确地π\piπ值是怎么计算出来的。研究π\piπ的精确计算应该是很多数学家计算机科学家努力的方向…...
【异常检测三件套】系列3--时序异常检测综述
写在前面: 异常检测共包含3个内容,从多个方面剖析异常检测方法,本文为第三篇。过往内容请查看以下链接: 【异常检测三件套】系列1--14种异常检测算法https://blog.csdn.net/allein_STR/article/details/128114175?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%…...
关于SAP 错误日志解析
有时候启动或操作sap会出现故障,只是察看sap用户当前目录下的日志文件可能不得要领,此时有必要察看work目录下的一些trace. 以Linux系统为例,其他的也差不多。 instance说明 如下 DVEBMGS?? ABAP Central Instance D?? …...
java:自定义变量加载到系统变量后替换shell模版并执行shell
这里的需求前提是,在项目中进行某些操作前,需要在命令后对shell配置文件的进行修改(如ip、port),这个对于用户是不友好的,需要改为用户页面输入ip、port,后台自动去操作修改配置;那么…...
Redis高级删除策略与数据淘汰
第二章:Redis高级 学习目标 目标1:能够说出redis中的数据删除策与略淘汰策略 目标2:能够说出主从复制的概念,工作流程以及场景问题及解决方案 目标3:能够说出哨兵的作用以及工作原理,以及如何启用哨兵 …...
社畜大学生的Python之pandas学习笔记,保姆入门级教学
接上期,上篇介绍了 NumPy,本篇介绍 pandas。 目录 pandas 入门pandas 的数据结构介绍基本功能汇总和计算描述统计处理缺失数据层次化索引 pandas 入门 Pandas 是基于 Numpy 构建的,让以 NumPy 为中心的应用变的更加简单。 Pandas是基于Numpy…...
20_FreeRTOS低功耗模式
目录 低功耗模式简介 STM32低功耗模式 Tickless模式详解 Tickless模式相关配置 实验源码 低功耗模式简介 很多应用场合对于功耗的要求很严格,比如可穿戴低功耗产品、物联网低功耗产品等。 一般MCU都有相应的低功耗模式,裸机开发时可以使用MCU的低功耗模式。 FreeRTOS也…...
Hive的使用方式
操作Hive可以在Shell命令行下操作,或者是使用JDBC代码的方式操作 针对命令行这种方式,其实还有两种使用 第一个是使用bin目录下的hive命令,这个是从hive一开始就支持的使用方式 后来又出现一个beeline命令,它是通过HiveServer2服…...
Flume三大核心组件
Flume的三大核心组件: Source:数据源 Channel:临时存储数据的管道 Sink:目的地 Source:数据源:通过source组件可以指定让Flume读取哪里的数据,然后将数据传递给后面的 channel Flume内置支持读…...
数据结构(六)二叉树
一、树形结构概念树是一种非线性的数据结构,它是由n(n>0)个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树,也就是说它是根朝上,而叶朝下的。它具有以下的特点:1、有一个…...
Docker buildx 的跨平台编译
docker buildx 默认的 docker build 命令无法完成跨平台构建任务,我们需要为 docker 命令行安装 buildx 插件扩展其功能。buildx 能够使用由 Moby BuildKit 提供的构建镜像额外特性,它能够创建多个 builder 实例,在多个节点并行地执行构建任…...
【java基础】方法重载和方法重写
文章目录方法重载方法重写方法重载 方法重载就是可以在一个类里面定义多个相同名称的方法,只需要参数列表的个数或者类型不同就行。 public class Overload {public int add(int a, int b) {return a b;}public double add(double a, double b) {return a b;}}对…...
ABC系统实战指南:革新数字电路设计的逻辑综合与形式验证技术突破
ABC系统实战指南:革新数字电路设计的逻辑综合与形式验证技术突破 【免费下载链接】abc ABC: System for Sequential Logic Synthesis and Formal Verification 项目地址: https://gitcode.com/gh_mirrors/ab/abc 在现代集成电路设计流程中,工程师…...
NSudo:突破Windows权限壁垒的系统管理利器
NSudo:突破Windows权限壁垒的系统管理利器 【免费下载链接】NSudo [Deprecated, work in progress alternative: https://github.com/M2Team/NanaRun] Series of System Administration Tools 项目地址: https://gitcode.com/gh_mirrors/ns/NSudo 一、核心价…...
EDK II代码格式化集成指南:IDE集成步骤详解
EDK II代码格式化集成指南:IDE集成步骤详解 【免费下载链接】edk2 EDK II 项目地址: https://gitcode.com/gh_mirrors/ed/edk2 EDK II作为现代UEFI固件开发的核心框架,其代码质量直接影响到固件的稳定性和安全性。本文将详细介绍如何将EDK II代码…...
鸿蒙Next通讯录实战:用ArkUI 3.0手把手教你打造新建联系人页面(附完整代码)
鸿蒙Next通讯录实战:用ArkUI 3.0构建企业级新建联系人页面 在移动应用开发领域,通讯录功能一直是检验开发者UI构建和数据管理能力的经典场景。鸿蒙Next作为新一代分布式操作系统,其ArkUI 3.0框架为开发者提供了声明式UI编程范式,让…...
Python内存泄漏分析实战指南(生产环境零停机排查全流程)
第一章:Python内存泄漏的本质与危害Python内存泄漏并非源于C语言中常见的“未释放malloc内存”,而是指对象被意外长期持有,导致垃圾回收器(GC)无法将其回收,从而持续占用堆内存。其本质是**引用关系的非预期…...
macOS Unlocker V3.0:在Windows和Linux上免费运行macOS虚拟机的终极解决方案 [特殊字符]
macOS Unlocker V3.0:在Windows和Linux上免费运行macOS虚拟机的终极解决方案 🚀 【免费下载链接】unlocker 项目地址: https://gitcode.com/gh_mirrors/unlo/unlocker macOS Unlocker V3.0是一款革命性的开源工具,让您能够在Windows或…...
认知几何学:思维如何弯曲意义空间(世毫九实验室原创理论修订版)
认知几何学:思维如何弯曲意义空间(世毫九实验室原创理论修订版)Cognitive Geometry: How Thought Curves Meaning Space (Revised Edition)方见华 世毫九实验室 摘要 本文在《新累土哲学》“关系先于实体”的框架下,对认知几何学进…...
保姆级教程:在WSL上用AWS CLI配置MinIO临时访问凭证(含时区避坑指南)
在WSL中实战MinIO临时凭证:从配置到避坑的全流程指南 如果你正在Windows系统上使用WSL进行开发,并且需要为MinIO对象存储生成临时访问凭证,那么这篇文章将为你提供完整的解决方案。我们将从环境准备开始,逐步深入到凭证生成、策略…...
FireRedASR Pro模型架构浅析:从卷积神经网络到端到端设计
FireRedASR Pro模型架构浅析:从卷积神经网络到端到端设计 最近在语音识别圈子里,FireRedASR Pro这个名字被提到的次数越来越多了。不少朋友都在问,这个模型到底有什么特别之处,为什么大家都在讨论它。其实,它的核心魅…...
从 0 开始讲透 C++ Lambda(对标 Java)
在写 C 多线程或 STL 时,经常会看到这样的代码:std::thread t([]{ std::cout << "Hello C Thread\n"; });很多人第一反应:这 [] 是什么?为什么和 Java 不一样?一、先给结论(先建立整体认知…...
