当前位置: 首页 > news >正文

Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

  • 一、深入理解Flink TTL
  • 二、Flink SQL设置TTL
  • 三、Flink设置TTL
  • 四、深入理解checkpoint
  • 五、Flink设置Checkpoint
  • 六、Flink SQL关联多张表
  • 七、Flink SQL使用TTL关联多表

一、深入理解Flink TTL

Flink TTL(Time To Live)是一种机制,用于设置数据的过期时间,控制数据在内存或状态中的存活时间。通过设置TTL,可以自动删除过期的数据,从而释放资源并提高性能。

在Flink中,TTL可以应用于不同的组件和场景,包括窗口、状态和表。

  • 窗口:对于窗口操作,可以将TTL应用于窗口中的数据。当窗口中的数据过期时,Flink会自动丢弃这些数据,从而保持窗口中的数据只包含最新的和有效的内容。这样可以减少内存的使用,同时提高窗口操作的计算性能。

  • 状态:对于有状态的操作,如键控状态或算子状态,可以为状态设置TTL。当状态中的数据过期时,Flink会自动清理过期的状态,释放资源。这对于长时间运行的应用程序特别有用,可以避免状态无限增长,消耗过多的内存。

  • 表:在Flink中,TTL也可以应用于表。可以通过在CREATE TABLE语句的WITH子句中指定TTL的选项来设置表的过期时间。当表中的数据过期时,Flink会自动删除过期的数据行。这对于处理具有实效性(例如日志)的数据特别有用,可以自动清理过期的数据,保持表的内容的新鲜和有效。

TTL在实际应用中的作用主要体现在以下几个方面:

  1. 节省资源:通过设置合适的TTL,可以有效地管理和控制内存和状态的使用。过期的数据会被自动清理,释放资源。这样可以避免无效或过时的数据占用过多的资源,提高应用程序的性能和可扩展性。

  2. 数据清理:对于具有实效性的数据,如日志数据,可以使用TTL自动清理过期的数据。这可以减少手动管理和维护数据的工作量,保持数据的新鲜和有效。

  3. 数据一致性:通过设置合适的TTL,可以确保数据在一定时间内保持一致性。过期的数据不再被读取或使用,可以避免数据不一致性的问题。

  4. 性能优化:TTL可以通过自动清理过期数据来优化查询和计算的性能。只有最新和有效的数据被保留,可以减少数据的处理量,提高计算效率。

总而言之,TTL是Flink中一种重要的机制,用于控制数据的过期时间和生命周期。通过适当配置TTL,可以优化资源使用、提高系统性能,并保持数据的一致性和有效性。

二、Flink SQL设置TTL

Flink SQL中可以使用TTL(Time To Live)来设置数据的过期时间,以控制数据在内存或状态中的存留时间。通过设置TTL,可以自动删除过期的数据,从而节省资源并提高性能。

要在Flink SQL中设置TTL,可以使用CREATE TABLE语句的WITH选项来指定TTL的配置。以下是一个示例:

CREATE TABLE myTable (id INT,name STRING,eventTime TIMESTAMP(3),WATERMARK FOR eventTime AS eventTime - INTERVAL '5' MINUTE -- 定义Watermark
) WITH ('connector' = 'kafka','topic' = 'myTopic','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','ttl' = '10m' -- 设置TTL为10分钟
);

在上述示例中,通过在CREATE TABLE语句的WITH子句中的’ttl’选项中指定TTL的值(10m),即设置数据在内存中的存活时间为10分钟。过期的数据会自动被删除。

需要注意的是,引入TTL机制会增加一定的性能和资源开销。因此,在使用TTL时需要权衡好过期时间和系统的性能需求。

三、Flink设置TTL

  1. 在需要设置TTL的数据源或状态上,使用相应的API(例如DataStream API或KeyedState API)设置TTL值。
    // DataStream API
    dataStream.keyBy(<key_selector>).mapStateDescriptor.enableTimeToLive(Duration.ofMillis(<ttl_in_milliseconds>));// KeyedState API
    descriptor.enableTimeToLive(Duration.ofMillis(<ttl_in_milliseconds>));
    
  2. 在Flink作业中配置TTL检查间隔(默认值为每分钟一次):
    state.backend.rocksdb.ttl.compaction.interval: <interval_in_milliseconds>
    

四、深入理解checkpoint

Flink的Checkpoint是一种容错机制,用于在Flink作业执行过程中定期保存数据的一致性检查点。它可以保证作业在发生故障时能够从检查点恢复,并继续进行。下面是一些深入介绍Checkpoint的关键概念和特性:

  1. 一致性保证:Flink的Checkpoint机制通过保存作业状态的快照来实现一致性保证。在Checkpoint期间,Flink会确保所有的输入数据都已经被处理,并将结果写入状态后再进行检查点的保存。这样可以确保在恢复时,从检查点恢复的作业状态仍然是一致的。

  2. 保存顺序:Flink的Checkpoint机制保证了保存检查点的顺序。检查点的保存是有序的,即在一个检查点完成之前,不会开始下一个检查点的保存。这种有序的保存方式能够保证在恢复时按照检查点的顺序进行恢复。

  3. 并行度一致性:Flink的Checkpoint机制能够保证在作业的不同并行任务之间保持一致性。即使在分布式的情况下,Flink也能够确保所有并行任务在某个检查点的位置上都能保持一致。这是通过分布式快照算法和超时机制来实现的。

  4. 可靠性保证:Flink的Checkpoint机制对于作业的故障恢复非常可靠。当一个任务发生故障时,Flink会自动从最近的检查点进行恢复。如果某个检查点无法满足一致性要求,Flink会自动选择前一个检查点进行恢复,以确保作业能够在一个一致的状态下继续执行。

  5. 容错机制:Flink的Checkpoint机制提供了容错机制来应对各种故障情况。例如,如果某个任务在保存检查点时失败,Flink会尝试重新保存检查点,直到成功为止。此外,Flink还支持增量检查点,它可以在不保存整个作业状态的情况下只保存修改的部分状态,从而提高了保存检查点的效率。

  6. 高可用性:Flink的Checkpoint机制还提供了高可用性的选项。可以将检查点数据保存在分布式文件系统中,以防止单点故障。此外,还可以配置备份作业管理器(JobManager)和任务管理器(TaskManager)以确保在某个节点发生故障时能够快速恢复。

总结起来,Flink的Checkpoint机制是一种强大且可靠的容错机制,它能够确保作业在发生故障时能够从一致性检查点恢复,并继续进行。通过保存作业状态的快照,Flink能够保证作业的一致性,并提供了高可用性和高效率的保存和恢复机制。

Checkpoint是Flink中一种重要的容错机制,用于保证作业在发生故障时能够从上一次检查点恢复,并继续进行处理,从而实现容错性。以下是Checkpoint的主要用途:

  1. 容错和故障恢复:Checkpoint可以将作业的状态和数据进行持久化,当发生故障时,Flink可以使用最近的检查点来恢复作业的状态和数据,从而避免数据丢失,并继续处理未完成的任务。

  2. Exactly-Once语义:通过将检查点和事务(如果应用程序使用Flink的事务支持)结合起来,Flink可以实现Exactly-Once语义,确保结果的一致性和准确性。当作业从检查点恢复时,它将只会处理一次输入数据,并产生一次输出,避免了重复和丢失的数据写入。

  3. 冷启动和部署:可以使用检查点来实现作业的冷启动,即在作业启动时,从最近的检查点恢复状态和数据,并从上一次检查点的位置继续处理。这对于在作业启动或重新部署时非常有用,可以快速恢复到之前的状态,减少恢复所需的时间。

  4. 跨版本迁移:当使用不同版本的Flink或更改作业的代码时,可以使用检查点将作业从旧的版本转移到新的版本,从而实现跨版本迁移。

总之,Checkpoint是Flink中的关键机制,其用途包括容错和故障恢复、Exactly-Once语义、冷启动和部署以及跨版本迁移。通过使用Checkpoint,可以提高作业的可靠性、一致性和可恢复性。

五、Flink设置Checkpoint

要设置Flink的Checkpoint和TTL,可以按照以下步骤进行操作:

设置Checkpoint:

  1. 在Flink作业中启用Checkpoint:可以通过在Flink配置文件(flink-conf.yaml)中设置以下属性来开启Checkpoint:
    execution.checkpointing.enabled: true
    
  2. 设置Checkpoint间隔:可以通过以下属性设置Checkpoint的间隔时间(默认值为10秒):
    execution.checkpointing.interval: <interval_in_milliseconds>
    
  3. 设置Checkpoint保存路径:可以通过以下属性设置Checkpoint文件的保存路径(默认为jobmanager根路径):
    state.checkpoints.dir: <checkpoint_directory_path>
    

六、Flink SQL关联多张表

在Flink SQL中,可以通过使用窗口操作来保证在一段时间内多张表的数据总能关联到。窗口操作可以用于基于时间的数据处理,将数据划分为窗口,并在每个窗口上执行关联操作。下面是一个示例,演示如何在一段时间内关联多张表的数据:```sql
-- 创建两个输入表
CREATE TABLE table1 (id INT,name STRING,eventTime TIMESTAMP(3),WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND
) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'topic1','connector.startup-mode' = 'earliest-offset','format.type' = 'json'
);CREATE TABLE table2 (id INT,value STRING,eventTime TIMESTAMP(3),WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND
) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'topic2','connector.startup-mode' = 'earliest-offset','format.type' = 'json'
);-- 执行关联操作
SELECT t1.id, t1.name, t2.value
FROM table1 t1
JOIN table2 t2 ON t1.id = t2.id AND t1.eventTime BETWEEN t2.eventTime - INTERVAL '5' MINUTE AND t2.eventTime + INTERVAL '5' MINUTE

在上面的例子中,首先创建了两个输入表table1和table2,并分别指定了输入源(此处使用了Kafka作为示例输入源)。然后,在执行关联操作时,使用了通过窗口操作进行时间范围的过滤条件,即"t1.eventTime BETWEEN t2.eventTime - INTERVAL ‘5’ MINUTE AND t2.eventTime + INTERVAL ‘5’ MINUTE",确保了在一段时间内两张表的数据能够关联到。

通过使用窗口操作,可以根据具体的时间范围来进行数据关联,从而保证在一段时间内多张表的数据总能关联到。

七、Flink SQL使用TTL关联多表

Flink还提供了Time-To-Live (TTL)功能,可以用于在表中定义数据的生存时间。当数据的时间戳超过定义的TTL时,Flink会自动将其从表中删除。这在处理实时数据时非常有用,可以自动清理过期的数据。

在Flink中使用TTL可以通过创建表时指定TTL属性来实现,如下所示:

CREATE TABLE myTable (id INT,name STRING,event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND,PRIMARY KEY (id) NOT ENFORCED,TTL (event_time) AS event_time + INTERVAL '1' HOUR
) WITH ('connector.type' = 'kafka',...
)

在这个例子中,表myTable定义了一个event_time列,并使用TTL函数指定了数据的生存时间为event_time加上1小时。当数据的event_time超过1小时时,Flink会自动删除这些数据。

通过在Flink SQL中同时使用JOIN和TTL,你可以实现多张表的关联,并根据指定的条件删除过期的数据,从而更灵活地处理和管理数据。

相关文章:

Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

Flink系列之&#xff1a;深入理解ttl和checkpoint&#xff0c;Flink SQL应用ttl案例 一、深入理解Flink TTL二、Flink SQL设置TTL三、Flink设置TTL四、深入理解checkpoint五、Flink设置Checkpoint六、Flink SQL关联多张表七、Flink SQL使用TTL关联多表 一、深入理解Flink TTL …...

Wails中js调用go函数(1种go写法,2种js调用方法)

官方js调用go方法文档&#xff1a;https://wails.io/zh-Hans/docs/howdoesitwork a&#xff09;在app.go文件里写一个要js调用的go函数&#xff1a; func (a *App) JSCallGo(data1 string) string { return “test” } b&#xff09;运行 wails dev 命令&#xff0c…...

【我与java的成长记】之面向对象的初步认识

系列文章目录 能看懂文字就能明白系列 C语言笔记传送门 &#x1f31f; 个人主页&#xff1a;古德猫宁- &#x1f308; 信念如阳光&#xff0c;照亮前行的每一步 文章目录 系列文章目录&#x1f308; *信念如阳光&#xff0c;照亮前行的每一步* 前言一、什么是面向对象面向过程…...

面试题之二HTTP和RPC的区别?

面试题之二 HTTP和RPC的区别&#xff1f; Ask范围&#xff1a;分布式和微服务 难度指数&#xff1a;4星 考察频率&#xff1a;70-80% 开发年限&#xff1a;3年左右 从三个方面来回答该问题&#xff1a; 一.功能特性 1)HTTP是属于应用层的协议&#xff1a;超文本传输协议…...

初试Kafka

Kafka 是一个分布式流处理平台&#xff0c;通常用作消息中间件&#xff0c;它可以处理大规模的实时数据流。以下是从零开始使用 Kafka 作为消息中间件的基本教程&#xff1a; 步骤 1: 下载和安装 Kafka 访问 Apache Kafka 官方网站&#xff1a;Apache Kafka下载最新的 Kafka …...

SuperMap Hi-Fi 3D SDK for Unity基础开发教程

作者&#xff1a;kele 一、背景 众所周知&#xff0c;游戏引擎&#xff08;Unity&#xff09;功能强大&#xff0c;可以做出很多炫酷的游戏和动画效果&#xff0c;这部分功能的实现往往不仅仅是靠可视化界面就能够实现的&#xff0c;还需要代码开发。SuperMap Hi-Fi SDKS for …...

Upload-lab(pass1~2)

Pass-1-js检查 这里检验 因为是前端js校验,所以只用绕过js前端校验 用burp抓包修改文件类型 写一个简易版本的php Pass-2-只验证Content-type 仅仅判断content-type类型 因此上传shell.php抓包修改content-type为图片类型&#xff1a;image/jpeg、image/png、image/gif...

Linux:查询当前进程或线程的资源使用情况

目录 一、/proc/[PID]/下的各个文件1、proc简介2、/proc/[PID]/详解 二、通过Linux API获取当前进程或线程的资源使用情况1、getrusage2、sysinfo3、times 在工作中&#xff0c;我们排除app出现的一些性能/资源问题时&#xff0c;通常要先知道当前app的资源使用情况&#xff0c…...

unityc用vs2017介绍

21版unity能用17vs&#xff0c;只要在unity的Edit/Preferences/ExternalTools里面改既可。...

单元测试实战

文章目录 为什么要做单元测试&#xff1f;单元测试的几个核心要点是&#xff1a;单元测试目标单元测试框架JUnitTestNG 单元测试工具&#xff1a; 为什么要做单元测试&#xff1f; 测试代码&#xff1a;通过编写和运行单元测试&#xff0c;开发者能够快速验证代码的各个部分是否…...

WebService

调试工具&#xff1a;Postman、SoapUI Soap WebService :.net WCF 、Java CFX WebService三要素&#xff1a; SOAP&#xff08;Simple Object Access Protocol&#xff09;&#xff1a;用来描述传递信息的格式&#xff0c; 可以和现存的许多因特网协议和格式结合使用&#x…...

Nestjs使用log4j打印日志

众所周知&#xff0c;nest是自带日志的。但是好像没有log4j香&#xff0c;所以咱们来用log4j吧~ 我只演示最简单的用法&#xff0c;用具体怎么样用大家可以自己进行封装。就像前端封装自己的请求一样。 一、安装 yarn add log4js stacktrace-js 二、使用 主要就三个文件&a…...

Selenium - 自动化测试框架

Selenium 介绍 Selenium 是目前用的最广泛的 Web UI 自动化测试框架&#xff0c;核心功能就是可以在多个浏览器上进行自动化测试&#xff0c;支持多种编程语言&#xff0c;目前已经被 google&#xff0c;百度&#xff0c;腾讯等公司广泛使用。 开发步骤 1、配置 google 驱动…...

RFID技术在汽车制造:提高生产效率、优化物流管理和增强安全性

RFID技术在汽车制造:提高生产效率、优化物流管理和增强安全性 随着科技的进步&#xff0c;物联网技术已经深入到各个领域&#xff0c;尤其在制造业中&#xff0c;RFID技术以其独特的优势&#xff0c;如高精度追踪、实时数据收集和自动化操作&#xff0c;正在改变传统的生产方式…...

git异常

1.异常现象 换机新安装 Git 后&#xff0c;拉代码时出现问题&#xff1a; Unable to negotiate with 10.18.18.18 port 29418: no matching key exchange method found. Their offer: diffie-hellman-group14-sha1,diffie-hellman-group1-sha1 fatal: Could not read from rem…...

【C语言学习疑难杂症】第12期:如何从汇编角度深入理解y = (*--p)++这行代码(易懂版)

对于如下代码,思考一下输出结果是什么? int a[] = {5, 8, 7, 6, 2, 7, 3}; int y, *p = &a[1]; y = (*--p)++; printf("%d ",y); printf("%d",a[0]); 这个代码看似简单,但是在“y = (*--p)++;”这行代码里,编译器做了很多工作。 我们在vs2022的…...

5G阅信应用场景有哪些?

5G阅信的应用场景非常广泛&#xff0c;以下是一些常见的应用场景&#xff1a; 1.工业自动化&#xff1a;5G阅信可以连接各种工业设备和传感器&#xff0c;实现设备之间的实时通信和控制&#xff0c;提高生产效率和自动化水平。 2.物联网和智能家居&#xff1a;5G阅信可以连接各…...

使用OpenSSL生成自签名SSL/TLS证书和私钥

使用OpenSSL生成自签名SSL/TLS证书和私钥 前提&#xff1a; 系统安装了OpenSSL&#xff1b; 系统&#xff1a;windows、linux都可&#xff1b; 1 生成私钥 创建一个名为 server.key 的私钥文件&#xff0c;并使用 RSA 算法生成一个 2048 位的密钥。 openssl genrsa -out s…...

pycharm2023.2激活和新建项目,python3.12安装永久换源

pycharm安装 安装版本选择链接 激活参考链接 python安装 Windows下载指定python链接 选择相应版本的64位即可。 安装可以自己选择安装位置&#xff0c;记得勾选&#xff0c;add path即可。其余下一步默认即可。 windows临时换源 pip install 模块包名字 -i https://pypi.…...

FPGA分频电路设计(2)

实验要求&#xff1a; 采用 4 个开关以二进制形式设定分频系数&#xff08;1-10&#xff09;&#xff0c;实现对已知信号的分频。 类似实验我之前做过一次&#xff0c;但那次的方法实在是太笨了&#xff1a; 利用VHDL实现一定系数范围内的信号分频电路 需要重做以便将来应对更…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统&#xff0c;支持多语言对话&#xff08;如 中文&#xff0c;英文&#xff0c;日语&#xff09;&#xff0c;语音情感&#xff08;如 开心&#xff0c;悲伤&#xff09;&#x…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器&#xff1a;Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

JVM 内存结构 详解

内存结构 运行时数据区&#xff1a; Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器&#xff1a; ​ 线程私有&#xff0c;程序控制流的指示器&#xff0c;分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 ​ 每个线程都有一个程序计数…...

Golang——7、包与接口详解

包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...