大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
- Flink 并行度
- Flink 并行度详解
- Flink 并行度 案例

状态类型
Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。
- 有状态计算:依赖之前或之后的事件
- 无状态计算:独立
根据数据结构不同,Flink定义了多种State,应用于不同的场景。
- ValueState:即类型为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
- ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
- ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时候,会调用ReduceFunction,最后合并到一个单一的状态值。
- FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态会在未来的Flink版本当中删除)
- MapState:即状态值为一个Map,用户通过put和putAll方法添加元素
State按照是否有Key划分为:
- KeyedState
- OperatorState
案例1 利用State求平均值
实现思路
- 读数据源
- 将数据源根据Key分组
- 按照Key分组策略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果
编写代码
package icu.wzk;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkStateTest01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<Long, Long>> data = env.fromElements(Tuple2.of(1L, 3L),Tuple2.of(1L, 5L),Tuple2.of(1L, 7L),Tuple2.of(1L, 4L),Tuple2.of(1L, 2L));KeyedStream<Tuple2<Long, Long>, Long> keyed = data.keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {@Overridepublic Long getKey(Tuple2<Long, Long> value) throws Exception {return value.f0;}});SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {Tuple2<Long, Long> currentSum = sum.value();if (currentSum == null) {currentSum = Tuple2.of(0L, 0L);}// 更新currentSum.f0 += 1L;currentSum.f1 += value.f1;System.out.println("currentValue: " + currentSum);// 更新状态值sum.update(currentSum);// 如果 count >= 5 清空状态值 重新计算if (currentSum.f0 >= 5) {out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("average",TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));sum = getRuntimeContext().getState(descriptor);}});flatMapped.print();env.execute("Flink State Test");}
}
运行结果

执行分析


Keyed State
表示和Key相关的一种State, 只能用于KeyedStream类型数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。
KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变化时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例可能运行一个或者多个KeyGroups的Keys。
Operator State
与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变化时自动重新分配状态数据。
同时在Flink中KeyedState和OperatorState均具有两种形式,其中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久话到CheckPoints中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发CheckPoint中,当从CheckPoint恢复任务时,算子自己再返序列化出状态的数据结构。
DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要原因是ManagedState能够更好地支持状态数据的重平衡以及更加完善的内存管理。
状态描述

State既然是暴露给用户的,那么就需要有一些属性需要指定:
- State名称
- Value Serializer
- State Type Info
在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等
- ValueState getState(ValueStateDescriptor)
- ReducingState getReducingState(ReducingStateDescriptor)
- ListState getListState(ListStateDescriptor)
- FoldingState getFoldingState(FoldingStateDescriptor)
- MapState getMapState(MapStateDescriptot)
相关文章:
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...
C++复习day04
一、函数重载 1.什么是函数重载? 自然语言中,一个词可以有多重含义,人们可以通过上下文来判断该词真实的含义,即该词被重 载了。 比如:以前有一个笑话,国有两个体育项目大家根本不用看,也不用…...
[苍穹外卖]-04菜品管理接口开发
效果预览 新增菜品 需求分析 查看产品原型分析需求, 包括用到哪些接口, 业务的限制规则 业务规则 菜品名称必须是唯一的菜品必须属于某个分类下, 不能单独存在新增菜品时可以根据情况选择菜品的口味每个菜品必须对应一张图片 接口设计 根据类型查询分类接口 文件上传接口 …...
gitlab 启动/关闭/启用开机启动/禁用开机启动
文章目录 启动 gitlab关闭 gitlab查看 gitlab 运行状态启用 gitlab 开机启动禁用 gitlab 开机启动GitlabGit启动 gitlab hxstrive@localhost:~$ sudo gitlab-ctl start ok: run: alertmanager: (pid 65953) 0s ok: run: gitaly: (pid 65965) 0s ok: run: gitlab-exporter: (pi…...
中间件解析漏洞(附环境搭建教程)
⼀:IIS解析漏洞 环境资源: https://download.csdn.net/download/Nai_zui_jiang/89717504 环境安装 windows2003iis6 1.创建新的虚拟机 2.在下⼀步中选择我们的iso⽂件镜像 vm已主动识别到windows2003 3.产品密钥⽹上搜⼀个 密码自己设置一个简单的&…...
matlab实现kaiser窗+时域采样序列(不管原信号拉伸成什么样子)是一样的,变到频谱后再采样就是一样的频域序列。
下图窗2的频谱在周期化的时候应该是2(w-k*pi/T)我直接对2w减得写错了 可见这两个kaiser窗频谱不一样,采样间隔为2T的窗,频谱压缩2倍,且以原采样频率的一半周期化。 但是这两个不同的kaiser窗在频域采样点的值使完全一…...
git为不同的项目设置不同的提交作者
方法1:找到项目的.git文件夹打开 打开config在下面添加自己作者信息 [user]name 作者名email 邮箱方法2:直接在.git文件夹设置作者名(不使用–global参数) git config user.name "xxxxx"如果想要修改之前提交的…...
防爆定位信标与防爆定位基站有什么区别?
新锐科技 https://baijiahao.baidu.com/s?id1804974957959442238&wfrspider&forpc http://www.xinruikc.cn/biaoqian/52.html http://www.xinruikc.cn/xinbiao/...
QT 编译报错:C3861: ‘tr‘ identifier not found
问题: QT 编译报错:C3861: ‘tr’ identifier not found 原因 使用tr的地方所在的类没有继承自 QObject 类 或者在不在某一类中, 解决方案 就直接用类名引用 :QObject::tr( )...
谈谈ES搜索引擎
一 ES的定义 ES 它的全称是 Elasticsearch,是一个建立在全文搜索引擎库Lucene基础上的一个开源搜索和分析引擎。ES 它本身具备分布式存储,检索速度快的特性,所以我们经常用它来实现全文检索功能。目前在 Elastic 官网对 ES 的定义,…...
【MySQL】MySQL基础
目录 什么是数据库主流数据库基本使用MySQL的安装连接服务器服务器、数据库、表关系使用案例数据逻辑存储 MySQL的架构SQL分类什么是存储引擎 什么是数据库 mysql它是数据库服务的客户端mysqld它是数据库服务的服务器端mysql本质:基于C(mysql)…...
Spring中Bean的相关注解
目录 1.Spring IoC&DI 2.关于Bean存储的相关注解(类注解与方法注解) Bean的获取方式 类注解和方法注解的重命名 2.1 类注解 2.1.1 Controller 2.1.2 Service 2.1.3 Repository 2.1.4 Component 2.1.5 Configuration 2.2 方法注解-Bean 2.2.1 定义多个对象 2.2…...
Golang | Leetcode Golang题解之第385题迷你语法分析器
题目: 题解: func deserialize(s string) *NestedInteger {if s[0] ! [ {num, _ : strconv.Atoi(s)ni : &NestedInteger{}ni.SetInteger(num)return ni}stack, num, negative : []*NestedInteger{}, 0, falsefor i, ch : range s {if ch - {negati…...
【Java 优选算法】双指针(上)
欢迎关注个人主页:逸狼 创造不易,可以点点赞吗~ 如有错误,欢迎指出~ 目录 移动零 分析 代码 复写零 分析 代码 快乐数 分析 代码 盛最多水的容器 分析 代码 移动零 题目链接 分析 双指针算法,利用两个指针cur和dest将数组划分为三个区间…...
【自动驾驶】控制算法(八)横向控制Ⅰ | 算法与流程
写在前面: 🌟 欢迎光临 清流君 的博客小天地,这里是我分享技术与心得的温馨角落。📝 个人主页:清流君_CSDN博客,期待与您一同探索 移动机器人 领域的无限可能。 🔍 本文系 清流君 原创之作&…...
Android SSE 单向接收数据
Server-Sent Events(SSE)是一种在客户端和服务器之间实现单向实时通信的技术。它允许服务器向客户端推送数据,但客户端无法使用 SSE 向服务器发送数据。这使得其适用于需要持续接收服务器数据的应用场景(如实时通知、股票行情、社…...
排序《数据结构》
排序 《数据结构》 1.排序的概念及其运用1.1 排序的概念1.2 排序运用1.3常见的排序算法1.4 排序动图演示 2.常见排序算法的实现2.1 插入排序2.2希尔排序2.3 快排左边做keyi,右边先走,可以保证相遇位置比keyi小 2.4 快速排序优化快排(非递归&a…...
flutter 提示框2 Dialog
flutter 提示框 写在点击的方法体中 child里放自己喜欢的 showDialog( context: context, builder: (BuildContext context) { final Dialog alertDialog Dialog( backgroundColor: Colors.transparent,shadowColor:Colors.transparent,child: Container(height: mediawi…...
如何选择SDR无线图传方案
在开源软件定义无线电(SDR)领域,有几个项目提供了无线图传的解决方案。以下是一些开源SDR无线图传方案: 1. **OpenHD**:这是一个远程高清数字图像传输的开源解决方案,它使用SDR技术来实现高清视频的无线传…...
关于Python类中方法__init__()解析
# import numpy as npclass Car():def __init__(self, maker, name, year):self.maker makerself.name nameself.year yearprint(self.searchMakrt() "123")def searchMakrt(self):print("汽车制作厂家为: " self.maker)# passreturn &quo…...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
三分算法与DeepSeek辅助证明是单峰函数
前置 单峰函数有唯一的最大值,最大值左侧的数值严格单调递增,最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值,最小值左侧的数值严格单调递减,最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...
rknn toolkit2搭建和推理
安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 ,不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源(最常用) conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...
