当前位置: 首页 > news >正文

FlinkPipelineComposer 详解

FlinkPipelineComposer 详解

原文

背景

在flink-cdc 3.0中引入了pipeline机制,提供了除Datastream api/flink sql以外的一种方式定义flink 任务

通过提供一个yaml文件,描述source sink transform等主要信息

由FlinkPipelineComposer解析,自动调用DataStream api进行构建

官方样例

 source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2

目前可以通过source配置的源只有mysql 和 values

values是调试用的,所以可以说当前这个功能是专门为“mysql同步数据到各个sink”的场景使用的

目前可以使用的sink有

  1. doris
  2. elasticsearch
  3. kafka
  4. paimon
  5. starrocks
  6. values

FlinkPipelineComposer

我们以mysql -> values来观察 FlinkPipelineComposer 是如何通过读取yaml文件的定义来构建DataStream的

values会将mysql产生的cdc消息打印到stdout上

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: x.x.x.xport: 3306username: usernamepassword: passwordtables: test.t1server-id: 5400-5404server-time-zone: UTC+8sink:type: valuesname: values Sinkpipeline:name: Sync Mysql Database to Valuesparallelism: 2

首先来观察一下这个任务提交到flink集群后具体的链路构成

在这里插入图片描述

结合官方给出的架构

在这里插入图片描述

可以看出,“一个source,一个sink”的yaml定义,最终会生成5个operator

  1. Souce: Flink CDC Event Source: mysql
  2. SchemaOperator
  3. PrePartition

-------------- shuffle --------------

  1. PostPartion
  2. Sink Writer: values Sink

Souce: Flink CDC Event Source: mysql

负责

  1. 创建枚举器
  2. 创建reader
  3. 枚举split分发给reader
  4. reader读取数据生成事件

SchemaOperator

负责和JobMaster上的coodinator沟通,执行schema evolution 相关逻辑,见Flink CDC Schema Evolution 详解

PrePartition

负责

  1. 广播FlushEvent
  2. 广播SchemaChangeEvent
  3. shuffle普通消息到下游

PostPartion

Sink Writer: values Sink

写入下游,values sink当前到实现是打印到stdout

源码解析

接下来分析,FlinkPipelineComposer 读取 yaml 构造DataStream的细节

CliFrontend#main

CliFrontend.java:54

args

在这里插入图片描述

createExecutor 创建 executor CliFrontend.java:76

调用CliExecutor#run CliExecutor.java:70

看一下解析得到的pipelineDef
在这里插入图片描述

这里已经从yaml文件中解析出了source和sink的配置了

composer.compose 调用compose方法开始使用DataStream api进行构建

FlinkPipelineComposer.java:92 FlinkPipelineComposer#compose

声明了5个translator,其中第一个sourceTranslator会生成DataStream<Event> stream,而其他的translator基于这个stream作为input,调用transform方法,放入对应阶段的operator

DataSourceTranslator sourceTranslator = new DataSourceTranslator();
...
TransformTranslator transformTranslator = new TransformTranslator();
...
SchemaOperatorTranslator schemaOperatorTranslator =...
...
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
...
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
...

translate的调用顺序如下

DataStream<Event> stream =sourceTranslator.translate(...
stream =transformTranslator.translatePreTransform(...
stream =transformTranslator.translatePostTransform(...
stream =schemaOperatorTranslator.translate(...
stream =partitioningTranslator.translate(...
sinkTranslator.translate(pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());return new FlinkPipelineExecution(env...)...

逐一说明

  1. sourceTranslator.translate 通过source名字获取sourceProvider,关联到stream中
  • sourceProvider.getSource ->
    • MysqlSource ->
      • createReader
      • createEnumerator
  1. stream = transformTranslator.translatePreTransform
if (transforms.isEmpty()) {return input;
}

由于有如上代码,我们的yaml中没有涉及,所以忽略这个transform

  1. stream = transformTranslator.translatePostTransform
  • 同上
  1. stream = schemaOperatorTranslator.translate
  • 插入一个schemaOperator节点,在收到schemaChangeEvent的时候
    1. 停住当前流
    2. 上报coodinator
    3. flush下游数据,让sink消耗完已有数据
    4. sink 通知coodinator flush完成
    5. coodinator调用sink注册的MetaApplier完成schema变更,变更完成后通知schemaOperator
    6. schemaOperator重新放通数据
  1. stream = partitioningTranslator.translate
  • 构建prePartition postPartition节点
  1. sinkTranslator.translate
  • 构建sink节点
  1. FlinkPipelineExecution 中的 execute 方法调用 env.executeAsync(jobName)

总结

flink-cdc 3.0 提供的pipeline模式,通过定义yaml,自动构建了一条cdc pipeline,避免手动调用datastream api,并且支持schema evolution

构建的主要逻辑集中在 FlinkPipelineComposer

相关文章:

FlinkPipelineComposer 详解

FlinkPipelineComposer 详解 原文 背景 在flink-cdc 3.0中引入了pipeline机制&#xff0c;提供了除Datastream api/flink sql以外的一种方式定义flink 任务 通过提供一个yaml文件&#xff0c;描述source sink transform等主要信息 由FlinkPipelineComposer解析&#xff0c…...

蓝桥杯-洛谷刷题-day2(C++)

目录 1.小写字母与大写字母的转换 2.使用string&#xff08;额外开一章持续补充&#xff09; i.访问字符串最后一位 3.保留N位小数输出 i.C侧 ii.C语言侧 iii.总结 4.高精度相加 i.各种数据类型转字符型 ii.三元运算符 iii.循环条件中的carry 1.小写字母与大写字母的…...

16008.行为树(五)-自定义数据指针在黑板中的传递

文章目录 1.1 背景1.2 xml文件定义1.3 代码实现1.3 执行结果1.1 背景 自定义数据结构指针,通过黑板的形式,在树的节点中进行指针的传递。 1.2 xml文件定义 xhome@ubuntu:~/opt/groot_pro$ cat unit_t1.xml<?xml version="1.0" encoding="UTF-8"?&…...

javascript Vue

DOM对象 什么是DOM DOM(Document Object Model)&#xff1a;文档对象模型&#xff0c;就是Javascript将HTML文档的各个组成部分封装为对象&#xff0c;通过修改HTML元素的内容和样式动态改变页面。 如何获取DOM对象 获取DOM中的元素对象&#xff08;Element对象/标签&…...

《揭秘观察者模式:作用与使用场景全解析》

在软件开发的世界中&#xff0c;设计模式就像是建筑师手中的蓝图&#xff0c;指导着软件系统的构建。其中&#xff0c;观察者模式是一种极为重要且广泛应用的设计模式。今天&#xff0c;我们就来深入探讨一下观察者模式的作用和使用场景。 一、观察者模式是什么&#xff1f; …...

【QT常用技术讲解】优化网络链接不上导致qt、qml界面卡顿的问题

前言 qt、qml项目经常会涉及访问MySQL数据库、网络服务器&#xff0c;并且界面打开时的初始化过程就会涉及到链接Mysql、网络服务器获取数据&#xff0c;如果网络不通&#xff0c;卡个几十秒&#xff0c;会让用户觉得非常的不爽&#xff0c;本文从技术调研的角度讲解解决此类问…...

下划线命名json数组转java对象

/*** 将驼峰式命名的字符串转换为下划线方式* @param camelCase* @return*/ private static String toUnderlineCase(String camelCase) {return StrUtil.toUnderlineCase(camelCase); }/*** 下划线-赋值给-驼峰* @param source 源数据* @param target 目标数据*/ public stati…...

实测运行容器化Nginx服务器

文章目录 前言一、拉取Nginx镜像二、创建挂载目录三、运行容器化Nginx服务器四、访问网页测试 总结 前言 运行容器化Nginx服务器&#xff0c;首先确保正确安装docker&#xff0c;并且已启动运行&#xff0c;具体安装docker方法见笔者前面的博文《OpenEuler 下 Docker 安装、配…...

显示器接口种类 | 附图片

显示器接口类型主要包括VGA、DVI、HDMI、DP和USB Type-C等。 VGA、DVI、HDMI、DP和USB Type-C 1. 观察 VGA接口:15针 DP接口&#xff1a;在DP接口旁&#xff0c;都有一个“D”型的标志。 电脑主机&#xff1a;DP(D) 显示器&#xff1a;VGA(15针) Ref https://cloud.tenc…...

C++初阶——list

一、什么是list list是一个可以在序列的任意位置进行插入和删除的容器&#xff0c;并且可以进行双向迭代。list的底层是一个双向链表&#xff0c;双向链表可以将它们包含的每个元素存储在不同且不相关的存储位置。通过将每个元素与前一个元素的链接和后一个元素的链接关联起来&…...

软件设计师-排序算法

冒泡排序 每一趟冒泡排序&#xff0c;从第0个元素开始&#xff0c;和后面的元素比较&#xff0c;如果大于就交换&#xff0c;否则不变&#xff0c;每次冒泡可以把最大的元素放到最后一个&#xff0c;第一次冒泡的终点是n-1,第二趟的是n-2,直到最后剩下一个元素。时间复杂度O(n…...

即插即用篇 | YOLOv8 引入 代理注意力 AgentAttention

Transformer模型中的注意力模块是其核心组成部分。虽然全局注意力机制具有很强的表达能力,但其高昂的计算成本限制了在各种场景中的应用。本文提出了一种新的注意力范式,称为“代理注意力”(Agent Attention),以在计算效率和表示能力之间取得平衡。代理注意力使用四元组(Q…...

020_Servlet_Mysql学生选课系统(新版)_lwplus87

摘 要 随着在校大学生人数的不断增加&#xff0c;教务系统的数据量也不断的上涨。针对学生选课这一环节&#xff0c;本系统从学生网上自主选课以及课程发布两个大方面进行了设计&#xff0c;基本实现了学生的在线信息查询、选课功能以及教师对课程信息发布的管理等功能&…...

LabVIEW导入并显示CAD DXF文件图形 程序见附件

LabVIEW导入并显示CAD DXF文件图形 程序见附件 LabVIEW导入并显示CAD DXF文件图形 程序见附件 - 北京瀚文网星科技有限公司 LabVIEW广泛应用于自动化、数据采集、图形显示等领域。对于涉及CAD图形的应用&#xff0c;LabVIEW也提供了一些方法来导入和显示CAD DXF文件&#x…...

《云原生安全攻防》-- K8s安全防护思路

从本节课程开始&#xff0c;我们将正式进入防护篇。通过深入理解K8s提供的多种安全机制&#xff0c;从防守者的角度&#xff0c;运用K8s的安全最佳实践来保障K8s集群的安全。 在这个课程中&#xff0c;我们将学习以下内容&#xff1a; K8s安全防护思路&#xff1a;掌握K8s自身提…...

鸿蒙系统的发展及开发者机遇

鸿蒙系统&#xff08;HarmonyOS&#xff09;凭借其分布式架构和跨设备协同能力&#xff0c;展现出强大的发展潜力&#xff0c;在智能手机、智能穿戴、车载、家居等行业领域应用日益广泛&#xff0c;已逐渐形成与安卓、iOS 三足鼎立的市场格局。 开发者面临的挑战 1. 技术适应与…...

Java | Leetcode Java题解之第556题下一个更大元素III

题目&#xff1a; 题解&#xff1a; class Solution {public int nextGreaterElement(int n) {int x n, cnt 1;for (; x > 10 && x / 10 % 10 > x % 10; x / 10) {cnt;}x / 10;if (x 0) {return -1;}int targetDigit x % 10;int x2 n, cnt2 0;for (; x2 %…...

OSPF动态路由配置实验:实现高效网络自动化

实验主题&#xff1a;OSPF动态路由协议配置 实验背景 OSPF&#xff08;Open Shortest Path First&#xff09;是一种基于链路状态的路由协议&#xff0c;广泛应用于中大型网络中。它采用Dijkstra算法计算最短路径&#xff0c;以确保网络中的路由更新快速、稳定&#xff0c;并能…...

CRM对企业有什么用?如何在实践中有效应用CRM系统?

在现在非常激烈竞争环境中&#xff0c;客户关系管理系统&#xff08;CRM&#xff09; 已经成为很多企业的“必备神器”&#xff0c;它不仅帮助企业高效地管理客户信息&#xff0c;还能提高客户满意度&#xff0c;增强客户忠诚度&#xff0c;最终推动销售增长和业务发展。然而&a…...

渗透测试之 -- Linux基础

声明 学习视频来自B站UP主 泷羽sec,如涉及侵泷羽sec权马上删除文章笔记的只是方便各位师傅学习知识,以下网站涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负 一、Openssl 1、openssl passwd -1 123 openssl一个开源加密工具包&#xff0c;用于各种解密、加…...

暗黑3按键助手:一键解放双手的终极游戏伴侣 [特殊字符]

暗黑3按键助手&#xff1a;一键解放双手的终极游戏伴侣 &#x1f3ae; 【免费下载链接】D3keyHelper D3KeyHelper是一个有图形界面&#xff0c;可自定义配置的暗黑3鼠标宏工具。 项目地址: https://gitcode.com/gh_mirrors/d3/D3keyHelper 还在为暗黑3中复杂的技能连招和…...

从入门到精通解析Python Selenium如何模拟浏览器操作

Selenium是一款开源的自动化测试工具&#xff0c;核心优势在于能模拟真实用户操作浏览器&#xff08;如点击、输入、滚动&#xff09;&#xff0c;并渲染动态加载的网页内容&#xff08;解决Requests库无法爬取JS动态数据的问题&#xff09;。 一、Selenium入门准备&#xff1a…...

信创协同办公价格与成本:这样选,性价比直接拉满!

“一套信创协同办公到底多少钱&#xff1f;”“是按人头收费&#xff0c;还是按项目打包算&#xff1f;”“前期买着便宜&#xff0c;后期维护会不会无底洞&#xff1f;”不管是政企单位采购&#xff0c;还是企业选型&#xff0c;这三个问题几乎是所有人的核心顾虑。毕竟信创办…...

深度解析开源Galgame社区:从零构建纯净视觉小说交流平台

深度解析开源Galgame社区&#xff1a;从零构建纯净视觉小说交流平台 【免费下载链接】kun-touchgal-next TouchGAL是立足于分享快乐的一站式Galgame文化社区, 为Gal爱好者提供一片净土! 项目地址: https://gitcode.com/gh_mirrors/ku/kun-touchgal-next TouchGAL是一个基…...

FPGA实战:手把手教你用Verilog给NAND Flash数据上把“安全锁”(附完整ECC代码)

FPGA实战&#xff1a;用Verilog为NAND Flash打造硬件级ECC防护系统 1. 为什么你的NAND Flash需要硬件ECC&#xff1f; NAND Flash存储芯片在工业控制、物联网终端和边缘计算设备中扮演着关键角色&#xff0c;但它的物理特性导致数据可靠性存在先天缺陷。想象一下&#xff0c;当…...

像素皇城灵蛇贺岁:5分钟部署你的赛博春联生成器(保姆级教程)

像素皇城灵蛇贺岁&#xff1a;5分钟部署你的赛博春联生成器&#xff08;保姆级教程&#xff09; 1. 前言&#xff1a;当传统春节遇上赛博美学 春节贴春联是延续千年的传统习俗&#xff0c;但你是否想过用AI技术为这个传统注入新的活力&#xff1f;今天我们要介绍的"像素…...

Chandra OCR多平台部署指南:Windows WSL2/Mac Metal/Linux Docker全搞定

Chandra OCR多平台部署指南&#xff1a;Windows WSL2/Mac Metal/Linux Docker全搞定 1. Chandra OCR核心能力解析 Chandra是Datalab.to在2025年10月开源的布局感知OCR模型&#xff0c;与传统OCR工具最大的区别在于它能完整保留文档的排版结构信息。想象一下&#xff1a;当你扫…...

Qt桌面应用集成PaddleOCR:从环境搭建到精准识别的实践指南

1. 环境准备&#xff1a;搭建PaddleOCR的Qt开发环境 第一次在Qt里折腾PaddleOCR时&#xff0c;我对着官方文档折腾了半天还是报错&#xff0c;后来发现是第三方库的路径没配好。这里分享下我踩坑后总结的可靠方案。 核心依赖三件套&#xff1a;PaddlePaddle推理库、PaddleOCR C…...

终极英雄联盟工具集:3大核心功能让你轻松掌控游戏全局

终极英雄联盟工具集&#xff1a;3大核心功能让你轻松掌控游戏全局 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit League-Toolkit…...

深入理解 MySQL 事务:从基础到实战,一篇吃透

在开发和运维 MySQL 数据库的过程中&#xff0c;事务&#xff08;Transaction&#xff09; 是绕不开的核心知识点&#xff0c;它是保证数据库数据安全、一致、可靠的基石。无论是电商下单、银行转账、支付结算&#xff0c;还是日常的业务数据操作&#xff0c;都离不开事务的支撑…...