当前位置: 首页 > news >正文

Flink State 和 Fault Tolerance详解

有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态,这就使得状态在整个Flink的精细化计算中有着非常重要的地位:

  • 记录数据从某一个过去时间点到当前时间的状态信息。
  • 以每分钟/小时/天汇总事件时,状态将保留待处理的汇总记录。
  • 在训练机器学习模型时,状态将保持当前版本的模型参数。

Flink在管理状态方面,使用Checkpoint和Savepoint实现状态容错。Flink的状态在计算规模发生变化的时候,可以自动在并行实例间实现状态的重新分发,底层使用State Backend策略存储计算状态,State Backend决定了状态存储的方式和位置(后续章节介绍)。

Flink在状态管理中将所有能操作的状态分为Keyed StateOperator State,其中Keyed State类型的状态同key一一绑定,并且只能在KeyedStream中使用。所有non-KeyedStream状态操作都叫做Operator State。Flink在底层做状态管理时,将Keyed State和<parallel-operator-instance, key>关联,由于某一个key仅仅落入其中一个operator-instance中,因此可以简单的理解Keyed State是和<operator,key>进行绑定的,采用Key Group机制对Keyed State进行管理或者分类,所有的keyed-operator在做状态操作的时候可能需要和1~n个Key Group进行交互。

Flink在分发Keyed State状态的时候,不是以key为单位,而是以Key Group为最小单元分发

Operator State (也称为 non-keyed state),每一个operator state 会和一个parallel operator instance进行绑定。Keyed StateOperator State 以两种形式存在( managed(管理)和 raw(原生)),所有Flink已知的操作符都支持Managed State,但是Raw State仅仅在用户自定义Operator时使用,并且不支持在并行度发生变化的时候重新分发状态,因此,虽然Flink支持Raw State,但是在绝大多数的应用场景下,一般使用的都是Managed State。

Keyed State

Keyed-state接口提供对不同类型状态的访问,所有状态都限于当前输入元素的key。

类型说明方法
ValueState这个状态主要存储一个可以用作更新的值update(T)
T value()
clear()
ListState这将存储List集合元素add(T)
addAll(List)
Iterable get()
update(List)
clear()
ReducingState这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供ReduceFunction
add(T)
T get()
clear()
AggregatingState<IN, OUT>这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供AggregateFunction
add(IN)
T get()
clear()
FoldingState<T, ACC>这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供FoldFunction
add(IN)
T get()
clear()
MapState<UK, UV>这个状态会保留一个Map集合元素put(UK, UV)
putAll(Map<UK, UV>)
entries()
keys()
values()
clear()

ValueSate

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {var vs:ValueState[Int]=_override def open(parameters: Configuration): Unit = {val vsd=new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])vs=getRuntimeContext.getState[Int](vsd)}override def map(value: (String, Int)): (String, Int) = {val histroyCount = vs.value()val currentCount=histroyCount+value._2vs.update(currentCount)(value._1,currentCount)}
}).print()
env.execute("wordcount")

AggregatingState<IN, OUT>

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toInt))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Double)] {var vs:AggregatingState[Int,Double]=_override def open(parameters: Configuration): Unit = {val vsd=new AggregatingStateDescriptor[Int,(Double,Int),Double]("avgCount",new AggregateFunction[Int,(Double,Int),Double] {override def createAccumulator(): (Double, Int) = {(0.0,0)}override def add(value: Int, accumulator: (Double, Int)): (Double, Int) = {(accumulator._1+value,accumulator._2+1)}override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = {(a._1+b._1,a._2+b._2)}override def getResult(accumulator: (Double, Int)): Double = {accumulator._1/accumulator._2}},createTypeInformation[(Double,Int)])vs=getRuntimeContext.getAggregatingState(vsd)}override def map(value: (String, Int)): (String, Double) = {vs.add(value._2)val avgCount=vs.get()(value._1,avgCount)}
}).print()
env.execute("wordcount")

MapState<UK, UV>

var env=StreamExecutionEnvironment.getExecutionEnvironment
//001 zs 202.15.10.12 日本 2019-10-10
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>Login(ts(0),ts(1),ts(2),ts(3),ts(4)))
.keyBy("id","name")
.map(new RichMapFunction[Login,String] {var vs:MapState[String,String]=_override def open(parameters: Configuration): Unit = {val msd=new MapStateDescriptor[String,String]("mapstate",createTypeInformation[String],createTypeInformation[String])vs=getRuntimeContext.getMapState(msd)}override def map(value: Login): String = {println("历史登录")for(k<- vs.keys().asScala){println(k+" "+vs.get(k))}var result=""if(vs.keys().iterator().asScala.isEmpty){result="ok"}else{if(!value.city.equalsIgnoreCase(vs.get("city"))){result="error"}else{result="ok"}}vs.put("ip",value.ip)vs.put("city",value.city)vs.put("loginTime",value.loginTime)result}
}).print()
env.execute("wordcount")

总结

new Rich[Map|FaltMap]Function {var vs:XxxState=_ //状态声明override def open(parameters: Configuration): Unit = {val xxd=new XxxStateDescription //完成状态的初始化vs=getRuntimeContext.getXxxState(xxd)}override def xxx(value: Xx): Xxx = {//状态操作}
}
  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

State Time-To-Live(TTL)

基本使用

可以将state存活时间(TTL)分配给任何类型的keyed-state,如果配置了TTL且状态值已过期,则Flink将尽力清除存储的历史状态值。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
  • 案例
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {var vs:ValueState[Int]=_override def open(parameters: Configuration): Unit = {val vsd=new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) //过期时间5s.setUpdateType(UpdateType.OnCreateAndWrite)//创建和修改的时候更新过期时间.setStateVisibility(StateVisibility.NeverReturnExpired)//永不返回过期的数据.build()vsd.enableTimeToLive(ttlConfig)vs=getRuntimeContext.getState[Int](vsd)}override def map(value: (String, Int)): (String, Int) = {val histroyCount = vs.value()val currentCount=histroyCount+value._2vs.update(currentCount)(value._1,currentCount)}
}).print()
env.execute("wordcount")

注意:开启TTL之后,系统会额外消耗内存存储时间戳(Processing Time),如果用户以前没有开启TTL配置,在启动之前修改代码开启了TTL,在做状态恢复的时候系统启动不起来,会抛出兼容性失败以及StateMigrationException的异常。

清除Expired State

在默认情况下,仅当明确读出过期状态时,通过调用ValueState.value()方法才会清除过期的数据,这意味着,如果系统一直未读取过期的状态,则不会将其删除,可能会导致存储状态数据的文件持续增长。

Cleanup in full snapshot

系统会从上一次状态恢复的时间点,加载所有的State快照,在加载过程中会剔除那些过期的数据,这并不会影响磁盘已存储的状态数据,该状态数据只会在Checkpoint的时候被覆盖,但是依然解决不了在运行时自动清除过期且没有用过的数据。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot.build

只能用于memory或者snapshot状态的后端实现,不支持RocksDB State Backend。

Cleanup in background

可以开启后台清除策略,根据State Backend采取默认的清除策略(不同状态的后端存储,清除策略不同)

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInBackground
.build
  • Incremental cleanup(基于内存backend)
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupIncrementally(100,true) //默认值 5 | false
.build()

第一个参数表示每一次触发cleanup的时候,系统会一次处理100个元素。第二个参数是false,表示只要用户对任意一个state进行操作,系统都会触发cleanup策略;第二个参数是true,表示只要系统接收到记录数(即使用户没有操作状态)就会触发cleanup策略。

  • RocksDB compaction

RocksDB是一个嵌入式的key-value存储,其中key和value是任意的字节流,底层进行异步压缩,会将key相同的数据进行compact(压缩),以减少state文件大小,但是并不对过期的state进行清理,因此可以通过配置compactFilter,让RocksDB在compact的时候对过期的state进行排除,RocksDB数据库的这种过滤特性,默认关闭,如果想要开启,可以在flink-conf.yaml中配置 state.backend.rocksdb.ttl.compaction.filter.enabled:true 或者在应用程序的API里设置RocksDBStateBackend::enableTtlCompactionFilter。

在这里插入图片描述

import org.apache.flink.api.common.state.StateTtlConfig 
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000) //默认配置1000
.build()

这里的1000表示,系统在做Compact的时候,会检查1000个元素是否失效,如果失效,则清除该过期数据。

Operator State

如果用户想要使用Operator State,只需要实现通用的CheckpointedFunction 接口或者ListCheckpointed<T extends Serializable>,值得注意的是,目前的operator-state仅仅支持list-style风格的状态,要求所存储的状态必须是一个List,且其中的元素必须可以序列化。

CheckpointedFunction

提供两种不同的状态分发方案:Even-splitUnion

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
  • snapshotState():调用checkpoint()的时候,系统会调用snapshotState()对状态做快照
  • initializeState():第一次启动或者从上一次状态恢复的时候,系统会调用initializeState()

Even-split:表示系统在故障恢复时,会将operator-state的元素均分给所有的operator实例,每个operator实例将获取到整个operator-state的sub-list数据。

Union:表示系统在故障恢复时,每一个operator实例可以获取到整个operator-state的全部数据。

案例

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]  with CheckpointedFunction  {var listState:ListState[(String,Int)]=_val bufferedElements = ListBuffer[(String, Int)]()//负责将数据输出到外围系统override def invoke(value: (String, Int)): Unit = {bufferedElements += valueif(bufferedElements.size == threshold){for(ele <- bufferedElements){println(ele)}bufferedElements.clear()}}//是在savepoint|checkpoint时候将数据持久化override def snapshotState(context: FunctionSnapshotContext): Unit = {listState.clear()for(ele <- bufferedElements){listState.add(ele)}}//状态恢复|初始化 创建状态override def initializeState(context: FunctionInitializationContext): Unit = {val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements",createTypeInformation[(String,Int)])listState=context.getOperatorStateStore.getListState(lsd)if(context.isRestored){for(element <- listState.get().asScala) {bufferedElements += element}}}
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new BufferingSink(5))
env.execute("testoperatorstate")
  • 启动netcat服务
[root@centos ~]# nc -lk 9999
  • 提交任务

在这里插入图片描述

注意,将并行度设置为1,方便测试

  • 在netcat中输入以下数据
[root@centos ~]# nc -lk 9999
a1 b1 c1 d1
  • 取消任务,并且创建savepoint
[root@centos flink-1.8.1]# ./bin/flink list -m centos:8081
------------------ Running/Restarting Jobs -------------------
17.10.2019 09:49:20 : f21795e74312eb06fbf0d48cb8d90489 : testoperatorstate (RUNNING)
--------------------------------------------------------------
[root@centos flink-1.8.1]# ./bin/flink cancel -m centos:8081 -s hdfs:///savepoints f21795e74312eb06fbf0d48cb8d90489
Cancelling job f21795e74312eb06fbf0d48cb8d90489 with savepoint to hdfs:///savepoints.
Cancelled job f21795e74312eb06fbf0d48cb8d90489. Savepoint stored in hdfs://centos:9000/savepoints/savepoint-f21795-38e7beefe07b.

注意,如果Flink需要和Hadoop整合,必须保证在当前环境变量下有HADOOP_HOME|HADOOP_CALSSPATH

[root@centos flink-1.8.1]# cat /root/.bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
  • 测试状态

在这里插入图片描述

ListCheckpointed

ListCheckpointed接口是CheckpointedFunction接口的一种变体形式,仅仅支持Even-split状态的分发策略。

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
  • snapshotState():调用checkpoint()的时候,系统会调用snapshotState()对状态做快照
  • restoreState():等价于上述CheckpointedFunction中声明的initializeState()方法,用作状态恢复

案例

import java.lang.{Long => JLong} //修改类别名
import scala.{Long => SLong} //修改类别名
class CustomStatefulSourceFunction extends ParallelSourceFunction[SLong] with ListCheckpointed[JLong]{@volatilevar isRunning:Boolean = truevar offset = 0Loverride def run(ctx: SourceFunction.SourceContext[SLong]): Unit = {val lock = ctx.getCheckpointLockwhile(isRunning){Thread.sleep(1000)lock.synchronized({ctx.collect(offset)offset += 1})}}override def cancel(): Unit = {isRunning=false}override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] = {Collections.singletonList(offset) //存储的是 当前source的偏移量,如果状态不可拆分,用户可以使Collections.singletonList}override def restoreState(state: util.List[JLong]): Unit = {for (s <- state.asScala) {offset = s}}
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.addSource[Long](new CustomStatefulSourceFunction)
.print("offset:")
env.execute("testOffset")

广播状态

支持Operator State的第三种类型是广播状态。引入广播状态以支持用例,其中需要将来自一个流的某些数据广播到所有下游任务,广播的状态将存储在本地,用于处理另一个流上所有传入的元素。

A third type of supported operator state is the Broadcast State. Broadcast state was introduced to support use cases where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally and is used to process all incoming elements on the other stream.

non-keyed

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
class UserBuyPathBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends BroadcastProcessFunction[UserBuyPath,Rule,String]{//处理的是UserBuyPath,读取广播状态override def processElement(value: UserBuyPath,ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#ReadOnlyContext,out: Collector[String]): Unit = {val broadcastState = ctx.getBroadcastState(msd)if(broadcastState.contains(value.channel)){//如果有规则,尝试计算val threshold= broadcastState.get(value.channel)if(value.path >= threshold){//将满足条件的用户信息输出out.collect(value.id+" "+value.name+" "+value.channel+" "+value.path)}}}//处理的是规则 Rule 数据 ,记录修改广播状态override def processBroadcastElement(value: Rule, ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#Context,out: Collector[String]): Unit = {val broadcastState = ctx.getBroadcastState(msd)broadcastState.put(value.channel,value.threshold)//更新状态println("=======rule======")for(entry <- broadcastState.entries().asScala){println(entry.getKey+"\t"+entry.getValue)}println()println()}
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id name channel action
// 001 mack 手机 view
// 001 mack 手机 view
// 001 mack 手机 addToCart
// 001 mack 手机 buy
val userStream = fsEnv.socketTextStream("centos", 9999).map(line => line.split("\\s+")).map(ts => UserAction(ts(0), ts(1), ts(2), ts(3))).keyBy("id", "name").map(new UserActionRichMapFunction)val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],createTypeInformation[Int])
// channel 阈值
// 手机类   10
val broadcastStream: BroadcastStream[Rule] = fsEnv.socketTextStream("centos", 8888).map(line => line.split("\\s+")).map(ts => Rule(ts(0), ts(1).toInt)).broadcast(msd)userStream.connect(broadcastStream)
.process(new UserBuyPathBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")
case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
case class UserBuyPath(id:String,name:String,channel:String,path:Int)
class UserActionRichMapFunction extends RichMapFunction[UserAction,UserBuyPath]{var buyPathState:MapState[String,Int]=_override def open(parameters: Configuration): Unit = {val msd= new MapStateDescriptor[String,Int]("buy-path",createTypeInformation[String],createTypeInformation[Int])buyPathState=getRuntimeContext.getMapState(msd)}override def map(value: UserAction): UserBuyPath = {val channel = value.channelvar path=0if(buyPathState.contains(channel)){path=buyPathState.get(channel)}if(value.action.equals("buy")){buyPathState.remove(channel)}else{buyPathState.put(channel,path+1)}UserBuyPath(value.id,value.name,value.channel,buyPathState.get(channel))}
}

keyed

class UserBuyPathKeyedBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends KeyedBroadcastProcessFunction[String,UserAction,Rule,String]{override def processElement(value: UserAction,ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#ReadOnlyContext,out: Collector[String]): Unit = {println("value:"+value +" key:"+ctx.getCurrentKey)println("=====state======")for(entry <- ctx.getBroadcastState(msd).immutableEntries().asScala){println(entry.getKey+"\t"+entry.getValue)}}override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#Context, out: Collector[String]): Unit = {println("Rule:"+value)//更新状态ctx.getBroadcastState(msd).put(value.channel,value.threshold)}
}
case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id name channel action
// 001 mack 手机 view
// 001 mack 手机 view
// 001 mack 手机 addToCart
// 001 mack 手机 buy
val userKeyedStream = env.socketTextStream("centos", 9999)
.map(line => line.split("\\s+"))
.map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
.keyBy(0)//只可以写一个参数val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],createTypeInformation[Int])
// channel 阈值
// 手机类 10
// 电子类 10
val broadcastStream: BroadcastStream[Rule] = fsEnv.socketTextStream("centos", 8888)
.map(line => line.split("\\s+"))
.map(ts => Rule(ts(0), ts(1).toInt))
.broadcast(msd)
userKeyedStream.connect(broadcastStream)
.process(new UserBuyPathKeyedBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")

CheckPoint & SavePoint

CheckPoint是Flink实现故障容错的一种机制,系统会根据配置的检查点定期自动对程序计算状态进行备份。一旦程序在计算过程中出现故障,系统会选择一个最近的检查点进行故障恢复。

SavePoint是一种有效的运维手段,需要用户手动触发程序进行状态备份,本质也是在做CheckPoint。

实现故障恢复的先决条件:

  • 持久的数据源,可以在一定时间内重播记录(例如,FlinkKafkaConsumer)
  • 状态的永久性存储,通常是分布式文件系统(例如,HDFS)
var env=StreamExecutionEnvironment.getExecutionEnvironment
//启动检查点机制
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
//配置checkpoint必须在2s内完成一次checkpoint,否则检查点终止
env.getCheckpointConfig.setCheckpointTimeout(2000)
//设置checkpoint之间时间间隔 <=  Checkpoint interval
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5)
//配置checkpoint并行度,不配置默认1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//一旦检查点不能正常运行,Task也将终止
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
//将检查点存储外围系统 filesystem、rocksdb,可以配置在cancel任务时候,系统是否保留checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) //只可以写一个参数
.sum(1)
.print()
env.execute("testoperatorstate")

State Backend

State Backend决定Flink如何存储系统状态信息(Checkpoint形式),目前Flink提供了三种State Backend实现。

  • Memory (JobManagwer):这是Flink的默认实现,通常用于测试,系统会将计算状态存储在JobManager的内存中,但是在实际的生产环境中,由于计算的状态比较多,使用Memory 很容易导致OOM(out of memory)。
  • FileSystem:系统会将计算状态存储在TaskManager的内存中,因此一般用作生产环境,系统会根据CheckPoin机制,将TaskManager状态数据在文件系统上进行备份。如果是超大规模集群,TaskManager内存也可能发生溢出。
  • RocksDB:系统会将计算状态存储在TaskManager的内存中,如果TaskManager内存不够,系统可以使用RocksDB配置本地磁盘完成状态的管理,同时支持将本地的状态数据备份到远程文件系统,因此,RocksDB Backend 是推荐的选择。

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html

每一个Job都可以配置自己状态存储的后端实现

var env=StreamExecutionEnvironment.getExecutionEnvironment
val fsStateBackend:StateBackend = new FsStateBackend("hdfs:///xxx") //MemoryStateBackend、FsStateBackend、RocksDBStateBackend
env.setStateBackend(fsStateBackend)

如果用户不配置,则系统使用默认实现,默认实现可以通过修改flink-conf-yaml文件进行配置

[root@centos ~]# cd /usr/flink-1.8.1/
[root@centos flink-1.8.1]# vi conf/flink-conf.yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#state.backend: rocksdb
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#state.checkpoints.dir: hdfs:///flink-checkpoints
# Default target directory for savepoints, optional.
#state.savepoints.dir: hdfs:///flink-savepoints# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#state.backend.incremental: true

注意,必须在环境变量中出现HADOOP_CLASSPATH

Flink计算发布之后是否还能够修改计算算子?

首先,这在Spark中是不允许的,因为Spark会持久化代码片段,一旦修改代码,必须删除Checkpoint,但是Flink仅仅存储各个算子的计算状态,如果用户修改代码,需要用户在有状态的操作算子上指定uid属性。

env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.uid("kakfa-consumer")
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) //只可以写一个参数
.sum(1)
.uid("word-count") //唯一
.map(t=>t._1+"->"+t._2)
.print()

Flink Kafka如何保证精准一次的语义操作?

  • https://www.cnblogs.com/ooffff/p/9482873.html
  • https://www.jianshu.com/p/8cf344bb729a
  • https://www.jianshu.com/p/de35bf649293
  • https://blog.csdn.net/justlpf/article/details/80292375
  • https://www.jianshu.com/p/c0af87078b9c (面试题)

相关文章:

Flink State 和 Fault Tolerance详解

有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态&#xff0c;这就使得状态在整个Flink的精细化计算中有着非常重要的地位&#xff1a; 记录数据从某一个过去时间点到当前时间的状态信息。以每分钟/小时/天汇总事件时&#xff0c;状态将保留…...

小红书2023“家生活”趋势白皮书

关于报告的所有内容&#xff0c;公众【营销人星球】获取下载查看 核心观点 近年来&#xff0c;年轻人与家的关系愈发紧密。 在小红书上&#xff0c;我们观察到了家居家装内容的蓬勃生长&#xff0c;3 年来相关内容的笔记规模增长了6倍&#xff0c;相关品类的搜索量增加的 3.…...

使用 LangChain 搭建基于 Amazon DynamoDB 的大语言模型应用

LangChain 是一个旨在简化使用大型语言模型创建应用程序的框架。作为语言模型集成框架&#xff0c;在这个应用场景中&#xff0c;LangChain 将与 Amazon DynamoDB 紧密结合&#xff0c;构建一个完整的基于大语言模型的聊天应用。 本次活动&#xff0c;我们特意邀请了亚马逊云科…...

210. 课程表 II Python

文章目录 一、题目描述示例 1示例 2示例 3 二、代码三、解题思路 一、题目描述 现在你总共有 numCourses 门课需要选&#xff0c;记为 0 到 numCourses - 1。给你一个数组 prerequisites &#xff0c;其中 prerequisites[i] [ai, bi] &#xff0c;表示在选修课程 ai 前 必须 …...

【LeetCode 算法】Linked List Cycle II 环形链表 II

文章目录 Linked List Cycle II 环形链表 II问题描述&#xff1a;分析代码哈希快慢指针 Tag Linked List Cycle II 环形链表 II 问题描述&#xff1a; 给定一个链表的头节点 head &#xff0c;返回链表开始入环的第一个节点。 如果链表无环&#xff0c;则返回 null。 如果链…...

蒸散发与植被总初级生产力估算

目标 熟悉蒸散发ET及其组分&#xff08;植被蒸腾Ec、土壤蒸发Es、冠层截留Ei&#xff09;、植被总初级生产力GPP的概念和碳水耦合的基本原理&#xff1b;掌握利用Python与ArcGIS工具进行课程相关的操作&#xff1b;熟练掌握国际上流行的Penman-Monteith模型&#xff0c;并能够…...

uniapp微信小程序底部弹窗自定义组件

基础弹窗效果组件 <template><view><viewclass"tui-actionsheet-class tui-actionsheet":class"[show ? tui-actionsheet-show : ]"><view class"regional-selection">底部弹窗</view></view><!-- 遮罩…...

人工智能的最新进展:2024年将会发生什么?

文章目录 2024年AI最新发展2024年AI具体应用2024年AI的具体预测 ✍创作者&#xff1a;全栈弄潮儿 &#x1f3e1; 个人主页&#xff1a; 全栈弄潮儿的个人主页 &#x1f3d9;️ 个人社区&#xff0c;欢迎你的加入&#xff1a;全栈弄潮儿的个人社区 &#x1f4d9; 专栏地址&#…...

使用Golang实现一套流程可配置,适用于广告、推荐系统的业务性框架——组合应用

在《使用Golang实现一套流程可配置&#xff0c;适用于广告、推荐系统的业务性框架——简单应用》中&#xff0c;我们看到了各种组合Handler的组件&#xff0c;如HandlerGroup和Layer。这些组件下面的子模块又是不同组件&#xff0c;比如LayerCenter的子组件是Layer。如果此时我…...

DNS入门学习:DNS缓存的原理和作用(中科三方)

在实际业务场景中&#xff0c;DNS解析过程并不总是严格遵循从根域名服务器、顶级域名服务器再到权威域名服务器的一级级查询过程&#xff0c;这只是一个标准状态。为了节省全球查询的时间&#xff0c;同时减轻各级服务器的解析压力&#xff0c;DNS系统中引入了缓存机制。本文中…...

Linux虚拟机安装tomcat(图文详解)

目录 第一章、xshell工具和xftp的使用1.1&#xff09;xshell下载与安装1.2&#xff09;xshell连接1.3&#xff09;xftp下载安装和连接 第二章、安装tomcat1.1&#xff09;关闭防火墙&#xff0c;传输tomcat压缩包到Linux虚拟机12&#xff09;启动tomcat 第一章、xshell工具和xf…...

Matlab对TMS320F28335编程--SVPWM配置互补PWM输出

前言 F28335中断 目的&#xff1a;FOC的核心算法及SVPWM输出&#xff0c;SVPWM的载波频率10kHz&#xff0c;SVPWM的每个周期都会触发ADC中断采集相电流&#xff0c;SVPWM为芯片ePWM4、5、6通道&#xff0c;配置死区 1、配置中断SVPWM进ADC中断&#xff0c;查上表知CPU1,PIE1 …...

MySQL数据库——多表操作

文章目录 前言多表关系一对一关系一对多/多对一关系多对多关系 外键约束创建外键约束插入数据删除带有外键约束的表的数据删除外键约束 多表联合查询数据准备交叉连接查询内连接查询外连接查询左外连接查询右外连接查询满外连接查询 子查询子查询关键字ALL 关键字ANY 和 SOME 关…...

Java版本spring cloud + spring boot企业电子招投标系统源代码 tbms

​ 功能模块&#xff1a; 待办消息&#xff0c;招标公告&#xff0c;中标公告&#xff0c;信息发布 描述&#xff1a; 全过程数字化采购管理&#xff0c;打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力&#xff0c;为…...

css实现,正常情况下div从左到右一次排列,宽度超出时,右侧最后一个div固定住,左侧其他div滚动

需求:正常情况下 宽度超出时: 实现: <templete><div class"jieduanbox"><div v-for"(item, index) in stageList" :key"index" style"display: inline-block">.......</div><div class"rightBtn&q…...

【Linux手动搭建Sftp,创建用户、用户组及删除用户】

SFTP &#xff08;Secure File Transfer Protocol&#xff09;是一种安全的文件传输协议&#xff0c;基于SSH协议进行加密传输。在进行文件传输时&#xff0c;SFTP客户端通过SSH协议与服务器进行连接&#xff0c;并且通过使用公钥和/或密码进行身份验证&#xff0c;从而确保传输…...

云上 Index:看「简墨」如何为云原生打造全新索引

拓数派首款数据计算引擎 PieCloudDB Database 是一款全新的云原生虚拟数仓。为了提升用户使用体验&#xff0c;提高查询效率&#xff0c;在实现存算分离的同时&#xff0c;PieCloudDB 设计与打造了全新的存储引擎「简墨」等模块&#xff0c;并针对云场景和分析型场景设计了高效…...

Linux安装cuda和cudnn教程

Linux安装cuda和cudnn教程 文章目录 1.下载cuda和cudnn2. 安装cuda并检验安装是否成功3. 安装cudnn4.验证cuda是否能用代码附件&#xff1a;解压各种格式文件的Linux命令参考文献 卸载之前的cuda 卸载之前的cuda教程 1.下载cuda和cudnn CUDA下载地址&#xff1a;https://dev…...

短视频矩阵源码

一、短视频矩阵源码搭建解析&#xff1a; 目录 一、短视频矩阵源码搭建解析&#xff1a; 二、短视频矩阵源码的开发路径分享&#xff1a; 三、短视频矩阵系统开发应具备哪些能力&#xff1f; 短视频技术开发能力&#xff1a; 开发人员应具备短视频相关技术能力&#xff0c…...

群狼调研—连锁化妆品品牌门店神秘顾客调查的行家

连锁化妆品品牌门店神秘顾客调查作为群狼调研(湖南专业市场调查)的优势业务之一&#xff0c;公司成立至今已承包包括北京、上海、广州、深圳、长沙在内全国多个城市上百家不同化妆品品牌客户的神秘顾客调查服务&#xff0c;在创新性、行业操守及客户服务等方面赢得了广大客户的…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

Qemu arm操作系统开发环境

使用qemu虚拟arm硬件比较合适。 步骤如下&#xff1a; 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载&#xff0c;下载地址&#xff1a;https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...

6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础

第三周 Day 3 &#x1f3af; 今日目标 理解类&#xff08;class&#xff09;和对象&#xff08;object&#xff09;的关系学会定义类的属性、方法和构造函数&#xff08;init&#xff09;掌握对象的创建与使用初识封装、继承和多态的基本概念&#xff08;预告&#xff09; &a…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...

实战设计模式之模板方法模式

概述 模板方法模式定义了一个操作中的算法骨架&#xff0c;并将某些步骤延迟到子类中实现。模板方法使得子类可以在不改变算法结构的前提下&#xff0c;重新定义算法中的某些步骤。简单来说&#xff0c;就是在一个方法中定义了要执行的步骤顺序或算法框架&#xff0c;但允许子类…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能

指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...

加密通信 + 行为分析:运营商行业安全防御体系重构

在数字经济蓬勃发展的时代&#xff0c;运营商作为信息通信网络的核心枢纽&#xff0c;承载着海量用户数据与关键业务传输&#xff0c;其安全防御体系的可靠性直接关乎国家安全、社会稳定与企业发展。随着网络攻击手段的不断升级&#xff0c;传统安全防护体系逐渐暴露出局限性&a…...