Flink中KeyedStateStore实现--怎么做到一个Key对应一个State
背景
在Flink中有两种基本的状态:Keyed State和Operator State,Operator State很好理解,一个特定的Operator算子共享同一个state,这是实现层面很好做到的。
但是 Keyed State 是怎么实现的?一般来说,正常的人第一眼就会想到:一个task绑定一个Keyd State,从网上随便查找资料就能发现正确的答案是:对于每一个Key会绑定一个State,但是这在Flink中是怎么实现的呢?
注意:这里我们只讲Flink中是怎么实现一个Key对应一个State的,其他细节并不细说,且state的backend为RocksDB
闲说杂谈
我们以ValueState类型的Keyed State举例:
ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =new ValueStateDescriptor<>("indexState",TypeInformation.of(HoodieRecordGlobalLocation.class));
ValueState<HoodieRecordGlobalLocation> indexState = context.getKeyedStateStore().getState(indexStateDesc)
....
indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation())
-
context.getKeyedStateStore().getState是获取对应key的State,最终的调用链如下:
DefaultKeyedStateStore.getState -> getPartitionedState||\/RocksDBKeyedStateBackend.getPartitionedState -> getOrCreateKeyedState -> createInternalState -> tryRegisterKvStateInformation||\/RocksDBValueState.create(创建RocksDBValueState)这里的 tryRegisterKvStateInformation会涉及到RocksDB ColumnFamily的创建:
RocksDBOperationUtils.createStateInfo -> createColumnFamilyDescriptor // createColumnFamilyDescriptor的部分代码: ColumnFamilyOptions options =createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName()); if (ttlCompactFiltersManager != null) {ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options); } byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); ... return new ColumnFamilyDescriptor(nameBytes, options);其实最终会发现RocksDB的ColumnFamily是跟ValueStateDescriptor也就是描述符的名字有关的,这就是为什么描述符必须是唯一的,关于RocksDB的ColumnFamily,可以参考RocksDB 简介
注意此时返回是key对应的一个State的ColumnFamily,该Family包括该task所有的key的value值 -
indexState.update 这里是更新indexState得值
因为上一步得到只是该Task所对应的ColumanFamily所对应的所有的values,也就是* Flink中的Key-Groups*,(关于Key-Groups可以参考Apache-Flink深度解析-State)public void update(V value) {if (value == null) {clear();return;}try {backend.db.put(columnFamily,writeOptions,serializeCurrentKeyWithGroupAndNamespace(),serializeValue(value));} catch (Exception e) {throw new FlinkRuntimeException("Error while adding data to RocksDB", e);}}最终的调用链如下:
RocksDBValueState.update -> serializeCurrentKeyWithGroupAndNamespace||\/ SerializedCompositeKeyBuilder.buildCompositeKeyNamespace||\/ serializeNamespace(namespace, namespaceSerializer) -> keyOutView.getCopyOfBuffer()这里的keyOutView.getCopyOfBuffer是会获得的record的key,所以在backend.db.put方法中才会更新对应的Key值。
但是什么时候Record的key信息会被写入到keyOutView中去呢? -
Record的key何时被写到keyOutView中
AbstractStreamTaskNetworkInput.emitNext -> processElement||\/ OneInputStreamTask.emitRecord||\/ OneInputStreamOperator.setKeyContextElement -> setKeyContextElement1 -> setKeyContextElement||\/ AbstractStreamOperator.setCurrentKey||\/ StreamOperatorStateHandler.setCurrentKey||\/ RocksDBKeyedStateBackend.setCurrentKey||\/ SerializedCompositeKeyBuilder.setCurrentKey -> serializeKeyGroupAndKey||\/ keySerializer.serialize(key, keyOutView);最后一步keySerializer.serialize(key, keyOutView)一个Record的key就被写到keyOutView中,也就是说对应的key是从每个record中获取的,所以在backend.db.put方法中就能获取到对应的Key
其他
对于keyedStateStore是在哪里初始化的,可以看AbstractStreamOperator中initializeState方法:
final StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState());stateHandler =new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
这个方法里也包括了keyedStatedBackend和operatorStateBackend等初始化, 具体的细节后续再解析。
相关文章:
Flink中KeyedStateStore实现--怎么做到一个Key对应一个State
背景 在Flink中有两种基本的状态:Keyed State和Operator State,Operator State很好理解,一个特定的Operator算子共享同一个state,这是实现层面很好做到的。 但是 Keyed State 是怎么实现的?一般来说,正常的…...
flex: 0 0 100%;
flex: 0 0 100%; flex: 0 0 100%; 是一个用于设置flex项的flex-grow、flex-shrink和flex-basis属性的缩写flex-grow:指定了flex项在剩余空间中的放大比例,默认为0,表示不放大。在这个例子中,设置为0表示不允许flex项在水平方向上…...
IMX6ULL系统移植篇-镜像烧写方法
一. 烧录镜像简介 本文我们就来学习:windows 系统下烧录镜像的方法。 如何使用 NXP 官方提供的 MfgTool 工具通过 USB OTG 口来 烧写系统。 二. windows下烧录镜像 1. 烧录镜像前准备工作 (1)从开发板上拔下 SD卡。 (2…...
【Android】实现雷达扫描效果,使用自定义View来绘制雷达扫描动画
要在Android上实现雷达扫描效果,你可以使用自定义View来绘制雷达扫描动画。以下是一个简单的示例代码: 创建一个名为RadarView的自定义View类,继承自View: import android.content.Context; import android.graphics.Canvas; im…...
小程序 - 文件预览
小程序文件预览 /** 预览 - txt文本 */viewTxt(path) {let fs wx.getFileSystemManager();let _this this;fs.readFile({filePath: path,encoding: "utf8",position: 0,success(res) {_this.setData({setNoRefresh: true});wx.navigateTo({url: /pages/view-txt/v…...
将String类型的证书转换为X509Certificate类型对象,读取证书链文件内容,完成证书链校验
证书内容如下所示: 证书内容如下 -----BEGIN CERTIFICATE----- MIIFZDCCA0ygAwIBAgIIYsLLTehAXpYwDQYJKoZIhvcNAQELBQAwUDELMAkGA1UEBhMCQ04xDzANBgNVBAoMBkh1YXdlaTETMBEGA1UECwwKSHVhd2VpIENCRzEbMBkGA1UEAwwSSHVhd2VpIENCRyBSb290IENBMB4XDTE3MDgyMTEwNTYyN1oXDTQyMDgxNTEw…...
v-model实现原理(一根绳上的蚂蚱)
目录 1、什么是v-model2、v-model实现原理3、实现示例3.1 实现text和textarea3.2 实现checkbox和radio3.3 实现select 1、什么是v-model v-model 本质上是一颗语法糖,可以用 v-model 指令在表单 <input>、<textarea> 及 <select>元素上创建双向数…...
第三章 仅支持追加的单表内存数据库
第三章 仅支持追加的单表内存数据库 我们将从小处着手,对数据库施加很多限制。目前,它有如下限制: 支持两种操作:插入一行和打印所有行 仅驻留在内存中(不需要持久化到磁盘) 支持单个硬编码表 我们的硬…...
抖音seo矩阵系统源码解析
抖音SEO矩阵系统源码是一种用于优化抖音视频内容的工具,可以帮助用户提高抖音视频的搜索排名和流量,从而增加视频曝光和转化率。该系统包括两部分,即数据收集和分析模块以及SEO策略和实施模块。 数据收集和分析模块主要负责从抖音平台上收集…...
6个ChatGPT4的最佳用途
文章目录 ChatGPT 4’s Current Limitations ChatGPT 4 的当前限制1. Crafting Complex Prompts 制作复杂的提示2. Logic Problems 逻辑问题3. Verifying GPT 3.5 Text 验证 GPT 3.5 文本4. Complex Coding 复杂编码5.Nuanced Text Transformation 细微的文本转换6. Complex Kn…...
go系列-读取文件
1 概述 2 整个文件读入内存 直接将数据直接读取入内存,是效率最高的一种方式,但此种方式,仅适用于小文件,对于大文件,则不适合,因为比较浪费内存。 2.1 直接指定文化名读取 在 Go 1.16 开始,i…...
10 编码转换问题
文章目录 字符编码问题编码转换问题ANSI转UnicodeUnicode转ANSIUtf8转 ANSIutf8 转UnicodeANSI 转UTF-8Unicode 转 UTF-8 全部代码 字符编码问题 Windows API 函数 MessageBoxA:MessageBox 内部实现,字符串编码(ANSI)转换成了Unicode,在调用MessageboxW MessageBox:…...
Spring MVC获取参数和自定义参数类型转换器及编码过滤器
目录 一、使用Servlet原生对象获取参数 1.1 控制器方法 1.2 测试结果 二、自定义参数类型转换器 2.1 编写类型转换器类 2.2 注册类型转换器对象 2.3 测试结果 三、编码过滤器 3.1 JSP表单 3.2 控制器方法 3.3 配置过滤器 3.4 测试结果 往期专栏&文章相关导读…...
理想的实验
1.关于“问题”的问题 一项研究计划可以围绕四个基本问题(frequently asked questions,FAQ)展开: 研究对象间的(因果)关系(relationship of interest) 这里更关注的是“因果关系”,…...
nginx配置开机启动(Windows环境)
文章目录 1、下载nginx,并解压2、配置nginx.conf,并启动Nginx3、开机自启动 1、下载nginx,并解压 2、配置nginx.conf,并启动Nginx 两种方法: 方法一:直接双击nginx.exe,双击后一个黑色弹窗一闪…...
MySQL 基础面试题02(事务索引)
1.什么是 MySQL 事务? MySQL 事务是指一组操作,是一个不可分割的工作单位,可以确保一组数据库操作要么全部执行,要么全部不执行。换句话说,事务是 MySQL 中保证数据一致性和完整性的机制。 在 MySQL 中,事…...
主从架构lua脚本-Redis(四)
上篇文章介绍了rdb、aof持久化。 持久化RDB/AOF-Redis(三)https://blog.csdn.net/ke1ying/article/details/131148269 redis数据备份策略 写job每小时copy一份到其他目录。目录里可以保留最近一个月数据。把目录日志保存到其他服务器,防止机…...
maven与idea版本适配问题
maven与idea版本适配问题 1.版本对应关系——3.6.3 注意:针对一些老项目 还是尽量采用 3.6.3版本,针对idea各个版本的兼容性就很兼容 0.IDEA 2022 兼容maven 3.8.1及之前的所用版本 1.IDEA 2021 兼容maven 3.8.1及之前的所用版本 2.IDEA 2020 兼容Mave…...
ChatGPT扫盲知识库
本文并不是教你如何使用ChatGPT,而是帮助小白理清一些与ChatGPT相关的概念,并解释一些常见的问题。 概念 OpenAI: 一家人工智能公司,ChatGPT属于该公司的产品之一。前身是一个非盈利组织,不过目前已经转变为一家商业公司。 GPT: O…...
chatgpt赋能python:Python轨迹可视化:用数据讲故事
Python轨迹可视化:用数据讲故事 介绍 随着物联网、智能城市等领域的发展,越来越多的数据被收集下来并存储在数据库中。这些数据对于决策者来说是非常重要的,但是如何将这些数据进行展示和分析呢?这时候Python轨迹可视化就可以派…...
基于DeepSeek的本地部署AI智能体:锁脸功能实现完整方案
基于DeepSeek的本地部署AI智能体:锁脸功能实现完整方案 一、项目概述与架构设计 1.1 任务目标 开发一个具有锁脸功能的AI智能体,能够: 完全本地部署,无需依赖云端服务 锁定智能体的角色设定、人格特征和对话风格 支持多轮对话记忆 提供RESTful API接口 保证角色设定在任…...
LongCat-Video:AI视频生成技术的范式突破与实践指南
LongCat-Video:AI视频生成技术的范式突破与实践指南 【免费下载链接】LongCat-Video 项目地址: https://ai.gitcode.com/hf_mirrors/meituan-longcat/LongCat-Video 在数字内容创作领域,AI视频生成技术正经历从实验性探索到产业化应用的关键转折…...
w3x2lni:魔兽地图跨版本兼容解决方案技术指南
w3x2lni:魔兽地图跨版本兼容解决方案技术指南 【免费下载链接】w3x2lni 魔兽地图格式转换工具 项目地址: https://gitcode.com/gh_mirrors/w3/w3x2lni 价值定位:破解魔兽地图版本壁垒 当你尝试在1.32.8版本魔兽争霸III中运行经典的1.24.4地图时&…...
3步打造B站高效体验:开源客户端的极致优化指南
3步打造B站高效体验:开源客户端的极致优化指南 【免费下载链接】BiliBili-UWP BiliBili的UWP客户端,当然,是第三方的了 项目地址: https://gitcode.com/gh_mirrors/bi/BiliBili-UWP BiliBili-UWP作为一款开源客户端,专为Wi…...
G-Helper解决华硕笔记本续航衰减的智能调控方案:延长50%使用时间
G-Helper解决华硕笔记本续航衰减的智能调控方案:延长50%使用时间 【免费下载链接】g-helper Lightweight, open-source control tool for ASUS laptops and ROG Ally. Manage performance modes, fans, GPU, battery, and RGB lighting across Zephyrus, Flow, TUF,…...
Anaconda镜像源失效?三步解决UnavailableInvalidChannel报错
1. 镜像源失效的典型症状 当你兴冲冲地打开终端准备创建新的Python虚拟环境时,突然看到这段红色报错信息: Collecting package metadata (current_repodata.json): failed UnavailableInvalidChannel: The channel is not accessible or is invalid.chan…...
告别兼容性烦恼,让老旧应用在现代浏览器中“无缝”运行
在数字化转型的浪潮中,企业的技术架构往往承载着历史的痕迹。当我们享受着现代浏览器带来的极速体验与丰富扩展时,一个不容忽视的挑战正悄然影响着员工的工作效率与IT运维的平静——那就是“传统浏览器支持”问题。这并非一个遥不可及的技术概念…...
手把手教你配置Figma MCP:打造属于你自己的AI驱动设计组件库(以阅读题为例)
智能设计革命:用Figma MCP构建AI驱动的交互式学习组件库 当设计系统遇上生成式AI,一场关于效率与智能化的变革正在悄然发生。在Figma中构建可动态响应数据的智能组件库,已成为中高级UI/UX设计师突破传统设计边界的必备技能。本文将深入解析如…...
langchain4j 学习系列(9)-AIService与可观测性
一、基本用法1.1 定义业务接口View Code注:{{it}}是langchain4j内部约定的默认占位符名。当只有1个参数时,{{it}}在运行时,会自动替换成用户的prompt. 当然也可以强制指定参数名,就本示例而言,注释的二种写法ÿ…...
巴旦木脱青皮的设计【solidworks三维、cad图纸、论文、答辩稿】
巴旦木脱青皮设计是农产品加工领域的关键环节,其核心作用在于通过机械结构与工艺参数的协同优化,实现青皮与果仁的高效分离,同时避免果仁损伤。该设计需综合考虑物料特性、动力传递效率及设备稳定性,通过三维建模与二维图纸的精准…...
