RabbitMQ-客户端源码之AMQChannel
AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法:
/*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as* usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.* @param command the command to handle asynchronously* @return true if we handled the command; otherwise the caller should consider it "unhandled"*/
public abstract boolean processAsync(Command command) throws IOException;
有关processAsync()这个方法的会在介绍ChannelN类的时候详细阐述([[八]RabbitMQ-客户端源码之ChannelN][RabbitMQ-_ChannelN])。
首先来说下AMQChannel的成员变量:
protected final Object _channelMutex = new Object();
/** The connection this channel is associated with. */
private final AMQConnection _connection;
/** This channel's channel number. */
private final int _channelNumber;
/** Command being assembled */
private AMQCommand _command = new AMQCommand();
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcContinuation _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
- _channelMutex这个是内部用来当对象锁的,没有实际的意义,可忽略
- _connection是指AMQConnection这个对象。
- _channelNumber是指channel number, 这个应该不用多解释了吧。通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
- _command是内部处理使用的对象,调用AMQCommand的方法来处理一些东西。
- _activeRpc是指当前未处理完的rpc请求(the current outstanding rpc request)。
- _blockContent 是在Channel.Flow里用到的,其余情况都是false
在AMQChannel的构造函数中,只有两个参数:AMQConnection connection以及int channelNumber.
AMQChannel中有个handleFrame方法:
/*** Private API - When the Connection receives a Frame for this* channel, it passes it to this method.* @param frame the incoming frame* @throws IOException if an error is encountered*/
public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);}
}/*** Private API - handle a command which has been assembled* @throws IOException if there's any problem** @param command the incoming command* @throws IOException*/
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {// First, offer the command to the asynchronous-command// handling mechanism, which gets to act as a filter on the// incoming command stream. If processAsync() returns true,// the command has been dealt with by the filter and so should// not be processed further. It will return true for// asynchronous commands (deliveries/returns/other events),// and false for commands that should be passed on to some// waiting RPC continuation.if (!processAsync(command)) {// The filter decided not to handle/consume the command,// so it must be some reply to an earlier RPC.nextOutstandingRpc().handleCommand(command);markRpcFinished();}
}
这个在[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]有所介绍,主要是用来处理Frame帧的,当调用AMQCommand的handleFrame处理之后返回为true是,即处理完毕时继续调用handleCompleteInboundCommand方法。这其中也牵涉到AMQConnection的MainLoop内部类,具体可以看看:[[六]RabbitMQ-客户端源码之AMQCommand][RabbitMQ-_AMQCommand]。
AMQChannel中有很多方法带有rpc的字样,这来做一个整理。
首先是:
public void enqueueRpc(RpcContinuation k)
{synchronized (_channelMutex) {boolean waitClearedInterruptStatus = false;while (_activeRpc != null) {try {_channelMutex.wait();} catch (InterruptedException e) {waitClearedInterruptStatus = true;}}if (waitClearedInterruptStatus) {Thread.currentThread().interrupt();}_activeRpc = k;}
}
这个方法在AMQConnection.start()方法中有过使用:_channel0.enqueueRpc(conStartBroker)。这个方法就是将参数付给成员变量_activeRpc,至于这个RpcContinuation到底是个什么gui,我们下面再讲。
继续下一个方法:
public boolean isOutstandingRpc()
{synchronized (_channelMutex) {return (_activeRpc != null);}
}
这个方法是判断一下当前的_activeRpc是否为null,为null则为false,否则为true。看方法的名字应该猜出大半。
下面一个方法:
public RpcContinuation nextOutstandingRpc()
{synchronized (_channelMutex) {RpcContinuation result = _activeRpc;_activeRpc = null;_channelMutex.notifyAll();return result;}
}
方法将当前的_activeRpc返回,并置AQMChannel的_activeRpc为null。
接下来几个方法联系性很强:
/*** Protected API - sends a {@link Method} to the broker and waits for the* next in-bound Command from the broker: only for use from* non-connection-MainLoop threads!*/
public AMQCommand rpc(Method m)throws IOException, ShutdownSignalException
{return privateRpc(m);
}public AMQCommand rpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {return privateRpc(m, timeout);
}private AMQCommand privateRpc(Method m)throws IOException, ShutdownSignalException
{SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);// At this point, the request method has been sent, and we// should wait for the reply to arrive.//// Calling getReply() on the continuation puts us to sleep// until the connection's reader-thread throws the reply over// the fence.return k.getReply();
}private AMQCommand privateRpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);return k.getReply(timeout);
}public void rpc(Method m, RpcContinuation k)throws IOException
{synchronized (_channelMutex) {ensureIsOpen();quiescingRpc(m, k);}
}public void quiescingRpc(Method m, RpcContinuation k)throws IOException
{synchronized (_channelMutex) {enqueueRpc(k);quiescingTransmit(m);}
}
主要是看最后一个方法——quiescingRpc.这个方法说白就两行代码:
enqueueRpc(k);是将由privateRpc等方法内部创建的SimpleBlockingRpcContinuation对象附给当前的AQMChannel对象的成员变量_activeRpc
关于quiescingTransmit(m)就要接下去看了:
public void quiescingTransmit(Method m) throws IOException {synchronized (_channelMutex) {quiescingTransmit(new AMQCommand(m));}
}
public void quiescingTransmit(AMQCommand c) throws IOException {synchronized (_channelMutex) {if (c.getMethod().hasContent()) {while (_blockContent) {try {_channelMutex.wait();} catch (InterruptedException e) {}// This is to catch a situation when the thread wakes up during// shutdown. Currently, no command that has content is allowed// to send anything in a closing state.ensureIsOpen();}}c.transmit(this);}
}
上面代码只需要看: c.transmit(this);这一句,其余的都是摆设。看到这里,就调用了AMQCommand的transmit方法,这个transmit方法就是讲AMQChannel中封装的内容发给broker,然后等待broker返回,进而通过之前附值的_activeRpc来处理回传的帧。
虽然之前在AMQConnection([[二]RabbitMQ-客户端源码之AMQConnection][RabbitMQ-_AMQConnection])中详细讲述了start()方法,但是这里还是要来拿这个来举例这个AMQChannel中的rpc怎么使用
在AMQConnection中有这么一段代码:
Method method = (challenge == null)? new AMQP.Connection.StartOk.Builder().clientProperties(_clientProperties).mechanism(sm.getName()).response(response).build(): new AMQP.Connection.SecureOk.Builder().response(response).build();try {Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();if (serverResponse instanceof AMQP.Connection.Tune) {connTune = (AMQP.Connection.Tune) serverResponse;} else {challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();response = sm.handleChallenge(challenge, this.username, this.password);}
客户端将Method封装成Connection.StartOk帧之后等待broker返回Connection.Tune帧。
此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。
注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切的来说是MainLoop线程处理之后返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。
相关文章:
RabbitMQ-客户端源码之AMQChannel
AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法: /*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true…...
注意力机制(SE,ECA,CBAM) Pytorch代码
注意力机制1 SENet2 ECANet3 CBAM3.1 通道注意力3.2 空间注意力3.3 CBAM4 展示网络层具体信息1 SENet SE注意力机制(Squeeze-and-Excitation Networks):是一种通道类型的注意力机制,就是在通道维度上增加注意力机制,主要内容是是…...
Vue2笔记03 脚手架(项目结构),常用属性配置,ToDoList(本地存储,组件通信)
Vue脚手架 vue-cli 向下兼容可以选择较高版本 初始化 全局安装脚手架 npm install -g vue/cli 创建项目:切换到项目所在目录 vue create xxx 按照指引选择vue版本 创建成功 根据指引依次输入上面指令即可运行项目 也可使用vue ui在界面上完成创建&…...
Java程序的执行顺序、简述对线程池的理解
点个关注,必回关 文章目录一、Java程序是如何执行的二、合理利用线程池能够带来三个好处一、Java程序是如何执行的 我们日常的工作中都使用开发工具(IntelliJ IDEA 或 Eclipse 等)可以很方便的调试程序,或者是通 过打包工具把项目…...
【前言】嵌入式系统简介
随手拍拍💁♂️📷 日期: 2022.12.01 地点: 杭州 介绍: 2022.11.30下午两点时,杭州下了一场特别大的雪。隔天的12月路过食堂时,边上的井盖上发现了这个小雪人。此时边上的雪已经融化殆尽,只有这个雪人依旧维持着原状⛄…...
React设计原理—1框架原理
阅读前须知 本文是笔者学习卡颂的《React设计原理》的读书笔记,对书中有价值内容以Q&A方式进行呈现,同时结合了自己的理解🤔阅读时推荐先看问题,想想自己的答案,再和答案比对一下本文属于前端框架科普,…...
(C00034)基于Springboot+html前后端分离技术的宿舍管理系统-有文档
基于Springboothtml技术的宿舍管理系统-有文档项目简介项目获取开发环境项目技术运行截图项目简介 基于Springboothtml的前后端分离技术的宿舍管理系统项目为了方便对学生宿舍进行管理而设计,分为后勤、宿管、学生三种用户,后勤对整体宿舍进行管理、宿管…...
Flink面试题
一 基础篇Flink的执行图有哪几种?分别有什么作用Flink中的执行图一般是可以分为四类,按照生成顺序分别为:StreamGraph-> JobGraph-> ExecutionGraph->物理执行图。1)StreamGraph顾名思义,这里代表的是我们编写…...
Python学习笔记
前言:又从仓库翻出来了一些以前总结的文档,以下内容是我初学Python时网上找的或是图书馆借书抄写的笔记,现在再看有点零散不成体系,但是也还是纪念一下子吧。 Python学习笔记 对于初学编程的人来说,Python可以缩短编…...
最适合入门的100个深度学习实战项目
🚨注意🚨:最近经粉丝反馈,发现有些订阅者将此专栏内容进行二次售卖,特在此声明,本专栏内容仅供学习,不得以任何方式进行售卖,未经作者许可不得对本专栏内容行使发表权、署名权、修改…...
AssertionError: 618 columns passed, passed data had 508 columns【已解决】
问题描述 程序中断,报错如下AssertionError: 618 columns passed, passed data had 508 columns Exception has occurred: ValueError 618 columns passed, passed data had 508 columns AssertionError: 618 columns passed, passed data had 508 columnsThe abo…...
166_技巧_Power BI 窗口函数处理连续发生业务问题
166_技巧_Power BI 窗口函数处理连续发生业务问题 一、背景 在生产经营的数据监控中,会有一类指标需要监控是否连续发生,从而根据其在设定区间中的连续频次来评价业务。 例如: 员工连续迟到天数。销售金额连续上升或者下降。用户连续登陆…...
电子科技大学人工智能期末复习笔记(五):机器学习
目录 前言 监督学习 vs 无监督学习 回归 vs 分类 Regression vs Classification 训练集 vs 测试集 vs 验证集 泛化和过拟合 Generalization & Overfitting 线性分类器 Linear Classifiers 激活函数 - 概率决策 ⚠线性回归 决策树 Decision Trees 决策树构建递归…...
使用DDD指导业务设计的总结思考
领域驱动设计(DDD) 是 Eric Evans 提出的一种软件设计方法和思想,主要解决业务系统的设计和建模。DDD 有大量难以理解的概念,尤其是翻译的原因,某些词汇非常生涩,例如:模型、限界上下文、聚合、…...
面试官问:如何确保缓存和数据库的一致性?
如果你对这个问题有过研究,应该可以发现这个问题其实很好回答,如果第一次听到或者第一次遇到这个问题,估计会有点懵,今天我们来聊聊这个话题。 1、问题分析 首先我们来看看为什么会有这个问题! 我们在日常开发中&am…...
16.数据库Redis
一、基本概念 Redis(Remote Dictionary Server)译为“远程字典服务”,它是一款基于内存实现的键值型 NoSQL 数据库, 通常也被称为数据结构服务器,这是因为它可以存储多种数据类型,比如 string(字…...
【Redis高级-集群分片】
单机安装Redis首先需要安装Redis所需要的依赖:yum install -y gcc tclRedis安装包上传到虚拟机的任意目录:我放到了/tmp目录:解压缩:tar -zxvf /tmp/redis-6.2.4.tar.gz -C /tmp解压后:进入redis目录:cd /t…...
CSDN - CSDN27题解
文章目录幸运数字题目描述解题思路AC代码投篮题目描述解题思路AC代码通货膨胀-x国货币题目描述解题思路AC代码最后一位题目描述解题思路AC代码CSDN编程竞赛报名地址:https://edu.csdn.net/contest/detail/41 这次题目描述刚开始好像有些问题,之后被修正了…...
docker拉取mysql
搜索mysql版本docker search mysql搜索获赞数(星星数量) 大于 1000 的镜像docker search --filterstars1000 mysql搜索官方发布的版本docker search --filter is-officialtrue mysql搜索版本号docker search mysql57拉取docker pull devbeta/mysql57查看下载镜像docker images启…...
在Linux上安装Python3
记录:373场景:在CentOS 7.9操作系统上,安装Python-3.8.9环境。版本:JDK 1.8 Python-3.8.9官网地址:https://www.python.org下载地址:https://www.python.org/ftp/python/1.安装基础依赖1.1安装gcc(1)安装命…...
UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
【Go语言基础【13】】函数、闭包、方法
文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数(函数作为参数、返回值) 三、匿名函数与闭包1. 匿名函数(Lambda函…...
什么是VR全景技术
VR全景技术,全称为虚拟现实全景技术,是通过计算机图像模拟生成三维空间中的虚拟世界,使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验,结合图文、3D、音视频等多媒体元素…...
