当前位置: 首页 > news >正文

【Flink网络数据传输(4)】RecordWriter(下)封装数据并发送到网络的过程

文章目录

  • 一. RecordWriter封装数据并发送到网络
    • 1. 数据发送到网络的具体流程
    • 2. 源码层面
      • 2.1. Serializer的实现逻辑
        • a. SpanningRecordSerializer的实现
        • b. SpanningRecordSerializer中如何对数据元素进行序列化
      • 2.2. 将ByteBuffer中间数据写入BufferBuilder
  • 二. BufferBuilder申请资源并创建
    • 1. ChannelSelectorRecordWriter创建BufferBuilder
    • 2. BroadcastRecordWriter创建BufferBuilder

一. RecordWriter封装数据并发送到网络

1. 数据发送到网络的具体流程

RecordWriter对接入的StreamRecord数据进行序列化并等待下游任务消费的过程,整个过程细节如下。

  1. StreamRecord通过RecordWriterOutput写入RecordWriter,并在RecordWriter中通过RecordSerializer组件将StreamRecord序列化为ByteBuffer数据格式。

  2. RecordWriter向ResultPartition申请BufferBuilder对象,用于构建BufferConsumer对象,将序列化后的二进制数据存储在申请到的Buffer中。ResultPartition会向LocalBufferPool申请MemorySegment内存块,用于存储Buffer数据

  3. BufferBuilder中会不断接入ByteBuffer数据,直到将BufferBuilder中的Buffer空间占满,此时会申请新的BufferBuilder继续构建BufferConsumer数据集。

  4. Buffer构建完成后,会调用flushTargetPartition()方法,让ResultPartition向下游输出数据,此时会通知NetworkSequenceViewReader组件开始消费ResultSubPartition中的BufferConsumer对象。

  5. 当BufferConsumer中Buffer数据被推送到网络后,回收BufferConsumer中的MemorySegment内存空间,继续用于后续的消息处理。

在这里插入图片描述

 

2. 源码层面

接下来我们从源码的角度了解RecordWriter具体处理数据的逻辑。在RecordWriterOutput中调用pushToRecordWriter方法将数据写出。

在这里插入图片描述

通过recordWriter.emit(serializationDelegate)方法,将数据元素发送到RecordWriter中进行处理。主要逻辑如下

  1. 序列化数据为ByteBuffer二进制数据,并缓存在SpanningRecordSerializer.serializationBuffer对象中。
  2. 将序列化器生成的中间数据复制到指定分区中,实际上就是将ByteBuffer数据复制到BufferBuiler对象中。
  3. 如果BufferBuiler中存储了完整的数据元素,就会清空序列化器的中间数据,因为序列化器中累积的数据不宜过大。
protected void emit(T record, int targetSubpartition) throws IOException {  checkErroneous();  targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);  if (flushAlways) {  targetPartition.flush(targetSubpartition);  }  
}protected void emit(T record, int targetChannel) throws IOException, InterruptedException {checkErroneous();// 数据序列化serializer.serializeRecord(record);// 将序列化器中的数据复制到指定分区中if (copyFromSerializerToTargetChannel(targetChannel)) {// 清空序列化器serializer.prune();}
}

 

2.1. Serializer的实现逻辑

接着了解如何将序列化器中的数据转换成Buffer并存储到ResultPartiton中,最终将数据发送到下游。

a. SpanningRecordSerializer的实现

SpanningRecordSerializer实现将序列化后的BytesBuffer数据写入BufferBuilder。

SpanningRecordSerializer对象主要包含了DataOutputSerializer serializationBuffer和ByteBuffer dataBuffer两个成员变量。

  • DataOutputSerializer可以将数据转换成二进制格式并存储在byte[]数组中。在serialization中会调用serializationBuffer.wrapAsByteBuffer()方法,将serializationBuffer中生成的byte[]数组转换成ByteBuffer数据结构,并赋值给dataBuffer对象。
  • ByteBuffer是Java NIO中用于对二进制数据进行操作的Buffer接口,底层有DirectByteBuffer和HeapByteBuffer等实现,通过ByteBuffer提供的方法,可以轻松实现对二进制数据的操作。

 

b. SpanningRecordSerializer中如何对数据元素进行序列化

SpanningRecordSerializer.serializeRecord()方法主要逻辑如下。

1)清理serializationBuffer的中间数据,实际上就是将byte[]数组的position参数置为0。
2)设定serialization buffer的初始容量,默认不小于4。
3)将数据元素写入serializationBuffer的bytes[]数组。(所有数据元素都实现了IOReadableWritable接口,可以直接将数据对象转换为二进制格式)
4)获取serializationBuffer的长度信息,并写入serializationBuffer。
5)将serializationBuffer中的byte[]数据封装为java.io.ByteBuffer数据结构,最终赋值到dataBuffer的中间结果中。

public void serializeRecord(T record) throws IOException {if (CHECKED) {if (dataBuffer.hasRemaining()) {throw new IllegalStateException("Pending serialization of previous record.");}}// 首先清理serializationBuffer中的数据serializationBuffer.clear();// 设定serialization buffer数量serializationBuffer.skipBytesToWrite(4);// 将record数据写入serializationBufferrecord.write(serializationBuffer);// 获取serializationBuffer的长度信息并记录到serializationBuffer对象中int len = serializationBuffer.length() - 4;serializationBuffer.setPosition(0);serializationBuffer.writeInt(len);serializationBuffer.skipBytesToWrite(len);// 对serializationBuffer进行wrapp处理,转换成ByteBuffer数据结构dataBuffer = serializationBuffer.wrapAsByteBuffer();
}

Flink 1.12版本中RecordWriter就提供了serializeRecord的能力,没有单拎出来实现。

 

2.2. 将ByteBuffer中间数据写入BufferBuilder

首先BufferBuilder用于构建完整的Buffer数据。在copyFromSerializerToTargetChannel()方法中实现了将RecordSerializer中的ByteBuffer中间数据写入BufferBuilder的逻辑:

  1. 对序列化器进行Reset操作,重置初始化位置。
  2. 将序列化器的ByteBuffer中间数据写入BufferBuilder。
  3. 判断当前BufferBuilder是否构建了完整的Buffer数据,完成BufferBuilder中Buffer的构建。
  4. 判断SerializationResult中是否具有完整的数据元素,如果是则将pruneTriggered置为True,然后清空当前的BufferBuilder,并跳出循环。
  5. 创建新的bufferBuilder,继续从序列化器中将中间数据复制到BufferBuilder中。
  6. 指定flushAlways参数为True,调用flushTargetPartition()方法将数据写入ResultPartition。为防止过度频繁地将数据写入ResultPartiton,在RecordWriter中会有独立的outputFlusher线程(在构造器中),周期性地将构建出来的Buffer数据推送到ResultPartiton本地队列中存储,默认延迟为100ms。
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {// 对序列化器进行Reset操作,初始化initial positionserializer.reset();// 创建BufferBuilderboolean pruneTriggered = false;BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);// 调用序列化器将数据写入bufferBuilderSerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);// 如果SerializationResult是完整Bufferwhile (result.isFullBuffer()) {// 则完成创建Buffer数据的操作finishBufferBuilder(bufferBuilder);// 如果是完整记录,则将pruneTriggered置为Trueif (result.isFullRecord()) {pruneTriggered = true;emptyCurrentBufferBuilder(targetChannel);break;}// 创建新的bufferBuilder,继续复制序列化器中的数据到BufferBuilder中bufferBuilder = requestNewBufferBuilder(targetChannel);result = serializer.copyToBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");// 如果指定的flushAlways,则直接调用flushTargetPartition将数据写入ResultPartitionif (flushAlways) {flushTargetPartition(targetChannel);}return pruneTriggered;
}

 

二. BufferBuilder申请资源并创建

1. ChannelSelectorRecordWriter创建BufferBuilder

在ChannelSelectorRecordWriter.getBufferBuilder()方法中定义了BufferBuilder的创建过程。

//1. targetChannel确认数据写入的分区,ID与下游InputGate中的InputChannelID是对应的
//2. 
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {//在ChannelSelectorRecordWriter中维护了//bufferBuilders[]数组,用于存储创建好的BufferBuilder对象if (bufferBuilders[targetChannel] != null) {return bufferBuilders[targetChannel];} else {//只有在无法从bufferBuilders[]中获取BufferBuilder时,//才会调用requestNewBufferBuilder()方法创建新的BufferBuilder对象。return requestNewBufferBuilder(targetChannel);}
}

requestNewBufferBuilder()方法逻辑如下

  1. 检查bufferBuilders[]的状态,确保bufferBuilders[targetChannel]为空或者bufferBuilders[targetChannel].isFinished()方法返回值为True。
  2. 调用targetPartition.getBufferBuilder()方法获取新的BufferBuilder,这里的targetPartition就是前面提到的ResultPartition。在ResultPartition中会向LocalBufferPool申请Buffer内存空间,用于存储序列化后的ByteBuffer数据。
  3. 向targetPartition添加通过bufferBuilder构建的BufferConsumer对象,bufferBuilder和BufferConsumer内部维护了同一个Buffer数据。BufferConsumer会被存储到ResultSubpartition的BufferConsumer队列中。
  4. 将创建好的bufferBuilder添加至数组,用于下次直接获取和构建BufferConsumer对象。
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());// 调用targetPartition获取BufferBuilderBufferBuilder bufferBuilder = targetPartition.getBufferBuilder();// 向targetPartition中添加BufferConsumertargetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),targetChannel);// 将创建好的bufferBuilder添加至数组bufferBuilders[targetChannel] = bufferBuilder;return bufferBuilder;
}

 

2. BroadcastRecordWriter创建BufferBuilder

在BroadcastRecordWriter内部创建BufferBuilder的过程中,会将创建的bufferConsumer对象添加到所有的ResultSubPartition中,实现将Buffer数据下发至所有InputChannel,如下代码:

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkState(bufferBuilder == null || bufferBuilder.isFinished());BufferBuilder builder = targetPartition.getBufferBuilder();if (randomTriggered) {targetPartition.addBufferConsumer(builder.createBufferConsumer(), targetChannel);} else {try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {for (int channel = 0; channel < numberOfChannels; channel++) {targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);}}}bufferBuilder = builder;return builder;
}

 

以上步骤就是在RecordWriter组件中将数据元素序列化成二进制格式,然后通过BufferBuilder构建成Buffer类型数据,最终存储在ResultPartition的ResultSubPartition中。

这是从Task的层面了解数据网络传输过程,下篇了解在TaskManager中如何构建底层的网络传输通道。

 

相关文章:

【Flink网络数据传输(4)】RecordWriter(下)封装数据并发送到网络的过程

文章目录 一. RecordWriter封装数据并发送到网络1. 数据发送到网络的具体流程2. 源码层面2.1. Serializer的实现逻辑a. SpanningRecordSerializer的实现b. SpanningRecordSerializer中如何对数据元素进行序列化 2.2. 将ByteBuffer中间数据写入BufferBuilder 二. BufferBuilder申…...

【牛客】VL74 异步复位同步释放

描述 题目描述&#xff1a; 请使用异步复位同步释放来将输入数据a存储到寄存器中&#xff0c;并画图说明异步复位同步释放的机制原理 信号示意图&#xff1a; clk为时钟 rst_n为低电平复位 d信号输入 dout信号输出 波形示意图&#xff1a; 输入描述&#xff1a; clk为时…...

CSS3笔记

1.相同优先级的样式以写在后面的为主。 2.交集选择器&#xff0c;并且 条件挨在一起 p.rich{...} /*p元素class有rich的元素*/ 3.并集选择器&#xff0c;或者 逗号隔开 .class1,class2{...}/*满足其中一个类名都会使用该样式*/ 4.后代选择器 空格 隔开 所有符合的包括孙子及…...

两天学会微服务网关Gateway-Gateway工作原理

锋哥原创的微服务网关Gateway视频教程&#xff1a; Gateway微服务网关视频教程&#xff08;无废话版&#xff09;_哔哩哔哩_bilibiliGateway微服务网关视频教程&#xff08;无废话版&#xff09;共计17条视频&#xff0c;包括&#xff1a;1_Gateway简介、2_Gateway工作原理、3…...

备忘 clang diagnostic 类的应用示例 ubuntu 22.04

系统的ncurses环境有些问题 通过源码安装了ncurses6.3后&#xff0c;才可以在 llvmort-18.1.rc4中编译通过示例&#xff1a; 1&#xff0c;折腾环境 ncurses-6.3$ ./configure ncurses-6.3$ make -j ncurses-6.3$ sudo make install sudo apt install libtinfo5 sudo…...

Git小册-笔记迁移

Git简介 Git是目前世界上最先进的分布式版本控制系统&#xff08;没有之一&#xff09;。 所有的版本控制系统&#xff0c;其实只能跟踪文本文件的改动&#xff0c;比如TXT文件&#xff0c;网页&#xff0c;所有的程序代码等等&#xff0c;Git也不例外。版本控制系统可以告诉…...

【你也能从零基础学会网站开发】Web建站之HTML+CSS入门篇 传统布局和Web标准布局的区别

&#x1f680; 个人主页 极客小俊 ✍&#x1f3fb; 作者简介&#xff1a;web开发者、设计师、技术分享 &#x1f40b; 希望大家多多支持, 我们一起学习和进步&#xff01; &#x1f3c5; 欢迎评论 ❤️点赞&#x1f4ac;评论 &#x1f4c2;收藏 &#x1f4c2;加关注 传统布局与…...

005-事件捕获、冒泡事件委托

事件捕获、冒泡&事件委托 1、事件捕获与冒泡2、事件冒泡示例3、阻止事件冒泡4、阻止事件默认行为5、事件委托6、事件委托优点 1、事件捕获与冒泡 2、事件冒泡示例 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /…...

SpringBoot快速入门(介绍,创建的3种方式,Web分析)

目录 一、SpringBoot介绍 二、SpringBootWeb快速入门 创建 定义请求处理类 运行测试 三、Web分析 一、SpringBoot介绍 我们可以打开Spring的官网(Spring | Home)&#xff0c;去看一下Spring的简介&#xff1a;Spring makes Java simple。 Spring发展到今天已经形成了一种…...

VMwareWorkstation17.0虚拟机搭建WindowsME虚拟机(完整安装步骤详细图文教程)

VMwareWorkstation17.0虚拟机搭建WindowsME虚拟机&#xff08;完整安装步骤详细图文教程&#xff09; 一、Windows ME安装准备工作3.1 Windows ME下载地址3.2 DOS软盘版下载地址3.3 UltraISO 4.用VMware虚拟模仿当年的电脑配置4.1 新建虚拟机4.2 类型配置4.3 类型配置4.4 选择版…...

【Java设计模式】八、装饰者模式

文章目录 0、背景1、装饰者模式2、案例3、使用场景4、源码中的实际应用 0、背景 有个快餐店&#xff0c;里面的快餐有炒饭FriedRice 和 炒面FriedNoodles&#xff0c;且加配菜后总价不一样&#xff0c;计算麻烦。如果单独使用继承&#xff0c;那就是&#xff1a; 类爆炸不说&a…...

python INI文件操作与configparser内置库

目录 INI文件 configparser内置库 类与方法 操作实例 导入INI 查询所有节的列表 判断某个节是否存在 查询某个节的所有键的列表 判断节下是否存在某个键 增加节点 删除节点 增加节点的键 修改键值 保存修改结果 获取键值 获取节点所有键值 INI文件 即Initiali…...

软考笔记--软件系统质量属性

一.软件系统质量属性的概念 软件系统的质量就是“软件系统与明确地和隐含的定义的需求相一致的程度”。更具体地说&#xff0c;软件系统质量就是软件与明确地叙述的功能和性能需求文档中明确描述的开发标准以及任何专业开发的软件产品都应该具有的隐含特征相一致的程度。从管理…...

新型设备巡检方案-手机云巡检

随着科技的不断发展&#xff0c;设备巡检工作也在逐步向智能化、高效化方向转变。传统的巡检方式往往需要人工逐个设备检查&#xff0c;耗时耗力&#xff0c;效率低下&#xff0c;同时还容易漏检和误检。而新型设备巡检应用—手机蓝牙云巡检的出现&#xff0c;则为设备巡检工作…...

node.js 下 mysql2 的 CURD 功能极简封装

此封装适合于使用 SQL 直接操作数据库的小型后端项目&#xff0c;更多功能请查阅MySQL2官网 // 代码保存到单独的 js 文件const mysql require(mysql2/promise)const debug true let conn/*** 执行 SQL 语句* param {String} sql* param {*} params* returns {Array}*/ const…...

Cloud-Eureka服务治理-Ribbon负载均衡

构建Cloud父工程 父工程只做依赖版本管理 不引入依赖 pom.xml <packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEA…...

Northwestern University-844计算机科学与技术/软件工程-机试指南【考研复习】

本文提到的西北大学是位于密歇根湖泊畔的西北大学。西北大学&#xff08;英语&#xff1a;Northwestern University&#xff0c;简称&#xff1a;NU&#xff09;是美国的一所著名私立研究型大学。它由九人于1851年创立&#xff0c;目标是建立一所为西北领地地区的人服务的大学。…...

【Linux的网络编程】

1、OSI的七层网络模型有哪些&#xff0c;每一层有什么作用&#xff1f; 答&#xff1a;&#xff08;1&#xff09;应用层&#xff1a;负责处理不同应用程序之间的通信&#xff0c;需要满足提供的协议&#xff0c;确保数据发送方和接收方的正确。 &#xff08;2&#xff09;表…...

vue-seamless-scroll 点击事件不生效

问题&#xff1a;在使用此插件时发现&#xff0c;列表内容前几行还是能正常点击的&#xff0c;但是从第二次出现的列表开始就没有点击事件了 原因&#xff1a;因为html元素是复制出来的&#xff08;滚动组件是将后面的复制出来一份&#xff0c;进行填铺页面&#xff0c;方便滚动…...

前端工程部署步骤小记

安装mqtt: “mqtt”: “^4.3.7”, npm install git panjiacheng 后台demo下载zip 1、npm install --registryhttps://registry.npmmirror.com 2、npm run dev 前端demo创建 1、安装npm 2、npm install vuenext 3、npm install -g vue/cli 查看版本 vue --version 4、更新插件…...

Android Wi-Fi 连接失败日志分析

1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分&#xff1a; 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析&#xff1a; CTR…...

【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密

在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题

分区配置 (ptab.json) img 属性介绍&#xff1a; img 属性指定分区存放的 image 名称&#xff0c;指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件&#xff0c;则以 proj_name:binary_name 格式指定文件名&#xff0c; proj_name 为工程 名&…...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...