当前位置: 首页 > news >正文

Flink源码之State创建流程

StreamOperatorStateHandler

在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateHandler类型的成员变量, StreamOperatorStateHandler对象变量封装了keyedStatedBackend和operatorStateBackend,用于统一管理SteamOperator的状态。

 OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeState(StreamTaskStateInitializer) StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackendStreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理StreamOperatorStateHandler::initializeOperatorStateStateInitializationContextImpl::new //封装DefaultKeyedStateStore和OperatorStateStoreCheckpointedStreamOperator::initializeState(StateInitializationContext)//调用用户定义函数中的initializeState方法,可获取Operator StateStreamingRuntimeContext::setKeyedStateStore

Flink中主要有两种StateBackend:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

每个StreamTask一个StateBackend成员变量,在构造函数中进行初始化,通过用户代码中设置或StateBackendLoader::loadStateBackendFromConfig从配置中加载,默认为HashMapStateBackend。简单起见,以HashMapStateBackend为例剖析创建KeyedStatedBackend和OperatorStateBackend以及处理数据流时是如何使用KeyedState和OperatorState的。

OperatorState

OperatorState创建流程:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::operatorStateBackendHashMapStateBackend::createOperatorStateBackend //创建DefaultOperatorStateBackendStreamOperatorStateHandler::new //创建StreamOperatorStateHandlerStreamOperatorStateHandler::initializeOperatorState //调用CheckpointedFunction::initializeStateStateInitializationContextImpl::new //该实例可getOperatorStateStore

使用Operator State的用户业务代码需要实现CheckpointedFunction接口,该接口中有以两个下方法:

void initializeState(FunctionInitializationContext context) throws Exception;void snapshotState(FunctionSnapshotContext context) throws Exception;

其中initializeState方法则会被StreamOperatorStateHandler.initializeOperatorState 调用,在initializeState方法中可使用

FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
DefaultOperatorStateBackend::getListState::newPartitionableListState::new  //内部是ArrayList

因此通过OperatorStateStore获取的ListState内部本质上是一个ArrayList, 业务代码中可以调用add方法向这个内部List添加元素,由StateBackend管理每个Operator State,这样就实现了一个分布式状态管理,借助Checkpoint可以实现状态持久化及容灾恢复。

OperatorStateStore有三个获取状态方法:

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)throws Exception

KeyedState

KeyedState创建流程如下:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::keyedStatedBackendHashMapStateBackend::createKeyedStateBackend //创建HeapKeyedStateBackendHeapKeyedStateBackendBuilder::buildInternalKeyContextImpl::new //用于保存当前正在处理的keyStreamOperatorStateHandler::new //创建StreamOperatorStateHandlerDefaultKeyedStateStore::new //创建DefaultKeyedStateStoreStreamingRuntimeContext::setKeyedStateStore //设置keyedStateStore成员变量AbstractStreamUdfOperator::openFunctionUtils::openFunctionRichFunction::open

KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState时,用户自定义函数实现RichFunction接口,在open方法中调用getRuntimeContext().getState方法获取状态:

getRuntimeContext().getState() //获取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedStateLatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabledTtlStateFactory::createStateAndWrapWithTtlIfEnabled //包装TTLHeapKeyedStateBackend::createInternalStateHeapKeyedStateBackend::tryRegisterStateTable //这里很关键,对每个State创建一个StateTableCopyOnWriteStateTable::new//异步快照,这里传递了当前KeyedStateBackend的InternalKeyContextStateTable::new //根据当前Task管理的KeyGroups数量创建StateMap数组CopyOnWriteStateTable::createStateMap //一个KeyGroup一个StateMapCopyOnWriteStateMap::new //存储key及其对应的状态HeapValueState::createHeapValueState::new //有个成员变量指向存储当前state的CopyOnWriteStateMapHeapValueState::setCurrentNamespace  //默认为VoidNamespace

KeyedState有以下几种类型

ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 获取HeapValueStateListState<T> getListState(ListStateDescriptor<T> stateProperties)获取HeapListStateMapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)获取HeapMapStategetAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)获取HeapAggregatingStategetReducingState(ReducingStateDescriptor<T> stateProperties)获取HeapReducingState

RocksDBStateBackend

EmbeddedRocksDBStateBackend 管理OperatorState与HashMapStateBackend 一样,也是通过DefaultOperatorStateBackend进行管理的。

EmbeddedRocksDBStateBackend 管理KeyedState则是使用RocksDBKeyedStateBackend实现,这样可以借助磁盘加内存进行大状态管理:

RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState

总结

Flink内置状态管理是相比其他分布式流式处理系统最大的优势之一,不用借助外部存储组件,就可实现高效可靠的分布式状态管理,极大降低了学习和使用成本。

相关文章:

Flink源码之State创建流程

StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend&#xff0c;在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量…...

selenium常见等待机制及其特点和使用方法

目录 1、强制等待 2、隐式等待 3、显示等待 1、强制等待 强制等待是在程序中直接调用Thread.sleep(timeout) ,来完成的&#xff0c;该用法的优点是使用起来方便&#xff0c;语法也比较简单&#xff0c;缺点就是需要强制等待固定的时间&#xff0c;可能会造成测试的时间过…...

C++物件数组的常用方法介绍

以下代码建立了一个物件数组Student&#xff0c;并展示了如何计算物件数组的长度&#xff0c;如何从物件数组中找到特定的对象&#xff0c;如何根据数组的不同参数进行排序&#xff0c;以及如何找到最大和最小值。 #include <iostream> #include <algorithm>using…...

云计算:新一代的技术革命

云计算&#xff0c;作为21世纪的一项重要技术革命&#xff0c;已在全球范围内引发了深远的影响。它改变了我们存储和处理数据的方式&#xff0c;使得企业无需再建设和维护昂贵的本地服务器和数据中心。本文将深入探讨云计算的基本概念&#xff0c;类型&#xff0c;主要优点&…...

数据结构—图的应用

6.4图的应用 概念回顾—生成树 生成树&#xff1a;所有顶点均由边连接在一起&#xff0c;但不存在回路的图。 一个图可以有许多棵不同的生成树、含有n个顶点 n-1 条边的图不一定是生成树所有生成树具有以下共同特点 生成树的顶点个数与图的顶点个数相同&#xff1b;生成树是图的…...

Unity 鼠标控制 UI 放大、缩小、拖拽

文章目录 1. 代码2. 测试场景 1. 代码 using UnityEngine; using UnityEngine.UI; using UnityEngine.EventSystems;public class UIDragZoom : MonoBehaviour, IDragHandler, IScrollHandler {private Vector2 originalSize;private Vector2 originalPosition;private RectTr…...

tensorflow 模型计算中,预测错误;权重参数加载

tensorflow 模型计算中&#xff0c;预测错误&#xff1b;权重参数加载 tensorflow 模型计算主要代码&#xff08;正确代码&#xff09; linear1_kernel_initializer tf.constant_initializer(numpy.transpose(data["linear1.weight"])) linear1_bias_initializer …...

Jay17 2023.8.14日报 即 留校集训阶段性总结

8.14 打了moeCTF&#xff0c;还剩一题ak Web。 Jay17-集训结束阶段性总结&#xff1a; 集训产出&#xff1a; 自集训开始以来一个半月&#xff0c;最主要做的事情有三。 一是跟课程&#xff0c;复习学过的知识&#xff0c;学习新的知识&#xff1b;目前课程已大体听完&…...

【C语言】小游戏-扫雷(清屏+递归展开+标记)

大家好&#xff0c;我是深鱼~ 目录 一、游戏介绍 二、文件分装 三、代码实现步骤 1.制作简易游戏菜单 2. 初始化棋盘(11*11) 3.打印棋盘(9*9) 4.布置雷 5.计算(x,y)周围8个坐标的和 6.排查雷 <1>清屏后打印棋盘 <2>递归展开 <3>标记雷 四、完整代…...

云服务 Ubuntu 20.04 版本 使用 Nginx 部署静态网页

所需操作&#xff1a; 1.安装Nginx 2.修改配置文件 3.测试、重启 Nginx 4.内部修改防火墙 5.配置解析 6.测试是否部署成功 1.安装Nginx // 未使用 root 账号 apt-get update // 更新apt-get install nginx // 安装 nginx 1.1.测试是否安装没问题 在网页上输入云服务的公网…...

无后效性

动态规划的概念 在上例的多阶段决策问题中&#xff0c;各个阶段采取的决策&#xff0c;一般来说是与时间有关的&#xff0c;决策依赖于当前状态&#xff0c;又随即引起状态的转移&#xff0c;一个决策序列就是在变化的状态中产生出来的&#xff0c;故有“动态”的含义&#xf…...

Kubernetes系列-删除deployment和pod

通过deployment创建的pod直接执行delete是不会正常被删除的&#xff0c;因为deployment中设置了pod的数量&#xff0c;deployment会动态维护pod的数量&#xff0c;倘若pod数量少于约定数量&#xff0c;deployment会创建pod&#xff0c;直到pod数量达到约定数量才会停止。 如若…...

kotlin字符串方法

以下是一些常用的 String 方法示例&#xff1a; 1.获取字符串长度&#xff1a; val str "Hello, Kotlin" val length str.length2.字符串比较&#xff1a; val str1 "apple" val str2 "banana" val compareResult str1.compareTo(str2)3…...

ubuntu篇---配置FTP服务,本机和docker安装

ubuntu篇---配置FTP服务 一、本机安装1.1 安装FTP服务器软件1.2 配置FTP服务 二、docker安装&#xff08;我用的这个&#xff09;2.1 创建 目录2.2 启动脚本2.3 访问2.4 如何创建一个新的用户2.5 测试2.6 使用 一、本机安装 1.1 安装FTP服务器软件 ubuntu安装vsftp sudo apt…...

SpringBoot中properties、yml、yaml的优先级

原理 配置优先级低的会先加载然后会被配置优先级高的覆盖 验证 创建SpringBoot项目&#xff08;网址&#xff09; 在resource目录下创建application.properties、application.yml、application.yaml文件 运行 结论 优先级顺序&#xff1a; properties>yml>yaml...

SHELL 基础 SHELL注释 及 执行SHELL脚本的四种方法

SHELL 脚本编写规范 &#xff1a; 脚本开头 &#xff1a; # 脚本第一行 &#xff1a; #! /bin/bash 或 #!/bin/sh &#xff08; 脚本解释器 &#xff09; # 程序段开头需要加 版本版权信息 &#xff0c;例如 &#xff1a; # Date 创建日期 # Author : 作者 # …...

【Spring】深入探索 Spring AOP:概念、使用与实现原理解析

文章目录 前言一、初识 Spring AOP1.1 什么是 AOP1.2 什么是 Spring AOP 二、AOP 的核心概念2.1 切面&#xff08;Aspect&#xff09;2.2 切点&#xff08;Pointcut&#xff09;2.3 通知&#xff08;Advice&#xff09;2.4 连接点&#xff08;Join Point&#xff09; 三、Sprin…...

LocalDate介绍和使用

1.什么是 LocalDate&#xff1f; 在我们开始之前&#xff0c;让我先简单介绍一下 LocalDate。它是 Java 8 中引入的日期类&#xff0c;用于表示不带时区信息的日期。也就是说&#xff0c;它专注于日期&#xff0c;并忽略了具体的时间。这样&#xff0c;我们就可以专心解决那些…...

三、使用注解形式开发 Spring MVC程序

文章目录 一、环境准备二、配置 web.xml三、配置 SpringMVC-Servlet.xml &#xff0c;这里不再使用之前那种写法&#xff0c;直接采用注解配置&#xff0c;引入注解支持&#xff0c;配置视图解析器四、编写 Controller&#xff08;Controller 和 RequestMapping 注解说明&#…...

【Go】常见的四个内存泄漏问题

Goroutine没有顺利结束 1、这里更多的是由于channelforselect导致的&#xff0c;错误的写法导致了发送者或接收者没有发现channel已经关闭&#xff0c;任务已经结束了&#xff0c;却仍然在尝试输入输出https://geektutu.com/post/hpg-exit-goroutine.html Map的remove方法不会…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

Java编程之桥接模式

定义 桥接模式&#xff08;Bridge Pattern&#xff09;属于结构型设计模式&#xff0c;它的核心意图是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式通过组合关系来替代继承关系&#xff0c;从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...

篇章二 论坛系统——系统设计

目录 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 1. 数据库设计 1.1 数据库名: forum db 1.2 表的设计 1.3 编写SQL 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 通过需求分析获得概念类并结合业务实现过程中的技术需要&#x…...

写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里

写一个shell脚本&#xff0c;把局域网内&#xff0c;把能ping通的IP和不能ping通的IP分类&#xff0c;并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...