当前位置: 首页 > news >正文

Flink中KeyedStateStore实现--怎么做到一个Key对应一个State

背景

在Flink中有两种基本的状态:Keyed State和Operator StateOperator State很好理解,一个特定的Operator算子共享同一个state,这是实现层面很好做到的。
但是 Keyed State 是怎么实现的?一般来说,正常的人第一眼就会想到:一个task绑定一个Keyd State,从网上随便查找资料就能发现正确的答案是:对于每一个Key会绑定一个State,但是这在Flink中是怎么实现的呢?
注意:这里我们只讲Flink中是怎么实现一个Key对应一个State的,其他细节并不细说,且state的backend为RocksDB

闲说杂谈

我们以ValueState类型的Keyed State举例:


ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =new ValueStateDescriptor<>("indexState",TypeInformation.of(HoodieRecordGlobalLocation.class));
ValueState<HoodieRecordGlobalLocation> indexState = context.getKeyedStateStore().getState(indexStateDesc)
....
indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation())
  • context.getKeyedStateStore().getState是获取对应keyState,最终的调用链如下:

     DefaultKeyedStateStore.getState -> getPartitionedState||\/RocksDBKeyedStateBackend.getPartitionedState -> getOrCreateKeyedState -> createInternalState -> tryRegisterKvStateInformation||\/RocksDBValueState.create(创建RocksDBValueState)                                                                             

    这里的 tryRegisterKvStateInformation会涉及到RocksDB ColumnFamily的创建:

    RocksDBOperationUtils.createStateInfo -> createColumnFamilyDescriptor 
    // createColumnFamilyDescriptor的部分代码:
    ColumnFamilyOptions options =createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
    if (ttlCompactFiltersManager != null) {ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);
    }
    byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    ...
    return new ColumnFamilyDescriptor(nameBytes, options);

    其实最终会发现RocksDBColumnFamily是跟ValueStateDescriptor也就是描述符的名字有关的,这就是为什么描述符必须是唯一的,关于RocksDBColumnFamily,可以参考RocksDB 简介
    注意此时返回是key对应的一个State的ColumnFamily,该Family包括该task所有的key的value值

  • indexState.update 这里是更新indexState得值
    因为上一步得到只是该Task所对应的ColumanFamily所对应的所有的values,也就是* Flink中的Key-Groups*,(关于Key-Groups可以参考Apache-Flink深度解析-State)

      public void update(V value) {if (value == null) {clear();return;}try {backend.db.put(columnFamily,writeOptions,serializeCurrentKeyWithGroupAndNamespace(),serializeValue(value));} catch (Exception e) {throw new FlinkRuntimeException("Error while adding data to RocksDB", e);}}
    

    最终的调用链如下:

    RocksDBValueState.update -> serializeCurrentKeyWithGroupAndNamespace||\/
    SerializedCompositeKeyBuilder.buildCompositeKeyNamespace||\/
    serializeNamespace(namespace, namespaceSerializer) -> keyOutView.getCopyOfBuffer()   

    这里的keyOutView.getCopyOfBuffer是会获得的record的key,所以在backend.db.put方法中才会更新对应的Key值。
    但是什么时候Record的key信息会被写入到keyOutView中去呢?

  • Record的key何时被写到keyOutView

    AbstractStreamTaskNetworkInput.emitNext -> processElement||\/
    OneInputStreamTask.emitRecord||\/
    OneInputStreamOperator.setKeyContextElement -> setKeyContextElement1 -> setKeyContextElement||\/
    AbstractStreamOperator.setCurrentKey||\/
    StreamOperatorStateHandler.setCurrentKey||\/
    RocksDBKeyedStateBackend.setCurrentKey||\/
    SerializedCompositeKeyBuilder.setCurrentKey -> serializeKeyGroupAndKey||\/
    keySerializer.serialize(key, keyOutView);    

    最后一步keySerializer.serialize(key, keyOutView)一个Record的key就被写到keyOutView中,也就是说对应的key是从每个record中获取的,所以在backend.db.put方法中就能获取到对应的Key

其他

对于keyedStateStore是在哪里初始化的,可以看AbstractStreamOperatorinitializeState方法:

final StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState());stateHandler =new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);

这个方法里也包括了keyedStatedBackendoperatorStateBackend等初始化, 具体的细节后续再解析。

相关文章:

Flink中KeyedStateStore实现--怎么做到一个Key对应一个State

背景 在Flink中有两种基本的状态&#xff1a;Keyed State和Operator State&#xff0c;Operator State很好理解&#xff0c;一个特定的Operator算子共享同一个state&#xff0c;这是实现层面很好做到的。 但是 Keyed State 是怎么实现的&#xff1f;一般来说&#xff0c;正常的…...

flex: 0 0 100%;

flex: 0 0 100%; flex: 0 0 100%; 是一个用于设置flex项的flex-grow、flex-shrink和flex-basis属性的缩写flex-grow&#xff1a;指定了flex项在剩余空间中的放大比例&#xff0c;默认为0&#xff0c;表示不放大。在这个例子中&#xff0c;设置为0表示不允许flex项在水平方向上…...

IMX6ULL系统移植篇-镜像烧写方法

一. 烧录镜像简介 本文我们就来学习&#xff1a;windows 系统下烧录镜像的方法。 如何使用 NXP 官方提供的 MfgTool 工具通过 USB OTG 口来 烧写系统。 二. windows下烧录镜像 1. 烧录镜像前准备工作 &#xff08;1&#xff09;从开发板上拔下 SD卡。 &#xff08;2…...

【Android】实现雷达扫描效果,使用自定义View来绘制雷达扫描动画

要在Android上实现雷达扫描效果&#xff0c;你可以使用自定义View来绘制雷达扫描动画。以下是一个简单的示例代码&#xff1a; 创建一个名为RadarView的自定义View类&#xff0c;继承自View&#xff1a; import android.content.Context; import android.graphics.Canvas; im…...

小程序 - 文件预览

小程序文件预览 /** 预览 - txt文本 */viewTxt(path) {let fs wx.getFileSystemManager();let _this this;fs.readFile({filePath: path,encoding: "utf8",position: 0,success(res) {_this.setData({setNoRefresh: true});wx.navigateTo({url: /pages/view-txt/v…...

将String类型的证书转换为X509Certificate类型对象,读取证书链文件内容,完成证书链校验

证书内容如下所示: 证书内容如下 -----BEGIN CERTIFICATE----- MIIFZDCCA0ygAwIBAgIIYsLLTehAXpYwDQYJKoZIhvcNAQELBQAwUDELMAkGA1UEBhMCQ04xDzANBgNVBAoMBkh1YXdlaTETMBEGA1UECwwKSHVhd2VpIENCRzEbMBkGA1UEAwwSSHVhd2VpIENCRyBSb290IENBMB4XDTE3MDgyMTEwNTYyN1oXDTQyMDgxNTEw…...

v-model实现原理(一根绳上的蚂蚱)

目录 1、什么是v-model2、v-model实现原理3、实现示例3.1 实现text和textarea3.2 实现checkbox和radio3.3 实现select 1、什么是v-model v-model 本质上是一颗语法糖&#xff0c;可以用 v-model 指令在表单 <input>、<textarea> 及 <select>元素上创建双向数…...

第三章 仅支持追加的单表内存数据库

第三章 仅支持追加的单表内存数据库 我们将从小处着手&#xff0c;对数据库施加很多限制。目前&#xff0c;它有如下限制&#xff1a; 支持两种操作&#xff1a;插入一行和打印所有行 仅驻留在内存中&#xff08;不需要持久化到磁盘&#xff09; 支持单个硬编码表 我们的硬…...

抖音seo矩阵系统源码解析

抖音SEO矩阵系统源码是一种用于优化抖音视频内容的工具&#xff0c;可以帮助用户提高抖音视频的搜索排名和流量&#xff0c;从而增加视频曝光和转化率。该系统包括两部分&#xff0c;即数据收集和分析模块以及SEO策略和实施模块。 数据收集和分析模块主要负责从抖音平台上收集…...

6个ChatGPT4的最佳用途

文章目录 ChatGPT 4’s Current Limitations ChatGPT 4 的当前限制1. Crafting Complex Prompts 制作复杂的提示2. Logic Problems 逻辑问题3. Verifying GPT 3.5 Text 验证 GPT 3.5 文本4. Complex Coding 复杂编码5.Nuanced Text Transformation 细微的文本转换6. Complex Kn…...

go系列-读取文件

1 概述 2 整个文件读入内存 直接将数据直接读取入内存&#xff0c;是效率最高的一种方式&#xff0c;但此种方式&#xff0c;仅适用于小文件&#xff0c;对于大文件&#xff0c;则不适合&#xff0c;因为比较浪费内存。 2.1 直接指定文化名读取 在 Go 1.16 开始&#xff0c;i…...

10 编码转换问题

文章目录 字符编码问题编码转换问题ANSI转UnicodeUnicode转ANSIUtf8转 ANSIutf8 转UnicodeANSI 转UTF-8Unicode 转 UTF-8 全部代码 字符编码问题 Windows API 函数 MessageBoxA:MessageBox 内部实现&#xff0c;字符串编码(ANSI)转换成了Unicode,在调用MessageboxW MessageBox:…...

Spring MVC获取参数和自定义参数类型转换器及编码过滤器

目录 一、使用Servlet原生对象获取参数 1.1 控制器方法 1.2 测试结果 二、自定义参数类型转换器 2.1 编写类型转换器类 2.2 注册类型转换器对象 2.3 测试结果 三、编码过滤器 3.1 JSP表单 3.2 控制器方法 3.3 配置过滤器 3.4 测试结果 往期专栏&文章相关导读…...

理想的实验

1.关于“问题”的问题 一项研究计划可以围绕四个基本问题&#xff08;frequently asked questions,FAQ&#xff09;展开&#xff1a; 研究对象间的&#xff08;因果&#xff09;关系&#xff08;relationship of interest&#xff09; 这里更关注的是“因果关系”&#xff0c…...

nginx配置开机启动(Windows环境)

文章目录 1、下载nginx&#xff0c;并解压2、配置nginx.conf&#xff0c;并启动Nginx3、开机自启动 1、下载nginx&#xff0c;并解压 2、配置nginx.conf&#xff0c;并启动Nginx 两种方法&#xff1a; 方法一&#xff1a;直接双击nginx.exe&#xff0c;双击后一个黑色弹窗一闪…...

MySQL 基础面试题02(事务索引)

1.什么是 MySQL 事务&#xff1f; MySQL 事务是指一组操作&#xff0c;是一个不可分割的工作单位&#xff0c;可以确保一组数据库操作要么全部执行&#xff0c;要么全部不执行。换句话说&#xff0c;事务是 MySQL 中保证数据一致性和完整性的机制。 在 MySQL 中&#xff0c;事…...

主从架构lua脚本-Redis(四)

上篇文章介绍了rdb、aof持久化。 持久化RDB/AOF-Redis&#xff08;三&#xff09;https://blog.csdn.net/ke1ying/article/details/131148269 redis数据备份策略 写job每小时copy一份到其他目录。目录里可以保留最近一个月数据。把目录日志保存到其他服务器&#xff0c;防止机…...

maven与idea版本适配问题

maven与idea版本适配问题 1.版本对应关系——3.6.3 注意&#xff1a;针对一些老项目 还是尽量采用 3.6.3版本&#xff0c;针对idea各个版本的兼容性就很兼容 0.IDEA 2022 兼容maven 3.8.1及之前的所用版本 1.IDEA 2021 兼容maven 3.8.1及之前的所用版本 2.IDEA 2020 兼容Mave…...

ChatGPT扫盲知识库

本文并不是教你如何使用ChatGPT&#xff0c;而是帮助小白理清一些与ChatGPT相关的概念&#xff0c;并解释一些常见的问题。 概念 OpenAI: 一家人工智能公司&#xff0c;ChatGPT属于该公司的产品之一。前身是一个非盈利组织&#xff0c;不过目前已经转变为一家商业公司。 GPT: O…...

chatgpt赋能python:Python轨迹可视化:用数据讲故事

Python轨迹可视化&#xff1a;用数据讲故事 介绍 随着物联网、智能城市等领域的发展&#xff0c;越来越多的数据被收集下来并存储在数据库中。这些数据对于决策者来说是非常重要的&#xff0c;但是如何将这些数据进行展示和分析呢&#xff1f;这时候Python轨迹可视化就可以派…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

Python Einops库:深度学习中的张量操作革命

Einops&#xff08;爱因斯坦操作库&#xff09;就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库&#xff0c;用类似自然语言的表达式替代了晦涩的API调用&#xff0c;彻底改变了深度学习工程…...

VisualXML全新升级 | 新增数据库编辑功能

VisualXML是一个功能强大的网络总线设计工具&#xff0c;专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑&#xff08;如DBC、LDF、ARXML、HEX等&#xff09;&#xff0c;并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...

使用SSE解决获取状态不一致问题

使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件&#xff0c;这个上传文件是整体功能的一部分&#xff0c;文件在上传的过程中…...