大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
- Flink 并行度
- Flink 并行度详解
- Flink 并行度 案例

状态类型
Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。
- 有状态计算:依赖之前或之后的事件
- 无状态计算:独立
根据数据结构不同,Flink定义了多种State,应用于不同的场景。
- ValueState:即类型为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
- ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
- ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时候,会调用ReduceFunction,最后合并到一个单一的状态值。
- FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态会在未来的Flink版本当中删除)
- MapState:即状态值为一个Map,用户通过put和putAll方法添加元素
State按照是否有Key划分为:
- KeyedState
- OperatorState
案例1 利用State求平均值
实现思路
- 读数据源
- 将数据源根据Key分组
- 按照Key分组策略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果
编写代码
package icu.wzk;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkStateTest01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<Long, Long>> data = env.fromElements(Tuple2.of(1L, 3L),Tuple2.of(1L, 5L),Tuple2.of(1L, 7L),Tuple2.of(1L, 4L),Tuple2.of(1L, 2L));KeyedStream<Tuple2<Long, Long>, Long> keyed = data.keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {@Overridepublic Long getKey(Tuple2<Long, Long> value) throws Exception {return value.f0;}});SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {Tuple2<Long, Long> currentSum = sum.value();if (currentSum == null) {currentSum = Tuple2.of(0L, 0L);}// 更新currentSum.f0 += 1L;currentSum.f1 += value.f1;System.out.println("currentValue: " + currentSum);// 更新状态值sum.update(currentSum);// 如果 count >= 5 清空状态值 重新计算if (currentSum.f0 >= 5) {out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("average",TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));sum = getRuntimeContext().getState(descriptor);}});flatMapped.print();env.execute("Flink State Test");}
}
运行结果

执行分析


Keyed State
表示和Key相关的一种State, 只能用于KeyedStream类型数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。
KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变化时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例可能运行一个或者多个KeyGroups的Keys。
Operator State
与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变化时自动重新分配状态数据。
同时在Flink中KeyedState和OperatorState均具有两种形式,其中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久话到CheckPoints中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发CheckPoint中,当从CheckPoint恢复任务时,算子自己再返序列化出状态的数据结构。
DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要原因是ManagedState能够更好地支持状态数据的重平衡以及更加完善的内存管理。
状态描述

State既然是暴露给用户的,那么就需要有一些属性需要指定:
- State名称
- Value Serializer
- State Type Info
在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等
- ValueState getState(ValueStateDescriptor)
- ReducingState getReducingState(ReducingStateDescriptor)
- ListState getListState(ListStateDescriptor)
- FoldingState getFoldingState(FoldingStateDescriptor)
- MapState getMapState(MapStateDescriptot)
相关文章:
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...
C++复习day04
一、函数重载 1.什么是函数重载? 自然语言中,一个词可以有多重含义,人们可以通过上下文来判断该词真实的含义,即该词被重 载了。 比如:以前有一个笑话,国有两个体育项目大家根本不用看,也不用…...
[苍穹外卖]-04菜品管理接口开发
效果预览 新增菜品 需求分析 查看产品原型分析需求, 包括用到哪些接口, 业务的限制规则 业务规则 菜品名称必须是唯一的菜品必须属于某个分类下, 不能单独存在新增菜品时可以根据情况选择菜品的口味每个菜品必须对应一张图片 接口设计 根据类型查询分类接口 文件上传接口 …...
gitlab 启动/关闭/启用开机启动/禁用开机启动
文章目录 启动 gitlab关闭 gitlab查看 gitlab 运行状态启用 gitlab 开机启动禁用 gitlab 开机启动GitlabGit启动 gitlab hxstrive@localhost:~$ sudo gitlab-ctl start ok: run: alertmanager: (pid 65953) 0s ok: run: gitaly: (pid 65965) 0s ok: run: gitlab-exporter: (pi…...
中间件解析漏洞(附环境搭建教程)
⼀:IIS解析漏洞 环境资源: https://download.csdn.net/download/Nai_zui_jiang/89717504 环境安装 windows2003iis6 1.创建新的虚拟机 2.在下⼀步中选择我们的iso⽂件镜像 vm已主动识别到windows2003 3.产品密钥⽹上搜⼀个 密码自己设置一个简单的&…...
matlab实现kaiser窗+时域采样序列(不管原信号拉伸成什么样子)是一样的,变到频谱后再采样就是一样的频域序列。
下图窗2的频谱在周期化的时候应该是2(w-k*pi/T)我直接对2w减得写错了 可见这两个kaiser窗频谱不一样,采样间隔为2T的窗,频谱压缩2倍,且以原采样频率的一半周期化。 但是这两个不同的kaiser窗在频域采样点的值使完全一…...
git为不同的项目设置不同的提交作者
方法1:找到项目的.git文件夹打开 打开config在下面添加自己作者信息 [user]name 作者名email 邮箱方法2:直接在.git文件夹设置作者名(不使用–global参数) git config user.name "xxxxx"如果想要修改之前提交的…...
防爆定位信标与防爆定位基站有什么区别?
新锐科技 https://baijiahao.baidu.com/s?id1804974957959442238&wfrspider&forpc http://www.xinruikc.cn/biaoqian/52.html http://www.xinruikc.cn/xinbiao/...
QT 编译报错:C3861: ‘tr‘ identifier not found
问题: QT 编译报错:C3861: ‘tr’ identifier not found 原因 使用tr的地方所在的类没有继承自 QObject 类 或者在不在某一类中, 解决方案 就直接用类名引用 :QObject::tr( )...
谈谈ES搜索引擎
一 ES的定义 ES 它的全称是 Elasticsearch,是一个建立在全文搜索引擎库Lucene基础上的一个开源搜索和分析引擎。ES 它本身具备分布式存储,检索速度快的特性,所以我们经常用它来实现全文检索功能。目前在 Elastic 官网对 ES 的定义,…...
【MySQL】MySQL基础
目录 什么是数据库主流数据库基本使用MySQL的安装连接服务器服务器、数据库、表关系使用案例数据逻辑存储 MySQL的架构SQL分类什么是存储引擎 什么是数据库 mysql它是数据库服务的客户端mysqld它是数据库服务的服务器端mysql本质:基于C(mysql)…...
Spring中Bean的相关注解
目录 1.Spring IoC&DI 2.关于Bean存储的相关注解(类注解与方法注解) Bean的获取方式 类注解和方法注解的重命名 2.1 类注解 2.1.1 Controller 2.1.2 Service 2.1.3 Repository 2.1.4 Component 2.1.5 Configuration 2.2 方法注解-Bean 2.2.1 定义多个对象 2.2…...
Golang | Leetcode Golang题解之第385题迷你语法分析器
题目: 题解: func deserialize(s string) *NestedInteger {if s[0] ! [ {num, _ : strconv.Atoi(s)ni : &NestedInteger{}ni.SetInteger(num)return ni}stack, num, negative : []*NestedInteger{}, 0, falsefor i, ch : range s {if ch - {negati…...
【Java 优选算法】双指针(上)
欢迎关注个人主页:逸狼 创造不易,可以点点赞吗~ 如有错误,欢迎指出~ 目录 移动零 分析 代码 复写零 分析 代码 快乐数 分析 代码 盛最多水的容器 分析 代码 移动零 题目链接 分析 双指针算法,利用两个指针cur和dest将数组划分为三个区间…...
【自动驾驶】控制算法(八)横向控制Ⅰ | 算法与流程
写在前面: 🌟 欢迎光临 清流君 的博客小天地,这里是我分享技术与心得的温馨角落。📝 个人主页:清流君_CSDN博客,期待与您一同探索 移动机器人 领域的无限可能。 🔍 本文系 清流君 原创之作&…...
Android SSE 单向接收数据
Server-Sent Events(SSE)是一种在客户端和服务器之间实现单向实时通信的技术。它允许服务器向客户端推送数据,但客户端无法使用 SSE 向服务器发送数据。这使得其适用于需要持续接收服务器数据的应用场景(如实时通知、股票行情、社…...
排序《数据结构》
排序 《数据结构》 1.排序的概念及其运用1.1 排序的概念1.2 排序运用1.3常见的排序算法1.4 排序动图演示 2.常见排序算法的实现2.1 插入排序2.2希尔排序2.3 快排左边做keyi,右边先走,可以保证相遇位置比keyi小 2.4 快速排序优化快排(非递归&a…...
flutter 提示框2 Dialog
flutter 提示框 写在点击的方法体中 child里放自己喜欢的 showDialog( context: context, builder: (BuildContext context) { final Dialog alertDialog Dialog( backgroundColor: Colors.transparent,shadowColor:Colors.transparent,child: Container(height: mediawi…...
如何选择SDR无线图传方案
在开源软件定义无线电(SDR)领域,有几个项目提供了无线图传的解决方案。以下是一些开源SDR无线图传方案: 1. **OpenHD**:这是一个远程高清数字图像传输的开源解决方案,它使用SDR技术来实现高清视频的无线传…...
关于Python类中方法__init__()解析
# import numpy as npclass Car():def __init__(self, maker, name, year):self.maker makerself.name nameself.year yearprint(self.searchMakrt() "123")def searchMakrt(self):print("汽车制作厂家为: " self.maker)# passreturn &quo…...
ROS2开发避坑:用CycloneDDS配置文件解决本地回环通信中断问题(附完整XML)
ROS2通信稳定性实战:CycloneDDS深度配置指南 当你在机器人开发过程中遭遇节点间通信时断时续的问题,那种感觉就像在暴雨天试图用对讲机协调团队——关键指令总在最重要时刻丢失。本文将揭示如何通过CycloneDDS的精细配置,在硬件网络不稳定的…...
通义千问1.5-1.8B-Chat-GPTQ-Int4场景应用:网络安全威胁情报的智能分析与报告生成
通义千问1.5-1.8B-Chat-GPTQ-Int4场景应用:网络安全威胁情报的智能分析与报告生成 1. 引言:当安全分析师遇上信息洪流 想象一下,你是一名网络安全分析师。凌晨三点,刺耳的告警声把你从睡梦中惊醒。屏幕上,来自防火墙…...
利用快马平台快速构建技能评估系统原型:以skill-vetter为例
利用快马平台快速构建技能评估系统原型:以skill-vetter为例 最近在做一个前端开发技能评估系统,需要快速验证产品原型。传统开发流程从搭建环境到功能实现至少需要1-2周,但通过InsCode(快马)平台的AI辅助和现成模板,我只用了3天就…...
【仅限JDK 25 Early Access用户】:隐藏API `LinkerOptions` 强制启用向量化调用的2行代码,实测吞吐提升2.8倍
第一章:Java 25 外部函数接口优化案例Java 25 正式将外部函数与内存 API(Foreign Function & Memory API)从预览特性转为正式特性,显著提升了 JVM 与本地代码交互的安全性、性能与开发体验。相比早期 JNI 方案,FFM…...
COMSOL 6.1 激光粉末床熔融气孔缺陷演化仿真:开启微观世界的探索之旅
COMSOL 6.1 激光粉末床熔融气孔缺陷演化仿真案例模型 本案例选用层流和流体传热模块,采用水平集法,考虑材料的热物性以及激光加工过程中的马兰戈尼效应、熔融金属表面张力、反冲压力、相变潜热、热对流和热辐射,建立含气孔缺陷的二维数值仿真…...
从内核事件到业务洞察:手把手教你用sysdig + Lua脚本定制专属监控看板
从内核事件到业务洞察:用sysdig与Lua脚本构建定制化监控体系 当你的微服务集群每天处理数十亿次API调用时,标准监控指标如CPU使用率或内存消耗早已无法满足需求。真正的挑战在于:当某个关键业务接口的99线突然飙升时,如何快速定位…...
Android tinyalsa深度解析之pcm_params_get_periods_min调用流程与实战(一百七十三)
简介: CSDN博客专家、《Android系统多媒体进阶实战》作者 博主新书推荐:《Android系统多媒体进阶实战》🚀 Android Audio工程师专栏地址: Audio工程师进阶系列【原创干货持续更新中……】🚀 Android多媒体专栏地址&a…...
[特殊字符] Nano-Banana部署教程:Ubuntu/CentOS环境下的镜像拉取与启动
Nano-Banana部署教程:Ubuntu/CentOS环境下的镜像拉取与启动 1. 项目简介 Nano-Banana是一款专门为产品拆解和平铺展示风格设计的轻量级文本生成图像系统。这个项目的核心在于深度融合了Nano-Banana专属的Turbo LoRA微调权重,专门针对Knolling平铺、爆炸…...
LeetCode 11. Container With Most Water 题解
LeetCode 11. Container With Most Water 题解 题目描述 给你 n 个非负整数 a1,a2,...,an,每个数代表坐标中的一个点 (i, ai) 。在坐标内画 n 条垂直线,垂直线 i 的两个端点分别为 (i, ai) 和 (i, 0) 。找出其中的两条…...
新手福音:用快马平台将vmware官网概念转化为可交互的虚拟机演示代码
作为一名刚接触虚拟化技术的新手,我最近在VMware官网上看到了关于虚拟机的基础概念介绍。虽然理论知识很全面,但总觉得少了点动手实践的环节。直到发现了InsCode(快马)平台,它让我能够把抽象的概念快速转化为可运行的代码,这种学习…...
