当前位置: 首页 > news >正文

Flink 数据类型 TypeInformation信息

Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我么需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。并为每个数据类型生成特定的序列化器、反序列化器和比较器。Flink支持非常完善的数据类型,数据类型描述信息都是由TypeInformation定义,比较常用的TypeInformationBasicTypeInfoTupleTypeInfoCaseClassTypeInfoPojoTypeInfo类等。TypeInformation主要作用是为了在 Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink能够支持任意的JavaScala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。

原生数据类型

Flink通过实现BasicTypeInfo数据类型,能够支持任意Java 原生基本类型(装箱)或String类型,例如IntegerStringDouble等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。

//创建 Int 类型的数据集
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5);
//创建 String 的类型的数据集
DataStreamSource<String> stringDataStreamSource = env.fromElements("Java", "Scala");

Flink实现另外一种TypeInfomationBasicArrayTypeInfo,对应的是Java基本类型数组(装箱)或String对象的数组,如下代码通过使用 Array数组和List集合创建DataStream数据集。

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
//通过 List 集合创建数据集
DataStreamSource<Integer> integerDataStreamSource1 = env.fromCollection(integers);

Java Tuples类型

通过定义TupleTypeInfo来描述Tuple类型数据,FlinkJava接口中定义了元祖类Tuple供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25,如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。如下代码所示,创建Tuple数据类型数据集。

//通过实例化 Tuple2 创建具有两个元素的数据集
DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
//通过实例化 Tuple3 创建具有三个元素的数据集
DataStreamSource<Tuple3<String, Integer, Long>> tuple3DataStreamSource = env.fromElements(new Tuple3<>("a", 1, 3L), new Tuple3<>("b", 2, 3L));

Scala Case Class类型

Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持的字段数量上限为22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class数据类型,然后通过fromElements方法创建input数据集,调用keyBy()方法对数据集根据 word字段重新分区。

//定义 WordCount Case Class 数据结构
case class WordCount(word: Sring, count: Int)
//通过 fromElements 方法创建数据集
val input = env.fromElements(WordCount("hello", 1),WordCount("word",2))
val keyStream1 = input.keyBy("word")//根据word字段为分区字段,
val keyStream2 = input.keyBy(0)//也可以通过制定position分区

通过使用Scala Tuple创建DataStream数据集,其他的使用方式和Case Class相似。需要注意的是,如果根据名称获取字段,可以使用 Tuple中的默认字段名称。

//通过实例化Scala Tuple2 创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String,Int]] = env.fromElements(("a",1),("b",2));
//使用默认名字段获取字段,表示第一个 tuple字段,相当于下标0
tuple2DataStreamSource.keyBy("_1");

POJOs 类型

POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括JavaScala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName"),对于用户做数据处理则非常透明和简单,如代码所示。如果在Flink中使用POJOs数据类型,需要遵循以下要求:
【1】POJOs类必须是Public修饰且必须独立定义,不能是内部类;
【2】POJOs类中必须含有默认空构造器;
【3】POJOs类中所有的 Fields必须是Public或者具有Public修饰的gettersetter方法;
【4】POJOs类中的字段类型必须是Flink支持的。

//类和属性具有 public 修饰
public class Persion{public String name;public Integer age;//具有默认的空构造器public Persion(){}public Persion(String name,Integer age){this.name = name;this.age = age;};
}

定义好POJOs Class后,就可以在 Flink环境中使用了,如下代码所示,使用fromElements接口构建Person类的数据集。POJOs类仅支持字段名称指定字段,如代码中通过Person name来指定Keyby字段。

DataStreamSource<Persion> persionDataStreamSource = env.fromElements(new Persion("zzx", 18), new Persion("fj", 16));
persionData.keyBy("name").sum("age");

Flink Value类型

Value数据类型实现了org.apache.flink.types.Value,其中包括read()write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink提供了內建的Value类型有IntValue、DoubleValue以及StringValue等,用户可以结合原生数据类型和Value类型使用。

特殊数据类型

Flink中也支持一些比较特殊的数据数据类型,例如Scala中的ListMapEitherOptionTry数据类型,以及Java中Either数据类型,还有HadoopWritable数据类型。如下代码所示,创建MapList类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint帮助Flink推断数据类型信息,关于Tyeps Hmt介绍可以参考下一小节。

//创建 map 类型数据集
Map map = new HashMap<>();
map.put("name","zzx");
map.put("age",12);
env.fromElements(map);
//创建 List 类型数据集
env.fromElements(Arrays.asList(1,2,3,4,5),Arrays.asList(3,4,5));

TypeInformation信息获取: 通常情况下Flink都能正常进行数据类型推断,并选择合适的serializers以及comparators。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink并不能很容易地获取到数据集中的数据类型信息。同时在Scala APIJava API中,Flink分别使用了不同的方式重构了数据类型信息。

Scala API类型信息

Scala API通过使用Manifest和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像Java API出现类型擦除的问题,这使得Scala API具有非常精密的类型管理机制。同时在Flink中使用到Scala Macros框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在Flink中注册成TypeInformation以支持上层计算算子使用。
当使用Scala API开发 Flink应用,如果使用到Flink已经通过TypeInformation定义的数据类型,TypeInformation类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动Flink应用程序时就会报could not find implicit value for evidence parameter of type TypeInformation的错误。这时需要将TypeInformation类隐式参数引入到当前程序环境中,代码实例如下:

import org.apache.flink.api.scala._

Java API类型信息

由于Java的泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示Ctype Himts来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单通过在returns方法中传入TypeHint实例指定输出参数类型,帮助Flink系统对输出类型进行数据类型参数的推断和收集。

//定义泛型函数,输入参数 T,O 输出参数为 O
class MyMapFucntion<T,O> implements MapFunction<T,O>{@Overridepublic O map(T t) throws Exception {//定义计算逻辑return null;}
}//通过 List 集合创建数据集
DataStreamSource<Integer> input = env.fromCollection(integers);
input.flatMap(new MyMapFucntion<String,Integer>()).returns(new TypeHint<Integer>() {//通过returns方法指定返回参数类型
})

在使用Java API定义POJOs类型数据时,PojoTypeInformationPOJOs类中的所有字段创建序列化器,对于标准的类型,例如IntegerStringLong等类型是通过Flink自带的序列化器进行数据序列化,对于其他类型数据都是直接调用Kryo序列化工具来进行序列化。通常情况下,如果Kryo序列化工具无法对POJOs类序列化时,可以使用AvroPOJOs类进行序列化,如下代码通过在ExecutionConfig中调用 enableForceAvro()来开启Avro序列化。

//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 avro 序列化
env.getConfig().enableForceAvro();

如果用户想使用Kryo序列化工具来序列化POJOs所有字段,则在ExecutionConfig中调用enableForceKryo()来开启Kryo序列化。

//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 Kryo 序列化
env.getConfig().enableForceKryo();

如果默认的Kryo序列化类不能序列化POJOs对象,通过调用ExecutionConfigaddDefaultKryoSerializer()方法向Kryo中添加自定义的序列化器。

public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

自定义TypeInformation

除了使用已有的TypeInformation所定义的数据格式类型之外,用户也可以自定义实现TypeInformation,来满足的不同的数据类型定义需求。Flink提供了可插拔的 Type Information Factory让用户将自定义的TypeInformation注册到Flink类型系统中。如下代码所示只需要通过实现org.apache.flink.api.common.typeinfo.TypeInfoFactory接口,返回相应的类型信息。通过@TypeInfo注解创建数据类型,定义CustomTuple数据类型。

@TypeInfo(CustomTypeInfoFactory.class)
public class CustomTuple<T0,T1>{public T0 field0;public T1 field1;
}

然后定义CustomTypeInfoFactory类继承于TypeInfoFactory,参数类型指定CustomTuple。最后重写createTypeInfo方法,创建的CustomTupleTypeInfo就是CustomTuple数据类型TypeInformation

public class CustomTypeInfoFactory extends TypeInfoFactory<CustomTuple>{@Overridepublic TypeInfomation<CustomTuple> createTypeInfo(Type t, Map<String,TypeInfoFactory<?>> genericParameters){return new CustomTupleTypeInfo(genericParameters.get("T0"),genericParameters.get("T1");}
}

相关文章:

Flink 数据类型 TypeInformation信息

Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部&#xff0c;我么需要能够处理这些对象。它们需要被序列化和反序列化&#xff0c;以便通过网络传送它们&#xff1b;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点&#xff0c;Flink需要明确知…...

基于python的leetcode算法介绍之递归

文章目录 零 算法介绍一 简单示例 辗转相除法Leetcode例题与思路[509. 斐波那契数](https://leetcode.cn/problems/fibonacci-number/)解题思路&#xff1a;题解&#xff1a; [206. 反转链表](https://leetcode.cn/problems/reverse-linked-list/)解题思路&#xff1a;题解&…...

2023年度佳作:AIGC、AGI、GhatGPT、人工智能大语言模型的崛起与挑战

目录 前言 01 《ChatGPT 驱动软件开发》 内容简介 02 《ChatGPT原理与实战》 内容简介 03 《神经网络与深度学习》 04 《AIGC重塑教育》 内容简介 05 《通用人工智能》 目  录 前言 2023年是人工智能大语言模型大爆发的一年&#xff0c;一些概念和英文缩写也在这一…...

Axure的交互以及情形的介绍

一. 交互 1.1 交互概述 通俗来讲就是&#xff0c;谁用了什么方法做了什么事情&#xff0c;主体"谁"对应的就是axure中的元件&#xff0c;"什么方法"对应的就是交互事件&#xff0c;比如单击事件、双击事件&#xff0c;"什么事情"对应的就是交互…...

【MATLAB第84期】基于MATLAB的波形叠加极限学习机SW-ELM代理模型的sobol全局敏感性分析法应用

【MATLAB第84期】基于MATLAB的波形叠加极限学习机SW-ELM代理模型的sobol全局敏感性分析法应用 前言 跟往期sobol区别&#xff1a; 1.sobol计算依赖于验证集样本&#xff0c;无需定义变量上下限。 2.SW-ELM自带激活函数&#xff0c;计算具有phi&#xff08;x&#xff09;e^x激…...

米游社区表情包整合网站源码

源码介绍 米游社表情包整合网站源码&#xff0c;来自Github大佬的项目&#xff0c;包含米游兔123枚&#xff0c;米游社 玩家12枚&#xff0c;崩坏 星穹铁道112枚&#xff0c;绝区零218枚&#xff0c;NAP32枚&#xff0c;崩坏RPG62枚&#xff0c;崩坏3-1282枚&#xff0c;原神 …...

easyexcel调用公共导出方法导出数据

easyexcel备忘 Slf4j public class ConditionDownloadUtil {//扫描在xboot 包下所有IService 接口的子类, 每次启动服务后, 重新扫描public final static Class[] classesExtendsIService ClassUtil.scanPackageBySuper("cn.exrick.xboot", IService.class).toArra…...

C语言插入排序算法及代码

一、原理 在待排序的数组里&#xff0c;从数组的第二个数字开始&#xff0c;通过构建有序序列&#xff0c;对于未排序数据&#xff0c;在已排序序列中从后向前扫描&#xff0c;找到相应位置并插入。 二、代码部分 #include<stdio.h> #include<stdlib.h> int ma…...

2023年中国法拍房用户画像和数据分析

法拍房主要平台 法拍房主要平台有3家&#xff0c;分别是阿里、京东和北交互联平台。目前官方认定纳入网络司法拍卖的平台共有7家&#xff0c;其中阿里资产司法拍卖平台的挂拍量最大。 阿里法拍房 阿里法拍房数据显示2017年&#xff0c;全国法拍房9000套&#xff1b;2018年&a…...

Android 清除临时文件,清空缓存

python 代码&#xff1a; import os import shutil import tracebackdef delete_folder(path):if os.path.exists(path):print(f"删除文件夹: {path}")shutil.rmtree(path)print("删除完成")def delete_file(path):if os.path.exists(path):print(f"删…...

Guava限流神器:RateLimiter使用指南

1. 引言 可能有些小伙伴听到“限流”这个词就觉得头大&#xff0c;感觉像是一个既复杂又枯燥的话题。别急&#xff0c;小黑今天就要用轻松易懂的方式&#xff0c;带咱们一探RateLimiter的究竟。 想象一下&#xff0c;当你去超市排队结账时&#xff0c;如果收银台开得越多&…...

【六大排序详解】开篇 :插入排序 与 希尔排序

插入排序 与 希尔排序 六大排序之二 插入排序 与 希尔排序1 排序1.1排序的概念 2 插入排序2.1 插入排序原理2.2 排序步骤2.3 代码实现 3 希尔排序3.1 希尔排序原理3.2 排序步骤3.3 代码实现 4 时间复杂度分析 Thanks♪(&#xff65;ω&#xff65;)&#xff89;下一篇文章见&am…...

凸优化问题求解

这里写目录标题 1. 线性规划基本定理2.单纯形法2.1 转轴运算 3. 内点法3.1 线性规划的内点法 1. 线性规划基本定理 首先我们指出&#xff0c;线性规划均可等价地化成如下标准形式 { min ⁡ c T x , s . t A x b , x ⪰ 0 , \begin{align}\begin{cases}\min~c^Tx,\\\mathrm{s.…...

文件操作入门指南

目录 一、为什么使用文件 二、什么是文件 2.1 程序文件 2.2 数据文件 2.3 文件名 三、文件的打开和关闭 3.1 文件指针 3.2 文件的打开和关闭 四、文件的顺序读写 ​编辑 &#x1f33b;深入理解 “流”&#xff1a; &#x1f342;文件的顺序读写函数介绍&#xff1a; …...

Axure之交互与情节与一些实例

目录 一.交互与情节简介 二.ERP登录页到主页的跳转 三.ERP的菜单跳转到各个页面的跳转 四.省市联动 五.手机下拉加载 今天就到这里了&#xff0c;希望帮到你哦&#xff01;&#xff01;&#xff01; 一.交互与情节简介 "交互"通常指的是人与人、人与计算机或物体…...

【数据库设计和SQL基础语法】--连接与联接--多表查询与子查询基础(二)

一、子查询基础 1.1 子查询概述 子查询是指在一个查询语句内部嵌套另一个查询语句的过程。子查询可以嵌套在 SELECT、FROM、WHERE 或 HAVING 子句中&#xff0c;用于从数据库中检索数据或执行其他操作。子查询通常返回一个结果集&#xff0c;该结果集可以被包含它的主查询使用…...

Android studio中导入opencv库

具体opencv库的导入流程参考链接&#xff1a;Android Studio开发之路 &#xff08;五&#xff09;导入OpenCV以及报错解决 一、出现的错误&#xff1a;NullPointerException: Cannot invoke “java.io.File.toPath()” because “this.mySdkLocation” is null 解决办法&#…...

Linux(1)_基础知识

第一部分 一、Linux系统概述 创始人&#xff1a;芬兰大学大一的学生写的Linux内核&#xff0c;李纳斯托瓦兹。 Linux时unix的类系统&#xff1b; 特点&#xff1a;多用户 多线程的操作系统&#xff1b; 开源操作系统&#xff1b; 开源项目&#xff1a;操作系统&#xff0c;应用…...

网络相关面试题

简述 TCP 连接的过程&#xff08;淘系&#xff09; 参考答案&#xff1a; TCP 协议通过三次握手建立可靠的点对点连接&#xff0c;具体过程是&#xff1a; 首先服务器进入监听状态&#xff0c;然后即可处理连接 第一次握手&#xff1a;建立连接时&#xff0c;客户端发送 syn 包…...

Vue2面试题:说一下对跨域的理解?

http请求分为两大类&#xff1a;普通http请求&#xff08;如百度请求&#xff09;和ajax请求&#xff08;跨域是出现在ajax请求&#xff09; 同源策略&#xff1a;在浏览器发起ajax请求时&#xff0c;当前的网址和被请求的网址协议、域名、端口号必须完全一致&#xff0c;目的是…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写&#xff0c;中文译为后进先出。这是一种数据结构的工作原则&#xff0c;类似于一摞盘子或一叠书本&#xff1a; 最后放进去的元素最先出来 -想象往筒状容器里放盘子&#xff1a; &#xff08;1&#xff09;你放进的最后一个盘子&#xff08…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

scikit-learn机器学习

# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...

Linux基础开发工具——vim工具

文章目录 vim工具什么是vimvim的多模式和使用vim的基础模式vim的三种基础模式三种模式的初步了解 常用模式的详细讲解插入模式命令模式模式转化光标的移动文本的编辑 底行模式替换模式视图模式总结 使用vim的小技巧vim的配置(了解) vim工具 本文章仍然是继续讲解Linux系统下的…...

比特币:固若金汤的数字堡垒与它的四道防线

第一道防线&#xff1a;机密信函——无法破解的哈希加密 将每一笔比特币交易比作一封在堡垒内部传递的机密信函。 解释“哈希”&#xff08;Hashing&#xff09;就是一种军事级的加密术&#xff08;SHA-256&#xff09;&#xff0c;能将信函内容&#xff08;交易细节&#xf…...

CppCon 2015 学习:Simple, Extensible Pattern Matching in C++14

什么是 Pattern Matching&#xff08;模式匹配&#xff09; ❝ 模式匹配就是一种“描述式”的写法&#xff0c;不需要你手动判断、提取数据&#xff0c;而是直接描述你希望的数据结构是什么样子&#xff0c;系统自动判断并提取。❞ 你给的定义拆解&#xff1a; ✴ Instead of …...

Monorepo架构: 项目管理模式对比与考量

关于 monorepo 相关概念及项目管理模式 在软件开发中&#xff0c;尤其是前端项目&#xff0c;我们会涉及到不同的项目管理模式&#xff0c;这里先介绍几个重要的概念“monorepo”是当前较为热门的一种项目管理方式&#xff0c;虽然很多人可能听说过&#xff0c;但可能在实际项…...