Flink的基于两阶段提交协议的事务数据汇实现
背景
在flink中可以通过使用事务性数据汇实现精准一次的保证,本文基于Kakfa的事务处理来看一下在Flink 内部如何实现基于两阶段提交协议的事务性数据汇.
flink kafka事务性数据汇的实现
1。首先在开始进行快照的时候也就是收到checkpoint通知的时候,在snapshot方法中会开启一个新的事务,代码如下:
public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null,"bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'",name(),context.getCheckpointId(),currentTransactionHolder);preCommit(currentTransactionHolder.handle);// 调用kafkaProducer.flush();清理上一个事务的状态(注意不是提交),只是确保前一个事务的所有资源清理完毕pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
// 调用producer.beginTransaction();方法开启一个新的kafka事务currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}
2.其次在JobManager通知检查点完成的通知方法,也就是notifyCheckpointComplete方法中提交事务
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {//调用producer.commitTransaction()方法提交事务commit(pendingTransaction.handle);} catch (Throwable t) {if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);pendingTransactionIterator.remove();}if (firstError != null) {throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}
至此,一个两阶段提交的flink事务性数据汇完成了,这个事务性数据汇可以构成端到端一致性的一部分
相关文章:
Flink的基于两阶段提交协议的事务数据汇实现
背景 在flink中可以通过使用事务性数据汇实现精准一次的保证,本文基于Kakfa的事务处理来看一下在Flink 内部如何实现基于两阶段提交协议的事务性数据汇. flink kafka事务性数据汇的实现 1。首先在开始进行快照的时候也就是收到checkpoint通知的时候,在…...
树模型(三)决策树
决策树是什么?决策树(decision tree)是一种基本的分类与回归方法。 长方形代表判断模块 (decision block),椭圆形成代表终止模块(terminating block),表示已经得出结论,可以终止运行。从判断模块引出的左右箭头称作为分支(branch)…...
vueday01——使用属性绑定+ref属性定位获取id
1.属性绑定(Attribute 绑定) 第一种写法 <div v-bind:id"refValue"> content </div> 第二种写法(省略掉v-bind) <div :id"refValue"> content </div> 2.代码展示 <template…...
LeetCode 260. 只出现一次的数字 III:异或
【LetMeFly】260.只出现一次的数字 III 力扣题目链接:https://leetcode.cn/problems/single-number-iii/ 给你一个整数数组 nums,其中恰好有两个元素只出现一次,其余所有元素均出现两次。 找出只出现一次的那两个元素。你可以按 任意顺序 返…...
使用PyTorch解决多分类问题:构建、训练和评估深度学习模型
💗💗💗欢迎来到我的博客,你将找到有关如何使用技术解决问题的文章,也会找到某个技术的学习路线。无论你是何种职业,我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章,也欢…...
基于nodejs+vue网课学习平台
各功能简要描述如下: 1个人信息管理:包括对学生用户、老师和管理员的信息进行录入、修改,以及老师信息的审核等 2在库课程查询:用于学生用户查询相关课程的功能 3在库老师查询:用于学生用户查询相关老师教学的所有课程的功能。 4在库学校查询:用于学生用户查询相关学…...
读书笔记:Effective C++ 2.0 版,条款13(初始化顺序==声明顺序)、条款14(基类有虚析构)
条款13: 初始化列表中成员列出的顺序和它们在类中声明的顺序相同 类成员是按照它们在类里被声明的顺序进行初始化的,和它们在成员初始化列表中列出的顺序没一点关系。 根本原因可能是考虑到内存的分布,按照定义顺序进行排列。 另外,初始化列表…...
flutter开发实战-下拉刷新与上拉加载更多实现
flutter开发实战-下拉刷新与上拉加载更多实现 在开发中经常遇到列表需要下拉刷新与上拉加载更多,这里使用EasyRefresh,版本是3.3.21 一、什么是EasyRefresh EasyRefresh可以在Flutter应用程序上轻松实现下拉刷新和上拉加载。它几乎支持所有Flutter Sc…...
旧手机热点机改造成服务器方案
如果你也跟我一样有这种想法, 那真的太酷了!!! ok,前提是得有root,不然体验大打折扣 目录 目录 1.做一个能爬墙能走百度直连的热点机(做热点机用) 2.做emby视频服务器 3.做文件服务, 存取文件 4.装青龙面板,跑一些定时任务 5.做远程摄像头监控 6.做web服务器 7.内网穿…...
网工实验笔记:策略路由PBR的应用场景
一、概述 PBR(Policy-Based Routing,策略路由):PBR使得网络设备不仅能够基于报文的目的IP地址进行数据转发,更能基于其他元素进行数据转发,例如源IP地址、源MAC地址、目的MAC地址、源端口号、目的端口号、…...
webrtc快速入门——使用 WebRTC 拍摄静止的照片
文章目录 使用 getUserMedia() 拍摄静态照片HTML 标记JavaScript 代码初始化startup() 函数获取元素引用获取流媒体 监听视频开始播放处理按钮上的点击包装 startup() 方法 清理照片框从流中捕获帧 例子代码HTML代码CSS代码JavaScript代码 过滤器使用特定设备 使用 getUserMedi…...
预约按摩app软件开发定制足浴SPA上们服务小程序
同城按摩小程序是一种基于地理位置服务的小程序,它可以帮助用户快速找到附近的按摩师,并提供在线预约、评价、支付等功能。用户可以通过手机或者其他移动设备访问同城按摩小程序,实现足不出户就能预约到专业的按摩服务。 一、同城按摩小程序的…...
jenkins出错与恢复
如果你的jenkins出现了如下图所示问题(比如不能下载插件,无法保存任务等),这个时候就需要重新安装了。 一、卸载干净jenknis 要彻底卸载 Jenkins,您可以按照以下步骤进行操作: 1、停止 Jenkins 服务&…...
ssh免密登录的原理RSA非对称加密的理解
RSA非对称加密,是采用公钥加密私钥解密的原则。 举个例子SSH的免密登录 SSH免密登录是通过使用公钥加密技术实现的。以下是SSH免密登录的原理: 1. 生成密钥对:首先,在客户端上生成一对密钥,包括一个私钥和一个公钥。私…...
【监督学习】基于合取子句进化算法(CCEA)和析取范式进化算法(DNFEA)解决分类问题(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
力扣每日一题41:缺失的第一个正数
题目描述: 给你一个未排序的整数数组 nums ,请你找出其中没有出现的最小的正整数。 请你实现时间复杂度为 O(n) 并且只使用常数级别额外空间的解决方案。 示例 1: 输入:nums [1,2,0] 输出:3示例 2: 输…...
OpenCV与mediapipe实践
1. 安装前准备 开发环境:vscode venv 设置vscode, 建立项目,如: t1/src, 用vscode打开,新建终端Terminal,这时可能会有错误产生,解决办法: 运行命令:Set-ExecutionPolicy -ExecutionPolicy …...
【css拾遗】粘性布局实现有滚动条的情况下,按钮固定在页面底部展示
效果: 滚动条滚动过程中,按钮的位置位于手机的底部 滚动条滚到底部时,按钮的位置正常 这个position:sticky真的好用,我原先的想法是利用滚动条滚动事件去控制,没想到css就可以解决 <template><view class…...
git 创建并配置 GitHub 连接密钥
前记: git svn sourcetree gitee github gitlab gitblit gitbucket gitolite gogs 版本控制 | 仓库管理 ---- 系列工程笔记. Platform:Windows 10 Git version:git version 2.32.0.windows.1 Function: git 创建并配置 GitHub…...
使用Premiere、PhotoShop和Audition做视频特效
今天接到一个做视频的任务,给一个精忠报国的视频,要求: ①去掉人声,就是将唱歌的人声去掉,只留下伴奏; ②截图视频中的横幅,做一个展开的效果,类似卷纸慢慢展开;…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?
大家好,欢迎来到《云原生核心技术》系列的第七篇! 在上一篇,我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在,我们就像一个拥有了一块崭新数字土地的农场主,是时…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...
多元隐函数 偏导公式
我们来推导隐函数 z z ( x , y ) z z(x, y) zz(x,y) 的偏导公式,给定一个隐函数关系: F ( x , y , z ( x , y ) ) 0 F(x, y, z(x, y)) 0 F(x,y,z(x,y))0 🧠 目标: 求 ∂ z ∂ x \frac{\partial z}{\partial x} ∂x∂z、 …...
客户案例 | 短视频点播企业海外视频加速与成本优化:MediaPackage+Cloudfront 技术重构实践
01技术背景与业务挑战 某短视频点播企业深耕国内用户市场,但其后台应用系统部署于东南亚印尼 IDC 机房。 随着业务规模扩大,传统架构已较难满足当前企业发展的需求,企业面临着三重挑战: ① 业务:国内用户访问海外服…...
CVE-2023-25194源码分析与漏洞复现(Kafka JNDI注入)
漏洞概述 漏洞名称:Apache Kafka Connect JNDI注入导致的远程代码执行漏洞 CVE编号:CVE-2023-25194 CVSS评分:8.8 影响版本:Apache Kafka 2.3.0 - 3.3.2 修复版本:≥ 3.4.0 漏洞类型:反序列化导致的远程代…...
[QMT量化交易小白入门]-六十二、ETF轮动中简单的评分算法如何获取历史年化收益32.7%
本专栏主要是介绍QMT的基础用法,常见函数,写策略的方法,也会分享一些量化交易的思路,大概会写100篇左右。 QMT的相关资料较少,在使用过程中不断的摸索,遇到了一些问题,记录下来和大家一起沟通,共同进步。 文章目录 相关阅读1. 策略概述2. 趋势评分模块3 代码解析4 木头…...
