Flink四大基石之State
State
state 可以理解为-- 历史计算结果
有状态计算和无状态计算
- 无状态计算:
-
- 不需要考虑历史数据, 相同的输入,得到相同的输出!
- 如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
- 有状态计算:
-
- 需要考虑历史数据, 相同的输入,可能会得到不同的输出!
- 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)
注意: Flink默认已经支持了无状态和有状态计算!
例如WordCount代码:已经做好了状态维护, 输入hello,输出(hello,1),再输入hello,输出(hello,2)
有状态计算和无状态计算的应用场景
- 无状态计算:数据转换,过滤等操作直接使用无状态的map/filter即可
- 有状态计算:需要做聚合/比较的操作得使用有状态的sum/reduce/maxBy/minBy....
有状态中的状态的分类
有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态(State),然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能:
- 数据流中的数据有重复,想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
- 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
- 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
其实窗口本身就是状态,他不是立即出结果,而是将数据都保存起来,达到触发条件才计算。
一个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内输入流的某个整数字段求和,那么当算子子任务接收到新元素时,会获取已经存储在状态中的数值,然后将当前输入加到状态上,并将状态数据更新。

以wordcout为例,说明上图的流程
状态类型
Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。
两者的区别:Managed State是由Flink管理的,Flink帮忙存储、恢复和优化,Raw State是开发者自己管理的,需要自己序列化。
具体区别有:
从状态管理的方式上来说,Managed State由Flink Runtime托管,状态是自动存储、自动恢复的,Flink在存储管理和持久化上做了一些优化。当横向伸缩,或者说修改Flink应用的并行度时,状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。
从状态的数据结构上来说,Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。
从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。
对Managed State继续细分,它又有两种类型:Keyed State和Operator State。
Flink状态 - 托管状态- KeyedState ( 在keyBy之后可以使用状态 )- ValueState (存储一个值)- ListState (存储多个值)- MapState (存储key-value) - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]- 原生状态 (不用)
Keyed State (键控状态)
Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。
需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(...) 来得到 KeyedStream 。

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):
· ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。
· ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。
· ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
· AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。
· FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。
· MapState:维护 Map 类型的状态。
代码演示-Managed State-Keyed State
//nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/state/
案例1:
使用KeyedState中的ValueState获取数据中的最大值(获取每个key的最大值)(实际中直接使用maxBy即可)
也就是我们自己使用KeyState中的ValueState来模拟实现maxBy
代码实现:
package com.bigdata.state;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _01_KeyedStateDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(Tuple2.of("北京", 1L),Tuple2.of("上海", 2L),Tuple2.of("北京", 6L),Tuple2.of("上海", 8L),Tuple2.of("北京", 3L),Tuple2.of("上海", 4L),Tuple2.of("北京", 7L));//2. source-加载数据tupleDS.keyBy(new KeySelector<Tuple2<String, Long>, String>() {@Overridepublic String getKey(Tuple2<String, Long> value) throws Exception {return value.f0;}}).map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String,Long>>() {// 借助状态这个API实现ValueState<Long> maxValueState= null;@Overridepublic void open(Configuration parameters) throws Exception {// 就是对ValueState初始化ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<Long>("valueState",Long.class);maxValueState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {Long val = value.f1;if(maxValueState.value() == null){maxValueState.update(val);}else{if(maxValueState.value() < val){maxValueState.update(val);}}return Tuple2.of(value.f0,maxValueState.value());}}).print();//.maxBy(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}
案例2:
如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]
姓名,温度输入 输出张三,37张三,38张三,39张三,35张三,40张三,41 张三,[39,40,41]张三,40 张三,[39,40,41,40]
package com.bigdata.state;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;import java.util.ArrayList;public class _02_KeyedStateDemo2 {// 如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);//3. transformation-数据处理转换 zs,37dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");return Tuple2.of(arr[0],Integer.valueOf(arr[1]));}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {ValueState<Integer> valueState = null;ListState<Integer> listState = null;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);valueState = getRuntimeContext().getState(stateDescriptor);ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);listState = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, ArrayList<Integer>>> out) throws Exception {Integer tiwen = value.f1;if(tiwen >= 38){valueState.update(valueState.value()==null?1:(valueState.value()+1));listState.add(tiwen);}if(valueState.value()!=null && valueState.value() >= 3){ArrayList<Integer> list = new ArrayList<>();Iterable<Integer> iterable = listState.get();for (Integer tiwenwen : iterable) {list.add(tiwenwen);}out.collect(Tuple2.of(value.f0,list));}}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

相关文章:
Flink四大基石之State
State state 可以理解为-- 历史计算结果 有状态计算和无状态计算 无状态计算: 不需要考虑历史数据, 相同的输入,得到相同的输出!如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1) 有状态计算: 需要考虑历史数据, 相同的输入,可…...
Spacy小笔记:zh_core_web_trf、zh_core_web_lg、zh_core_web_md 和 zh_core_web_sm区别
Spacy小笔记 最近频繁用到spacy,就小记一下。 2024.11.29 zh_core_web_trf、zh_core_web_lg、zh_core_web_md 和 zh_core_web_sm区别 首先,它们都是预训练的中文模型: zh_core_web_trf:395M 架构: 基于 Transformer 架构(bert…...
第六届智能控制、测量与信号处理国际学术会议 (ICMSP 2024)
重要信息 2024年11月29日-12月1日 中国陕西西安石油大学雁塔校区 大会官网:www.icmsp.net 大会简介 第六届智能控制、测量与信号处理国际学术会议(ICMSP 2024)由西安石油大学、中海油田服务股份有限公司、浙江水利水电学院与中国石油装备…...
docker服务容器化
docker服务容器化 1 引言2 多个容器间网络联通2.1 单独创建关联2.2 创建时关联 3 服务搭建3.1 镜像清单3.2 容器创建 4 联合实战4.2 flink_sql之kafka到starrocks4.2 flink_sql之mysql到starrocks 5 文献借鉴 1 引言 利用docker可以很效率地搭建服务,本文在win1…...
【QT】控件8
1.QDial 通过调节旋钮位置来控制窗口的不透明度: void Widget::on_dial_valueChanged(int value) {qDebug()<<value;this->setWindowOpacity((double)value/100); }效果演示: 2.Date/Time Edit 计算两个日期的差值 ui界面设计 计算按钮按下…...
漫谈推理谬误——错误因果
相关文章 漫谈推理谬误——错误假设-CSDN博客文章浏览阅读736次,点赞22次,收藏3次。在日常生活中,我们会面临各种逻辑推理,有些看起来一目了然,有些非常的科学严谨,但也有很多似是而非,隐藏了陷…...
【数据结构】队列实现剖析:掌握队列的底层实现
在计算机科学中,**队列(Queue)**是一种常见的数据结构,它遵循先进先出(FIFO,First In First Out)的原则。队列的应用非常广泛,例如任务调度、资源管理、进程通信等。本篇文章旨在为计…...
【C++】IO库(二):文件输入输出
8.2 文件输入输出 头文件 fstream 定义了三个类型来之支持文件IO,分别是: ifstream:从一个给定文件读取数据;ofstream:向一个给定文件写入数据;fstream:读写给定文件。 在 C 当中,…...
105.【C语言】数据结构之二叉树求总节点和第K层节点的个数
目录 1.求二叉树总的节点的个数 1.容易想到的方法 代码 缺陷 思考:能否在TreeSize函数内定义静态变量解决size的问题呢? 其他写法 运行结果 2.最好的方法:分而治之 代码 运行结果 2.求二叉树第K层节点的个数 错误代码 运行结果 修正 运行结果 其他写法 1.求二…...
力扣637. 二叉树的层平均值
给定一个非空二叉树的根节点 root , 以数组的形式返回每一层节点的平均值。与实际答案相差 10-5 以内的答案可以被接受。 提示: 树中节点数量在 [1, 104] 范围内-231 < Node.val < 231 - 1 代码: /*** Definition for a binary tree node.* stru…...
【前端】Next.js 服务器端渲染(SSR)与客户端渲染(CSR)的最佳实践
关于Next.js 服务器端渲染(SSR)与客户端渲染(CSR)的实践内容方面,我们按下面几点进行阐述。 1. 原理 服务器端渲染 (SSR): 在服务器上生成完整的HTML页面,然后发送给客户端。这使得用户在首次访问时能够…...
路径规划之启发式算法之一:A-Star(A*)算法
A*算法是一种启发式搜索算法,常用于解决路径规划问题。 一、A*算法的定义与原理 A*算法是一种用于在图形或网格中查找最短路径的算法。它在搜索过程中综合考虑了每个节点的实际距离(g值)和预估距离(h值),以…...
Android复习代码1-4章
public class RudioButton extends AppCompatActivity {Overrideprotected void onCreate(Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rudio_button);// 找到RadioGroup和TextView的实例RadioGroup radioGrou…...
【问题】webdriver.Chrome()设置参数executable_path报不存在
场景1: 标红报错unresolved reference executable_path 场景2: 执行报错TypeError: __init__() got an unexpected keyword argument executable_path 原因: 上述两种场景是因为selenium4开始不再支持某些初始化参数。比如executable_path 解决: 方案…...
win10系统安装docker-desktop
1、开启Hyper-v ———————————————— Hyper-V 是微软提供的一种虚拟化技术,它允许你在同一台物理计算机上运行多个独立的操作系统实例。这种技术主要用于开发、测试、以及服务器虚拟化等领域。 —————————————————————— &#…...
小程序-基于java+SpringBoot+Vue的乡村研学旅行平台设计与实现
项目运行 1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。 2.IDE环境:IDEA,Eclipse,Myeclipse都可以。推荐IDEA; 3.tomcat环境:Tomcat 7.x,8.x,9.x版本均可 4.硬件环境:…...
组件A底部栏(position: fixed )事件使用$emit更新内容失败bug解决
今天遇到一个很离奇的bug,记录一下 问题:在组件内底部栏使用$emit触发按钮事件但打印出来的值是初始化的值,更新的值被重置导致更新失败 原因:组件内底部使用了 position: fixed; 固定, 导致组件内插槽 this 与 保存按…...
数据结构——排序第三幕(深究快排(非递归实现)、快排的优化、内省排序,排序总结)超详细!!!!
文章目录 前言一、非递归实现快排二、快排的优化版本三、内省排序四、排序算法复杂度以及稳定性的分析总结 前言 继上一篇博客基于递归的方式学习了快速排序和归并排序 今天我们来深究快速排序,使用栈的数据结构非递归实现快排,优化快排(三路…...
C++的类功能整合
1. 类的基本概念 类是面向对象编程的核心,它封装了数据和操作数据的函数。 #include <iostream> using namespace std;class MyClass { public:int publicData;void publicFunction() {cout << "Public function" << endl;}private:i…...
《String类》
目录 一、定义与概述 二、创建字符串对象 2.1 直接赋值 2.2 使用构造函数 三、字符串的不可变性 四、常用方法 4.1 String对象的比较 4.1.1 比较是否引用同一个对象 4.1.2 boolean equals(Object anObject)方法:按照字典序比较 4.1.3 int compareTo(Strin…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
C++ 基础特性深度解析
目录 引言 一、命名空间(namespace) C 中的命名空间 与 C 语言的对比 二、缺省参数 C 中的缺省参数 与 C 语言的对比 三、引用(reference) C 中的引用 与 C 语言的对比 四、inline(内联函数…...
深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
ABAP设计模式之---“简单设计原则(Simple Design)”
“Simple Design”(简单设计)是软件开发中的一个重要理念,倡导以最简单的方式实现软件功能,以确保代码清晰易懂、易维护,并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计,遵循“让事情保…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...
(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
