RabbitMQ-客户端源码之AMQCommand
AMQCommand不是直接包含Method等成员变量的,而是通过CommandAssembler又做了一次封装。
接下来先看下CommandAssembler类。此类中有这些成员变量:
/** Current state, used to decide how to handle each incoming frame. */
private enum CAState {EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE
}
private CAState state;/** The method for this command */
private Method method;/** The content header for this command */
private AMQContentHeader contentHeader;/** The fragments of this command's content body - a list of byte[] */
private final List bodyN;
/** sum of the lengths of all fragments */
private int bodyLength;/** No bytes of content body not yet accumulated */
private long remainingBodyBytes;
- CAState state标识这此Command目前的状态,是准备处理Method(EXPECTING_METHOD),还是处理Content header(EXPECTING_CONTENT_HEADER),还是准备处理Content body(EXPECTING_CONTENT_BODY),还是以及完成了(COMPLETE)。
- Method method代表type=Method的AMQP帧
- AMQContentHeader contentHeader代表type=Content header的AMQP帧
- final List bodyN代表type=Content body的AMQP帧,就是真正的消息体(Message body)。
- bodyLength就是消息体大小
这个类中除了构造函数,getMethod, getContentHeader, getContentBody,isComplete这个几个方法,最关键的方法就是:
public synchronized boolean handleFrame(Frame f) throws IOException
{switch (this.state) {case EXPECTING_METHOD: consumeMethodFrame(f); break;case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break;case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break;default:throw new AssertionError("Bad Command State " + this.state);}return isComplete();
}
这个方法主要是处理AQMP帧的,根据CAState state来处理相应状态类型的帧,然后赋值给相应的成员变量。
采用consumeMethodFrame(Frame f)方法举个例子:
private void consumeMethodFrame(Frame f) throws IOException {if (f.type == AMQP.FRAME_METHOD) {this.method = AMQImpl.readMethodFrom(f.getInputStream());this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;} else {throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);}
}
这个方法首先判断当前帧是否是Method帧(AMQP.FRAME_METHOD),然后调用AMQPImp.readMethodFrom的方法。就那Connection.Start这个真来将,它会从socket的输入流中读取:
public Start(MethodArgumentReader rdr) throws IOException {this(rdr.readOctet(), rdr.readOctet(), rdr.readTable(), rdr.readLongstr(), rdr.readLongstr());
}
对应于下图:
- 第一个rdr.readOctet()是指Version-Magor:0
- 第二个rdr.readOctet()是指Version-Minor:9
- 第三个rdr.readTable()是指Server-Properties
- 第四个rdr.readLongstr()是指Mechanisms
- 第五个rdr.readLongstr()是指Locales
而MethodArgumentReader.readOctet()就是:
public final int readOctet()throws IOException
{clearBits();return in.readOctet();//in对象是DataInputStream对象
}
写到这里,思路再跳回来,知道了底层其实是Socket的DataInputStream,其上只是做了封装再封装
CommandAssembler 中的handleFrame这个方法只在AMQCommand中的:
private final CommandAssembler assembler;
public boolean handleFrame(Frame f) throws IOException {return this.assembler.handleFrame(f);
}
只在这个方法中调用。CommandAssembler只是对Method,Content-Header,Content-Body做了一下封装。下面继续回到AMQCommand这个类中来。
仔细阅读源码的同学会发现在handleFrame方法当遇到类似Basic.Publish时会有Method,Content-Header,Content-Body一起的报文,那么handleFrame处理完Method之后就直接返回了,没有完全处理完,这该如何是好?
这个就又要联系到AMQConnection中的MainLoop的内部类了。此类中的关键代码如下:
while (_running) {Frame frame = _frameHandler.readFrame();if (frame != null) {_missedHeartbeats = 0;if (frame.type == AMQP.FRAME_HEARTBEAT) {// Ignore it: we've already just reset the heartbeat counter.} else {if (frame.channel == 0) { // the special channel_channel0.handleFrame(frame);} else {if (isOpen()) {// If we're still _running, but not isOpen(), then we// must be quiescing, which means any inbound frames// for non-zero channels (and any inbound commands on// channel zero that aren't Connection.CloseOk) must// be discarded.ChannelManager cm = _channelManager;if (cm != null) {cm.getChannel(frame.channel).handleFrame(frame);}}}}} else {// Socket timeout waiting for a frame.// Maybe missed heartbeat.handleSocketTimeout();}
}
可以看到这是一个一直轮询读取Frame并处理Frame的过程。在遇到类似Basic.Publish这种带Method, Content-Header, Content-Body的类型的报文时,会循环处理,直到处理完成。注意这里的Method, Content-Header以及Content-Body都是看成单个Frame的,也就是这个while循环要三次,而不是将Basic.Publish看成一个帧。
上面调用的handleFrame方法是AMQChannel类中的(详细可以参考([[五]RabbitMQ-客户端源码之AMQChannel][RabbitMQ-_AMQChannel])):
public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);}
}
可以看到只有当AMQCommand的handleFrame方法返回true时,即执行完成之后才会继续处理。
AMQCommand也有getMethod, getContentHeader, getContentBody等方法,这些都是间接调用CommandAssembler类中相应的方法的。
AMQCommand中也有个特别重要的方法:
/*** Sends this command down the named channel on the channel's* connection, possibly in multiple frames.* @param channel the channel on which to transmit the command* @throws IOException if an error is encountered*/
public void transmit(AMQChannel channel) throws IOException {int channelNumber = channel.getChannelNumber();AMQConnection connection = channel.getConnection();synchronized (assembler) {Method m = this.assembler.getMethod();connection.writeFrame(m.toFrame(channelNumber));if (m.hasContent()) {byte[] body = this.assembler.getContentBody();connection.writeFrame(this.assembler.getContentHeader().toFrame(channelNumber, body.length));int frameMax = connection.getFrameMax();int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax- EMPTY_FRAME_SIZE;for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {int remaining = body.length - offset;int fragmentLength = (remaining < bodyPayloadMax) ? remaining: bodyPayloadMax;Frame frame = Frame.fromBodyFragment(channelNumber, body,offset, fragmentLength);connection.writeFrame(frame);}}}connection.flush();
}
这段主要通过传输AMQP帧的,通过AMQChannel获取到通信链路connection,然后将AMQCommand对象自身的method成员变量(或者包括content-header以及content-body)传送给broker。这段方法里还有判断payload大小是否超过broker端所设置的最大帧大小frameMax,即(frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE这段代码。当frameMax=0时则没有大小限制,当frameMax不为0时则按照payload拆分成若干的payload然后发送多个FRAME_BODY帧。
相关文章:
RabbitMQ-客户端源码之AMQCommand
AMQCommand不是直接包含Method等成员变量的,而是通过CommandAssembler又做了一次封装。 接下来先看下CommandAssembler类。此类中有这些成员变量: /** Current state, used to decide how to handle each incoming frame. */ private enum CAState {EXP…...
linux设置登录失败处理功能(密码错误次数限制、pam_tally2.so模块)和操作超时退出功能(/etc/profile)
一、登录失败处理功能策略 1、登录失败处理功能策略(服务器终端) (1)编辑系统/etc/pam.d/system-auth 文件,在 auth 字段所在的那一部分添加如下pam_tally2.so模块的策略参数: auth required pam_tally2…...
Centos7上Docker安装
文章目录1.Docker常识2.安装Docker1.卸载旧版本Docker2.安装Docker3.启动Docker4.配置镜像加速前天开学啦~所以可以回来继续卷了哈哈哈,放假在家效率不高,在学校事情也少点(^_−)☆昨天和今天学了学Docker相关的知识,也算是简单了解了下&…...
新瑞鹏“狂飙”,宠物医疗是门好生意吗?
宠物看病比人还贵,正在让不少年轻一族陷入尴尬境地。在知乎上,有个高赞提问叫“你愿意花光积蓄,给宠物治病吗”,这个在老一辈人看来不可思议的魔幻选择,真实地发生在当下的年轻人身上。提问底下,有人表示自…...
Spring循环依赖问题,Spring是如何解决循环依赖的?
文章目录一、什么是循环依赖1、代码实例2、重要信息二、源码分析1、初始化Student对Student中的ClassRoom进行Autowire操作2、Student的自动注入ClassRoom时,又对ClassRoom的初始化3、ClassRoom的初始化,又执行自动注入Student的逻辑4、Student注入Class…...
更改SAP GUI登录界面信息
在SAP GUI的登录界面,左部输入登录信息如客户端、用户名、密码等,右部空余部分可维护一些登录信息文本,如登录的产品、客户端说明及注意事项等,此项操作详见SAP Notes 205487 – Own text on SAPGui logon screen 维护文档使用的…...
分布式微服务架构下网络通信的底层实现原理
在分布式架构中,网络通信是底层基础,没有网络,也就没有所谓的分布式架构。只有通过网络才能使得一大片机器互相协作,共同完成一件事情。 同样,在大规模的系统架构中,应用吞吐量上不去、网络存在通信延迟、我…...
进大厂必备的Java面试八股文大全(2023最新精简易懂版,八股文中的八股文)
为什么同样是跳槽,有些人薪资能翻三倍?” 最近一个粉丝发出了灵魂拷问,类似的问题我收到过很多次,身边也确实有认识的同事、朋友们有非常成功的跳槽经历和收益,先说一个典型例子: 学弟小 A 工作一年半&am…...
都说测试行业饱和了,为什么我们公司给初级测试开到了12K?
故事起因: 最近我有个刚毕业的学生问我说:我感觉现在测试行业已经饱和了,也不是说饱和了,是初级的测试根本就没有公司要,哪怕你不要工资也没公司要你,测试刚学出来,没有任何的项目经验和工作经验…...
解决Idea启动项目失败,提示Error running ‘XXXApplication‘: Command line is too long
IDEA版本为:IntelliJ IDEA 2018.2 (Ultimate Edition)一、问题描述有时当我们使用IDEA,Run/Debug一个SpringBoot项目时,可能会启动失败,并提示以下错误。Error running XXXApplication: Command line is too long. Shorten comman…...
GB/T28181-2022针对H.265、AAC的说明和技术实现
GB/T28181-2022规范说明GB/T28181-2022相对来GB/T28181-2016针对H.265、AAC的更新如下:——更改了“联网系统通信协议结构图”,媒体流通道增加了 H.265、G.722.1、AAC(见 4.3.1,2016 年版的 4.3.1)。——增加了对 H.26…...
开关电源环路稳定性分析(11)——观察法找零极点
大家好,这里是大话硬件。 这篇文章主要是分享如何用观察法直接写出补偿网络中的零极点的表达式。 在前面的文章中,我们分别整理了OTA和OPA型的补偿网络,当时有下面的结论。 针对某个固定的补偿网络,我们可以用数学的方法推导补偿…...
焕新启航,「龙蜥大讲堂」2023 年度招募来了!13 场技术分享先睹为快
龙蜥大讲堂是龙蜥推出的系列技术直播活动,邀请龙蜥社区的开发者们分享围绕龙蜥技术展开,包括但不限于内核、编译器、机密计算、容器、储存等相关技术领域。欢迎社区开发者们积极参与,共享技术盛宴。往期回顾龙蜥社区技术系列直播截至目前已举…...
推广传单制作工具
临近节日如何制作推广活动呢?没有素材制作满减活动宣传单怎么办?小编教你如何使用在线设计工具乔拓云,轻松设计商品的专属满减活动宣传单,不仅设计简单,还能自动生成活动分享链接,只需跟着小编下面的设计步…...
软件设计(十一)数据结构(上)
线性结构 线性表 线性表是n个元素的有限序列,通常记为(a1,a2....an),特点如下。 存在唯一的一个称作“第一个”的元素。存在位移的一个称作“最后一个”的元素。除了表头外,表中的每一个元素均只有唯一的直接前趋除了表尾外&…...
https协议
文章目录对称加密方案非对称加密方案对称加密方案非对称加密方案对称加密方案非对称加密方案数字证书因为HTTP是明文传输,所以会很有可能产生中间人攻击(获取并篡改传输在客户端及服务端的信息并不被人发觉),HTTPS加密应运而生。 …...
深入浅出C语言——数据在内存中的存储
文章目录一、数据类型详细介绍1. C语言中的内置类型2. 类型的基本归类:二. 整形在内存中的存储1. 原码、反码、补码2. 大小端三.浮点数存储规则一、数据类型详细介绍 1. C语言中的内置类型 C语言的内置类型有char、short、int、long、long long、float、double&…...
在 Centos 上在线安装 GitLab
作为程序员,其中一个愿望是拥有一个自己的代码存储库。在支持私有部署的代码存储库产品中,GitLab 是比较著名的了,所以今天我总结了一下在 Centos 上安装 GitLab 的过程。 依赖 基础依赖 首先,需要安装部分基础的依赖ÿ…...
模型解释性:SHAP包的使用
本篇博客介绍另一种事后可解释性方法:SHAP(SHapley Additive exPlanation)方法。 1. Shapley值理论 Shapley值是博弈论中的一个概念,通过衡量联盟中各成员对联盟总目标的贡献程度,从而根据贡献程度来进行联盟成员的利益分配,避免…...
算法训练营 day45 动态规划 0-1背包理论 分割等和子集
算法训练营 day45 动态规划 0-1背包理论 分割等和子集 0-1背包理论 有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i],得到的价值是value[i] 。每件物品只能用一次,求解将哪些物品装入背包里物品价值总和最大。 在下面的讲解中&…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...
免费数学几何作图web平台
光锐软件免费数学工具,maths,数学制图,数学作图,几何作图,几何,AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?
Pod IP 的本质与特性 Pod IP 的定位 纯端点地址:Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址(如 10.244.1.2)无特殊名称:在 Kubernetes 中,它通常被称为 “Pod IP” 或 “容器 IP”生命周期:与 Pod …...
uniapp 实现腾讯云IM群文件上传下载功能
UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中,群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS,在uniapp中实现: 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...
软件工程 期末复习
瀑布模型:计划 螺旋模型:风险低 原型模型: 用户反馈 喷泉模型:代码复用 高内聚 低耦合:模块内部功能紧密 模块之间依赖程度小 高内聚:指的是一个模块内部的功能应该紧密相关。换句话说,一个模块应当只实现单一的功能…...
LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》
🧠 LangChain 中 TextSplitter 的使用详解:从基础到进阶(附代码) 一、前言 在处理大规模文本数据时,特别是在构建知识库或进行大模型训练与推理时,文本切分(Text Splitting) 是一个…...
React核心概念:State是什么?如何用useState管理组件自己的数据?
系列回顾: 在上一篇《React入门第一步》中,我们已经成功创建并运行了第一个React项目。我们学会了用Vite初始化项目,并修改了App.jsx组件,让页面显示出我们想要的文字。但是,那个页面是“死”的,它只是静态…...
FOPLP vs CoWoS
以下是 FOPLP(Fan-out panel-level packaging 扇出型面板级封装)与 CoWoS(Chip on Wafer on Substrate)两种先进封装技术的详细对比分析,涵盖技术原理、性能、成本、应用场景及市场趋势等维度: 一、技术原…...
