6 Flink 状态管理
6 Flink 状态管理
- 1. State-Keyed State
- 2. State-Operator State
- 3. Broadcast State
我们前面写的 wordcount 的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。
因此可以说flink因为引入了state和checkpoint所以才支持的exactly once
首先区分一下两个概念:
state:
state一般指一个具体的task/operator的状态:
state数据默认保存在java的堆内存中,TaskManage节点的内存中。
operator表示一些算子在运行的过程中会产生的一些中间结果。
checkpoint:
checkpoint可以理解为checkpoint是把state数据定时持久化存储了,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。
注意:task(subTask)是Flink中执行的基本单位。operator指算子(transformation)
State可以被记录,在失败的情况下数据还可以恢复。
Flink中有两种基本类型的State:
Keyed State
Operator State
Keyed State和Operator State,可以以两种形式存在:
原始状态(raw state)
托管状态(managed state)
托管状态是由Flink框架管理的状态。
我们说operator算子保存了数据的中间结果,中间结果保存在什么类型中,如果我们这里是托管状态,则由flink框架自行管理
原始状态由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。
通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。
1. State-Keyed State
基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解为分区过的Operator State。
保存state的数据结构:
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。
ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。
ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素。
需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。
- ValueState
使用ValueState保存中间结果对下面数据进行分组求和。
开发步骤: - 获取流处理执行环境
- 加载数据源
- 数据分组
- 数据转换,定义ValueState,保存中间结果
- 数据打印
- 触发执行
ValueState:测试数据源:
List((1L, 4L),(2L, 3L),(3L, 1L),(1L, 2L),(3L, 2L),(1L, 2L),(2L, 2L),(2L, 9L)
)
示例代码:
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collectorobject TestKeyedState {class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {/*** ValueState状态句柄. 第一个值为count,第二个值为sum。*/private var sum: ValueState[(Long, Long)] = _override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {// 获取当前状态值val tmpCurrentSum: (Long, Long) = sum.value// 状态默认值val currentSum = if (tmpCurrentSum != null) {tmpCurrentSum} else {(0L, 0L)}// 更新val newSum = (currentSum._1 + 1, currentSum._2 + input._2)// 更新状态值sum.update(newSum)// 如果count >=3 清空状态值,重新计算if (newSum._1 >= 3) {out.collect((input._1, newSum._2 / newSum._1))sum.clear()}}override def open(parameters: Configuration): Unit = {sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average", // 状态名称TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 状态类型)}} def main(args: Array[String]): Unit = {//初始化执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//构建数据源val inputStream: DataStream[(Long, Long)] = env.fromCollection(List((1L, 4L),(2L, 3L),(3L, 1L),(1L, 2L),(3L, 2L),(1L, 2L),(2L, 2L),(2L, 9L)))//执行数据处理inputStream.keyBy(0).flatMap(new CountWithKeyedState).setParallelism(1).print//运行任务env.execute}
}
- MapState
使用MapState保存中间结果对下面数据进行分组求和: - 获取流处理执行环境
- 加载数据源
- 数据分组
- 数据转换,定义MapState,保存中间结果
- 数据打印
- 触发执行
MapState:测试数据源:
List(("java", 1),("python", 3),("java", 2),("scala", 2),("python", 1),("java", 1),("scala", 2)
)
示例代码:
object MapState {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)/*** 使用MapState保存中间结果对下面数据进行分组求和* 1.获取流处理执行环境* 2.加载数据源* 3.数据分组* 4.数据转换,定义MapState,保存中间结果* 5.数据打印* 6.触发执行*/val source: DataStream[(String, Int)] = env.fromCollection(List(("java", 1),("python", 3),("java", 2),("scala", 2),("python", 1),("java", 1),("scala", 2)))source.keyBy(0).map(new RichMapFunction[(String, Int), (String, Int)] {var mste: MapState[String, Int] = _override def open(parameters: Configuration): Unit = {val msState = new MapStateDescriptor[String, Int]("ms",TypeInformation.of(new TypeHint[(String)] {}),TypeInformation.of(new TypeHint[(Int)] {}))mste = getRuntimeContext.getMapState(msState)}override def map(value: (String, Int)): (String, Int) = {val i: Int = mste.get(value._1)mste.put(value._1, value._2 + i)(value._1, value._2 + i)}}).print()env.execute()}
}
2. State-Operator State
与Key无关的State,与Operator绑定的state,整个operator只对应一个state。
保存state的数据结构:
ListState
举例来说,Flink中的 Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。
步骤:
1.获取执行环境
2.设置检查点机制:路径,重启策略
3.自定义数据源
需要继承并行数据源和CheckpointedFunction
设置listState,通过上下文对象context获取
数据处理,保留offset
制作快照
4.数据打印
5.触发执行
示例代码:
import java.utilimport org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._object ListOperate {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.enableCheckpointing(5000)env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)env.getCheckpointConfig.setCheckpointTimeout(60000)env.getCheckpointConfig.setFailOnCheckpointingErrors(false)env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//重启策略env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))//模拟kakfa偏移量env.addSource(new MyRichParrelSourceFun).print()env.execute()}}class MyRichParrelSourceFun extends RichParallelSourceFunction[String]with CheckpointedFunction {var listState: ListState[Long] = _var offset: Long = 0L//任务运行override def run(ctx: SourceFunction.SourceContext[String]): Unit = {val iterState: util.Iterator[Long] = listState.get().iterator()while (iterState.hasNext) {offset = iterState.next()}while (true) {offset += 1ctx.collect("offset:"+offset)Thread.sleep(1000)if(offset > 10){1/0}}}//取消任务override def cancel(): Unit = ???//制作快照override def snapshotState(context: FunctionSnapshotContext): Unit = {listState.clear()listState.add(offset)}//初始化状态override def initializeState(context: FunctionInitializationContext): Unit = {listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long]("listState", TypeInformation.of(new TypeHint[Long] {})))}
}
3. Broadcast State
Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。
- API介绍
通常,我们首先会创建一个Keyed或Non-Keyed的Data Stream,然后再创建一个Broadcasted Stream,最后通过Data Stream来连接(调用connect方法)到Broadcasted Stream上,这样实现将Broadcast State广播到Data Stream下游的每个Task中。
如果Data Stream是Keyed Stream,则连接到Broadcasted Stream后,添加处理ProcessFunction时需要使用KeyedBroadcastProcessFunction来实现,下面是KeyedBroadcastProcessFunction的API,代码如下所示:
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
上面泛型中的各个参数的含义,说明如下:
KS:表示Flink程序从最上游的Source Operator开始构建Stream,当调用keyBy时所依赖的Key的类型;
IN1:表示非Broadcast的Data Stream中的数据记录的类型;
IN2:表示Broadcast Stream中的数据记录的类型;
OUT:表示经过KeyedBroadcastProcessFunction的processElement()和processBroadcastElement()方法处理后输出结果数据记录的类型。
如果Data Stream是Non-Keyed Stream,则连接到Broadcasted Stream后,添加处理ProcessFunction时需要使用BroadcastProcessFunction来实现,下面是BroadcastProcessFunction的API,代码如下所示:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;}
上面泛型中的各个参数的含义,与前面KeyedBroadcastProcessFunction的泛型类型中的后3个含义相同,只是没有调用keyBy操作对原始Stream进行分区操作,就不需要KS泛型参数。
注意事项:
1.Broadcast State 是Map类型,即K-V类型。
2.Broadcast State 只有在广播一侧的方法中processBroadcastElement可以修改;在非广播一侧方法中processElement只读。
3.Broadcast State在运行时保存在内存中。
2) 场景举例
1.动态更新计算规则: 如事件流需要根据最新的规则进行计算,则可将规则作为广播状态广播到下游Task中。
2.实时增加额外字段: 如事件流需要实时增加用户的基础信息,则可将用户的基础信息作为广播状态广播到下游Task中。
相关文章:
6 Flink 状态管理
6 Flink 状态管理 1. State-Keyed State2. State-Operator State3. Broadcast State 我们前面写的 wordcount 的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消…...
第1章 量子暗网中的血色黎明
月球暗面的危机与阴谋 量子隧穿效应催生的幽蓝电弧,于环形山表面肆意跳跃,仿若无数奋力挣扎的机械蠕虫,将月球暗面的死寂打破,徒增几分诡异。艾丽伫立在被遗弃的“广寒宫”量子基站顶端,机械义眼之中,倒映着…...
爬虫基础(六)代理简述
目录 一、什么是代理 二、基本原理 三、代理分类 一、什么是代理 爬虫一般是自动化的,当我们自动运行时 爬虫自动抓取数据,但一会就出现了错误: 如,您的访问频率过高! 这是因为网站的反爬措施,如果频…...
前端 Vue 性能提升策略
一、引言 前端性能优化是确保 Web 应用快速响应和流畅用户体验的关键。对于使用 Vue.js 构建的应用,性能优化不仅涉及通用的前端技术,还包括针对 Vue 特性的特定优化措施。本文将从多个方面探讨如何全面提升前端和 Vue 应用的性能。 二、前端性能优化基础 1. 减少初始加载…...
MCU内部ADC模块误差如何校准
本文章是笔者整理的备忘笔记。希望在帮助自己温习避免遗忘的同时,也能帮助其他需要参考的朋友。如有谬误,欢迎大家进行指正。 一、ADC误差校准引言 MCU 片内 ADC 模块的误差总包括了 5 个静态参数 (静态失调,增益误差,微分非线性…...
Spring MVC消息转换器
在Spring MVC框架中,extendMessageConverters 通常与消息转换器(Message Converters)相关。消息转换器是Spring MVC用于将HTTP请求和响应主体(body)转换为Java对象和字符串的组件。它们在处理不同的媒体类型࿰…...
手写防抖函数、手写节流函数
文章目录 1 手写防抖函数2 手写节流函数 1 手写防抖函数 函数防抖是指在事件被触发n秒后再执行回调,如果在这n秒内事件又被触发,则重新计时。这可以使用在一些点击请求的事件上,避免因为用户的多次点击向后端发送多次请求。 function debou…...
【Rust自学】15.4. Drop trait:告别手动清理,释放即安全
喜欢的话别忘了点赞、收藏加关注哦,对接下来的教程有兴趣的可以关注专栏。谢谢喵!(・ω・) 15.4.1. Drop trait的意义 类型如果实现了Drop trait,就可以让程序员自定义当值离开作用域时发生的操作。例如文件、网络资源…...
【Block总结】CPCA,通道优先卷积注意力|即插即用
论文信息 标题: Channel Prior Convolutional Attention for Medical Image Segmentation 论文链接: arxiv.org 代码链接: GitHub 创新点 本文提出了一种新的通道优先卷积注意力(CPCA)机制,旨在解决医学图像分割中存在的低对比度和显著…...
信息学奥赛一本通 1607:【 例 2】任务安排 2 | 洛谷 P10979 任务安排 2
【题目链接】 ybt 1607:【 例 2】任务安排 2 洛谷 P10979 任务安排 2 注:ybt1607中n最大达到 1 0 4 10^4 104,洛谷P10979中n最大达到 3 ∗ 1 0 5 3*10^5 3∗105,本题解统一认为n最大达到 3 ∗ 1 0 5 3*10^5 3∗105。 【题目考点…...
AI(计算机视觉)自学路线
本文仅用来记录一下自学路线方便日后复习,如果对你自学有帮助的话也很开心o(* ̄▽ ̄*)ブ B站吴恩达机器学习->B站小土堆pytorch基础学习->opencv相关知识(Halcon或者opencv库)->四类神经网络(这里跟…...
OFDM系统仿真
1️⃣ OFDM的原理 1.1 介绍 OFDM是一种多载波调制技术,将输入数据分配到多个子载波上,每个子载波上可以独立使用 QAM、PSK 等传统调制技术进行调制。这些子载波之间互相正交,从而可以有效利用频谱并减少干扰。 1.2 OFDM的核心 多载波调制…...
torch numpy seed使用方法
1 import numpy as np np.random.seed(500) np.random.rand(5)array([0.69367953, 0.06171699, 0.6666116 , 0.55920894, 0.08511062])import torch torch.manual_seed(500) torch.rand(5)为了能够复现数据,我们可以使用seed 来控制生成的随机数。设置seed数据来设…...
【Go语言圣经】第四节:复合数据类型
第四章:复合数据类型 本节主要讨论四种类型——数组、slice、map和结构体。 数组和结构体都是有固定内存大小的数据结构。相比之下,slice 和 map 则是动态的数据结构,它们可以根据需要动态增长。 4.1 数组 数组是一个定长的由特定类型元素…...
【Vite + Vue + Ts 项目三个 tsconfig 文件】
Vite Vue Ts 项目三个 tsconfig 文件 为什么 Vite Vue Ts 项目会有三个 tsconfig 文件?首先我们先了解什么是 tsconfig.json ? 为什么 Vite Vue Ts 项目会有三个 tsconfig 文件? 在使用 Vite 创建 vue-ts 模板的项目时,会发现除了 ts…...
论文和代码解读:RF-Inversion 图像/视频编辑技术
Diffusion Models专栏文章汇总:入门与实战 前言:Rectified Flow的反演和DDIM这些不太一样,上一篇博客中介绍了腾讯提出的一种方法《基于Rectified Flow FLUX的图像编辑方法 RF-Solver》,主要就是用泰勒展开和一阶导数近似来分解反演公式。这篇博客介绍谷歌提出的方法RF-Inv…...
完美还是完成?把握好度,辨证看待
完美还是完成? 如果说之前这个答案有争议,那么现在,答案毋庸置疑 ■为什么完美大于完成 ●时间成本: 做事不仅要考虑结果,还要考虑时间和精力,要说十年磨一剑的确质量更好,但是现实没有那么多…...
Many Whelps! Handle It! (10 player) Many Whelps! Handle It! (25 player)
http://db.nfuwow.com/80/?achievement4403 http://db.nfuwow.com/80/?achievement4406 最少扣你50DKP! 第二阶段 当奥妮克希亚升空后,在10秒内引出50只奥妮克希亚雏龙,随后击败奥妮克希亚。 World of Warcraft [CLASSIC][80猎人][Grandel][最少扣你5…...
3.4 Go函数作用域(标识符)
只有精准分析每个标识符的作用域范围,才能编写出优质、健壮的代码,避免逻辑错误的发生。 作用域标识符 简单来说,作用域指的是标识符可以起作用的范围,即其可见范围。将标识符的可见性限制在一定范围内,这个范围就是…...
【React】PureComponent 和 Component 的区别
前言 在 React 中,PureComponent 和 Component 都是用于创建组件的基类,但它们有一个主要的区别:PureComponent 会给类组件默认加一个shouldComponentUpdate周期函数。在此周期函数中,它对props 和 state (新老的属性/状态)会做一…...
MongoDb user自定义 role 添加 action(collStats, EstimateDocumentCount)
使用 mongosh cd mongsh_bin_path mongosh “mongodb://user:passip:port/db”这样就直接进入了对应的db 直接输入: 这样 role “read_only_role" 就获得了3个 action, 分别是 查询,列举集合,集合元数据查询 P.S: 如果没有 …...
fastadmin中require-form.js的data-favisible控制显示隐藏
只要在任意元素上添加data-favisible属性就可以轻松的控制显示隐藏了 其中reportype是php传到前端的一个变量??? <div class"form-group" data-favisible"reportype6"><label class"control-label col-xs-12 col-sm-2">{:__(Ove_…...
Day51:type()函数
在 Python 中,type() 是一个内置函数,用于返回对象的类型。它可以用于检查变量的类型,也可以用于动态创建新的类型。今天,我们将深入了解 type() 函数的使用方法。 1. 使用 type() 获取变量的类型 最常见的使用方式是将一个对象…...
vue 无法 局域网内访问
资料 Vue项目设置可以局域网访问_vue.js_脚本之家 过程 上午,前端vue服务能够在局域网内访问, 下午就不行了,但是后端服务能够正常访问,本机也能正常访问ip:端口号 前端服务 判定不是下面的问题: 同一…...
【llm对话系统】大模型 Llama 源码分析之 LoRA 微调
1. 引言 微调 (Fine-tuning) 是将预训练大模型 (LLM) 应用于下游任务的常用方法。然而,直接微调大模型的所有参数通常需要大量的计算资源和内存。LoRA (Low-Rank Adaptation) 是一种高效的微调方法,它通过引入少量可训练参数,固定预训练模型…...
蓝桥杯刷题DAY2:二维前缀和 一维前缀和 差分数组
闪耀的灯光 📌 题目描述 蓝桥公园是一个适合夜间散步的好地方,公园可以被视为由 n m 个矩形区域构成。每个区域都有一盏灯,初始亮度为 a[i][j]。 小蓝可以选择一个大的矩形区域,并按下开关一次,这将使得该区域内每盏…...
网件r7000刷回原厂固件合集测评
《网件R7000路由器刷回原厂固件详解》 网件R7000是一款备受赞誉的高性能无线路由器,其强大的性能和可定制性吸引了许多高级用户。然而,有时候用户可能会尝试第三方固件以提升功能或优化网络性能,但这也可能导致一些问题,如系统不…...
算法随笔_35: 每日温度
上一篇:算法随笔_34: 最后一个单词的长度-CSDN博客 题目描述如下: 给定一个整数数组 temperatures ,表示每天的温度,返回一个数组 answer ,其中 answer[i] 是指对于第 i 天,下一个更高温度出现在几天后。如果气温在这之后都不会升…...
C++初阶 -- 手撕string类(模拟实现string类)
目录 一、string类的成员变量 二、构造函数 2.1 无参版本 2.2 有参版本 2.3 缺省值版本 三、析构函数 四、拷贝构造函数 五、c_str函数 六、operator重载 七、size函数 八、迭代器iterator 8.1 正常版本 8.2 const版本 九、operator[] 9.1 正常版本 9.2 const版…...
【Unity3D】实现2D角色/怪物死亡消散粒子效果
核心:这是一个Unity粒子系统自带的一种功能,可将粒子生成控制在一个Texture图片网格范围内,并且粒子颜色会自动采样图片的像素点颜色,之后则是粒子编辑出消散效果。 Particle System1物体(爆发式随机速度扩散10000个粒…...
