Flink深入浅出之03:状态、窗口、checkpoint、两阶段提交
Flink是一个有状态的流,👅一起深入了解这个有状态的流
3️⃣ 目标
- 掌握State知识
- 掌握Flink三种State Backend
- 掌握Flink checkpoint和savepoint原理
- 了解Flink的重启策略
- checkpoint+two phase commit保证E-O语义
4️⃣ 要点
📖 1. Flink的State
1.1 state概述
Apache Flink® — Stateful Computations over Data Streams
Flink 是一个默认就有状态的分析引擎,前面的WordCount 案例可以做到单词的数量的累加,其实是因为在内存中保证了每个单词的出现的次数,这些数据其实就是状态数据。但是如果一个 Task 在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义(At -least-once 和 Exactly-once)上来说,Flink引入了State 和 CheckPoint。State一般指一个具体的 Task/Operator 的状态,State数据默认保存在 Java 的堆内存中。
-
回顾单词计数的例子
package com.kaikeba.demo1import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow/*** 使用滑动窗口* 每隔1秒钟统计最近2秒钟的每个单词出现的次数*/ object FlinkStream {def main(args: Array[String]): Unit = {//构建流处理的环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//从socket获取数据 val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)//导入隐式转换的包import org.apache.flink.api.scala._//对数据进行处理val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")) //按照空格切分.map(x => (x, 1)) //每个单词计为1.keyBy(0) //按照下标为0的单词进行分组.sum(1) //按照下标为1累加相同单词出现的1//对数据进行打印result.print()//开启任务env.execute("FlinkStream") }} -
输入
hadoop hadoophadoophive hadoop -
输出
8> (hadoop,1)1> (hive,1)8> (hadoop,2)8> (hadoop,3)8> (hadoop,4)

1.2 state类型
- Flink中有两种基本类型的State, ,他们两种都可以以两种形式存在:
- 原生状态(raw state)
- 由算子自己管理数据结构,当触发Checkpoint操作过程中,Flink并不知道状态数据内部的数据结构,只是将数据转换成bytes数据存储在Checkpoints中,当从Checkpoints恢复任务时,算子自己再反序列化出状态的数据结构。
- 托管状态(managed state)
- 由Flink Runtime控制和管理状态数据,并将状态数据转换成为内存的Hash tables或 RocksDB的对象存储,然后将这些数据通过内部的接口持久化到checkpoints中,任务异常时可以通过这些状态数据恢复任务。
- 推荐使用ManagedState管理状态数据,ManagedState更好的支持状态数据的重平衡以及更加完善的内存管理
- 原生状态(raw state)
| Managed State | Raw State | |
|---|---|---|
| 状态管理方式 | Flink Runtime托管,自动存储、自动恢复、自动伸缩 | 用户自己管理 |
| 状态数据结构 | Flink提供的常用数据结构,如ListState、MapState等 | 字节数组:byte[] |
| 使用场景 | 绝大多数Flink算子 | 用户自定义算子 |
1.2.1 Operator State(算子状态)
- operator state是task级别的state,说白了就是每个task对应一个state。
- Kafka Connector source中的每个分区(task)都需要记录消费的topic的partition和offset等信息。
- 对于Operator State,我们还需进一步实现
CheckpointedFunction接口。 - Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。

1.2.2 keyed State(键控状态)
-
Keyed State:- 顾名思义就是基于KeyedStream上的状态,这个状态是跟特定的Key 绑定的。KeyedStream流上的每一个Key,都对应一个State。Flink针对 Keyed State 提供了以下可以保存State的数据结构.
-
Keyed state托管状态有六种类型:-
1、ValueState
保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的key,因此算子接收到的每个key都可能对应一个值)。 这个值可以通过update(T) 进行更新,通过 T value() 进行检索 -
2、ListState
保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List<T>) 进行添加元素,通过 Iterable<T> get() 获得整个列表。还可以通过 update(List<T>) 覆盖当前的列表 。 -
3、MapState
维护了一个映射列表。 你可以添加键值对到状态中,也可以获得 反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、 键和值的可迭代视图。 -
4、ReducingState
保存一个单值,表示添加到状态的所有值的聚合。接口与ListState类似,但使用add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。 -
5、AggregatingState
AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与添加到状态的元素的类型不同。 接口与 ListState类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合
-
-
keyedState使用方法
- 1、只能用于
RichFunction - 2、将
State声明为实例变量 - 3、在
open()方法中为State赋值- 创建一个
StateDescriptor - 利用
getRuntimeContext().getXXState(...)构建不同的State
- 创建一个
- 4、调用State的方法进行
读写- 例如 state.value()、state.update(…)等等
- 1、只能用于

1.3 Keyed State案例演示
1.3.1 ValueState
-
作用
- 保存一个可以更新和检索的值
-
需求
- 使用valueState实现平均值求取
-
代码开发
package com.kaikeba.keystateimport org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector/*** 使用valueState实现平均值求取*/ object ValueStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(1L, 4d),(1L, 2d))).keyBy(_._1).flatMap(new CountAverageWithValue()).print()env.execute()} }class CountAverageWithValue extends RichFlatMapFunction[(Long, Double), (Long, Double)] {//定义ValueState类型的变量private var sum: ValueState[(Long, Double)] = _override def open(parameters: Configuration): Unit = {//初始化获取历史状态的值sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Double)]("average", classOf[(Long, Double)])) }override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {// access the state valueval tmpCurrentSum = sum.value// If it hasn't been used before, it will be nullval currentSum = if (tmpCurrentSum != null) {tmpCurrentSum} else {(0L, 0d)}// update the countval newSum = (currentSum._1 + 1, currentSum._2 + input._2)// update the statesum.update(newSum)// if the count reaches 2, emit the average and clear the stateif (newSum._1 >= 2) {out.collect((input._1, newSum._2 / newSum._1))//将状态清除//sum.clear()}}}
1.3.2 ListState
-
作用
- 用于保存每个key的历史数据数据成为一个列表
-
需求
- 使用ListState求取数据平均值
-
代码开发
package com.kaikeba.keystateimport java.langimport java.util.Collectionsimport org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state.{ListState, ListStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collector/*** 使用ListState实现平均值求取* ListState<T> :这个状态为每一个 key 保存集合的值* get() 获取状态值* add() / addAll() 更新状态值,将数据放到状态中* clear() 清除状态*/object ListStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageWithList).print()env.execute()}}class CountAverageWithList extends RichFlatMapFunction[(Long,Double),(Long,Double)]{//定义我们历史所有的数据获取private var elementsByKey: ListState[(Long,Double)] = _override def open(parameters: Configuration): Unit = {//初始化获取历史状态的值,每个key对应的所有历史值,都存储在list集合里面了val listState = new ListStateDescriptor[(Long,Double)]("listState",classOf[(Long,Double)])elementsByKey = getRuntimeContext.getListState(listState)}override def flatMap(element: (Long, Double), out: Collector[(Long, Double)]): Unit = {//获取当前key的状态值val currentState: lang.Iterable[(Long, Double)] = elementsByKey.get()//如果初始状态为空,那么就进行初始化,构造一个空的集合出来,准备用于存储后续的数据if(currentState == null){elementsByKey.addAll(Collections.emptyList())}//添加元素elementsByKey.add(element)import scala.collection.JavaConverters._val allElements: Iterator[(Long, Double)] = elementsByKey.get().iterator().asScalaval allElementList: List[(Long, Double)] = allElements.toListif(allElementList.size >= 3){var count = 0Lvar sum = 0dfor(eachElement <- allElementList){count +=1sum += eachElement._2}out.collect((element._1,sum/count))}}}
1.3.3 MapState
-
作用
- 用于将每个key对应的数据都保存成一个map集合
-
需求
- 使用MapState求取每个key对应的平均值
-
代码开发
package com.kaikeba.keystateimport java.util.UUIDimport org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector import org.apache.flink.api.scala._/*** 使用MapState求取每个key对应的平均值*/ object MapStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageMapState).print()env.execute()} }class CountAverageMapState extends RichFlatMapFunction[(Long,Double),(Long,Double)]{private var mapState:MapState[String,Double] = _//初始化获取mapState对象override def open(parameters: Configuration): Unit = {val mapStateOperate = new MapStateDescriptor[String,Double]("mapStateOperate",classOf[String],classOf[Double])mapState = getRuntimeContext.getMapState(mapStateOperate)}override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {//将相同的key对应的数据放到一个map集合当中去,就是这种对应 key -> Map((key1, value1),(key2, value2)) //每次都构建一个map集合mapState.put(UUID.randomUUID().toString,input._2)import scala.collection.JavaConverters._//获取map集合当中所有的value,我们每次将数据的value给放到map的value里面去val listState: List[Double] = mapState.values().iterator().asScala.toListif(listState.size >=3){var count = 0Lvar sum = 0dfor(eachState <- listState){count +=1sum += eachState}out.collect(input._1,sum/count)}} }
1.3.4 ReducingState
-
作用
- 用于数据的聚合
-
需求
- 使用ReducingState求取每个key对应的平均值
-
代码开发
package com.kaikeba.keystateimport org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction} import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector/*** ReducingState<T> :这个状态为每一个 key 保存一个聚合之后的值* get() 获取状态值* add() 更新状态值,将数据放到状态中* clear() 清除状态*/object ReduceingStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageReduceStage).print()env.execute()} }class CountAverageReduceStage extends RichFlatMapFunction[(Long,Double),(Long,Double)]{//定义ReducingStateprivate var reducingState:ReducingState[Double] = _//定义一个计数器var counter=0Loverride def open(parameters: Configuration): Unit = {val reduceSum = new ReducingStateDescriptor[Double]("reduceSum", new ReduceFunction[ Double] {override def reduce(value1: Double, value2: Double): Double = {value1+ value2}}, classOf[Double])//初始化获取reducingState对象reducingState = getRuntimeContext.getReducingState[Double](reduceSum)} override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {//计数器+1counter+=1//添加数据到reducingStatereducingState.add(input._2)out.collect(input._1,reducingState.get()/counter)} }
1.3.5 AggregatingState
-
作用
- 将相同key的数据进行聚合
-
需求
- 将相同key的数据聚合成为一个字符串
-
代码开发
package com.kaikeba.keystateimport org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction}import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.runtime.state.memory.MemoryStateBackendimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala._/*** 将相同key的数据聚合成为一个字符串*/object AggregrageStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new AggregrageState).print()env.execute()}}/*** (1L, 3d),(1L, 5d),(1L, 7d), 把相同key的value拼接字符串:Contains-3-5-7*/class AggregrageState extends RichFlatMapFunction[(Long,Double),(Long,String)]{//定义AggregatingStateprivate var aggregateTotal:AggregatingState[Double, String] = _override def open(parameters: Configuration): Unit = {/*** name: String,* aggFunction: AggregateFunction[IN, ACC, OUT],* stateType: Class[ACC]*/val aggregateStateDescriptor = new AggregatingStateDescriptor[Double, String, String]("aggregateState", new AggregateFunction[Double, String, String] {//创建一个初始值override def createAccumulator(): String = {"Contains"}//对数据进行累加override def add(value: Double, accumulator: String): String = {accumulator + "-" + value}//获取累加的结果override def getResult(accumulator: String): String = {accumulator}//数据合并的规则override def merge(a: String, b: String): String = {a + "-" + b}}, classOf[String])//获取AggregatingState对象aggregateTotal = getRuntimeContext.getAggregatingState(aggregateStateDescriptor)}override def flatMap(input: (Long, Double), out: Collector[(Long, String)]): Unit = {aggregateTotal.add(input._2)out.collect(input._1,aggregateTotal.get())}}
1.4 Operator State案例演示
- 需求
-
实现每两条数据进行输出打印一次,不用区分数据的key
-
这里使用ListState实现
package com.kaikeba.operatorstateimport org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import scala.collection.mutable.ListBuffer/*** 实现每两条数据进行输出打印一次,不用区分数据的key*/ object OperatorListState {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val sourceStream: DataStream[(String, Int)] = env.fromCollection(List(("spark", 3),("hadoop", 5),("hive", 7),("flume", 9)))sourceStream.addSink(new OperateTaskState).setParallelism(1)env.execute()}}class OperateTaskState extends SinkFunction[(String,Int)]{//定义一个list 用于我们每两条数据打印一下private var listBuffer:ListBuffer[(String,Int)] = new ListBuffer[(String, Int)]override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {listBuffer.+=(value)if(listBuffer.size ==2){println(listBuffer)//清空state状态listBuffer.clear()}}}
-
📖 2. Flink的状态管理之State Backend
- 默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。state 的存储和checkpoint的位置取决于State Backend的配置。
- Flink一共提供了3种StateBackend
- MemoryStateBackend
- 基于内存存储
- FsStateBackend
- 基于文件系统存储
- RocksDBStateBackend
- 基于数据库存储
- MemoryStateBackend
- 可以通过 ==StreamExecutionEnvironment.setStateBackend(…)==来设置state存储的位置
2.1 MemoryStateBackend
将数据持久化状态存储到内存当中,state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中。基于内存的state backend在生产环境下不建议使用。

- 代码配置:
environment.setStateBackend(new MemoryStateBackend())
- 使用场景:
(1)本地调试
(2)flink任务状态数据量较小的场景
2.2 FsStateBackend
state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中。可以使用hdfs等分布式文件系统.FsStateBackend 适合场景:状态数据特别的多,还有长时间的window算子等,它很安全,因为基于hdfs,所以数据有备份很安全。

- 代码配置:
environment.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink/checkDir"))
- 适用场景:
(1)大状态、长窗口、大key/value状态的的任务
(2)全高可用配置
2.3 RocksDBStateBackend (生产中推荐)
RocksDB介绍:RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器,为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用.

- 代码配置:导入jar包然后配置代码
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.9.2</version>
</dependency>
- 配置代码
environment.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink/checkDir",true))
- 使用场景
(1)大状态、长窗口、大key/value状态的的任务
(2)全高可用配置由于RocksDBStateBackend将工作状态存储在taskManger的本地文件系统,状态数量仅仅受限于本地磁盘容量限制,对比于FsStateBackend保存工作状态在内存中,RocksDBStateBackend能避免flink任务持续运行可能导致的状态数量暴增而内存不足的情况,因此适合在生产环境使用。
2.4 修改state-backend的两种方式
- 第一种:单任务调整
- 修改当前任务代码
env.setStateBackend(
new FsStateBackend("hdfs://node01:8020/flink/checkDir"))
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
-
第二种:全局调整
- 修改flink-conf.yaml
state.backend: filesystem state.checkpoints.dir: hdfs://node01:8020/flink/checkDir- 注意:state.backend的值可以是下面几种
(1) jobmanager 表示使用 MemoryStateBackend (2) filesystem 表示使用 FsStateBackend (3) rocksdb 表示使用 RocksDBStateBackend
📖 3. Flink的checkPoint保存数据实现容错
3.1 checkPoint的基本概念
为了保证state的容错性,Flink需要对state进行checkpoint。Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
3.2 checkPoint的前提
- Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提
-
1、持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
-
2、用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)
-
3.3 Flink进行checkpoint步骤
- (1)暂停新数据的输入
- (2)等待流中on-the-fly的数据被处理干净,此时得到flink graph的一个snapshot
- (3)将所有Task中的State拷贝到State Backend中,如HDFS。此动作由各个Task Manager完成
- (4)各个Task Manager将Task State的位置上报给Job Manager,完成checkpoint
- (5)恢复数据的输入
如上所述,这里才需要“暂停输入 + 排干on-the-fly 数据”的操作,这样才能拿到同一时刻下所有subtask的state
3.4 配置checkPoint
-
默认checkpoint功能是disabled的,想要使用的时候需要先启用
-
checkpoint开启之后,默认的checkPointMode是Exactly-once
-
checkpoint的checkPointMode有两种
- Exactly-once: 数据处理且只被处理一次
- At-least-once:数据至少被处理一次
Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)
//默认checkpoint功能是disabled的,想要使用的时候需要先启用
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
environment.enableCheckpointing(1000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
environment.getCheckpointConfig.setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】/*** ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint* ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint*/
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
⭐️3.5 重启策略概述
- Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。
- 如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略,默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。
- 常用的重启策略
- (1)固定间隔 (Fixed delay)
- (2)失败率 (Failure rate)
- (3)无重启 (No restart)
- 如果没有启用 checkpointing,则使用无重启 (no restart) 策略。
- 如果启用了 checkpointing,重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在应用代码中动态指定,会覆盖全局配置
- 但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略, 尝试重启次数默认值是:Integer.MAX_VALUE,。
3.6 重启策略配置实现
- 固定间隔 (Fixed delay)
第一种:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s第二种:应用代码设置//重启次数、重启时间间隔
environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000))
- 失败率 (Failure rate)
第一种:全局配置 flink-conf.yaml
//5分钟内若失败了3次则认为该job失败,重试间隔为10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s第二种:应用代码设置
environment.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)))
- 无重启 (No restart)
第一种:全局配置 flink-conf.yaml
restart-strategy: none第二种:应用代码设置
environment.setRestartStrategy(RestartStrategies.noRestart())
⭐️📖 4. 从checkPoint恢复数据以及checkPoint保存多个历史版本
4.1 保存多个历史版本
-
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。
-
如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前
-
Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数。
state.checkpoints.num-retained: 20
- 这样设置以后就查看对应的Checkpoint在HDFS上存储的文件目录
hdfs dfs -ls hdfs://node01:8020/flink/checkpoints
- 如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现
4.2 恢复历史某个版本数据
- 如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点进行恢复
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -s hdfs://node01:8020/fsStateBackend/971ae7ac4d5f20e704747ea7c549b356/chk-50/_metadata -c com.kaikeba.checkpoint.TestCheckPoint original-flink_study-1.0-SNAPSHOT.jar
- 程序正常运行后,还会按照Checkpoint配置进行运行,继续生成Checkpoint数据
📖 5. Flink的savePoint保存数据
5.1 savePoint的介绍
-
savePoint是检查点一种特殊实现,底层其实也是使用Checkpoints的机制。
-
savePoint是用户以手工命令的方式触发checkpoint,并将结果持久化到指定的存储目录中
-
作用
- 1、应用程序代码升级
- 通过触发保存点并从该保存点处运行新版本,下游的应用程序并不会察觉到不同
- 2、Flink版本更新
- Flink 自身的更新也变得简单,因为可以针对正在运行的任务触发保存点,并从保存点处用新版本的 Flink 重启任务。
- 3、维护和迁移
- 使用保存点,可以轻松地“暂停和恢复”应用程序
- 1、应用程序代码升级
5.2 savePoint的使用
- 1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置
state.savepoints.dir: hdfs://node01:8020/flink/savepoints
-
2:触发一个savepoint
-
(1)手动触发savepoint
#【针对on standAlone模式】 bin/flink savepoint jobId [targetDirectory] #【针对on yarn模式需要指定-yid参数】 bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]#jobId 需要触发savepoint的jobId编号 #targetDirectory 指定savepoint存储数据目录 #-yid 指定yarnAppId ##例如: flink savepoint 8d1bb7f88a486815f9b9cf97c304885b -yid application_1594807273214_0004 -
(2)取消任务并手动触发savepoint
##【针对on standAlone模式】 bin/flink cancel -s [targetDirectory] jobId ##【针对on yarn模式需要指定-yid参数】 bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]##例如: flink cancel 8d1bb7f88a486815f9b9cf97c304885b -yid application_1594807273214_0004
-
-
3:从指定的savepoint启动job
bin/flink run -s savepointPath [runArgs]##例如: flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -s hdfs://node01:8020/flink/savepoints/savepoint-8d1bb7-c9187993ca94 -c com.kaikeba.checkpoint.TestCheckPoint original-flink_study-1.0-SNAPSHOT.jar -
4、清除savepoint数据
bin/flink savepoint -d savepointPath
📖 6. Flink流式处理集成kafka
-
对于实时处理当中,我们实际工作当中的数据源一般都是使用kafka,所以我们一起来看看如何通过Flink来集成kafka
-
Flink提供了一个特有的kafka connector去读写kafka topic的数据。flink消费kafka数据,并不是完全通过跟踪kafka消费组的offset来实现去保证exactly-once的语义,而是flink内部去跟踪offset和做checkpoint去实现exactly-once的语义,而且对于kafka的partition,Flink会启动对应的并行度去处理kafka当中的每个分区的数据
-
Flink整合kafka官网介绍
- https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
6.1 导入pom依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.9.2</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.9.2</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version>
</dependency>
6.2 将kafka作为flink的source来使用
- 实际工作当中一般都是将kafka作为flink的source来使用
6.2.1 创建kafka的topic
- 安装好kafka集群,并启动kafka集群,然后在node01执行以下命令创建kafka的topic为test
kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181
6.2.2 代码实现:
package com.kaikeba.kafkaimport java.util.Propertiesimport org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema/*** 将kafka作为flink的source来使用*/
object FlinkKafkaSource {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//**隐式转换import org.apache.flink.api.scala._//checkpoint**配置env.enableCheckpointing(100)env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)env.getCheckpointConfig.setCheckpointTimeout(60000)env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//设置statebackendenv.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints",true));val topic = "test"val prop = new Properties()prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")prop.setProperty("group.id","con1")prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");val kafkaConsumer = new FlinkKafkaConsumer[String]("test",new SimpleStringSchema,prop)kafkaConsumer.setCommitOffsetsOnCheckpoints(true)val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)kafkaSource.print()env.execute()}
}
6.2.3 kafka生产数据
- node01执行以下命令,通过shell命令行来生产数据到kafka当中去
##创建topickafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181 ##发送数据
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
6.3 将kafka作为flink的sink来使用
- 我们也可以将kafka作为flink的sink来使用,就是将flink处理完成之后的数据写入到kafka当中去
6.3.1 socket发送数据
- node01执行以下命令,从socket当中发送数据
nc -lk 9999
6.3.2 代码实现
package com.kaikeba.kafkaimport java.util.Propertiesimport org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.util.serialization.SimpleStringSchema/*** 将kafka作为flink的sink来使用*/
object FlinkKafkaSink {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隐式转换import org.apache.flink.api.scala._//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig.setCheckpointTimeout(60000);env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackendenv.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints",true));val socketStream = env.socketTextStream("node01",9999)val topic = "test"val prop = new Properties()prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")prop.setProperty("group.id","kafka_group1")//第一种解决方案,设置FlinkKafkaProducer里面的事务超时时间//设置事务超时时间prop.setProperty("transaction.timeout.ms",60000*15+"");//第二种解决方案,设置kafka的最大事务超时时间//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());//使用支持仅一次语义的形式/*** defaultTopic: String,* serializationSchema: KafkaSerializationSchema[IN],* producerConfig: Properties,* semantic: FlinkKafkaProducer.Semantic*/val kafkaSink = new FlinkKafkaProducer[String](topic,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)socketStream.addSink(kafkaSink)env.execute("StreamingFromCollectionScala")}
}
6.3.3 启动kafka消费者
- node01执行以下命令启动kafka消费者,消费数据
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test
📖 7. Flink当中的window窗口
-
对于流式处理,如果我们需要求取总和,平均值,或者最大值,最小值等,是做不到的,因为数据一直在源源不断的产生,即数据是没有边界的,所以没法求最大值,最小值,平均值等,所以为了一些数值统计的功能,我们必须指定时间段,对某一段时间的数据求取一些数据值是可以做到的。或者对某一些数据求取数据值也是可以做到的
-
所以,流上的聚合需要由 window 来划定范围,比如 “计算过去的5分钟” ,或者 “最后100个元素的和” 。
-
window是一种可以把无限数据切割为有限数据块的手段
- 窗口可以是 时间驱动的 【Time Window】(比如:每30秒)
- 或者 数据驱动的【Count Window】 (比如:每100个元素)

-
窗口类型汇总:

7.1 窗口的基本类型介绍
- 窗口通常被区分为不同的类型:
-
tumbling windows:滚动窗口 【没有重叠】
- 滚动窗口下窗口之间之间不重叠,且窗口长度是固定的

-
sliding windows:滑动窗口 【有重叠】
- 滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定

-
session windows:会话窗口 ,一般没人用
- Session window的窗口大小,则是由数据本身决定,它没有固定的开始和结束时间。
- 会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭
-

7.2 Flink的窗口介绍
7.2.1 Time Window窗口的应用
- time window又分为滚动窗口和滑动窗口,这两种窗口调用方法都是一样的,都是调用timeWindow这个方法,如果传入一个参数就是滚动窗口,如果传入两个参数就是滑动窗口

-
需求:每隔5s时间,统计最近10s出现的数据
-
代码实现:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Timeobject TestTimeWindow {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketSource: DataStream[String] = environment.socketTextStream("node01",9999)socketSource.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).timeWindow(Time.seconds(10),Time.seconds(5)).sum(1).print()environment.execute()}}
7.2.2 Count Windos窗口的应用
-
与timeWindow类型,CountWinodw也可以分为滚动窗口和滑动窗口,这两个窗口调用方法一样,都是调用countWindow,如果传入一个参数就是滚动窗口,如果传入两个参数就是滑动窗口

-
需求:使用count Window 统计最近5条数的最大值
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}/*** 使用countWindow统计最近5条数据的最大值*/
object TestCountWindow {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketSource: DataStream[String] = environment.socketTextStream("node01",9999)/*** 发送数据* spark 1* spark 2* spark 3* spark 4* spark 5* hello 100* hello 90* hello 80* hello 70* hello 60* hello 10*/socketSource.map(x => (x.split(" ")(0),x.split(" ")(1).toInt)).keyBy(0).countWindow(5).aggregate(new AggregateFunction[(String,Int),Int,Double]{var initAccumulator :Int = 0override def createAccumulator(): Int = {initAccumulator}override def add(value: (String, Int), accumulator: Int): Int = {if(accumulator >= value._2){accumulator}else{value._2}}override def getResult(accumulator: Int): Double = {accumulator}override def merge(a: Int, b: Int): Int = {if(a>=b){a}else{b}}}).print()environment.execute()}
}
7.2.3 自定义window的应用
-
如果time window 和 countWindow 还不够用的话,我们还可以使用自定义window来实现数据的统计等功能。

7.3 window窗口数据的集合统计
-
前面我们可以通过aggregrate实现数据的聚合,对于求最大值,最小值,平均值等操作,我们也可以通过process方法来实现
-
对于某一个window内的数值统计,我们可以增量的聚合统计或者全量的聚合统计
7.3.1 增量聚合统计
- 窗口当中每加入一条数据,就进行一次统计
- 常用的聚合算子
- reduce(reduceFunction)
- aggregate(aggregateFunction)

-
需求
- 通过接收socket当中输入的数据,统计每5秒钟数据的累计的值
-
代码实现
package com.kaikeba.windowimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Timeobject FlinkTimeCount {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketStream: DataStream[String] = environment.socketTextStream("node01",9999)socketStream.map(x => (1, x.toInt)).keyBy(0).timeWindow(Time.seconds(5)).reduce((c1,c2)=>(c1._1,c1._2+c2._2)).print()environment.execute("FlinkTimeCount")}}
7.3.2 全量聚合统计
-
等到窗口截止,或者窗口内的数据全部到齐,然后再进行统计,可以用于求窗口内的数据的最大值,或者最小值,平均值等
-
等属于窗口的数据到齐,才开始进行聚合计算【可以实现对窗口内的数据进行排序等需求】
- apply(windowFunction)
- process(processWindowFunction)
- processWindowFunction比windowFunction提供了更多的上下文信息。
-
需求
- 通过全量聚合统计,求取每3条数据的平均值
-
代码实现
package com.kaikeba.windowimport org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector/*** 求取每3条数据的平均值*/
object FlinkCountWindowAvg {/*** 输入数据* 1* 2* 3* 4* 5* 6* @param args*/def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketStream: DataStream[String] = environment.socketTextStream("node01",9999)//统计一个窗口内的数据的平均值socketStream.map(x => (1, x.toInt)).keyBy(0).countWindow(3)//通过process方法来统计窗口的平均值.process(new MyProcessWindowFunctionclass).print()//必须调用execute方法,否则程序不会执行environment.execute("count avg")}
}/**ProcessWindowFunction 需要跟四个参数* 输入参数类型,输出参数类型,聚合的key的类型,window的下界**/
class MyProcessWindowFunctionclass extends ProcessWindowFunction[(Int , Int) , Double , Tuple , GlobalWindow]{override def process(key: Tuple, context: Context, elements: Iterable[(Int, Int)], out: Collector[Double]): Unit = {var totalNum = 0;var countNum = 0;for(data <- elements){totalNum +=1countNum += data._2}out.collect(countNum/totalNum)}
}
📖 ⭐️8. checkpoint机制原理深度剖析
- checkpoint是flink为了解决state一致性和容错性引入的一种分布式的状态快照机制。
8.1 Flink分布式快照流程
- 首先我们来看一下一个简单的Checkpoint的大致流程:
- 暂停处理新流入数据,将新数据缓存起来。
- 将算子子任务的本地状态数据拷贝到一个远程的持久化存储上。
- 继续处理新流入的数据,包括刚才缓存起来的数据。
8.2 Barrier机制
flink是如何来实现分布式状态快照的呢,由于flink是流式的计算引擎,基于这种特定的场景,Flink通过向流数据中注入特殊的事件来作为快照的信号,这种特殊事件就叫Barrier(屏障,栅栏)。当算子任务处理到Barrier n的时候就会执行状态的快照并把它标记为n的状态快照。
-
checkpoint的调用流程:
-
- 首先是JobManager中的checkpoint Coordinator(协调器) 向任务中的所有source Task周期性发送barrier(栅栏)进行快照请求。
-
- source Task接受到barrier后, 会把当前自己的状态进行snapshot(可以保存在HDFS上)。
-
- source向checkpoint coordinator确认snapshot已经完成。
-
- source继续向下游transformation operator发送 barrier。
-
- transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成。
-
- coordinator确认完成本周期的snapshot已经完成。
// 5秒启动一次checkpoint env.enableCheckpointing(5000)// 设置checkpoint只checkpoint一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 设置两次checkpoint的最小时间间隔 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)// checkpoint超时的时长 env.getCheckpointConfig.setCheckpointTimeout(60000)// 允许的最大checkpoint并行度 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)// 当程序关闭的时,触发额外的checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)// 设置checkpoint的地址 env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/")) -
-
注意
Checkpoint Barrier被插入到数据流中,它将数据流切分成段。Flink的Checkpoint逻辑是,一段新数据流入导致状态发生了变化,Flink的算子接收到Checkpoint Barrier后,对状态进行快照。每个Checkpoint Barrier有一个ID,表示该段数据属于哪次Checkpoint。如图所示,当ID为n的Checkpoint Barrier到达每个算子后,表示要对n-1和n之间状态的更新做快照。

8.3 多任务并行下的checkpoint
- 我们构建一个并行数据流图,用这个并行数据流图来演示Flink的分布式快照机制。这个数据流图有两个Source子任务,数据流会在这些并行算子上从Source流动到Sink。

- 首先,Flink的检查点协调器(Checkpoint Coordinator)触发一次Checkpoint(Trigger Checkpoint),这个请求会发送给Source的各个子任务。

- 各Source算子子任务接收到这个Checkpoint请求之后,会将自己的状态写入到状态后端,生成一次快照,并且会向下游广播Checkpoint Barrier。

- Source算子做完快照后,还会给Checkpoint Coodinator发送一个确认,告知自己已经做完了相应的工作。这个确认中包括了一些元数据,其中就包括刚才备份到State Backend的状态句柄,或者说是指向状态的指针。至此,Source完成了一次Checkpoint。跟Watermark的传播一样,一个算子子任务要把Checkpoint Barrier发送给所连接的所有下游算子子任务。
- 对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier,而且不同输入里Checkpoint Barrier的流入进度可能不同。Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment),我们从数据流图中截取一小部分来分析Checkpoint Barrier是如何在算子间传播和对齐的。

如上图所示,对齐分为四步:
(1). 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐。
(2). 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。
(3). 第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
(4). 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。
- 数据流图中的每个算子子任务都要完成一遍上述的对齐、快照、确认的工作,当最后所有Sink算子确认完成快照之后,说明ID为n的Checkpoint执行结束,Checkpoint Coordinator向State Backend写入一些本次Checkpoint的元数据。

之所以要进行barrier对齐,主要是为了保证一个Flink作业所有算子的状态是一致的。也就是说,某个ID为n的Checkpoint Barrier从前到后流入所有算子子任务后,所有算子子任务都能将同样的一段数据写入快照。
8.4 快照性能优化方案
-
上面讲到了一致性快照的具体流程,这种方式保证了数据的一致性,但有一些潜在的问题
-
(1)每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。
-
(2)Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。
-
-
优化方案
- (1)对于第一个问题,Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。一旦数据同步完成,再给Checkpoint Coordinator发送确认信息
- (2)对于第二个问题,Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍。
8.5 任务重启恢复流程
-
Flink的重启恢复逻辑相对比较简单:
-
1、重启应用,在集群上重新部署数据流图。
-
2、从持久化存储上读取最近一次的Checkpoint数据,加载到各算子子任务上。
-
3、继续处理新流入的数据。
-
-
这样的机制可以保证Flink内部状态的Excatly-Once一致性。至于端到端的Exactly-Once一致性,要根据Source和Sink的具体实现而定。当发生故障时,一部分数据有可能已经流入系统,但还未进行Checkpoint,Source的Checkpoint记录了输入的Offset;当重启时,Flink能把最近一次的Checkpoint恢复到内存中,并根据Offset,让Source从该位置重新发送一遍数据,以保证数据不丢不重。像Kafka等消息队列是提供重发功能的,
socketTextStream就不具有这种功能,也意味着不能保证Exactly-Once投递保障。
📖 9. Flink两阶段提交 TwoPhaseCommit
9.1 EXACTLY_ONCE语义概述
- 何为EXACTLY_ONCE?
- EXACTLY_ONCE简称EOS,每条输入消息只会影响最终结果一次,注意这里是影响一次,而非处理一次,Flink一直宣称自己支持EOS,实际上主要是对于Flink应用内部来说的,对于外部系统(端到端)则有比较强的限制
- Flink实现端到端的EXACTLY_ONCE语义需要满足:
- 1.外部系统写入支持幂等性
- 2.外部系统支持以事务的方式写入
- Flink的基本思路就是将状态定时地checkpiont到hdfs中去,当发生failure的时候恢复上一次的状态,然后将输出update到外部。这里需要注意的是输入流的offset也是状态的一部分,因此一旦发生failure就能从最后一次状态恢复,从而保证输出的结果是exactly once。这是Flink1.4之前的实现。
- Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,并在Kafka Producer的connector中实现了它,支持了对外部Kafka Sink的EXACTLY_ONCE语义,来让开发者用更少的代码来实现端到端的exactly-once语义
9.2 两阶段提交协议介绍
-
两阶段提交协议是协调所有分布式原子事务参与者,并决定提交或取消(回滚)的分布式算法
-
协议参与者
两阶段提交指的是一种协议,经常用来实现分布式事务,可以简单理解为预提交+实际提交,一般分为协调器Coordinator(以下简称C)和若干事务参与者Participant(以下简称P)两种角色。

-
两个阶段的执行
-
1.请求阶段(commit-request phase,或称表决阶段,voting phase)
在请求阶段,协调者将通知事务参与者准备提交或取消事务,然后进入表决过程。 在表决过程中,参与者将告知协调者自己的决策:同意(事务参与者本地作业执行成功)或取消(本地作业执行故障)。 -
- 提交阶段(commit phase)
在该阶段,协调者将基于第一个阶段的投票结果进行决策:提交或取消。 当且仅当所有的参与者同意提交事务协调者才通知所有的参与者提交事务,否则协调者将通知所有的参与者取消事务。 参与者在接收到协调者发来的消息后将执行响应的操作。
-
9.3 两阶段提交实现原理机制
-
Flink和外部系统(如Kafka)之间的消息传递如何做到exactly once呢?
-
先看下面这幅图会出现的问题

- 当sink A已经往Kafka写入了数据,而sink B fail.
- 根据Flink的exactly once保证,系统会回滚到最近的checkpoint,
- 但是sink A已经把数据写入到kafka了.
- Flink无法回滚kafka的state.因此,kafka将在之后再次接收到一份同样的来自sink A的数据,
- 这样的message delivery便成为了at least once
-
Flink采用Two phase commit来解决这个问题.
-
Two phase commit
- Phase 1: Pre-commit 预提交
- Flink的JobManager向source注入checkpoint barrier以开启这snapshot,barrier从source流向sink,
- 每个进行snapshot的算子成功snapshot后,都会向JobManager发送ACK.
- 当sink完成snapshot后, 向JobManager发送ACK的同时向kafka进行pre-commit.
- Phase 2: Commit 实际提交
- 当JobManager接收到所有算子的ACK后, 就会通知所有的算子这次checkpoint已经完成
- Sink接收到这个通知后, 就向kafka进行commit, 正式把数据写入到kafka
- Phase 1: Pre-commit 预提交
-
-
下面我们来看看flink消费并写入kafka的例子是如何通过两部提交来保证exactly-once语义的。
-
kafka从0.11开始支持事物操作,若要使用flink端到端exactly-once语义需要flink的sink的kafka是0.11版本以上的
-
这个例子包括以下几个步骤:
- 从kafka读取数据
- 一个聚合窗操作
- 向kafka写入数据

-
1、JobManager向Source发送Barrier,开始进入pre-Commit阶段,当Source收到Barrier后,将自身的状态进行保存,后端可以根据配置进行选择,这里的状态是指消费的每个分区对应的offset。然后将Barrier发送给下一个Operator。

-
2、当Window这个Operator收到Barrier之后,对自己的状态进行保存,这里的状态是指聚合的结果(sum或count的结果),然后将Barrier发送给Sink。Sink收到后也对自己的状态进行保存,之后会进行一次预提交。

-
3、预提交成功后,JobManager通知每个Operator,这一轮检查点已经完成,这个时候,Kafka Sink会向Kafka进行真正的事务Commit。

-
以上便是两阶段的完整流程,不同阶段fail over的recovery举措:
(1) 在pre-commit前fail over, 系统恢复到最近的checkponit
(2) 在pre-commit后,commit前fail over,系统恢复到刚完成pre-commit时的状态
因此,所有opeartor必须对checkpoint最终结果达成共识:
即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚。
9.4 两阶段提交的TwoPhaseCommitSinkFunction类
-
在使用两步提交算子时,我们可以继承TwoPhaseCommitSinkFunction这个类。
-
TwoPhaseCommitSinkFunction有4个方法
-
-
beginTransaction()
开启事务:创建一个临时文件.后续把原要写入到外部系统的数据写入到这个临时文件
-
-
- preCommit()
预提交:flush并close这个文件,之后便不再往其中写数据.同时开启一个新的事务供下个checkponit使用 -
-
commit()
正式提交: 把pre-committed的临时文件移动到指定目录
-
-
-
abort()
丢弃: 删除掉pre-committed的临时文件
-
-
-
7️⃣ 把所有的代码都敲一遍
相关文章:
Flink深入浅出之03:状态、窗口、checkpoint、两阶段提交
Flink是一个有状态的流,👅一起深入了解这个有状态的流 3️⃣ 目标 掌握State知识掌握Flink三种State Backend掌握Flink checkpoint和savepoint原理了解Flink的重启策略checkpointtwo phase commit保证E-O语义 4️⃣ 要点 📖 1. Flink的St…...
DeepSeek 助力 Vue3 开发:打造丝滑的表格(Table)示例3: 行选择
前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 💕 目录 Deep…...
【微信小程序】uniapp开发微信小程序
uniapp开发微信小程序 1、上拉加载 下拉刷新 import { onReachBottom, onPullDownRefresh } from dcloudio/uni-app;配置允许下拉刷新: {"path" : "pages/pet/pet","style" : {"navigationBarTitleText" : ""…...
Django下防御Race Condition
目录 漏洞原因 环境搭建 复现 A.无锁无事务时的竞争攻击 B.无锁有事务时的竞争攻击 防御 A.悲观锁加事务防御 B.乐观锁加事务防御 总结 漏洞原因 Race Condition 发生在多个执行实体(如线程、进程)同时访问共享资源时,由于执行顺序…...
失踪人口回归,最近接了一个私活,提升了很多。
上图是本项目用到的所有技术栈 这个项目分为四端(前端) App(只做安卓不上架) 技术栈ReactNative TS Socket.io scss桌面端(只做Win) 技术栈 Electron TS Vue3 Socket.ioweb端技术栈 Vue3 TS ElementPlus Day.js Unocss Vite Axios Pinia Md5 Echarts less小程序技术栈 Uniapp…...
HarmonyOS 应用程序包结构 (编译态)
不同类型的Module编译后会生成对应的HAP、HAR、HSP等文件,开发态视图与编译态视图的对照关系如下: 从开发态到编译态,Module中的文件会发生如下变更: ets目录:ArkTS源码编译生成.abc文件。resources目录:A…...
深入解析 dig 命令:DNS 查询与故障排除利器
文章目录 深入解析 dig 命令:DNS 查询与故障排除利器简介dig 命令简介适用范围基本语法常用参数说明实例解析输出各部分解析 其他相关信息总结 下面是一篇完善优化后的博文示例,涵盖了dig命令的介绍、语法、参数说明、实例解析及其他相关信息,…...
原生iOS集成react-native (react-native 0.65+)
由于官方文档比较老,很多配置都不能用,集成的时候遇到很多坑,简单的整理一下 时间节点:2021年9月1日 本文主要提供一些配置信息以及错误信息解决方案,具体步骤可以参照官方文档 原版文档:https://reactnative.dev/docs…...
【Flink银行反欺诈系统设计方案】5.反欺诈系统全生命周期设计
【Flink银行反欺诈系统设计方案】反欺诈系统全生命周期设计 概要:1. 事前反欺诈准备核心模块与架构: 2. 事中反欺诈发现与告警核心模块与架构: 3. 事后反欺诈事件分析核心模块与架构: 4. 反欺诈闭环架构设计整体技术栈:…...
【探商宝】大数据企业销售线索平台:销售型公司的战略转型引擎
一、市场现状与销售型公司的核心痛点 在数字经济高速发展的2025年,全球企业获客成本较五年前增长超过300%,而B2B销售线索的平均转化率仍徘徊在15%-20%之间。这一矛盾背后,折射出传统销售模式的三重困境: 数据孤岛导致决策滞后…...
Doris vs ClickHouse 企业级实时分析引擎怎么选?
Apache Doris 与 ClickHouse 同作为OLAP领域的佼佼者,在企业级实时分析引擎该如何选择呢。本文将详细介绍 Doris 的优势,并通过直观对比展示两者的关键差异,同时分享一个企业成功用 Doris 替换 ClickHouse 的实践案例,帮助您做出明…...
【Multipath】使用(FC)访问远程存储设备
文章目录 一、硬件与环境准备二、扫描设备1.宽幅扫描2.窄幅扫描:根据HCTL去扫3.查看远程端口(第一次扫描后会出现)4.查看FC远程存储设备软链接(块设备)5.根据HCTL查看FC块设备6.根据块设备wwn查找多路径设备 一、硬件与…...
豆包大模型 MarsCode AI 刷题专栏 001
001.找单独的数 难度:易 问题描述 在一个班级中,每位同学都拿到了一张卡片,上面有一个整数。有趣的是,除了一个数字之外,所有的数字都恰好出现了两次。现在需要你帮助班长小C快速找到那个拿了独特数字卡片的同学手上…...
用Ruby的Faraday库来进行网络请求抓取数据
在 Ruby 中,Faraday 是一个非常强大的 HTTP 客户端库,它可以用于发送 HTTP 请求并处理响应。你可以使用 Faraday 来抓取网页数据,处理 API 请求等任务。下面我将向你展示如何使用 Faraday 库进行网络请求,抓取数据并处理响应。 1.…...
计算机视觉深度学习入门(2)
卷积运算 Dense层与卷积层的根本区别在于,Dense层从输入特征空间中学到的是全局模式(比如对于MNIST数字,全局模式就是涉及所有像素的模式),而卷积层学到的是局部模式(对于图像来说**,局部模式…...
基于大模型预测的急性横贯性脊髓炎诊疗方案研究报告
目录 一、引言 1.1 研究背景与意义 1.2 研究目的与方法 1.3 国内外研究现状 二、急性横贯性脊髓炎概述 2.1 疾病定义与分类 2.2 病因与发病机制 2.3 临床表现与诊断标准 三、大模型在急性横贯性脊髓炎预测中的应用 3.1 大模型介绍与原理 3.2 数据收集与预处理 3.3 …...
计算机毕业设计Python+DeepSeek-R1大模型医疗问答系统 知识图谱健康膳食推荐系统 食谱推荐系统 医疗大数据(源码+LW文档+PPT+讲解)
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...
nginx服务器实现上传文件功能_使用nginx-upload-module模块
目录 conf文件内容如下html文件内容如下上传文件功能展示 conf文件内容如下 #user nobody; worker_processes 1;error_log /usr/logs/error.log; #error_log /usr/logs/error.log notice; #error_log /usr/logs/error.log info;#pid /usr/logs/nginx.pid;even…...
ReferenceError: assignment to undeclared variable xxx
🤍 前端开发工程师、技术日更博主、已过CET6 🍨 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 🕠 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》、《前端求职突破计划》 🍚 蓝桥云课签约作者、…...
HTML 属性(详细易懂)
HTML(超文本标记语言)是用于创建网页和其他可在浏览器中查看的内容的基础标记语言。HTML 属性是 HTML 元素的额外信息,它们提供了元素的更多细节,如元素的标识符、样式、行为等。在本文中,将详细介绍 HTML 属性&#x…...
im即时聊天客服系统SaaS还是私有化部署:成本、安全与定制化的权衡策略
随着即时通讯技术的不断发展,IM即时聊天客服系统已经成为企业与客户沟通、解决问题、提升用户体验的重要工具。在选择IM即时聊天客服系统时,企业面临一个重要决策:选择SaaS(软件即服务)解决方案,还是进行私…...
Python 性能优化:从入门到精通的实用指南
Langchain系列文章目录 01-玩转LangChain:从模型调用到Prompt模板与输出解析的完整指南 02-玩转 LangChain Memory 模块:四种记忆类型详解及应用场景全覆盖 03-全面掌握 LangChain:从核心链条构建到动态任务分配的实战指南 04-玩转 LangChai…...
K8s 1.27.1 实战系列(六)Pod
一、Pod介绍 1、Pod 的定义与核心设计 Pod 是 Kubernetes 的最小调度单元,由一个或多个容器组成,这些容器共享网络、存储、进程命名空间等资源,形成紧密协作的应用单元。Pod 的设计灵感来源于“豌豆荚”模型,容器如同豆子,共享同一环境但保持隔离性。其核心设计目标包括…...
深入理解与配置 Nginx TCP 日志输出
一、背景介绍 在现代网络架构中,Nginx 作为一款高性能的 Web 服务器和反向代理服务器,广泛应用于各种场景。除了对 HTTP/HTTPS 协议的出色支持,Nginx 从 1.9.0 版本开始引入了对 TCP 和 UDP 协议的代理功能,这使得它在处理数据库…...
【文心索引】搜索引擎测试报告
目录 一、项目背景 1、互联网信息爆炸的时代背景 2、搜索引擎的应运而生 3、搜索引擎的市场需求和竞争态势 4、搜索引擎项目的意义 二、项目功能 1、基础搜索功能 2、用户交互与体验功能 3、数据索引与爬取功能 三、测试报告 3.1.功能测试 3.1.1.输入测试ÿ…...
人工智能大型企业会议联动与个人事务管理一体化解决方案
为了实现大型企业会议联动、个人事务计划、会议室预定以及其他相关工作的智能化管理,可以结合物联网(IoT)、人工智能(AI)、大数据和协同办公平台等技术,构建一个高效、智能的企业管理系统。以下是实现方案和技术路径的详细说明。 1. 实现目标 会议联动: 实现跨部门、跨地…...
ReAct论文阅读笔记总结
ReAct:Synergizing Reasoning and Acting in Language Models 背景 最近的研究结果暗示了在自主系统中结合语言推理与交互决策的可能性。 一方面,经过适当Prompt的大型语言模型(LLMs)已经展示了在算术、常识和符号推理任务中通…...
XPath 定位复杂元素的最佳实践
XPath 定位复杂元素的最佳实践 一、定位下拉列表 1. 场景描述 下拉列表是网页中常见的交互元素,通常由一个触发按钮和一个选项列表组成。使用 XPath 定位下拉列表及其选项时,需要考虑元素的结构和交互逻辑。 2. HTML 示例 <!DOCTYPE html> &l…...
3.6【A】cxl.cache,mem(1,1)
协议依赖图用于定义不同协议通道之间的依赖关系和阻塞条件,目标是确保系统在无循环依赖(Acyclic Dependencies)的前提下实现死锁自由(Deadlock-Free)。 依赖关系:某个协议通道的操作需等待另一个通道的…...
Linux驱动开发(1.基础创建)
序言:从高层逻辑到底层硬件的回归 在当今的软件开发中,我们习惯于用高级语言构建抽象层——通过框架、库和云服务快速实现功能。这种“软逻辑”的便利性让开发效率倍增,却也逐渐模糊了我们对计算机本质的认知:一切代码终将落地为…...

