没经过我同意,flink window就把数据存到state里的了?
欢迎关注我
不知道大家在初次使用Flink的时候,是否对Flink中定义本地变量和状态比较好奇,这俩有啥区别?
而且在使用Window API时明明没有显式地创建状态,也没调用getState()
,却依然把每个窗口里的所有元素都自动缓存到 StateBackend里,这到底是怎么做到的?它怎么可以自作主张呢。
本地变量 vs Managed State
我们先来看看第一个问题,怎么清楚的区分本地变量和状态的区别呢?我们看下表:
特性 | 本地变量(Local Variable / Field) | Managed State (ValueState , ListState …) |
---|---|---|
生命周期 | 仅在当前 processElement() 或算子实例初始化时有效;Task 重启或 failover 后重置为初始值 | 跨事件、跨 checkpoint 持久化;重启后按最新 checkpoint 恢复 |
容错 | 不参与 Flink 容错;Task 重启或 Job 恢复后丢失 | 参与 checkpoint/savepoint;保证 Exactly-once 语义 |
序列化 | 不会被 Flink 自动序列化;只存在 JVM 堆栈或算子对象里 | 通过 TypeSerializer 序列化到 StateBackend(内存或 RocksDB) |
使用场景 | 临时计数、方法内临时缓存,无需跨事件保留 | 需要累积、聚合、窗口缓存、跨事件关联时使用 |
简单写个代码看看
//本地字段无法容错
public class MyMapFunction extends RichMapFunction<Event, Integer> {private int counter = 0; // 普通字段@Overridepublic Integer map(Event value) {counter += 1;return counter; // 失败重启后,counter 会被重置为 0}
}//使用 Managed State
public class MyStatefulMap extends RichMapFunction<Event, Integer> {private transient ValueState<Integer> counterState;@Overridepublic void open(Configuration cfg) {counterState = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));}@Overridepublic Integer map(Event value) throws Exception {Integer cnt = Optional.ofNullable(counterState.value()).orElse(0);cnt += 1;counterState.update(cnt); // 这个值会被 checkpoint 序列化并恢复return cnt;}
}
Window怎么存元素到state的?
Flink 的 Window 算子并没有让我们在代码里手动 getState()
,我们一般都只是这样写:
dataStream.keyBy(Event::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(5))).apply(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Event> elems,Collector<Result> out) { … }});
却能自动把 5 分钟窗口里的所有 Event
缓存起来。而且在强哥之前的文章中也提到,如果Window定义的时间跨度太长,缓存在state里面的数据过多,可能会对服务性能造成影响,官网也是提到了的:
Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
那么,Window背后究竟发生了什么?我们来看关键源码位置(注:以下源码基于flink-streaming-java:1.18.1)。
1. 隐式注册StateDescriptor
我们先看示例代码
source
.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) {return Tuple2.of(value, 1);}
})
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new WindowFunctionExample())
;
在执行process
方法的时候,Flink会调用org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder#apply(org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction<java.lang.Iterable<T>,R,K,W>)
,创建WindowOperator
:
private <R> WindowOperator<K, T, ?, R, W> apply(InternalWindowFunction<Iterable<T>, R, K, W> function) {if (evictor != null) {return buildEvictingWindowOperator(function);} else {ListStateDescriptor<T> stateDesc =new ListStateDescriptor<>(WINDOW_STATE_NAME, inputType.createSerializer(config));return buildWindowOperator(stateDesc, function);}}
可以看到这里创建了ListStateDescriptor
。
然后,在执行Window的生命周期方法org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#open()
,Flink 会为每个 key + window 分配一个内部缓存state:
// create (or restore) the state that hold the actual window contents
// NOTE - the state may be null in the case of the overriding evicting window operator
if (windowStateDescriptor != null) {windowState =(InternalAppendingState<K, W, IN, ACC, ACC>)getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}public <N, S extends State, V> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");Preconditions.checkNotNull(this.keySerializer, "State key serializer has not been configured in the config. This operation cannot use partitioned state.");InternalKvState<K, ?, ?> kvState = (InternalKvState)this.keyValueStatesByName.get(stateDescriptor.getName());if (kvState == null) {if (!stateDescriptor.isSerializerInitialized()) {stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);}kvState = LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled((InternalKvState)TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, this, this.ttlTimeProvider), stateDescriptor, this.latencyTrackingStateConfig);this.keyValueStatesByName.put(stateDescriptor.getName(), kvState);this.publishQueryableStateIfEnabled(stateDescriptor, kvState);}return kvState;}
这里的 windowState
就是 Flink 为你隐藏起来的“窗口元素缓存”:
windowState
存放当前 window 的所有元素- Flink 会在每次触发窗口(watermark 到达或触发器触发)时,把这个
windowState
内容取出来,交给你的process()
方法 - 当窗口关闭后,会
clear()
这个windowState
2. 数据到达:write to state
在 WindowOperator#processElement()
中,Flink 收到新的事件时,会执行:
// 把元素追加到 windowState
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
……
//触发器触发,则获取窗口状态内容
TriggerResult triggerResult = triggerContext.onElement(element);if (triggerResult.isFire()) {ACC contents = windowState.get();if (contents == null) {continue;}emitWindowContents(window, contents);if (triggerResult.isPurge()) {windowState.clear();}registerCleanupTimer(window);
}
这段代码每来一条事件,就把它 序列化 并写入到 StateBackend(Memory/RocksDB),而不是保存在 Java内存对象里。
同时,如果是触发了触发器,则会返回窗口内容。还会创建一个定时器,定时执行窗口计算。
3. 定时触发:read from state
当定时器触发时,WindowOperator#onEventTime()
会调用:
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
//触发器触发
if (triggerResult.isFire()) {// 读取并反序列化所有元素ACC contents = windowState.get();if (contents != null) {emitWindowContents(triggerContext.window, contents);}
}// 清空 state
if (triggerResult.isPurge()) {windowState.clear();
}
从 写(add()
)到 读(get()
)再到 删(clear()
),整个过程都是通过 Flink 的 StateBackend 完成的——这保证了:
- Exactly-once 语义:即使 TaskManager 宕机或网络抖动,StateBackend 里缓存的窗口数据都能在恢复后自动重新加载。
- 水平扩容/重分区:当做 rescale 操作或 job 重启时,Flink 会把 state 分片迁移到新的并行度实例,保证窗口数据完整。
为什么 Window 必须序列化?
-
分布式容错
Window 算子可能在多个 TM 上并行,节点故障、网络分区、作业重调度都可能发生,只有把窗口数据写到 StateBackend,才能在恢复时不丢失任何事件。 -
Checkpoint & Savepoint
Flink 借助 Kafka、文件系统做 checkpoint/savepoint;所有 state(包括窗口元素)都会被打包到 checkpoint 里,保证 Exactly-once 与故障恢复。 -
弹性伸缩
扩容、缩容时需要重新分配并行任务,StateBackend 会把各个 key + window 的数据按照新的并行度迁移到对应实例。
总结
源码分析完了,写个小总结吧
- 本地变量 只能在当前算子实例、当前方法调用中生存,不会参与序列化;重启或缩容后会丢失。
- Managed State(包括我们手动声明的
ValueState
、也包括 WindowOperator 背后隐式的ListState
)会被 Flink 序列化到 StateBackend,参与 checkpoint/savepoint、支持容错恢复和重分区。 - 虽然 Window API 没让你在代码里
getState()
,但其核心实现却在算子初始化时自动注册了 ListStateDescriptor,并在processElement()
/onTimer()
里读写、清理这个 state。
了解了这套机制,你就能:
- 在自定义算子里灵活选择到底用本地变量还是 Managed State;
- 明白为什么 Window API 自带的“隐式状态”一定要序列化到后端,以及如何通过 StateBackend 配置(Memory vs RocksDB)来优化性能
欢迎关注我
相关文章:
没经过我同意,flink window就把数据存到state里的了?
欢迎关注我 不知道大家在初次使用Flink的时候,是否对Flink中定义本地变量和状态比较好奇,这俩有啥区别? 而且在使用Window API时明明没有显式地创建状态,也没调用getState(),却依然把每个窗口里的所有元素都自动缓存…...
Python+OpenCV打造AR/VR基础框架:从原理到实战的全链路解析
引言:重新定义数字与现实的边界 在元宇宙概念持续升温的当下,AR(增强现实)与VR(虚拟现实)技术正成为连接物理世界与数字世界的桥梁。Python凭借其丰富的计算机视觉生态(尤其是OpenCV库…...

家用或办公 Windows 电脑玩人工智能开源项目配备核显的必要性(含 NPU 及显卡类型补充)
一、GPU 与显卡的概念澄清 首先需要明确一个容易误解的概念:GPU 不等同于显卡。 显卡和GPU是两个不同的概念。 【概念区分】 在讨论图形计算领域时,需首先澄清一个常见误区:GPU(图形处理单元)与显卡(视…...

实现一个简单的 TCP 客户端/服务器
注意: TCP 三次握手建立连接建立连接后,TCP 提供全双工的通信服务,也就是在同一个连接中,通信双方 可以在同一时刻同时写数据,相对的概念叫做半双工,同一个连接的同一时刻,只能由一方来写数据T…...

对抗帕金森:在疾病阴影下,如何重掌生活主动权?
帕金森病,一种影响全球超 1000 万人的神经退行性疾病,正无声地改变着患者的生活轨迹。随着大脑中多巴胺分泌减少,患者逐渐出现肢体震颤、肌肉僵硬、步态迟缓等症状,甚至连扣纽扣、端水杯这类日常动作都变得艰难。更棘手的是&#…...

鸿蒙 UIAbility组件与UI的数据同步和窗口关闭
使用 EventHub 进行数据通信 Stage模型概念图 根据 Stage 模型概念图 UIAbility 先于 ArkUI Page 创建 所以,事件要先 .on 订阅 再 emit 发布 假如现在有页面 Page1 和他的 UIAbility // src/main/ets/page1ability/Page1Ability.ets onCreate(want: Want, laun…...
DeepSeek 赋能汽车全生态:从产品到服务的智能化跃迁
目录 一、引言二、DeepSeek 助力汽车产品介绍与推广2.1 新车性能参数与技术亮点宣传文案2.2 汽车品牌故事与文化内涵挖掘2.3 汽车广告创意与宣传方案设计 三、DeepSeek 赋能汽车售后服务支持3.1 汽车维修保养知识科普文章创作3.2 常见故障诊断与解决方案生成3.3 汽车用户个性化…...
MQTT 在Spring Boot 中的使用
在 Spring Boot 中使用 MQTT 通常会借助 Spring Integration 项目提供的 MQTT 支持。这使得 MQTT 的集成可以很好地融入 Spring 的消息驱动和企业集成模式。 以下是如何在 Spring Boot 中集成和使用 MQTT 的详细步骤: 前提条件: MQTT Brokerÿ…...

Vue3学习(组合式API——计算属性computed详解)
目录 一、计算属性computed。 Vue官方提供的案例。(普通写法与计算属性写法) 使用计算属性computed重构——>简化描述响应式状态的复杂逻辑。 (1)计算属性computed小案例。 <1>需求说明。(筛选原数组——>得新数组) &…...
Spring 中的 @ComponentScan注解详解
在 Spring 框架中,@ComponentScan 是一个非常重要的注解,它用于自动扫描和注册 Bean。通过该注解,Spring 能够自动发现并管理标注了特定注解的类(如 @Component, @Service, @Repository 等),从而实现依赖注入和容器管理。 本文将详细介绍 @ComponentScan 的作用、常见搭…...
MySQL 数据库故障排查指南
MySQL 数据库故障排查指南 本指南旨在帮助您识别和解决常见的 MySQL 数据库故障。我们将从问题识别开始,逐步深入到具体的故障类型和排查步骤。 1. 问题识别与信息收集 在开始排查之前,首先需要清晰地了解问题的现象和范围。 故障现象: 数…...

Android Studio 模拟器配置方案
Android Studio 模拟器配置方案 1.引言2.使用Android Studio中的模拟器3.使用国产模拟器1.引言 前面介绍【React Native基础环境配置】的时候需要配置模拟器,当时直接使用了USB调试方案,但是有些时候可能不太方便连接手机调试,比如没有iPhone调不了ios。接下来说明另外两种可…...

k8s中ingress-nginx介绍
1. 介绍 Ingress是一种Kubernetes资源,用于将外部流量路由到Kubernetes集群内的服务。与NodePort相比,它提供了更高级别的路由功能和负载平衡,可以根据HTTP请求的路径、主机名、HTTP方法等来路由流量。可以说Ingress是为了弥补NodePort在流量…...
键盘输出希腊字符方法
在不同操作系统中,输出希腊字母的方法有所不同。以下是针对 Windows 和 macOS 系统的详细方法,以及一些通用技巧: 1.Windows 系统 1.1 使用字符映射表 字符映射表是一个内置工具,可以方便地找到并插入希腊字母。 • 步骤…...

字节DeerFlow开源框架:多智能体深度研究框架,实现端到端自动化研究流程
🦌 DeerFlow DeerFlow(Deep Exploration and Efficient Research Flow)是一个社区驱动的深度研究框架,它建立在开源社区的杰出工作基础之上。目标是将语言模型与专业工具(如网络搜索、爬虫和Python代码执行࿰…...
MySQL 存储函数[特殊字符] VS 存储过程[特殊字符]
1、存储函数😸 一、存储函数概述 存储函数是MySQL中一种特殊的存储程序,具有以下特点: 返回单个值:必须通过RETURN语句返回明确的结果SQL表达式使用:可以直接在SQL语句中调用输入参数:只接受输入参数(隐…...
reactor实现TCP遇到的问题和探究
struct conn{ int fd; char rbuffer[1024]; char wbuffer[1024]; int wlength; int rlength; int (*recv_cb)(int); int (*send_cb)(int); }; int (*recv_cb)(int); recv_cb:函数指针的名称*recv_cb:星号 * 表示 recv_cb 是一个指针。(*recv_cb)&#…...
ElasticSearch重启之后shard未分配问题的解决
以下是Elasticsearch重启后分片未分配问题的完整解决方案,结合典型故障场景与最新实践: 一、快速诊断定位 检查集群状态 GET /_cluster/health?pretty # status为red/yellow时需关注unassigned_shards字段值 2.查看未分配分片详情 …...

算法第十八天|530. 二叉搜索树的最小绝对差、501.二叉搜索树中的众数、236. 二叉树的最近公共祖先
530. 二叉搜索树的最小绝对差 题目 思路与解法 第一想法: 一个二叉搜索树的最小绝对差,从根结点看,它的结点与它的最小差值一定出现在 左子树的最右结点(左子树最大值)和右子树的最左结点(右子树的最小值…...
QMK键盘编码器(Encoder)(理论部分)
QMK键盘编码器(Encoder)(理论部分) 前言 作为一名深耕机械键盘DIY多年的老司机,我发现很多键盘爱好者对QMK编码器的配置总是一知半解。今天我就把多年积累的经验毫无保留地分享给大家,从硬件接线到软件配置,从基础应用到高阶玩法,一文全搞定!保证看完就能让你的编码…...

微服务调试问题总结
本地环境调试。 启动本地微服务,使用公共nacos配置。利用如apifox进行本地代码调试解决调试问题。除必要的业务微服务依赖包需要下载到本地。使用mvn clean install -DskipTests进行安装启动前选择好profile环境进行启动,启动前记得mvn clean清理项目。…...
C++(2)
二、面向对象基础 1. 类与对象 1.1 核心概念 类(Class) 定义:抽象描述具有共同特征和行为的对象模板本质:代码复用的蓝图,定义数据(属性)与操作(行为࿰…...

美SEC主席:探索比特币上市证券交易所
作者/演讲者:美SEC主席Paul S. Atkins 编译:Liam 5月12日,由美国SEC加密货币特别工作组发起的主题为《资产上链:TradFi与DeFi的交汇点》系列圆桌会议如期举行。 会议期间,现任美SEC主席Paul S. Atkins发表了主旨演讲。…...
@Controller 与 @RestController-笔记
1.Controller与RestController对比 Spring MVC 中 Controller 与 RestController 的功能对比: Controller是Spring MVC中用于标识一个类作为控制器的标准注解。它允许处理HTTP请求,并返回视图名称,通常和视图解析器一起使用来渲染页面。而R…...
JavaScript篇:揭秘函数式与命令式编程的思维碰撞
大家好,我是江城开朗的豌豆,一名拥有6年以上前端开发经验的工程师。我精通HTML、CSS、JavaScript等基础前端技术,并深入掌握Vue、React、Uniapp、Flutter等主流框架,能够高效解决各类前端开发问题。在我的技术栈中,除了…...
c++和c的不同
c:面向对象(封装,继承,多态),STL,模板 一、基础定义与背景 C语言 诞生年代:20世纪70年代,Dennis Ritchie在贝尔实验室开发。主要特点: 过程式、结构化编程面向系统底层…...

MySQL Join连接算法深入解析
引言 在关系型数据库中,Join操作是实现多表数据关联查询的关键手段,直接影响查询性能和资源消耗。MySQL支持多种Join算法,包括经典的索引嵌套循环连接(Index Nested-Loop Join)、块嵌套循环连接(Block Nes…...
从构想到交付:专业级软开发流程详解
目录 一、软件开发生命周期(SDLC)标准化流程 1. 需求工程阶段(Requirement Engineering) 2. 系统设计阶段(System Design) 3. 开发阶段(Implementation) 4. 测试阶段&a…...
腾讯云-人脸核身+人脸识别教程
一。产品概述 慧眼人脸核身特惠活动 腾讯云慧眼人脸核身是一组对用户身份信息真实性进行验证审核的服务套件,提供人脸核身、身份信息核验、银行卡要素核验和运营商类要素核验等各类实名信息认证能力,以解决行业内大量对用户身份信息真实性核实的需求&a…...

http请求卡顿
接口有时出现卡顿,而且抓包显示有时tcp目标机器没有响应, 但nginx和java应用又没有错误日志,让人抓耳挠腮,最终还是请运维大哥帮忙,一顿操作后系统暂时无卡顿了,佩服的同时感觉疑惑到底调整了啥东…...