Flink状态的理解
Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;
1. 状态
所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据,默认保存在 Java 的堆内存中。
根据算子是否存在按照Key进行分区,State可以划分为keyed state 和 Non-keyed state(Operator State、算子状态)
- operator state是task级别的state,说白了就是每个task对应一个state, 在逻辑上,由算子task下所有subtask共享
Operator State的经常被用在Source或Sink算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。 - keyed state 是基于KeyedStream上的状态,这个状态是跟特定的Key 绑定的。KeyedStream流上的每一个Key,都对应一个State
2. 状态数据结构
状态数据由Flink内置状态机制管理。keyed state提供了5种数据结构
2.1 keyed state 数据结构
状态 | 状态描述 |
---|---|
ValueState | 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的key |
ListState | 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索 |
MapState | 维护了一个映射列表 |
ReducingState | 保存一个单值,表示添加到状态的所有值的聚合 |
AggregateState | 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与添加到状态的元素的类型不同 |
2.2 operator state
状态 | 状态描述 |
---|---|
ListState | ListState的快照存储数据,系统重启后,list数据的重分配模式为: round-robin 轮询平均分配 |
UnionListState | UnionListState的快照存储数据,在系统重启后,list数据的重分配模式为: 广播模式; 在每个subtask上都拥有一份完整的数据; |
3. 状态后端
默认情况下,state会保存在taskmanager
的JVM堆内存,checkpoint
会存储在JobManager
的内存中。然而,状态数据的存储和checkpoint的存储位置可以改变,由state Backend(状态后端)
配置实现
老版本(flink-1.12版及以前) Fsstatebackend MemoryStatebackend RocksdbStateBackend
flink1.12版本之后,可用的状态后端类型有两种
HashMapStateBackend、EmbeddedRocksDBStateBackend
而且其所生成的快照文件也统一了格式,因而在job重新部署或者版本升级时,可以任意替换statebackend
-
HashMapStateBackend
※ 状态数据是以java对象形式存储在heap内存中;
※ 内存空间不够时,也会溢出部分数据到本地磁盘文件;
※ 可以支撑大规模的状态数据;(只不过在状态数据规模超出内存空间时,读写效率就会明显降低) -
EmbeddedRocksDBStateBackend
※ RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器, 为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。
※ 它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地, RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
※ 使用RocksDB + HDFS进行state存储:首先
state先在taskManger的本地存储到RocksDB
,然后异步写入
到HDFS中,状态数量仅仅受限于本地磁盘容量限制
。
4. 状态数据容错
Flink是一个stateful(带状态)的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化;
一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的接续处理;因此,必须要一种机制能对系统内的各种状态进行持久化容错;Flink用checkpoint机制实现状态数据的容错
4.1 checkpoint
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照
,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复
,从而修正因为故障带来的程序数据异常。
-
checkpoint默认关闭,需要手工开启。开启后,默认Exactly-once快照模式。还有一种快照模式为At-least-once
-
checkPoint的位置设置
flink-conf.yaml#state.checkpoints.dir
-
Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略。配置参数
flink-conf.yaml#restart-strategy
♦ 如果没有启用 checkpointing,则使用无重启 (no restart) 策略 ♦ 如果启用了 checkpointing,但没有配置重启策略,则使用固定 间隔 (fixed-delay) 策略,尝试重启次数默认值是:Integer.MAX_VALUE。
♦ 另一种重启策略为Failure rate,某时间段内失败了N次就重启
全局配置
# 每隔3s重启一次,重试间隔为10s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
#5分钟内若失败了3次则认为该job失败,重试间隔为10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
# 不重启
restart-strategy: none
单个JOB内配置
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//开启状态检查点机制(它将会定期对整个系统中各个task的状态进行快照持久化,以便失败重启后还能从失败之前的状态恢复)env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE);
// checkpoint机制触发后,持久化保存各task状态数据的存储位置(生产中用hdfs
env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/tmp/flink/state");
// 指定状态后端存储(内存)
env.setStateBackend(new HashMapStateBackend());
// 开启自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.milliseconds(2000)));
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)));
// env.setRestartStrategy(RestartStrategies.noRestart())
- checkpoint个数默认保留最近成功生成的一个,支持保留多个,通过参数
flink-conf.yaml#state.checkpoints.num-retained
控制,如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现
flink run -t yarn-per-job -yjm 1024 -ytm 1024 -s hdfs://node01:8020/tmp/flink/state/715b120fe8736a3af7842ea0a5264c46/chk-6/_metadata
4.2 savePoint
savePoint是检查点一种特殊实现,底层其实也是使用Checkpoint的机制。
savePoint是用户以手工命令的方式触发checkpoint
,并将结果持久化到指定的存储目录中。
作业升级、代码修改、任务迁移和维护,都可以使用savePoint
-
savePoint的存储位置
savePoint的存储位置flink-conf.yaml#state.savepoints.dir
,不是必须设置,但设置了后, 后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置。 -
savePoint的手动触发:
#【针对on standAlone模式】
bin/flink savepoint jobId [targetDirectory]#【针对on yarn模式需要指定-yid参数】
bin/flink savepoint jobId [targetDirectory] -yid yarnAppId
jobId 需要触发savepoint的jobId编号
targetDirectory 指定savepoint存储数据目录
-yid 指定yarnAppId
例如: flink savepoint 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001
- 查看HDFS上savepoint目录
#Savepoint directory /flink/savepoints/savepoint-:shortjobid-:savepointid/
#Savepoint file contains the checkpoint meta data /savepoints/savepoint-:shortjobid-:savepointid/_metadata
4. 触发savepoint并且停止作业
##语法: bin/flink stop jobId -yid yarnAppId
##例如: flink stop 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001
- 从指定的savepoint启动job
##语法: bin/flink run -s savepointPath [runArgs]
##例如: flink run -t yarn-per-job -yjm 1024 -ytm 1024
-s hdfs://node01:8020/flink/savepoints/savepoint-84e766-0591f3377ad0
-c com.loess.checkpoint.TestCheckPoint flink-study-1.0-SNAPSHOT.jar
- 清除savepoint数据
bin/flink savepoint -d savepointPath
##也可以手动删除某个savepoint,这通过常规的文件系统操作就可以做到,不影响其它的savepoints和checkpoints
- savePoint使用建议
为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,推荐通过 uid(String) 方法手动的给算子赋予 ID,这些
ID 将用于确定每一个算子的状态范围。不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点
(savepoint)将程序恢复回来。而这些自动生成的 ID
依赖于程序的结构,并且对代码的更改敏感。当程序改变时,ID会随之变化,所以建议用户手动设置 ID
DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID
4.3 savePoint与checkPoint的区别
- checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从checkpoint来恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
- savepoint是通过checkpoint机制创建的,所以savepoint本质上是特殊的checkpoint。
- checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
- checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。
5. checkpoint机制原理
checkPoint是所有 Operator / Task 的状态在某个时间点的一份拷贝(一份快照)
, 这个时间点应该是所有 Operator / Task 任务都恰好处理完一个相同的输入数据的时候。
若某个subTask挂了,则此时的状态都被清空,从checkpoint恢复最近一次的状态,重新启动应用程序,计算输入流
5.1 Barrier机制
Barrier是一种特殊事件,用来作为快照信号,由checkpoint 协调器向数据流中注入该信号,subtask任务收到该信号后,就会执行状态的快照。
- 首先是JobManager中的checkpoint Coordinator(协调器) 向任务中的所有source Task周期性发送barrier(栅栏)进行快照请求。
- source Task接受到barrier后, 会把当前自己的状态进行snapshot(可以保存在HDFS上)。
- source向checkpoint coordinator确认snapshot已经完成。
- source继续向下游transformation operator发送 barrier。
- transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成。
- coordinator确认完成
本周期的snapshot
已经完成。
5.2 Barrier对齐
对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边
称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier,而且 不同输入里Checkpoint Barrier的流入进度可能不同。因此Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment)
算子对齐分为四部:
(1). 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐
。
(2). 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。
(3). 第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
(4). 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。
6.快照性能优化方案
每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。
Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。
两种优化方案
① Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。一旦数据同步完成,再给Checkpoint Coordinator发送确认信息
。通过基于 Chandy-Lamport 算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用。
② Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍
相关文章:

Flink状态的理解
Flink是一个带状态的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化; 1. 状态 所谓状态State,一般指一个具体的 Task 的状态,即线程处理过程中需要保存的历史数据或历史累计数据…...

6.3.tensorRT高级(1)-yolov5模型导出、编译到推理(无封装)
目录 前言1. YOLOv5导出2. YOLOv5推理3. 补充知识总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-yolov5模…...

如何利用设备数字化平台推动精益制造?
人工智能驱动技术的不断发展,尤其是基于机器学习的预测分析工具的使用,为制造业带来了全新的效率和价值水平。一直以来,精益生产(也叫精益制造)在制造业中扮演着重要角色,而现在通过与工业 4.0的融合&#…...

使用Wps减小PDF文件的大小
第一步、打开左上角的文件 第二步、点击打印选项 第三步、点击打印按钮...

【深度学习】GPT-3
2020年5月,OpenAI在长达72页的论文《https://arxiv.org/pdf/2005.14165Language Models are Few-Shot Learners》中发布了GPT-3,共有1750亿参数量,需要700G的硬盘存储,(GPT-2有15亿个参数),它比GPT-2有了极大的改进。根…...

在登录界面中设置登录框、多选项和按钮(HTML和CSS)
登录框(Input框)的样式: /* 设置输入框的宽度和高度 */ input[type"text"], input[type"password"] {width: 200px;height: 30px; }/* 设置输入框的边框样式、颜色和圆角 */ input[type"text"], input[type&q…...

【语音识别】- 声学,词汇和语言模型
一、说明 语音识别是指计算机通过处理人类语言的音频信号,将其转换为可理解的文本形式的技术。也就是说,它可以将人类的口语语音转换为文本,以便计算机能够进一步处理和理解。它是自然语言处理技术的一部分,被广泛应用于语音识别助…...

【考研英语语法及长难句】小结
【 考场攻略汇总 】 考点汇总 考场攻略 #1 断开长难句只看谓语动词,不考虑非谓语动词先找从句,先看主句 考场攻略 #2 抓住谓语动词,抓住句子最核心的表述动作或内容通过定位谓语动词,找到复杂多变的主语通过谓语动词的数量&…...
C# 反射
反射的概念:C#通过类型(Type)来创建对象,调用对象中的方法,属性等信息;B超就是利用了反射原理将超声波打在人的肚子上,然后通过反射波进行体内器官的成员; 反射提供的类:…...

Pytorch(二)
一、分类任务 构建分类网络模型 必须继承nn.Module且在其构造函数中需调用nn.Module的构造函数无需写反向传播函数,nn.Module能够利用autograd自动实现反向传播Module中的可学习参数可以通过named_parameters()返回迭代器 from torch import nn import torch.nn.f…...
Python 使用http时间同步设置系统时间源码
Python方式实现使用http时间同步设置系统时间源码,系统环境是ubuntu 12.04、Python2.7版本。需要使用到time、os及httplib方法。 Python使用http时间同步设置系统时间,源码如下: #-*-coding:utf8 -*- import httplib as client import time…...
golang sync.singleflight 解决热点缓存穿透问题
在 go 的 sync 包中,有一个 singleflight 包,里面有一个 singleflight.go 文件,代码加注释,一共 200 行出头。内容包括以下几块儿: Group 结构体管理一组相关的函数调用工作,它包含一个互斥锁和一个 map,map 的 key 是…...

4、Linux驱动开发:设备-设备号设备号注册
目录 🍅点击这里查看所有博文 随着自己工作的进行,接触到的技术栈也越来越多。给我一个很直观的感受就是,某一项技术/经验在刚开始接触的时候都记得很清楚。往往过了几个月都会忘记的差不多了,只有经常会用到的东西才有可能真正记…...
C++(MFC)调用Python
环境: phyton版本:3.10 VS版本:VS2017 包含文件头:Python\Python310\include 包含库文件:Python\Python310\libs 程序运行期间,以下函数只需要调用一次即可,重复调用会导致崩溃 void Initial…...

深度学习实践——循环神经网络实践
系列实验 深度学习实践——卷积神经网络实践:裂缝识别 深度学习实践——循环神经网络实践 深度学习实践——模型部署优化实践 深度学习实践——模型推理优化练习 代码可见于: 深度学习实践——循环神经网络实践 0 概况1 架构实现1.1 RNN架构1.1.1 RNN架…...

docker简单web管理docker.io/uifd/ui-for-docker
要先pull这个镜像docker.io/uifd/ui-for-docker 这个软件默认只能使用9000端口,别的不行,因为作者在镜像制作时已加入这一层 刚下下来镜像可以通过docker history docker.io/uifd/ui-for-docker 查看到这个端口已被 设置 如果在没有设置br0网关时&…...

SpringBoot内嵌的Tomcat:
SpringBoot内嵌Tomcat源码: 1、调用启动类SpringbootdemoApplication中的SpringApplication.run()方法。 SpringBootApplication public class SpringbootdemoApplication {public static void main(String[] args) {SpringApplication.run(SpringbootdemoApplicat…...
企业级docker应用注意事项
现在很多企业使用容器化技术部署应用,绕不开的docker技术,在生产环境docker常用操作总结。参考:https://juejin.cn/post/7259275893796651069 1. 尽可能使用官方镜像 在docker hub 官方 使用后面带有 DOCKER OFFICIAL IMAGE 标签的镜像&…...
腾讯云高性能计算集群CPU服务器处理器说明
腾讯云高性能计算集群以裸金属云服务器为节点,通过RDMA互联,提供了高带宽和极低延迟的网络服务,能满足大规模高性能计算、人工智能、大数据推荐等应用的并行计算需求,腾讯云服务器网分享腾讯云服务器高性能计算集群CPU处理器说明&…...

tinkerCAD案例:23.Tinkercad 中的自定义字体
tinkerCAD案例:23.Tinkercad 中的自定义字体 原文 Tinkercad Projects Tinkercad has a fun shape in the Shape Generators section that allows you to upload your own font in SVG format and use it in your designs. I’ve used it for a variety of desi…...

C++实现分布式网络通信框架RPC(3)--rpc调用端
目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中,我们已经大致实现了rpc服务端的各项功能代…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...

EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
Python 高级应用10:在python 大型项目中 FastAPI 和 Django 的相互配合
无论是python,或者java 的大型项目中,都会涉及到 自身平台微服务之间的相互调用,以及和第三发平台的 接口对接,那在python 中是怎么实现的呢? 在 Python Web 开发中,FastAPI 和 Django 是两个重要但定位不…...

工厂方法模式和抽象工厂方法模式的battle
1.案例直接上手 在这个案例里面,我们会实现这个普通的工厂方法,并且对比这个普通工厂方法和我们直接创建对象的差别在哪里,为什么需要一个工厂: 下面的这个是我们的这个案例里面涉及到的接口和对应的实现类: 两个发…...
Easy Excel
Easy Excel 一、依赖引入二、基本使用1. 定义实体类(导入/导出共用)2. 写 Excel3. 读 Excel 三、常用注解说明(完整列表)四、进阶:自定义转换器(Converter) 其它自定义转换器没生效 Easy Excel在…...
scan_mode设计原则
scan_mode设计原则 在进行mtp controller设计时,基本功能设计完成后,需要设计scan_mode设计。 1、在进行scan_mode设计时,需要保证mtp处于standby模式,不会有擦写、编程动作。 2、只需要固定mtp datasheet说明的接口即可…...

LSTM-XGBoost多变量时序预测(Matlab完整源码和数据)
LSTM-XGBoost多变量时序预测(Matlab完整源码和数据) 目录 LSTM-XGBoost多变量时序预测(Matlab完整源码和数据)效果一览基本介绍程序设计参考资料 效果一览 基本介绍 普通的多变量时序已经用腻了,审稿人也看烦了&#…...

Linux 内核内存管理子系统全面解析与体系构建
一、前言: 为什么内存管理是核心知识 内存管理是 Linux 内核最核心也最复杂的子系统之一,其作用包括: 为软件提供独立的虚拟内存空间,实现安全隔离分配/回收物理内存资源,维持系统稳定支持不同类型的内存分配器,最优…...