flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证
背景
TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失,连至少一次的语义都不能达到
TwoPhaseCommitSinkFunction注意事项
TwoPhaseCommitSinkFunction是通过在两阶段提交协议实现的事务,大概简化为一下步骤:
1 在收到检查点分隔符的时候,开启事务,并把记录都写到开启的事务中,
2. 开始进行状态的保存时,把检查点id对应的事务结束掉,做好准备提交的准备,并开启下一个事务
public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null,"bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'",name(),context.getCheckpointId(),currentTransactionHolder);//当前检查点对应的事务做好准备,比如进行stream.flush等,准备好提交事务preCommit(currentTransactionHolder.handle);// 把当前检查点id对应的事务添加到状态中pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);// 把当前检查点id对应的事务添加到状态中state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}
- 收到检查点完成的通知notify方法,提交第二步中检查点id对应的事务,注意这一步不是每次flink在进行检查点的时候都会通知,这种情况下,某一次的notify方法就需要把前几次的事务一起进行提交了,另外,如果提交某个检查点的事务失败,那么应用会重启,并且在重启后的initSnapshot方法中再次进行事务提交,如果还是失败,这个过程一直持续
public final void notifyCheckpointComplete(long checkpointId) throws Exception {Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {// 提交事务commit(pendingTransaction.handle);} catch (Throwable t) {//事务失败时记录异常,后面会把异常抛出导致应用重启if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);// 事务成功后移除当前的事务pendingTransactionIterator.remove();}if (firstError != null) {// 事务提交失败会抛出异常,导致job异常中止throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}}
总结:
1。事务不能提交失败,如果失败会导致作业失败然后重新提交,如果最终没有成功提交,那么数据会丢失
2。数据库服务端的事务超时时间不能设置太短,不能仅仅大于检查点的间隔大小,原因是上面说的,flink有可能丢失检查点完成后的通知消息,所以服务端的事务超时时间要设置的足够大.
相关文章:
flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证
背景 TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失&am…...
CMake系列讲解(入门篇)1.8 基础命令CMake-set() unset()
基础命令set() unset() 〓〓〓〓〓〓〓〓踏实学CMake总目录〓〓〓〓〓〓〓〓〓〓 8. set() unset() 在CMake中,set用于设置变量的值。这个变量可以为普通变量、Cache或者是环境变量。 如果提供了一个或多个 <value> 参数,则将 <variable> 设…...
【C++ 学习 ㉙】- 详解 C++11 的 constexpr 和 decltype 关键字
目录 一、constexpr 关键字 1.1 - constexpr 修饰普通变量 1.2 - constexpr 修饰函数 1.3 - constexpr 修饰类的构造函数 1.4 - constexpr 和 const 的区别 二、decltype 关键字 2.1 - 推导规则 2.2 - 实际应用 一、constexpr 关键字 constexpr 是 C11 新引入的关键字…...
js获取视频编码
一.背景 有些浏览器不支持某些视频的编码方式导致播放出现问题,这个时候要限制视频上传 二.插件 https://unpkg.com/mediainfo.js0.1.4/dist/mediainfo.min.js 三.完整html代码 <!DOCTYPE html> <html lang"en"> <head><meta ch…...
560. 和为 K 的子数组 --力扣 --JAVA
题目 给你一个整数数组 nums 和一个整数 k ,请你统计并返回 该数组中和为 k 的连续子数组的个数 。 子数组是数组中元素的连续非空序列。 解题思路 数组项累加可以使用双层循环进行遍历;子数组的长度是不确定的,也可能存在1 1 2和1 1 - 1…...
【趣味随笔】农业机器人的种类与发展前景
📢:如果你也对机器人、人工智能感兴趣,看来我们志同道合✨ 📢:不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 📢:文章若有幸对你有帮助,可点赞 👍…...
使用CountdownLatch和线程池批量处理http请求,并处理响应数据
背景和问题 背景:最近项目的一个接口数据,需要去请求其他多个服务器的数据,然后统一返回; 问题点:如果遍历所有的服务器地址,然后串行请求就会出现请求时间过长,加入需要请求十个服务器&…...
记录--怎么写一个可以鼠标控制旋转的div?
这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 说在前面 鼠标控制元素旋转在现在也是一个很常见的功能,让我们从实现div元素的旋转控制开始来了解元素旋转的具体原理和实现方法吧。 效果展示 体验地址 code.juejin.cn/pen/7290719… 实现…...
JVM第十八讲:调试排错 - Java 问题排查之工具单
调试排错 - Java 问题排查之工具单 程序员想要有更好的发展,排查问题的能力一定得加强。举个例子:cpu100% 怎么排查,线上接口逐渐变慢了该怎么排查?慢查询该如何治理?你的思路是啥?本文是JVM第十八讲&#…...
JAVA基础-正则表达式(12)
目录 Java 正则表达式正则表达式实例正则表达式语法 Matcher 类的方法索引方法查找方法替换方法start 和 end 方法 Java 正则表达式 正则表达式定义了字符串的模式。 正则表达式可以用来搜索、编辑或处理文本。 正则表达式并不仅限于某一种语言,但是在每种语言中有细…...
[论文笔记]GPT-1
引言 今天带来论文Improving Language Understanding by Generative Pre-Training的笔记,它的中文题目为:通过生成式预训练改进语言理解。其实就是GPT的论文。 自然语言理解可以应用于大量NLP任务上,比如文本蕴含、问答、语义相似和文档分类。虽然无标签文本语料是丰富的,…...
【3D 图像分割】基于 Pytorch 的 VNet 3D 图像分割1(综述篇)
在上一个关于3D 目标的任务,是基于普通CNN网络的3D分类任务。在这个任务中,分类数据采用的是CT结节的LIDC-IDRI数据集,其中对结节的良恶性、毛刺、分叶征等等特征进行了各自的等级分类。感兴趣的可以直接点击下方的链接,直达学习&…...
css之Flex弹性布局
文章目录 🐕前言:🏨定义flex容器 display:flex🏨在flex容器中子组件进行排列🪂行排列 flex-direction: row🪂将行排列进行翻转排列 flex-direction: row-reverse🏅按列排列 flex-direction: col…...
web.xml配置详解
在Java Web应用程序中,web.xml是一个XML配置文件,用于定义和配置Servlet、过滤器、监听器和其他Web应用程序组件的行为和属性。web.xml文件通常位于Web应用程序的WEB-INF目录下,用于描述Web应用程序的部署信息和配置。以下是一些web.xml配置的…...
关于我学习Go语言在CSDN分享的心得体会
最近我一直在学习Go语言,并通过CSDN平台分享我的学习心得和体会。在这篇博客中,我将与大家分享我在学习Go语言过程中的经验和收获。希望通过这篇博客能够帮助其他Go语言初学者更好地掌握这门语言,并与广大Go语言爱好者进行交流和互动。 选择…...
Java类的Builder应用以及使用@Data和@Builder高效应用Builder
⭐Java Builder模式:是Java设计模式之一,它属于对象创建型模式,是将一个复杂对象的构建与它的表示分离,使得同样的构建过程可以创建不同的表示。 结论一:使用lombok的Data和Builder注解构建Java类的Builder简洁高效&am…...
【Qt控件之QTabWidget】介绍及使用
描述 QTabWidget类提供了一个带有选项卡的小部件堆栈。 选项卡小部件提供了一个选项卡栏(参见QTabBar)和一个“页面区域”,用于显示与每个选项卡相关联的页面。默认情况下,选项卡栏显示在页面区域的上方,但可以使用…...
Linux实战——网络连接模式的三种模式
Linux可以分为三种网络模式: 桥接模式 (vmnet0) 仅主机模式 (vmnet1) NAT模式 (vmnet8) 当我们下载了vmware之后,在电脑会出现两个虚拟网卡,VMware Network Adapter VMnet1、VMware Network Adapter VMnet8。 可以通过查找 控…...
嵌入式实时操作系统的设计与开发(任意大小的内存管理)
任意大小的内存管理是根据用户需要为其分配内存,即用户需要多大内存就通过acoral_malloc2()为之分配多大内存,同时每块分配出去的内存前面都有一个控制块,控制块里记录了该块内存的大小。 同时未分配出去的内存也有一个控制块,寻…...
文件读取结束的判定
大家好啊,我们今天来补充文件操作的读取结束的判定。 被错误使用的feof 牢记:在文件读取过程中,不能用feof函数的返回值直接用来判断文件的是否结束而是应用于当文件读取结束的时候,判断是读取失败结束,还是遇到文件尾…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
服务器--宝塔命令
一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行! sudo su - 1. CentOS 系统: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...
AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...
通过 Ansible 在 Windows 2022 上安装 IIS Web 服务器
拓扑结构 这是一个用于通过 Ansible 部署 IIS Web 服务器的实验室拓扑。 前提条件: 在被管理的节点上安装WinRm 准备一张自签名的证书 开放防火墙入站tcp 5985 5986端口 准备自签名证书 PS C:\Users\azureuser> $cert New-SelfSignedCertificate -DnsName &…...
MySQL的pymysql操作
本章是MySQL的最后一章,MySQL到此完结,下一站Hadoop!!! 这章很简单,完整代码在最后,详细讲解之前python课程里面也有,感兴趣的可以往前找一下 一、查询操作 我们需要打开pycharm …...
