当前位置: 首页 > news >正文

GenericWriteAheadSink每次checkpoint后事务是否必须成功

背景

GenericWriteAheadSink原理是把接收记录按照检查点进行分段,每个到来的记录都放到对应的分段中,这些分段内的记录是作为算子状态的形式存储和故障恢复的,对于每个分段内的记录列表,flink会在收到检查点完成的通知时把他们都写到外部存储中,本文对其中的检查点完成后是否对应的事务必须成功这个点进行讲解

源码解析GenericWriteAheadSink

首先开始进行checkpoint时代码如下

public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);// 把检查点id先放入本地变量中saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());this.checkpointedState.clear();for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {// 把本地变量中的检查点存放到算子列表状态中this.checkpointedState.add(pendingCheckpoint);}}private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {PendingCheckpoint pendingCheckpoint =new PendingCheckpoint(checkpointId, subtaskIdx, timestamp, handle);// 把检查点id先放到   pendingCheckpoints本地变量中 pendingCheckpoints.add(pendingCheckpoint);}

其实接收检查点完成的通知:

public void notifyCheckpointComplete(long checkpointId) throws Exception {super.notifyCheckpointComplete(checkpointId);synchronized (pendingCheckpoints) {Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator();while (pendingCheckpointIt.hasNext()) {PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();long pastCheckpointId = pendingCheckpoint.checkpointId;int subtaskId = pendingCheckpoint.subtaskId;long timestamp = pendingCheckpoint.timestamp;StreamStateHandle streamHandle = pendingCheckpoint.stateHandle;//把历史的+当前的还没有成功提交的检查点id对应的事务,重新调用sendValue方法并提交对应检查点的事务if (pastCheckpointId <= checkpointId) {try {// 历史的或者当前的事务未提交if (!committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) {try (FSDataInputStream in = streamHandle.openInputStream()) {// 调用sendValue方法写数据boolean success =sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(new DataInputViewStreamWrapper(in),serializer),serializer),pastCheckpointId,timestamp);if (success) {//提交对应检查点对应的事务committer.commitCheckpoint(subtaskId, pastCheckpointId);streamHandle.discardState();pendingCheckpointIt.remove();}}} else {streamHandle.discardState();pendingCheckpointIt.remove();}} catch (Exception e) {// we have to break here to prevent a new (later) checkpoint// from being committed before this oneLOG.error("Could not commit checkpoint.", e);break;}}}}}

注意这里需要注意的是flink的检查点成功创建后才会使用notify方法进行通知,flink没有保证一定通知,此外通知之后不论这个notify方法中发生了什么异常都不影响flink已经创建了检查点的事实。
对应到我们这个例子,你就会发现在notify方法中有需要把历史检查点已经创建成功但是对应的事务没有提交的事务重新调用一次sendValue方法和提交对应检查点的事务,也就是说不是每一次检查点都能成功的提交事务,如果事务没有提交成功,等待下一次检查点的通知即可,下一个检查点的通知会把历史的检查点重新检测一次.

相关文章:

GenericWriteAheadSink每次checkpoint后事务是否必须成功

背景 GenericWriteAheadSink原理是把接收记录按照检查点进行分段&#xff0c;每个到来的记录都放到对应的分段中&#xff0c;这些分段内的记录是作为算子状态的形式存储和故障恢复的&#xff0c;对于每个分段内的记录列表&#xff0c;flink会在收到检查点完成的通知时把他们都…...

[深入浅出AutoSAR] SWC 设计与应用

依AutoSAR及经验辛苦整理&#xff0c;原创保护&#xff0c;禁止转载。 专栏 《深入浅出AutoSAR》 全文 3100 字&#xff0c; 包含 1. SWC 概念 2. 数据类型&#xff08;Datatype&#xff09; 3. 端口&#xff08;Port&#xff09; 4. 端口接口&#xff08;Portinterface&…...

【Ubuntu系统搭建STM32开发环境(国内镜像全程快速配置)】

源于本人失败的经历苦心研究 虚拟机安装ubuntu换源VScode安装安装Java环境安装cubemx安装 arm-Linux-gcc安装gdb server安装OpenOCD 虚拟机安装ubuntu 系统镜像可以在阿里云镜像站且下载速度很快。 选择安装的版本。 我选择的是&#xff1a;ubuntu-22.10-desktop-amd64.iso。…...

Java 中的 Default 关键字

default 关键字&#xff1a;是在 Java 8 中引入的新概念&#xff0c;也可称为 Virtual extension methods——虚拟扩展方法与public、private等都属于修饰符关键字&#xff0c;与其它两个关键字不同之处在于default关键字大部分都用于修饰接口。 default 修饰方法时只能在接口…...

AdaBoost:增强机器学习的力量

一、介绍 机器学习已成为现代技术的基石&#xff0c;为从推荐系统到自动驾驶汽车的一切提供动力。在众多机器学习算法中&#xff0c;AdaBoost&#xff08;自适应增强的缩写&#xff09;作为一种强大的集成方法脱颖而出&#xff0c;为该领域的成功做出了重大贡献。AdaBoost 是一…...

c++踩坑点,类型转换

std::string转换到PVOID std::string转换到PVOID的方式如下 这样的话成功转换 “const char *” 类型的实参与 “WCHAR *” “const char *” 类型的实参与 “WCHAR *” 类型的形参不兼容 可以看到这种报错&#xff0c;可以直接强转如下&#xff1a; 但是在我们这里不适…...

mysql—面试50题—1

注&#xff1a;面试50题将分为5个部分&#xff0c;每部分10题 一、查询数据 学生表 Student create table Student(SId varchar(10),Sname varchar(10),Sage datetime,Ssex varchar(10)); insert into Student values(01 , 赵雷 , 1990-01-01 , 男); insert into Student …...

vue解决报错Unable to preventDefault inside passive event listener invocation.

"Unable to preventDefault inside passive event listener invocation"是浏览器开发中的一个警告信息。这个警告通常出现在使用passive事件监听器时&#xff0c;当在事件处理函数中调用preventDefault()方法时会引发该警告。 在传统的事件监听模型中&#xff0c;当…...

实际项目中最常用的设计模式

在软件开发领域,设计模式是一种经过验证的通用解决方案,用于解决各种常见问题。它们有助于提高代码的可维护性、可扩展性和可重用性。虽然有许多不同的设计模式,但以下是实际项目中最常用的一些: 1. 单例模式 (Singleton Pattern) 单例模式确保一个类只有一个实例,并提供…...

使用stream流根据对象属性对复杂list对象去重

日常开发中&#xff0c;我们可能会遇到这样一种情况&#xff0c;需要对数据库查询出来的数据进行一个二次处理&#xff0c;从而达到我们需要的数据结构。stream流正是java8提供的对复杂list操作方便工具。 我们先介绍如何使用stream流根据对象属性对复杂list对象去重&#xff0…...

vue3脚手架搭建

一.安装 vue3.0 脚手架 如果之前安装了2.0的脚手架&#xff0c;要先卸载掉&#xff0c;输入&#xff1a; npm uninstall vue-cli -g 进行全局卸载 1.安装node.js&#xff08;npm&#xff09; node.js&#xff1a;简单的说 Node.js 就是运行在服务端的 JavaScript。Node.js 是…...

MongoDB 未授权访问漏洞

简介 MongoDB是一个基于分布式文件存储的数据库&#xff0c;是一个介于关系数据库和非关系数据库之间的产品&#xff0c;它的特点是高性能、易部署、易使用&#xff0c;存储数据非常方便&#xff0c;默认情况下是没有认证的这就导致不熟悉它的研发人员部署后没有做访问控制导致…...

花5分钟学习机器学习基础知识

一、什么是机器学习 机器学习的目的是让机器学习,而不是执行预设的算法。 机器学习适用于难以制定规则的问题,如垃圾邮件识别、图像识别。 机器学习模拟人类学习过程:从样本中学习归纳总结,形成模型,然后应用模型完成任务。 机器学习需要大量样本数据和计算能力支持。当前数…...

Qt学习:使用OpenGL绘制3D图形

文章目录 前言一、Qt下使用OpenGL绘制图形介绍二、示例完整代码总结 前言 文章中引用的内容均来自这本书中的原文&#xff1a;【Qt Creator快速入门_霍亚飞编著】&#xff0c;本文的示例也是在书中代码的基础上进行编写的&#xff08;其中部分代码使用原文编译不过&#xff0c…...

在chrom浏览器安装Vue.js devtools插件,遇到恶意扩展程序字样,附百度网盘下载链接

遇到的问题 拖拽下载好的 Vue.js devtools 插件到谷歌扩展程序&#xff0c; 百度网盘下载地址 链接&#xff1a;https://pan.baidu.com/s/1FeK6pwc2UzRUUlMFN3rW5w?pwdw361 提取码&#xff1a;w361 提示&#xff1a; 解决办法 将Vue.js devtools 插件的后缀从.crx改为.zi…...

WSL2的安装与配置(创建Anaconda虚拟环境、更新软件包、安装PyTorch、VSCode)

1. WSL2 安装 以管理员身份打开 PowerShell&#xff08;“开始”菜单 >“PowerShell” >单击右键 >“以管理员身份运行”&#xff09;&#xff0c;然后输入以下命令&#xff1a; dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /a…...

【鸿蒙软件开发】ArkTS常见组件之单选框Radio和切换按钮Toggle

文章目录 前言一、Radio单选框1.1 创建单选框1.2 添加Radio事件1.3 场景示例二、切换按钮Toggle2.1 创建切换按钮2.2 创建有子组件的Toggle2.3 自定义样式selectedColor属性switchPointColor属性 2.4 添加事件2.5 示例代码 总结 前言 Radio是单选框组件&#xff0c;通常用于提…...

今年阿里云双十一服务器优惠价格讨论_看看大家怎么说?

2023阿里云双十一云服务器大概会降到什么区间&#xff1f;阿里云服务器网认为会在当前的优惠价格基础上&#xff0c;降价10%左右&#xff0c;可以在阿里云CLUB中心领券&#xff1a;aliyun.club 云服务器专用满减优惠券。阿里云服务器网从各个渠道了解到大家对今年阿里云双十一服…...

LC-1402. 做菜顺序(记忆化搜索 ==> 动态规划、贪心)

1402. 做菜顺序 困难 一个厨师收集了他 n 道菜的满意程度 satisfaction &#xff0c;这个厨师做出每道菜的时间都是 1 单位时间。 一道菜的 「 like-time 系数 」定义为烹饪这道菜结束的时间&#xff08;包含之前每道菜所花费的时间&#xff09;乘以这道菜的满意程度&#x…...

泰森多边形

泰森多边形 93 泰森多边形又叫沃洛诺伊图&#xff08;Voronoi diagram&#xff09;&#xff0c;得名于Georgy Voronoi&#xff0c;是一组由连接两邻点线段的垂直平分线组成的连续多边形。一个泰森多边形内的任一点到构成该多边形的控制点的距离小于到其他多边形控制点的距离。…...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

PL0语法,分析器实现!

简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额&#xff0c;返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...

Vite中定义@软链接

在webpack中可以直接通过符号表示src路径&#xff0c;但是vite中默认不可以。 如何实现&#xff1a; vite中提供了resolve.alias&#xff1a;通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...

MySQL 主从同步异常处理

阅读原文&#xff1a;https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主&#xff0c;遇到的这个错误&#xff1a; Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一&#xff0c;通常表示&#xff…...