当前位置: 首页 > article >正文

flink StreamGraph解析

Flink程序有三部分operation组成,分别是源source、转换transformation、目的地sink。这三部分构成DAG。

DAG首先生成的是StreamGraph。

用户代码在添加operation的时候会在env中缓存(变量transformations),在env.execute()执行的时候才会生成对应StreamGraph。

生成StreamGraph

transformations中只存了3个变量,其实是5个。

getStreamGraph顾名思义就是生成StreamGraph。

最后是getStreamGraphGenerator(transformations).generate()生成。getStreamGraphGenerator获取生成器,generate生成StreamGraph

generate方法中首先创建了StreamGraph对象,再遍历transformations给StreamGraph添加相关操作信息(transform(transformation))。其余部分都是处理相关的运行参数(执行参数、checkpoint参数、savepoint参数等)

transform中主要有三部分。

没有并行度,添加并行度

要是指定了slotGroup,将需要的slot资源记录到slotSharingGroupResources中

最后实际转换。优先使用_translatorMap_中存在的translator。这些translator是已经定义好的解释器,可以根据不同场景选择是流模式还是批模式。传统是legacyTransform

legacyTransform

根据情况处理单个流输入或多个流输入。

translate根据情况选择批处理或者流处理

addOperator和addEdge是重点方法,添加顶点和边。

StreamEdge

一个edge连接上下游两个node。

edgeId:唯一id

sourceId、targetId:连接的上下游node的id

outputPartitioner:分区器

StreamNode

一个node可以有多个edge

inEdges、outEdges:node的入边和出边

jobVertexClass:封装用户函数的执行类

StreamGraph

有多个streamNodes组成,streamNodes之间是streamEdge相连。

类似以下这种:

streamNodes:缓存graph所有的node

sources:DAG的输入源集合

sinks:DAG的输出源集合

添加node

addSink、addSource、addOperator是主要方法。可以看到addSink、addSource也是addOperator。

addOperator中addNode是添加StreamNode的方法。

addNode就是创建StreamNode对象,并添加到streamNodes中。

添加edge

方法是addEdge,内部调用addEdgeInternal

addEdgeInternal中前面是处理虚拟节点的。后面是调用createActualEdge来添加

createActualEdge中首先确定partitioner,没有指定partitioner就优先使用ForwardPartitioner,要求上下游并行度一样,否则使用RebalancePartitioner。

然后创建StreamEdge对象,并将相关信息绑定到对应的StreamNode上。

相关文章:

flink StreamGraph解析

Flink程序有三部分operation组成,分别是源source、转换transformation、目的地sink。这三部分构成DAG。 DAG首先生成的是StreamGraph。 用户代码在添加operation的时候会在env中缓存(变量transformations),在env.execute()执行的…...

本地AI模型:未来智能设备的核心驱动力

标题:“本地AI模型:未来智能设备的核心驱动力” 文章信息摘要: 未来AI设备(如Meta Ray-Bans)的发展将更加依赖本地语言模型的优化与集成,而非仅依靠云端AI模型。本地模型在隐私保护、推理速度和离线访问方…...

基于SpringBoot的网上摄影工作室开发与实现 | 含论文、任务书、选题表

随着互联网技术的不断发展,摄影爱好者们越来越需要一个在线平台来展示和分享他们的作品。基于SpringBoot的网上摄影工作室应运而生,它不仅为用户提供了一个展示摄影作品的平台,还为管理员提供了便捷的管理工具。本文幽络源将详细介绍该系统的…...

数字人+展厅应用方案:开启全新沉浸式游览体验

随着人们生活质量的不断提升,对于美好体验的追求日益增长。在展厅展馆领域,传统的展示方式已难以满足大众日益多样化的需求。而通过将数字人与展厅进行深度结合,可以打造数字化、智能化新型展厅,不仅能提升展示效果,还…...

基于单片机的家用无线火灾报警系统的设计

1 总体设计 本设计家用无线火灾报警系统利用单片机控制技术、传感器检测技术、GSM通信技术展开设计,如图2.1所示为本次系统设计的主体框图,系统包括单片机主控模块、温度检测模块、烟雾检测模块、按键模块、GSM通信模块、液晶显示模块、蜂鸣器报警模块。…...

多级缓存(亿级并发解决方案)

多级缓存(亿级流量(并发)的缓存方案) 传统缓存的问题 传统缓存是请求到达tomcat后,先查询redis,如果未命中则查询数据库,问题如下: (1)请求要经过tomcat处…...

iic、spi以及uart

何为总线? 连接多个部件的信息传输线,是部件共享的传输介质 总线的作用? 实现数据传输,即模块之间的通信 总线如何分类? 根据总线连接的外设属于内部外设还是外部外设将总线可以分为片内总线和片外总线 可分为数…...

Shell编程(for循环+并发问题+while循环+流程控制语句+函数传参+函数变量+函数返回值+反向破解MD5)

本篇文章继续给大家介绍Shell编程,包括for循环、并发问题,while循环,流程控制语句,函数传参、函数变量、函数返回值,反向破解MD5等内容。 1.for循环 for 变量 in [取值列表] 取值列表可以是数字 字符串 变量 序列…...

深入 Rollup:从入门到精通(三)Rollup CLI命令行实战

准备阶段:初始化项目 初始化项目,这里使用的是pnpm,也可以使用yarn或者npm # npm npm init -y # yarn yarn init -y # pnpm pnpm init安装rollup # npm npm install rollup -D # yarn yarn add rollup -D # pnpm pnpm install rollup -D在…...

CycleGAN模型解读(附源码+论文)

CycleGAN 论文链接:Unpaired Image-to-Image Translation using Cycle-Consistent Adversarial Networks 官方链接:pytorch-CycleGAN-and-pix2pix 老规矩,先看看效果 总体流程 先简单过一遍流程,细节在代码里说。CycleGAN有…...

线程配置经验

工作时,时常会遇到,线程相关的问题与解法,本人会持续对开发过程中遇到的关于线程相关的问题及解决记录更新记录在此篇博客中。 目录 一、线程基本知识 1. 线程和进程 二、问题与解法 1. 避免乘法级别数量线程并行 1)使用线程池…...

动态规划DP 数字三角形模型 传纸条(题目分析+C++完整代码)

传纸条 原题链接 AcWing 275. 传纸条 题目描述 小渊和小轩是好朋友也是同班同学,他们在一起总有谈不完的话题。 一次素质拓展活动中,班上同学安排坐成一个 m行 n 列的矩阵,而小渊和小轩被安排在矩阵对角线的两端,因此&#x…...

Ubuntu二进制部署K8S 1.29.2

本机说明 本版本非高可用,单Master,以及一个Node 新装的 ubuntu 22.04k8s 1.29.3使用该文档请使用批量替换 192.168.44.141这个IP,其余照着复制粘贴就可以成功需要手动 设置一个 固定DNS,我这里设置的是 8.8.8.8不然coredns无法…...

第05章 10 地形梯度场模拟显示

在 VTK(Visualization Toolkit)中,可以通过计算地形数据的梯度场,并用箭头或线条来表示梯度方向和大小,从而模拟显示地形梯度场。以下是一个示例代码,展示了如何使用 VTK 和 C 来计算和显示地形数据的梯度场…...

全程Kali linux---CTFshow misc入门

图片篇(基础操作) 第一题: ctfshow{22f1fb91fc4169f1c9411ce632a0ed8d} 第二题 解压完成后看到PNG,可以知道这是一张图片,使用mv命令或者直接右键重命名,修改扩展名为“PNG”即可得到flag。 ctfshow{6f66202f21ad22a2a19520cdd…...

[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录 IntroduceProject StructureDeclare Plugins and ModulesApply Plugins and Add DependenciesSender PropertiesSender ApplicationSender ControllerReceiver PropertiesReceiver ApplicationReceiver Message HandlerCongratulationsAutomatically Send Message By …...

深度学习笔记——循环神经网络之LSTM

大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本文详细介绍面试过程中可能遇到的循环神经网络LSTM知识点。 文章目录 文本特征提取的方法1. 基础方法1.1 词袋模型(Bag of Words, BOW)工作…...

AI 模型评估与质量控制:生成内容的评估与问题防护

在生成式 AI 应用中,模型生成的内容质量直接影响用户体验。然而,生成式模型存在一定风险,如幻觉(Hallucination)问题——生成不准确或完全虚构的内容。因此,在构建生成式 AI 应用时,模型评估与质…...

[MILP] Logical Constraints 0-1 (Note2)

1. 如果选择了项目1,则项目2,3也要求被选中 表示为: 2. 如果确定了选项目1,则接下来必须选项目2或者项目3 表示为: or 3. 如果同时选择了项目2和项目3,则不可以选择项目1 表示为: 4. 如果…...

DFFormer实战:使用DFFormer实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整策略设置混合精度,DP多卡,EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试完整的代码 在上…...

蓝桥杯例题四

每个人都有无限潜能,只要你敢于去追求,你就能超越自己,实现梦想。人生的道路上会有困难和挑战,但这些都是成长的机会。不要被过去的失败所束缚,要相信自己的能力,坚持不懈地努力奋斗。成功需要付出汗水和努…...

如何复现o1模型,打造医疗 o1?

如何复现o1模型,打造医疗 o1? o1 树搜索一、起点:预训练规模触顶与「推理阶段(Test-Time)扩展」的动机二、Test-Time 扩展的核心思路与常见手段1. Proposer & Verifier 统一视角方法1:纯 Inference Sca…...

PostgreSQL TRUNCATE TABLE 操作详解

PostgreSQL TRUNCATE TABLE 操作详解 引言 在数据库管理中,经常需要对表进行操作以保持数据的有效性和一致性。TRUNCATE TABLE 是 PostgreSQL 中一种高效删除表内所有记录的方法。本文将详细探讨 PostgreSQL 中 TRUNCATE TABLE 的使用方法、性能优势以及注意事项。 什么是 …...

【JavaWeb06】Tomcat基础入门:架构理解与基本配置指南

文章目录 🌍一. WEB 开发❄️1. 介绍 ❄️2. BS 与 CS 开发介绍 ❄️3. JavaWeb 服务软件 🌍二. Tomcat❄️1. Tomcat 下载和安装 ❄️2. Tomcat 启动 ❄️3. Tomcat 启动故障排除 ❄️4. Tomcat 服务中部署 WEB 应用 ❄️5. 浏览器访问 Web 服务过程详…...

【NOI】C++程序结构入门之循环结构三-计数求和

文章目录 前言一、计数求和1.导入2.计数器3.累加器 二、例题讲解问题:1741 - 求出1~n中满足条件的数的个数和总和?问题:1002. 编程求解123...n问题:1004. 编程求1 * 2 * 3*...*n问题:1014. 编程求11/21/3...1/n问题&am…...

[Linux]Shell脚本中以指定用户运行命令

前言 当我们为Linux设置了用户自启动的shel脚本,默认会使用root用户执行启动脚本中的命令,那么我们如何在启动脚本中切换为指定用户指定命令呢。 命令 以下将列出两条命令,两条命令都可以实现以指定用户运行命令,凭喜好选择使用…...

通过 NAudio 控制电脑操作系统音量

根据您的需求,以下是通过 NAudio 获取和控制电脑操作系统音量的方法: 一、获取和控制系统音量 (一)获取系统音量和静音状态 您可以使用 NAudio.CoreAudioApi.MMDeviceEnumerator 来获取系统默认音频设备的音量和静音状态&#…...

新项目上传gitlab

Git global setup git config --global user.name “FUFANGYU” git config --global user.email “fyfucnic.cn” Create a new repository git clone gitgit.dev.arp.cn:casDs/sawrd.git cd sawrd touch README.md git add README.md git commit -m “add README” git push…...

【异步编程基础】FutureTask基本原理与异步阻塞问题

文章目录 一、FutureTask 的桥梁作用二、Future 模式与异步回调三、 FutureTask获取异步结果的逻辑1. 获取异步执行结果的步骤2. 举例说明3. FutureTask的异步阻塞问题 Runnable 用于定义无返回值的任务,而 Callable 用于定义有返回值的任务。然而,Calla…...

原生 Node 开发 Web 服务器

一、创建基本的 HTTP 服务器 使用 http 模块创建 Web 服务器 const http require("http");// 创建服务器const server http.createServer((req, res) > {// 设置响应头res.writeHead(200, { "Content-Type": "text/plain" });// 发送响应…...