当前位置: 首页 > news >正文

48、Flink 的 Data Source API 详解

a)概述

本节将描述 FLIP-27 中引入的新 Source API 的主要接口。

b)Source

Source API 是一个工厂模式的接口,用于创建以下组件。

  • Split Enumerator
  • Source Reader
  • Split Serializer
  • Enumerator Checkpoint Serializer

此外,Source 还提供了 Boundedness【有界】的特性,使 Flink 可以选择合适的模式来运行 Flink 任务。

Source 实现应该是可序列化的,因为 Source 实例会在运行时被序列化并上传到 Flink 集群。

c)SplitEnumerator

SplitEnumerator 典型实现如下

  • SourceReader 的注册处理;
  • SourceReader 的失败处理;
    • SourceReader 失败时会调用 addSplitsBack() 方法;SplitEnumerator 会收回已经被分配,但尚未被该 SourceReader 确认(acknowledged)的分片。
  • SourceEvent 的处理
    • SourceEvents 是 SplitEnumerator 和 SourceReader 之间来回传递的自定义事件,可以利用此机制来执行复杂的协调任务。
  • 分片的发现以及分配
    • SplitEnumerator 可以将分片分配到 SourceReader 从而响应各种事件,包括发现新的分片、新 SourceReader 的注册、SourceReader 的失败处理等。

SplitEnumerator 可以在 SplitEnumeratorContext 的帮助下完成上述工作,SplitEnumeratorContext 会在 SplitEnumerator 创建或者恢复的时候提供给 Source。

SplitEnumeratorContext 允许 SplitEnumerator 检索到 reader 的必要信息并执行协调操作,而在 Source 的实现中会将 SplitEnumeratorContext 传递给 SplitEnumerator 实例。

SplitEnumerator 的实现可以仅采用被动工作方式,仅在其方法被调用时采取协调操作;但是一些 SplitEnumerator 的实现会采取主动的工作方式;例如 SplitEnumerator 定期寻找分片并分配给 SourceReader,这类问题使用 SplitEnumeratorContext 类中的 callAsync() 方法比较方便。

示例:如何在 SplitEnumerator 不需要自己维护线程的条件下实现这一点。

class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {private final long DISCOVER_INTERVAL = 60_000L;/*** 一种发现分片的方法*/private List<MySplit> discoverSplits() {...}@Overridepublic void start() {...enumContext.callAsync(this::discoverSplits, splits -> {Map<Integer, List<MySplit>> assignments = new HashMap<>();int parallelism = enumContext.currentParallelism();for (MySplit split : splits) {int owner = split.splitId().hashCode() % parallelism;assignments.computeIfAbsent(owner, new ArrayList<>()).add(split);}enumContext.assignSplits(new SplitsAssignment<>(assignments));}, 0L, DISCOVER_INTERVAL);...}...
}
d)SourceReader

SourceReader 是一个运行在 Task Manager 上的组件,用于处理来自分片的记录。

SourceReader 提供了一个拉取式的(pull-based)处理接口,Flink 任务会在循环中不断调用 pollNext(ReaderOutput) 轮询来自 SourceReader 的记录,pollNext(ReaderOutput) 方法的返回值指示 SourceReader 的状态。

  • MORE_AVAILABLE - SourceReader 有可用的记录。
  • NOTHING_AVAILABLE - SourceReader 现在没有可用的记录,但是将来可能会有记录可用。
  • END_OF_INPUT - SourceReader 已经处理完所有记录,到达数据的尾部。即 SourceReader 可以终止任务了。

pollNext(ReaderOutput) 会使用 ReaderOutput 作为参数,为了提高性能且在必要情况下,SourceReader 可以在一次 pollNext() 调用中返回多条记录;例如外部系统的工作粒度为块,而一个块可以包含多个记录,但是 source 只能在块的边界处设置 Checkpoint,此时SourceReader 可以一次将一个块中的所有记录通过 ReaderOutput 发送至下游。

**注意:SourceReader 的实现应该避免在一次 pollNext(ReaderOutput) 的调用中发送多个记录;**因为对 SourceReader 轮询的任务线程工作在一个事件循环(event-loop)中,且不能阻塞。

在创建 SourceReader 时,相应的 SourceReaderContext 会提供给 Source,而 Source 会将相应的上下文传递给 SourceReader 实例;SourceReader 可以通过 SourceReaderContextSourceEvent 传递给相应的 SplitEnumeratorSource 的一个典型设计模式是让 SourceReader 发送它们的本地信息给 SplitEnumerator,后者则会全局性地做出决定。

SourceReader API 是一个底层(low-level) API,允许用户自行处理分片,并使用自己的线程模型来获取和移交记录;为了帮助实现 SourceReader,Flink 提供了 SourceReaderBase 类,可以显著减少编写 SourceReader 所需要的工作量。

强烈建议连接器开发人员充分利用 SourceReaderBase 而不是从头开始编写 SourceReader

e)Source 使用方法

为了通过 Source 创建 DataStream,需要将 Source 传递给 StreamExecutionEnvironment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Source mySource = new MySource(...);DataStream<Integer> stream = env.fromSource(mySource,WatermarkStrategy.noWatermarks(),"MySourceName");

相关文章:

48、Flink 的 Data Source API 详解

a&#xff09;概述 本节将描述 FLIP-27 中引入的新 Source API 的主要接口。 b&#xff09;Source Source API 是一个工厂模式的接口&#xff0c;用于创建以下组件。 Split EnumeratorSource ReaderSplit SerializerEnumerator Checkpoint Serializer 此外&#xff0c;Sou…...

深入解析Java扩展机制:SPI与Spring.factories

目录 Java SPI概述 1.1 什么是SPI&#xff1f;1.2 SPI的工作原理1.3 SPI的优缺点 SPI的应用 2.1 Java标准库中的SPI应用2.2 自定义SPI示例 Spring.factories概述 3.1 什么是spring.factories&#xff1f;3.2 spring.factories的工作原理3.3 spring.factories的优缺点 spring.f…...

Vue2之模板语法

文章目录 1.模板语法1.1 插值语法{{}}可以写什么1.2 指令语法1.2.1 指令概述1.2.2 v-bind指令1.2.3 v-model指令 1.模板语法 1.1 插值语法{{}}可以写什么 &#xff08;1&#xff09;在data中声明的 &#xff08;2&#xff09;常量 &#xff08;3&#xff09;合法的JavaScript…...

java基础练习题

1、一个".java"源文件中是否可以包括多个类&#xff1f;有什么限制&#xff1f; 可以包含多个类。但是只有一个类可以声明为public&#xff0c;且要求声明为public的类的类名与源文件名相同。 2、java的优势&#xff1f; a、跨平台性 b、安全性高 c、简单性 d、…...

unity中通过实现底层接口实现非按钮(图片)的事件监听

编写监听脚本 PEListenter 继承自MonoBehaviour类&#xff0c;并实现了IPointerDownHandler、IPointerUpHandler和IDragHandler接口&#xff0c;按照需求定义需要接收事件&#xff08;鼠标按下、抬起、拖拽&#xff09;的回调函数 //监听类&#xff08;需要挂载在物体上面&am…...

重庆耶非凡科技有限公司的选品师项目加盟靠谱吗?

在当今电子商务的浪潮中&#xff0c;选品师的角色愈发重要。而重庆耶非凡科技有限公司以其独特的选品师项目&#xff0c;在行业内引起了广泛关注。对于想要加盟该项目的人来说&#xff0c;项目的靠谱性无疑是首要考虑的问题。 首先&#xff0c;我们来看看耶非凡科技有限公司的背…...

《青少年编程与数学》课程方案:4、课程策略

《青少年编程与数学》课程方案&#xff1a;4、课程策略 一、工程师思维二、使命感驱动三、价值观引领四、学习现代化五、工作生活化六、与时代共进 《青少年编程与数学》课程策略强调采用工程师思维&#xff0c;避免重复造轮子&#xff0c;培养使命感&#xff0c;通过探索兴趣、…...

用爬虫实现---模拟填志愿

先来说实现逻辑&#xff0c;首先我要获取到这个网站上所有的信息&#xff0c;那么我们就可以开始对元素进行检查 我们发现他的每一个学校信息都有一个对应的属性&#xff0c;并且是相同的&#xff0c;那么我们就可以遍历这个网页中的所有属性一样的开始爬取 在来分析&#xff0…...

vscode Run Code输出出现中文乱码情况问题解决方案

主要解决方案是通过修改计算机默认的编码格式,来完成的。 chcp 是 Windows 操作系统中的一个命令,用于显示或设置控制台的代码页(code page)。代码页决定了控制台如何解释和显示字符,特别是非 ASCII 字符(例如 Unicode 字符)。 使用方法 显示当前代码页: 输入 chcp 而…...

代码随想录训练营Day30

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、重新安排行程 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 今天是跟着代码随想录刷题的第30天&#xff0c;主要是复习了回溯算法…...

Swift 序列(Sequence)排序面面俱到 - 从过去到现在(二)

概览 在上篇 Swift 序列(Sequence)排序面面俱到 - 从过去到现在(一)博文中,我们讨论了 Swift 语言中序列和集合元素排序的一些基本知识,我们还给出了以自定义类型中任意属性排序的“康庄大道”。 不过在实际的撸码场景中,我们往往需要的是“多属性”同时参与到排序的考…...

STM32F103C8T6基于HAL库移植uC/OS-III

文章目录 一、建立STM32CubeMX工程二、移植1、 uC/OS-III源码2、移植过程 三、配置相关代码1、bsp.c和bsp.h2、main.c3、修改启动代码4、修改app_cfg.h文件5、修改includes.h文件6、修改lib_cfg.h文件 四、编译与烧录总结参考资料 学习嵌入式实时操作系统&#xff08;RTOS&…...

微服务学习Day9-分布式事务Seata

文章目录 分布式事务seata引入理论基础CAP定理BASE理论 初识Seata动手实践XA模式AT模式TCC模式SAGA模式 高可用 分布式事务seata 引入 理论基础 CAP定理 BASE理论 初识Seata 动手实践 XA模式 AT模式 TCC模式 Service Slf4j public class AccountTCCServiceImpl implements A…...

vue用vite配置代理解决跨域问题(target、rewrite和changeOrigin的使用场景)

Vite的target、rewrite和changeOrigin的使用场景 1. target 使用场景&#xff1a;target 属性在 Vite 的 vite.config.ts 或 vite.config.js 文件的 server.proxy 配置中指定&#xff0c;用于设置代理服务器应该将请求转发到的目标地址。这通常是一个后端服务的API接口地址。…...

为什么PPT录制没有声音 电脑ppt录屏没有声音怎么办

一、为什么PPT录制没有声音 1.软件问题 我们下载软件的时候可能遇到软件损坏的问题&#xff0c;导致录制没有声音&#xff0c;但其他功能还是可以使用的。我建议使用PPT的隐藏功能&#xff0c;下载插件&#xff0c;在PPT界面的加载项选项卡中就能使用。我推荐一款可以解决录屏…...

JDBC学习笔记(三)高级篇

一、JDBC 优化及工具类封装 1.1 现有问题 1.2 JDBC 工具类封装 V1.0 resources/db.properties配置文件&#xff1a; driverClassNamecom.mysql.cj.jdbc.Driver urljdbc:mysql:///atguigu usernameroot password123456 initialSize10 maxActive20 工具类代码&#xff1a; p…...

c++编译器在什么情况下会提供类的默认构造函数等,与析构函数

我们都知道&#xff0c;在 c 里&#xff0c;编写的简单类&#xff0c;若没有自己编写构造析构函数与 copy 构造函数 与 赋值运算符函数&#xff0c;那么编译器会提供这些函数&#xff0c;并实现简单的语义&#xff0c;比如成员赋值。看 源码时&#xff0c;出现了下图类似的情形…...

SpringBoot3整合Mybatis-Plus3.5.5出现的问题

主要是由于 mybatis-plus 中 mybatis 的整合包版本不够导致的 排除 mybatis-plus 中自带的 mybatis 整合包&#xff0c;单独引入即可 java.lang.IllegalArgumentException: Invalid value type for attribute factoryBeanObjectType: java.lang.Stringat org.springframework.…...

服务器数据恢复—强制上线raid5阵列离线硬盘导致raid不可用的数据恢复案例

服务器数据恢复环境&#xff1a; 某品牌2850服务器中有一组由6块SCSI硬盘组建的raid5磁盘阵列&#xff0c;linux操作系统ext3文件系统。 服务器故障&#xff1a; 服务器运行过程中突然瘫痪。服务器管理员检查阵列后发现raid5阵列中有两块硬盘离线&#xff0c;将其中一块硬盘进行…...

初入阿里云,上手走一波

初入阿里云&#xff0c;上手走一波 一阶&#xff1a;ECSMysqlDMS安装Mysql初始化MysqlMysql操作DMS管理Mysql 二阶&#xff1a;ECSOSS远程连接ECSOSS控制台其他图片服务 三阶&#xff1a;更多搭配操作 可以说个人在日常使用过程中&#xff0c;操作最多的阿里云产品就是阿里云服…...

React Native 开发环境搭建(全平台详解)

React Native 开发环境搭建&#xff08;全平台详解&#xff09; 在开始使用 React Native 开发移动应用之前&#xff0c;正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南&#xff0c;涵盖 macOS 和 Windows 平台的配置步骤&#xff0c;如何在 Android 和 iOS…...

java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别

UnsatisfiedLinkError 在对接硬件设备中&#xff0c;我们会遇到使用 java 调用 dll文件 的情况&#xff0c;此时大概率出现UnsatisfiedLinkError链接错误&#xff0c;原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用&#xff0c;结果 dll 未实现 JNI 协…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)

推荐 github 项目:GeminiImageApp(图片生成方向&#xff0c;可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

SQL慢可能是触发了ring buffer

简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...