Flink系列之:State Time-To-Live (TTL)
Flink系列之:State Time-To-Live TTL
- 一、TTL
- 二、TTL实现代码
- 三、过期状态的清理
一、TTL
- Flink的TTL(Time-To-Live)是一种数据过期策略,用于指定数据在流处理中的存活时间。TTL可以应用于Flink中的状态或事件时间窗口,以控制数据的保留时间。
- 当应用程序使用状态进行计算时,状态可能会消耗存储资源。TTL可以用来设置状态的最大生存时间,超过该时间的状态将被自动清理,以释放存储资源。这可以帮助应对状态数据的增长和资源限制问题。
- 对于事件时间窗口,TTL可以用来指定窗口的持续时间。当到达窗口结束时间后,该窗口的结果将被输出,并且窗口中的所有数据将被清理。这可以确保计算结果及时输出,并释放计算资源。
- 通过设置适当的TTL值,可以控制数据的保留时间,避免资源浪费和计算延迟。TTL的使用可以根据具体应用场景和需求进行配置,以实现数据管理的灵活性和效率。
- 可以将生存时间 (TTL) 分配给任何类型的键控状态。如果配置了 TTL 并且状态值已过期,则将尽最大努力清除存储的值,这将在下面更详细地讨论。
- 所有状态集合类型都支持每条目 TTL。这意味着列表元素和映射条目独立过期。
- 为了使用状态 TTL,必须首先构建一个 StateTtlConfig 配置对象。然后可以通过传递配置在任何状态描述符中启用 TTL 功能:
二、TTL实现代码
java代码:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
这段代码使用Apache Flink提供的StateTtlConfig来设置状态的TTL(Time-To-Live)配置。
- 首先,导入必要的包org.apache.flink.api.common.state.StateTtlConfig和org.apache.flink.api.common.state.ValueStateDescriptor。
- 然后,创建StateTtlConfig对象ttlConfig,并使用StateTtlConfig.newBuilder(Time.seconds(1))来指定TTL的时间长度为1秒。这意味着状态数据的最大生存时间为1秒。
- 接下来,调用ttlConfig的setUpdateType方法,将UpdateType设置为StateTtlConfig.UpdateType.OnCreateAndWrite。这表示在创建和写入状态时更新TTL。
- 然后,调用ttlConfig的setStateVisibility方法,将StateVisibility设置为StateTtlConfig.StateVisibility.NeverReturnExpired。这表示状态在过期后永远不会返回,也就是被清理后不会再被读取。
- 最后,使用ValueStateDescriptor创建一个名为"text state"的状态描述符stateDescriptor,并调用stateDescriptor的enableTimeToLive方法,将ttlConfig传递给它。这将启用状态的TTL配置。
- 通过配置TTL,可以控制状态的生存时间,以及何时更新和清理状态。这有助于管理状态数据的存储和性能。
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).buildval stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
Python代码:
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \.build()state_descriptor = ValueStateDescriptor("text state", Types.STRING())
state_descriptor.enable_time_to_live(ttl_config)
该配置有几个选项需要考虑:
- newBuilder方法的第一个参数是必需的,它是生存时间值。
- 更新类型配置何时刷新状态 TTL(默认为 OnCreateAndWrite):
- StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入访问时
- StateTtlConfig.UpdateType.OnReadAndWrite - 也用于读取访问
- (注:如果同时将状态可见性设置为StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp,状态读缓存将被禁用,这会导致PyFlink中的一些性能损失)
状态可见性配置如果尚未清除过期值,是否在读取访问时返回过期值(默认为 NeverReturnExpired):
- StateTtlConfig.StateVisibility.NeverReturnExpired - 永远不会返回过期值
(注:状态读/写缓存将被禁用,这会导致 PyFlink 中的一些性能损失)
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用则返回
在 NeverReturnExpired 的情况下,过期状态的行为就好像它不再存在一样,即使它仍然需要被删除。该选项对于数据必须在 TTL 之后严格无法进行读取访问的用例很有用,例如处理隐私敏感数据的应用程序。
另一个选项 ReturnExpiredIfNotCleanedUp 允许在清理之前返回过期状态。
笔记:
- 状态后端存储上次修改的时间戳以及用户值,这意味着启用此功能会增加状态存储的消耗。堆状态后端存储一个附加的 Java 对象,其中包含对用户状态对象的引用和内存中的原始 long 值。 RocksDB 状态后端为每个存储值、列表条目或映射条目添加 8 个字节。
- 当前仅支持涉及处理时间的 TTL。
- 尝试使用启用 TTL 的描述符恢复之前未配置 TTL 的状态,反之亦然,将导致兼容性失败和 StateMigrationException。
- TTL 配置不是检查点或保存点的一部分,而是 Flink 在当前运行的作业中处理它的一种方式。
- 不建议通过将 ttl 从短值调整为长值来恢复检查点状态,这可能会导致潜在的数据错误。
- 目前,仅当用户值序列化程序可以处理空值时,具有 TTL 的映射状态才支持空用户值。如果序列化器不支持 null 值,则可以使用 NullableSerializer 对其进行包装,但需要在序列化形式中增加一个额外字节。
- 启用 TTL 的配置后,StateDescriptor 中的 defaultValue 实际上已被弃用,将不再生效。这样做的目的是使语义更加清晰,并让用户在状态内容为空或过期时手动管理默认值。
三、过期状态的清理
默认情况下,过期值会在读取时显式删除,例如 ValueState#value,并在配置的状态后端支持的情况下定期在后台进行垃圾收集。可以在 StateTtlConfig 中禁用后台清理:
Java代码:
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground.build
Python代码:
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \.build()
为了对后台的一些特殊清理进行更细粒度的控制,您可以如下所述单独配置它。目前,堆状态后端依赖于增量清理,RocksDB 后端使用压缩过滤器进行后台清理。
完整快照中的清理
此外,您可以在拍摄完整状态快照时激活清理,这将减少其大小。在当前实现下,本地状态不会被清除,但在从以前的快照恢复时,它不会包括删除的过期状态。可以在StateTtlConfig中配置:
Java代码:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot().build();
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot.build
Python代码:
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_full_snapshot() \.build()
此选项不适用于 RocksDB 状态后端中的增量检查点。
对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。
增量清理
另一种选择是逐步触发某些状态条目的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果此清理策略对于某些状态是活动的,则存储后端会在其所有条目上为此状态保留一个惰性全局迭代器。每次触发增量清理时,迭代器都会前进。检查遍历的状态条目并清除过期的状态条目。
该功能可以在 StateTtlConfig 中配置:
Java代码:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build();
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build
Python:
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_incrementally(10, True) \.build()
该策略有两个参数。第一个是每次清理触发的检查状态条目数。它总是在每次状态访问时触发。第二个参数定义是否在每次记录处理时额外触发清理。堆后端的默认后台清理会检查 5 个条目,而不会针对每个记录处理进行清理。
笔记:
- 如果没有对状态进行访问或没有处理任何记录,则过期状态将持续存在。
- 增量清理所花费的时间会增加记录处理延迟。
- 目前增量清理仅针对堆状态后端实现。对 RocksDB 设置它不会有任何效果。
- 如果堆状态后端与同步快照一起使用,则全局迭代器在迭代时会保留所有键的副本,因为其特定实现不支持并发修改。启用此功能将增加内存消耗。异步快照则不存在这个问题。
- 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。
RocksDB 压缩期间的清理
如果使用 RocksDB 状态后端,将调用 Flink 特定的压缩过滤器进行后台清理。 RocksDB 定期运行异步压缩来合并状态更新并减少存储。 Flink 压缩过滤器使用 TTL 检查状态条目的过期时间戳并排除过期值。
该功能可以在 StateTtlConfig 中配置:
Java代码:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000, Time.hours(1)).build();
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfigval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000, Time.hours(1)).build
Python代码:
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \.build()
RocksDB 压缩过滤器每次处理一定数量的状态条目后都会从 Flink 查询当前时间戳,用于检查过期情况。您可以更改它并将自定义值传递给 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法。更频繁地更新时间戳可以提高清理速度,但会降低压缩性能,因为它使用来自本机代码的 JNI 调用。 RocksDB 后端的默认后台清理会在每次处理 1000 个条目时查询当前时间戳。
定期压缩可以加快过期状态条目的清理速度,特别是对于很少访问的状态条目。早于该值的文件将被拾取进行压缩,并重新写入到与之前相同的级别。它确保文件定期通过压缩过滤器。您可以更改它并将自定义值传递给 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicalCompactionTime) 方法。定期压缩秒数的默认值为 30 天。您可以将其设置为 0 以关闭定期压缩,或设置一个较小的值以加速过期状态条目清理,但它会触发更多压缩。
您可以通过激活 FlinkCompactionFilter 的调试级别来从 RocksDB 过滤器的本机代码激活调试日志:
log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
笔记:
- 在压缩过程中调用 TTL 过滤器会减慢速度。 TTL 过滤器必须解析上次访问的时间戳,并检查正在压缩的每个键的每个存储状态条目的过期时间。如果是集合状态类型(列表或映射),还会针对每个存储的元素调用检查。
- 如果此功能与包含非固定字节长度元素的列表状态一起使用,则本机 TTL 过滤器必须在每个状态条目(其中至少第一个元素已过期)额外通过 JNI 调用该元素的 Flink java 类型序列化器确定下一个未过期元素的偏移量。
- 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。
- 周期性压缩仅在启用 TTL 时才起作用。
相关文章:
Flink系列之:State Time-To-Live (TTL)
Flink系列之:State Time-To-Live TTL 一、TTL二、TTL实现代码三、过期状态的清理 一、TTL Flink的TTL(Time-To-Live)是一种数据过期策略,用于指定数据在流处理中的存活时间。TTL可以应用于Flink中的状态或事件时间窗口࿰…...

数据结构(Chapter Two -01)—线性表及顺序表
2.1 线性表 线性表是具有相同数据类型的n个数据元素的有限序列。第一个元素为表头元素,最后一个元素为表尾元素。除第一个元素,每个元素有且仅有一个直接前驱。除最后一个元素,每个元素都仅有一个直接后继。 其中线性表包括以下(…...
【刷题笔记1】
笔记1 string s;while(cin>>s);cout<<s.length()<<endl;输入为hello nowcoder时,输出为8 (nowcoder的长度) 2.字符串的输入(有空格) string a;getline(cin, a);cout<<a<<endl;输入为ABCabc a 输出为ABCabc a …...

视频数据卡设计方案:120-基于PCIe的视频数据卡
一、产品概述 基于PCIe的一款视频数据收发卡,并通过PCIe传输到存储计算服务器,实现信号的采集、分析、模拟输出,存储。 产品固化FPGA逻辑,实现PCIe的连续采集,单次采集容量2GB,开源的PCIe QT客…...

Windows使用VNC Viewer远程桌面Ubuntu【内网穿透】
文章目录 前言1. ubuntu安装VNC2. 设置vnc开机启动3. windows 安装VNC viewer连接工具4. 内网穿透4.1 安装cpolar【支持使用一键脚本命令安装】4.2 创建隧道映射4.3 测试公网远程访问 5. 配置固定TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址5.3 测试…...

javascript 数组处理的两个利器: `forEach` 和 `map`(上)
🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…...

【C语言】SCU安全项目1-FindKeys
目录 前言 命令行参数 16进制转字符串 extract_message1 process_keys12 extract_message2 main process_keys34 前言 因为这个学期基本都在搞CTF的web方向,C语言不免荒废。所幸还会一点指针相关的知识,故第一个安全项目做的挺顺利的,…...

IDA pro软件 如何修改.exe小程序打开对话框显示的文字?
环境: Win10 专业版 IDA pro Version 7.5.201028 .exe小程序 问题描述: IDA pro软件 如何修改.exe小程序打开对话框显示的文字? 解决方案: 一、在IDA Python脚本中编写代码来修改.rdata段中的静态字符串可以使用以下示例代码作为起点(未成功) import idc# 定义要修…...

Ubuntu22.04切换用户
一、只有一个用户时没有切换用户菜单项 1、用户信息 cat /etc/passwd 2、系统菜单 二、添加用户 添加新用户ym,全名yang mi 三、有两个及以上的用户时出现切换用户菜单项 1、用户信息 cat /etc/passwd 2、系统菜单 四、切换用户 1、点击上图中Switch User …...

torch.gather(...)
1. Abstract 对于 pytorch 中的函数 torch.gather(input, # (Tensor) the source tensordim, # (int) the axis along which to indexindex, # (LongTensor) the indices of elements to gather*,sparse_gradFalse,outNone ) → Tensor有点绕,很多博客画各…...
vscode如何开发微信小程序?JS与TS的主要区别?
要在 VS Code 中编写微信小程序代码并同步到 Git,需要安装以下插件: 1. 微信小程序插件(WeChat Mini Program):此插件提供了微信小程序的语法高亮、代码提示、调试、上传等功能。 2. Git 插件(GitLens、…...

产品入门第五讲:Axure交互和情境
目录 一.Axure交互和情境的介绍 1.交互介绍 概念 常见的Axure交互设计技巧 2.情境介绍 概念 常见的Axure情境设计技巧: 二.实例展示 1.ERP登录页到主页的跳转 2.ERP的菜单跳转到各个页面 📚📚 🏅我是默,一个…...

Python 自动化之收发邮件(一)
imapclient / smtplib 收发邮件 文章目录 imapclient / smtplib 收发邮件前言一、基本内容二、发送邮件1.整体代码 三、获取邮件1.整体代码 总结 前言 简单给大家写个如何用Python进行发邮件和查看邮件教程,希望对各位有所帮助。 一、基本内容 本文主要分为两部分…...
Flutter开发笔记 —— sqflite插件数据库应用
前言 今天在观阅掘金大佬文章的时候,了解到了该 sqflite 插件,结合官网教程和自己实践,由此总结出该文,希望对大家的学习有帮助! 插件详情 Flutter的 SQLite 插件。支持 iOS、Android 和 MacOS。 支持事务和batch模式…...

OxLint 发布了,Eslint 何去何从?
由于最近的rust在前端领域的崛起,基于rust的前端生态链遭到rust底层重构,最近又爆出OxLint,是一款基于Rust的linter工具Oxlint在国外前端圈引起热烈讨论,很多大佬给出了高度评价;你或许不知道OxLint,相比ES…...
第一次使用ThreadPoolExecutor处理业务
通过对业务逻辑的分析,进行编码,先把第一条sql查出来的数据进行分组,然后分别使用不同的线程去查询数据返回,并添加到原来的数据中。 总感觉哪里写的不对,但是同事们都没用过这个,请大家指教一下ÿ…...

Sharding-Jdbc(6):Sharding-Jdbc日志分析
1 修改配置 将配置文件中的开启分片日志从false改为true Sharding-JDBC中的路由结果是通过分片字段和分片方法来确定的,如果查询条件中有 id 字段的情况还好,查询将会落到某个具体的分片;如果查询没有分片的字段,会向所有的db或者是表都会查…...

centos安装了curl却报 -bash: curl: command not found
前因 我服务器上想用curl下载docker-compress,发现没有curl命令,就去下载安装,安装完成之后,报-bash: curl: command not found 解决方法 [rootcentos ~]# rpm -e --nodeps curl warning: file /usr/bin/curl: remove failed: …...

Re58:读论文 REALM: Retrieval-Augmented Language Model Pre-Training
诸神缄默不语-个人CSDN博文目录 诸神缄默不语的论文阅读笔记和分类 论文名称:REALM: Retrieval-Augmented Language Model Pre-Training 模型名称:Retrieval-Augmented Language Model pre-training (REALM) 本文是2020年ICML论文,作者来自…...
java的json解析
import com.alibaba.fastjson.*; public class JsonParser { public static void main(String[] args) { String jsonStr "{\"name\":\"John\", \"age\":30}"; // JSON字符串示例 // 将JSON字符串转换为JSONObject对象 JSONObje…...

linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...

YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...

用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈
在日常iOS开发过程中,性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期,开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发,但背后往往隐藏着系统资源调度不当…...

20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...