当前位置: 首页 > news >正文

Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

  • 一、深入理解Flink TTL
  • 二、Flink SQL设置TTL
  • 三、Flink设置TTL
  • 四、深入理解checkpoint
  • 五、Flink设置Checkpoint
  • 六、Flink SQL关联多张表
  • 七、Flink SQL使用TTL关联多表

一、深入理解Flink TTL

Flink TTL(Time To Live)是一种机制,用于设置数据的过期时间,控制数据在内存或状态中的存活时间。通过设置TTL,可以自动删除过期的数据,从而释放资源并提高性能。

在Flink中,TTL可以应用于不同的组件和场景,包括窗口、状态和表。

  • 窗口:对于窗口操作,可以将TTL应用于窗口中的数据。当窗口中的数据过期时,Flink会自动丢弃这些数据,从而保持窗口中的数据只包含最新的和有效的内容。这样可以减少内存的使用,同时提高窗口操作的计算性能。

  • 状态:对于有状态的操作,如键控状态或算子状态,可以为状态设置TTL。当状态中的数据过期时,Flink会自动清理过期的状态,释放资源。这对于长时间运行的应用程序特别有用,可以避免状态无限增长,消耗过多的内存。

  • 表:在Flink中,TTL也可以应用于表。可以通过在CREATE TABLE语句的WITH子句中指定TTL的选项来设置表的过期时间。当表中的数据过期时,Flink会自动删除过期的数据行。这对于处理具有实效性(例如日志)的数据特别有用,可以自动清理过期的数据,保持表的内容的新鲜和有效。

TTL在实际应用中的作用主要体现在以下几个方面:

  1. 节省资源:通过设置合适的TTL,可以有效地管理和控制内存和状态的使用。过期的数据会被自动清理,释放资源。这样可以避免无效或过时的数据占用过多的资源,提高应用程序的性能和可扩展性。

  2. 数据清理:对于具有实效性的数据,如日志数据,可以使用TTL自动清理过期的数据。这可以减少手动管理和维护数据的工作量,保持数据的新鲜和有效。

  3. 数据一致性:通过设置合适的TTL,可以确保数据在一定时间内保持一致性。过期的数据不再被读取或使用,可以避免数据不一致性的问题。

  4. 性能优化:TTL可以通过自动清理过期数据来优化查询和计算的性能。只有最新和有效的数据被保留,可以减少数据的处理量,提高计算效率。

总而言之,TTL是Flink中一种重要的机制,用于控制数据的过期时间和生命周期。通过适当配置TTL,可以优化资源使用、提高系统性能,并保持数据的一致性和有效性。

二、Flink SQL设置TTL

Flink SQL中可以使用TTL(Time To Live)来设置数据的过期时间,以控制数据在内存或状态中的存留时间。通过设置TTL,可以自动删除过期的数据,从而节省资源并提高性能。

要在Flink SQL中设置TTL,可以使用CREATE TABLE语句的WITH选项来指定TTL的配置。以下是一个示例:

CREATE TABLE myTable (id INT,name STRING,eventTime TIMESTAMP(3),WATERMARK FOR eventTime AS eventTime - INTERVAL '5' MINUTE -- 定义Watermark
) WITH ('connector' = 'kafka','topic' = 'myTopic','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','ttl' = '10m' -- 设置TTL为10分钟
);

在上述示例中,通过在CREATE TABLE语句的WITH子句中的’ttl’选项中指定TTL的值(10m),即设置数据在内存中的存活时间为10分钟。过期的数据会自动被删除。

需要注意的是,引入TTL机制会增加一定的性能和资源开销。因此,在使用TTL时需要权衡好过期时间和系统的性能需求。

三、Flink设置TTL

  1. 在需要设置TTL的数据源或状态上,使用相应的API(例如DataStream API或KeyedState API)设置TTL值。
    // DataStream API
    dataStream.keyBy(<key_selector>).mapStateDescriptor.enableTimeToLive(Duration.ofMillis(<ttl_in_milliseconds>));// KeyedState API
    descriptor.enableTimeToLive(Duration.ofMillis(<ttl_in_milliseconds>));
    
  2. 在Flink作业中配置TTL检查间隔(默认值为每分钟一次):
    state.backend.rocksdb.ttl.compaction.interval: <interval_in_milliseconds>
    

四、深入理解checkpoint

Flink的Checkpoint是一种容错机制,用于在Flink作业执行过程中定期保存数据的一致性检查点。它可以保证作业在发生故障时能够从检查点恢复,并继续进行。下面是一些深入介绍Checkpoint的关键概念和特性:

  1. 一致性保证:Flink的Checkpoint机制通过保存作业状态的快照来实现一致性保证。在Checkpoint期间,Flink会确保所有的输入数据都已经被处理,并将结果写入状态后再进行检查点的保存。这样可以确保在恢复时,从检查点恢复的作业状态仍然是一致的。

  2. 保存顺序:Flink的Checkpoint机制保证了保存检查点的顺序。检查点的保存是有序的,即在一个检查点完成之前,不会开始下一个检查点的保存。这种有序的保存方式能够保证在恢复时按照检查点的顺序进行恢复。

  3. 并行度一致性:Flink的Checkpoint机制能够保证在作业的不同并行任务之间保持一致性。即使在分布式的情况下,Flink也能够确保所有并行任务在某个检查点的位置上都能保持一致。这是通过分布式快照算法和超时机制来实现的。

  4. 可靠性保证:Flink的Checkpoint机制对于作业的故障恢复非常可靠。当一个任务发生故障时,Flink会自动从最近的检查点进行恢复。如果某个检查点无法满足一致性要求,Flink会自动选择前一个检查点进行恢复,以确保作业能够在一个一致的状态下继续执行。

  5. 容错机制:Flink的Checkpoint机制提供了容错机制来应对各种故障情况。例如,如果某个任务在保存检查点时失败,Flink会尝试重新保存检查点,直到成功为止。此外,Flink还支持增量检查点,它可以在不保存整个作业状态的情况下只保存修改的部分状态,从而提高了保存检查点的效率。

  6. 高可用性:Flink的Checkpoint机制还提供了高可用性的选项。可以将检查点数据保存在分布式文件系统中,以防止单点故障。此外,还可以配置备份作业管理器(JobManager)和任务管理器(TaskManager)以确保在某个节点发生故障时能够快速恢复。

总结起来,Flink的Checkpoint机制是一种强大且可靠的容错机制,它能够确保作业在发生故障时能够从一致性检查点恢复,并继续进行。通过保存作业状态的快照,Flink能够保证作业的一致性,并提供了高可用性和高效率的保存和恢复机制。

Checkpoint是Flink中一种重要的容错机制,用于保证作业在发生故障时能够从上一次检查点恢复,并继续进行处理,从而实现容错性。以下是Checkpoint的主要用途:

  1. 容错和故障恢复:Checkpoint可以将作业的状态和数据进行持久化,当发生故障时,Flink可以使用最近的检查点来恢复作业的状态和数据,从而避免数据丢失,并继续处理未完成的任务。

  2. Exactly-Once语义:通过将检查点和事务(如果应用程序使用Flink的事务支持)结合起来,Flink可以实现Exactly-Once语义,确保结果的一致性和准确性。当作业从检查点恢复时,它将只会处理一次输入数据,并产生一次输出,避免了重复和丢失的数据写入。

  3. 冷启动和部署:可以使用检查点来实现作业的冷启动,即在作业启动时,从最近的检查点恢复状态和数据,并从上一次检查点的位置继续处理。这对于在作业启动或重新部署时非常有用,可以快速恢复到之前的状态,减少恢复所需的时间。

  4. 跨版本迁移:当使用不同版本的Flink或更改作业的代码时,可以使用检查点将作业从旧的版本转移到新的版本,从而实现跨版本迁移。

总之,Checkpoint是Flink中的关键机制,其用途包括容错和故障恢复、Exactly-Once语义、冷启动和部署以及跨版本迁移。通过使用Checkpoint,可以提高作业的可靠性、一致性和可恢复性。

五、Flink设置Checkpoint

要设置Flink的Checkpoint和TTL,可以按照以下步骤进行操作:

设置Checkpoint:

  1. 在Flink作业中启用Checkpoint:可以通过在Flink配置文件(flink-conf.yaml)中设置以下属性来开启Checkpoint:
    execution.checkpointing.enabled: true
    
  2. 设置Checkpoint间隔:可以通过以下属性设置Checkpoint的间隔时间(默认值为10秒):
    execution.checkpointing.interval: <interval_in_milliseconds>
    
  3. 设置Checkpoint保存路径:可以通过以下属性设置Checkpoint文件的保存路径(默认为jobmanager根路径):
    state.checkpoints.dir: <checkpoint_directory_path>
    

六、Flink SQL关联多张表

在Flink SQL中,可以通过使用窗口操作来保证在一段时间内多张表的数据总能关联到。窗口操作可以用于基于时间的数据处理,将数据划分为窗口,并在每个窗口上执行关联操作。下面是一个示例,演示如何在一段时间内关联多张表的数据:```sql
-- 创建两个输入表
CREATE TABLE table1 (id INT,name STRING,eventTime TIMESTAMP(3),WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND
) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'topic1','connector.startup-mode' = 'earliest-offset','format.type' = 'json'
);CREATE TABLE table2 (id INT,value STRING,eventTime TIMESTAMP(3),WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND
) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'topic2','connector.startup-mode' = 'earliest-offset','format.type' = 'json'
);-- 执行关联操作
SELECT t1.id, t1.name, t2.value
FROM table1 t1
JOIN table2 t2 ON t1.id = t2.id AND t1.eventTime BETWEEN t2.eventTime - INTERVAL '5' MINUTE AND t2.eventTime + INTERVAL '5' MINUTE

在上面的例子中,首先创建了两个输入表table1和table2,并分别指定了输入源(此处使用了Kafka作为示例输入源)。然后,在执行关联操作时,使用了通过窗口操作进行时间范围的过滤条件,即"t1.eventTime BETWEEN t2.eventTime - INTERVAL ‘5’ MINUTE AND t2.eventTime + INTERVAL ‘5’ MINUTE",确保了在一段时间内两张表的数据能够关联到。

通过使用窗口操作,可以根据具体的时间范围来进行数据关联,从而保证在一段时间内多张表的数据总能关联到。

七、Flink SQL使用TTL关联多表

Flink还提供了Time-To-Live (TTL)功能,可以用于在表中定义数据的生存时间。当数据的时间戳超过定义的TTL时,Flink会自动将其从表中删除。这在处理实时数据时非常有用,可以自动清理过期的数据。

在Flink中使用TTL可以通过创建表时指定TTL属性来实现,如下所示:

CREATE TABLE myTable (id INT,name STRING,event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND,PRIMARY KEY (id) NOT ENFORCED,TTL (event_time) AS event_time + INTERVAL '1' HOUR
) WITH ('connector.type' = 'kafka',...
)

在这个例子中,表myTable定义了一个event_time列,并使用TTL函数指定了数据的生存时间为event_time加上1小时。当数据的event_time超过1小时时,Flink会自动删除这些数据。

通过在Flink SQL中同时使用JOIN和TTL,你可以实现多张表的关联,并根据指定的条件删除过期的数据,从而更灵活地处理和管理数据。

相关文章:

Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

Flink系列之&#xff1a;深入理解ttl和checkpoint&#xff0c;Flink SQL应用ttl案例 一、深入理解Flink TTL二、Flink SQL设置TTL三、Flink设置TTL四、深入理解checkpoint五、Flink设置Checkpoint六、Flink SQL关联多张表七、Flink SQL使用TTL关联多表 一、深入理解Flink TTL …...

Wails中js调用go函数(1种go写法,2种js调用方法)

官方js调用go方法文档&#xff1a;https://wails.io/zh-Hans/docs/howdoesitwork a&#xff09;在app.go文件里写一个要js调用的go函数&#xff1a; func (a *App) JSCallGo(data1 string) string { return “test” } b&#xff09;运行 wails dev 命令&#xff0c…...

【我与java的成长记】之面向对象的初步认识

系列文章目录 能看懂文字就能明白系列 C语言笔记传送门 &#x1f31f; 个人主页&#xff1a;古德猫宁- &#x1f308; 信念如阳光&#xff0c;照亮前行的每一步 文章目录 系列文章目录&#x1f308; *信念如阳光&#xff0c;照亮前行的每一步* 前言一、什么是面向对象面向过程…...

面试题之二HTTP和RPC的区别?

面试题之二 HTTP和RPC的区别&#xff1f; Ask范围&#xff1a;分布式和微服务 难度指数&#xff1a;4星 考察频率&#xff1a;70-80% 开发年限&#xff1a;3年左右 从三个方面来回答该问题&#xff1a; 一.功能特性 1)HTTP是属于应用层的协议&#xff1a;超文本传输协议…...

初试Kafka

Kafka 是一个分布式流处理平台&#xff0c;通常用作消息中间件&#xff0c;它可以处理大规模的实时数据流。以下是从零开始使用 Kafka 作为消息中间件的基本教程&#xff1a; 步骤 1: 下载和安装 Kafka 访问 Apache Kafka 官方网站&#xff1a;Apache Kafka下载最新的 Kafka …...

SuperMap Hi-Fi 3D SDK for Unity基础开发教程

作者&#xff1a;kele 一、背景 众所周知&#xff0c;游戏引擎&#xff08;Unity&#xff09;功能强大&#xff0c;可以做出很多炫酷的游戏和动画效果&#xff0c;这部分功能的实现往往不仅仅是靠可视化界面就能够实现的&#xff0c;还需要代码开发。SuperMap Hi-Fi SDKS for …...

Upload-lab(pass1~2)

Pass-1-js检查 这里检验 因为是前端js校验,所以只用绕过js前端校验 用burp抓包修改文件类型 写一个简易版本的php Pass-2-只验证Content-type 仅仅判断content-type类型 因此上传shell.php抓包修改content-type为图片类型&#xff1a;image/jpeg、image/png、image/gif...

Linux:查询当前进程或线程的资源使用情况

目录 一、/proc/[PID]/下的各个文件1、proc简介2、/proc/[PID]/详解 二、通过Linux API获取当前进程或线程的资源使用情况1、getrusage2、sysinfo3、times 在工作中&#xff0c;我们排除app出现的一些性能/资源问题时&#xff0c;通常要先知道当前app的资源使用情况&#xff0c…...

unityc用vs2017介绍

21版unity能用17vs&#xff0c;只要在unity的Edit/Preferences/ExternalTools里面改既可。...

单元测试实战

文章目录 为什么要做单元测试&#xff1f;单元测试的几个核心要点是&#xff1a;单元测试目标单元测试框架JUnitTestNG 单元测试工具&#xff1a; 为什么要做单元测试&#xff1f; 测试代码&#xff1a;通过编写和运行单元测试&#xff0c;开发者能够快速验证代码的各个部分是否…...

WebService

调试工具&#xff1a;Postman、SoapUI Soap WebService :.net WCF 、Java CFX WebService三要素&#xff1a; SOAP&#xff08;Simple Object Access Protocol&#xff09;&#xff1a;用来描述传递信息的格式&#xff0c; 可以和现存的许多因特网协议和格式结合使用&#x…...

Nestjs使用log4j打印日志

众所周知&#xff0c;nest是自带日志的。但是好像没有log4j香&#xff0c;所以咱们来用log4j吧~ 我只演示最简单的用法&#xff0c;用具体怎么样用大家可以自己进行封装。就像前端封装自己的请求一样。 一、安装 yarn add log4js stacktrace-js 二、使用 主要就三个文件&a…...

Selenium - 自动化测试框架

Selenium 介绍 Selenium 是目前用的最广泛的 Web UI 自动化测试框架&#xff0c;核心功能就是可以在多个浏览器上进行自动化测试&#xff0c;支持多种编程语言&#xff0c;目前已经被 google&#xff0c;百度&#xff0c;腾讯等公司广泛使用。 开发步骤 1、配置 google 驱动…...

RFID技术在汽车制造:提高生产效率、优化物流管理和增强安全性

RFID技术在汽车制造:提高生产效率、优化物流管理和增强安全性 随着科技的进步&#xff0c;物联网技术已经深入到各个领域&#xff0c;尤其在制造业中&#xff0c;RFID技术以其独特的优势&#xff0c;如高精度追踪、实时数据收集和自动化操作&#xff0c;正在改变传统的生产方式…...

git异常

1.异常现象 换机新安装 Git 后&#xff0c;拉代码时出现问题&#xff1a; Unable to negotiate with 10.18.18.18 port 29418: no matching key exchange method found. Their offer: diffie-hellman-group14-sha1,diffie-hellman-group1-sha1 fatal: Could not read from rem…...

【C语言学习疑难杂症】第12期:如何从汇编角度深入理解y = (*--p)++这行代码(易懂版)

对于如下代码,思考一下输出结果是什么? int a[] = {5, 8, 7, 6, 2, 7, 3}; int y, *p = &a[1]; y = (*--p)++; printf("%d ",y); printf("%d",a[0]); 这个代码看似简单,但是在“y = (*--p)++;”这行代码里,编译器做了很多工作。 我们在vs2022的…...

5G阅信应用场景有哪些?

5G阅信的应用场景非常广泛&#xff0c;以下是一些常见的应用场景&#xff1a; 1.工业自动化&#xff1a;5G阅信可以连接各种工业设备和传感器&#xff0c;实现设备之间的实时通信和控制&#xff0c;提高生产效率和自动化水平。 2.物联网和智能家居&#xff1a;5G阅信可以连接各…...

使用OpenSSL生成自签名SSL/TLS证书和私钥

使用OpenSSL生成自签名SSL/TLS证书和私钥 前提&#xff1a; 系统安装了OpenSSL&#xff1b; 系统&#xff1a;windows、linux都可&#xff1b; 1 生成私钥 创建一个名为 server.key 的私钥文件&#xff0c;并使用 RSA 算法生成一个 2048 位的密钥。 openssl genrsa -out s…...

pycharm2023.2激活和新建项目,python3.12安装永久换源

pycharm安装 安装版本选择链接 激活参考链接 python安装 Windows下载指定python链接 选择相应版本的64位即可。 安装可以自己选择安装位置&#xff0c;记得勾选&#xff0c;add path即可。其余下一步默认即可。 windows临时换源 pip install 模块包名字 -i https://pypi.…...

FPGA分频电路设计(2)

实验要求&#xff1a; 采用 4 个开关以二进制形式设定分频系数&#xff08;1-10&#xff09;&#xff0c;实现对已知信号的分频。 类似实验我之前做过一次&#xff0c;但那次的方法实在是太笨了&#xff1a; 利用VHDL实现一定系数范围内的信号分频电路 需要重做以便将来应对更…...

汽车电子选型:RF430F5144CIRKVRQ1为什么适合发动机舱附近的应用

RF430F5144CIRKVRQ1&#xff1a;这颗77mm的QFN芯片&#xff0c;如何把13.56MHz NFC和MSP430 MCU塞进一颗汽车级SoCRF430F5144CIRKVRQ1来自德州仪器&#xff0c;是一颗高度集成的NFC传感器收发器SoC。它的核心价值很直接&#xff1a;把13.56MHz HF射频前端、16位MSP430超低功耗M…...

Windows环境下ODBC连接MySQL保姆级教程(含性能优化配置)

Windows环境下ODBC连接MySQL全流程实战指南 1. 环境准备与驱动安装 在Windows平台使用ODBC连接MySQL数据库&#xff0c;首先需要确保开发环境配置正确。与JDBC不同&#xff0c;ODBC作为跨语言的数据库连接标准&#xff0c;其驱动安装过程需要特别注意版本兼容性问题。以下是环境…...

从3天到30分钟:OpCore-Simplify如何重构黑苹果配置的技术民主化之路

从3天到30分钟&#xff1a;OpCore-Simplify如何重构黑苹果配置的技术民主化之路 【免费下载链接】OpCore-Simplify A tool designed to simplify the creation of OpenCore EFI 项目地址: https://gitcode.com/GitHub_Trending/op/OpCore-Simplify 在黑苹果技术领域&…...

Stable-Diffusion-v1-5-archive多风格生成效果:复古海报/科技感UI/手绘插画实拍

Stable Diffusion v1.5 Archive多风格生成效果&#xff1a;复古海报/科技感UI/手绘插画实拍 1. 模型介绍与核心能力 Stable Diffusion v1.5 Archive是经典SD1.5文生图模型的归档版本&#xff0c;作为AI图像生成领域的"常青树"&#xff0c;它依然保持着强大的通用图…...

快速部署SQL Server 2022:Docker容器化实践指南

1. 为什么选择Docker部署SQL Server 2022&#xff1f; 作为开发者&#xff0c;我经历过太多在本地环境安装数据库的噩梦——依赖冲突、版本不兼容、配置复杂&#xff0c;往往折腾半天才能跑起来。直到我开始用Docker部署SQL Server&#xff0c;才发现原来搭建数据库环境可以这么…...

react为啥不像vue3一样做diff优化(双端diff和最长递增子序列)

React 不是不能做 LIS / 双端 Diff&#xff0c; 而是 React 的架构目标 不追求 DOM 最优&#xff0c;追求调度最优 所以它故意不做 Vue 那套极致 Diff 优化。 一、先给结论&#xff08;面试直接说&#xff09; React 不做极致 Diff 优化&#xff0c;是因为它的架构方向是&…...

[特殊字符] GLM-4V-9B企业级方案:客户上传截图问题自动诊断

GLM-4V-9B企业级方案&#xff1a;客户上传截图问题自动诊断 1. 引言 想象一下这个场景&#xff1a;你是一家SaaS公司的技术支持工程师&#xff0c;每天的工作就是处理海量的客户工单。其中&#xff0c;有相当一部分问题描述是模糊的&#xff0c;比如“我的页面显示不正常”、…...

2025小红书跳转卡片技术揭秘:从逆向分析到服务器端自动化部署

1. 小红书跳转卡片技术现状解析 小红书跳转卡片功能原本是平台提供给商家的官方营销工具&#xff0c;但近期所有公开接口都已关闭。现在市面上能正常使用的方案&#xff0c;基本都是通过逆向工程实现的Hook技术方案。我花了两个月时间逆向分析了小红书安卓端7.8版本到8.5版本的…...

短视频 SEO 如何提高网站的搜索排名

为什么短视频 SEO 是提高网站搜索排名的关键 在当今数字化时代&#xff0c;短视频平台已经成为人们获取信息和娱乐的主要渠道。短视频的流行不仅改变了人们的观看习惯&#xff0c;还深刻影响了网络营销的方式。如何利用短视频 SEO&#xff08;搜索引擎优化&#xff09;来提高网…...

D3KeyHelper:暗黑3效率提升工具的全方位应用指南

D3KeyHelper&#xff1a;暗黑3效率提升工具的全方位应用指南 【免费下载链接】D3keyHelper D3KeyHelper是一个有图形界面&#xff0c;可自定义配置的暗黑3鼠标宏工具。 项目地址: https://gitcode.com/gh_mirrors/d3/D3keyHelper D3KeyHelper是一款开源的暗黑3鼠标宏工具…...