Flink系列之:State Time-To-Live (TTL)
Flink系列之:State Time-To-Live TTL
- 一、TTL
- 二、TTL实现代码
- 三、过期状态的清理
一、TTL
- Flink的TTL(Time-To-Live)是一种数据过期策略,用于指定数据在流处理中的存活时间。TTL可以应用于Flink中的状态或事件时间窗口,以控制数据的保留时间。
- 当应用程序使用状态进行计算时,状态可能会消耗存储资源。TTL可以用来设置状态的最大生存时间,超过该时间的状态将被自动清理,以释放存储资源。这可以帮助应对状态数据的增长和资源限制问题。
- 对于事件时间窗口,TTL可以用来指定窗口的持续时间。当到达窗口结束时间后,该窗口的结果将被输出,并且窗口中的所有数据将被清理。这可以确保计算结果及时输出,并释放计算资源。
- 通过设置适当的TTL值,可以控制数据的保留时间,避免资源浪费和计算延迟。TTL的使用可以根据具体应用场景和需求进行配置,以实现数据管理的灵活性和效率。
- 可以将生存时间 (TTL) 分配给任何类型的键控状态。如果配置了 TTL 并且状态值已过期,则将尽最大努力清除存储的值,这将在下面更详细地讨论。
- 所有状态集合类型都支持每条目 TTL。这意味着列表元素和映射条目独立过期。
- 为了使用状态 TTL,必须首先构建一个 StateTtlConfig 配置对象。然后可以通过传递配置在任何状态描述符中启用 TTL 功能:
二、TTL实现代码
java代码:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
这段代码使用Apache Flink提供的StateTtlConfig来设置状态的TTL(Time-To-Live)配置。
- 首先,导入必要的包org.apache.flink.api.common.state.StateTtlConfig和org.apache.flink.api.common.state.ValueStateDescriptor。
- 然后,创建StateTtlConfig对象ttlConfig,并使用StateTtlConfig.newBuilder(Time.seconds(1))来指定TTL的时间长度为1秒。这意味着状态数据的最大生存时间为1秒。
- 接下来,调用ttlConfig的setUpdateType方法,将UpdateType设置为StateTtlConfig.UpdateType.OnCreateAndWrite。这表示在创建和写入状态时更新TTL。
- 然后,调用ttlConfig的setStateVisibility方法,将StateVisibility设置为StateTtlConfig.StateVisibility.NeverReturnExpired。这表示状态在过期后永远不会返回,也就是被清理后不会再被读取。
- 最后,使用ValueStateDescriptor创建一个名为"text state"的状态描述符stateDescriptor,并调用stateDescriptor的enableTimeToLive方法,将ttlConfig传递给它。这将启用状态的TTL配置。
- 通过配置TTL,可以控制状态的生存时间,以及何时更新和清理状态。这有助于管理状态数据的存储和性能。
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).buildval stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
Python代码:
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \.build()state_descriptor = ValueStateDescriptor("text state", Types.STRING())
state_descriptor.enable_time_to_live(ttl_config)
该配置有几个选项需要考虑:
- newBuilder方法的第一个参数是必需的,它是生存时间值。
- 更新类型配置何时刷新状态 TTL(默认为 OnCreateAndWrite):
- StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入访问时
- StateTtlConfig.UpdateType.OnReadAndWrite - 也用于读取访问
- (注:如果同时将状态可见性设置为StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp,状态读缓存将被禁用,这会导致PyFlink中的一些性能损失)
状态可见性配置如果尚未清除过期值,是否在读取访问时返回过期值(默认为 NeverReturnExpired):
- StateTtlConfig.StateVisibility.NeverReturnExpired - 永远不会返回过期值
(注:状态读/写缓存将被禁用,这会导致 PyFlink 中的一些性能损失)
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用则返回
在 NeverReturnExpired 的情况下,过期状态的行为就好像它不再存在一样,即使它仍然需要被删除。该选项对于数据必须在 TTL 之后严格无法进行读取访问的用例很有用,例如处理隐私敏感数据的应用程序。
另一个选项 ReturnExpiredIfNotCleanedUp 允许在清理之前返回过期状态。
笔记:
- 状态后端存储上次修改的时间戳以及用户值,这意味着启用此功能会增加状态存储的消耗。堆状态后端存储一个附加的 Java 对象,其中包含对用户状态对象的引用和内存中的原始 long 值。 RocksDB 状态后端为每个存储值、列表条目或映射条目添加 8 个字节。
- 当前仅支持涉及处理时间的 TTL。
- 尝试使用启用 TTL 的描述符恢复之前未配置 TTL 的状态,反之亦然,将导致兼容性失败和 StateMigrationException。
- TTL 配置不是检查点或保存点的一部分,而是 Flink 在当前运行的作业中处理它的一种方式。
- 不建议通过将 ttl 从短值调整为长值来恢复检查点状态,这可能会导致潜在的数据错误。
- 目前,仅当用户值序列化程序可以处理空值时,具有 TTL 的映射状态才支持空用户值。如果序列化器不支持 null 值,则可以使用 NullableSerializer 对其进行包装,但需要在序列化形式中增加一个额外字节。
- 启用 TTL 的配置后,StateDescriptor 中的 defaultValue 实际上已被弃用,将不再生效。这样做的目的是使语义更加清晰,并让用户在状态内容为空或过期时手动管理默认值。
三、过期状态的清理
默认情况下,过期值会在读取时显式删除,例如 ValueState#value,并在配置的状态后端支持的情况下定期在后台进行垃圾收集。可以在 StateTtlConfig 中禁用后台清理:
Java代码:
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground.build
Python代码:
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \.build()
为了对后台的一些特殊清理进行更细粒度的控制,您可以如下所述单独配置它。目前,堆状态后端依赖于增量清理,RocksDB 后端使用压缩过滤器进行后台清理。
完整快照中的清理
此外,您可以在拍摄完整状态快照时激活清理,这将减少其大小。在当前实现下,本地状态不会被清除,但在从以前的快照恢复时,它不会包括删除的过期状态。可以在StateTtlConfig中配置:
Java代码:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot().build();
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot.build
Python代码:
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_full_snapshot() \.build()
此选项不适用于 RocksDB 状态后端中的增量检查点。
对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。
增量清理
另一种选择是逐步触发某些状态条目的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果此清理策略对于某些状态是活动的,则存储后端会在其所有条目上为此状态保留一个惰性全局迭代器。每次触发增量清理时,迭代器都会前进。检查遍历的状态条目并清除过期的状态条目。
该功能可以在 StateTtlConfig 中配置:
Java代码:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build();
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build
Python:
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_incrementally(10, True) \.build()
该策略有两个参数。第一个是每次清理触发的检查状态条目数。它总是在每次状态访问时触发。第二个参数定义是否在每次记录处理时额外触发清理。堆后端的默认后台清理会检查 5 个条目,而不会针对每个记录处理进行清理。
笔记:
- 如果没有对状态进行访问或没有处理任何记录,则过期状态将持续存在。
- 增量清理所花费的时间会增加记录处理延迟。
- 目前增量清理仅针对堆状态后端实现。对 RocksDB 设置它不会有任何效果。
- 如果堆状态后端与同步快照一起使用,则全局迭代器在迭代时会保留所有键的副本,因为其特定实现不支持并发修改。启用此功能将增加内存消耗。异步快照则不存在这个问题。
- 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。
RocksDB 压缩期间的清理
如果使用 RocksDB 状态后端,将调用 Flink 特定的压缩过滤器进行后台清理。 RocksDB 定期运行异步压缩来合并状态更新并减少存储。 Flink 压缩过滤器使用 TTL 检查状态条目的过期时间戳并排除过期值。
该功能可以在 StateTtlConfig 中配置:
Java代码:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000, Time.hours(1)).build();
Scala代码:
import org.apache.flink.api.common.state.StateTtlConfigval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000, Time.hours(1)).build
Python代码:
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfigttl_config = StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \.build()
RocksDB 压缩过滤器每次处理一定数量的状态条目后都会从 Flink 查询当前时间戳,用于检查过期情况。您可以更改它并将自定义值传递给 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法。更频繁地更新时间戳可以提高清理速度,但会降低压缩性能,因为它使用来自本机代码的 JNI 调用。 RocksDB 后端的默认后台清理会在每次处理 1000 个条目时查询当前时间戳。
定期压缩可以加快过期状态条目的清理速度,特别是对于很少访问的状态条目。早于该值的文件将被拾取进行压缩,并重新写入到与之前相同的级别。它确保文件定期通过压缩过滤器。您可以更改它并将自定义值传递给 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicalCompactionTime) 方法。定期压缩秒数的默认值为 30 天。您可以将其设置为 0 以关闭定期压缩,或设置一个较小的值以加速过期状态条目清理,但它会触发更多压缩。
您可以通过激活 FlinkCompactionFilter 的调试级别来从 RocksDB 过滤器的本机代码激活调试日志:
log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
笔记:
- 在压缩过程中调用 TTL 过滤器会减慢速度。 TTL 过滤器必须解析上次访问的时间戳,并检查正在压缩的每个键的每个存储状态条目的过期时间。如果是集合状态类型(列表或映射),还会针对每个存储的元素调用检查。
- 如果此功能与包含非固定字节长度元素的列表状态一起使用,则本机 TTL 过滤器必须在每个状态条目(其中至少第一个元素已过期)额外通过 JNI 调用该元素的 Flink java 类型序列化器确定下一个未过期元素的偏移量。
- 对于现有作业,可以在 StateTtlConfig 中随时激活或停用此清理策略,例如从保存点重新启动后。
- 周期性压缩仅在启用 TTL 时才起作用。
相关文章:
Flink系列之:State Time-To-Live (TTL)
Flink系列之:State Time-To-Live TTL 一、TTL二、TTL实现代码三、过期状态的清理 一、TTL Flink的TTL(Time-To-Live)是一种数据过期策略,用于指定数据在流处理中的存活时间。TTL可以应用于Flink中的状态或事件时间窗口࿰…...
数据结构(Chapter Two -01)—线性表及顺序表
2.1 线性表 线性表是具有相同数据类型的n个数据元素的有限序列。第一个元素为表头元素,最后一个元素为表尾元素。除第一个元素,每个元素有且仅有一个直接前驱。除最后一个元素,每个元素都仅有一个直接后继。 其中线性表包括以下(…...
【刷题笔记1】
笔记1 string s;while(cin>>s);cout<<s.length()<<endl;输入为hello nowcoder时,输出为8 (nowcoder的长度) 2.字符串的输入(有空格) string a;getline(cin, a);cout<<a<<endl;输入为ABCabc a 输出为ABCabc a …...
视频数据卡设计方案:120-基于PCIe的视频数据卡
一、产品概述 基于PCIe的一款视频数据收发卡,并通过PCIe传输到存储计算服务器,实现信号的采集、分析、模拟输出,存储。 产品固化FPGA逻辑,实现PCIe的连续采集,单次采集容量2GB,开源的PCIe QT客…...
Windows使用VNC Viewer远程桌面Ubuntu【内网穿透】
文章目录 前言1. ubuntu安装VNC2. 设置vnc开机启动3. windows 安装VNC viewer连接工具4. 内网穿透4.1 安装cpolar【支持使用一键脚本命令安装】4.2 创建隧道映射4.3 测试公网远程访问 5. 配置固定TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址5.3 测试…...
javascript 数组处理的两个利器: `forEach` 和 `map`(上)
🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…...
【C语言】SCU安全项目1-FindKeys
目录 前言 命令行参数 16进制转字符串 extract_message1 process_keys12 extract_message2 main process_keys34 前言 因为这个学期基本都在搞CTF的web方向,C语言不免荒废。所幸还会一点指针相关的知识,故第一个安全项目做的挺顺利的,…...
IDA pro软件 如何修改.exe小程序打开对话框显示的文字?
环境: Win10 专业版 IDA pro Version 7.5.201028 .exe小程序 问题描述: IDA pro软件 如何修改.exe小程序打开对话框显示的文字? 解决方案: 一、在IDA Python脚本中编写代码来修改.rdata段中的静态字符串可以使用以下示例代码作为起点(未成功) import idc# 定义要修…...
Ubuntu22.04切换用户
一、只有一个用户时没有切换用户菜单项 1、用户信息 cat /etc/passwd 2、系统菜单 二、添加用户 添加新用户ym,全名yang mi 三、有两个及以上的用户时出现切换用户菜单项 1、用户信息 cat /etc/passwd 2、系统菜单 四、切换用户 1、点击上图中Switch User …...
torch.gather(...)
1. Abstract 对于 pytorch 中的函数 torch.gather(input, # (Tensor) the source tensordim, # (int) the axis along which to indexindex, # (LongTensor) the indices of elements to gather*,sparse_gradFalse,outNone ) → Tensor有点绕,很多博客画各…...
vscode如何开发微信小程序?JS与TS的主要区别?
要在 VS Code 中编写微信小程序代码并同步到 Git,需要安装以下插件: 1. 微信小程序插件(WeChat Mini Program):此插件提供了微信小程序的语法高亮、代码提示、调试、上传等功能。 2. Git 插件(GitLens、…...
产品入门第五讲:Axure交互和情境
目录 一.Axure交互和情境的介绍 1.交互介绍 概念 常见的Axure交互设计技巧 2.情境介绍 概念 常见的Axure情境设计技巧: 二.实例展示 1.ERP登录页到主页的跳转 2.ERP的菜单跳转到各个页面 📚📚 🏅我是默,一个…...
Python 自动化之收发邮件(一)
imapclient / smtplib 收发邮件 文章目录 imapclient / smtplib 收发邮件前言一、基本内容二、发送邮件1.整体代码 三、获取邮件1.整体代码 总结 前言 简单给大家写个如何用Python进行发邮件和查看邮件教程,希望对各位有所帮助。 一、基本内容 本文主要分为两部分…...
Flutter开发笔记 —— sqflite插件数据库应用
前言 今天在观阅掘金大佬文章的时候,了解到了该 sqflite 插件,结合官网教程和自己实践,由此总结出该文,希望对大家的学习有帮助! 插件详情 Flutter的 SQLite 插件。支持 iOS、Android 和 MacOS。 支持事务和batch模式…...
OxLint 发布了,Eslint 何去何从?
由于最近的rust在前端领域的崛起,基于rust的前端生态链遭到rust底层重构,最近又爆出OxLint,是一款基于Rust的linter工具Oxlint在国外前端圈引起热烈讨论,很多大佬给出了高度评价;你或许不知道OxLint,相比ES…...
第一次使用ThreadPoolExecutor处理业务
通过对业务逻辑的分析,进行编码,先把第一条sql查出来的数据进行分组,然后分别使用不同的线程去查询数据返回,并添加到原来的数据中。 总感觉哪里写的不对,但是同事们都没用过这个,请大家指教一下ÿ…...
Sharding-Jdbc(6):Sharding-Jdbc日志分析
1 修改配置 将配置文件中的开启分片日志从false改为true Sharding-JDBC中的路由结果是通过分片字段和分片方法来确定的,如果查询条件中有 id 字段的情况还好,查询将会落到某个具体的分片;如果查询没有分片的字段,会向所有的db或者是表都会查…...
centos安装了curl却报 -bash: curl: command not found
前因 我服务器上想用curl下载docker-compress,发现没有curl命令,就去下载安装,安装完成之后,报-bash: curl: command not found 解决方法 [rootcentos ~]# rpm -e --nodeps curl warning: file /usr/bin/curl: remove failed: …...
Re58:读论文 REALM: Retrieval-Augmented Language Model Pre-Training
诸神缄默不语-个人CSDN博文目录 诸神缄默不语的论文阅读笔记和分类 论文名称:REALM: Retrieval-Augmented Language Model Pre-Training 模型名称:Retrieval-Augmented Language Model pre-training (REALM) 本文是2020年ICML论文,作者来自…...
java的json解析
import com.alibaba.fastjson.*; public class JsonParser { public static void main(String[] args) { String jsonStr "{\"name\":\"John\", \"age\":30}"; // JSON字符串示例 // 将JSON字符串转换为JSONObject对象 JSONObje…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
通过Wrangler CLI在worker中创建数据库和表
官方使用文档:Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后,会在本地和远程创建数据库: npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库: 现在,您的Cloudfla…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明
AI 领域的快速发展正在催生一个新时代,智能代理(agents)不再是孤立的个体,而是能够像一个数字团队一样协作。然而,当前 AI 生态系统的碎片化阻碍了这一愿景的实现,导致了“AI 巴别塔问题”——不同代理之间…...
LLM基础1_语言模型如何处理文本
基于GitHub项目:https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken:OpenAI开发的专业"分词器" torch:Facebook开发的强力计算引擎,相当于超级计算器 理解词嵌入:给词语画"…...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...
莫兰迪高级灰总结计划简约商务通用PPT模版
莫兰迪高级灰总结计划简约商务通用PPT模版,莫兰迪调色板清新简约工作汇报PPT模版,莫兰迪时尚风极简设计PPT模版,大学生毕业论文答辩PPT模版,莫兰迪配色总结计划简约商务通用PPT模版,莫兰迪商务汇报PPT模版,…...
