大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
- Flink 并行度
- Flink 并行度详解
- Flink 并行度 案例
状态类型
Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。
- 有状态计算:依赖之前或之后的事件
- 无状态计算:独立
根据数据结构不同,Flink定义了多种State,应用于不同的场景。
- ValueState:即类型为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
- ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
- ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时候,会调用ReduceFunction,最后合并到一个单一的状态值。
- FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态会在未来的Flink版本当中删除)
- MapState:即状态值为一个Map,用户通过put和putAll方法添加元素
State按照是否有Key划分为:
- KeyedState
- OperatorState
案例1 利用State求平均值
实现思路
- 读数据源
- 将数据源根据Key分组
- 按照Key分组策略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果
编写代码
package icu.wzk;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkStateTest01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<Long, Long>> data = env.fromElements(Tuple2.of(1L, 3L),Tuple2.of(1L, 5L),Tuple2.of(1L, 7L),Tuple2.of(1L, 4L),Tuple2.of(1L, 2L));KeyedStream<Tuple2<Long, Long>, Long> keyed = data.keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {@Overridepublic Long getKey(Tuple2<Long, Long> value) throws Exception {return value.f0;}});SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {Tuple2<Long, Long> currentSum = sum.value();if (currentSum == null) {currentSum = Tuple2.of(0L, 0L);}// 更新currentSum.f0 += 1L;currentSum.f1 += value.f1;System.out.println("currentValue: " + currentSum);// 更新状态值sum.update(currentSum);// 如果 count >= 5 清空状态值 重新计算if (currentSum.f0 >= 5) {out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("average",TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));sum = getRuntimeContext().getState(descriptor);}});flatMapped.print();env.execute("Flink State Test");}
}
运行结果
执行分析
Keyed State
表示和Key相关的一种State, 只能用于KeyedStream类型数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。
KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变化时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例可能运行一个或者多个KeyGroups的Keys。
Operator State
与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变化时自动重新分配状态数据。
同时在Flink中KeyedState和OperatorState均具有两种形式,其中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久话到CheckPoints中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发CheckPoint中,当从CheckPoint恢复任务时,算子自己再返序列化出状态的数据结构。
DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要原因是ManagedState能够更好地支持状态数据的重平衡以及更加完善的内存管理。
状态描述
State既然是暴露给用户的,那么就需要有一些属性需要指定:
- State名称
- Value Serializer
- State Type Info
在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等
- ValueState getState(ValueStateDescriptor)
- ReducingState getReducingState(ReducingStateDescriptor)
- ListState getListState(ListStateDescriptor)
- FoldingState getFoldingState(FoldingStateDescriptor)
- MapState getMapState(MapStateDescriptot)
相关文章:

大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...
C++复习day04
一、函数重载 1.什么是函数重载? 自然语言中,一个词可以有多重含义,人们可以通过上下文来判断该词真实的含义,即该词被重 载了。 比如:以前有一个笑话,国有两个体育项目大家根本不用看,也不用…...

[苍穹外卖]-04菜品管理接口开发
效果预览 新增菜品 需求分析 查看产品原型分析需求, 包括用到哪些接口, 业务的限制规则 业务规则 菜品名称必须是唯一的菜品必须属于某个分类下, 不能单独存在新增菜品时可以根据情况选择菜品的口味每个菜品必须对应一张图片 接口设计 根据类型查询分类接口 文件上传接口 …...
gitlab 启动/关闭/启用开机启动/禁用开机启动
文章目录 启动 gitlab关闭 gitlab查看 gitlab 运行状态启用 gitlab 开机启动禁用 gitlab 开机启动GitlabGit启动 gitlab hxstrive@localhost:~$ sudo gitlab-ctl start ok: run: alertmanager: (pid 65953) 0s ok: run: gitaly: (pid 65965) 0s ok: run: gitlab-exporter: (pi…...

中间件解析漏洞(附环境搭建教程)
⼀:IIS解析漏洞 环境资源: https://download.csdn.net/download/Nai_zui_jiang/89717504 环境安装 windows2003iis6 1.创建新的虚拟机 2.在下⼀步中选择我们的iso⽂件镜像 vm已主动识别到windows2003 3.产品密钥⽹上搜⼀个 密码自己设置一个简单的&…...

matlab实现kaiser窗+时域采样序列(不管原信号拉伸成什么样子)是一样的,变到频谱后再采样就是一样的频域序列。
下图窗2的频谱在周期化的时候应该是2(w-k*pi/T)我直接对2w减得写错了 可见这两个kaiser窗频谱不一样,采样间隔为2T的窗,频谱压缩2倍,且以原采样频率的一半周期化。 但是这两个不同的kaiser窗在频域采样点的值使完全一…...

git为不同的项目设置不同的提交作者
方法1:找到项目的.git文件夹打开 打开config在下面添加自己作者信息 [user]name 作者名email 邮箱方法2:直接在.git文件夹设置作者名(不使用–global参数) git config user.name "xxxxx"如果想要修改之前提交的…...
防爆定位信标与防爆定位基站有什么区别?
新锐科技 https://baijiahao.baidu.com/s?id1804974957959442238&wfrspider&forpc http://www.xinruikc.cn/biaoqian/52.html http://www.xinruikc.cn/xinbiao/...

QT 编译报错:C3861: ‘tr‘ identifier not found
问题: QT 编译报错:C3861: ‘tr’ identifier not found 原因 使用tr的地方所在的类没有继承自 QObject 类 或者在不在某一类中, 解决方案 就直接用类名引用 :QObject::tr( )...

谈谈ES搜索引擎
一 ES的定义 ES 它的全称是 Elasticsearch,是一个建立在全文搜索引擎库Lucene基础上的一个开源搜索和分析引擎。ES 它本身具备分布式存储,检索速度快的特性,所以我们经常用它来实现全文检索功能。目前在 Elastic 官网对 ES 的定义,…...

【MySQL】MySQL基础
目录 什么是数据库主流数据库基本使用MySQL的安装连接服务器服务器、数据库、表关系使用案例数据逻辑存储 MySQL的架构SQL分类什么是存储引擎 什么是数据库 mysql它是数据库服务的客户端mysqld它是数据库服务的服务器端mysql本质:基于C(mysql)…...
Spring中Bean的相关注解
目录 1.Spring IoC&DI 2.关于Bean存储的相关注解(类注解与方法注解) Bean的获取方式 类注解和方法注解的重命名 2.1 类注解 2.1.1 Controller 2.1.2 Service 2.1.3 Repository 2.1.4 Component 2.1.5 Configuration 2.2 方法注解-Bean 2.2.1 定义多个对象 2.2…...

Golang | Leetcode Golang题解之第385题迷你语法分析器
题目: 题解: func deserialize(s string) *NestedInteger {if s[0] ! [ {num, _ : strconv.Atoi(s)ni : &NestedInteger{}ni.SetInteger(num)return ni}stack, num, negative : []*NestedInteger{}, 0, falsefor i, ch : range s {if ch - {negati…...

【Java 优选算法】双指针(上)
欢迎关注个人主页:逸狼 创造不易,可以点点赞吗~ 如有错误,欢迎指出~ 目录 移动零 分析 代码 复写零 分析 代码 快乐数 分析 代码 盛最多水的容器 分析 代码 移动零 题目链接 分析 双指针算法,利用两个指针cur和dest将数组划分为三个区间…...

【自动驾驶】控制算法(八)横向控制Ⅰ | 算法与流程
写在前面: 🌟 欢迎光临 清流君 的博客小天地,这里是我分享技术与心得的温馨角落。📝 个人主页:清流君_CSDN博客,期待与您一同探索 移动机器人 领域的无限可能。 🔍 本文系 清流君 原创之作&…...
Android SSE 单向接收数据
Server-Sent Events(SSE)是一种在客户端和服务器之间实现单向实时通信的技术。它允许服务器向客户端推送数据,但客户端无法使用 SSE 向服务器发送数据。这使得其适用于需要持续接收服务器数据的应用场景(如实时通知、股票行情、社…...

排序《数据结构》
排序 《数据结构》 1.排序的概念及其运用1.1 排序的概念1.2 排序运用1.3常见的排序算法1.4 排序动图演示 2.常见排序算法的实现2.1 插入排序2.2希尔排序2.3 快排左边做keyi,右边先走,可以保证相遇位置比keyi小 2.4 快速排序优化快排(非递归&a…...
flutter 提示框2 Dialog
flutter 提示框 写在点击的方法体中 child里放自己喜欢的 showDialog( context: context, builder: (BuildContext context) { final Dialog alertDialog Dialog( backgroundColor: Colors.transparent,shadowColor:Colors.transparent,child: Container(height: mediawi…...
如何选择SDR无线图传方案
在开源软件定义无线电(SDR)领域,有几个项目提供了无线图传的解决方案。以下是一些开源SDR无线图传方案: 1. **OpenHD**:这是一个远程高清数字图像传输的开源解决方案,它使用SDR技术来实现高清视频的无线传…...
关于Python类中方法__init__()解析
# import numpy as npclass Car():def __init__(self, maker, name, year):self.maker makerself.name nameself.year yearprint(self.searchMakrt() "123")def searchMakrt(self):print("汽车制作厂家为: " self.maker)# passreturn &quo…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...

uniapp手机号一键登录保姆级教程(包含前端和后端)
目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号(第三种)后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...
JavaScript 数据类型详解
JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型(Primitive) 和 对象类型(Object) 两大类,共 8 种(ES11): 一、原始类型(7种) 1. undefined 定…...
Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析
Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析 一、第一轮基础概念问题 1. Spring框架的核心容器是什么?它的作用是什么? Spring框架的核心容器是IoC(控制反转)容器。它的主要作用是管理对…...

热烈祝贺埃文科技正式加入可信数据空间发展联盟
2025年4月29日,在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上,可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞,强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...

C++--string的模拟实现
一,引言 string的模拟实现是只对string对象中给的主要功能经行模拟实现,其目的是加强对string的底层了解,以便于在以后的学习或者工作中更加熟练的使用string。本文中的代码仅供参考并不唯一。 二,默认成员函数 string主要有三个成员变量,…...

(12)-Fiddler抓包-Fiddler设置IOS手机抓包
1.简介 Fiddler不但能截获各种浏览器发出的 HTTP 请求,也可以截获各种智能手机发出的HTTP/ HTTPS 请求。 Fiddler 能捕获Android 和 Windows Phone 等设备发出的 HTTP/HTTPS 请求。同理也可以截获iOS设备发出的请求,比如 iPhone、iPad 和 MacBook 等苹…...