当前位置: 首页 > news >正文

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

目录

一、消息队列

1.1、发布确认模式

1.2、案例代码

(1)引入依赖

(2)编写生产者【消息确认--单条确认】

(3)编写生产者【消息确认--批量确认】

(4)编写生产者【消息确认--异步确认】


一、消息队列

1.1、发布确认模式

RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???

RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。

发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。

1.2、案例代码

(1)引入依赖

<!-- 引入 RabbitMQ 依赖 -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>

(2)编写生产者【消息确认--单条确认】

  • 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
  • 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
  • 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class Producer {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 等待RabbitMQ返回ACK标识boolean wait = channel.waitForConfirms();System.out.println("RabbitMQ是否接收成功: " + wait);if (!wait) {// 消息发送失败,则可以重新发送channel.basicPublish(exchangeName, routingKey, null, message.getBytes());}}} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

(3)编写生产者【消息确认--批量确认】

  • 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
  • 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
  • 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
  • 所以,可以通过捕获IOException异常来判断消息是否发送成功。
  • 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerBatch {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息int batchSize = 3;int count = 0;for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 批量确认if (count == batchSize) {// 等待RabbitMQ返回ACK标识channel.waitForConfirmsOrDie();count = 0;}count++;}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

(4)编写生产者【消息确认--异步确认】

  • 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
  • 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerAsync {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_confirm_2023";channel.exchangeDeclare(exchangeName, "direct");// TODO 一定要先调用监听接口,在发送消息channel.addConfirmListener(new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag + ",批量发送多条消息multiple=" + multiple);}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收失败啦.....");}});for (int i = 0; i < 10; i++) {// 6、发送消息String message = "这是发布确认模式,发送的消息数据";channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。

综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

相关文章:

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

这篇文章&#xff0c;主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。 目录 一、消息队列 1.1、发布确认模式 1.2、案例代码 &#xff08;1&#xff09;引入依赖 &#xff08;2&#xff09;编写生产者【消息确认--单条确认】 &#xff08;3&#xf…...

【华为OD机试模拟题】用 C++ 实现 - IPv4 地址转换成整数(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 去重求和(2023.Q1) 文章目录 最近更新的博客使用说明IPv4 地址转换成整数题目输入输出示例一输入输出说明示例一输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,...

闭包与高阶函数

文中内容均来自于曾探《JavaScript设计模式与开发实践》的学习笔记。闭包作用域变量的作用域&#xff0c;就是指变量的有效范围。局部变量、全局变量。变量的搜索是从内到外而非从外到内的。变量的生命周期对于全局变量莱索&#xff0c;全局变量的生命周期是永久的&#xff0c;…...

人工智能轨道交通行业周刊-第35期(2023.2.20-2.26)

本期关键词&#xff1a;重庆智慧轨道、智能运维主机、标准轨距、地方铁路公报、景深、机器视觉应用 1 整理涉及公众号名单 1.1 行业类 RT轨道交通人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通Rai…...

快慢指针判断链表是否有环

快慢指针判断链表是否有环 单链表有可能存在环&#xff0c;有些情况下要判断一个单链表是否有环。数组的有个快慢指针的方法&#xff0c;其实单链表和数组有相似的地方&#xff0c;可以使用快慢指针的方法。具体做法如下&#xff1a; 首先创建两个指针&#xff0c;它们初始时…...

《MongoDB入门教程》第26篇 聚合统计之$max/$min表达式

本文将会介绍两个 MongoDB 表达式&#xff0c;返回一组数据中最大值的 $max 表达式&#xff0c;以及返回一组数据中最小值的 $min 表达式。 $max 表达式 $max 表达式用于返回一组数据中的最大值&#xff0c;语法如下&#xff1a; { $max: <expression> }$max 表达式在…...

FPGA纯verilog解码SDI视频 纯逻辑资源实现 提供2套工程源码和技术支持

目录1、前言2、硬件电路解析SDI摄像头Gv8601a单端转差GTX解串SDI解码VGA时序恢复YUV转RGB图像输出FDMA图像缓存HDMI输出3、工程1详解&#xff1a;无缓存输出4、工程2详解&#xff1a;缓存3帧输出5、上板调试验证并演示6、福利&#xff1a;工程代码的获取1、前言 FPGA实现SDI视…...

JVM篇之垃圾回收

一.如何判断对象可以回收 1.引用计数法 只要一个对象被其他变量所引用&#xff0c;就让它的计数加1&#xff0c;被引用了两次就让它的计数变成2&#xff0c;当这个变量的计数变成0时&#xff0c;就可以被垃圾回收&#xff1b; 弊端&#xff1a;当出现如下图的情况&#xff0…...

尝试用程序计算Π(3.141592653......)

文章目录1. π\piπ2. 用微积分来计算π\piπ2.1 原理2.2 代码2.3 结果2.4 分析1. π\piπ π\piπ的重要性或者地位不用多说&#xff0c;有时候还是很好奇&#xff0c;精确地π\piπ值是怎么计算出来的。研究π\piπ的精确计算应该是很多数学家计算机科学家努力的方向&#xf…...

【异常检测三件套】系列3--时序异常检测综述

写在前面: 异常检测共包含3个内容,从多个方面剖析异常检测方法,本文为第三篇。过往内容请查看以下链接: 【异常检测三件套】系列1--14种异常检测算法https://blog.csdn.net/allein_STR/article/details/128114175?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%…...

关于SAP 错误日志解析

有时候启动或操作sap会出现故障&#xff0c;只是察看sap用户当前目录下的日志文件可能不得要领&#xff0c;此时有必要察看work目录下的一些trace. 以Linux系统为例&#xff0c;其他的也差不多。 instance说明 如下 DVEBMGS?? ABAP Central Instance D?? …...

java:自定义变量加载到系统变量后替换shell模版并执行shell

这里的需求前提是&#xff0c;在项目中进行某些操作前&#xff0c;需要在命令后对shell配置文件的进行修改&#xff08;如ip、port&#xff09;&#xff0c;这个对于用户是不友好的&#xff0c;需要改为用户页面输入ip、port&#xff0c;后台自动去操作修改配置&#xff1b;那么…...

Redis高级删除策略与数据淘汰

第二章&#xff1a;Redis高级 学习目标 目标1&#xff1a;能够说出redis中的数据删除策与略淘汰策略 目标2&#xff1a;能够说出主从复制的概念&#xff0c;工作流程以及场景问题及解决方案 目标3&#xff1a;能够说出哨兵的作用以及工作原理&#xff0c;以及如何启用哨兵 …...

社畜大学生的Python之pandas学习笔记,保姆入门级教学

接上期&#xff0c;上篇介绍了 NumPy&#xff0c;本篇介绍 pandas。 目录 pandas 入门pandas 的数据结构介绍基本功能汇总和计算描述统计处理缺失数据层次化索引 pandas 入门 Pandas 是基于 Numpy 构建的&#xff0c;让以 NumPy 为中心的应用变的更加简单。 Pandas是基于Numpy…...

20_FreeRTOS低功耗模式

目录 低功耗模式简介 STM32低功耗模式 Tickless模式详解 Tickless模式相关配置 实验源码 低功耗模式简介 很多应用场合对于功耗的要求很严格,比如可穿戴低功耗产品、物联网低功耗产品等。 一般MCU都有相应的低功耗模式,裸机开发时可以使用MCU的低功耗模式。 FreeRTOS也…...

Hive的使用方式

操作Hive可以在Shell命令行下操作&#xff0c;或者是使用JDBC代码的方式操作 针对命令行这种方式&#xff0c;其实还有两种使用 第一个是使用bin目录下的hive命令&#xff0c;这个是从hive一开始就支持的使用方式 后来又出现一个beeline命令&#xff0c;它是通过HiveServer2服…...

Flume三大核心组件

Flume的三大核心组件&#xff1a; Source&#xff1a;数据源 Channel&#xff1a;临时存储数据的管道 Sink&#xff1a;目的地 Source&#xff1a;数据源&#xff1a;通过source组件可以指定让Flume读取哪里的数据&#xff0c;然后将数据传递给后面的 channel Flume内置支持读…...

数据结构(六)二叉树

一、树形结构概念树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。它具有以下的特点&#xff1a;1、有一个…...

Docker buildx 的跨平台编译

docker buildx 默认的 docker build 命令无法完成跨平台构建任务&#xff0c;我们需要为 docker 命令行安装 buildx 插件扩展其功能。buildx 能够使用由 Moby BuildKit 提供的构建镜像额外特性&#xff0c;它能够创建多个 builder 实例&#xff0c;在多个节点并行地执行构建任…...

【java基础】方法重载和方法重写

文章目录方法重载方法重写方法重载 方法重载就是可以在一个类里面定义多个相同名称的方法&#xff0c;只需要参数列表的个数或者类型不同就行。 public class Overload {public int add(int a, int b) {return a b;}public double add(double a, double b) {return a b;}}对…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来&#xff0c;实在找不到&#xff0c;希望有大佬教一下我。 还有就会议时间&#xff0c;我感觉不是图片时间&#xff0c;因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

C++.OpenGL (10/64)基础光照(Basic Lighting)

基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

HTML前端开发:JavaScript 获取元素方法详解

作为前端开发者&#xff0c;高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法&#xff0c;分为两大系列&#xff1a; 一、getElementBy... 系列 传统方法&#xff0c;直接通过 DOM 接口访问&#xff0c;返回动态集合&#xff08;元素变化会实时更新&#xff09;。…...

mac:大模型系列测试

0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何&#xff0c;是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试&#xff0c;是可以跑通文章里面的代码。训练速度也是很快的。 注意…...

DBLP数据库是什么?

DBLP&#xff08;Digital Bibliography & Library Project&#xff09;Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高&#xff0c;数据库文献更新速度很快&#xff0c;很好地反映了国际计算机科学学术研…...

Spring AI中使用ChatMemory实现会话记忆功能

文章目录 1、需求2、ChatMemory中消息的存储位置3、实现步骤1、引入依赖2、配置Spring AI3、配置chatmemory4、java层传递conversaionId 4、验证5、完整代码6、参考文档 1、需求 我们知道大型语言模型 &#xff08;LLM&#xff09; 是无状态的&#xff0c;这就意味着他们不会保…...