当前位置: 首页 > news >正文

Flink:快速掌握批处理数据源的创建方法

Flink 社区最近 “基于FLIP-27” 设计了新的 Source 框架 。一些连接器(API)已迁移到这个新框架。本文介绍了如何使用这个新框架创建批处理源。 它是在为Cassandra实现Flink 批处理源时构建的。如果您有兴趣贡献或迁移连接器,这篇文章非常适合!

1.实现Source组件

Source架构如图:

1.1 Source框架

Cassandra 源示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java

源接口仅在所有其他组件之间起“粘合”作用。它的作用是实例化所有这些并定义源Boundedness 。我们还在这里进行源配置以及用户配置验证。

1.2 SourceReader

Cassandra SourceReader 示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java

如上图所示,SourceReader 的实例( 在本文的后续部分中我们将其简称为阅读器)在任务管理器中并行运行,以读取划分为Split 的实际数据。阅读器从SplitEnumerator请求拆分,并将生成的拆分结果返回给它们。

Flink 提供了负责所有线程的SourceReaderBase实现。对于大多数情况,Flink 还为此类提供了有用的扩展:

SingleThreadMultiplexSourceReaderBase :

https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.html

该类已配置了线程模型:每个SplitReader 实例使用一个线程读取拆分(但任务管理器中存在多个 SplitReader 实例)。

我们在 SourceReader 类中接下来要做的事情是:

  • 提供 SplitReader 供应者;

  • 创建一个记录发射器;

  • 为 SplitReaders 创建共享资源(会话等)。由于 SplitReader 供应者是在 super() 调用的 SourceReader 构造函数中创建的,因此使用 SourceReader 工厂创建共享资源并将它们传递给供应者是一个好主意;

  • 实现start():这里我们应该要求枚举器进行第一次分割;

  • 重写SourceReaderBase 父类中的close() 以释放任何创建的资源(例如共享资源);

  • 实现initializedState(),以从Split 创建可变的SplitState;

  • 实现toSplitType() ,以从可变的 SplitState 创建 Split;

  • 实现onSplitFinished():这里,因为它是一个批处理源(有限数据),我们应该要求Enumerator进行下一次分割。

1.3 Split和SplitState

Cassandra Split示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java

SourceSplit表示源数据的一个分区。拆分的定义取决于我们正在读取的后端。例如,它可以是(分区开始,分区结束)元组或(偏移量,分割大小)元组。

在任何情况下,Split对象都应该被视为不可变对象:对它的任何更新都应该在相关的SplitState上完成。拆分状态是将存储在Flink检查点内的状态。一个检查点可能发生在两次获取一次分裂之间。因此,如果我们正在读取拆分,我们必须在拆分状态中存储读取进程的当前状态。这个当前状态需要是可序列化的(因为它将成为检查点的一部分),并且后端源可以从中恢复。这样,在故障转移的情况下,读取可以从中断的地方恢复。因此,我们确保不会有重复或丢失的数据。

例如,如果记录的读取顺序在后端是确定的,那么拆分状态可以存储n条已经读取的记录,以便在故障转移后在n+1处重新启动。

1.4 SplitEnumerator和SplitEnumeratorState

SplitEnumerator负责创建拆分并将其提供给阅读器。只要有可能,最好是惰性地生成分割,这意味着每次读取器向枚举数请求分割时,枚举数都会按需生成一个并将其分配给阅读器。

为此,我们实现了SplitEnumerator handleSplitRequest() 方法。延迟拆分生成比拆分发现更可取,在拆分发现中,我们预先生成所有拆分并存储它们,等待将它们分配给阅读器。实际上,在某些情况下,分割的数量可能非常大,并且会消耗大量内存,这可能会在分散阅读器的情况下产生问题。该框架通过实现addReader()提供了对阅读器注册进行操作的能力。但是,由于我们要进行延迟分割生成,因此在那里我们没有什么可做的。在某些情况下,生成拆分的成本太高,因此我们可以预先生成一批(不是全部)拆分来分摊这个成本。需要考虑批处理分割的 数量/大小,以避免消耗过多的内存。

长话短说,Source实现的棘手部分是拆分源数据。最好的平衡是不要有太多的分割(这会导致太多的内存消耗),也不要太少(这会导致次优的并行性)。满足这种平衡的一个好方法是预先评估源数据的大小,并允许用户指定拆分将占用的最大内存。这样他们就可以根据任务管理器上的可用内存配置此参数。这个参数是可选的,所以Source程序需要提供一个默认值。此外,源代码需要控制用户提供的max-split-size不能太小,否则会导致太多的分割。一般的经验法则是给用户一些自由,来保护他们免受不必要的行为。对于这些安全措施,刚性阈值不能很好地工作,因为当突然超过阈值时,Source可能开始失效。

例如,如果我们强制分割的数量低于并行度的两倍,如果作业经常在一个不断增长的表上运行,那么在某个时刻,将会有越来越多的max-split-size的分割,并且将超过阈值。当然,需要在不读取实际数据的情况下评估源数据的大小。Cassandra连接器就是这样做的。

另一个重要的话题是状态。如果作业管理器失败,则拆分枚举器需要恢复。对于分割,我们需要为枚举器提供一个状态,它将成为检查点的一部分。恢复后,将重建枚举数并接收一个枚举数状态,以恢复其先前的状态。在检查点上,当调用SplitEnumerator snapshotState()时,枚举数返回其状态。状态必须包含恢复故障转移后枚举器停止的位置所需的所有内容。在延迟分割生成场景中,状态将包含生成下一个分割所需的所有内容。例如,它可以是下一个分裂的开始偏移量,分裂大小,仍然生成的分裂的数量等等,但是SplitEnumeratorState也必须包含一个分裂的列表,不是发现的分裂的列表,而是要重新分配的分裂的列表。实际上,每当reader失败时,如果它在最后一个检查点之后被分配了分片,那么检查点就不会包含这些分片。因此,在恢复时,阅读器将不再分配分片。有一个回调来处理这种情况:addSplitsBack()。在这里,分配给故障读取器的分片可以放回枚举器状态,以便以后重新分配给阅读器。这里没有内存大小风险,因为要重新分配的分片数量非常低。

以上是关于分裂的更重要的话题。还有两个方法需要实现:用于资源创建/处置的常用start() /close()方法。关于start()的实现,Flink连接器框架提供了enumeratorContext callAsync()实用程序来异步运行长时间的处理,比如拆分准备或拆分发现(如果不可能生成延迟拆分)。实际上,start()方法在源协调器线程中运行,我们不希望长时间阻塞它。

1.5 SplitReader 

Cassandra SplitReader示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java

这个类负责读取框架调用handleSplitsChanges()时接收到的实际分片。拆分阅读器的主要部分是fetch()实现,我们读取接收到的所有分片,并将读取的记录作为RecordsBySplits对象返回。该对象包含分割id到所属记录的映射,以及已完成分割的id。需要考虑的要点:

  • fetch调用必须是非阻塞的。如果其代码中的任何调用是同步的并且可能很长,则必须提供fetch()的转义。当框架调用wakeUp()时,我们应该通过设置一个AtomicBoolean来中断获取。

  • Fetch调用需要是可重入的:一个已经读过的分片不能被重读。我们应该将其从分割列表中删除,并在返回的RecordsBySplits中将其id添加到已完成的分割(以及空分割)中。

实现者提前退出fetch()方法是完全可以的。此外,失败可能会中断获取。在这两种情况下,框架稍后都会再次调用fetch()。在这种情况下,fetch方法必须使用已经讨论过的拆分状态从停止读取的位置恢复读取。如果由于后端约束而无法恢复对分割的读取,那么唯一的解决方案就是自动读取分割(要么根本不读取分割,要么完全读取分割)。这样,在读取中断的情况下,不会输出任何内容,并且可以在下一次读取调用时从开始重新读取分割,从而没有重复。但是,如果完全读取分割,则需要考虑以下几点:

  • 我们应该确保总的拆分内容(来自源的记录)适合内存,例如通过指定以字节为单位的最大拆分大小(请参阅SplitEnumarator)。

  • 分裂状态变得无用,只需要一个分裂类。

1.6 RecordEmitter

 Cassandra RecordEmitter示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java

SplitReader以实现者为每条记录提供的中间记录格式的形式读取记录。它可以是后端返回的原始格式,也可以是允许事后提取实际记录的任何格式。该格式不是源程序期望的最终输出格式。它包含转换为记录输出格式所需的所有内容。我们需要实现RecordEmitter#emitRecord()来完成这个转换。一个好的模式是用一个映射函数初始化RecordEmitter。实现必须是幂等的。实际上,这种方法可能会在中途中断。在这种情况下,稍后将再次将同一组记录传递给记录发射器。

1.7 Serializers

Cassandra SplitSerializer和SplitEnumeratorStateSerializer示例:

https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.javahttps://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java

我们需要为以下情况提供单例序列化器:

  • 拆分:当将拆分从枚举器发送到读取器时,以及当检查读取器的当前状态时,拆分被序列化

  • SplitEnumeratorState:序列化器用于SplitEnumerator#snapshotState()的结果。

对于两者,我们都需要实现SimpleVersionedSerializer。在一些重要的地方需要注意:

  • 在Flink中禁止使用Java序列化,主要是出于迁移考虑。我们应该使用ObjectOutputStream手动编写对象的字段。当一个类不被ObjectOutputStream(不是String, Integer, Long…)支持时,我们应该将对象的大小以字节为单位写入Integer,然后写入转换为byte[]的对象。类似的方法用于序列化集合。首先写入集合的元素数量,然后序列化所有包含的对象。当然,对于反序列化,我们以相同的顺序进行完全相同的读取。

  • 可能会有很多拆分,所以我们应该缓存SplitSerializer中使用的OutputStream。我们可以使用。

ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = ThreadLocal. withinitial (() -> new DataOutputSerializer(64));

初始流大小取决于拆分的大小。

2.测试&&总结

本文收集了实现领域的反馈,因为javadoc无法涵盖高性能和可维护源的所有实现细节。希望你喜欢这篇文章,并且它给了你为Flink项目贡献一个新连接器的愿望!

Flink:快速掌握批处理数据源的创建方法

相关文章:

Flink:快速掌握批处理数据源的创建方法

Flink 社区最近 “基于FLIP-27” 设计了新的 Source 框架 。一些连接器&#xff08;API&#xff09;已迁移到这个新框架。本文介绍了如何使用这个新框架创建批处理源。 它是在为Cassandra实现Flink 批处理源时构建的。如果您有兴趣贡献或迁移连接器&#xff0c;这篇文章非常适合…...

基于cubeMX的正点原子miniSTM32对W25Q64的存储使用

一、实现目标 使用cubeMX建立项目工程&#xff0c;结合正点原子提供的hal库对W25Q64闪存调用的例程&#xff0c;实现W25Q64的读写。 二、实现过程 1、首先建立cubeMX工程&#xff0c;其他项设置不再叙述&#xff0c;只看连接W25Q64的SPI设置&#xff0c;这里使用SPI1&#xf…...

C++笔记(三)

封装意义: 在设计类的时候&#xff0c;属性和行为写在一起&#xff0c;表现事物 类在设计时&#xff0c;可以把属性和行为放在不同的权限下&#xff0c;加以控制。 访问权限有三种&#xff1a; public 公共 类内 类外都可以访问&#xff0c; protected保护 类内可以访问…...

c语言不定参数

时间记录&#xff1a;2024/1/22 一、不定参数的函数定义和使用到的c函数 &#xff08;1&#xff09;定义 void fun1(参数类型 argName,...); 示例&#xff1a; void fun1(int count,...);&#xff08;2&#xff09;获取不定参数的值 #include <stdarg.h> //包含头文件…...

云手机与实体手机的对比

在数字化时代&#xff0c;云手机作为一种虚拟手机在云端服务器上运行&#xff0c;与传统的实体手机相比存在诸多差异。让我们深入探讨云手机与实体手机之间的区别&#xff0c;以便更好地了解它们的特点和优势。 外观上的差异 实体手机具有实际的外观和重量&#xff0c;占据一定…...

diffusion 和 gan 的优缺点对比

sample速度GAN更快&#xff0c;Diffusion需要迭代更多次。 训练难度GAN 的训练可能是不稳定的&#xff0c;容易出现模式崩溃和训练振荡等问题。Diffusion 训练loss收敛性好&#xff0c;比较平稳。 模拟分布连续性Diffusion相较于GAN可以模拟更加复杂&#xff0c;更加非线性的分…...

VC++中使用OpenCV进行人脸检测

VC中使用OpenCV进行人脸检测 对于上面的图像&#xff0c;如何使用OpenCV进行人脸检测呢&#xff1f; 使用OpenCV进行人脸检测十分简单&#xff0c;OpenCV官网给了一个Python人脸检测的示例程序&#xff0c; objectDetection.py代码如下&#xff1a; from __future__ import p…...

11Docker数据持久化

Docker数据持久化 容器中数据持久化主要有两种方式&#xff1a; 数据卷&#xff08;Data Volumes&#xff09;数据卷容器&#xff08;Data Volumes Dontainers&#xff09; 数据卷 数据卷是一个可供一个或多个容器使用的特殊目录&#xff0c;可以绕过UFS&#xff08;Unix F…...

RK3588平台开发系列讲解(视频篇)RKMedia框架

文章目录 一、 RKMedia框架介绍二、 RKMedia框架API三、 视频处理流程四、venc 测试案例沉淀、分享、成长,让自己和他人都能有所收获!😄 📢RKMedia是RK提供的一种多媒体处理方案,可实现音视频捕获、音视频输出、音视频编解码等功能。 一、 RKMedia框架介绍 功能: VI(输…...

Vue3 Teleport 将组件传送到外层DOM位置

✨ 专栏介绍 在当今Web开发领域中&#xff0c;构建交互性强、可复用且易于维护的用户界面是至关重要的。而Vue.js作为一款现代化且流行的JavaScript框架&#xff0c;正是为了满足这些需求而诞生。它采用了MVVM架构模式&#xff0c;并通过数据驱动和组件化的方式&#xff0c;使…...

【学网攻】 第(5)节 -- Cisco VTP的使用

文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻】 第(4)节 -- 交换机划分Vlan 前言 网络已经成为了我们生活中不可或缺的一部分&#xff0c;它连接了世界各地的人们&#xff0c;让信息和资…...

uniapp复选框 实现排他选项

选择了排他选项之后 复选框其他选项不可以选择 <view class"reportData" v-for"(val, index) in obj" :key"index"> <view v-if"val.type 3" ><u-checkbox-group v-model"optionValue" placement"colu…...

openssl3.2/test/certs - 004 - cross root and root cross cert

文章目录 openssl3.2/test/certs - 004 - cross root and root cross cert概述笔记END openssl3.2/test/certs - 004 - cross root and root cross cert 概述 索引贴 openssl3.2 - 官方demo学习 - test - certs 笔记 // \file my_openssl_linux_log_doc_004.txt // openssl…...

图像分类】【深度学习】【轻量级网络】【Pytorch版本】EfficientNet_V2模型算法详解

【图像分类】【深度学习】【轻量级网络】【Pytorch版本】EfficientNet_V2模型算法详解 文章目录 【图像分类】【深度学习】【轻量级网络】【Pytorch版本】EfficientNet_V2模型算法详解前言EfficientNet_V2讲解自适应正则化的渐进学习(Progressive Learning with adaptive Regul…...

05.Elasticsearch应用(五)

Elasticsearch应用&#xff08;五&#xff09; 1.目标 咱们这一章主要学习Mapping&#xff08;映射&#xff09; 2.介绍 Mapping是对索引库中文档的约束&#xff0c;类似于数据表结构&#xff0c;作用如下&#xff1a; 定义索引中的字段的名称定义字段的数据类型&#xff…...

npm更换镜像

大家好&#xff01;今天给大家分享的知识是如何更换npm镜像 前言 有时候在加载npm时有时会很慢&#xff0c;那是由于node安装插件是从国外服务器下载&#xff0c;受网络影响大&#xff0c;速度慢且可能出现异常&#xff0c;这时候就需要更换镜像&#xff0c;使插件的安装快捷&…...

野指针(C语言)

野指针 //概念:野指针就是指针指向的位置是不可知的(随机的,不正确的 //,没有明确限制的,空间还属于操作系统而不属于程序的) //野指针成因: //1.指针未初始化 #include <stdio.h> int main() { int* p;//局部变量指针未初始化,默认为随机值 //此时p指向的空间不…...

动物姿态识别(数据集+代码)

动物姿态识别是指利用计算机视觉和深度学习技术来识别动物的姿态&#xff0c;即确定动物身体的姿态、方向和位置等信息。这种技术可应用于动物行为研究、动物健康监测、智能养殖等领域。 动物姿态识别的关键技术包括图像处理、特征提取和分类器设计。首先&#xff0c;需要对动…...

JSON-handle工具安装及使用

目录 介绍下载安装简单操作 介绍 JSON-Handle 是一款非常好用的用于操作json的浏览器插件&#xff0c;对于开发人员和测试人员来说是一款很好用的工具&#xff0c;如果你还没有用过&#xff0c;请赶紧下载安装吧&#xff0c;下面是安装过程和具体使用。 下载安装 点击下载JSON…...

kali安装LAMP和DVWA

LANMP简介 LANMP是指一组通常用来搭建动态网站或者服务器的开源软件&#xff0c;本身都是各自独立的程序&#xff0c;但是因为常被放在一起使用&#xff0c;拥有了越来越高的兼容度&#xff0c;共同组成了一个强大的Web应用程序平台。 L:指Linux&#xff0c;一类Unix计算机操作…...

tinyECC:Arduino嵌入式平台的轻量级ECC密码库

1. tinyECC 库概述&#xff1a;面向 Arduino 微控制器的轻量级椭圆曲线密码学实现tinyECC 是一个专为资源受限的 Arduino 微控制器平台设计的嵌入式椭圆曲线密码学&#xff08;Elliptic Curve Cryptography, ECC&#xff09;库。其核心目标是在仅有几 KB RAM 和数十 KB Flash 的…...

告别纯手工标注!用微调后的SAM2+ISAT,实现裂缝标注效率翻倍(保姆级避坑指南)

基于SAM2与ISAT的裂缝智能标注实战&#xff1a;从零构建高效半自动化工作流 想象一下这样的场景&#xff1a;你面前堆叠着数千张道路裂缝检测图像&#xff0c;每张都需要精确标注裂缝区域。传统手工标注不仅耗时费力&#xff0c;还容易因疲劳导致标注质量下降。这正是计算机视觉…...

FlexASIO专业调优实战:解决音频延迟与音质问题的3步诊断法

FlexASIO专业调优实战&#xff1a;解决音频延迟与音质问题的3步诊断法 【免费下载链接】FlexASIO A flexible universal ASIO driver that uses the PortAudio sound I/O library. Supports WASAPI (shared and exclusive), KS, DirectSound and MME. 项目地址: https://gitc…...

背单词花园:把单词种进长期记忆,告别背了就忘

为什么背单词花园抗遗忘效果出众&#xff1f;因为它把艾宾浩斯遗忘曲线&#xff0c;变成了看得见、好坚持的种花流程。一、新学单词 收获种子&#xff0c;记忆从第一步就扎根每次领取种子&#xff0c;就是开启一次新单词学习。用趣味场景完成初次编码&#xff0c;让单词不再是…...

OpenClaw主控Agent配置:任务分发、流程调度,打造专属SEO自动化团队

构建智能中枢&#xff1a;OpenClaw主控Agent的深度配置与SEO自动化团队实践引言在数字化营销日益激烈的今天&#xff0c;搜索引擎优化&#xff08;SEO&#xff09;已成为企业获取流量、提升品牌曝光不可或缺的策略。然而&#xff0c;传统的SEO操作往往涉及大量重复性、耗时耗力…...

SillyTavern角色系统全解析:从入门到高级定制指南

SillyTavern角色系统全解析&#xff1a;从入门到高级定制指南 【免费下载链接】SillyTavern LLM Frontend for Power Users. 项目地址: https://gitcode.com/GitHub_Trending/si/SillyTavern 一、基础认知&#xff1a;角色系统的核心架构 在AI交互的世界里&#xff0c;…...

别再手动合并代码了!用Docker Compose 5分钟搞定Gitea私有Git服务器(附PostgreSQL配置)

5分钟极速搭建Gitea私有Git服务&#xff1a;Docker Compose与PostgreSQL黄金组合 还在用网盘同步代码&#xff1f;或是把项目文件夹压缩后通过聊天软件传来传去&#xff1f;作为经历过这些"原始管理方式"的开发者&#xff0c;我完全理解手动合并冲突时的崩溃感——上…...

2026 GitHub 高星项目全景指南

一、GitHub 全球 Star 最高项目(2026年3月 实时数据) GitHub 无官方总 Star 榜单,以下为综合第三方统计与实时检索的全球高星项目 Top10,数据动态更新,以仓库主页为准: 排名 项目名称 Star 数 核心定位 1 build-your-own-x ⭐47.4万+ 从零实现各类技术的教程合集 2 awes…...

墨语灵犀GPU算力适配指南:A10/A100/V100显卡部署性能与显存占用实测

墨语灵犀GPU算力适配指南&#xff1a;A10/A100/V100显卡部署性能与显存占用实测 1. 引言&#xff1a;当古典美学遇见现代算力 想象一下&#xff0c;你正在处理一份重要的海外文献&#xff0c;或者需要将一段优美的中文诗歌翻译成英文。你希望翻译结果不仅准确&#xff0c;更要…...

Pixel Dream Workshop 学术研究辅助:快速生成论文插图与概念图

Pixel Dream Workshop 学术研究辅助&#xff1a;快速生成论文插图与概念图 1. 科研绘图的痛点与解决方案 科研工作者经常面临一个共同难题&#xff1a;如何高效制作专业、美观的学术图表。传统绘图软件学习曲线陡峭&#xff0c;而外包设计又成本高昂、周期长。Pixel Dream Wo…...