当前位置: 首页 > news >正文

RabbitMQ七种工作模式之 RPC通信模式, 发布确认模式

文章目录

  • 六. RPC(RPC通信模式)
    • 客户端
    • 服务端
  • 七. Publisher Confirms(发布确认模式)
    • 1. Publishing Messages Individually(单独确认)
    • 2. Publishing Messages in Batches(批量确认)
    • 3. Handling Publisher Confirms Asynchronously(异步确认)

六. RPC(RPC通信模式)

在这里插入图片描述
在这里插入图片描述

  1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, ⽤于接收服务端的响应.
  2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应

公共代码:

 	public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

客户端

客户端需要完成两件事, 发送请求, 接收响应
发送请求:

//发送请求://声明队列channel.queueDeclare(Common.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Common.RPC_RESPONSE_QUEUE, true, false, false, null);//定义回调队列String replyQueueName = Common.RPC_RESPONSE_QUEUE;//本次请求的唯一标识String corrId = UUID.randomUUID().toString();//生成发送消息的属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();//通过内置交换机, 发送消息String message = "hello, rpc......";channel.basicPublish("", Common.RPC_REQUEST_QUEUE, props, message.getBytes(StandardCharsets.UTF_8));

接收响应:

 //接收响应://使用阻塞队列来存储回调结果, 避免了客户端反复访问队列final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));//如果唯一标识正确, 放在阻塞队列中if(properties.getCorrelationId().equals(corrId)){response.offer(new String(body));}}};channel.basicConsume(replyQueueName, true, consumer);//获取回调的结果String result = response.take();System.out.println("[RPCClient] Result: " + result);

服务端

在这里插入图片描述

		//设置同时最多只能获取一个消息channel.basicQos(1);//接收消息, 并对消息进行应答DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String message = new String(body);String response = "接收到消息 request: " + message;channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));//对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Common.RPC_REQUEST_QUEUE, false, consumer);//自动应答设置成false, 在成功回调后, 再进行手动应答

在这里插入图片描述

七. Publisher Confirms(发布确认模式)

在这里插入图片描述

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  1. ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者(包含消息的唯⼀ID),表明消息已经送达.
    通过Publisher Confirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收, 从⽽避免消息丢失的问题.
    适⽤场景: 对数据安全性要求较⾼的场景. ⽐如⾦融交易, 订单处理

⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参数, 表⽰到这个序号之前的所有消息都已经得到了处理.
在这里插入图片描述

使⽤发送确认机制, 必须要信道设置成confirm(确认)模式

发布确认有3种策略:

1. Publishing Messages Individually(单独确认)

public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()){//创建channelChannel channel = connection.createChannel();//开启信道确认模式channel.confirmSelect();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);Long start = System.currentTimeMillis();for(int i = 0; i < MESSAGE_COUNT; i++){String body = "消息" + i;channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE1, null, body.getBytes(StandardCharsets.UTF_8));//等待确认消息, 只要消息被确认, 这个方法就会被返回//如果超时过期, 则抛出TimeoutException, 如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOExceptionchannel.waitForConfirmsOrDie();}Long end = System.currentTimeMillis(5000);System.out.printf("Published %d message individually in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述

2. Publishing Messages in Batches(批量确认)

    private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()){Channel channel = connection.createChannel();channel.confirmSelect();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//批量个数int batchSize = 100;int outstandingMessageCount = 0;long start = System.currentTimeMillis();for(int i = 0; i < MESSAGE_COUNT; i++){String body = "消息" + i;channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE2, null, body.getBytes(StandardCharsets.UTF_8));outstandingMessageCount++;if(outstandingMessageCount == batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if(outstandingMessageCount > 0){channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("Published %d message batch in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述
相⽐于单独确认策略, 批量确认极⼤地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量, 当消息经常丢失时,批量确认的性能应该是不升反降的

3. Handling Publisher Confirms Asynchronously(异步确认)

异步confirm⽅法的编程实现最为复杂. Channel 接⼝提供了⼀个⽅法addConfirmListener. 这个⽅法可以添加ConfirmListener 回调接⼝.
ConfirmListener 接⼝中包含两个⽅法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表⽰发送消息的序号. multiple 表⽰是否批量确认.
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.

  1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表⽰⼩于等于当前序号deliveryTag的消息都收到了, 则清除对应集合
  2. 当收到nack时, 处理逻辑类似, 不过需要结合具体的业务情况, 进⾏消息重发等操作
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException{try(Connection connection = createConnection()){Channel channel = connection.createChannel();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);channel.confirmSelect();//有序集合, 元素按照自然顺序进行排序, 存储未confirm消息序号SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple){//批量confirmSet.headSet(deliveryTag + 1).clear();}else{confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if(multiple){//批量confirmSet.headSet(deliveryTag + 1).clear();}else{confirmSet.remove(deliveryTag);}//如果处理失败, 需要有消息重发的环节, 此处省略}});long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE3, null, message.getBytes(StandardCharsets.UTF_8));confirmSet.add(nextPublishSeqNo);}while(!confirmSet.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("Published %d message ConfirmsAsynchronously in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述

相关文章:

RabbitMQ七种工作模式之 RPC通信模式, 发布确认模式

文章目录 六. RPC(RPC通信模式)客户端服务端 七. Publisher Confirms(发布确认模式)1. Publishing Messages Individually(单独确认)2. Publishing Messages in Batches(批量确认)3. Handling Publisher Confirms Asynchronously(异步确认) 六. RPC(RPC通信模式) 客⼾端发送消息…...

并非传统意义上的整体二分

是的&#xff0c;如标题所见&#xff0c;本文章会以作者所理解的整体二分思想来介绍一系列整体二分食用方法。 一下内容均是作者本人理解&#xff0c;可能会与算法本身冲突。 1 本质 1.1 板子及从中的启发 我们在做主席树板子的时候&#xff0c;如果使用整体二分&#xff0…...

PostgreSQL的一主一从集群搭建部署 (同步)

一、实验环境 虚拟机名IP身份简称keep-postgres12-node1192.168.122.87主节点node1keep-postgres12-node2192.168.122.89备节点node2 二、安装数据库 源码包方式&#xff08;主&#xff09; 1、创建用户 [rootkeep-postgres12-node1 ~]# groupadd postgres [rootkeep-post…...

ios逆向某新闻 md5+aes

本期的案例比较简单&#xff0c;也许是ios逆向算法本来就比较简单的原因&#xff0c;所以前面我就多扯一些爬虫和逆向的东西。之前写的文章都是js逆向和android逆向的案例&#xff0c;这也是首篇ios的案例&#xff0c;所以会从入门开始讲起。 3大逆向对比 首先爬虫工程师大部…...

grpc的负载均衡

grpc的负载均衡分为client-side load balance和server-side load balance。 所谓的“客户端负载均衡”是指主调方调用被调方的时候&#xff0c;在grpc.DialContext里需要指定grpc.WithDefaultServiceConfig&#xff0c;这个DefaultServiceConfig默认是用pick-first策略。也支持…...

提升搜索体验!—— 推出 Elastic Rerank 模型(技术预览版)

作者&#xff1a;来自 Elastic Shubha Anjur Tupil 几分钟内即可开始使用 Elastic Rerank 模型&#xff1a;强大的语义搜索功能&#xff0c;无需重新索引&#xff0c;提供灵活性和成本控制&#xff1b;高相关性、顶级性能和文本搜索效率。 使用我们全新的先进跨编码器 Elastic …...

【51单片机】程序实验1112.外部中断-定时器中断

主要参考学习资料&#xff1a;B站【普中官方】51单片机手把手教学视频 前置知识&#xff1a;C语言 单片机套装&#xff1a;普中STC51单片机开发板A4标准版套餐7 码字不易&#xff0c;求点赞收藏加关注(•ω•̥) 有问题欢迎评论区讨论~ 目录 程序实验11&12.外部中断-定时器…...

webrtc-java:引领Java进入实时通信新时代

webrtc-java&#xff1a;引领Java进入实时通信新时代 项目地址:https://gitcode.com/gh_mirrors/we/webrtc-java 在现代互联网应用中&#xff0c;实时通信&#xff08;Real-Time Communication, RTC&#xff09;已成为连接人们的桥梁。而说起RTC技术的先锋&#xff0c;不得不…...

TongWeb7-东方通快速使用手册

TongWeb7-东方通 快速使用手册 文章目录 第1章 TongWeb7 产品介绍 1.1 概述1.2 规范支持 第2章 TongWeb7 安装 2.1 TongWeb7 安装要求 2.1.1 TongWeb7 支持的操作系统2.1.2 系统要求2.1.3 其他 2.2 安装TongWeb72.3TongWeb7 目录结构说明2.4 TongWeb7 的启动和停止 第3章 应用…...

JVM内存区块

大家好&#xff0c;经过前两篇文章的介绍&#xff0c;大家对数组也有了一定了解&#xff0c;其实所有的数组都是对象&#xff0c;我们在方法中引用数组的变量叫做引用变量&#xff08;简称引用&#xff09;&#xff0c;那么数组到底是存放在哪里的呢&#xff0c;为什么引用再出…...

C语言单元总结

黑色加粗表示刷题刷到这样的题 红色加粗表示可能重要 单元一 程序设计宏观认识 C语言程序框架 C语言程序最基本的程序框架由两部分构成&#xff0c;分别是 1) 编译预处理 2) 函数组 C语言程序构成 C程序最大的特点就是所有的程序都是用函数来装配的&#xff0c;函数是构成…...

通过PS和Unity制作2D动画之一:创建形象

1、通过路径画出轮廓 使用路径的过程中&#xff0c;需要注意&#xff1a; 1&#xff09;如果使用形状工具作图&#xff0c;比如使用椭圆工具画正圆形&#xff0c;需要设置其属性为“路径”。 2&#xff09;使用路径选择工具&#xff0c;再按住Alt键点击某个路径&#xff0c;可…...

Notable是一款优秀开源免费的Markdown编辑器

一、Notable简介 ‌ Notable‌是一款开源的跨平台Markdown编辑器&#xff0c;支持Linux、MacOS、Windows以及国产操作系统等多种主流操作系统。它以其高颜值和强大的功能&#xff0c;成为了许多用户的首选工具。 主要特性 实时预览‌&#xff1a; Notable提供了实时预览功能&…...

基于MFC绘制门电路

MFC绘制门电路 1. 设计内容、方法与难点 本课题设计的内容包括了基本门电路中与门和非门的绘制、选中以及它们之间的连接。具体采用的方法是在OnDraw函数里面进行绘制&#xff0c;并设计元器件基类&#xff0c;派生出与门和非门&#xff0c;并组合了一个引脚类&#xff0c;在…...

C—指针初阶(2)

如果看完阁下满意的话&#xff0c;能否一键三连呢&#xff0c;我的动力就是大家的支持与肯定&#xff0c;冲&#xff01; 二级指针 我们先看概念以及作用&#xff1a;用来存放一级指针的地址的指针 先看例子&#xff0c;我们逐一分析 我们先分析上面那个“1” 标注那里&#x…...

Linux 基础环境的开发工具以及使用(下)

1. make / Makefile 自动化构建的工具 1&#xff09;引入 在我们进行一些大型的工程的时候&#xff0c;代码量是极其大&#xff0c;当我们代码在进行一系列的编译的时候&#xff0c;难免会出现一些错误&#xff0c;当我们对错误进行一系列的更改之后&#xff0c;难道我们需要…...

constexpr、const和 #define 的比较

constexpr、const 和 #define 的比较 一、定义常量 constexpr 定义&#xff1a;constexpr用于定义在编译期可求值的常量表达式。示例&#xff1a;constexpr int x 5;这里&#xff0c;x的值在编译期就确定为5。 const 定义&#xff1a;const表示变量在运行期间不能被修改&…...

期末复习-Hadoop综合复习

说明 以下内容仅供参考&#xff0c;提到不代表考到&#xff0c;请结合实际情况自己复习 目录 说明 一、题型及分值 二、综合案例题-部署Hadoop集群 或 部署Hadoop HA集群 案例 1&#xff1a;Hadoop 基础集群部署 案例 2&#xff1a;Hadoop HA 集群部署 案例 3&#xff…...

禁用SAP Hana错误密码锁定用户功能

背景 公司项目适配多种数据库其中包含SAP Hana&#xff0c;由于有同事的数据库连接工具保存了某个在用的数据库的旧密码&#xff0c;导致时不时会被锁用户。通过查询官方文档已解决&#xff0c;这里统一记录一下。 禁用密码锁定方法 以下按系统管理员和普通用户的解法分别列…...

Ubuntu 22.04加Windows AD域

说明&#xff1a;   Ubuntu 22.04系统通过realmd&#xff0c;sssd加入到 Active Directory 域&#xff0c;并为域用户配置sudo权限。同时为方便用户使用为Ubuntu系统安装wps与sogou中文输入法。 1. Ubuntu 22.04加入Windows AD域 1.1 首先配置网络&#xff0c;Ubuntu系统能…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信

文章目录 Linux C语言网络编程详细入门教程&#xff1a;如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket&#xff08;服务端和客户端都要&#xff09;2. 绑定本地地址和端口&#x…...

Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问&#xff08;基础概念问题&#xff09; 1. 请解释Spring框架的核心容器是什么&#xff1f;它在Spring中起到什么作用&#xff1f; Spring框架的核心容器是IoC容器&#…...

十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建

【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...

对象回调初步研究

_OBJECT_TYPE结构分析 在介绍什么是对象回调前&#xff0c;首先要熟悉下结构 以我们上篇线程回调介绍过的导出的PsProcessType 结构为例&#xff0c;用_OBJECT_TYPE这个结构来解析它&#xff0c;0x80处就是今天要介绍的回调链表&#xff0c;但是先不着急&#xff0c;先把目光…...