当前位置: 首页 > news >正文

GenericWriteAheadSink每次checkpoint后事务是否必须成功

背景

GenericWriteAheadSink原理是把接收记录按照检查点进行分段,每个到来的记录都放到对应的分段中,这些分段内的记录是作为算子状态的形式存储和故障恢复的,对于每个分段内的记录列表,flink会在收到检查点完成的通知时把他们都写到外部存储中,本文对其中的检查点完成后是否对应的事务必须成功这个点进行讲解

源码解析GenericWriteAheadSink

首先开始进行checkpoint时代码如下

public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);// 把检查点id先放入本地变量中saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());this.checkpointedState.clear();for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {// 把本地变量中的检查点存放到算子列表状态中this.checkpointedState.add(pendingCheckpoint);}}private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {PendingCheckpoint pendingCheckpoint =new PendingCheckpoint(checkpointId, subtaskIdx, timestamp, handle);// 把检查点id先放到   pendingCheckpoints本地变量中 pendingCheckpoints.add(pendingCheckpoint);}

其实接收检查点完成的通知:

public void notifyCheckpointComplete(long checkpointId) throws Exception {super.notifyCheckpointComplete(checkpointId);synchronized (pendingCheckpoints) {Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator();while (pendingCheckpointIt.hasNext()) {PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();long pastCheckpointId = pendingCheckpoint.checkpointId;int subtaskId = pendingCheckpoint.subtaskId;long timestamp = pendingCheckpoint.timestamp;StreamStateHandle streamHandle = pendingCheckpoint.stateHandle;//把历史的+当前的还没有成功提交的检查点id对应的事务,重新调用sendValue方法并提交对应检查点的事务if (pastCheckpointId <= checkpointId) {try {// 历史的或者当前的事务未提交if (!committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) {try (FSDataInputStream in = streamHandle.openInputStream()) {// 调用sendValue方法写数据boolean success =sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(new DataInputViewStreamWrapper(in),serializer),serializer),pastCheckpointId,timestamp);if (success) {//提交对应检查点对应的事务committer.commitCheckpoint(subtaskId, pastCheckpointId);streamHandle.discardState();pendingCheckpointIt.remove();}}} else {streamHandle.discardState();pendingCheckpointIt.remove();}} catch (Exception e) {// we have to break here to prevent a new (later) checkpoint// from being committed before this oneLOG.error("Could not commit checkpoint.", e);break;}}}}}

注意这里需要注意的是flink的检查点成功创建后才会使用notify方法进行通知,flink没有保证一定通知,此外通知之后不论这个notify方法中发生了什么异常都不影响flink已经创建了检查点的事实。
对应到我们这个例子,你就会发现在notify方法中有需要把历史检查点已经创建成功但是对应的事务没有提交的事务重新调用一次sendValue方法和提交对应检查点的事务,也就是说不是每一次检查点都能成功的提交事务,如果事务没有提交成功,等待下一次检查点的通知即可,下一个检查点的通知会把历史的检查点重新检测一次.

相关文章:

GenericWriteAheadSink每次checkpoint后事务是否必须成功

背景 GenericWriteAheadSink原理是把接收记录按照检查点进行分段&#xff0c;每个到来的记录都放到对应的分段中&#xff0c;这些分段内的记录是作为算子状态的形式存储和故障恢复的&#xff0c;对于每个分段内的记录列表&#xff0c;flink会在收到检查点完成的通知时把他们都…...

[深入浅出AutoSAR] SWC 设计与应用

依AutoSAR及经验辛苦整理&#xff0c;原创保护&#xff0c;禁止转载。 专栏 《深入浅出AutoSAR》 全文 3100 字&#xff0c; 包含 1. SWC 概念 2. 数据类型&#xff08;Datatype&#xff09; 3. 端口&#xff08;Port&#xff09; 4. 端口接口&#xff08;Portinterface&…...

【Ubuntu系统搭建STM32开发环境(国内镜像全程快速配置)】

源于本人失败的经历苦心研究 虚拟机安装ubuntu换源VScode安装安装Java环境安装cubemx安装 arm-Linux-gcc安装gdb server安装OpenOCD 虚拟机安装ubuntu 系统镜像可以在阿里云镜像站且下载速度很快。 选择安装的版本。 我选择的是&#xff1a;ubuntu-22.10-desktop-amd64.iso。…...

Java 中的 Default 关键字

default 关键字&#xff1a;是在 Java 8 中引入的新概念&#xff0c;也可称为 Virtual extension methods——虚拟扩展方法与public、private等都属于修饰符关键字&#xff0c;与其它两个关键字不同之处在于default关键字大部分都用于修饰接口。 default 修饰方法时只能在接口…...

AdaBoost:增强机器学习的力量

一、介绍 机器学习已成为现代技术的基石&#xff0c;为从推荐系统到自动驾驶汽车的一切提供动力。在众多机器学习算法中&#xff0c;AdaBoost&#xff08;自适应增强的缩写&#xff09;作为一种强大的集成方法脱颖而出&#xff0c;为该领域的成功做出了重大贡献。AdaBoost 是一…...

c++踩坑点,类型转换

std::string转换到PVOID std::string转换到PVOID的方式如下 这样的话成功转换 “const char *” 类型的实参与 “WCHAR *” “const char *” 类型的实参与 “WCHAR *” 类型的形参不兼容 可以看到这种报错&#xff0c;可以直接强转如下&#xff1a; 但是在我们这里不适…...

mysql—面试50题—1

注&#xff1a;面试50题将分为5个部分&#xff0c;每部分10题 一、查询数据 学生表 Student create table Student(SId varchar(10),Sname varchar(10),Sage datetime,Ssex varchar(10)); insert into Student values(01 , 赵雷 , 1990-01-01 , 男); insert into Student …...

vue解决报错Unable to preventDefault inside passive event listener invocation.

"Unable to preventDefault inside passive event listener invocation"是浏览器开发中的一个警告信息。这个警告通常出现在使用passive事件监听器时&#xff0c;当在事件处理函数中调用preventDefault()方法时会引发该警告。 在传统的事件监听模型中&#xff0c;当…...

实际项目中最常用的设计模式

在软件开发领域,设计模式是一种经过验证的通用解决方案,用于解决各种常见问题。它们有助于提高代码的可维护性、可扩展性和可重用性。虽然有许多不同的设计模式,但以下是实际项目中最常用的一些: 1. 单例模式 (Singleton Pattern) 单例模式确保一个类只有一个实例,并提供…...

使用stream流根据对象属性对复杂list对象去重

日常开发中&#xff0c;我们可能会遇到这样一种情况&#xff0c;需要对数据库查询出来的数据进行一个二次处理&#xff0c;从而达到我们需要的数据结构。stream流正是java8提供的对复杂list操作方便工具。 我们先介绍如何使用stream流根据对象属性对复杂list对象去重&#xff0…...

vue3脚手架搭建

一.安装 vue3.0 脚手架 如果之前安装了2.0的脚手架&#xff0c;要先卸载掉&#xff0c;输入&#xff1a; npm uninstall vue-cli -g 进行全局卸载 1.安装node.js&#xff08;npm&#xff09; node.js&#xff1a;简单的说 Node.js 就是运行在服务端的 JavaScript。Node.js 是…...

MongoDB 未授权访问漏洞

简介 MongoDB是一个基于分布式文件存储的数据库&#xff0c;是一个介于关系数据库和非关系数据库之间的产品&#xff0c;它的特点是高性能、易部署、易使用&#xff0c;存储数据非常方便&#xff0c;默认情况下是没有认证的这就导致不熟悉它的研发人员部署后没有做访问控制导致…...

花5分钟学习机器学习基础知识

一、什么是机器学习 机器学习的目的是让机器学习,而不是执行预设的算法。 机器学习适用于难以制定规则的问题,如垃圾邮件识别、图像识别。 机器学习模拟人类学习过程:从样本中学习归纳总结,形成模型,然后应用模型完成任务。 机器学习需要大量样本数据和计算能力支持。当前数…...

Qt学习:使用OpenGL绘制3D图形

文章目录 前言一、Qt下使用OpenGL绘制图形介绍二、示例完整代码总结 前言 文章中引用的内容均来自这本书中的原文&#xff1a;【Qt Creator快速入门_霍亚飞编著】&#xff0c;本文的示例也是在书中代码的基础上进行编写的&#xff08;其中部分代码使用原文编译不过&#xff0c…...

在chrom浏览器安装Vue.js devtools插件,遇到恶意扩展程序字样,附百度网盘下载链接

遇到的问题 拖拽下载好的 Vue.js devtools 插件到谷歌扩展程序&#xff0c; 百度网盘下载地址 链接&#xff1a;https://pan.baidu.com/s/1FeK6pwc2UzRUUlMFN3rW5w?pwdw361 提取码&#xff1a;w361 提示&#xff1a; 解决办法 将Vue.js devtools 插件的后缀从.crx改为.zi…...

WSL2的安装与配置(创建Anaconda虚拟环境、更新软件包、安装PyTorch、VSCode)

1. WSL2 安装 以管理员身份打开 PowerShell&#xff08;“开始”菜单 >“PowerShell” >单击右键 >“以管理员身份运行”&#xff09;&#xff0c;然后输入以下命令&#xff1a; dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /a…...

【鸿蒙软件开发】ArkTS常见组件之单选框Radio和切换按钮Toggle

文章目录 前言一、Radio单选框1.1 创建单选框1.2 添加Radio事件1.3 场景示例二、切换按钮Toggle2.1 创建切换按钮2.2 创建有子组件的Toggle2.3 自定义样式selectedColor属性switchPointColor属性 2.4 添加事件2.5 示例代码 总结 前言 Radio是单选框组件&#xff0c;通常用于提…...

今年阿里云双十一服务器优惠价格讨论_看看大家怎么说?

2023阿里云双十一云服务器大概会降到什么区间&#xff1f;阿里云服务器网认为会在当前的优惠价格基础上&#xff0c;降价10%左右&#xff0c;可以在阿里云CLUB中心领券&#xff1a;aliyun.club 云服务器专用满减优惠券。阿里云服务器网从各个渠道了解到大家对今年阿里云双十一服…...

LC-1402. 做菜顺序(记忆化搜索 ==> 动态规划、贪心)

1402. 做菜顺序 困难 一个厨师收集了他 n 道菜的满意程度 satisfaction &#xff0c;这个厨师做出每道菜的时间都是 1 单位时间。 一道菜的 「 like-time 系数 」定义为烹饪这道菜结束的时间&#xff08;包含之前每道菜所花费的时间&#xff09;乘以这道菜的满意程度&#x…...

泰森多边形

泰森多边形 93 泰森多边形又叫沃洛诺伊图&#xff08;Voronoi diagram&#xff09;&#xff0c;得名于Georgy Voronoi&#xff0c;是一组由连接两邻点线段的垂直平分线组成的连续多边形。一个泰森多边形内的任一点到构成该多边形的控制点的距离小于到其他多边形控制点的距离。…...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

MySQL 知识小结(一)

一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库&#xff0c;分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷&#xff0c;但是文件存放起来数据比较冗余&#xff0c;用二进制能够更好管理咱们M…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...

tomcat入门

1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效&#xff0c;稳定&#xff0c;易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...

goreplay

1.github地址 https://github.com/buger/goreplay 2.简单介绍 GoReplay 是一个开源的网络监控工具&#xff0c;可以记录用户的实时流量并将其用于镜像、负载测试、监控和详细分析。 3.出现背景 随着应用程序的增长&#xff0c;测试它所需的工作量也会呈指数级增长。GoRepl…...

【Java多线程从青铜到王者】单例设计模式(八)

wait和sleep的区别 我们的wait也是提供了一个还有超时时间的版本&#xff0c;sleep也是可以指定时间的&#xff0c;也就是说时间一到就会解除阻塞&#xff0c;继续执行 wait和sleep都能被提前唤醒(虽然时间还没有到也可以提前唤醒)&#xff0c;wait能被notify提前唤醒&#xf…...