当前位置: 首页 > news >正文

Flink状态编程

        Flink处理机制的核心就是“有状态的流处理”,在某些情况下,一条数据的计算不仅要基于当前数据自身,还需要依赖数据流中的一些其他数据。这些在一个任务中,用来辅助计算的数据我们就称之为这个任务的状态。

一、按键分区状态(KeyedState)分类

        按键分区状态是根据输入流中定义的key来进行维护和访问的,所有是定义在KeyedStream中的,也就是对datastream进行keyby之后才能使用按键分区状态。

        按键分区状态支持的结构类型主要有以下几种:

(1)ValueState(值状态)

        顾名思义 ,就是状态只保存一个值,这个值的类型可以是任何具体的数据类型。如ValueState<Long>。

publice interface ValueState<T> extends State {T value();   // 获取当前状态的值void update(T value); // 对状态进行更新
}

(2)ListState(列表状态)

        以列表的形式保存数据。 

publice interface ListState<T> extends State {Iterable<T> get();   // 获取当前的列表状态 是一个可迭代对象void update(List<T> values); // 传入一个列表,对列表状态进行覆盖更新void add(T value);   // 在状态列表中添加一个元素void addAll(List<T> values); //添加多个元素
}

(3)MapState(映射状态)

        把键值对作为状态保存起来。 

publice interface MapState<K, V> extends State {V get(K key);   // 获取传入key对应的value值void put(K key, V value);    //更新key对应的valuevoid putAll(Map<K, V> map);void remove(K key); boolean contains(K key);Iterable<Map.Enter<K, V>> entries();Iterable<K> keys();Iterable<V> values();boolean isEmpty();
}

(4)ReducingState(归约状态)

        归约状态保存的是进行归约计算后的结果值,也就是每add一个元素,都进行归约计算,并将归约结果保存为当前状态值,因此需要在归约状态描述器中声明一个归约函数。

(5)AggregatingState(聚合状态)

        聚合状态与归约状态类似,聚合状态也是一个值,只不过聚合状态描述器传入的是一个更加一般化的聚合函数,可以重新定义中间状态和输出状态的类型。

二、状态生存时间(TTL,time-to-live)

        在实际应用中,状态会随着时间的推移而逐渐增多,如果不加以限制,最终就会导致存储空间的耗尽。Flink可以为状态配置“生存时间”,当状态在内存中存活的时间超过设定的值时,就将他清除掉,调用clear方法可以清除状态。

        但是,如果额外开启一个进程不断扫描所有的状态是否过期会占用大量资源且很多情况下是无用功,一个比较好的方法是:状态失效的时候不立即删除,之后如果有对这个状态进行访问,再判断是否已经失效、从而进行清除,则不需要另外开启进程进行扫描了。

        配置状态的TTL时,首先需要创建一个StateTtlConfig配置对象,然后调用(状态描述器.enableTimeToLive())方法启动TTL功能。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//创建状态和更新状态时才更新失效时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//不返回过期值.build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>('my_state', String.class);stateDescriptor.enableTimeToLive(ttlConfig)

三、算子状态分类

        算子状态针对当前算子任务有效,不对key进行隔离,与key无关,因此一个并行子任务上的不同key会访问到相同的算子状态。

(1)ListState(列表状态)

        与上面类似。差别主要在于重分区时,按键分区状态的ListState是以keyGrouo的形式重新均衡发送到下游的,而算子状态的ListState是将所有数据收集到一起均匀分配。

(2)UnionListState(联合列表状态)

        与ListState类似。差别主要在于重分区时,是将所有的状态一起发送到下游的每一个并行子任务上,列表状态太大时效率很低,不建议使用。

(3)BroadcastState(广播状态)

        对所有的并行子任务保持同一份“全局”状态,一般用来做统一的规则或配置设定。这时所有的并行子任务都将访问同一个状态,就像是状态被广播了,注意没有真正广播。

        广播状态必须基于广播流来创建。

(4)算子状态的持久化保存

        与按键分区状态相比,算子状态的故障后重新恢复稍显复杂:因为故障重启后可能发生并行度调整,按键分区状态中相同的key仍然可以被分配到一个子任务上,然而算子状态下的数据所发往的分区可能会发生变化,那么如何保证原先的状态与故障恢复后数据的对应关系呢?

        Flink提供了CheckpointedFunction接口,让我们可以根据业务需求自行设计状态的保存和恢复逻辑,这里就不展开说了。

四、状态持久化和状态后端

(1)开启检查点 

        对状态进行持久化保存,可以在发生故障后进行重启恢复 。Flink对状态进行持久化的方式,就是将状态写入检查点保存到外部存储系统中。具体的存储介质,一般是分布式文件系统。因此,要相对状态进行自动持久化保存,首先就要开启检查点。调用执行环境的.enableCheckpointing()方法就可以开启检查点。

env.enableCheckpointing(1000);  //每隔1s保存检查点

(2)检查点的保存流程

        检查点的保存主要是JobManager TaskManager和外部存储系统三者之间的协调。具体来说: 在应用触发检查点保存时,首先由JobManager向每个TaskManager发送触发检查点命令;TaskManager收到命令后,对当前任务的状态进行快照保存,持久化到远程的存储介质;完成后向JobManager返回确认消息;JobManager只有收到所有TaskManager确认消息,才会确认当前检查点保存成功。而这一切工作的协调,就需要一个“专职人员”来完成,也就是状态后端。

 (3)状态后端(state backends)

        状态后端就是Flink中负责状态的存储、访问以及维护的一个可插拔组件,主要负责两件事:一是本地的状态管理,而是将检查点写入远程的持久化介质。

        状态后端可以分为两类:(默认)哈希表状态后端(HashMapStateBackend)、内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)。可以通过对执行环境调用.setStateBackend()方法设置状态后端类型。

env.setStateBackend(new HashMapStateBackend());env.setStateBackend(new EmbeddedRocksDBStateBackend());

        哈希表状态后端优点:本地状态放入内存,读写效率高

        内嵌RocksDB状态后端优点:异步快照;增量式保存检查点机制

相关文章:

Flink状态编程

Flink处理机制的核心就是“有状态的流处理”&#xff0c;在某些情况下&#xff0c;一条数据的计算不仅要基于当前数据自身&#xff0c;还需要依赖数据流中的一些其他数据。这些在一个任务中&#xff0c;用来辅助计算的数据我们就称之为这个任务的状态。 一、按键分区状态&…...

【Django篇】--动手实现路由模块化与路由反转

一、路由模块化 在一个Django项目中&#xff0c;由于功能类别不同&#xff0c;因此需要将不同功能进行模块化设计。在Django项目中模块化设计则需要将不同模块封装为对应的app模块&#xff0c;每一个模块中涉及到的路由则也需要进行模块化设计&#xff0c;才能更好的让整个项目…...

多元统计分析练习题3

从总体 A A A 和 B B B 中分别抽取 n 10 n10 n10 个样本 假设 A , B A,B A,B 协方差矩阵相同&#xff0c;并且服从多元正态分布 计算得到的样本均值和样本离差阵分别为 X ‾ A ( 1 , 2 , 3 ) T , V B d i a g ( 1 , 1 , 1 ) X ‾ B ( 1.5 , 2.5 , 3.5 ) T , V B d i…...

windows remote desktop service 远程桌面RDS授权激活

windows remote desktop service 远程桌面RDS授权激活 功能介绍&#xff1a;操作步骤&#xff1a;1、添加远程桌面授权服务2、添加远程桌面授权许可 功能介绍&#xff1a; 本文以 windows Server 2016为例&#xff0c;系统默认远程桌面连接数是2个用户&#xff0c;如果多余两个…...

6-pandas数据读取

前言 一、分组聚合 1.groupby使用&#xff1a; groupby() 是 pandas 库中用于对数据进行分组操作的一个非常重要的方法。 import pandas as pddata {城市: [北京, 上海, 广州, 北京, 上海, 广州],人口: [2154, 2424, 1303, 2154, 2424, 1303],年龄: [25, 30, 35, 25, 30, 3…...

【Logback详解】

Logback详解 Logback 是一个用于 Java 应用的日志框架&#xff0c;它由 Log4j 的创始人 Ceki Glc 创建。Logback 分为三个模块&#xff1a;logback-core、logback-classic 和 logback-access。logback-classic 模块实现了 SLF4J (Simple Logging Facade for Java) API&#xf…...

Flume的概念和原理

一、Flume的概念 1、flume 作为 cloudera 开发的实时日志收集系统 2、flume一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方&#xff0c;用于收集数据;同时&#xff0c;Flume提供对数据进行简单处理&#xff0c;并写到各种…...

初始nginx

华子目录 nginx介绍nginx功能介绍基础特性web服务相关功能nginx进程结构web请求处理机制 nginx进程间通信nginx启动与http连接建立http处理过程 nginx模块介绍nginx命令演示 nginx介绍 nginx是免费的、开源的、高性能的HTTP和反向代理服务器、邮件代理服务器、以及TCP/UDP代理服…...

vulnhub靶场 Empire LupinOne

使用命令查看靶机ip,访问ip arp-scan -l 使用御剑扫描一下子域名&#xff0c;但是没有获取到什么有用的信息 这是一个Apache文档&#xff0c;没有什么用 紧接着我们尝试暴力破解&#xff0c;这里推荐使用ffuf工具暴力破解目录&#xff0c;kali自带的ffuf扫描速度贼快 参数解释…...

6-Gin 路由详解 --[Gin 框架入门精讲与实战案例]

Gin 是一个用 Go 语言编写的 HTTP Web 框架&#xff0c;以其高性能和简洁的 API 而闻名。它提供了一套强大的路由功能&#xff0c;使得开发者可以轻松地定义 URL 路由规则&#xff0c;并将这些规则映射到具体的处理函数&#xff08;handler&#xff09;。以下是关于 Gin 路由的…...

使用Lodash工具库的orderby和sortby进行排序的区别

简介 _.orderBy 和 _.sortBy 是 Lodash 库中用于排序数组的两个函数。 区别 _.orderBy 允许你指定一个或多个属性来排序&#xff0c;并为每个属性指定排序方向&#xff08;升序或降序&#xff09;。默认所有值为升序排&#xff0c;指定为"desc" 降序&#xff0c…...

CSS面试题|[2024-12-24]

1.说一下CSS的盒模型 在HTML页面中的所有元素都可以看成是一个盒子 盒子的组成&#xff1a;内容content、内边距padding、边框border、外边距margin 盒模型的类型&#xff1a; 标准盒模型 margin border padding content IE盒模型 margin content&#xff08;包括border p…...

flask-admin 在modelview 视图中重写on_model_change 与after_model_change

背景&#xff1a; 当我们在使用flask-admin进行WEB开发时应该第一时间想到的是竟可能使用框架推荐的modelView模型&#xff0c;其次才是自定义模型 baseview,因为只有modelview模型下开发才能最大限度的提高效率。 制作&#xff1a; 1、在modelview视图下框架会通过默认视图…...

Excel粘贴复制不完整的原因以及解决方法

在数据处理和分析的过程中&#xff0c;Excel无疑是不可或缺的工具。然而&#xff0c;在使用Excel进行复制粘贴操作时&#xff0c;有时会遇到粘贴不完整的情况&#xff0c;这可能会让人感到困惑和烦恼。本文将深入探讨Excel粘贴复制不完整的原因、提供解决方案&#xff0c;并给出…...

【深度学习环境】NVIDIA Driver、Cuda和Pytorch(centos9机器,要用到显示器)

文章目录 一 、Anaconda install二、 NIVIDIA driver install三、 Cuda install四、Pytorch install 一 、Anaconda install Step 1 Go to the official website: https://www.anaconda.com/download Input your email and submit. Step 2 Select your version, and click i…...

Cocos Creator 3.8.5 正式发布,更小更快更多平台!

在 Cocos Creator 3.8.5 版本中&#xff0c;我们做了新一轮的优化。 在加载速度、代码裁剪、平台增强等多方面做了优化&#xff0c;提升了开发者体验和游戏性能。 希望能够助 Cocos 开发者们的产品更上一层楼。 一、加载速度优化 1、WASM 模块延迟加载 在早期版本中&#xff0c…...

Python中构建终端应用界面利器——Blessed模块

在现代开发中&#xff0c;命令行应用已经不再仅仅是一个简单的文本输入输出工具。随着需求的复杂化和用户体验的重视&#xff0c;终端界面也逐渐成为一个不可忽视的设计环节。 如果你曾经尝试过开发终端UI&#xff0c;可能对传统的 print() 或者 input() 函数感到不满足&#…...

Android 15 状态栏闹钟图标不显示问题修复

Android 15 状态栏闹钟图标不显示问题修复 问题描述 在 Android 15 系统中,发现即使设置了闹钟,状态栏也不会显示闹钟图标。这个问题影响了用户及时查看闹钟状态的体验。 问题分析 通过查看 SystemUI 的配置文件,发现在 frameworks/base/packages/SystemUI/res/values/conf…...

数据采集背后的效率革命:如何优化你的爬虫性能

在爬虫技术日益发展的今天&#xff0c;性能优化成为提升数据采集效率的关键。面对日益复杂的网页结构和庞大的数据量&#xff0c;高效的爬虫能够显著降低运行时间和资源成本。本文将围绕爬虫性能优化的核心方法展开讨论&#xff0c;并通过实例对比多进程、多线程以及普通爬取的…...

【Compose multiplatform教程06】用IDEA编译Compose Multiplatform常见问题

当我们从Kotlin Multiplatform Wizard | JetBrains 下载ComposeMultiplatform项目时 会遇到无法正常编译/运行的情况&#xff0c;一般网页和桌面是可以正常编译的&#xff0c; 我这里着重解决如下问题 1:Gradle版本不兼容或者Gradle连接超时 2:JDK版本不兼容 3:Gradle依赖库连…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程&#xff1a;首先由HR先筛选一部分简历后&#xff0c;在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如&#xff1a;Boss直聘&#xff08;招聘方平台&#xff09; 直接按照条件进行筛选 例如&#xff1a…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek

文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama&#xff08;有网络的电脑&#xff09;2.2.3 安装Ollama&#xff08;无网络的电脑&#xff09;2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...