当前位置: 首页 > news >正文

Flink 数据类型 TypeInformation信息

Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我么需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。并为每个数据类型生成特定的序列化器、反序列化器和比较器。Flink支持非常完善的数据类型,数据类型描述信息都是由TypeInformation定义,比较常用的TypeInformationBasicTypeInfoTupleTypeInfoCaseClassTypeInfoPojoTypeInfo类等。TypeInformation主要作用是为了在 Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink能够支持任意的JavaScala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。

原生数据类型

Flink通过实现BasicTypeInfo数据类型,能够支持任意Java 原生基本类型(装箱)或String类型,例如IntegerStringDouble等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。

//创建 Int 类型的数据集
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5);
//创建 String 的类型的数据集
DataStreamSource<String> stringDataStreamSource = env.fromElements("Java", "Scala");

Flink实现另外一种TypeInfomationBasicArrayTypeInfo,对应的是Java基本类型数组(装箱)或String对象的数组,如下代码通过使用 Array数组和List集合创建DataStream数据集。

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
//通过 List 集合创建数据集
DataStreamSource<Integer> integerDataStreamSource1 = env.fromCollection(integers);

Java Tuples类型

通过定义TupleTypeInfo来描述Tuple类型数据,FlinkJava接口中定义了元祖类Tuple供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25,如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。如下代码所示,创建Tuple数据类型数据集。

//通过实例化 Tuple2 创建具有两个元素的数据集
DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
//通过实例化 Tuple3 创建具有三个元素的数据集
DataStreamSource<Tuple3<String, Integer, Long>> tuple3DataStreamSource = env.fromElements(new Tuple3<>("a", 1, 3L), new Tuple3<>("b", 2, 3L));

Scala Case Class类型

Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持的字段数量上限为22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class数据类型,然后通过fromElements方法创建input数据集,调用keyBy()方法对数据集根据 word字段重新分区。

//定义 WordCount Case Class 数据结构
case class WordCount(word: Sring, count: Int)
//通过 fromElements 方法创建数据集
val input = env.fromElements(WordCount("hello", 1),WordCount("word",2))
val keyStream1 = input.keyBy("word")//根据word字段为分区字段,
val keyStream2 = input.keyBy(0)//也可以通过制定position分区

通过使用Scala Tuple创建DataStream数据集,其他的使用方式和Case Class相似。需要注意的是,如果根据名称获取字段,可以使用 Tuple中的默认字段名称。

//通过实例化Scala Tuple2 创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String,Int]] = env.fromElements(("a",1),("b",2));
//使用默认名字段获取字段,表示第一个 tuple字段,相当于下标0
tuple2DataStreamSource.keyBy("_1");

POJOs 类型

POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括JavaScala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName"),对于用户做数据处理则非常透明和简单,如代码所示。如果在Flink中使用POJOs数据类型,需要遵循以下要求:
【1】POJOs类必须是Public修饰且必须独立定义,不能是内部类;
【2】POJOs类中必须含有默认空构造器;
【3】POJOs类中所有的 Fields必须是Public或者具有Public修饰的gettersetter方法;
【4】POJOs类中的字段类型必须是Flink支持的。

//类和属性具有 public 修饰
public class Persion{public String name;public Integer age;//具有默认的空构造器public Persion(){}public Persion(String name,Integer age){this.name = name;this.age = age;};
}

定义好POJOs Class后,就可以在 Flink环境中使用了,如下代码所示,使用fromElements接口构建Person类的数据集。POJOs类仅支持字段名称指定字段,如代码中通过Person name来指定Keyby字段。

DataStreamSource<Persion> persionDataStreamSource = env.fromElements(new Persion("zzx", 18), new Persion("fj", 16));
persionData.keyBy("name").sum("age");

Flink Value类型

Value数据类型实现了org.apache.flink.types.Value,其中包括read()write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink提供了內建的Value类型有IntValue、DoubleValue以及StringValue等,用户可以结合原生数据类型和Value类型使用。

特殊数据类型

Flink中也支持一些比较特殊的数据数据类型,例如Scala中的ListMapEitherOptionTry数据类型,以及Java中Either数据类型,还有HadoopWritable数据类型。如下代码所示,创建MapList类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint帮助Flink推断数据类型信息,关于Tyeps Hmt介绍可以参考下一小节。

//创建 map 类型数据集
Map map = new HashMap<>();
map.put("name","zzx");
map.put("age",12);
env.fromElements(map);
//创建 List 类型数据集
env.fromElements(Arrays.asList(1,2,3,4,5),Arrays.asList(3,4,5));

TypeInformation信息获取: 通常情况下Flink都能正常进行数据类型推断,并选择合适的serializers以及comparators。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink并不能很容易地获取到数据集中的数据类型信息。同时在Scala APIJava API中,Flink分别使用了不同的方式重构了数据类型信息。

Scala API类型信息

Scala API通过使用Manifest和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像Java API出现类型擦除的问题,这使得Scala API具有非常精密的类型管理机制。同时在Flink中使用到Scala Macros框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在Flink中注册成TypeInformation以支持上层计算算子使用。
当使用Scala API开发 Flink应用,如果使用到Flink已经通过TypeInformation定义的数据类型,TypeInformation类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动Flink应用程序时就会报could not find implicit value for evidence parameter of type TypeInformation的错误。这时需要将TypeInformation类隐式参数引入到当前程序环境中,代码实例如下:

import org.apache.flink.api.scala._

Java API类型信息

由于Java的泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示Ctype Himts来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单通过在returns方法中传入TypeHint实例指定输出参数类型,帮助Flink系统对输出类型进行数据类型参数的推断和收集。

//定义泛型函数,输入参数 T,O 输出参数为 O
class MyMapFucntion<T,O> implements MapFunction<T,O>{@Overridepublic O map(T t) throws Exception {//定义计算逻辑return null;}
}//通过 List 集合创建数据集
DataStreamSource<Integer> input = env.fromCollection(integers);
input.flatMap(new MyMapFucntion<String,Integer>()).returns(new TypeHint<Integer>() {//通过returns方法指定返回参数类型
})

在使用Java API定义POJOs类型数据时,PojoTypeInformationPOJOs类中的所有字段创建序列化器,对于标准的类型,例如IntegerStringLong等类型是通过Flink自带的序列化器进行数据序列化,对于其他类型数据都是直接调用Kryo序列化工具来进行序列化。通常情况下,如果Kryo序列化工具无法对POJOs类序列化时,可以使用AvroPOJOs类进行序列化,如下代码通过在ExecutionConfig中调用 enableForceAvro()来开启Avro序列化。

//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 avro 序列化
env.getConfig().enableForceAvro();

如果用户想使用Kryo序列化工具来序列化POJOs所有字段,则在ExecutionConfig中调用enableForceKryo()来开启Kryo序列化。

//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 Kryo 序列化
env.getConfig().enableForceKryo();

如果默认的Kryo序列化类不能序列化POJOs对象,通过调用ExecutionConfigaddDefaultKryoSerializer()方法向Kryo中添加自定义的序列化器。

public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

自定义TypeInformation

除了使用已有的TypeInformation所定义的数据格式类型之外,用户也可以自定义实现TypeInformation,来满足的不同的数据类型定义需求。Flink提供了可插拔的 Type Information Factory让用户将自定义的TypeInformation注册到Flink类型系统中。如下代码所示只需要通过实现org.apache.flink.api.common.typeinfo.TypeInfoFactory接口,返回相应的类型信息。通过@TypeInfo注解创建数据类型,定义CustomTuple数据类型。

@TypeInfo(CustomTypeInfoFactory.class)
public class CustomTuple<T0,T1>{public T0 field0;public T1 field1;
}

然后定义CustomTypeInfoFactory类继承于TypeInfoFactory,参数类型指定CustomTuple。最后重写createTypeInfo方法,创建的CustomTupleTypeInfo就是CustomTuple数据类型TypeInformation

public class CustomTypeInfoFactory extends TypeInfoFactory<CustomTuple>{@Overridepublic TypeInfomation<CustomTuple> createTypeInfo(Type t, Map<String,TypeInfoFactory<?>> genericParameters){return new CustomTupleTypeInfo(genericParameters.get("T0"),genericParameters.get("T1");}
}

相关文章:

Flink 数据类型 TypeInformation信息

Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部&#xff0c;我么需要能够处理这些对象。它们需要被序列化和反序列化&#xff0c;以便通过网络传送它们&#xff1b;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点&#xff0c;Flink需要明确知…...

基于python的leetcode算法介绍之递归

文章目录 零 算法介绍一 简单示例 辗转相除法Leetcode例题与思路[509. 斐波那契数](https://leetcode.cn/problems/fibonacci-number/)解题思路&#xff1a;题解&#xff1a; [206. 反转链表](https://leetcode.cn/problems/reverse-linked-list/)解题思路&#xff1a;题解&…...

2023年度佳作:AIGC、AGI、GhatGPT、人工智能大语言模型的崛起与挑战

目录 前言 01 《ChatGPT 驱动软件开发》 内容简介 02 《ChatGPT原理与实战》 内容简介 03 《神经网络与深度学习》 04 《AIGC重塑教育》 内容简介 05 《通用人工智能》 目  录 前言 2023年是人工智能大语言模型大爆发的一年&#xff0c;一些概念和英文缩写也在这一…...

Axure的交互以及情形的介绍

一. 交互 1.1 交互概述 通俗来讲就是&#xff0c;谁用了什么方法做了什么事情&#xff0c;主体"谁"对应的就是axure中的元件&#xff0c;"什么方法"对应的就是交互事件&#xff0c;比如单击事件、双击事件&#xff0c;"什么事情"对应的就是交互…...

【MATLAB第84期】基于MATLAB的波形叠加极限学习机SW-ELM代理模型的sobol全局敏感性分析法应用

【MATLAB第84期】基于MATLAB的波形叠加极限学习机SW-ELM代理模型的sobol全局敏感性分析法应用 前言 跟往期sobol区别&#xff1a; 1.sobol计算依赖于验证集样本&#xff0c;无需定义变量上下限。 2.SW-ELM自带激活函数&#xff0c;计算具有phi&#xff08;x&#xff09;e^x激…...

米游社区表情包整合网站源码

源码介绍 米游社表情包整合网站源码&#xff0c;来自Github大佬的项目&#xff0c;包含米游兔123枚&#xff0c;米游社 玩家12枚&#xff0c;崩坏 星穹铁道112枚&#xff0c;绝区零218枚&#xff0c;NAP32枚&#xff0c;崩坏RPG62枚&#xff0c;崩坏3-1282枚&#xff0c;原神 …...

easyexcel调用公共导出方法导出数据

easyexcel备忘 Slf4j public class ConditionDownloadUtil {//扫描在xboot 包下所有IService 接口的子类, 每次启动服务后, 重新扫描public final static Class[] classesExtendsIService ClassUtil.scanPackageBySuper("cn.exrick.xboot", IService.class).toArra…...

C语言插入排序算法及代码

一、原理 在待排序的数组里&#xff0c;从数组的第二个数字开始&#xff0c;通过构建有序序列&#xff0c;对于未排序数据&#xff0c;在已排序序列中从后向前扫描&#xff0c;找到相应位置并插入。 二、代码部分 #include<stdio.h> #include<stdlib.h> int ma…...

2023年中国法拍房用户画像和数据分析

法拍房主要平台 法拍房主要平台有3家&#xff0c;分别是阿里、京东和北交互联平台。目前官方认定纳入网络司法拍卖的平台共有7家&#xff0c;其中阿里资产司法拍卖平台的挂拍量最大。 阿里法拍房 阿里法拍房数据显示2017年&#xff0c;全国法拍房9000套&#xff1b;2018年&a…...

Android 清除临时文件,清空缓存

python 代码&#xff1a; import os import shutil import tracebackdef delete_folder(path):if os.path.exists(path):print(f"删除文件夹: {path}")shutil.rmtree(path)print("删除完成")def delete_file(path):if os.path.exists(path):print(f"删…...

Guava限流神器:RateLimiter使用指南

1. 引言 可能有些小伙伴听到“限流”这个词就觉得头大&#xff0c;感觉像是一个既复杂又枯燥的话题。别急&#xff0c;小黑今天就要用轻松易懂的方式&#xff0c;带咱们一探RateLimiter的究竟。 想象一下&#xff0c;当你去超市排队结账时&#xff0c;如果收银台开得越多&…...

【六大排序详解】开篇 :插入排序 与 希尔排序

插入排序 与 希尔排序 六大排序之二 插入排序 与 希尔排序1 排序1.1排序的概念 2 插入排序2.1 插入排序原理2.2 排序步骤2.3 代码实现 3 希尔排序3.1 希尔排序原理3.2 排序步骤3.3 代码实现 4 时间复杂度分析 Thanks♪(&#xff65;ω&#xff65;)&#xff89;下一篇文章见&am…...

凸优化问题求解

这里写目录标题 1. 线性规划基本定理2.单纯形法2.1 转轴运算 3. 内点法3.1 线性规划的内点法 1. 线性规划基本定理 首先我们指出&#xff0c;线性规划均可等价地化成如下标准形式 { min ⁡ c T x , s . t A x b , x ⪰ 0 , \begin{align}\begin{cases}\min~c^Tx,\\\mathrm{s.…...

文件操作入门指南

目录 一、为什么使用文件 二、什么是文件 2.1 程序文件 2.2 数据文件 2.3 文件名 三、文件的打开和关闭 3.1 文件指针 3.2 文件的打开和关闭 四、文件的顺序读写 ​编辑 &#x1f33b;深入理解 “流”&#xff1a; &#x1f342;文件的顺序读写函数介绍&#xff1a; …...

Axure之交互与情节与一些实例

目录 一.交互与情节简介 二.ERP登录页到主页的跳转 三.ERP的菜单跳转到各个页面的跳转 四.省市联动 五.手机下拉加载 今天就到这里了&#xff0c;希望帮到你哦&#xff01;&#xff01;&#xff01; 一.交互与情节简介 "交互"通常指的是人与人、人与计算机或物体…...

【数据库设计和SQL基础语法】--连接与联接--多表查询与子查询基础(二)

一、子查询基础 1.1 子查询概述 子查询是指在一个查询语句内部嵌套另一个查询语句的过程。子查询可以嵌套在 SELECT、FROM、WHERE 或 HAVING 子句中&#xff0c;用于从数据库中检索数据或执行其他操作。子查询通常返回一个结果集&#xff0c;该结果集可以被包含它的主查询使用…...

Android studio中导入opencv库

具体opencv库的导入流程参考链接&#xff1a;Android Studio开发之路 &#xff08;五&#xff09;导入OpenCV以及报错解决 一、出现的错误&#xff1a;NullPointerException: Cannot invoke “java.io.File.toPath()” because “this.mySdkLocation” is null 解决办法&#…...

Linux(1)_基础知识

第一部分 一、Linux系统概述 创始人&#xff1a;芬兰大学大一的学生写的Linux内核&#xff0c;李纳斯托瓦兹。 Linux时unix的类系统&#xff1b; 特点&#xff1a;多用户 多线程的操作系统&#xff1b; 开源操作系统&#xff1b; 开源项目&#xff1a;操作系统&#xff0c;应用…...

网络相关面试题

简述 TCP 连接的过程&#xff08;淘系&#xff09; 参考答案&#xff1a; TCP 协议通过三次握手建立可靠的点对点连接&#xff0c;具体过程是&#xff1a; 首先服务器进入监听状态&#xff0c;然后即可处理连接 第一次握手&#xff1a;建立连接时&#xff0c;客户端发送 syn 包…...

Vue2面试题:说一下对跨域的理解?

http请求分为两大类&#xff1a;普通http请求&#xff08;如百度请求&#xff09;和ajax请求&#xff08;跨域是出现在ajax请求&#xff09; 同源策略&#xff1a;在浏览器发起ajax请求时&#xff0c;当前的网址和被请求的网址协议、域名、端口号必须完全一致&#xff0c;目的是…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

USB Over IP专用硬件的5个特点

USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中&#xff0c;从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备&#xff08;如专用硬件设备&#xff09;&#xff0c;从而消除了直接物理连接的需要。USB over IP的…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

Mysql中select查询语句的执行过程

目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析&#xff08;Parser&#xff09; 2.4、执行sql 1. 预处理&#xff08;Preprocessor&#xff09; 2. 查询优化器&#xff08;Optimizer&#xff09; 3. 执行器…...