flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证
背景
TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失,连至少一次的语义都不能达到
TwoPhaseCommitSinkFunction注意事项
TwoPhaseCommitSinkFunction是通过在两阶段提交协议实现的事务,大概简化为一下步骤:
1 在收到检查点分隔符的时候,开启事务,并把记录都写到开启的事务中,
2. 开始进行状态的保存时,把检查点id对应的事务结束掉,做好准备提交的准备,并开启下一个事务
public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null,"bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'",name(),context.getCheckpointId(),currentTransactionHolder);//当前检查点对应的事务做好准备,比如进行stream.flush等,准备好提交事务preCommit(currentTransactionHolder.handle);// 把当前检查点id对应的事务添加到状态中pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);// 把当前检查点id对应的事务添加到状态中state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}
- 收到检查点完成的通知notify方法,提交第二步中检查点id对应的事务,注意这一步不是每次flink在进行检查点的时候都会通知,这种情况下,某一次的notify方法就需要把前几次的事务一起进行提交了,另外,如果提交某个检查点的事务失败,那么应用会重启,并且在重启后的initSnapshot方法中再次进行事务提交,如果还是失败,这个过程一直持续
public final void notifyCheckpointComplete(long checkpointId) throws Exception {Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {// 提交事务commit(pendingTransaction.handle);} catch (Throwable t) {//事务失败时记录异常,后面会把异常抛出导致应用重启if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);// 事务成功后移除当前的事务pendingTransactionIterator.remove();}if (firstError != null) {// 事务提交失败会抛出异常,导致job异常中止throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}}
总结:
1。事务不能提交失败,如果失败会导致作业失败然后重新提交,如果最终没有成功提交,那么数据会丢失
2。数据库服务端的事务超时时间不能设置太短,不能仅仅大于检查点的间隔大小,原因是上面说的,flink有可能丢失检查点完成后的通知消息,所以服务端的事务超时时间要设置的足够大.
相关文章:
flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证
背景 TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失&am…...
CMake系列讲解(入门篇)1.8 基础命令CMake-set() unset()
基础命令set() unset() 〓〓〓〓〓〓〓〓踏实学CMake总目录〓〓〓〓〓〓〓〓〓〓 8. set() unset() 在CMake中,set用于设置变量的值。这个变量可以为普通变量、Cache或者是环境变量。 如果提供了一个或多个 <value> 参数,则将 <variable> 设…...
【C++ 学习 ㉙】- 详解 C++11 的 constexpr 和 decltype 关键字
目录 一、constexpr 关键字 1.1 - constexpr 修饰普通变量 1.2 - constexpr 修饰函数 1.3 - constexpr 修饰类的构造函数 1.4 - constexpr 和 const 的区别 二、decltype 关键字 2.1 - 推导规则 2.2 - 实际应用 一、constexpr 关键字 constexpr 是 C11 新引入的关键字…...
js获取视频编码
一.背景 有些浏览器不支持某些视频的编码方式导致播放出现问题,这个时候要限制视频上传 二.插件 https://unpkg.com/mediainfo.js0.1.4/dist/mediainfo.min.js 三.完整html代码 <!DOCTYPE html> <html lang"en"> <head><meta ch…...
560. 和为 K 的子数组 --力扣 --JAVA
题目 给你一个整数数组 nums 和一个整数 k ,请你统计并返回 该数组中和为 k 的连续子数组的个数 。 子数组是数组中元素的连续非空序列。 解题思路 数组项累加可以使用双层循环进行遍历;子数组的长度是不确定的,也可能存在1 1 2和1 1 - 1…...
【趣味随笔】农业机器人的种类与发展前景
📢:如果你也对机器人、人工智能感兴趣,看来我们志同道合✨ 📢:不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 📢:文章若有幸对你有帮助,可点赞 👍…...
使用CountdownLatch和线程池批量处理http请求,并处理响应数据
背景和问题 背景:最近项目的一个接口数据,需要去请求其他多个服务器的数据,然后统一返回; 问题点:如果遍历所有的服务器地址,然后串行请求就会出现请求时间过长,加入需要请求十个服务器&…...
记录--怎么写一个可以鼠标控制旋转的div?
这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 说在前面 鼠标控制元素旋转在现在也是一个很常见的功能,让我们从实现div元素的旋转控制开始来了解元素旋转的具体原理和实现方法吧。 效果展示 体验地址 code.juejin.cn/pen/7290719… 实现…...
JVM第十八讲:调试排错 - Java 问题排查之工具单
调试排错 - Java 问题排查之工具单 程序员想要有更好的发展,排查问题的能力一定得加强。举个例子:cpu100% 怎么排查,线上接口逐渐变慢了该怎么排查?慢查询该如何治理?你的思路是啥?本文是JVM第十八讲&#…...
JAVA基础-正则表达式(12)
目录 Java 正则表达式正则表达式实例正则表达式语法 Matcher 类的方法索引方法查找方法替换方法start 和 end 方法 Java 正则表达式 正则表达式定义了字符串的模式。 正则表达式可以用来搜索、编辑或处理文本。 正则表达式并不仅限于某一种语言,但是在每种语言中有细…...
[论文笔记]GPT-1
引言 今天带来论文Improving Language Understanding by Generative Pre-Training的笔记,它的中文题目为:通过生成式预训练改进语言理解。其实就是GPT的论文。 自然语言理解可以应用于大量NLP任务上,比如文本蕴含、问答、语义相似和文档分类。虽然无标签文本语料是丰富的,…...
【3D 图像分割】基于 Pytorch 的 VNet 3D 图像分割1(综述篇)
在上一个关于3D 目标的任务,是基于普通CNN网络的3D分类任务。在这个任务中,分类数据采用的是CT结节的LIDC-IDRI数据集,其中对结节的良恶性、毛刺、分叶征等等特征进行了各自的等级分类。感兴趣的可以直接点击下方的链接,直达学习&…...
css之Flex弹性布局
文章目录 🐕前言:🏨定义flex容器 display:flex🏨在flex容器中子组件进行排列🪂行排列 flex-direction: row🪂将行排列进行翻转排列 flex-direction: row-reverse🏅按列排列 flex-direction: col…...
web.xml配置详解
在Java Web应用程序中,web.xml是一个XML配置文件,用于定义和配置Servlet、过滤器、监听器和其他Web应用程序组件的行为和属性。web.xml文件通常位于Web应用程序的WEB-INF目录下,用于描述Web应用程序的部署信息和配置。以下是一些web.xml配置的…...
关于我学习Go语言在CSDN分享的心得体会
最近我一直在学习Go语言,并通过CSDN平台分享我的学习心得和体会。在这篇博客中,我将与大家分享我在学习Go语言过程中的经验和收获。希望通过这篇博客能够帮助其他Go语言初学者更好地掌握这门语言,并与广大Go语言爱好者进行交流和互动。 选择…...
Java类的Builder应用以及使用@Data和@Builder高效应用Builder
⭐Java Builder模式:是Java设计模式之一,它属于对象创建型模式,是将一个复杂对象的构建与它的表示分离,使得同样的构建过程可以创建不同的表示。 结论一:使用lombok的Data和Builder注解构建Java类的Builder简洁高效&am…...
【Qt控件之QTabWidget】介绍及使用
描述 QTabWidget类提供了一个带有选项卡的小部件堆栈。 选项卡小部件提供了一个选项卡栏(参见QTabBar)和一个“页面区域”,用于显示与每个选项卡相关联的页面。默认情况下,选项卡栏显示在页面区域的上方,但可以使用…...
Linux实战——网络连接模式的三种模式
Linux可以分为三种网络模式: 桥接模式 (vmnet0) 仅主机模式 (vmnet1) NAT模式 (vmnet8) 当我们下载了vmware之后,在电脑会出现两个虚拟网卡,VMware Network Adapter VMnet1、VMware Network Adapter VMnet8。 可以通过查找 控…...
嵌入式实时操作系统的设计与开发(任意大小的内存管理)
任意大小的内存管理是根据用户需要为其分配内存,即用户需要多大内存就通过acoral_malloc2()为之分配多大内存,同时每块分配出去的内存前面都有一个控制块,控制块里记录了该块内存的大小。 同时未分配出去的内存也有一个控制块,寻…...
文件读取结束的判定
大家好啊,我们今天来补充文件操作的读取结束的判定。 被错误使用的feof 牢记:在文件读取过程中,不能用feof函数的返回值直接用来判断文件的是否结束而是应用于当文件读取结束的时候,判断是读取失败结束,还是遇到文件尾…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
管理学院权限管理系统开发总结
文章目录 🎓 管理学院权限管理系统开发总结 - 现代化Web应用实践之路📝 项目概述🏗️ 技术架构设计后端技术栈前端技术栈 💡 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 🗄️ 数据库设…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...
【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看
文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...
【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)
LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 题目描述解题思路Java代码 题目描述 题目链接:LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...
tauri项目,如何在rust端读取电脑环境变量
如果想在前端通过调用来获取环境变量的值,可以通过标准的依赖: std::env::var(name).ok() 想在前端通过调用来获取,可以写一个command函数: #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...
全面解析数据库:从基础概念到前沿应用
在数字化时代,数据已成为企业和社会发展的核心资产,而数据库作为存储、管理和处理数据的关键工具,在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理,到社交网络的用户数据存储,再到金融行业的交易记录处理&a…...
Java 与 MySQL 性能优化:MySQL 慢 SQL 诊断与分析方法详解
文章目录 一、开启慢查询日志,定位耗时SQL1.1 查看慢查询日志是否开启1.2 临时开启慢查询日志1.3 永久开启慢查询日志1.4 分析慢查询日志 二、使用EXPLAIN分析SQL执行计划2.1 EXPLAIN的基本使用2.2 EXPLAIN分析案例2.3 根据EXPLAIN结果优化SQL 三、使用SHOW PROFILE…...
