当前位置: 首页 > news >正文

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

目录

一、消息队列

1.1、发布确认模式

1.2、案例代码

(1)引入依赖

(2)编写生产者【消息确认--单条确认】

(3)编写生产者【消息确认--批量确认】

(4)编写生产者【消息确认--异步确认】


一、消息队列

1.1、发布确认模式

RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???

RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。

发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。

1.2、案例代码

(1)引入依赖

<!-- 引入 RabbitMQ 依赖 -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>

(2)编写生产者【消息确认--单条确认】

  • 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
  • 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
  • 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class Producer {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 等待RabbitMQ返回ACK标识boolean wait = channel.waitForConfirms();System.out.println("RabbitMQ是否接收成功: " + wait);if (!wait) {// 消息发送失败,则可以重新发送channel.basicPublish(exchangeName, routingKey, null, message.getBytes());}}} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

(3)编写生产者【消息确认--批量确认】

  • 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
  • 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
  • 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
  • 所以,可以通过捕获IOException异常来判断消息是否发送成功。
  • 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerBatch {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息int batchSize = 3;int count = 0;for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 批量确认if (count == batchSize) {// 等待RabbitMQ返回ACK标识channel.waitForConfirmsOrDie();count = 0;}count++;}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

(4)编写生产者【消息确认--异步确认】

  • 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
  • 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerAsync {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_confirm_2023";channel.exchangeDeclare(exchangeName, "direct");// TODO 一定要先调用监听接口,在发送消息channel.addConfirmListener(new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag + ",批量发送多条消息multiple=" + multiple);}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收失败啦.....");}});for (int i = 0; i < 10; i++) {// 6、发送消息String message = "这是发布确认模式,发送的消息数据";channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。

综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

相关文章:

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

这篇文章&#xff0c;主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。 目录 一、消息队列 1.1、发布确认模式 1.2、案例代码 &#xff08;1&#xff09;引入依赖 &#xff08;2&#xff09;编写生产者【消息确认--单条确认】 &#xff08;3&#xf…...

【华为OD机试模拟题】用 C++ 实现 - IPv4 地址转换成整数(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 去重求和(2023.Q1) 文章目录 最近更新的博客使用说明IPv4 地址转换成整数题目输入输出示例一输入输出说明示例一输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,...

闭包与高阶函数

文中内容均来自于曾探《JavaScript设计模式与开发实践》的学习笔记。闭包作用域变量的作用域&#xff0c;就是指变量的有效范围。局部变量、全局变量。变量的搜索是从内到外而非从外到内的。变量的生命周期对于全局变量莱索&#xff0c;全局变量的生命周期是永久的&#xff0c;…...

人工智能轨道交通行业周刊-第35期(2023.2.20-2.26)

本期关键词&#xff1a;重庆智慧轨道、智能运维主机、标准轨距、地方铁路公报、景深、机器视觉应用 1 整理涉及公众号名单 1.1 行业类 RT轨道交通人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通Rai…...

快慢指针判断链表是否有环

快慢指针判断链表是否有环 单链表有可能存在环&#xff0c;有些情况下要判断一个单链表是否有环。数组的有个快慢指针的方法&#xff0c;其实单链表和数组有相似的地方&#xff0c;可以使用快慢指针的方法。具体做法如下&#xff1a; 首先创建两个指针&#xff0c;它们初始时…...

《MongoDB入门教程》第26篇 聚合统计之$max/$min表达式

本文将会介绍两个 MongoDB 表达式&#xff0c;返回一组数据中最大值的 $max 表达式&#xff0c;以及返回一组数据中最小值的 $min 表达式。 $max 表达式 $max 表达式用于返回一组数据中的最大值&#xff0c;语法如下&#xff1a; { $max: <expression> }$max 表达式在…...

FPGA纯verilog解码SDI视频 纯逻辑资源实现 提供2套工程源码和技术支持

目录1、前言2、硬件电路解析SDI摄像头Gv8601a单端转差GTX解串SDI解码VGA时序恢复YUV转RGB图像输出FDMA图像缓存HDMI输出3、工程1详解&#xff1a;无缓存输出4、工程2详解&#xff1a;缓存3帧输出5、上板调试验证并演示6、福利&#xff1a;工程代码的获取1、前言 FPGA实现SDI视…...

JVM篇之垃圾回收

一.如何判断对象可以回收 1.引用计数法 只要一个对象被其他变量所引用&#xff0c;就让它的计数加1&#xff0c;被引用了两次就让它的计数变成2&#xff0c;当这个变量的计数变成0时&#xff0c;就可以被垃圾回收&#xff1b; 弊端&#xff1a;当出现如下图的情况&#xff0…...

尝试用程序计算Π(3.141592653......)

文章目录1. π\piπ2. 用微积分来计算π\piπ2.1 原理2.2 代码2.3 结果2.4 分析1. π\piπ π\piπ的重要性或者地位不用多说&#xff0c;有时候还是很好奇&#xff0c;精确地π\piπ值是怎么计算出来的。研究π\piπ的精确计算应该是很多数学家计算机科学家努力的方向&#xf…...

【异常检测三件套】系列3--时序异常检测综述

写在前面: 异常检测共包含3个内容,从多个方面剖析异常检测方法,本文为第三篇。过往内容请查看以下链接: 【异常检测三件套】系列1--14种异常检测算法https://blog.csdn.net/allein_STR/article/details/128114175?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%…...

关于SAP 错误日志解析

有时候启动或操作sap会出现故障&#xff0c;只是察看sap用户当前目录下的日志文件可能不得要领&#xff0c;此时有必要察看work目录下的一些trace. 以Linux系统为例&#xff0c;其他的也差不多。 instance说明 如下 DVEBMGS?? ABAP Central Instance D?? …...

java:自定义变量加载到系统变量后替换shell模版并执行shell

这里的需求前提是&#xff0c;在项目中进行某些操作前&#xff0c;需要在命令后对shell配置文件的进行修改&#xff08;如ip、port&#xff09;&#xff0c;这个对于用户是不友好的&#xff0c;需要改为用户页面输入ip、port&#xff0c;后台自动去操作修改配置&#xff1b;那么…...

Redis高级删除策略与数据淘汰

第二章&#xff1a;Redis高级 学习目标 目标1&#xff1a;能够说出redis中的数据删除策与略淘汰策略 目标2&#xff1a;能够说出主从复制的概念&#xff0c;工作流程以及场景问题及解决方案 目标3&#xff1a;能够说出哨兵的作用以及工作原理&#xff0c;以及如何启用哨兵 …...

社畜大学生的Python之pandas学习笔记,保姆入门级教学

接上期&#xff0c;上篇介绍了 NumPy&#xff0c;本篇介绍 pandas。 目录 pandas 入门pandas 的数据结构介绍基本功能汇总和计算描述统计处理缺失数据层次化索引 pandas 入门 Pandas 是基于 Numpy 构建的&#xff0c;让以 NumPy 为中心的应用变的更加简单。 Pandas是基于Numpy…...

20_FreeRTOS低功耗模式

目录 低功耗模式简介 STM32低功耗模式 Tickless模式详解 Tickless模式相关配置 实验源码 低功耗模式简介 很多应用场合对于功耗的要求很严格,比如可穿戴低功耗产品、物联网低功耗产品等。 一般MCU都有相应的低功耗模式,裸机开发时可以使用MCU的低功耗模式。 FreeRTOS也…...

Hive的使用方式

操作Hive可以在Shell命令行下操作&#xff0c;或者是使用JDBC代码的方式操作 针对命令行这种方式&#xff0c;其实还有两种使用 第一个是使用bin目录下的hive命令&#xff0c;这个是从hive一开始就支持的使用方式 后来又出现一个beeline命令&#xff0c;它是通过HiveServer2服…...

Flume三大核心组件

Flume的三大核心组件&#xff1a; Source&#xff1a;数据源 Channel&#xff1a;临时存储数据的管道 Sink&#xff1a;目的地 Source&#xff1a;数据源&#xff1a;通过source组件可以指定让Flume读取哪里的数据&#xff0c;然后将数据传递给后面的 channel Flume内置支持读…...

数据结构(六)二叉树

一、树形结构概念树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。它具有以下的特点&#xff1a;1、有一个…...

Docker buildx 的跨平台编译

docker buildx 默认的 docker build 命令无法完成跨平台构建任务&#xff0c;我们需要为 docker 命令行安装 buildx 插件扩展其功能。buildx 能够使用由 Moby BuildKit 提供的构建镜像额外特性&#xff0c;它能够创建多个 builder 实例&#xff0c;在多个节点并行地执行构建任…...

【java基础】方法重载和方法重写

文章目录方法重载方法重写方法重载 方法重载就是可以在一个类里面定义多个相同名称的方法&#xff0c;只需要参数列表的个数或者类型不同就行。 public class Overload {public int add(int a, int b) {return a b;}public double add(double a, double b) {return a b;}}对…...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义&#xff08;Task Definition&…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...

SpringTask-03.入门案例

一.入门案例 启动类&#xff1a; package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

【JavaWeb】Docker项目部署

引言 之前学习了Linux操作系统的常见命令&#xff0c;在Linux上安装软件&#xff0c;以及如何在Linux上部署一个单体项目&#xff0c;大多数同学都会有相同的感受&#xff0c;那就是麻烦。 核心体现在三点&#xff1a; 命令太多了&#xff0c;记不住 软件安装包名字复杂&…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

招商蛇口 | 执笔CID,启幕低密生活新境

作为中国城市生长的力量&#xff0c;招商蛇口以“美好生活承载者”为使命&#xff0c;深耕全球111座城市&#xff0c;以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子&#xff0c;招商蛇口始终与城市发展同频共振&#xff0c;以建筑诠释对土地与生活的…...

uniapp 字符包含的相关方法

在uniapp中&#xff0c;如果你想检查一个字符串是否包含另一个子字符串&#xff0c;你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的&#xff0c;但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器

一、原理介绍 传统滑模观测器采用如下结构&#xff1a; 传统SMO中LPF会带来相位延迟和幅值衰减&#xff0c;并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF)&#xff0c;可以去除高次谐波&#xff0c;并且不用相位补偿就可以获得一个误差较小的转子位…...