Flink 数据序列化
为 Flink 量身定制的序列化框架
大家都知道现在大数据生态非常火,大多数技术组件都是运行在JVM
上的,Flink
也是运行在JVM
上,基于JVM
的数据分析引擎都需要将大量的数据存储在内存中,这就不得不面临JVM
的一些问题,比如Java
对象存储密度较低等。针对这些问题,最常用的方法就是实现一个显式的内存管理,也就是说用自定义的内存池来进行内存的分配回收,接着将序列化后的对象存储到内存块中。
现在Java
生态圈中已经有许多序列化框架,比如说Java serialization, Kryo,Apache Avro
等等。但是Flink
依然是选择了自己定制的序列化框架,那么到底有什么意义呢?若Flink
选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好的选取序列化方式,进行数据布局,节省数据的存储空间,直接操作二进制数据。
Flink
在其内部构建了一套自己的类型系统,Flink
现阶段支持的类型分类如图所示,从图中可以看到Flink
类型可以分为基础类型Basic
、数组Arrays
、复合类型Composite
、辅助类型Auxiliary
、泛型和其它类型Generic
。Flink
支持任意的Java
或是Scala
类型。不需要像Hadoop
一样去实现一个特定的接口org.apache.hadoop.io.Writable
,Flink
能够自动识别数据类型。
TypeInformation
的思维导图如图所示,从图中可以看出,在Flink
中每一个具体的类型都对应了一个具体的TypeInformation
实现类,例如BasicTypeInformation
中的IntegerTypeInformation
和FractionalTypeInformation
都具体的对应了一个TypeInformation
。然后还有BasicArrayTypeInformation
、CompositeType
以及一些其它类型,也都具体对应了一个TypeInformation
。
TypeInformation
是Flink
类型系统的核心类。对于用户自定义的Function
来说,Flink
需要一个类型信息来作为该函数的输入输出类型,即TypeInfomation
。该类型信息类作为一个工具来生成对应类型的序列化器TypeSerializer
,并用于执行语义检查,比如当一些字段在作为join
或grouping
的键时,检查这些字段是否在该类型中存在。
Flink 的序列化过程
在Flink
序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何而来?
每一个具体的数据类型都对应一个TypeInformation
的具体实现,每一个TypeInformation
都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink
的序列化过程图可以看到TypeInformation
会提供一个createSerialize()
方法,通过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象TypeSerializer
。
对于大多数数据类型
Flink
可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化,比如,BasicTypeInfo
、WritableTypeInfo
等,但针对GenericTypeInfo
类型,Flink
会使用Kyro
进行序列化和反序列化。其中,Tuple
、Pojo
和CaseClass
类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。
简单的介绍下Pojo
的类型规则,即在满足一些条件的情况下,才会选用Pojo
的序列化进行相应的序列化与反序列化的一个操作。即类必须是Public
的,且类有一个public
的无参数构造函数,该类(以及所有超类)中的所有非静态no-static
、非瞬态no-transient
字段都是public
的(和非最终的final
)或者具有公共getter
和setter
方法,该方法遵循getter
和setter
的Java bean
命名约定。当用户定义的数据类型无法识别为POJO
类型时,必须将其作为GenericType
处理并使用Kryo
进行序列化。
Flink
自带了很多TypeSerializer
子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足你的需求,Flink
的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需要实现 TypeInformation
、TypeSerializer
和TypeComparator
即可定制自己类型的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。
序列化就是将数据结构或者对象转换成一个二进制串的过程,在Java
里面可以简单地理解成一个byte
数组。而反序列化恰恰相反,就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。下面就以内嵌型的Tuple3
这个对象为例,简述一下它的序列化过程。
Tuple3
包含三个层面,一是int
类型,一是double
类型,还有一个是Person
。Person
包含两个字段,一是int
型的ID
,另一个是 String
类型的name
,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到Tuple3
会把 int
类型通过IntSerializer
进行序列化操作,此时int
只需要占用四个字节就可以了。根据int
占用四个字节,这个能够体现出Flink
可序列化过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反序列化操作。相反,如果采用Java
的序列化,虽然能够存储更多的属性信息,但一次占据的存储空间会受到一定的损耗。Person
类会被当成一个Pojo
对象来进行处理,PojoSerializer
序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由MemorySegment
去支持。
MemorySegment
具有什么作用呢? MemorySegment
在Flink
中会将对象序列化到预分配的内存块上,它代表1
个固定长度的内存,默认大小为32 kb
。MemorySegment
代表Flink
中的一个最小的内存分配单元,相当于是Java
的一个byte
数组。 每条记录都会以序列化的形式存储在一个或多个MemorySegment
中。
Flink 序列化的最佳实践
Flink
常见的应用场景有四种,即注册子类型、注册自定义序列化器、添加类型提示、手动创建TypeInformation
,具体如下:
【1】注册子类型: 如果函数签名只描述了超类型,但是它们实际上在执行期间使用了超类型的子类型,那么让Flink
了解这些子类型会大大提高性能。可以在StreamExecutionEnvironment
或ExecutionEnvironment
中调用.registertype (clazz)
注册子类型信息。
【2】注册自定义序列化器: 对于不适用于自己的序列化框架的数据类型,Flink
会使用Kryo
来进行序列化,并不是所有的类型都与Kryo
无缝连接,具体注册方法在下文介绍。
【3】添加类型提示: 有时,当Flink
用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示TypeHint
,这个通常只在Java API
中需要。
【4】手动创建TypeInformation
: 在某些API
调用中,这可能是必需的,因为Java
的泛型类型擦除导致Flink
无法推断数据类型。
其实在大多数情况下,用户不必担心序列化框架和注册类型,因为Flink
已经提供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景下,需要去做一些相应的处理。
实践 - 类型声明: 类型声明去创建一个类型信息的对象是通过哪种方式?通常是用TypeInformation.of()
方法来创建一个类型信息的对象,具体说明如下:
【1】对于非泛型类,直接传入class
对象即可
PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
【2】对于泛型类,需要通过TypeHint
来保存泛型类型信息
final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});
【3】预定义常量: 如BasicTypeInfo
,这个类定义了一系列常用类型的快捷方式,对于String
、Boolean
、Byte
、Short
、Integer
、Long
、Float
、Double
、Char
等基本类型的类型声明,可以直接使用。而且Flink
还提供了完全等价的Types
类org.apache.flink.api.common.typeinfo.Types
。特别需要注意的是,flink-table
模块也有一个Types
类org.apache.flink.table.api.Types
,用于table
模块内部的类型定义信息,用法稍有不同。使用IDE
的自动import
时一定要小心。
【4】自定义TypeInfo
和TypeInfoFactory
: 通过自定义TypeInfo
为任意类提供Flink
原生内存管理(而非Kryo
),使存
储更紧凑,运行时也更高效。需要注意在自定义类上使用@TypeInfo
注解,随后创建相应的TypeInfoFactory
并覆盖createTypeInfo()
方法。
@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0,T1>{public T0 myfield0;public T1 myfield1;
}public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple>{@Overridepublic TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInfomation<?>> genericParameters){return new MyTupleTypeInfo(genericParameters.get("T0").genericParameters.get("T1"));}
}
实践 - 注册子类型
Flink
认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。StreamExecutionEnvironment
和 ExecutionEnvironment
提供registerType()
方法用来向Flink
注册子类信息。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerType(typeClass);
在registerType()
方法内部,会使用TypeExtractor
来提取类型信息,如上所示,获取到的类型信息属于PojoTypeInfo
及其子类,那么需要将其注册到一起,否则统一交给Kryo
去处理,Flink
并不过问 ( 这种情况下性能会变差 )。
实践 - Kryo 序列化
对于Flink
无法序列化的类型(例如用户自定义类型,没有registerType
,也没有自定义TypeInfo
和TypeInfoFactory
),默认会交给 Kryo
处理,如果Kryo
仍然无法处理(例如Guava
、Thrift
、Protobuf
等第三方库的一些类),有两种解决方案:
【1】强制使用Avro
来代替Kryo
env.getConfig().enableForceAvro();
【2】为Kryo
增加自定义的Serializer
以增强Kryo
的功能
env.getConfig().addDefaultKryoSerializer(clazz, serializer);
注:如果希望完全禁用Kryo
(100%
使用Flink
的序列化机制),可以通过Kryoenv.getConfig().disableGenericTypes()
的方式完成,但注意一切无法处理的类都将导致异常,这种对于调试非常有效。
Flink 通信层的序列化
Flink
的Task
之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之后写入NetworkBufferPool
,然后下层的Task
读出之后再进行反序列化操作,最后进行逻辑处理。为了使得记录以及事件能够被写入 Buffer,随后在消费时再从Buffer
中读出,
Flink
提供了数据记录序列化器RecordSerializer
与反序列化器RecordDeserializer
以及事件序列化器EventSerializer
。
Function
发送的数据被封装成SerializationDelegate
,它将任意元素公开为IOReadableWritable
以进行序列化,通过setInstance()
来传入要序列化的数据。在Flink
通信层的序列化中,有几个问题值得关注,具体如下:
【1】何时确定Function
的输入输出类型?
在构建StreamTransformation
的时候通过TypeExtractor
工具确定Function
的输入输出类型。TypeExtractor
类可以根据方法签名、子类信息等蛛丝马迹自动提取或恢复类型信息。
【2】何时确定Function
的序列化 / 反序列化器?
构造StreamGraph
时, 通过TypeInfomation
的createSerializer()
方法获取对应类型的序列化器TypeSerializer
,并在addOperator()
的过程中执行setSerializers() 操作,设置StreamConfig
的TYPESERIALIZERIN1
、TYPESERIALIZERIN2
、 TYPESERIALIZEROUT_1
属性。
【3】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer
又是怎么联系在一起的呢?
构造StreamGraph
时, 通过TypeInfomation
的createSerializer()
方法获取对应类型的序列化器TypeSerializer
,并在addOperator()
的过程中执行setSerializers()
操作,设置StreamConfig
的TYPESERIALIZERIN1
、 TYPESERIALIZERIN2
、 TYPESERIALIZEROUT_1
属性。
【4】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer
又是怎么联系在一起的呢?
大家都应该清楚Task
和StreamTask
两个概念,Task
是直接受TaskManager
管理和调度的,而Task
又会调用StreamTask
,而StreamTask
中真正封装了算子的处理逻辑。在run()
方法中,首先将反序列化后的数据封装成StreamRecord
交给算子处理;然后将处理结果通过Collector
发送给下游 ( 在构建Collector
时已经确定了SerializtionDelegate
),并通过RecordWriter
写入器将序列化后的结果写入DataOutput
;最后序列化的操作交给SerializerDelegate
处理,实际还是通过TypeSerializer
的serialize()
方法完成。
相关文章:

Flink 数据序列化
为 Flink 量身定制的序列化框架 大家都知道现在大数据生态非常火,大多数技术组件都是运行在JVM上的,Flink也是运行在JVM上,基于JVM的数据分析引擎都需要将大量的数据存储在内存中,这就不得不面临JVM的一些问题,比如Ja…...

【并发设计模式】聊聊两阶段终止模式如何优雅终止线程
在软件设计中,抽象出了23种设计模式,用以解决对象的创建、组合、使用三种场景。在并发编程中,针对线程的操作,也抽象出对应的并发设计模式。 两阶段终止模式- 优雅停止线程避免共享的设计模式- 只读、Copy-on-write、Thread-Spec…...

Java实现非对称加密【详解】
Java实现非对称加密 1. 简介2. 非对称加密算法--DH(密钥交换)3. 非对称加密算法--RSA非对称加密算法--EIGamal5. 总结6 案例6.1 案例16.2 案例2 1. 简介 公开密钥密码学(英语:Public-key cryptography)也称非对称式密…...

simulinkveristandlabview联合仿真——模型导入搭建人机界面
目录 1.软件版本 2.搭建simulink仿真模型 编译错误 3.导入veristand并建立工程 4.veristand导入labview labview显示veristand工程数据 labview设置veristand工程数据 运行labview工程 1.软件版本 matlab2020a,veristand2020 R4,labview2020 SP…...

k8s中Helm工具实践
k8s中Helm工具实践 1)安装redis-cluster 先搭建一个NFS的SC(只需要SC,不需要pvc),具体步骤此文档不再提供,请参考前面相关章节。 下载redis-cluster的chart包 helm pull bitnami/redis-cluster --untar…...

推荐算法架构7:特征工程(吊打面试官,史上最全!)
系列文章,请多关注 推荐算法架构1:召回 推荐算法架构2:粗排 推荐算法架构3:精排 推荐算法架构4:重排 推荐算法架构5:全链路专项优化 推荐算法架构6:数据样本 推荐算法架构7:特…...

Web前端 ---- 【Vue】vue路由守卫(全局前置路由守卫、全局后置路由守卫、局部路由path守卫、局部路由component守卫)
目录 前言 全局前置路由守卫 全局后置路由守卫 局部路由守卫之path守卫 局部路由守卫之component守卫 前言 本文介绍Vue2最后的知识点,关于vue的路由守卫。也就是鉴权,不是所有的组件任何人都可以访问到的,需要权限,而根据权限…...
uniapp点击tabbar之前做判断
在UniApp中,可以通过监听 tabBar 的 click 事件来在点击 tabBar 前做判断。具体步骤如下: 在 pages.json 文件中配置 tabBar,例如: {"pages":[{"path":"pages/home/home","name":"h…...

DLLNotFoundException:xxx tolua... 错误打印
DLLNotFoundException:xxx tolua... 错误打印 一、DLLNotFoundException介绍二、Plugins文件夹文件目录结构如下: 三、Plugins中的Android文件夹四、Plugins中的IOS文件夹这里不说了没测试过不过原理应该也是选择对应的平台即可五、Plugins中的x86和X86_64文件夹 一…...

Python量化投资——金融数据最佳实践: 使用qteasy+tushare搭建本地金融数据仓库并定期批量更新【附源码】
用qteasytushare实现金融数据本地化存储及访问 目的什么是qteasy什么是tushare为什么要本地化使用qteasy创建本地数据仓库qteasy支持的几种本地化仓库类型配置本地数据仓库配置tushare 的API token 配置本地数据源 —— 用MySQL数据库作为本地数据源下载金融历史数据 数据的定期…...
【投稿】北海 - Rust与面向对象(二)
模板方法 Rust提供了trait,类似于面向对象的接口,不同的是,将传统面向对象的虚函数表从对象中分离出来,trait仍然是一个函数表,只不过是独立的,它的参数self指针可以指向任何实现了该trait的结构。 从对象中…...

HarmonyOS构建第一个ArkTS应用(FA模型)
构建第一个ArkTS应用(FA模型) 创建ArkTS工程 若首次打开DevEco Studio,请点击Create Project创建工程。如果已经打开了一个工程,请在菜单栏选择File > New > Create Project来创建一个新工程。 选择Application应用开发&a…...

阿里云 ARMS 应用监控重磅支持 Java 21
作者:牧思 & 山猎 前言 今年的 9 月 19 日,作为最新的 LTS (Long Term Support) Java 版本,Java 21 正式 GA,带来了不少重量级的更新,详情请参考 The Arrival of Java 21 [ 1] 。虽然目前 Java 11 和 Java 17 都…...
C++ 类的析构函数和构造函数
构造函数 类的构造函数是类的一种特殊的成员函数,它会在每次创建类的新对象时执行。主要用来在创建对象时初始化对象即为对象成员变量赋初始值。 构造函数的名称与类的名称是完全相同的,并且不会返回任何类型,也不会返回 void。构造函数可用…...

STM32——CAN协议
文章目录 一.CAN协议的基本特点1.1 特点1.2 电平标准1.3 基本的五个帧1.4 数据帧 二.数据帧解析2.1 帧起始和仲裁段2.2 控制段2.3 数据段和CRC段2.4 ACK段和帧结束 三.总线仲裁四.位时序五.STM32CAN控制器原理与配置5.1 STM32CAN控制器介绍5.2 CAN的模式5.3 CAN框图 六 手册寄存…...

数据结构-如何巧妙实现一个栈?逐步解析与代码示例
文章目录 引言1.栈的基本概念2.选择数组还是链表?3. 定义栈结构4.初始化栈5.压栈操作6.弹栈操作7.查看栈顶和判断栈空9.销毁栈操作10.测试并且打印栈内容栈的实际应用结论 引言 栈是一种基本但强大的数据结构,它在许多算法和系统功能中扮演着关键角色。…...
web前端之拖拽API、vue3实现图片上传拖拽排序、拖放、投掷、复制、若依、vuedraggable
MENU vue2html5原生dom原生JavaScript实现跨区域拖放vue2实现跨区域拖放vue2mousedown实现全屏拖动,全屏投掷vue3element-plusvuedraggable实现图片上传拖拽排序vue2transition-group实现拖动排序原生拖拽排序 vue2html5原生dom原生JavaScript实现跨区域拖放 关键代…...

第11章 GUI Page403~405 步骤三 设置滚动范围
运行效果: 源代码: /**************************************************************** Name: wxMyPainterApp.h* Purpose: Defines Application Class* Author: yanzhenxi (3065598272qq.com)* Created: 2023-12-21* Copyright: yanzhen…...

【Spring Security】打造安全无忧的Web应用--使用篇
🥳🥳Welcome Huihuis Code World ! !🥳🥳 接下来看看由辉辉所写的关于Spring Security的相关操作吧 目录 🥳🥳Welcome Huihuis Code World ! !🥳🥳 一.Spring Security中的授权是…...

体验一下 CodeGPT 插件
体验一下 CodeGPT 插件 0. 背景1. CodeGPT 插件安装2. CodeGPT 插件基本配置3. (可选)CodeGPT 插件预制提示词原始配置(英文)4. CodeGPT 插件预制提示词配置(中文)5. 简单验证一下 0. 背景 看到B站Up主 “wwwzhouhui” 一个关于 CodeGPT 的视频,感觉挺有意思&#…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...

HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...

MySQL 8.0 OCP 英文题库解析(十三)
Oracle 为庆祝 MySQL 30 周年,截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始,将英文题库免费公布出来,并进行解析,帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

USB Over IP专用硬件的5个特点
USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中,从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备(如专用硬件设备),从而消除了直接物理连接的需要。USB over IP的…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...

莫兰迪高级灰总结计划简约商务通用PPT模版
莫兰迪高级灰总结计划简约商务通用PPT模版,莫兰迪调色板清新简约工作汇报PPT模版,莫兰迪时尚风极简设计PPT模版,大学生毕业论文答辩PPT模版,莫兰迪配色总结计划简约商务通用PPT模版,莫兰迪商务汇报PPT模版,…...

FFmpeg:Windows系统小白安装及其使用
一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】,注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录(即exe所在文件夹)加入系统变量…...