Flink状态State | 大数据技术
⭐简单说两句⭐
✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~作者:小叮当撩代码,CSDN后端领域新星创作者 |阿里云专家博主
CSDN个人主页:小叮当撩代码
🔎GZH:
哆啦A梦撩代码🎉欢迎关注🔎点赞👍收藏⭐️留言📝
Flink状态

Flink中的State

State概念
在 Flink 中,状态是流处理程序中非常重要的一部分,它允许你保存和访问数据,以实现复杂的计算逻辑。
可以简单理解为: 历史计算结果
Flink中的算子任务的State分类通常分为两类
1️⃣ 有状态
有状态需要考虑历史的数据,相同的输入可能会得到不同的输出
比如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)
2️⃣ 无状态
无状态简单说就是不需要考虑历史的数据,相同的输入得到相同的结果
比如map、filter、flatmap算子都属于无状态,不需要依赖其他数据
✅ Flink默认已经支持了无状态和有状态计算!
状态分类
Flink中有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)
Managed State是由Flink管理的,Flink帮忙存储、恢复和优化
Raw State是开发者自己管理的,需要自己序列化
❇️通常情况下,我们采用托管状态来实现我们的需求!!!
托管状态
Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以Flink 能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。
很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。
🎨所以:我们又可以将托管状态分为两类:算子状态和按键分区状态。
键控状态Keyed State
详细内容可以瞅瞅官网:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/fault-tolerance/state/
Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。
需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(…) 来得到 KeyedStream 。

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):
ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。
ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。
ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。
FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。
MapState:维护 Map 类型的状态。
Code实操
例子1
使用KeyState中的ValueState来模拟实现maxBy
代码清单
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author tiancx*/
public class StateMaxByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//加载数据DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("北京", 1),Tuple2.of("上海", 2),Tuple2.of("广州", 3),Tuple2.of("北京", 4),Tuple2.of("上海", 5),Tuple2.of("广州", 6),Tuple2.of("北京", 3)).keyBy(t -> t.f0);source.map(new RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {//定义状态,用于存储最大值ValueState<Integer> maxValueState = null;//进行初始化@Overridepublic void open(Configuration parameters) throws Exception {//创建状态描述器ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("maxValueState", Integer.class);maxValueState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple3<String, Integer, Integer> map(Tuple2<String, Integer> value) throws Exception {//获取当前值Integer currentVal = value.f1;Integer currentMax = maxValueState.value();if (currentMax == null || currentVal > currentMax) {maxValueState.update(currentVal);}return Tuple3.of(value.f0, value.f1, maxValueState.value());}}).print();env.execute();}
}
运行看结果

例子2
如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]
代码清单
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.List;/*** @author tiancx*/
public class StateDemo01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> source = stream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(" ");return Tuple2.of(split[0], Integer.parseInt(split[1]));}}).keyBy(t -> t.f0);source.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, List<Integer>>>() {ListState<Integer> listState = null;//存放超过38度的次数ValueState<Integer> valueState = null;@Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<Integer>("listState", Integer.class);ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("valueState", Integer.class);listState = getRuntimeContext().getListState(listStateDescriptor);valueState = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, List<Integer>>> out) throws Exception {System.out.println("进入flatMap");Integer val = value.f1;if (valueState.value() == null) {valueState.update(0);}if (val > 38) {listState.add(val);valueState.update(valueState.value() + 1);}if (valueState.value() >= 3) {List<Integer> list = (List<Integer>) listState.get();out.collect(Tuple2.of(value.f0, list));listState.clear();valueState.clear();}}}).print();env.execute();}
}
输入

运行结果

算子状态OperatorState
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。
算 子 状 态 也 支 持 不 同 的 结 构 类 型 , 主 要 有 三 种 : ListState 、 UnionListState 和BroadcastState。
code实操
例子1:
在 map 算子中计算数据的个数
代码清单
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author tiancx*/
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);stream.map(new MyCountMapFunction()).print();env.execute();}public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> listState;@Overridepublic Long map(String value) throws Exception {return ++count;}/*** 本地变量持久化:将 本地变量拷贝到算子状态中,开启checkpoint 时才会调用 snapshotState 方法** @param context the context for drawing a snapshot of the operator* @throws Exception*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("MyCountMapFunction.snapshotState");listState.clear();listState.add(count);}/*** 初始化本地变量:程序启动和恢复时,从状态中把数据添加到本地变量,每个子任务调用一次** @param context the context for initializing the operator* @throws Exception*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("MyCountMapFunction.initializeState");//从上下文初始化状态listState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("listState", Types.LONG()));//从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Long aLong : listState.get()) {count += aLong;}}}}
}
输入

运行结果

【都看到这了,点点赞点点关注呗,爱你们】😚😚

💬
✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~
作者:小叮当撩代码,CSDN后端领域新星创作者 |阿里云专家博主
CSDN个人主页:小叮当撩代码
🔎GZH:哆啦A梦撩代码
🎉欢迎关注🔎点赞👍收藏⭐️留言📝
相关文章:
Flink状态State | 大数据技术
⭐简单说两句⭐ ✨ 正在努力的小叮当~ 💖 超级爱分享,分享各种有趣干货! 👩💻 提供:模拟面试 | 简历诊断 | 独家简历模板 🌈 感谢关注,关注了你就是我的超级粉丝啦! &a…...
go语言方法之方法值和方法表达式
我们经常选择一个方法,并且在同一个表达式里执行,比如常见的p.Distance()形式,实际上 将其分成两步来执行也是可能的。p.Distance叫作“选择器”,选择器会返回一个方法"值"->一 个将方法(Point.Distance)绑定到特定接…...
TDMQ CKafka 版弹性存储能力重磅上线!
导语 自 2024年5月起,TDMQ CKafka 专业版支持弹性存储能力,这种产品形态下,存储可按需使用、按量付费,一方面降低消费即删除、存储使用波动大场景下的存储成本,另一方面存储空间理论上无穷大。 TDMQ CKafka 版产品能…...
24、Linux网络端口
Linux网络端口 1、查看网络接口信息ifconfig ens33 eth0 文件 ifconfig 当前设备正在工作的网卡,启动的设备。 ifconfig -a 查看所有的网络设备。 ifconfig ens33 查看指定网卡设备。 ifconfig ens33 up/down 对指定网卡设备进行开关 基于物理网卡设备虚拟的…...
Mysql全文搜索和LIKE搜索有什么区别
全文搜索和LIKE的区别 性能:在大数据集上,全文搜索通常比LIKE查询更快,因为它使用了专门的索引结构。 功能:全文搜索提供了更丰富的查询功能,如多个关键词的搜索、自然语言搜索、布尔搜索等。而LIKE通常只支持简单的…...
elementplu父级页面怎么使用封装子组件原组件的方法
一、使用原因: 封装了el-table,表格中有多选,父级要根据指定状态,让其选择不上,需要用到elementplus中table原方法toggleRowSelection 附加小知识点:(el-tree刷新树后之前选中的保持高亮setCurr…...
el-date-picker选择开始日期的近半年
<el-date-pickerv-model"form[val.key]":type"val.datePickerType || daterange":clearable"val.clearable && true"range-separator"~"start-placeholder"开始日期"end-placeholder"结束日期"style&q…...
C++
封装一个矩形类(Rect),拥有私有属性:宽度(width)、高度(height), 定义公有成员函数: 初始化函数:void init(int w, int h) 更改宽度的函数:set_w(int w) 更改高度的函数:set_h(int h) 输出该矩形的周长和面积函数:void show()...
nginx源码阅读理解 [持续更新,建议关注]
文章目录 前述一、nginx 进程模型基本流程二、源码里的小点1.对字符串操作都进行了原生实现2.配置文件解析也是原生实现待续 前述 通过对 nginx 的了解和代码简单阅读,发现这个C代码的中间件确实存在过人之处,使用场景特别多,插件模块很丰富…...
笔试训练2
牛客.单词搜索 刚开始我就想是搜索,但是不清楚bfs还是dfs更好,我尝试了bfs但是队列存东西,没有我想象的那么好写,所以我决定试试dfs import java.util.*;public class Solution {static int m 0;static int n 0;static int […...
构建坚不可摧的Web安全防线:深入剖析二阶注入与全面防御策略
引言 在数字化时代,数据安全是企业和个人最为关注的问题之一。网络攻击手段层出不穷,其中SQL注入攻击尤为狡猾,它允许攻击者通过Web应用的漏洞对数据库进行非法操作。更隐蔽的是二阶注入攻击,它不仅威胁当前操作,还能…...
(4) qml动态元素
文章目录 概述注意 动画元素变化的策略Animation on 变化behavior on⽤standalone animation注意 缓冲曲线(Easing Curves)动画分组 概述 这⼀章介绍如何控制属性值的变化,通过动画的⽅式在⼀段时间内来改变属性值。这项技术是建⽴⼀个现代化…...
深度神经网络——什么是梯度下降?
如果对神经网络的训练有所了解,那么很可能已经听说过“梯度下降”这一术语。梯度下降是提升神经网络性能、降低其误差率的主要技术手段。然而,对于机器学习新手来说,梯度下降的概念可能稍显晦涩。本文旨在帮助您直观理解梯度下降的工作原理。…...
基本元器件 - 二极管
目录 二极管的主要参数 二极管的分类 整流二极管 快恢复二极管(FRD) 稳压(齐纳)二级管 瞬态电压抑制器(TVS) 开关二极管 肖特基二极管(SBD) 正偏与反偏 常用封装 伏安特性…...
【设计模式】单例模式(创建型)⭐⭐⭐
1.概念 1.1 什么是单例模式 单例模式属于创建型模式,一个单例类在任何情况下都只存在一个实例, 构造方法必须是私有的、由自己创建一个静态变量存储实例,对外提供一 个静态公有方法获取实例。 1.2 优点与缺点 优点:是内存中只有一个实例&…...
《深入浅出C语言:从基础到指针的全面指南》
1. 简介 C语言是一种通用的编程语言,广泛应用于系统编程、嵌入式系统和高性能应用程序。它由Dennis Ritchie在1972年开发,并且至今仍然非常流行。C语言以其高效、灵活和强大的功能著称,是许多现代编程语言的基础。 2. 基本语法 2.1 Hello, …...
Typescript高级: 深入实践Record类型
概述 Record 类型是TS中其众多强大特性之一它为我们提供了创建键值对映射的强大能力极大地增强了代码的灵活性与类型安全性 应用示例 1 )用于配置场景 在复杂的项目中,配置文件往往包含多个模块的不同设置使用 Record 可以确保配置的键名正确且值类型…...
重构与优化-对象间特性搬移重构(2)
在软件开发过程中,重构是改进代码结构和设计、不改变其外在行为的过程。对象之间的特性搬移(Moving Features Between Objects)是重构的一种重要类型,它涉及到将属性、方法或其他特性从一个对象转移到另一个对象,以优化代码结构、提高可维护性和遵循设计原则。以下是几种典…...
网络流量监控与DNS流量分析
目录 一、网络流量监控的基础知识 什么是网络流量监控? 网络流量监控的重要性 实用案例:如何通过网络流量监控优化带宽利用 二、DNS流量分析的核心要点 什么是DNS流量分析? DNS流量分析的优势 实用技巧:如何通过DNS流量分…...
【数据分析】打造完美数据分析环境:Python开发环境搭建全攻略
打造完美数据分析环境:Python开发环境搭建全攻略 在数据分析的世界中,搭建一个稳定且高效的Python开发环境是至关重要的。本文将介绍三种主要的环境搭建方式:使用pip、Anaconda和Miniconda。 1. 使用pip从清华镜像安装Python包 pip是Pytho…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
实现弹窗随键盘上移居中
实现弹窗随键盘上移的核心思路 在Android中,可以通过监听键盘的显示和隐藏事件,动态调整弹窗的位置。关键点在于获取键盘高度,并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...
