当前位置: 首页 > news >正文

flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证

背景

TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失,连至少一次的语义都不能达到

TwoPhaseCommitSinkFunction注意事项

TwoPhaseCommitSinkFunction是通过在两阶段提交协议实现的事务,大概简化为一下步骤:
1 在收到检查点分隔符的时候,开启事务,并把记录都写到开启的事务中,
2. 开始进行状态的保存时,把检查点id对应的事务结束掉,做好准备提交的准备,并开启下一个事务

public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null,"bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'",name(),context.getCheckpointId(),currentTransactionHolder);//当前检查点对应的事务做好准备,比如进行stream.flush等,准备好提交事务preCommit(currentTransactionHolder.handle);// 把当前检查点id对应的事务添加到状态中pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);// 把当前检查点id对应的事务添加到状态中state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}
  1. 收到检查点完成的通知notify方法,提交第二步中检查点id对应的事务,注意这一步不是每次flink在进行检查点的时候都会通知,这种情况下,某一次的notify方法就需要把前几次的事务一起进行提交了,另外,如果提交某个检查点的事务失败,那么应用会重启,并且在重启后的initSnapshot方法中再次进行事务提交,如果还是失败,这个过程一直持续
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {// 提交事务commit(pendingTransaction.handle);} catch (Throwable t) {//事务失败时记录异常,后面会把异常抛出导致应用重启if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);// 事务成功后移除当前的事务pendingTransactionIterator.remove();}if (firstError != null) {// 事务提交失败会抛出异常,导致job异常中止throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}}

总结:

1。事务不能提交失败,如果失败会导致作业失败然后重新提交,如果最终没有成功提交,那么数据会丢失
2。数据库服务端的事务超时时间不能设置太短,不能仅仅大于检查点的间隔大小,原因是上面说的,flink有可能丢失检查点完成后的通知消息,所以服务端的事务超时时间要设置的足够大.

相关文章:

flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证

背景 TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇&#xff0c;但是想要实现精准一次的输出&#xff0c;实际使用中需要注意几个方面&#xff0c;否则不仅仅达不到精准一次输出&#xff0c;反而可能导致数据丢失&am…...

CMake系列讲解(入门篇)1.8 基础命令CMake-set() unset()

基础命令set() unset() 〓〓〓〓〓〓〓〓踏实学CMake总目录〓〓〓〓〓〓〓〓〓〓 8. set() unset() 在CMake中&#xff0c;set用于设置变量的值。这个变量可以为普通变量、Cache或者是环境变量。 如果提供了一个或多个 <value> 参数&#xff0c;则将 <variable> 设…...

【C++ 学习 ㉙】- 详解 C++11 的 constexpr 和 decltype 关键字

目录 一、constexpr 关键字 1.1 - constexpr 修饰普通变量 1.2 - constexpr 修饰函数 1.3 - constexpr 修饰类的构造函数 1.4 - constexpr 和 const 的区别 二、decltype 关键字 2.1 - 推导规则 2.2 - 实际应用 一、constexpr 关键字 constexpr 是 C11 新引入的关键字…...

js获取视频编码

一.背景 有些浏览器不支持某些视频的编码方式导致播放出现问题&#xff0c;这个时候要限制视频上传 二.插件 https://unpkg.com/mediainfo.js0.1.4/dist/mediainfo.min.js 三.完整html代码 <!DOCTYPE html> <html lang"en"> <head><meta ch…...

560. 和为 K 的子数组 --力扣 --JAVA

题目 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 该数组中和为 k 的连续子数组的个数 。 子数组是数组中元素的连续非空序列。 解题思路 数组项累加可以使用双层循环进行遍历&#xff1b;子数组的长度是不确定的&#xff0c;也可能存在1 1 2和1 1 - 1…...

【趣味随笔】农业机器人的种类与发展前景

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…...

使用CountdownLatch和线程池批量处理http请求,并处理响应数据

背景和问题 ​ 背景&#xff1a;最近项目的一个接口数据&#xff0c;需要去请求其他多个服务器的数据&#xff0c;然后统一返回&#xff1b; 问题点&#xff1a;如果遍历所有的服务器地址&#xff0c;然后串行请求就会出现请求时间过长&#xff0c;加入需要请求十个服务器&…...

记录--怎么写一个可以鼠标控制旋转的div?

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 说在前面 鼠标控制元素旋转在现在也是一个很常见的功能&#xff0c;让我们从实现div元素的旋转控制开始来了解元素旋转的具体原理和实现方法吧。 效果展示 体验地址 code.juejin.cn/pen/7290719… 实现…...

JVM第十八讲:调试排错 - Java 问题排查之工具单

调试排错 - Java 问题排查之工具单 程序员想要有更好的发展&#xff0c;排查问题的能力一定得加强。举个例子&#xff1a;cpu100% 怎么排查&#xff0c;线上接口逐渐变慢了该怎么排查&#xff1f;慢查询该如何治理&#xff1f;你的思路是啥&#xff1f;本文是JVM第十八讲&#…...

JAVA基础-正则表达式(12)

目录 Java 正则表达式正则表达式实例正则表达式语法 Matcher 类的方法索引方法查找方法替换方法start 和 end 方法 Java 正则表达式 正则表达式定义了字符串的模式。 正则表达式可以用来搜索、编辑或处理文本。 正则表达式并不仅限于某一种语言&#xff0c;但是在每种语言中有细…...

[论文笔记]GPT-1

引言 今天带来论文Improving Language Understanding by Generative Pre-Training的笔记,它的中文题目为:通过生成式预训练改进语言理解。其实就是GPT的论文。 自然语言理解可以应用于大量NLP任务上,比如文本蕴含、问答、语义相似和文档分类。虽然无标签文本语料是丰富的,…...

【3D 图像分割】基于 Pytorch 的 VNet 3D 图像分割1(综述篇)

在上一个关于3D 目标的任务&#xff0c;是基于普通CNN网络的3D分类任务。在这个任务中&#xff0c;分类数据采用的是CT结节的LIDC-IDRI数据集&#xff0c;其中对结节的良恶性、毛刺、分叶征等等特征进行了各自的等级分类。感兴趣的可以直接点击下方的链接&#xff0c;直达学习&…...

css之Flex弹性布局

文章目录 &#x1f415;前言&#xff1a;&#x1f3e8;定义flex容器 display:flex&#x1f3e8;在flex容器中子组件进行排列&#x1fa82;行排列 flex-direction: row&#x1fa82;将行排列进行翻转排列 flex-direction: row-reverse&#x1f3c5;按列排列 flex-direction: col…...

web.xml配置详解

在Java Web应用程序中&#xff0c;web.xml是一个XML配置文件&#xff0c;用于定义和配置Servlet、过滤器、监听器和其他Web应用程序组件的行为和属性。web.xml文件通常位于Web应用程序的WEB-INF目录下&#xff0c;用于描述Web应用程序的部署信息和配置。以下是一些web.xml配置的…...

关于我学习Go语言在CSDN分享的心得体会

最近我一直在学习Go语言&#xff0c;并通过CSDN平台分享我的学习心得和体会。在这篇博客中&#xff0c;我将与大家分享我在学习Go语言过程中的经验和收获。希望通过这篇博客能够帮助其他Go语言初学者更好地掌握这门语言&#xff0c;并与广大Go语言爱好者进行交流和互动。 选择…...

Java类的Builder应用以及使用@Data和@Builder高效应用Builder

⭐Java Builder模式&#xff1a;是Java设计模式之一&#xff0c;它属于对象创建型模式&#xff0c;是将一个复杂对象的构建与它的表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。 结论一&#xff1a;使用lombok的Data和Builder注解构建Java类的Builder简洁高效&am…...

【Qt控件之QTabWidget】介绍及使用

描述 QTabWidget类提供了一个带有选项卡的小部件堆栈。 选项卡小部件提供了一个选项卡栏&#xff08;参见QTabBar&#xff09;和一个“页面区域”&#xff0c;用于显示与每个选项卡相关联的页面。默认情况下&#xff0c;选项卡栏显示在页面区域的上方&#xff0c;但可以使用…...

Linux实战——网络连接模式的三种模式

Linux可以分为三种网络模式&#xff1a; 桥接模式 &#xff08;vmnet0) 仅主机模式 (vmnet1) NAT模式 (vmnet8) 当我们下载了vmware之后&#xff0c;在电脑会出现两个虚拟网卡&#xff0c;VMware Network Adapter VMnet1、VMware Network Adapter VMnet8。 可以通过查找 控…...

嵌入式实时操作系统的设计与开发(任意大小的内存管理)

任意大小的内存管理是根据用户需要为其分配内存&#xff0c;即用户需要多大内存就通过acoral_malloc2()为之分配多大内存&#xff0c;同时每块分配出去的内存前面都有一个控制块&#xff0c;控制块里记录了该块内存的大小。 同时未分配出去的内存也有一个控制块&#xff0c;寻…...

文件读取结束的判定

大家好啊&#xff0c;我们今天来补充文件操作的读取结束的判定。 被错误使用的feof 牢记&#xff1a;在文件读取过程中&#xff0c;不能用feof函数的返回值直接用来判断文件的是否结束而是应用于当文件读取结束的时候&#xff0c;判断是读取失败结束&#xff0c;还是遇到文件尾…...

AC68U刷梅林384/386版本后不能 降级回380,升降级解决办法

前些时间手贱更新了路由器的固件&#xff0c;384.18版本。结果发现了一堆问题&#xff0c;比如客户端列表加载不出来&#xff0c;软件中心打不开等等。想着再刷一下新的固件&#xff0c;结果死活刷不上去。最后翻阅了大量前辈的帖子找到了相关的处理办法。现在路由器中开启SSH&…...

源码级拆解:如何搭建高并发「数字药店+医保购药」一体化平台?

在全民“掌上看病、线上购药”已成常态的今天&#xff0c;数字药店平台正在以惊人的速度扩张。而将数字药店与医保系统打通&#xff0c;实现线上医保购药&#xff0c;更是未来互联网医疗的关键拼图。 那么&#xff0c;如何从技术底层搭建一个 支持高并发、可扩展、安全合规的数…...

Redis线程安全深度解析:单线程模型的并发智慧

Redis线程安全深度解析&#xff1a;单线程模型的并发智慧 引言&#xff1a;Redis的线程模型迷思 “Redis是单线程的”——这个广为流传的说法既正确又不完全正确。Redis的线程安全机制实际上是一套精心设计的并发控制体系&#xff0c;它既保持了单线程的简单性&#xff0c;又…...

IT学习方法与资料分享

一、编程语言与核心技能&#xff1a;构建技术地基 1. 入门首选&#xff1a;Python 与 JavaScript Python&#xff1a;作为 AI 与数据科学的基石&#xff0c;可快速构建数据分析与自动化脚本开发能力。 JavaScript&#xff1a;Web 开发的核心语言&#xff0c;可系统掌握 React/V…...

配置git命令缩写

以下是 Git 命令缩写的配置方法及常用方案&#xff0c;适用于 Linux/macOS/Windows 系统&#xff1a; &#x1f527; 一、配置方法 1. 命令行设置&#xff08;推荐&#xff09; # 基础命令缩写 git config --global alias.st status git config --global alias.co che…...

OpenCV 键盘响应来切换图像

一、知识点 1、int waitKey(int delay 0); (1)、等待按键。 等待指定的毫秒数&#xff0c;返回按键的ASCII码。 (2)、返回值: int型&#xff0c;表示按键ASCII码。 若没有按键&#xff0c;指定时间过去&#xff0c;返回-1。 (3)、参数delay: 等待时间&#xff0c;单位毫…...

Appium+python自动化(十二)- Android UIAutomator

Android团队在4.1版本&#xff08;API 16&#xff09;中推出了一款全新的UI自动化测试工具UiAutomator&#xff0c;用来帮助开发人员更有效率的完成App的Debug工作&#xff0c;同时对于测试人员也是一大福音&#xff0c;为什么这么说呢&#xff1f; UiAutomator提供了以下两种…...

scss(sass)中 的使用说明

在 SCSS&#xff08;Sass&#xff09;中&#xff0c;& 符号是一个父选择器引用&#xff0c;它代表当前嵌套规则的外层选择器。主要用途如下&#xff1a; 1. 连接伪类/伪元素 scss 复制 下载 .button {background: blue;&:hover { // 相当于 .button:hoverbackgrou…...

Python 函数全攻略:函数进阶(生成器、闭包、内置函数、装饰器、推导式)

一、默认参数中的易错点 问题: 当函数的默认参数是可变类型(如 list, dict)时,存在“坑”。 现象: def func(a2=[]): # a2 默认是一个空列表a2.append(2)print(a2)func() # 第一次调用,a2 默认为 [],输出 [2] func([]) # 传入新列表,输出 [2] func([1]) # 传入带元素的…...

Linux命令基础(2)

su和exit命令 可以通过su命令切换到root账户 语法&#xff1a;su [-] 用户名 -符号是可选的&#xff0c;表示是否在切换用户后加载环境变量&#xff0c;建议带上 参数&#xff1a;用户名&#xff0c;表示要切换的用户&#xff0c;用户名可以省略&#xff0c;省略表示切换到ro…...