当前位置: 首页 > news >正文

【Flink】状态管理

目录

1、状态概述

1.1 无状态算子

1.2 有状态算子

2、状态分类

​编辑 2.1 算子状态

2.1.1 列表状态(ListState)

2.1.2 联合列表状态(UnionListState)

2.1.3 广播状态(BroadcastState)

2.2 按键分区状态 

2.2.1 值状态(ValueState)

2.2.2 列表状态(ListState)

2.2.3 Map状态(MapState)

2.2.4 归约状态(ReducingState)

2.2.5 聚合状态(AggregatingState)

2.2.6 状态生存时间(TTL)

3、状态后端(State Backends)

3.1 状态后端的分类(HashMapStateBackend/RocksDB)

3.1.1 哈希表状态后端(HashMapStateBackend)

3.1.2 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)

3.2 如何选择正确的状态后端

3.3 状态后端的配置


1、状态概述

1.1 无状态算子

根据当前的输入可以直接转换得到输出结果,这种鼻子就是无状态算子,如map,flatMap,filter

1.2 有状态算子

除当前处理之外,还需要其他处理才能得到计算结果。如聚合算子,窗口算子等

2、状态分类

        Flink的状态有两种:托管状态(Managed State)原始状态(Raw State)托管状态就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

        通常我们采用Flink托管状态来实现需求。

 

2.1 算子状态

          一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。         

算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。

算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

2.1.1 列表状态(ListState)

与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

案例实操:在map算子中计算数据的个数。

public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream("hadoop102", 7777).map(new MyCountMapFunction()).print();env.execute();}// TODO 1.实现 CheckpointedFunction 接口public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> state;@Overridepublic Long map(String value) throws Exception {return ++count;}/*** TODO 2.本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用** @param context* @throws Exception*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshotState...");// 2.1 清空算子状态state.clear();// 2.2 将 本地变量 添加到 算子状态 中state.add(count);}/*** TODO 3.初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次** @param context* @throws Exception*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initializeState...");// 3.1 从 上下文 初始化 算子状态state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state", Types.LONG));// 3.2 从 算子状态中 把数据 拷贝到 本地变量if (context.isRestored()) {for (Long c : state.get()) {count += c;}}}}
}

2.1.2 联合列表状态(UnionListState)

与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。

如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

使用方式同ListState,区别在:getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

2.1.3 广播状态(BroadcastState)

有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。

案例实操:水位超过指定的阈值发送告警,阈值可以动态修改

public class OperatorBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 数据流SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// 配置流(用来广播配置)DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);// TODO 1. 将 配置流 广播MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);// TODO 2.把 数据流 和 广播后的配置流 connectBroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);// TODO 3.调用 processsensorBCS.process(new BroadcastProcessFunction<WaterSensor, String, String>() {/*** 数据流的处理方法: 数据流 只能 读取 广播状态,不能修改* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// TODO 5.通过上下文获取广播状态,取出里面的值(只读,不能修改)ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);Integer threshold = broadcastState.get("threshold");// 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来threshold = (threshold == null ? 0 : threshold);if (value.getVc() > threshold) {out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");}}/*** 广播后的配置流的处理方法:  只有广播流才能修改 广播状态* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {// TODO 4. 通过上下文获取广播状态,往里面写数据BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);broadcastState.put("threshold", Integer.valueOf(value));}}).print();env.execute();}
}

2.2 按键分区状态 

而很多有状态的操作(比如聚合、窗口)都是要先做keyBy进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。

它的特点非常鲜明,就是以key为作用范围进行隔离。

需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。

2.2.1 值状态(ValueState)

public interface ValueState<T> extends State {T value() throws IOException;void update(T value) throws IOException;
}
  • T value():获取当前状态的值;
  • update(T value):对状态进行更新,传入的参数value就是要覆写的状态值。

        在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) {super(name, typeClass, null);
}

案例需求:检测每种传感器的水位值,如果连续的两个水位值超过10,就输出报警。

public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// TODO 1.定义状态ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 2.在open方法中,初始化状态// 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//                                lastVcState.value();  // 取出 本组 值状态 的数据
//                                lastVcState.update(); // 更新 本组 值状态 的数据
//                                lastVcState.clear();  // 清除 本组 值状态 的数据// 1. 取出上一条数据的水位值(Integer默认值是null,判断)int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();// 2. 求差值的绝对值,判断是否超过10Integer vc = value.getVc();if (Math.abs(vc - lastVc) > 10) {out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");}// 3. 更新状态里的水位值lastVcState.update(vc);}}).print();env.execute();}

2.2.2 列表状态(ListState)

将需要保存的数据,以列表(List)的形式组织起来。在ListState<T>接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

  • Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>;
  • update(List<T> values):传入一个列表values,直接对状态进行覆盖;
  • add(T value):在状态列表中添加一个元素value;
  • addAll(List<T> values):向列表中添加多个元素,以列表values形式传入。

类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

案例:针对每种传感器输出最高的3个水位值

public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ListState<Integer> vcListState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 1.来一条,存到list状态里vcListState.add(value.getVc());// 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的Iterable<Integer> vcListIt = vcListState.get();// 2.1 拷贝到List中List<Integer> vcList = new ArrayList<>();for (Integer vc : vcListIt) {vcList.add(vc);}// 2.2 对List进行降序排序vcList.sort((o1, o2) -> o2 - o1);// 2.3 只保留最大的3个(list中的个数一定是连续变大,一超过3就立即清理即可)if (vcList.size() > 3) {// 将最后一个元素清除(第4个)vcList.remove(3);}out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + vcList.toString());// 3.更新list状态vcListState.update(vcList);//                                vcListState.get();            //取出 list状态 本组的数据,是一个Iterable
//                                vcListState.add();            // 向 list状态 本组 添加一个元素
//                                vcListState.addAll();         // 向 list状态 本组 添加多个元素
//                                vcListState.update();         // 更新 list状态 本组数据(覆盖)
//                                vcListState.clear();          // 清空List状态 本组数据}}).print();env.execute();}
}

2.2.3 Map状态(MapState)

把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。

MapState提供了操作映射状态的方法,与Map的使用非常类似。

  • UV get(UK key):传入一个key作为参数,查询对应的value值;
  • put(UK key, UV value):传入一个键值对,更新key对应的value值;
  • putAll(Map<UK, UV> map):将传入的映射map中所有的键值对,全部添加到映射状态中;
  • remove(UK key):将指定key对应的键值对删除;
  • boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。

另外,MapState也提供了获取整个映射相关信息的方法;

  • Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
  • Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型;
  • Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;
  • boolean isEmpty():判断映射是否为空,返回一个boolean值。

案例需求:统计每种传感器每种水位值出现的次数。

public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {MapState<Integer, Integer> vcCountMapState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 1.判断是否存在vc对应的keyInteger vc = value.getVc();if (vcCountMapState.contains(vc)) {// 1.1 如果包含这个vc的key,直接对value+1Integer count = vcCountMapState.get(vc);vcCountMapState.put(vc, ++count);} else {// 1.2 如果不包含这个vc的key,初始化put进去vcCountMapState.put(vc, 1);}// 2.遍历Map状态,输出每个k-v的值StringBuilder outStr = new StringBuilder();outStr.append("======================================\n");outStr.append("传感器id为" + value.getId() + "\n");for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {outStr.append(vcCount.toString() + "\n");}outStr.append("======================================\n");out.collect(outStr.toString());//                                vcCountMapState.get();          // 对本组的Map状态,根据key,获取value
//                                vcCountMapState.contains();     // 对本组的Map状态,判断key是否存在
//                                vcCountMapState.put(, );        // 对本组的Map状态,添加一个 键值对
//                                vcCountMapState.putAll();  // 对本组的Map状态,添加多个 键值对
//                                vcCountMapState.entries();      // 对本组的Map状态,获取所有键值对
//                                vcCountMapState.keys();         // 对本组的Map状态,获取所有键
//                                vcCountMapState.values();       // 对本组的Map状态,获取所有值
//                                vcCountMapState.remove();   // 对本组的Map状态,根据指定key,移除键值对
//                                vcCountMapState.isEmpty();      // 对本组的Map状态,判断是否为空
//                                vcCountMapState.iterator();     // 对本组的Map状态,获取迭代器
//                                vcCountMapState.clear();        // 对本组的Map状态,清空}}).print();env.execute();}
}

2.2.4 归约状态(ReducingState)

归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍reduce聚合算子时讲到的ReduceFunction,所以状态类型跟输入的数据类型是一样的。

public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}

案例:计算每种传感器的水位和

.process(new KeyedProcessFunction<String, WaterSensor, Integer>() {private ReducingState<Integer> sumVcState;@Overridepublic void open(Configuration parameters) throws Exception {sumVcState = this.getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("sumVcState",Integer::sum,Integer.class));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<Integer> out) throws Exception {sumVcState.add(value.getVc());out.collect(sumVcState.get());}
})

2.2.5 聚合状态(AggregatingState)

与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

案例需求:计算每种传感器的平均水位

public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {AggregatingState<Integer, Double> vcAvgAggregatingState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>("vcAvgAggregatingState",new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return Tuple2.of(0, 0);}@Overridepublic Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);}@Overridepublic Double getResult(Tuple2<Integer, Integer> accumulator) {return accumulator.f0 * 1D / accumulator.f1;}@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
//                                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);return null;}},Types.TUPLE(Types.INT, Types.INT)));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 将 水位值 添加到  聚合状态中vcAvgAggregatingState.add(value.getVc());// 从 聚合状态中 获取结果Double vcAvg = vcAvgAggregatingState.get();out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg);//                                vcAvgAggregatingState.get();    // 对 本组的聚合状态 获取结果
//                                vcAvgAggregatingState.add();    // 对 本组的聚合状态 添加数据,会自动进行聚合
//                                vcAvgAggregatingState.clear();  // 对 本组的聚合状态 清空数据}}).print();env.execute();}
}

2.2.6 状态生存时间(TTL)

在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。

配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);stateDescriptor.enableTimeToLive(ttlConfig);

这里用到了几个配置项:

  • .newBuilder()

状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间

  • .setUpdateType()

设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。

  • .setStateVisibility()

设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。

public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 1.创建 StateTtlConfigStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s
//                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// TODO 2.状态描述器 启用 TTLValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastVcState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 先获取状态值,打印 ==》 读取状态Integer lastVc = lastVcState.value();out.collect("key=" + value.getId() + ",状态值=" + lastVc);// 如果水位大于10,更新状态值 ===》 写入状态if (value.getVc() > 10) {lastVcState.update(value.getVc());}}}).print();env.execute();}
}

3、状态后端(State Backends)

在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置

3.1 状态后端的分类(HashMapStateBackend/RocksDB

Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。

系统默认的状态后端是HashMapStateBackend。

3.1.1 哈希表状态后端(HashMapStateBackend)

HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆上

普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

3.1.2 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)

RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘

配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里

3.2 如何选择正确的状态后端

HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。

HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。

3.3 状态后端的配置

3.3.1 配置默认的状态后端

#flink-conf.yaml# 默认状态后端
state.backend: hashmap# 存放检查点的文件路径
# 这里的state.checkpoints.dir配置项,定义了检查点和元数据写入的目录。
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints

3.3.2 为每个作业(Per-job/Application)单独配置状态后端

通过执行环境设置,HashMapStateBackend

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());

通过执行环境设置,EmbeddedRocksDBStateBackend。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
</dependency>

相关文章:

【Flink】状态管理

目录 1、状态概述 1.1 无状态算子 1.2 有状态算子 2、状态分类 ​编辑 2.1 算子状态 2.1.1 列表状态&#xff08;ListState&#xff09; 2.1.2 联合列表状态&#xff08;UnionListState&#xff09; 2.1.3 广播状态&#xff08;BroadcastState&#xff09; 2.2 按键分…...

《微信小程序开发从入门到实战》学习二十八

3.4 开发参与投票页面 3.4.3 使用radio单项选择器组件 逻辑层的数据已经准备好&#xff0c;现在实现视图层的页面展示。 投票的标题、&#xff0c;描述、截止日期、是否匿名等信息通过view和text组件就可以展示。比较特别的是投票选项的展示&#xff0c;涉及到单选还是多选&…...

2824. 统计和小于目标的下标对数目 : 详解 “左找右“ “右找左“ 两种方式

题目描述 这是 LeetCode 上的 「2824. 统计和小于目标的下标对数目」 &#xff0c;难度为 「简单」。 Tag : 「排序」、「二分」、「双指针」 给你一个下标从 0 开始长度为 n 的整数数组 nums 和一个整数 target&#xff0c;请你返回满足 0 < i < j < n 且 nums[i] n…...

windows电脑定时开关机设置

设置流程 右击【此电脑】>【管理】 【任务计划程序】>【创建基本任务】 gina 命令 查看 已经添加的定时任务从哪看&#xff1f;这里&#xff1a; 往下滑啦&#xff0c;看你刚才添加的任务&#xff1a;...

微信小程序取消自定义默认标题

微信小程序取消自定义默认标题 在单独页面index.json中添加 "navigationStyle":"custom"即可 注&#xff1a;仅记录开发查找&#xff01;&#xff01;&#xff01;...

Vue3鼠标拖拽生成区域块并选中元素

Vue3鼠标拖拽生成区域块并选中元素&#xff0c;选中的元素则背景高亮(或者其它逻辑)。 <script setup> import { ref } from vue// 区域ref const regionRef ref(null)// 内容ref const itemRefs ref(null)// 是否开启绘画区域 const enable ref(false)// 鼠标开始位置…...

[深度理解] 重启 Splunk Search Head Cluster

1: 背景: 关于释放Splunk search head 的job 运行压力:splunk search head cluster 要重启的话,怎么办? 答案是:splunk rolling-restart shcluster-members Initiate a rolling restart from the command line Invoke the splunk rolling-restart command from any me…...

Python + Docker 还是 Rust + WebAssembly?

在不断发展的技术世界中&#xff0c;由大语言模型驱动的应用程序&#xff0c;通常被称为“LLM 应用”&#xff0c;已成为各种行业技术创新背后的驱动力。随着这些应用程序的普及&#xff0c;用户需求的大量涌入对底层基础设施的性能、安全性和可靠性提出了新的挑战。 Python 和…...

[汇编实操]DOSBox工具: unable to open input file: 文件名.asm问题解决

出错原因1 &#xff1a;将文件放在debug文件下&#xff0c;mount后发现并没有该文件 解决方案 &#xff1a;重启DOSBox&#xff0c;重新mount&#xff0c;直到dir后可以看到该asm文件 出错原因2&#xff1a;DOS系统不支持8位以上的文件名 解决方案 &#xff1a;将文件名改为8…...

Windows安装MongoDB

1、下载MongoDB的zip&#xff0c;解压 2、创建目录 mkdir D:\JavaSoftware\Database\MongoDB\mongodb-win32-x86_64-windows-5.0.8\data\db mkdir D:\JavaSoftware\Database\MongoDB\mongodb-win32-x86_64-windows-5.0.8\data\log 3、创建一个配置文件mongod.cfg&#xff0c…...

HandBrake 1.7 近日发布

导读HandBrake 1.7 近日发布&#xff0c;作为这个开源、免费和跨平台视频转码器应用程序的重大更新&#xff0c;适用于 GNU/Linux、macOS 和 Windows 系统。 在 HandBrake 1.6 发布近一年后&#xff0c;HandBrake 1.7 版本为 Linux 用户提供了许多好处&#xff0c;包括视频摘要…...

Vue3的watch使用介绍及场景

目录 一、watch的使用 1. 监听一个变量 2. 监听一个对象的属性 3. 监听一个函数的返回值 二、watch的使用场景 1. 监听表单的变化 2. 监听路由参数的变化 3. 监听Vuex中的数据变化 三、watch的效果图 四、watch的示例 以上就是Vue3的watch的介绍&#xff0c;watch是…...

Java设计原则和设计模式

目录 第一部分&#xff1a;设计原则 单一职责原则 (Single Responsibility Principle)开闭原则 (Open-Closed Principle)里氏替换原则 (Liskov Substitution Principle)接口隔离原则 (Interface Segregation Principle)依赖倒置原则 (Dependency Inversion Principle)合成/聚…...

webshell之基于框架免杀

thinkphp array_map_recursive函数 array_map_recursive函数分析 这里存在一个call_user_func命令执行函数 免杀效果 B函数 免杀效果 B函数分析 exec函数分析 在exec函数用存在有个类调用&#xff0c;且所有的参数都可控 smarty_php_tag函数 免杀效果 smarty_php_tag函数分析…...

QT QJsonObject 插入 QByteArray十六进制数据

场景描述 有一组十六进制数使用QByteArray进行存储&#xff1b;需要将其插入QJsonObject&#xff0c;然后通过网络发送出去&#xff1b;接收到后&#xff0c;再转换回QByteArray&#xff1b; 操作代码 1. QByteArray转换QString插入QJsonObject QString str ""; …...

概要设计文档案例分享

1引言 1.1编写目的 1.2项目背景 1.3参考资料 2系统总体设计 2.1整体架构 2.2整体功能架构 2.3整体技术架构 2.4运行环境设计 2.5设计目标 3系统功能模块设计 3.1个人办公 4性能设计 4.1响应时间 4.2并发用户数 5接口设计 5.1接口设计原则 5.2接口实现方式 6运行设计 6.1运行模块…...

微服务qiankun通信方式

qiankun&#xff1a; 是一种类似于微服务的架构&#xff0c;是将一个大型应用拆分成若干个更小、更简单&#xff0c;可以独立开发、测试和部署的子应用&#xff0c;然后由一个基座应用根据路由进行应用切换&#xff0c;主要是为了解决大型工程在变更、维护、扩展等方面的困难而…...

【Azure 架构师学习笔记】-Azure Storage Account(7)- 权限控制

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Storage Account】系列。 接上文 【Azure 架构师学习笔记】-Azure Storage Account&#xff08;6&#xff09;- File Layer 前言 存储帐户作为其中一个数据终端存储&#xff0c;对安全性的要求非常高&#xff0c;不管…...

听GPT 讲Rust源代码--src/tools(2)

题图来自AI生成 File: rust/src/tools/rust-installer/src/util.rs 在Rust源代码中&#xff0c;rust/src/tools/rust-installer/src/util.rs文件是安装程序的一个辅助文件&#xff0c;它提供了一些实用函数和结构体来处理安装过程中需要的一些操作。 这个文件中定义了几个结构体…...

【python学习】基础篇-常用模块-collections模块:数据结构,如列表、元组、字典和集合等

Python中的collections模块提供了一些有用的数据结构&#xff0c;如列表、元组、字典和集合等。 以下是collections模块中一些常用数据结构的用法&#xff1a; Counter类 Counter类是一个字典子类&#xff0c;用于计数可哈希对象。 它可以接受一个可迭代对象作为参数&#xff…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机

这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机&#xff0c;因为在使用过程中发现 Airsim 对外部监控相机的描述模糊&#xff0c;而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置&#xff0c;最后在源码示例中找到了&#xff0c;所以感…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...