Flink源码之State创建流程
StreamOperatorStateHandler
在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateHandler类型的成员变量, StreamOperatorStateHandler对象变量封装了keyedStatedBackend和operatorStateBackend,用于统一管理SteamOperator的状态。
OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeState(StreamTaskStateInitializer) StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackendStreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理StreamOperatorStateHandler::initializeOperatorStateStateInitializationContextImpl::new //封装DefaultKeyedStateStore和OperatorStateStoreCheckpointedStreamOperator::initializeState(StateInitializationContext)//调用用户定义函数中的initializeState方法,可获取Operator StateStreamingRuntimeContext::setKeyedStateStore
Flink中主要有两种StateBackend:
- HashMapStateBackend //内存
- EmbeddedRocksDBStateBackend //内存+磁盘
每个StreamTask一个StateBackend成员变量,在构造函数中进行初始化,通过用户代码中设置或StateBackendLoader::loadStateBackendFromConfig从配置中加载,默认为HashMapStateBackend。简单起见,以HashMapStateBackend为例剖析创建KeyedStatedBackend和OperatorStateBackend以及处理数据流时是如何使用KeyedState和OperatorState的。
OperatorState
OperatorState创建流程:
OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::operatorStateBackendHashMapStateBackend::createOperatorStateBackend //创建DefaultOperatorStateBackendStreamOperatorStateHandler::new //创建StreamOperatorStateHandlerStreamOperatorStateHandler::initializeOperatorState //调用CheckpointedFunction::initializeStateStateInitializationContextImpl::new //该实例可getOperatorStateStore
使用Operator State的用户业务代码需要实现CheckpointedFunction接口,该接口中有以两个下方法:
void initializeState(FunctionInitializationContext context) throws Exception;void snapshotState(FunctionSnapshotContext context) throws Exception;
其中initializeState方法则会被StreamOperatorStateHandler.initializeOperatorState 调用,在initializeState方法中可使用
FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
DefaultOperatorStateBackend::getListState::newPartitionableListState::new //内部是ArrayList
因此通过OperatorStateStore获取的ListState内部本质上是一个ArrayList, 业务代码中可以调用add方法向这个内部List添加元素,由StateBackend管理每个Operator State,这样就实现了一个分布式状态管理,借助Checkpoint可以实现状态持久化及容灾恢复。
OperatorStateStore有三个获取状态方法:
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)throws Exception
KeyedState
KeyedState创建流程如下:
OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::keyedStatedBackendHashMapStateBackend::createKeyedStateBackend //创建HeapKeyedStateBackendHeapKeyedStateBackendBuilder::buildInternalKeyContextImpl::new //用于保存当前正在处理的keyStreamOperatorStateHandler::new //创建StreamOperatorStateHandlerDefaultKeyedStateStore::new //创建DefaultKeyedStateStoreStreamingRuntimeContext::setKeyedStateStore //设置keyedStateStore成员变量AbstractStreamUdfOperator::openFunctionUtils::openFunctionRichFunction::open
KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState时,用户自定义函数实现RichFunction接口,在open方法中调用getRuntimeContext().getState方法获取状态:
getRuntimeContext().getState() //获取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedStateLatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabledTtlStateFactory::createStateAndWrapWithTtlIfEnabled //包装TTLHeapKeyedStateBackend::createInternalStateHeapKeyedStateBackend::tryRegisterStateTable //这里很关键,对每个State创建一个StateTableCopyOnWriteStateTable::new//异步快照,这里传递了当前KeyedStateBackend的InternalKeyContextStateTable::new //根据当前Task管理的KeyGroups数量创建StateMap数组CopyOnWriteStateTable::createStateMap //一个KeyGroup一个StateMapCopyOnWriteStateMap::new //存储key及其对应的状态HeapValueState::createHeapValueState::new //有个成员变量指向存储当前state的CopyOnWriteStateMapHeapValueState::setCurrentNamespace //默认为VoidNamespace
KeyedState有以下几种类型
ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 获取HeapValueStateListState<T> getListState(ListStateDescriptor<T> stateProperties)获取HeapListStateMapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)获取HeapMapStategetAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)获取HeapAggregatingStategetReducingState(ReducingStateDescriptor<T> stateProperties)获取HeapReducingState
RocksDBStateBackend
EmbeddedRocksDBStateBackend 管理OperatorState与HashMapStateBackend 一样,也是通过DefaultOperatorStateBackend进行管理的。
EmbeddedRocksDBStateBackend 管理KeyedState则是使用RocksDBKeyedStateBackend实现,这样可以借助磁盘加内存进行大状态管理:
RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState
总结
Flink内置状态管理是相比其他分布式流式处理系统最大的优势之一,不用借助外部存储组件,就可实现高效可靠的分布式状态管理,极大降低了学习和使用成本。
相关文章:
Flink源码之State创建流程
StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量…...
selenium常见等待机制及其特点和使用方法
目录 1、强制等待 2、隐式等待 3、显示等待 1、强制等待 强制等待是在程序中直接调用Thread.sleep(timeout) ,来完成的,该用法的优点是使用起来方便,语法也比较简单,缺点就是需要强制等待固定的时间,可能会造成测试的时间过…...
C++物件数组的常用方法介绍
以下代码建立了一个物件数组Student,并展示了如何计算物件数组的长度,如何从物件数组中找到特定的对象,如何根据数组的不同参数进行排序,以及如何找到最大和最小值。 #include <iostream> #include <algorithm>using…...
云计算:新一代的技术革命
云计算,作为21世纪的一项重要技术革命,已在全球范围内引发了深远的影响。它改变了我们存储和处理数据的方式,使得企业无需再建设和维护昂贵的本地服务器和数据中心。本文将深入探讨云计算的基本概念,类型,主要优点&…...
数据结构—图的应用
6.4图的应用 概念回顾—生成树 生成树:所有顶点均由边连接在一起,但不存在回路的图。 一个图可以有许多棵不同的生成树、含有n个顶点 n-1 条边的图不一定是生成树所有生成树具有以下共同特点 生成树的顶点个数与图的顶点个数相同;生成树是图的…...
Unity 鼠标控制 UI 放大、缩小、拖拽
文章目录 1. 代码2. 测试场景 1. 代码 using UnityEngine; using UnityEngine.UI; using UnityEngine.EventSystems;public class UIDragZoom : MonoBehaviour, IDragHandler, IScrollHandler {private Vector2 originalSize;private Vector2 originalPosition;private RectTr…...
tensorflow 模型计算中,预测错误;权重参数加载
tensorflow 模型计算中,预测错误;权重参数加载 tensorflow 模型计算主要代码(正确代码) linear1_kernel_initializer tf.constant_initializer(numpy.transpose(data["linear1.weight"])) linear1_bias_initializer …...
Jay17 2023.8.14日报 即 留校集训阶段性总结
8.14 打了moeCTF,还剩一题ak Web。 Jay17-集训结束阶段性总结: 集训产出: 自集训开始以来一个半月,最主要做的事情有三。 一是跟课程,复习学过的知识,学习新的知识;目前课程已大体听完&…...
【C语言】小游戏-扫雷(清屏+递归展开+标记)
大家好,我是深鱼~ 目录 一、游戏介绍 二、文件分装 三、代码实现步骤 1.制作简易游戏菜单 2. 初始化棋盘(11*11) 3.打印棋盘(9*9) 4.布置雷 5.计算(x,y)周围8个坐标的和 6.排查雷 <1>清屏后打印棋盘 <2>递归展开 <3>标记雷 四、完整代…...
云服务 Ubuntu 20.04 版本 使用 Nginx 部署静态网页
所需操作: 1.安装Nginx 2.修改配置文件 3.测试、重启 Nginx 4.内部修改防火墙 5.配置解析 6.测试是否部署成功 1.安装Nginx // 未使用 root 账号 apt-get update // 更新apt-get install nginx // 安装 nginx 1.1.测试是否安装没问题 在网页上输入云服务的公网…...
无后效性
动态规划的概念 在上例的多阶段决策问题中,各个阶段采取的决策,一般来说是与时间有关的,决策依赖于当前状态,又随即引起状态的转移,一个决策序列就是在变化的状态中产生出来的,故有“动态”的含义…...
Kubernetes系列-删除deployment和pod
通过deployment创建的pod直接执行delete是不会正常被删除的,因为deployment中设置了pod的数量,deployment会动态维护pod的数量,倘若pod数量少于约定数量,deployment会创建pod,直到pod数量达到约定数量才会停止。 如若…...
kotlin字符串方法
以下是一些常用的 String 方法示例: 1.获取字符串长度: val str "Hello, Kotlin" val length str.length2.字符串比较: val str1 "apple" val str2 "banana" val compareResult str1.compareTo(str2)3…...
ubuntu篇---配置FTP服务,本机和docker安装
ubuntu篇---配置FTP服务 一、本机安装1.1 安装FTP服务器软件1.2 配置FTP服务 二、docker安装(我用的这个)2.1 创建 目录2.2 启动脚本2.3 访问2.4 如何创建一个新的用户2.5 测试2.6 使用 一、本机安装 1.1 安装FTP服务器软件 ubuntu安装vsftp sudo apt…...
SpringBoot中properties、yml、yaml的优先级
原理 配置优先级低的会先加载然后会被配置优先级高的覆盖 验证 创建SpringBoot项目(网址) 在resource目录下创建application.properties、application.yml、application.yaml文件 运行 结论 优先级顺序: properties>yml>yaml...
SHELL 基础 SHELL注释 及 执行SHELL脚本的四种方法
SHELL 脚本编写规范 : 脚本开头 : # 脚本第一行 : #! /bin/bash 或 #!/bin/sh ( 脚本解释器 ) # 程序段开头需要加 版本版权信息 ,例如 : # Date 创建日期 # Author : 作者 # …...
【Spring】深入探索 Spring AOP:概念、使用与实现原理解析
文章目录 前言一、初识 Spring AOP1.1 什么是 AOP1.2 什么是 Spring AOP 二、AOP 的核心概念2.1 切面(Aspect)2.2 切点(Pointcut)2.3 通知(Advice)2.4 连接点(Join Point) 三、Sprin…...
LocalDate介绍和使用
1.什么是 LocalDate? 在我们开始之前,让我先简单介绍一下 LocalDate。它是 Java 8 中引入的日期类,用于表示不带时区信息的日期。也就是说,它专注于日期,并忽略了具体的时间。这样,我们就可以专心解决那些…...
三、使用注解形式开发 Spring MVC程序
文章目录 一、环境准备二、配置 web.xml三、配置 SpringMVC-Servlet.xml ,这里不再使用之前那种写法,直接采用注解配置,引入注解支持,配置视图解析器四、编写 Controller(Controller 和 RequestMapping 注解说明&#…...
【Go】常见的四个内存泄漏问题
Goroutine没有顺利结束 1、这里更多的是由于channelforselect导致的,错误的写法导致了发送者或接收者没有发现channel已经关闭,任务已经结束了,却仍然在尝试输入输出https://geektutu.com/post/hpg-exit-goroutine.html Map的remove方法不会…...
3种方案实现小米智能家居与Home Assistant无缝集成
3种方案实现小米智能家居与Home Assistant无缝集成 【免费下载链接】ha_xiaomi_home Xiaomi Home Integration for Home Assistant 项目地址: https://gitcode.com/GitHub_Trending/ha/ha_xiaomi_home 你是否遇到过智能家居设备品牌碎片化的困扰?是否希望用统…...
【并发心法】别用 volatile 骗自己了!撕碎裸机并发的伪安全,用 C++ Atomics 与内存屏障镇压“乱序执行”的底层叛乱
摘要:在嵌入式 C/C 开发中,99% 的工程师误以为 volatile 是解决中断与主循环并发冲突的万能解药。本文将无情揭露这一长达数十年的认知毒瘤。我们将带你深入现代编译器(GCC/Clang)的优化黑盒与 ARM Cortex 高级内核的流水线深处&a…...
从多项式逼近到优化求解:泰勒展开与拉格朗日乘子的机器学习实践
1. 泰勒展开:机器学习的"局部望远镜" 第一次接触泰勒公式时,我的数学老师用了个有趣的比喻:这就像用乐高积木拼凑复杂雕塑的局部轮廓。在机器学习中,这个思想被广泛应用——当我们面对复杂的损失函数曲面时,…...
虚幻引擎+数字孪生:手把手搭建智慧校园三维可视化平台(附浙江工商大学实战案例)
虚幻引擎数字孪生:从零构建智慧校园三维可视化平台的完整指南 想象一下,清晨走进校园时,管理员已经在三维可视化平台上完成了安防巡查;教务主任通过热力图调整着今天的课程安排;后勤人员正根据实时数据优化能源分配——…...
Flux.1-Dev深海幻境在网络安全领域的应用:恶意流量日志可视化分析
Flux.1-Dev深海幻境在网络安全领域的应用:恶意流量日志可视化分析 每天,安全运维中心的告警大屏上,成千上万条日志像瀑布一样滚动。分析师小李紧盯着屏幕,试图从这些密密麻麻的IP地址、端口号和状态码中,分辨出一次真…...
虚拟机异常断电后卡在initramfs阶段?手把手教你用xfs_repair修复系统分区
1. 虚拟机异常断电的常见后果 最近在调试一个基于KVM的虚拟机集群时,遇到了一个典型问题:机房突然断电后,几台虚拟机重启时卡在了initramfs阶段,屏幕上不断刷出"generating /run/initramfs/rdsosreport.txt"的提示。这种…...
【实战指南】彻底解决conda环境变量配置错误:从报错分析到.bashrc修复
1. 遇到conda环境变量报错怎么办? 刚装完Anaconda/Miniconda,满心欢喜准备大展身手,结果终端里输入conda却蹦出一行刺眼的红色报错:"bash: /opt/conda/bin/conda: No such file or directory"。这种场景我见过太多次了&…...
MusePublic效果展示:多主体构图稳定性测试——双人/三人场景自然互动生成
MusePublic效果展示:多主体构图稳定性测试——双人/三人场景自然互动生成 1. 引言:当AI学会描绘“关系” 在AI绘画的世界里,生成一个栩栩如生的人物已经不再是难事。但当画面中需要同时出现两个、甚至三个人物,并且他们之间要有…...
破解微信小程序video组件的限制:3种禁止拖动进度条的实战方案对比
微信小程序视频播放控制深度解析:3种禁止拖动进度条的工程化方案 在知识付费和在线教育类小程序中,视频内容的完整播放率直接影响知识传递效果。但微信小程序原生video组件的enable-progress-gesture属性仅能禁用触摸手势,无法真正阻止进度条…...
Ubuntu 20.04 下通过 PPA 快速部署 qBittorrent 及配置指南
1. 为什么选择qBittorrent? 如果你经常需要下载大型文件,比如开源系统镜像、影视素材或者游戏资源,那么一个靠谱的BT客户端绝对是刚需。我在Ubuntu上试过各种BT工具,最终发现qBittorrent是最稳定高效的选择。它完全开源免费&#…...
