FlinkPipelineComposer 详解
FlinkPipelineComposer 详解
原文
背景
在flink-cdc 3.0中引入了pipeline机制,提供了除Datastream api/flink sql以外的一种方式定义flink 任务
通过提供一个yaml文件,描述source sink transform等主要信息
由FlinkPipelineComposer解析,自动调用DataStream api进行构建
官方样例
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2
目前可以通过source配置的源只有mysql 和 values
values是调试用的,所以可以说当前这个功能是专门为“mysql同步数据到各个sink”的场景使用的
目前可以使用的sink有
- doris
- elasticsearch
- kafka
- paimon
- starrocks
- values
FlinkPipelineComposer
我们以mysql -> values来观察 FlinkPipelineComposer 是如何通过读取yaml文件的定义来构建DataStream的
values会将mysql产生的cdc消息打印到stdout上
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: x.x.x.xport: 3306username: usernamepassword: passwordtables: test.t1server-id: 5400-5404server-time-zone: UTC+8sink:type: valuesname: values Sinkpipeline:name: Sync Mysql Database to Valuesparallelism: 2
首先来观察一下这个任务提交到flink集群后具体的链路构成

结合官方给出的架构

可以看出,“一个source,一个sink”的yaml定义,最终会生成5个operator
- Souce: Flink CDC Event Source: mysql
- SchemaOperator
- PrePartition
-------------- shuffle --------------
- PostPartion
- Sink Writer: values Sink
Souce: Flink CDC Event Source: mysql
负责
- 创建枚举器
- 创建reader
- 枚举split分发给reader
- reader读取数据生成事件
SchemaOperator
负责和JobMaster上的coodinator沟通,执行schema evolution 相关逻辑,见Flink CDC Schema Evolution 详解
PrePartition
负责
- 广播FlushEvent
- 广播SchemaChangeEvent
- shuffle普通消息到下游
PostPartion
Sink Writer: values Sink
写入下游,values sink当前到实现是打印到stdout
源码解析
接下来分析,FlinkPipelineComposer 读取 yaml 构造DataStream的细节
CliFrontend#main
CliFrontend.java:54
args

createExecutor 创建 executor CliFrontend.java:76
调用CliExecutor#run CliExecutor.java:70
看一下解析得到的pipelineDef

这里已经从yaml文件中解析出了source和sink的配置了
composer.compose 调用compose方法开始使用DataStream api进行构建
FlinkPipelineComposer.java:92 FlinkPipelineComposer#compose
声明了5个translator,其中第一个sourceTranslator会生成DataStream<Event> stream,而其他的translator基于这个stream作为input,调用transform方法,放入对应阶段的operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
...
TransformTranslator transformTranslator = new TransformTranslator();
...
SchemaOperatorTranslator schemaOperatorTranslator =...
...
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
...
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
...
translate的调用顺序如下
DataStream<Event> stream =sourceTranslator.translate(...
stream =transformTranslator.translatePreTransform(...
stream =transformTranslator.translatePostTransform(...
stream =schemaOperatorTranslator.translate(...
stream =partitioningTranslator.translate(...
sinkTranslator.translate(pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());return new FlinkPipelineExecution(env...)...
逐一说明
- sourceTranslator.translate 通过source名字获取sourceProvider,关联到stream中
- sourceProvider.getSource ->
- MysqlSource ->
- createReader
- createEnumerator
- MysqlSource ->
- stream = transformTranslator.translatePreTransform
if (transforms.isEmpty()) {return input;
}
由于有如上代码,我们的yaml中没有涉及,所以忽略这个transform
- stream = transformTranslator.translatePostTransform
- 同上
- stream = schemaOperatorTranslator.translate
- 插入一个schemaOperator节点,在收到schemaChangeEvent的时候
- 停住当前流
- 上报coodinator
- flush下游数据,让sink消耗完已有数据
- sink 通知coodinator flush完成
- coodinator调用sink注册的MetaApplier完成schema变更,变更完成后通知schemaOperator
- schemaOperator重新放通数据
- stream = partitioningTranslator.translate
- 构建prePartition postPartition节点
- sinkTranslator.translate
- 构建sink节点
- FlinkPipelineExecution 中的 execute 方法调用
env.executeAsync(jobName)
总结
flink-cdc 3.0 提供的pipeline模式,通过定义yaml,自动构建了一条cdc pipeline,避免手动调用datastream api,并且支持schema evolution
构建的主要逻辑集中在 FlinkPipelineComposer
相关文章:
FlinkPipelineComposer 详解
FlinkPipelineComposer 详解 原文 背景 在flink-cdc 3.0中引入了pipeline机制,提供了除Datastream api/flink sql以外的一种方式定义flink 任务 通过提供一个yaml文件,描述source sink transform等主要信息 由FlinkPipelineComposer解析,…...
蓝桥杯-洛谷刷题-day2(C++)
目录 1.小写字母与大写字母的转换 2.使用string(额外开一章持续补充) i.访问字符串最后一位 3.保留N位小数输出 i.C侧 ii.C语言侧 iii.总结 4.高精度相加 i.各种数据类型转字符型 ii.三元运算符 iii.循环条件中的carry 1.小写字母与大写字母的…...
16008.行为树(五)-自定义数据指针在黑板中的传递
文章目录 1.1 背景1.2 xml文件定义1.3 代码实现1.3 执行结果1.1 背景 自定义数据结构指针,通过黑板的形式,在树的节点中进行指针的传递。 1.2 xml文件定义 xhome@ubuntu:~/opt/groot_pro$ cat unit_t1.xml<?xml version="1.0" encoding="UTF-8"?&…...
javascript Vue
DOM对象 什么是DOM DOM(Document Object Model):文档对象模型,就是Javascript将HTML文档的各个组成部分封装为对象,通过修改HTML元素的内容和样式动态改变页面。 如何获取DOM对象 获取DOM中的元素对象(Element对象/标签&…...
《揭秘观察者模式:作用与使用场景全解析》
在软件开发的世界中,设计模式就像是建筑师手中的蓝图,指导着软件系统的构建。其中,观察者模式是一种极为重要且广泛应用的设计模式。今天,我们就来深入探讨一下观察者模式的作用和使用场景。 一、观察者模式是什么? …...
【QT常用技术讲解】优化网络链接不上导致qt、qml界面卡顿的问题
前言 qt、qml项目经常会涉及访问MySQL数据库、网络服务器,并且界面打开时的初始化过程就会涉及到链接Mysql、网络服务器获取数据,如果网络不通,卡个几十秒,会让用户觉得非常的不爽,本文从技术调研的角度讲解解决此类问…...
下划线命名json数组转java对象
/*** 将驼峰式命名的字符串转换为下划线方式* @param camelCase* @return*/ private static String toUnderlineCase(String camelCase) {return StrUtil.toUnderlineCase(camelCase); }/*** 下划线-赋值给-驼峰* @param source 源数据* @param target 目标数据*/ public stati…...
实测运行容器化Nginx服务器
文章目录 前言一、拉取Nginx镜像二、创建挂载目录三、运行容器化Nginx服务器四、访问网页测试 总结 前言 运行容器化Nginx服务器,首先确保正确安装docker,并且已启动运行,具体安装docker方法见笔者前面的博文《OpenEuler 下 Docker 安装、配…...
显示器接口种类 | 附图片
显示器接口类型主要包括VGA、DVI、HDMI、DP和USB Type-C等。 VGA、DVI、HDMI、DP和USB Type-C 1. 观察 VGA接口:15针 DP接口:在DP接口旁,都有一个“D”型的标志。 电脑主机:DP(D) 显示器:VGA(15针) Ref https://cloud.tenc…...
C++初阶——list
一、什么是list list是一个可以在序列的任意位置进行插入和删除的容器,并且可以进行双向迭代。list的底层是一个双向链表,双向链表可以将它们包含的每个元素存储在不同且不相关的存储位置。通过将每个元素与前一个元素的链接和后一个元素的链接关联起来&…...
软件设计师-排序算法
冒泡排序 每一趟冒泡排序,从第0个元素开始,和后面的元素比较,如果大于就交换,否则不变,每次冒泡可以把最大的元素放到最后一个,第一次冒泡的终点是n-1,第二趟的是n-2,直到最后剩下一个元素。时间复杂度O(n…...
即插即用篇 | YOLOv8 引入 代理注意力 AgentAttention
Transformer模型中的注意力模块是其核心组成部分。虽然全局注意力机制具有很强的表达能力,但其高昂的计算成本限制了在各种场景中的应用。本文提出了一种新的注意力范式,称为“代理注意力”(Agent Attention),以在计算效率和表示能力之间取得平衡。代理注意力使用四元组(Q…...
020_Servlet_Mysql学生选课系统(新版)_lwplus87
摘 要 随着在校大学生人数的不断增加,教务系统的数据量也不断的上涨。针对学生选课这一环节,本系统从学生网上自主选课以及课程发布两个大方面进行了设计,基本实现了学生的在线信息查询、选课功能以及教师对课程信息发布的管理等功能&…...
LabVIEW导入并显示CAD DXF文件图形 程序见附件
LabVIEW导入并显示CAD DXF文件图形 程序见附件 LabVIEW导入并显示CAD DXF文件图形 程序见附件 - 北京瀚文网星科技有限公司 LabVIEW广泛应用于自动化、数据采集、图形显示等领域。对于涉及CAD图形的应用,LabVIEW也提供了一些方法来导入和显示CAD DXF文件&#x…...
《云原生安全攻防》-- K8s安全防护思路
从本节课程开始,我们将正式进入防护篇。通过深入理解K8s提供的多种安全机制,从防守者的角度,运用K8s的安全最佳实践来保障K8s集群的安全。 在这个课程中,我们将学习以下内容: K8s安全防护思路:掌握K8s自身提…...
鸿蒙系统的发展及开发者机遇
鸿蒙系统(HarmonyOS)凭借其分布式架构和跨设备协同能力,展现出强大的发展潜力,在智能手机、智能穿戴、车载、家居等行业领域应用日益广泛,已逐渐形成与安卓、iOS 三足鼎立的市场格局。 开发者面临的挑战 1. 技术适应与…...
Java | Leetcode Java题解之第556题下一个更大元素III
题目: 题解: class Solution {public int nextGreaterElement(int n) {int x n, cnt 1;for (; x > 10 && x / 10 % 10 > x % 10; x / 10) {cnt;}x / 10;if (x 0) {return -1;}int targetDigit x % 10;int x2 n, cnt2 0;for (; x2 %…...
OSPF动态路由配置实验:实现高效网络自动化
实验主题:OSPF动态路由协议配置 实验背景 OSPF(Open Shortest Path First)是一种基于链路状态的路由协议,广泛应用于中大型网络中。它采用Dijkstra算法计算最短路径,以确保网络中的路由更新快速、稳定,并能…...
CRM对企业有什么用?如何在实践中有效应用CRM系统?
在现在非常激烈竞争环境中,客户关系管理系统(CRM) 已经成为很多企业的“必备神器”,它不仅帮助企业高效地管理客户信息,还能提高客户满意度,增强客户忠诚度,最终推动销售增长和业务发展。然而&a…...
渗透测试之 -- Linux基础
声明 学习视频来自B站UP主 泷羽sec,如涉及侵泷羽sec权马上删除文章笔记的只是方便各位师傅学习知识,以下网站涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负 一、Openssl 1、openssl passwd -1 123 openssl一个开源加密工具包,用于各种解密、加…...
MAA明日方舟助手:一键解放双手的智能游戏伴侣终极指南
MAA明日方舟助手:一键解放双手的智能游戏伴侣终极指南 【免费下载链接】MaaAssistantArknights 《明日方舟》小助手,全日常一键长草!| A one-click tool for the daily tasks of Arknights, supporting all clients. 项目地址: https://git…...
如何三分钟搭建免费音乐聚合平台:MusicFree插件终极配置指南
如何三分钟搭建免费音乐聚合平台:MusicFree插件终极配置指南 【免费下载链接】MusicFreePlugins MusicFree播放插件 项目地址: https://gitcode.com/gh_mirrors/mu/MusicFreePlugins 还在为音乐会员费烦恼吗?想要一个真正免费、无广告的音乐播放体…...
Keil ULINK强制全片擦除与CRC校验实践
1. 问题现象与背景解析当使用Keil开发环境配合ULINK调试器对英飞凌C166系列微控制器进行程序烧录时,部分工程师会遇到一个看似奇怪的现象:明明在代码中设置了全片CRC校验逻辑,但实际运行时却出现校验失败。经过排查发现,ULINK默认…...
[智能体-29]:Chatbox 一款开源、跨平台的「AI 客户端聚合工具」,它本身不提供 AI 模型,而是帮你统一接入 ChatGPT、DeepSeek、Ollama 等几乎所有主流大模
Chatbox 是一款开源、跨平台的「AI 客户端聚合工具」,它本身不提供 AI 模型,而是帮你统一接入 ChatGPT、Claude、Gemini、DeepSeek、Ollama 等几乎所有主流大模型,提供一个隐私优先、功能丰富的统一交互界面。一、核心定位与本质你可以把 Cha…...
避坑指南:在Ubuntu 22.04服务器上部署LibreOffice和JODConverter的完整流程(含中文字体配置)
Ubuntu 22.04服务器部署LibreOffice与JODConverter全流程:从中文字体配置到生产级优化在文档管理系统开发中,文件预览功能一直是刚需。不同于Windows环境的图形化操作,Linux服务器部署面临依赖缺失、字体配置、服务管理等诸多挑战。本文将手把…...
别再被GPG签名卡住了!手把手教你修复Kali老版本apt更新源报错
Kali Linux系统更新源管理进阶指南:从故障修复到高效运维当你成功解决了Kali Linux老版本因GPG签名失效导致的apt更新源报错后,这只是系统维护的第一步。真正的挑战在于如何构建一套可持续的运维策略,避免类似问题反复出现,同时提…...
Vibe Coding工程化:从“感觉编程“到可落地的AI开发范式
一个需要正视的现象 2026年,“Vibe Coding"已经不是一个新鲜词汇。Andrej Karpathy在2025年提出这个概念时,描述的是一种完全依赖AI的编程体验:你描述意图,模型生成代码,你甚至不需要真正"读懂"代码就能…...
去偏机器学习在交通行为因果推断中的应用:从关联分析到因果效应评估
1. 项目概述:当交通研究遇上因果推断在交通工程与城市规划领域,我们常常面临一个核心挑战:如何从海量的观测数据中,剥离出某个特定因素(比如一项新政策、一种交通管控措施)对人们行为的“真实”影响&#x…...
云原生监控体系建设:打造全方位的可观测性平台
云原生监控体系建设:打造全方位的可观测性平台 引言 在云原生时代,监控是保障系统稳定运行的关键。一个完善的监控体系可以帮助我们及时发现问题、定位问题、解决问题。 今天就来分享一下云原生监控体系的建设经验。 监控体系概述 可观测性三支柱 监控体…...
数据科学实践案例与项目管理
数据科学实践案例与项目管理 1. 技术分析 1.1 数据科学项目管理概述 数据科学项目管理是确保项目成功的关键: 项目生命周期问题定义: 明确目标数据收集: 获取数据数据处理: 清洗转换模型开发: 构建模型评估验证: 评估效果部署上线: 生产环境项目管理要素:目标设定进…...
