当前位置: 首页 > news >正文

Flink实战(11)-Exactly-Once语义之两阶段提交

0 大纲

[Apache Flink]2017年12月发布的1.4.0版本开始,为流计算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持:

  • 数据源(source)
  • 和输出端(sink)

包括Apache Kafka 0.11及更高版本。它提供抽象层,用户只需实现少数方法就能实现端到端Exactly-Once语义。

新功能及Flink实现逻辑:

  • 描述Flink checkpoint机制如何保证Flink程序结果的Exactly-Once的
  • 显示Flink如何通过两阶段提交协议与数据源和数据输出端交互,以提供端到端的Exactly-Once保证
  • 通过一个简单的示例,了解如何使用TwoPhaseCommitSinkFunction实现Exactly-Once的文件输出

1 Flink应用中的Exactly-Once语义

Exactly-Once,指每个输入的事件只影响最终结果一次。即使机器或软件故障,既没有重复数据,也不会丢数据。

Flink很久就提供Exactly-Once,checkpoint机制是Flink有能力提供Exactly-Once语义的核心。

一次checkpoint是以下内容的一致性快照:

  • 应用程序的当前状态
  • 输入流的位置

Flink可配置一个固定时间点,定期产生checkpoint,将checkpoint的数据写入持久存储系统,如S3或HDFS。将checkpoint数据写入持久存储是异步,即Flink应用程序在checkpoint过程中可以继续处理数据。

如果发生机器或软件故障,重新启动后,Flink应用程序将从最新的checkpoint点恢复处理; Flink会恢复应用程序状态,将输入流回滚到上次checkpoint保存的位置,然后重新开始运行。这意味着Flink可以像从未发生过故障一样计算结果。

Flink 1.4.0前,Exactly-Once语义仅限Flink应用程序内部,没有扩展到Flink数据处理完后发送的大多数外部系统。Flink应用程序与各种数据输出端进行交互,开发人员自己维护组件上下文保证Exactly-Once语义。

为提供端到端的Exactly-Once语义 – 即除了Flink应用程序内部,Flink写入的外部系统也需要能满足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法,然后通过Flink的checkpoint机制协调。

分布式系统中,协调提交和回滚的常用方法是2pc协议。讨论Flink的TwoPhaseCommitSinkFunction如何利用2pc提供端到端的Exactly-Once语义。

2 Flink应用程序端到端的Exactly-Once语义

Kafka经常与Flink使用。Kafka 0.11版本添加事务支持。这意味着现在通过Flink读写Kafaka,并提供端到端的Exactly-Once语义有了必要支持。

Flink对端到端的Exactly-Once语义的支持不仅局限Kafka,可将它与任何一个提供必要的协调机制的源/输出端一起使用。如Pravega,来自DELL/EMC的开源流媒体存储系统,通过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。

image-20231124142310942

示例程序有:

  • 从Kafka读取的数据源(Flink内置的KafkaConsumer)
  • 窗口聚合
  • 将数据写回Kafka的数据输出端(Flink内置的KafkaProducer)

要使数据输出端提供Exactly-Once保证,须将所有数据通过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的所有要写数据。这确保在故障时,能回滚写入的数据。但分布式系统中,通常有多个并发运行的写入任务,所有组件须在提交或回滚时“一致”才能确保一致结果。Flink使用2PC及预提交阶段解决这问题。

pre-commit

checkpoint开始时,即2PC的“预提交”阶段。当checkpoint开始时,Flink的JobManager会将checkpoint barrier(将数据流中的记录分为进入当前checkpoint与进入下一个checkpoint)注入数据流。

brarrier在operator之间传递。对每个operator,它触发operator的状态快照写入state backend。

数据源保存了消费Kafka的偏移量(offset),之后将checkpoint barrier传递给下一operator。

这种方式仅适用于operator具有『内部』状态。

内部状态

指Flink state backend保存和管理的。如第二个operator中window聚合算出来的sum值。当一个进程有它的内部状态时,除了在checkpoint前需将数据变更写入state backend,无需在pre-commit阶段执行其他操作。

Flink负责在checkpoint成功时正确提交这些写入或故障时中止这些写入。

image-20231124150402626

3 Flink应用启动pre-commit阶段

当进程具有『外部』状态,需额外处理。外部状态通常以写入外部系统(如Kafka)的形式出现。此时,为提供Exactly-Once保证,外部系统须【支持事务】,才能和两阶段提交协议集成。

示例数据需写入Kafka,因此数据输出端(Data Sink)有外部状态。此时,在预提交阶段:

  • 除了将其状态写入state backend
  • 数据输出端还必须预先提交其外部事务

当checkpoint barrier在所有operator都传递了一遍,并且触发的checkpoint回调成功完成时,预提交阶段结束。所有触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照,包括预先提交的外部状态。若故障,可回滚到上次成功完成快照的时间点。

下一步是通知所有operator,checkpoint已经成功了。这是2PC的提交阶段,JobManager为应用程序中的每个operator发出checkpoint已完成的回调。

数据源和 widnow operator没有外部状态,因此在提交阶段,这些operator不必执行任何操作。但是,数据输出端(Data Sink)拥有外部状态,此时应该提交外部事务。

总结

  • 一旦所有operator完成预提交,就提交一个commit。
  • 如果至少有一个预提交失败,则所有其他提交都将中止,我们将回滚到上一个成功完成的checkpoint。
  • 在预提交成功之后,提交的commit需要保证最终成功 – operator和外部系统都需要保障这点。如果commit失败(例如,由于间歇性网络问题),整个Flink应用程序将失败,应用程序将根据用户的重启策略重新启动,还会尝试再提交。这个过程至关重要,因为如果commit最终没有成功,将会导致数据丢失。

因此,我们可以确定所有operator都同意checkpoint的最终结果:所有operator都同意数据已提交,或提交被中止并回滚。

4 在Flink中实现两阶段提交Operator

完整的实现两阶段提交协议可能有点复杂,这就是为什么Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的原因。

接下来基于输出到文件的简单示例,说明如何使用TwoPhaseCommitSinkFunction。用户只需要实现四个函数,就能为数据输出端实现Exactly-Once语义:

  • beginTransaction – 在事务开始前,我们在目标文件系统的临时目录中创建一个临时文件。随后,我们可以在处理数据时将数据写入此文件。
  • preCommit – 在预提交阶段,我们刷新文件到存储,关闭文件,不再重新写入。我们还将为属于下一个checkpoint的任何后续文件写入启动一个新的事务。
  • commit – 在提交阶段,我们将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是,这会增加输出数据可见性的延迟。
  • abort – 在中止阶段,我们删除临时文件。

我们知道,如果发生任何故障,Flink会将应用程序的状态恢复到最新的一次checkpoint点。一种极端的情况是,预提交成功了,但在这次commit的通知到达operator之前发生了故障。在这种情况下,Flink会将operator的状态恢复到已经预提交,但尚未真正提交的状态。

我们需要在预提交阶段保存足够多的信息到checkpoint状态中,以便在重启后能正确的中止或提交事务。在这个例子中,这些信息是临时文件和目标目录的路径。

TwoPhaseCommitSinkFunction已经把这种情况考虑在内了,并且在从checkpoint点恢复状态时,会优先发出一个commit。我们需要以幂等方式实现提交,一般来说,这并不难。在这个示例中,我们可以识别出这样的情况:临时文件不在临时目录中,但已经移动到目标目录了。

在TwoPhaseCommitSinkFunction中,还有一些其他边界情况也会考虑在内,请参考Flink文档了解更多信息。

FAQ

flink sink在如果过来一个checkpoint barrier,会去存储state,这个动作会和普通的write并行吗?还是串行?

在Flink的checkpoint机制中,当一个Checkpoint Barrier过来时,sink会触发对状态的snapshot,这个snapshot动作默认是和普通的write操作并行进行的。

具体来说:

  • Flink的checkpoint机制是通过在datastream中注入Checkpoint Barrier来实现的。

  • 当source接收到Checkpoint Barrier时,会将其传递给下游的transformation和sink。

  • 当sink接收到Checkpoint Barrier时,会启动一个新的线程来执行state snapshot(状态保存)。

  • 这个状态snapshot线程会从状态后端Snapshot State,并存储检查点。

  • 而sink的主线程在接收到Checkpoint Barrier时,会继续处理正常的write。

  • 这样,状态snapshot和正常的write操作就是并行进行的。

但是也可以通过Sink的配置来设置snapshot和write的执行策略,主要有两种模式:

  1. 并行模式(默认):snapshot和write同时进行

  2. 串行模式:snapshot完成后再进行write

综上,Flink sink在默认的并行checkpoint模式下,状态snapshot和普通的write操作是并行执行的。可以通过配置来改变其行为。这样可以根据实际需要进行平衡。

总结

  • Flink的checkpoint机制是支持两阶段提交协议并提供端到端的Exactly-Once语义的基础。
  • 这个方案的优点是: Flink不像其他一些系统那样,通过网络传输存储数据 – 不需要像大多数批处理程序那样将计算的每个阶段写入磁盘。
  • Flink的TwoPhaseCommitSinkFunction提取了两阶段提交协议的通用逻辑,基于此将Flink和支持事务的外部系统结合,构建端到端的Exactly-Once成为可能。
  • 从Flink 1.4.0开始,Pravega和Kafka 0.11 producer都提供了Exactly-Once语义;Kafka在0.11版本首次引入了事务,为在Flink程序中使用Kafka producer提供Exactly-Once语义提供了可能性。
  • Kafaka 0.11 producer的事务是在TwoPhaseCommitSinkFunction基础上实现的,和at-least-once producer相比只增加了非常低的开销。

    本文由博客一文多发平台 OpenWrite 发布!

相关文章:

Flink实战(11)-Exactly-Once语义之两阶段提交

0 大纲 [Apache Flink]2017年12月发布的1.4.0版本开始,为流计算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持: 数据源&#…...

日志技术logback

一,日志概括 二,日志技术的特点 三,日志技术的体系 三,入门 四,案例 package XinZheng;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class Main58 {//1,创建一个Logger日志对象public static fi…...

linux(1)之build构建系统基础(一)

Linux(1)之buildroot构建系统(一) Author:Onceday Date:2023年11月12日 漫漫长路,才刚刚开始… 参考文档: The Yocto ProjectBuildroot - Making Embedded Linux Easy 文章目录 Linux(1)之buildroot构建系统(一)1. 概述1.1 如…...

25 Linux I2C 驱动

一、I2C简介 I2C老朋友了,在单片机里面也学过,现在再复习一下。I2C使用两条线在主控制器和从机之间进行数据通信。一条是 SCL(串行时钟线),另外一条是 SDA(串行数据线),这两条数据线需要接上拉电阻,总线空闲的时候 SCL…...

API 设计:使用 Node.js 和 Express.js 的综合教程

API(应用程序编程接口)设计涉及创建一个高效而强大的接口,允许不同的软件应用程序相互交互。 说明 本教程将指导您使用 Node.js 和 Express.js 作为核心技术来规划、设计和构建 API。但是,这些原则可以应用于任何语言或框架。我们…...

vite和webpack的区别和练习

Vite和Webpack都是现代化的前端构建工具,但它们之间存在一些区别: 构建性能:Vite使用ES Modules提高了构建性能,可以在构建时只构建需要的部分,而Webpack则需要在构建时处理整个应用程序。 开发体验:Vite具…...

Python与设计模式--装饰器模式

6-Python与设计模式–装饰器模式 一、快餐点餐系统 又提到了那个快餐点餐系统,不过今天我们只以其中的一个类作为主角:饮料类。 首先,回忆下饮料类: class Beverage():name ""price 0.0type "BEVERAGE"…...

flutter之graphic图表自定义tooltip

renderer graphic中tooltip的TooltipGuide类提供了renderer方法&#xff0c;接收三个参数Size类型&#xff0c;Offset类型&#xff0c;Map<int, Tuple>类型。可查到的文档是真的少&#xff0c;所以只能在源码中扒拉例子&#xff0c;做符合需求的修改。 官方github示例 …...

逆向扒cocosjs安卓包教程-破解加密的js源码

本文只适用于cocosjs引擎打包的游戏apk,针对此类apk进行源码级别的逆向破解,可直接逐个破解工程内的源码部分,让游戏逻辑大白于你的面前,你可以针对js源码进行二次开发。按照我的教程破解过程中遇到什么问题,欢迎留言。 目录 准备apk包 查找加密key 解密jsc文件 方案1…...

Kafka(一)

一&#xff1a;简介 解决高吞吐量项目的需求 是一款为大数据而生的消息中间件&#xff0c;具有百亿级tps的吞吐量&#xff0c;在数据采集、传输、存储的过程中发挥着作用 二&#xff1a;为什么要使用消息队列 一个普通访问量的接口和一个大并发的接口&#xff0c;它们背后的…...

【Amazon】安装卸载AWS CLI操作流程(Windows 、Linux系统)

AWS 命令行界面&#xff08;AWS CLI&#xff09;是用于管理 AWS 产品的统一工具。只需要下载和配置一个工具&#xff0c;您就可以使用命令行控制多个 AWS 产品并利用脚本来自动执行这些服务。 AWS CLI v2 提供了多项新功能&#xff0c;包括改进的安装程序、新的配置选项&#…...

Django同时连接多种数据库

我的使用场景需要同时连接达梦数据库和MYSQL数据库&#xff0c;有的功能需要查询达梦&#xff0c;有的功能则需要查询MYSQL。 第一步&#xff1a;在 Django 的 settings.py 文件中&#xff0c;配置多个数据库连接。你可以在 DATABASES 字典中添加多个数据库配置。每个数据库配置…...

【链表之练习题】

文章目录 翻转链表找到链表的中间节点返回倒数第k个节点合并两个有序链表判断链表是否回文注意 翻转链表 //反转链表//实质上是把每一个节点头插法,原本第一个节点变成最后一个节点public ListNode reverseList(){//链表为空if (head null){return null;}//链表只有一个节点if…...

情感对话机器人的任务体系

人类在处理对话中的情感时&#xff0c;需要先根据对话场景中的蛛丝马迹判断出对方的情感&#xff0c;继而根据对话的主题等信息思考自身用什么情感进行回复&#xff0c;最后结合推理出的情感形成恰当的回复。受人类处理情感对话的启发&#xff0c;情感对话机器人需要完成以下几…...

【笔记 Pytorch 08】深度学习模板 (未完)

文章目录 一、声明二、工程结构三、文件内容main.pymodel.pydataset.pyutils.py 四、问题汇总 一、声明 非常感谢这些资料的作者&#xff1a; 【参考1】、【PyTorch速成教程 (by Sung Kim)】 二、工程结构 ├── main.py&#xff1a;实现训练 (train) 、验证(validation)和…...

【如何学习Python自动化测试】—— Cookie 处理

前提 网络通信是当今社会最为普及和繁荣的技术之一&#xff0c;其承载了人们生活中瞬息万变的信息传递和交流。而作为网络通信的核心要素&#xff0c;网络协议、socket、cookie和session则是网络通信的灵魂。 一、网络协议 网络协议是计算机和网络设备之间相互通信的规则和标准…...

IOS+Appium+Python自动化全实战教程

由于公司的产品坐落于不同的平台&#xff0c;如ios、mac、Android、windows、web。因此每次有新需求的时候&#xff0c;开发结束后&#xff0c;留给测试的时间也不多。此外&#xff0c;一些新的功能实现&#xff0c;偶尔会影响其他的模块功能正常的使用。 网上的ios自动化方面的…...

华硕灵耀XPro(UX7602ZM)原装Win11系统恢复安装教程方法

华硕灵耀XPro(UX7602ZM)原装Win11系统恢复安装教程方法&#xff1a; 第一步&#xff1a;需要自备华硕6个底包工厂安装包&#xff08;EDN.KIT.OFS.SWP.HDI.TLK&#xff09;或者自己备份的iso/esd/wim等镜像恢复 支持系列&#xff1a; 灵耀系列原装系统 无畏系列原装系统 枪…...

SpringBoot整合Redis,redis连接池和RedisTemplate序列化

SpringBoot整合Redis 1、SpringBoot整合redis1.1 pom.xml1.2 application.yml1.3 配置类RedisConfig&#xff0c;实现RedisTemplate序列化1.4 代码测试 2、SpringBoot整合redis几个疑问&#xff1f;2.1、Redis 连接池讲解2.2、RedisTemplate和StringRedisTemplate 3、RedisTemp…...

学习课题:逐步构建开发播放器【QT5 + FFmpeg6 + SDL2】

目录 一、播放器开发(一)&#xff1a;播放器组成大致结构与代码流程设计 二、播放器开发(二)&#xff1a;了解FFmpeg与SDL常用对象和函数 三、播放器开发(三)&#xff1a;FFmpeg与SDL环境配置 四、播放器开发(四)&#xff1a;多线程解复用与解码模块实现 五、播放器开发(五…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

LLM基础1_语言模型如何处理文本

基于GitHub项目&#xff1a;https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken&#xff1a;OpenAI开发的专业"分词器" torch&#xff1a;Facebook开发的强力计算引擎&#xff0c;相当于超级计算器 理解词嵌入&#xff1a;给词语画"…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定&#xff0c;这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中&#xff0c;积分电荷法最为常用&#xff0c;其原理是通过测量在电容器上积累的热释电电荷&#xff0c;从而确定热释电系数…...