【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式
这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。
目录
一、消息队列
1.1、发布确认模式
1.2、案例代码
(1)引入依赖
(2)编写生产者【消息确认--单条确认】
(3)编写生产者【消息确认--批量确认】
(4)编写生产者【消息确认--异步确认】
一、消息队列
1.1、发布确认模式
RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???
RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。
发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。
1.2、案例代码
(1)引入依赖
<!-- 引入 RabbitMQ 依赖 -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>
(2)编写生产者【消息确认--单条确认】
- 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
- 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
- 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class Producer {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 等待RabbitMQ返回ACK标识boolean wait = channel.waitForConfirms();System.out.println("RabbitMQ是否接收成功: " + wait);if (!wait) {// 消息发送失败,则可以重新发送channel.basicPublish(exchangeName, routingKey, null, message.getBytes());}}} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}
(3)编写生产者【消息确认--批量确认】
- 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
- 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
- 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
- 所以,可以通过捕获IOException异常来判断消息是否发送成功。
- 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerBatch {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息int batchSize = 3;int count = 0;for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 批量确认if (count == batchSize) {// 等待RabbitMQ返回ACK标识channel.waitForConfirmsOrDie();count = 0;}count++;}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}
(4)编写生产者【消息确认--异步确认】
- 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
- 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerAsync {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_confirm_2023";channel.exchangeDeclare(exchangeName, "direct");// TODO 一定要先调用监听接口,在发送消息channel.addConfirmListener(new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag + ",批量发送多条消息multiple=" + multiple);}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收失败啦.....");}});for (int i = 0; i < 10; i++) {// 6、发送消息String message = "这是发布确认模式,发送的消息数据";channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}
到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。
综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。
相关文章:

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式
这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。 目录 一、消息队列 1.1、发布确认模式 1.2、案例代码 (1)引入依赖 (2)编写生产者【消息确认--单条确认】 (3…...

【华为OD机试模拟题】用 C++ 实现 - IPv4 地址转换成整数(2023.Q1)
最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 去重求和(2023.Q1) 文章目录 最近更新的博客使用说明IPv4 地址转换成整数题目输入输出示例一输入输出说明示例一输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,...
闭包与高阶函数
文中内容均来自于曾探《JavaScript设计模式与开发实践》的学习笔记。闭包作用域变量的作用域,就是指变量的有效范围。局部变量、全局变量。变量的搜索是从内到外而非从外到内的。变量的生命周期对于全局变量莱索,全局变量的生命周期是永久的,…...

人工智能轨道交通行业周刊-第35期(2023.2.20-2.26)
本期关键词:重庆智慧轨道、智能运维主机、标准轨距、地方铁路公报、景深、机器视觉应用 1 整理涉及公众号名单 1.1 行业类 RT轨道交通人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通Rai…...
快慢指针判断链表是否有环
快慢指针判断链表是否有环 单链表有可能存在环,有些情况下要判断一个单链表是否有环。数组的有个快慢指针的方法,其实单链表和数组有相似的地方,可以使用快慢指针的方法。具体做法如下: 首先创建两个指针,它们初始时…...
《MongoDB入门教程》第26篇 聚合统计之$max/$min表达式
本文将会介绍两个 MongoDB 表达式,返回一组数据中最大值的 $max 表达式,以及返回一组数据中最小值的 $min 表达式。 $max 表达式 $max 表达式用于返回一组数据中的最大值,语法如下: { $max: <expression> }$max 表达式在…...

FPGA纯verilog解码SDI视频 纯逻辑资源实现 提供2套工程源码和技术支持
目录1、前言2、硬件电路解析SDI摄像头Gv8601a单端转差GTX解串SDI解码VGA时序恢复YUV转RGB图像输出FDMA图像缓存HDMI输出3、工程1详解:无缓存输出4、工程2详解:缓存3帧输出5、上板调试验证并演示6、福利:工程代码的获取1、前言 FPGA实现SDI视…...

JVM篇之垃圾回收
一.如何判断对象可以回收 1.引用计数法 只要一个对象被其他变量所引用,就让它的计数加1,被引用了两次就让它的计数变成2,当这个变量的计数变成0时,就可以被垃圾回收; 弊端:当出现如下图的情况࿰…...

尝试用程序计算Π(3.141592653......)
文章目录1. π\piπ2. 用微积分来计算π\piπ2.1 原理2.2 代码2.3 结果2.4 分析1. π\piπ π\piπ的重要性或者地位不用多说,有时候还是很好奇,精确地π\piπ值是怎么计算出来的。研究π\piπ的精确计算应该是很多数学家计算机科学家努力的方向…...
【异常检测三件套】系列3--时序异常检测综述
写在前面: 异常检测共包含3个内容,从多个方面剖析异常检测方法,本文为第三篇。过往内容请查看以下链接: 【异常检测三件套】系列1--14种异常检测算法https://blog.csdn.net/allein_STR/article/details/128114175?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%…...
关于SAP 错误日志解析
有时候启动或操作sap会出现故障,只是察看sap用户当前目录下的日志文件可能不得要领,此时有必要察看work目录下的一些trace. 以Linux系统为例,其他的也差不多。 instance说明 如下 DVEBMGS?? ABAP Central Instance D?? …...

java:自定义变量加载到系统变量后替换shell模版并执行shell
这里的需求前提是,在项目中进行某些操作前,需要在命令后对shell配置文件的进行修改(如ip、port),这个对于用户是不友好的,需要改为用户页面输入ip、port,后台自动去操作修改配置;那么…...

Redis高级删除策略与数据淘汰
第二章:Redis高级 学习目标 目标1:能够说出redis中的数据删除策与略淘汰策略 目标2:能够说出主从复制的概念,工作流程以及场景问题及解决方案 目标3:能够说出哨兵的作用以及工作原理,以及如何启用哨兵 …...

社畜大学生的Python之pandas学习笔记,保姆入门级教学
接上期,上篇介绍了 NumPy,本篇介绍 pandas。 目录 pandas 入门pandas 的数据结构介绍基本功能汇总和计算描述统计处理缺失数据层次化索引 pandas 入门 Pandas 是基于 Numpy 构建的,让以 NumPy 为中心的应用变的更加简单。 Pandas是基于Numpy…...

20_FreeRTOS低功耗模式
目录 低功耗模式简介 STM32低功耗模式 Tickless模式详解 Tickless模式相关配置 实验源码 低功耗模式简介 很多应用场合对于功耗的要求很严格,比如可穿戴低功耗产品、物联网低功耗产品等。 一般MCU都有相应的低功耗模式,裸机开发时可以使用MCU的低功耗模式。 FreeRTOS也…...
Hive的使用方式
操作Hive可以在Shell命令行下操作,或者是使用JDBC代码的方式操作 针对命令行这种方式,其实还有两种使用 第一个是使用bin目录下的hive命令,这个是从hive一开始就支持的使用方式 后来又出现一个beeline命令,它是通过HiveServer2服…...
Flume三大核心组件
Flume的三大核心组件: Source:数据源 Channel:临时存储数据的管道 Sink:目的地 Source:数据源:通过source组件可以指定让Flume读取哪里的数据,然后将数据传递给后面的 channel Flume内置支持读…...

数据结构(六)二叉树
一、树形结构概念树是一种非线性的数据结构,它是由n(n>0)个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树,也就是说它是根朝上,而叶朝下的。它具有以下的特点:1、有一个…...

Docker buildx 的跨平台编译
docker buildx 默认的 docker build 命令无法完成跨平台构建任务,我们需要为 docker 命令行安装 buildx 插件扩展其功能。buildx 能够使用由 Moby BuildKit 提供的构建镜像额外特性,它能够创建多个 builder 实例,在多个节点并行地执行构建任…...
【java基础】方法重载和方法重写
文章目录方法重载方法重写方法重载 方法重载就是可以在一个类里面定义多个相同名称的方法,只需要参数列表的个数或者类型不同就行。 public class Overload {public int add(int a, int b) {return a b;}public double add(double a, double b) {return a b;}}对…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...

【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...

C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...

SpringCloudGateway 自定义局部过滤器
场景: 将所有请求转化为同一路径请求(方便穿网配置)在请求头内标识原来路径,然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...

Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...

接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...

脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)
一、OpenBCI_GUI 项目概述 (一)项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台,其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言,首次接触 OpenBCI 设备时,往…...

[论文阅读]TrustRAG: Enhancing Robustness and Trustworthiness in RAG
TrustRAG: Enhancing Robustness and Trustworthiness in RAG [2501.00879] TrustRAG: Enhancing Robustness and Trustworthiness in Retrieval-Augmented Generation 代码:HuichiZhou/TrustRAG: Code for "TrustRAG: Enhancing Robustness and Trustworthin…...