大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
- Flink 广播状态
- 基本概念、代码案例、测试结果
状态存储
Flink的一个重要特性就是有状态计算(stateful processing),Flink提供了简单易用的API来存储和获取状态,但是我们还是要理解API背后的原理,才能更好的使用。
State存储方式
Flink为State提供了三种开箱即用的后端存储方式(state backend):
- Memory State Backend
- File System (FS)State Backend
- RocksDB State Backend
MemoryStateBackend
MemoryStateBackend将工作状态数据保存在TaskManager的Java内存中。Key/Value状态和Window算子使用哈希表存储数值和触发器。进行快照时(CheckPointing),生成的快照数据将和 CheckPoint ACK消息一起发送给 JobManager,JobManager将收到的所有快照数据保存在Java内存中。
MemoryStateBackend现在被默认配置成异步的,这样避免阻塞主线程的pipline处理,MemoryStateBackend的状态存取的数据都非常快,但是不适合生产环境中使用。这是以为它有以下限制:
- 每个state的默认大小被限制为5MB(这个值可以通过MemoryStateBackend构造方法设置)
- 每个Task的所有State数据(一个Task可能包含一个Pipline中的多个的Operator)大小不能超过RPC系统的帧大小(akk.framesize 默认10MB)
- JobManager收到的State数据总和不能超过JobManager内存
MemoryStateBackend适合的场景:
- 本地开发和调试
- 状态很小的作业
FsStateBackend
FsStateBackend 需要配置一个CheckPoint路径,例如:hdfs://xxxxxxx 或者 file:///xxxxx,我们一般都会配置HDFS的目录。
FsStateBackend将工作状态数据保存在TaskManager的Java内存中,进行快照时,再将快照数据写入上面的配置的路径,然后将写入的文件路径告知JobManager。JobManager中保存所有状态的元数据信息(在HA的模式下,元数据会写入CheckPoint目录)。
FsStateBackend 默认使用异步方式进行快照,防止阻塞主线程的Pipline处理,可以通过FsStateBackend构造函数取消该模式:
new FsStateBackend(path, false)
FsStateBackend 适合的场景:
- 大状态、长窗口、大键值(键或者值很大)状态的作业
- 适合高可用方案
RocksDBStateBackend
RocksDBStateBackend 也需要配置一个CheckPoint路径,例如:hdfs://xxx 或者 file:///xxx,一般是 HDFS 路径。
RocksDB是一种可嵌入的可持久型的 key-value 存储引擎,提供 ACID 支持。由Facebook基于LevelDB开发,使用LSM存储引擎,是内存和磁盘的混合存储。
RocksDBStateBackend将工作状态保存在TaskManager的RocksDB数据库中,CheckPoint时,RocksDB中的所有数据会被传输到配置的文件目录,少量元数据信息保存在JobManager内存中(HA模式下,会保存在CheckPoint目录)。
RocksDBStateBackend使用异步方式进行快照,RocksDBStateBackend的限制:
- 由于RocksDB的JNI Bridge API是基于 byte[] 的,RocksDBStateBackend支持的每个Key或者每个Value的最大值不超过 2的31次方(2GB)
- 要注意的是,有merge操作的状态(例如:ListState),可能会在运行过程中超过2的31次时,导致程序失败。
RocksDBStateBackend适用于以下的场景: - 超大状态、超长窗口(天)、大键值状态的作业
- 适合高可用模式
使用RocksDBStateBackend时,能够限制状态大小是TaskManager磁盘空间(相对于FsStateBackend状态大小限制与TaskManager内存)。这也导致RocksDBStateBackend的吞吐比其他两个要低一些,因为RocksDB的状态数据的读写都要经过反序列化/序列化。
RocksDBStateBackend时目前三者中唯一支持增量CheckPoint的。
三者吞吐量对比
KeyedState 和 Operator State
State分类
Operator State
(或 non-keyed state):
每个Operator State绑定一个并行的Operator实例,KafkaConnector是使用OperatorState的典型示例:每个并行的Kafka Consumer实例维护了每个Kafka Topic分区和该分区Offset的映射关系,并将这个映射关系保存为OperatorState。
在算子并行度改变时,OperatorState也会重新分配。
Keyed State
这种State只存在于KeyedStream上的函数和操作中,比如Keyed UDF(KeyedProcessFunction)Window State。可以把Keyed State想象成被分区的OperatorState。每个KeyedState在逻辑上可以看成与一个 <parallel-operator-instance, key> 绑定,由于一个key肯定只存在于一个Operator实例,所以我们可以简单的的认为一个 <operator, key>对应一个 KeyedState。
每个KeyedState在逻辑上还会被分配一个KeyGroup,分配方法如下:
MathUtils.murmurHash(key.hashCode()) % maxParallelism;
其中maxParallelism是Flink程序的最大并行度,这个值一般我们不会去手动设置,使用默认的值(128)就好,这里注意下,maxParallelism和我们运行程序时指定的算子并行度(parallelism)不同,parallelism不能大于maxParallelism,最多两者相等。
为什么会有 Key Group这个概念呢?
我们通常写程序,会给算子指定一个并行度,运行一段时间后,积累了一些State,这时候数据量大了,需要增加并行度。我们修改并行度后重新提交,那这些已经存在的State该如何分配到各个Operator呢?这就有了最大并行度和KeyGroup的概念。
上面的计算公式也说明了KeyGroup的个数最多是maxParallelism个。当并行度改变之后,我们在计算这个Key被分配到的Operator:
keyGroupId * paralleism / maxParallelism;
可以看到,一个KeyGroupId会对应一个Operator,当并行度更改时,新的Operator会去拉取对应的KeyGroup的KeyedState,这样就把KeyedState尽量均匀的分配给所有的Operator了。
根据State数据是否被Flink托管,Flink又将State分类为:
- Managed State:被Flink托管,保存为内部的哈希表或者RocksDB,CheckPoint时,Flink将State进行序列化编码。例如:ValueState ListState
- Row State:Operator 自行管理的数据结构, Checkpoint时,它们只能以byte数据写入CheckPoint。
建议使用 Managed State,当使用 Managed State时,Flink会帮助我们更改并行度时重新分配State,优化内存。
使用ManageKeyedState
如何创建?
上面提到,KeyedState只能在KeyedStream上使用,可以通过Stream.keyBy创建KeyedStream,我们可以创建以下几种:
- ValueState
- ListState
- ReducingState
- AggregatingState<IN,OUT>
- MapState<UK,UV>
- FoldingState<T,ACC>
每种State都对应各种的描述符,通过描述符RuntimeContext中获取对应的State,而RuntimeContext只有RichFunction才能获取,所以想要使用KeyedState,用户编写的类必须继承RichFunction或者其他子类。
- ValueState getState(ValueStateDescriptor)
- ReducingState getReducingState(ReducingStateDescriptor)
- ListState getListState(ListStateDescriptor)
- AggregationState<IN,OUT> getAggregatingState(AggregatingStateDescriptor<IN,ACC,OUT>)
- FoldingState<T,ACC> getFoldingState(FoldingStateDescriptor<T,ACC>)
- MapState<UK,UV> getMapState(MapStateDescriptor<UK,UV>)
给KeyedState设置过期时间
在Flink 1.6.0 以后,还可以给KeyedState设置 TTL(Time-To-Live),当某一个Key的State数据过期时,会被StateBackend尽力删除。
官方给出了示例:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)) // 状态存活时间.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // TTL 何时被更新,这里配置的 state 创建和写入时.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();// 设置过期的 state 不被读取
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("textstate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
State的TTL何时被更新?
可以进行以下配置,默认只是key的state被modify(创建或者更新)的时候才更新TTL:
- StateTtlConfig.UpdateType.OnCreateAndWrite:只在一个key的state创建和写入时更新TTL(默认)
- StateTtlConfig.UpdateType.onReadAndWrite:读取state时仍然更新TTL
当State过期但是还未删除时,这个状态是否还可见?
可以进行以下配置,默认是不可见的:
- StateTtlConfig.StateVisibility.NerverReturnExpired:不可见(默认)
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp:可见
注意:
- 状态的最新访问时会和状态数据保存在一起,所以开启TTL特性增大State的大小,Heap State Backend会额外存储一个包括用户状态以及时间戳的Java8对象,RocksDB StateBackend会在每个状态值(list或者map的每个元素)序列化后增加8个字节。
- 暂时只支持基于 Processing Time的TTL。
- 尝试从CheckPoint/SavePoint进行恢复时,TTL的状态(是否开启)必须和之前保存一致,否则会遇到:StateMigrationException。
- TTL的配置并不会保存在CheckPoint/SavePoint中,仅对当前的Job有效。
- 当前开启TTL的MapState仅在用户序列化支持NULL的情况下,才支持用户值为NULL,如果用户值序列化器不支持NULL,可以用NullableSerializer包装一层。
使用ManageOperatorState
(这里以及后续放到下一篇:大数据-127 Flink)
相关文章:

大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...

RFID射频模块(MFRC522 STM32)
目录 一、介绍 二、传感器原理 1.原理图 2.引脚描述 3.工作原理介绍 三、程序设计 main.c文件 MFRC522.h文件 MFRC522.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 RC522 RFID射频模块是一款广泛应用于非接触式RFID系统中的核心组件,由NXP&…...

【JavaSE】--方法的使用
文章目录 1. 方法概念及使用1.1 什么是方法1.2 方法定义1.3 方法调用的执行过程1.4 实参和形参的关系(重要)1.5 没有返回值的方法 2. 方法重载2.1 方法重载概念2.2 方法签名 3. 递归3.1 递归的概念3.2 递归执行过程分析3.3 递归练习 1. 方法概念及使用 1…...

wireshark打开时空白|没有接口,卸载重装可以解决
解决方法:卸载wireshark,全选卸载干净,重新安装旧版的wireshark4.2.7, 甚至cmd下运行net start npf时显示服务名无效,但打开wireshark仍有多个接口 错误描述: 一开始下载的是wireshark的最新版,win11 x64 在安装wir…...

单值二叉树--(C语言)
题目如下: 如果二叉树每个节点都具有相同的值,那么该二叉树就是单值二叉树。 只有给定的树是单值二叉树时,才返回 true;否则返回 false。 示例 1: 输入:[1,1,1,1,1,null,1] 输出:true示例 2&a…...

Linux云计算 |【第三阶段】PROJECT1-DAY2
主要内容: 网站架构演变、LNPMariadb数据库分离、Web服务器集群(部署Nginx后端web服务器、部署NFS共享存储服务器、部署Haproxy代理服务器、部署DNS域名解析服务器) 一、网站架构演变: 随着网站访问量和业务复杂度的增加&#x…...

Claude Prompt 汉语新解
感谢刚哥! ;; 作者: 李继刚 ;; 版本: 0.3 ;; 模型: Claude Sonnet ;; 用途: 将一个汉语词汇进行全新角度的解释 ;; 设定如下内容为你的 *System Prompt* (defun 新汉语老师 () "你是年轻人,批判现实,思考深刻,语言风趣" (风格 . ("Oscar Wilde&q…...

【运维监控】influxdb 2.0+grafana 监控java 虚拟机以及方法耗时情况(2)
运维监控系列文章入口:【运维监控】系列文章汇总索引 文章目录 四、grafana集成influxdb监控java 虚拟机以及方法耗时情况1、添加grafana数据源2、添加grafana的dashboard1)、选择新建dashboard方式2)、导入dashboard 3、验证 关于java应用的…...

怎么看待伦敦银交易的风险与收益?
伦敦银交易的风险与收益,在宣传材料中,伦敦银是一种潜在收益很高,潜在风险不大的品种。然而在实践中我们发现,伦敦银交易好像并不如宣传材料说的那样容易做。那么,具体伦敦银交易的风险和收益是怎么样的?那…...
如何通俗易懂的解释TON的智能合约
文章目录 一、小故事一则二、Ton的智能合约在小故事中三、python代码模拟 一、小故事一则 在一个遥远的国度里,有一个被魔法笼罩的小镇,这个小镇每年都会举办一场盛大的戏剧节。这个戏剧节不仅是演员们展示才华的舞台,更是他们交流心得、共同…...

针对Docker容器的可视化管理工具—DockerUI
目录 ⛳️推荐 前言 1. 安装部署DockerUI 2. 安装cpolar内网穿透 3. 配置DockerUI公网访问地址 4. 公网远程访问DockerUI 5. 固定DockerUI公网地址 ⛳️推荐 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下…...

五大注入攻击网络安全类型介绍
1. SQL注入(SQL Injection) SQL注入流程 1.1. 概述 SQL注入是最常见的注入攻击类型之一,攻击者通过在输入字段中插入恶意的SQL代码来改变原本的SQL逻辑或执行额外的SQL语句,来操控数据库执行未授权的操作(如拖库、获取…...

linux-L9.linux中对文件 按照时间排序 显示100 个
find . -type f -exec stat --format %Y %n {} | sort -nr | head -n 100解释: • find . -type f:在当前目录下查找所有文件。 • -exec stat --format ‘%Y %n’ {} :对每个找到的文件执行stat命令,以获取文件的修改时间&#…...

springboot从分层到解耦
注释很详细,直接上代码 三层架构 项目结构 源码: HelloController package com.amoorzheyu.controller;import com.amoorzheyu.pojo.User; import com.amoorzheyu.service.HelloService; import com.amoorzheyu.service.impl.HelloServiceA; import o…...

网络视频流解码显示后花屏问题的分析
问题描述 rtp打包的ps视频流发送到客户端后显示花屏。 数据分析过程 1、用tcpdump抓包 tcpdump -i eth0 -vnn -w rtp.pcap 2、用wireshark提取rtp的payload 保存为record.h264文件 3、用vlc播放器播放 显示花屏 4、提取关键帧 用xxd命令将h264文件转为txt文件 xxd -p…...
MySQL 大量 IN 的查询优化
背景 (1)MySQL 8.0 版本 (2)业务中遇到大量 IN 的查询,例: SELECT id, username, icon FROM users WHERE id IN (123, 523, 1343, ...);其中 id 为主键,IN 的列表长度有 8000 多个 问题 …...
python运维
环境准备 安装python3环境 # centos 安装python3 yum install python3创建激活venv python3 -m venv .venv source .venv/bin/activatezookeeper pip install kazoo 递归复制目录 from kazoo.client import KazooClientdef copy_node(zk, source_path, destination_path)…...

gen_server补充基础学习
学习gen_server的回调结构 gen_server:start_link(Name, Mod, InitArgs, Opts)这个调用是所有事物的起点。它 会创建一个名为Name的通用服务器,回调模块是Mod,Opts则控制通用服务器的行为。在这里可以指定消息记录、函数调试和其他行为。通用服务器通过…...
Python 入门教程(3)基础知识 | 3.1、基础语法
文章目录 一、 基础语法1、缩进规则2、标识符3、多行语句 一、 基础语法 1、缩进规则 学习 Python 与其他语言最大的区别就是,Python 的代码块不使用大括号 {} 来控制类,函数以及其他逻辑判断。python 最具特色的就是用缩进来写模块。缩进的空白数量是可…...
git 合并分支并解决冲突
git 合并分支并解决冲突 切换分支 git checkout <branch-name> 首先切换到要合并的目标分支 合并分支 git merge <source-branch> //将源分支代码合并到当前分支中,源分支的各项新增的提交都会按时间点插入到当前分支的提交记录中 git merge …...

相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解
JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用,结合SQLite数据库实现联系人管理功能,并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能,同时可以最小化到系统…...

代码规范和架构【立芯理论一】(2025.06.08)
1、代码规范的目标 代码简洁精炼、美观,可持续性好高效率高复用,可移植性好高内聚,低耦合没有冗余规范性,代码有规可循,可以看出自己当时的思考过程特殊排版,特殊语法,特殊指令,必须…...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!
本文介绍了一种名为AnomalyAny的创新框架,该方法利用Stable Diffusion的强大生成能力,仅需单个正常样本和文本描述,即可生成逼真且多样化的异常样本,有效解决了视觉异常检测中异常样本稀缺的难题,为工业质检、医疗影像…...

算法打卡第18天
从中序与后序遍历序列构造二叉树 (力扣106题) 给定两个整数数组 inorder 和 postorder ,其中 inorder 是二叉树的中序遍历, postorder 是同一棵树的后序遍历,请你构造并返回这颗 二叉树 。 示例 1: 输入:inorder [9,3,15,20,7…...

倒装芯片凸点成型工艺
UBM(Under Bump Metallization)与Bump(焊球)形成工艺流程。我们可以将整张流程图分为三大阶段来理解: 🔧 一、UBM(Under Bump Metallization)工艺流程(黄色区域ÿ…...