Flink实战(11)-Exactly-Once语义之两阶段提交
0 大纲
[Apache Flink]2017年12月发布的1.4.0版本开始,为流计算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持:
- 数据源(source)
- 和输出端(sink)
包括Apache Kafka 0.11及更高版本。它提供抽象层,用户只需实现少数方法就能实现端到端Exactly-Once语义。
新功能及Flink实现逻辑:
- 描述Flink checkpoint机制如何保证Flink程序结果的Exactly-Once的
- 显示Flink如何通过两阶段提交协议与数据源和数据输出端交互,以提供端到端的Exactly-Once保证
- 通过一个简单的示例,了解如何使用TwoPhaseCommitSinkFunction实现Exactly-Once的文件输出
1 Flink应用中的Exactly-Once语义
Exactly-Once,指每个输入的事件只影响最终结果一次。即使机器或软件故障,既没有重复数据,也不会丢数据。
Flink很久就提供Exactly-Once,checkpoint机制是Flink有能力提供Exactly-Once语义的核心。
一次checkpoint是以下内容的一致性快照:
- 应用程序的当前状态
- 输入流的位置
Flink可配置一个固定时间点,定期产生checkpoint,将checkpoint的数据写入持久存储系统,如S3或HDFS。将checkpoint数据写入持久存储是异步,即Flink应用程序在checkpoint过程中可以继续处理数据。
如果发生机器或软件故障,重新启动后,Flink应用程序将从最新的checkpoint点恢复处理; Flink会恢复应用程序状态,将输入流回滚到上次checkpoint保存的位置,然后重新开始运行。这意味着Flink可以像从未发生过故障一样计算结果。
Flink 1.4.0前,Exactly-Once语义仅限Flink应用程序内部,没有扩展到Flink数据处理完后发送的大多数外部系统。Flink应用程序与各种数据输出端进行交互,开发人员自己维护组件上下文保证Exactly-Once语义。
为提供端到端的Exactly-Once语义 – 即除了Flink应用程序内部,Flink写入的外部系统也需要能满足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法,然后通过Flink的checkpoint机制协调。
分布式系统中,协调提交和回滚的常用方法是2pc协议。讨论Flink的TwoPhaseCommitSinkFunction如何利用2pc提供端到端的Exactly-Once语义。
2 Flink应用程序端到端的Exactly-Once语义
Kafka经常与Flink使用。Kafka 0.11版本添加事务支持。这意味着现在通过Flink读写Kafaka,并提供端到端的Exactly-Once语义有了必要支持。
Flink对端到端的Exactly-Once语义的支持不仅局限Kafka,可将它与任何一个提供必要的协调机制的源/输出端一起使用。如Pravega,来自DELL/EMC的开源流媒体存储系统,通过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。

示例程序有:
- 从Kafka读取的数据源(Flink内置的KafkaConsumer)
- 窗口聚合
- 将数据写回Kafka的数据输出端(Flink内置的KafkaProducer)
要使数据输出端提供Exactly-Once保证,须将所有数据通过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的所有要写数据。这确保在故障时,能回滚写入的数据。但分布式系统中,通常有多个并发运行的写入任务,所有组件须在提交或回滚时“一致”才能确保一致结果。Flink使用2PC及预提交阶段解决这问题。
pre-commit
checkpoint开始时,即2PC的“预提交”阶段。当checkpoint开始时,Flink的JobManager会将checkpoint barrier(将数据流中的记录分为进入当前checkpoint与进入下一个checkpoint)注入数据流。
brarrier在operator之间传递。对每个operator,它触发operator的状态快照写入state backend。

数据源保存了消费Kafka的偏移量(offset),之后将checkpoint barrier传递给下一operator。
这种方式仅适用于operator具有『内部』状态。
内部状态
指Flink state backend保存和管理的。如第二个operator中window聚合算出来的sum值。当一个进程有它的内部状态时,除了在checkpoint前需将数据变更写入state backend,无需在pre-commit阶段执行其他操作。
Flink负责在checkpoint成功时正确提交这些写入或故障时中止这些写入。

3 Flink应用启动pre-commit阶段
当进程具有『外部』状态,需额外处理。外部状态通常以写入外部系统(如Kafka)的形式出现。此时,为提供Exactly-Once保证,外部系统须【支持事务】,才能和两阶段提交协议集成。
示例数据需写入Kafka,因此数据输出端(Data Sink)有外部状态。此时,在预提交阶段:
- 除了将其状态写入state backend
- 数据输出端还必须预先提交其外部事务

当checkpoint barrier在所有operator都传递了一遍,并且触发的checkpoint回调成功完成时,预提交阶段结束。所有触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照,包括预先提交的外部状态。若故障,可回滚到上次成功完成快照的时间点。
下一步是通知所有operator,checkpoint已经成功了。这是2PC的提交阶段,JobManager为应用程序中的每个operator发出checkpoint已完成的回调。
数据源和 widnow operator没有外部状态,因此在提交阶段,这些operator不必执行任何操作。但是,数据输出端(Data Sink)拥有外部状态,此时应该提交外部事务。

总结
- 一旦所有operator完成预提交,就提交一个commit。
- 如果至少有一个预提交失败,则所有其他提交都将中止,我们将回滚到上一个成功完成的checkpoint。
- 在预提交成功之后,提交的commit需要保证最终成功 – operator和外部系统都需要保障这点。如果commit失败(例如,由于间歇性网络问题),整个Flink应用程序将失败,应用程序将根据用户的重启策略重新启动,还会尝试再提交。这个过程至关重要,因为如果commit最终没有成功,将会导致数据丢失。
因此,我们可以确定所有operator都同意checkpoint的最终结果:所有operator都同意数据已提交,或提交被中止并回滚。
4 在Flink中实现两阶段提交Operator
完整的实现两阶段提交协议可能有点复杂,这就是为什么Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的原因。
接下来基于输出到文件的简单示例,说明如何使用TwoPhaseCommitSinkFunction。用户只需要实现四个函数,就能为数据输出端实现Exactly-Once语义:
- beginTransaction – 在事务开始前,我们在目标文件系统的临时目录中创建一个临时文件。随后,我们可以在处理数据时将数据写入此文件。
- preCommit – 在预提交阶段,我们刷新文件到存储,关闭文件,不再重新写入。我们还将为属于下一个checkpoint的任何后续文件写入启动一个新的事务。
- commit – 在提交阶段,我们将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是,这会增加输出数据可见性的延迟。
- abort – 在中止阶段,我们删除临时文件。
我们知道,如果发生任何故障,Flink会将应用程序的状态恢复到最新的一次checkpoint点。一种极端的情况是,预提交成功了,但在这次commit的通知到达operator之前发生了故障。在这种情况下,Flink会将operator的状态恢复到已经预提交,但尚未真正提交的状态。
我们需要在预提交阶段保存足够多的信息到checkpoint状态中,以便在重启后能正确的中止或提交事务。在这个例子中,这些信息是临时文件和目标目录的路径。
TwoPhaseCommitSinkFunction已经把这种情况考虑在内了,并且在从checkpoint点恢复状态时,会优先发出一个commit。我们需要以幂等方式实现提交,一般来说,这并不难。在这个示例中,我们可以识别出这样的情况:临时文件不在临时目录中,但已经移动到目标目录了。
在TwoPhaseCommitSinkFunction中,还有一些其他边界情况也会考虑在内,请参考Flink文档了解更多信息。
FAQ
flink sink在如果过来一个checkpoint barrier,会去存储state,这个动作会和普通的write并行吗?还是串行?
在Flink的checkpoint机制中,当一个Checkpoint Barrier过来时,sink会触发对状态的snapshot,这个snapshot动作默认是和普通的write操作并行进行的。
具体来说:
Flink的checkpoint机制是通过在datastream中注入Checkpoint Barrier来实现的。
当source接收到Checkpoint Barrier时,会将其传递给下游的transformation和sink。
当sink接收到Checkpoint Barrier时,会启动一个新的线程来执行state snapshot(状态保存)。
这个状态snapshot线程会从状态后端Snapshot State,并存储检查点。
而sink的主线程在接收到Checkpoint Barrier时,会继续处理正常的write。
这样,状态snapshot和正常的write操作就是并行进行的。
但是也可以通过Sink的配置来设置snapshot和write的执行策略,主要有两种模式:
并行模式(默认):snapshot和write同时进行
串行模式:snapshot完成后再进行write
综上,Flink sink在默认的并行checkpoint模式下,状态snapshot和普通的write操作是并行执行的。可以通过配置来改变其行为。这样可以根据实际需要进行平衡。
总结
- Flink的checkpoint机制是支持两阶段提交协议并提供端到端的Exactly-Once语义的基础。
- 这个方案的优点是: Flink不像其他一些系统那样,通过网络传输存储数据 – 不需要像大多数批处理程序那样将计算的每个阶段写入磁盘。
- Flink的TwoPhaseCommitSinkFunction提取了两阶段提交协议的通用逻辑,基于此将Flink和支持事务的外部系统结合,构建端到端的Exactly-Once成为可能。
- 从Flink 1.4.0开始,Pravega和Kafka 0.11 producer都提供了Exactly-Once语义;Kafka在0.11版本首次引入了事务,为在Flink程序中使用Kafka producer提供Exactly-Once语义提供了可能性。
- Kafaka 0.11 producer的事务是在TwoPhaseCommitSinkFunction基础上实现的,和at-least-once producer相比只增加了非常低的开销。
本文由博客一文多发平台 OpenWrite 发布!
相关文章:
Flink实战(11)-Exactly-Once语义之两阶段提交
0 大纲 [Apache Flink]2017年12月发布的1.4.0版本开始,为流计算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持: 数据源&#…...
日志技术logback
一,日志概括 二,日志技术的特点 三,日志技术的体系 三,入门 四,案例 package XinZheng;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class Main58 {//1,创建一个Logger日志对象public static fi…...
linux(1)之build构建系统基础(一)
Linux(1)之buildroot构建系统(一) Author:Onceday Date:2023年11月12日 漫漫长路,才刚刚开始… 参考文档: The Yocto ProjectBuildroot - Making Embedded Linux Easy 文章目录 Linux(1)之buildroot构建系统(一)1. 概述1.1 如…...
25 Linux I2C 驱动
一、I2C简介 I2C老朋友了,在单片机里面也学过,现在再复习一下。I2C使用两条线在主控制器和从机之间进行数据通信。一条是 SCL(串行时钟线),另外一条是 SDA(串行数据线),这两条数据线需要接上拉电阻,总线空闲的时候 SCL…...
API 设计:使用 Node.js 和 Express.js 的综合教程
API(应用程序编程接口)设计涉及创建一个高效而强大的接口,允许不同的软件应用程序相互交互。 说明 本教程将指导您使用 Node.js 和 Express.js 作为核心技术来规划、设计和构建 API。但是,这些原则可以应用于任何语言或框架。我们…...
vite和webpack的区别和练习
Vite和Webpack都是现代化的前端构建工具,但它们之间存在一些区别: 构建性能:Vite使用ES Modules提高了构建性能,可以在构建时只构建需要的部分,而Webpack则需要在构建时处理整个应用程序。 开发体验:Vite具…...
Python与设计模式--装饰器模式
6-Python与设计模式–装饰器模式 一、快餐点餐系统 又提到了那个快餐点餐系统,不过今天我们只以其中的一个类作为主角:饮料类。 首先,回忆下饮料类: class Beverage():name ""price 0.0type "BEVERAGE"…...
flutter之graphic图表自定义tooltip
renderer graphic中tooltip的TooltipGuide类提供了renderer方法,接收三个参数Size类型,Offset类型,Map<int, Tuple>类型。可查到的文档是真的少,所以只能在源码中扒拉例子,做符合需求的修改。 官方github示例 …...
逆向扒cocosjs安卓包教程-破解加密的js源码
本文只适用于cocosjs引擎打包的游戏apk,针对此类apk进行源码级别的逆向破解,可直接逐个破解工程内的源码部分,让游戏逻辑大白于你的面前,你可以针对js源码进行二次开发。按照我的教程破解过程中遇到什么问题,欢迎留言。 目录 准备apk包 查找加密key 解密jsc文件 方案1…...
Kafka(一)
一:简介 解决高吞吐量项目的需求 是一款为大数据而生的消息中间件,具有百亿级tps的吞吐量,在数据采集、传输、存储的过程中发挥着作用 二:为什么要使用消息队列 一个普通访问量的接口和一个大并发的接口,它们背后的…...
【Amazon】安装卸载AWS CLI操作流程(Windows 、Linux系统)
AWS 命令行界面(AWS CLI)是用于管理 AWS 产品的统一工具。只需要下载和配置一个工具,您就可以使用命令行控制多个 AWS 产品并利用脚本来自动执行这些服务。 AWS CLI v2 提供了多项新功能,包括改进的安装程序、新的配置选项&#…...
Django同时连接多种数据库
我的使用场景需要同时连接达梦数据库和MYSQL数据库,有的功能需要查询达梦,有的功能则需要查询MYSQL。 第一步:在 Django 的 settings.py 文件中,配置多个数据库连接。你可以在 DATABASES 字典中添加多个数据库配置。每个数据库配置…...
【链表之练习题】
文章目录 翻转链表找到链表的中间节点返回倒数第k个节点合并两个有序链表判断链表是否回文注意 翻转链表 //反转链表//实质上是把每一个节点头插法,原本第一个节点变成最后一个节点public ListNode reverseList(){//链表为空if (head null){return null;}//链表只有一个节点if…...
情感对话机器人的任务体系
人类在处理对话中的情感时,需要先根据对话场景中的蛛丝马迹判断出对方的情感,继而根据对话的主题等信息思考自身用什么情感进行回复,最后结合推理出的情感形成恰当的回复。受人类处理情感对话的启发,情感对话机器人需要完成以下几…...
【笔记 Pytorch 08】深度学习模板 (未完)
文章目录 一、声明二、工程结构三、文件内容main.pymodel.pydataset.pyutils.py 四、问题汇总 一、声明 非常感谢这些资料的作者: 【参考1】、【PyTorch速成教程 (by Sung Kim)】 二、工程结构 ├── main.py:实现训练 (train) 、验证(validation)和…...
【如何学习Python自动化测试】—— Cookie 处理
前提 网络通信是当今社会最为普及和繁荣的技术之一,其承载了人们生活中瞬息万变的信息传递和交流。而作为网络通信的核心要素,网络协议、socket、cookie和session则是网络通信的灵魂。 一、网络协议 网络协议是计算机和网络设备之间相互通信的规则和标准…...
IOS+Appium+Python自动化全实战教程
由于公司的产品坐落于不同的平台,如ios、mac、Android、windows、web。因此每次有新需求的时候,开发结束后,留给测试的时间也不多。此外,一些新的功能实现,偶尔会影响其他的模块功能正常的使用。 网上的ios自动化方面的…...
华硕灵耀XPro(UX7602ZM)原装Win11系统恢复安装教程方法
华硕灵耀XPro(UX7602ZM)原装Win11系统恢复安装教程方法: 第一步:需要自备华硕6个底包工厂安装包(EDN.KIT.OFS.SWP.HDI.TLK)或者自己备份的iso/esd/wim等镜像恢复 支持系列: 灵耀系列原装系统 无畏系列原装系统 枪…...
SpringBoot整合Redis,redis连接池和RedisTemplate序列化
SpringBoot整合Redis 1、SpringBoot整合redis1.1 pom.xml1.2 application.yml1.3 配置类RedisConfig,实现RedisTemplate序列化1.4 代码测试 2、SpringBoot整合redis几个疑问?2.1、Redis 连接池讲解2.2、RedisTemplate和StringRedisTemplate 3、RedisTemp…...
学习课题:逐步构建开发播放器【QT5 + FFmpeg6 + SDL2】
目录 一、播放器开发(一):播放器组成大致结构与代码流程设计 二、播放器开发(二):了解FFmpeg与SDL常用对象和函数 三、播放器开发(三):FFmpeg与SDL环境配置 四、播放器开发(四):多线程解复用与解码模块实现 五、播放器开发(五…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
LLM基础1_语言模型如何处理文本
基于GitHub项目:https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken:OpenAI开发的专业"分词器" torch:Facebook开发的强力计算引擎,相当于超级计算器 理解词嵌入:给词语画"…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
佰力博科技与您探讨热释电测量的几种方法
热释电的测量主要涉及热释电系数的测定,这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中,积分电荷法最为常用,其原理是通过测量在电容器上积累的热释电电荷,从而确定热释电系数…...
