当前位置: 首页 > news >正文

Flink源码之State创建流程

StreamOperatorStateHandler

在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateHandler类型的成员变量, StreamOperatorStateHandler对象变量封装了keyedStatedBackend和operatorStateBackend,用于统一管理SteamOperator的状态。

 OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeState(StreamTaskStateInitializer) StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackendStreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理StreamOperatorStateHandler::initializeOperatorStateStateInitializationContextImpl::new //封装DefaultKeyedStateStore和OperatorStateStoreCheckpointedStreamOperator::initializeState(StateInitializationContext)//调用用户定义函数中的initializeState方法,可获取Operator StateStreamingRuntimeContext::setKeyedStateStore

Flink中主要有两种StateBackend:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

每个StreamTask一个StateBackend成员变量,在构造函数中进行初始化,通过用户代码中设置或StateBackendLoader::loadStateBackendFromConfig从配置中加载,默认为HashMapStateBackend。简单起见,以HashMapStateBackend为例剖析创建KeyedStatedBackend和OperatorStateBackend以及处理数据流时是如何使用KeyedState和OperatorState的。

OperatorState

OperatorState创建流程:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::operatorStateBackendHashMapStateBackend::createOperatorStateBackend //创建DefaultOperatorStateBackendStreamOperatorStateHandler::new //创建StreamOperatorStateHandlerStreamOperatorStateHandler::initializeOperatorState //调用CheckpointedFunction::initializeStateStateInitializationContextImpl::new //该实例可getOperatorStateStore

使用Operator State的用户业务代码需要实现CheckpointedFunction接口,该接口中有以两个下方法:

void initializeState(FunctionInitializationContext context) throws Exception;void snapshotState(FunctionSnapshotContext context) throws Exception;

其中initializeState方法则会被StreamOperatorStateHandler.initializeOperatorState 调用,在initializeState方法中可使用

FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
DefaultOperatorStateBackend::getListState::newPartitionableListState::new  //内部是ArrayList

因此通过OperatorStateStore获取的ListState内部本质上是一个ArrayList, 业务代码中可以调用add方法向这个内部List添加元素,由StateBackend管理每个Operator State,这样就实现了一个分布式状态管理,借助Checkpoint可以实现状态持久化及容灾恢复。

OperatorStateStore有三个获取状态方法:

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)throws Exception

KeyedState

KeyedState创建流程如下:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeStateStreamTaskStateInitializerImpl::streamOperatorStateContextStreamTaskStateInitializerImpl::keyedStatedBackendHashMapStateBackend::createKeyedStateBackend //创建HeapKeyedStateBackendHeapKeyedStateBackendBuilder::buildInternalKeyContextImpl::new //用于保存当前正在处理的keyStreamOperatorStateHandler::new //创建StreamOperatorStateHandlerDefaultKeyedStateStore::new //创建DefaultKeyedStateStoreStreamingRuntimeContext::setKeyedStateStore //设置keyedStateStore成员变量AbstractStreamUdfOperator::openFunctionUtils::openFunctionRichFunction::open

KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState时,用户自定义函数实现RichFunction接口,在open方法中调用getRuntimeContext().getState方法获取状态:

getRuntimeContext().getState() //获取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedStateLatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabledTtlStateFactory::createStateAndWrapWithTtlIfEnabled //包装TTLHeapKeyedStateBackend::createInternalStateHeapKeyedStateBackend::tryRegisterStateTable //这里很关键,对每个State创建一个StateTableCopyOnWriteStateTable::new//异步快照,这里传递了当前KeyedStateBackend的InternalKeyContextStateTable::new //根据当前Task管理的KeyGroups数量创建StateMap数组CopyOnWriteStateTable::createStateMap //一个KeyGroup一个StateMapCopyOnWriteStateMap::new //存储key及其对应的状态HeapValueState::createHeapValueState::new //有个成员变量指向存储当前state的CopyOnWriteStateMapHeapValueState::setCurrentNamespace  //默认为VoidNamespace

KeyedState有以下几种类型

ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 获取HeapValueStateListState<T> getListState(ListStateDescriptor<T> stateProperties)获取HeapListStateMapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)获取HeapMapStategetAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)获取HeapAggregatingStategetReducingState(ReducingStateDescriptor<T> stateProperties)获取HeapReducingState

RocksDBStateBackend

EmbeddedRocksDBStateBackend 管理OperatorState与HashMapStateBackend 一样,也是通过DefaultOperatorStateBackend进行管理的。

EmbeddedRocksDBStateBackend 管理KeyedState则是使用RocksDBKeyedStateBackend实现,这样可以借助磁盘加内存进行大状态管理:

RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState

总结

Flink内置状态管理是相比其他分布式流式处理系统最大的优势之一,不用借助外部存储组件,就可实现高效可靠的分布式状态管理,极大降低了学习和使用成本。

相关文章:

Flink源码之State创建流程

StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend&#xff0c;在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量…...

selenium常见等待机制及其特点和使用方法

目录 1、强制等待 2、隐式等待 3、显示等待 1、强制等待 强制等待是在程序中直接调用Thread.sleep(timeout) ,来完成的&#xff0c;该用法的优点是使用起来方便&#xff0c;语法也比较简单&#xff0c;缺点就是需要强制等待固定的时间&#xff0c;可能会造成测试的时间过…...

C++物件数组的常用方法介绍

以下代码建立了一个物件数组Student&#xff0c;并展示了如何计算物件数组的长度&#xff0c;如何从物件数组中找到特定的对象&#xff0c;如何根据数组的不同参数进行排序&#xff0c;以及如何找到最大和最小值。 #include <iostream> #include <algorithm>using…...

云计算:新一代的技术革命

云计算&#xff0c;作为21世纪的一项重要技术革命&#xff0c;已在全球范围内引发了深远的影响。它改变了我们存储和处理数据的方式&#xff0c;使得企业无需再建设和维护昂贵的本地服务器和数据中心。本文将深入探讨云计算的基本概念&#xff0c;类型&#xff0c;主要优点&…...

数据结构—图的应用

6.4图的应用 概念回顾—生成树 生成树&#xff1a;所有顶点均由边连接在一起&#xff0c;但不存在回路的图。 一个图可以有许多棵不同的生成树、含有n个顶点 n-1 条边的图不一定是生成树所有生成树具有以下共同特点 生成树的顶点个数与图的顶点个数相同&#xff1b;生成树是图的…...

Unity 鼠标控制 UI 放大、缩小、拖拽

文章目录 1. 代码2. 测试场景 1. 代码 using UnityEngine; using UnityEngine.UI; using UnityEngine.EventSystems;public class UIDragZoom : MonoBehaviour, IDragHandler, IScrollHandler {private Vector2 originalSize;private Vector2 originalPosition;private RectTr…...

tensorflow 模型计算中,预测错误;权重参数加载

tensorflow 模型计算中&#xff0c;预测错误&#xff1b;权重参数加载 tensorflow 模型计算主要代码&#xff08;正确代码&#xff09; linear1_kernel_initializer tf.constant_initializer(numpy.transpose(data["linear1.weight"])) linear1_bias_initializer …...

Jay17 2023.8.14日报 即 留校集训阶段性总结

8.14 打了moeCTF&#xff0c;还剩一题ak Web。 Jay17-集训结束阶段性总结&#xff1a; 集训产出&#xff1a; 自集训开始以来一个半月&#xff0c;最主要做的事情有三。 一是跟课程&#xff0c;复习学过的知识&#xff0c;学习新的知识&#xff1b;目前课程已大体听完&…...

【C语言】小游戏-扫雷(清屏+递归展开+标记)

大家好&#xff0c;我是深鱼~ 目录 一、游戏介绍 二、文件分装 三、代码实现步骤 1.制作简易游戏菜单 2. 初始化棋盘(11*11) 3.打印棋盘(9*9) 4.布置雷 5.计算(x,y)周围8个坐标的和 6.排查雷 <1>清屏后打印棋盘 <2>递归展开 <3>标记雷 四、完整代…...

云服务 Ubuntu 20.04 版本 使用 Nginx 部署静态网页

所需操作&#xff1a; 1.安装Nginx 2.修改配置文件 3.测试、重启 Nginx 4.内部修改防火墙 5.配置解析 6.测试是否部署成功 1.安装Nginx // 未使用 root 账号 apt-get update // 更新apt-get install nginx // 安装 nginx 1.1.测试是否安装没问题 在网页上输入云服务的公网…...

无后效性

动态规划的概念 在上例的多阶段决策问题中&#xff0c;各个阶段采取的决策&#xff0c;一般来说是与时间有关的&#xff0c;决策依赖于当前状态&#xff0c;又随即引起状态的转移&#xff0c;一个决策序列就是在变化的状态中产生出来的&#xff0c;故有“动态”的含义&#xf…...

Kubernetes系列-删除deployment和pod

通过deployment创建的pod直接执行delete是不会正常被删除的&#xff0c;因为deployment中设置了pod的数量&#xff0c;deployment会动态维护pod的数量&#xff0c;倘若pod数量少于约定数量&#xff0c;deployment会创建pod&#xff0c;直到pod数量达到约定数量才会停止。 如若…...

kotlin字符串方法

以下是一些常用的 String 方法示例&#xff1a; 1.获取字符串长度&#xff1a; val str "Hello, Kotlin" val length str.length2.字符串比较&#xff1a; val str1 "apple" val str2 "banana" val compareResult str1.compareTo(str2)3…...

ubuntu篇---配置FTP服务,本机和docker安装

ubuntu篇---配置FTP服务 一、本机安装1.1 安装FTP服务器软件1.2 配置FTP服务 二、docker安装&#xff08;我用的这个&#xff09;2.1 创建 目录2.2 启动脚本2.3 访问2.4 如何创建一个新的用户2.5 测试2.6 使用 一、本机安装 1.1 安装FTP服务器软件 ubuntu安装vsftp sudo apt…...

SpringBoot中properties、yml、yaml的优先级

原理 配置优先级低的会先加载然后会被配置优先级高的覆盖 验证 创建SpringBoot项目&#xff08;网址&#xff09; 在resource目录下创建application.properties、application.yml、application.yaml文件 运行 结论 优先级顺序&#xff1a; properties>yml>yaml...

SHELL 基础 SHELL注释 及 执行SHELL脚本的四种方法

SHELL 脚本编写规范 &#xff1a; 脚本开头 &#xff1a; # 脚本第一行 &#xff1a; #! /bin/bash 或 #!/bin/sh &#xff08; 脚本解释器 &#xff09; # 程序段开头需要加 版本版权信息 &#xff0c;例如 &#xff1a; # Date 创建日期 # Author : 作者 # …...

【Spring】深入探索 Spring AOP:概念、使用与实现原理解析

文章目录 前言一、初识 Spring AOP1.1 什么是 AOP1.2 什么是 Spring AOP 二、AOP 的核心概念2.1 切面&#xff08;Aspect&#xff09;2.2 切点&#xff08;Pointcut&#xff09;2.3 通知&#xff08;Advice&#xff09;2.4 连接点&#xff08;Join Point&#xff09; 三、Sprin…...

LocalDate介绍和使用

1.什么是 LocalDate&#xff1f; 在我们开始之前&#xff0c;让我先简单介绍一下 LocalDate。它是 Java 8 中引入的日期类&#xff0c;用于表示不带时区信息的日期。也就是说&#xff0c;它专注于日期&#xff0c;并忽略了具体的时间。这样&#xff0c;我们就可以专心解决那些…...

三、使用注解形式开发 Spring MVC程序

文章目录 一、环境准备二、配置 web.xml三、配置 SpringMVC-Servlet.xml &#xff0c;这里不再使用之前那种写法&#xff0c;直接采用注解配置&#xff0c;引入注解支持&#xff0c;配置视图解析器四、编写 Controller&#xff08;Controller 和 RequestMapping 注解说明&#…...

【Go】常见的四个内存泄漏问题

Goroutine没有顺利结束 1、这里更多的是由于channelforselect导致的&#xff0c;错误的写法导致了发送者或接收者没有发现channel已经关闭&#xff0c;任务已经结束了&#xff0c;却仍然在尝试输入输出https://geektutu.com/post/hpg-exit-goroutine.html Map的remove方法不会…...

汇智信科-机场数字孪生系统

机场数字孪生系统以数字化孪生技术构建机场全要素虚拟映射&#xff0c;精准还原机场、跑道、塔台等设施及飞机运行状态&#xff0c;支持多维度动态监测与可视化管控&#xff1b;通过模拟飞机调度、跑道滑行等全流程作业场景&#xff0c;覆盖机场多角色业务协同&#xff0c;同时…...

竞赛获奖保研加分测评:除了挑战杯,哪些垂直赛事含金量更高?

在 2026 年推免&#xff08;保研&#xff09;竞争进入白热化的背景下&#xff0c;工科学子的加分项已不仅仅是绩点的博弈&#xff0c;更是工程实战能力的短兵相接。随着教育部《关于加强新时代卓越工程师培养的指导意见》的深入实施&#xff0c;各大名校对人才的评价标准正从“…...

Linux远程连接工具评测与选型指南

1. Linux远程连接工具概述作为一名嵌入式Linux开发者&#xff0c;我每天都需要通过远程连接工具访问各种开发板和服务器。在多年的实践中&#xff0c;我尝试过市面上几乎所有主流的远程终端工具&#xff0c;深知每款工具的特点和适用场景。选择一款合适的远程连接工具&#xff…...

ReplaceItems:批量设计元素智能替换引擎 — 献给追求极致效率的UI设计师

ReplaceItems&#xff1a;批量设计元素智能替换引擎 — 献给追求极致效率的UI设计师 【免费下载链接】illustrator-scripts Adobe Illustrator scripts 项目地址: https://gitcode.com/gh_mirrors/il/illustrator-scripts 设计效率瓶颈诊断&#xff1a;为何手动替换如此…...

泰金新能科创板上市:市值79亿 预计第一季净利降幅超45%

雷递网 雷建平 3月31日西安泰金新能科技股份有限公司&#xff08;简称&#xff1a;“泰金新能”&#xff0c;股票代码&#xff1a;“688813”&#xff09;今日在上交所上市。泰金新能发行价为26.28元/股&#xff0c;发行4000万股&#xff0c;募资总额为10.51亿元。泰金新能开盘…...

DeepSeek-Coder-V2终极指南:如何免费打造你的专属AI编程助手

DeepSeek-Coder-V2终极指南&#xff1a;如何免费打造你的专属AI编程助手 【免费下载链接】DeepSeek-Coder-V2 DeepSeek-Coder-V2: Breaking the Barrier of Closed-Source Models in Code Intelligence 项目地址: https://gitcode.com/GitHub_Trending/de/DeepSeek-Coder-V2 …...

牙科手术显微镜市场:其中中国市场占比超15%

在口腔诊疗向精细化、微创化演进的进程中&#xff0c;牙科手术显微镜作为核心光学放大设备&#xff0c;凭借其高照度、高景深与高清晰度特性&#xff0c;成为提升根管治疗、牙周手术及种植修复等环节精准性的关键工具。该设备集成连续变倍观察、同轴照明、术野调焦及影像记录系…...

RAG系统的需求分析

这个是一个基于私有知识库的智能对话平台&#xff0c;允许用户上传文档构建专属知识库&#xff0c;并通过自然语言交互的方式查询和获取知识。它结合了大语言模型和向量检索技术&#xff0c;让用户通过对话的形式与自己的知识库进行高效交互应用场景个人用户场景:学习助手&…...

DeepSeek句式重构指令怎么用?手把手教你降AI率超过30%

第一次操作的话&#xff0c;照着下面的步骤来&#xff0c;15分钟内搞定DeepSeek句式重构指令、降AI、降AIGC率。 工具选嘎嘎降AI&#xff08;www.aigcleaner.com&#xff09;&#xff0c;达标率99.26%&#xff0c;有退款保障&#xff0c;操作也不复杂。 准备工作 需要准备的&…...

ANR-WatchDog源码深度剖析:从线程监控到错误抛出的完整实现

ANR-WatchDog源码深度剖析&#xff1a;从线程监控到错误抛出的完整实现 【免费下载链接】ANR-WatchDog A simple watchdog that detects Android ANR (Application Not Responding) error and throws a meaningful exception 项目地址: https://gitcode.com/gh_mirrors/an/AN…...