Flink状态的理解
Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;
1. 状态
所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据,默认保存在 Java 的堆内存中。

根据算子是否存在按照Key进行分区,State可以划分为keyed state 和 Non-keyed state(Operator State、算子状态)
- operator state是task级别的state,说白了就是每个task对应一个state, 在逻辑上,由算子task下所有subtask共享

Operator State的经常被用在Source或Sink算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。 - keyed state 是基于KeyedStream上的状态,这个状态是跟特定的Key 绑定的。KeyedStream流上的每一个Key,都对应一个State

2. 状态数据结构
状态数据由Flink内置状态机制管理。keyed state提供了5种数据结构
2.1 keyed state 数据结构
| 状态 | 状态描述 |
|---|---|
| ValueState | 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的key |
| ListState | 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索 |
| MapState | 维护了一个映射列表 |
| ReducingState | 保存一个单值,表示添加到状态的所有值的聚合 |
| AggregateState | 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与添加到状态的元素的类型不同 |
2.2 operator state
| 状态 | 状态描述 |
|---|---|
| ListState | ListState的快照存储数据,系统重启后,list数据的重分配模式为: round-robin 轮询平均分配 |
| UnionListState | UnionListState的快照存储数据,在系统重启后,list数据的重分配模式为: 广播模式; 在每个subtask上都拥有一份完整的数据; |
3. 状态后端
默认情况下,state会保存在taskmanager的JVM堆内存,checkpoint会存储在JobManager的内存中。然而,状态数据的存储和checkpoint的存储位置可以改变,由state Backend(状态后端)配置实现
老版本(flink-1.12版及以前) Fsstatebackend MemoryStatebackend RocksdbStateBackend
flink1.12版本之后,可用的状态后端类型有两种
HashMapStateBackend、EmbeddedRocksDBStateBackend
而且其所生成的快照文件也统一了格式,因而在job重新部署或者版本升级时,可以任意替换statebackend
-
HashMapStateBackend
※ 状态数据是以java对象形式存储在heap内存中;
※ 内存空间不够时,也会溢出部分数据到本地磁盘文件;
※ 可以支撑大规模的状态数据;(只不过在状态数据规模超出内存空间时,读写效率就会明显降低) -
EmbeddedRocksDBStateBackend
※ RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器, 为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。
※ 它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地, RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
※ 使用RocksDB + HDFS进行state存储:首先
state先在taskManger的本地存储到RocksDB,然后异步写入到HDFS中,状态数量仅仅受限于本地磁盘容量限制。

4. 状态数据容错
Flink是一个stateful(带状态)的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;
一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的接续处理;因此,必须要一种机制能对系统内的各种状态进行持久化容错;Flink用checkpoint机制实现状态数据的容错
4.1 checkpoint
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
-
checkpoint默认关闭,需要手工开启。开启后,默认Exactly-once快照模式。还有一种快照模式为At-least-once
-
checkPoint的位置设置
flink-conf.yaml#state.checkpoints.dir -
Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略。配置参数
flink-conf.yaml#restart-strategy
♦ 如果没有启用 checkpointing,则使用无重启 (no restart) 策略 ♦ 如果启用了 checkpointing,但没有配置重启策略,则使用固定 间隔 (fixed-delay) 策略,尝试重启次数默认值是:Integer.MAX_VALUE。
♦ 另一种重启策略为Failure rate,某时间段内失败了N次就重启
全局配置
# 每隔3s重启一次,重试间隔为10s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
#5分钟内若失败了3次则认为该job失败,重试间隔为10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
# 不重启
restart-strategy: none
单个JOB内配置
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//开启状态检查点机制(它将会定期对整个系统中各个task的状态进行快照持久化,以便失败重启后还能从失败之前的状态恢复)env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE);
// checkpoint机制触发后,持久化保存各task状态数据的存储位置(生产中用hdfs
env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/tmp/flink/state");
// 指定状态后端存储(内存)
env.setStateBackend(new HashMapStateBackend());
// 开启自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.milliseconds(2000)));
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)));
// env.setRestartStrategy(RestartStrategies.noRestart())
- checkpoint个数默认保留最近成功生成的一个,支持保留多个,通过参数
flink-conf.yaml#state.checkpoints.num-retained控制,如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现
flink run -t yarn-per-job -yjm 1024 -ytm 1024 -s hdfs://node01:8020/tmp/flink/state/715b120fe8736a3af7842ea0a5264c46/chk-6/_metadata
4.2 savePoint
savePoint是检查点一种特殊实现,底层其实也是使用Checkpoint的机制。
savePoint是用户以手工命令的方式触发checkpoint,并将结果持久化到指定的存储目录中。
作业升级、代码修改、任务迁移和维护,都可以使用savePoint
-
savePoint的存储位置
savePoint的存储位置flink-conf.yaml#state.savepoints.dir,不是必须设置,但设置了后, 后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置。 -
savePoint的手动触发:
#【针对on standAlone模式】
bin/flink savepoint jobId [targetDirectory]#【针对on yarn模式需要指定-yid参数】
bin/flink savepoint jobId [targetDirectory] -yid yarnAppId
jobId 需要触发savepoint的jobId编号
targetDirectory 指定savepoint存储数据目录
-yid 指定yarnAppId
例如: flink savepoint 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001
- 查看HDFS上savepoint目录
#Savepoint directory /flink/savepoints/savepoint-:shortjobid-:savepointid/
#Savepoint file contains the checkpoint meta data /savepoints/savepoint-:shortjobid-:savepointid/_metadata

4. 触发savepoint并且停止作业
##语法: bin/flink stop jobId -yid yarnAppId
##例如: flink stop 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001
- 从指定的savepoint启动job
##语法: bin/flink run -s savepointPath [runArgs]
##例如: flink run -t yarn-per-job -yjm 1024 -ytm 1024
-s hdfs://node01:8020/flink/savepoints/savepoint-84e766-0591f3377ad0
-c com.loess.checkpoint.TestCheckPoint flink-study-1.0-SNAPSHOT.jar
- 清除savepoint数据
bin/flink savepoint -d savepointPath
##也可以手动删除某个savepoint,这通过常规的文件系统操作就可以做到,不影响其它的savepoints和checkpoints
- savePoint使用建议
为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,推荐通过 uid(String) 方法手动的给算子赋予 ID,这些
ID 将用于确定每一个算子的状态范围。不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点
(savepoint)将程序恢复回来。而这些自动生成的 ID
依赖于程序的结构,并且对代码的更改敏感。当程序改变时,ID会随之变化,所以建议用户手动设置 ID
DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID
4.3 savePoint与checkPoint的区别
- checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从checkpoint来恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
- savepoint是通过checkpoint机制创建的,所以savepoint本质上是特殊的checkpoint。
- checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
- checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。

5. checkpoint机制原理
checkPoint是所有 Operator / Task 的状态在某个时间点的一份拷贝(一份快照), 这个时间点应该是所有 Operator / Task 任务都恰好处理完一个相同的输入数据的时候。

若某个subTask挂了,则此时的状态都被清空,从checkpoint恢复最近一次的状态,重新启动应用程序,计算输入流
5.1 Barrier机制
Barrier是一种特殊事件,用来作为快照信号,由checkpoint 协调器向数据流中注入该信号,subtask任务收到该信号后,就会执行状态的快照。

- 首先是JobManager中的checkpoint Coordinator(协调器) 向任务中的所有source Task周期性发送barrier(栅栏)进行快照请求。
- source Task接受到barrier后, 会把当前自己的状态进行snapshot(可以保存在HDFS上)。
- source向checkpoint coordinator确认snapshot已经完成。
- source继续向下游transformation operator发送 barrier。
- transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成。
- coordinator确认完成
本周期的snapshot已经完成。
5.2 Barrier对齐
对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边 称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier,而且 不同输入里Checkpoint Barrier的流入进度可能不同。因此Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment)

算子对齐分为四部:
(1). 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐。
(2). 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。
(3). 第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
(4). 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。

6.快照性能优化方案
每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。
Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。
两种优化方案
① Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。通过基于 Chandy-Lamport 算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用。
② Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍
相关文章:
Flink状态的理解
Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化; 1. 状态 所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据…...
6.3.tensorRT高级(1)-yolov5模型导出、编译到推理(无封装)
目录 前言1. YOLOv5导出2. YOLOv5推理3. 补充知识总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-yolov5模…...
如何利用设备数字化平台推动精益制造?
人工智能驱动技术的不断发展,尤其是基于机器学习的预测分析工具的使用,为制造业带来了全新的效率和价值水平。一直以来,精益生产(也叫精益制造)在制造业中扮演着重要角色,而现在通过与工业 4.0的融合&#…...
使用Wps减小PDF文件的大小
第一步、打开左上角的文件 第二步、点击打印选项 第三步、点击打印按钮...
【深度学习】GPT-3
2020年5月,OpenAI在长达72页的论文《https://arxiv.org/pdf/2005.14165Language Models are Few-Shot Learners》中发布了GPT-3,共有1750亿参数量,需要700G的硬盘存储,(GPT-2有15亿个参数),它比GPT-2有了极大的改进。根…...
在登录界面中设置登录框、多选项和按钮(HTML和CSS)
登录框(Input框)的样式: /* 设置输入框的宽度和高度 */ input[type"text"], input[type"password"] {width: 200px;height: 30px; }/* 设置输入框的边框样式、颜色和圆角 */ input[type"text"], input[type&q…...
【语音识别】- 声学,词汇和语言模型
一、说明 语音识别是指计算机通过处理人类语言的音频信号,将其转换为可理解的文本形式的技术。也就是说,它可以将人类的口语语音转换为文本,以便计算机能够进一步处理和理解。它是自然语言处理技术的一部分,被广泛应用于语音识别助…...
【考研英语语法及长难句】小结
【 考场攻略汇总 】 考点汇总 考场攻略 #1 断开长难句只看谓语动词,不考虑非谓语动词先找从句,先看主句 考场攻略 #2 抓住谓语动词,抓住句子最核心的表述动作或内容通过定位谓语动词,找到复杂多变的主语通过谓语动词的数量&…...
C# 反射
反射的概念:C#通过类型(Type)来创建对象,调用对象中的方法,属性等信息;B超就是利用了反射原理将超声波打在人的肚子上,然后通过反射波进行体内器官的成员; 反射提供的类:…...
Pytorch(二)
一、分类任务 构建分类网络模型 必须继承nn.Module且在其构造函数中需调用nn.Module的构造函数无需写反向传播函数,nn.Module能够利用autograd自动实现反向传播Module中的可学习参数可以通过named_parameters()返回迭代器 from torch import nn import torch.nn.f…...
Python 使用http时间同步设置系统时间源码
Python方式实现使用http时间同步设置系统时间源码,系统环境是ubuntu 12.04、Python2.7版本。需要使用到time、os及httplib方法。 Python使用http时间同步设置系统时间,源码如下: #-*-coding:utf8 -*- import httplib as client import time…...
golang sync.singleflight 解决热点缓存穿透问题
在 go 的 sync 包中,有一个 singleflight 包,里面有一个 singleflight.go 文件,代码加注释,一共 200 行出头。内容包括以下几块儿: Group 结构体管理一组相关的函数调用工作,它包含一个互斥锁和一个 map,map 的 key 是…...
4、Linux驱动开发:设备-设备号设备号注册
目录 🍅点击这里查看所有博文 随着自己工作的进行,接触到的技术栈也越来越多。给我一个很直观的感受就是,某一项技术/经验在刚开始接触的时候都记得很清楚。往往过了几个月都会忘记的差不多了,只有经常会用到的东西才有可能真正记…...
C++(MFC)调用Python
环境: phyton版本:3.10 VS版本:VS2017 包含文件头:Python\Python310\include 包含库文件:Python\Python310\libs 程序运行期间,以下函数只需要调用一次即可,重复调用会导致崩溃 void Initial…...
深度学习实践——循环神经网络实践
系列实验 深度学习实践——卷积神经网络实践:裂缝识别 深度学习实践——循环神经网络实践 深度学习实践——模型部署优化实践 深度学习实践——模型推理优化练习 代码可见于: 深度学习实践——循环神经网络实践 0 概况1 架构实现1.1 RNN架构1.1.1 RNN架…...
docker简单web管理docker.io/uifd/ui-for-docker
要先pull这个镜像docker.io/uifd/ui-for-docker 这个软件默认只能使用9000端口,别的不行,因为作者在镜像制作时已加入这一层 刚下下来镜像可以通过docker history docker.io/uifd/ui-for-docker 查看到这个端口已被 设置 如果在没有设置br0网关时&…...
SpringBoot内嵌的Tomcat:
SpringBoot内嵌Tomcat源码: 1、调用启动类SpringbootdemoApplication中的SpringApplication.run()方法。 SpringBootApplication public class SpringbootdemoApplication {public static void main(String[] args) {SpringApplication.run(SpringbootdemoApplicat…...
企业级docker应用注意事项
现在很多企业使用容器化技术部署应用,绕不开的docker技术,在生产环境docker常用操作总结。参考:https://juejin.cn/post/7259275893796651069 1. 尽可能使用官方镜像 在docker hub 官方 使用后面带有 DOCKER OFFICIAL IMAGE 标签的镜像&…...
腾讯云高性能计算集群CPU服务器处理器说明
腾讯云高性能计算集群以裸金属云服务器为节点,通过RDMA互联,提供了高带宽和极低延迟的网络服务,能满足大规模高性能计算、人工智能、大数据推荐等应用的并行计算需求,腾讯云服务器网分享腾讯云服务器高性能计算集群CPU处理器说明&…...
tinkerCAD案例:23.Tinkercad 中的自定义字体
tinkerCAD案例:23.Tinkercad 中的自定义字体 原文 Tinkercad Projects Tinkercad has a fun shape in the Shape Generators section that allows you to upload your own font in SVG format and use it in your designs. I’ve used it for a variety of desi…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
【Linux】Linux 系统默认的目录及作用说明
博主介绍:✌全网粉丝23W,CSDN博客专家、Java领域优质创作者,掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
