Flink源码解析之:如何根据JobGraph生成ExecutionGraph
Flink源码解析之:如何根据JobGraph生成ExecutionGraph
在上一篇Flink源码解析中,我们介绍了Flink如何根据StreamGraph生成JobGraph的流程,并着重分析了其算子链的合并过程和JobGraph的构造流程。
对于StreamGraph和JobGraph的生成来说,其都是在客户端生成的,本文将会讲述JobGraph到ExecutionGraph的生成过程,而这一过程会在Flink JobManager的服务端来完成。当JobGraph从客户端提交到JobManager后,JobManager会根据JobGraph生成对应的ExecutionGraph,而ExecutionGraph就是Flink作业调度时使用的核心数据结构。 本篇将会详细介绍JobGraph转换为ExecutionGraph的流程。
主体流程梳理
Flink在将JobGraph转换成ExecutionGraph后,便可以开始执行真正的任务。这一转换流程主要在Flink源码中的DefaultExecutionGraphBuilder
类中的buildGraph
方法中实现的。在转换过程中,涉及到了一些新的基本概念,先来简单介绍一下这些概念,对于理解ExecutionGraph有较大的帮助:
- ExecutionJobVertex: 在ExecutionGraph中表示执行顶点,与JobGraph中的JobVertex一一对应。实际上,每个ExecutionJobVertex也是依赖JobVertex来创建的。
- ExecutionVertex: 在ExecutionJobVertex类中创建,每个并发度都对应了一个ExecutionVertex对象,每个ExecutionVertex都代表JobVertex在某个特定并行子任务中的执行。在实际执行时,每个ExecutionVertex实际上就是一个Task,是ExecutionJobVertex并行执行的一个子任务。
- Execution: Execution表示ExecutionVertex的一次执行。由于ExecutionVertex可以被执行多次(用于恢复、重新计算、重新分配),这个类用于跟踪该ExecutionVertex的单个执行状态和资源。
- IntermediateResult: 在JobGraph中用IntermediateDataSet表示上游JobVertex的输出数据流,而在ExecutionGraph中,则用IntermediateResult来表示ExecutionJobVertex的输出数据流。
- IntermediateResultPartition:这是IntermediateResult的一部分或一个分片。由于有多个并行任务(ExecutionVertex)执行相同的操作,每个任务都会产生一部分IntermediateResult。这些结果在物理存储和计算过程中,可能会被进一步划分成多个分区,每个分区对应一个 IntermediateResultPartition对象。
从上面的基本概念也可以看出,在ExecutionGraph中:
- 相比StreamGraph和JobGraph,ExecutionGraph是实际根据任务并行度来生成拓扑结构的,在ExecutionGraph中,每个并行子任务都对应一个ExecutionVertex顶点和IntermediateResultPartition输出数据流分区。
- 在ExecutionGraph中,上下游节点之间的连接是通过ExecutionVertex -> IntermediateResultPartition -> ExecutionVertex 对象来完成的。
整体的执行流程图如下所示:
入口方法:DefaultExecutionGraphBuilder.buildGraph
ExecutionGraph的生成是在DefaultExecutionGraphBuilder
类的buildGraph
方法中实现的:
public class DefaultExecutionGraphBuilder {public static DefaultExecutionGraph buildGraph(JobGraph jobGraph,Configuration jobManagerConfig,ScheduledExecutorService futureExecutor,Executor ioExecutor,ClassLoader classLoader,CompletedCheckpointStore completedCheckpointStore,CheckpointsCleaner checkpointsCleaner,CheckpointIDCounter checkpointIdCounter,Time rpcTimeout,MetricGroup metrics,BlobWriter blobWriter,Logger log,ShuffleMaster<?> shuffleMaster,JobMasterPartitionTracker partitionTracker,TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,ExecutionDeploymentListener executionDeploymentListener,ExecutionStateUpdateListener executionStateUpdateListener,long initializationTimestamp,VertexAttemptNumberStore vertexAttemptNumberStore,VertexParallelismStore vertexParallelismStore)throws JobExecutionException, JobException {checkNotNull(jobGraph, "job graph cannot be null");final String jobName = jobGraph.getName();final JobID jobId = jobGraph.getJobID();// 创建JobInformationfinal JobInformation jobInformation =new JobInformation(jobId,jobName,jobGraph.getSerializedExecutionConfig(),jobGraph.getJobConfiguration(),jobGraph.getUserJarBlobKeys(),jobGraph.getClasspaths());// Execution 保留的最大历史数final int maxPriorAttemptsHistoryLength =jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);// IntermediateResultPartitions的释放策略final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory =PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(jobManagerConfig);// create a new execution graph, if none exists so farfinal DefaultExecutionGraph executionGraph;try {// 创建默认的ExecutionGraph执行图对象,最后会返回该创建好的执行图对象executionGraph =new DefaultExecutionGraph(jobInformation,futureExecutor,ioExecutor,rpcTimeout,maxPriorAttemptsHistoryLength,classLoader,blobWriter,partitionGroupReleaseStrategyFactory,shuffleMaster,partitionTracker,partitionLocationConstraint,executionDeploymentListener,executionStateUpdateListener,initializationTimestamp,vertexAttemptNumberStore,vertexParallelismStore);} catch (IOException e) {throw new JobException("Could not create the ExecutionGraph.", e);}// set the basic propertiestry {executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));} catch (Throwable t) {log.warn("Cannot create JSON plan for job", t);// give the graph an empty planexecutionGraph.setJsonPlan("{}");}// initialize the vertices that have a master initialization hook// file output formats create directories here, input formats create splitsfinal long initMasterStart = System.nanoTime();log.info("Running initialization on master for job {} ({}).", jobName, jobId);for (JobVertex vertex : jobGraph.getVertices()) {String executableClass = vertex.getInvokableClassName();if (executableClass == null || executableClass.isEmpty()) {throw new JobSubmissionException(jobId,"The vertex "+ vertex.getID()+ " ("+ vertex.getName()+ ") has no invokable class.");}try {vertex.initializeOnMaster(classLoader);} catch (Throwable t) {throw new JobExecutionException(jobId,"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),t);}}log.info("Successfully ran initialization on master in {} ms.",(System.nanoTime() - initMasterStart) / 1_000_000);// topologically sort the job vertices and attach the graph to the existing one// 这里会先做一个排序,source源节点会放在最前面,接着开始遍历// 必须保证当前添加到集合的节点的前置节点都已经添加进去了List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();if (log.isDebugEnabled()) {log.debug("Adding {} vertices from job graph {} ({}).",sortedTopology.size(),jobName,jobId);}// 构建执行图的重点方法。生成具体的ExecutionGraphexecutionGraph.attachJobGraph(sortedTopology);if (log.isDebugEnabled()) {log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);}// configure the state checkpointing// checkpoint的相关配置if (isCheckpointingEnabled(jobGraph)) {JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();// Maximum number of remembered checkpointsint historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);CheckpointStatsTracker checkpointStatsTracker =new CheckpointStatsTracker(historySize,snapshotSettings.getCheckpointCoordinatorConfiguration(),metrics);// load the state backend from the application settingsfinal StateBackend applicationConfiguredBackend;final SerializedValue<StateBackend> serializedAppConfigured =snapshotSettings.getDefaultStateBackend();if (serializedAppConfigured == null) {applicationConfiguredBackend = null;} else {try {applicationConfiguredBackend =serializedAppConfigured.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not deserialize application-defined state backend.", e);}}// StateBackend配置final StateBackend rootBackend;try {rootBackend =StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend,snapshotSettings.isChangelogStateBackendEnabled(),jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);}// load the checkpoint storage from the application settingsfinal CheckpointStorage applicationConfiguredStorage;final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =snapshotSettings.getDefaultCheckpointStorage();if (serializedAppConfiguredStorage == null) {applicationConfiguredStorage = null;} else {try {applicationConfiguredStorage =serializedAppConfiguredStorage.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId,"Could not deserialize application-defined checkpoint storage.",e);}}final CheckpointStorage rootStorage;try {rootStorage =CheckpointStorageLoader.load(applicationConfiguredStorage,null,rootBackend,jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured checkpoint storage", e);}// instantiate the user-defined checkpoint hooks// 示例化用户自定义的cp hookfinal SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =snapshotSettings.getMasterHooks();final List<MasterTriggerRestoreHook<?>> hooks;if (serializedHooks == null) {hooks = Collections.emptyList();} else {final MasterTriggerRestoreHook.Factory[] hookFactories;try {hookFactories = serializedHooks.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);}final Thread thread = Thread.currentThread();final ClassLoader originalClassLoader = thread.getContextClassLoader();thread.setContextClassLoader(classLoader);try {hooks = new ArrayList<>(hookFactories.length);for (MasterTriggerRestoreHook.Factory factory : hookFactories) {hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));}} finally {thread.setContextClassLoader(originalClassLoader);}}final CheckpointCoordinatorConfiguration chkConfig =snapshotSettings.getCheckpointCoordinatorConfiguration();// 创建CheckpointCoordinator对象executionGraph.enableCheckpointing(chkConfig,hooks,checkpointIdCounter,completedCheckpointStore,rootBackend,rootStorage,checkpointStatsTracker,checkpointsCleaner);}// create all the metrics for the Execution Graph// 添加metrics指标metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));return executionGraph;}
在这个方法里,会先创建一个 ExecutionGraph 对象,然后对 JobGraph 中的 JobVertex 列表做一下排序(先把有 source 节点的 JobVertex 放在最前面,然后开始遍历,只有当前 JobVertex 的前置节点都已经添加到集合后才能把当前 JobVertex 节点添加到集合中),最后通过过 attachJobGraph() 方法生成具体的ExecutionGraph。
在上面的代码中,最需要核心关注的方法是:executionGraph.attachJobGraph(sortedTopology);
。该方法是创建ExecutionGraph的核心方法,包括了创建上面我们说的各种ExecutionGraph中涉及的对象,以及连接它们来形成ExecutionGraph拓扑结构。
接下来我们进入该方法来一探究竟。
生成ExecutionGraph:attachJobGraph
先来看下attachJobGraph
方法的实现:
public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {assertRunningInJobMasterMainThread();LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} "+ "vertices and {} intermediate results.",topologicallySorted.size(),tasks.size(),intermediateResults.size());final long createTimestamp = System.currentTimeMillis();// 遍历排序好的拓扑JobVertexfor (JobVertex jobVertex : topologicallySorted) {if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {this.isStoppable = false;}// 获取节点并行度信息VertexParallelismInformation parallelismInfo =parallelismStore.getParallelismInfo(jobVertex.getID());// create the execution job vertex and attach it to the graph// 创建ExecutionJobVertexExecutionJobVertex ejv =new ExecutionJobVertex(this,jobVertex,maxPriorAttemptsHistoryLength,rpcTimeout,createTimestamp,parallelismInfo,initialAttemptCounts.getAttemptCounts(jobVertex.getID()));// 重要方法!!!// 构建ExecutionGraph,连接上下游节点ejv.connectToPredecessors(this.intermediateResults);ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);if (previousTask != null) {throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",jobVertex.getID(), ejv, previousTask));}// 遍历ExecutionJobVertex的输出IntermediateResultfor (IntermediateResult res : ejv.getProducedDataSets()) {IntermediateResult previousDataSet =this.intermediateResults.putIfAbsent(res.getId(), res);if (previousDataSet != null) {throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",res.getId(), res, previousDataSet));}}this.verticesInCreationOrder.add(ejv);this.numVerticesTotal += ejv.getParallelism();}//将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。
registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);// the topology assigning should happen before notifying new vertices to failoverStrategy// 转换执行拓扑executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);partitionGroupReleaseStrategy =// 创建部分组释放策略的方法,依赖于当前的调度的拓扑结构,这决定了当何时释放特定的中间数据结果所需的策略。
partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
在上面attchGraph
方法中,首先遍历输入的排序后的JobVertex列表,对每一个JobVertex:
- 判断是否停止: 对于单个 JobVertex,如果它是一个输入顶点且不可停止,则整个 Job 不可停止。这在流处理任务中是常见的,一些输入数据源可能无法停止(如Kafka)。
- 获取并行信息并创建执行的顶点: 根据JobVertex的ID,从parallelismStore中获取并行信息。利用这些信息创建ExecutionJobVertex实例,它代表运行在特定TaskManager上的taskId,可以是待调度、运行或已完成的。
- 判断新添加的顶点是否已经存在: 如果试图添加一个已经存在的顶点,这意味着存在程序错误,因为每个JobVertex应当有唯一的ID。这将抛出异常。
- 判断数据集是否已经存在: 同样。如果试图添加一个已经存在的IntermediateResult,这将抛出异常。
- 添加执行顶点到创建顺序列表和增加总的顶点数量: 记录创建顶点的顺序能够确保在执行时能够按照正确的依赖关系进行。并同时更新总的顶点数量。
遍历完成后, 注册执行顶点和结果分区,将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。
利用DefaultExecutionTopology
工具类将ExecutionGraph
转换为SchedulingTopology
,这样便于任务调度器进行处理。
最后,调用partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology())
根据当前的调度的拓扑结构来创建组释放策略,这决定了当何时释放特定的中间数据结果所需的策略。
上面流程中,最需要关注的方法就是new ExecutionJobVertex
和ejv.connectToPredecessors(this.intermediateResults);
接下来,我们分别对其进行探究。
创建 ExecutionJobVertex 对象
进入到该方法的源码中:
@VisibleForTesting
public ExecutionJobVertex(InternalExecutionGraphAccessor graph,JobVertex jobVertex,int maxPriorAttemptsHistoryLength,Time timeout,long createTimestamp,VertexParallelismInformation parallelismInfo,SubtaskAttemptNumberStore initialAttemptCounts)throws JobException {if (graph == null || jobVertex == null) {throw new NullPointerException();}this.graph = graph;this.jobVertex = jobVertex;this.parallelismInfo = parallelismInfo;// verify that our parallelism is not higher than the maximum parallelismif (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {throw new JobException(String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",jobVertex.getName(),this.parallelismInfo.getParallelism(),this.parallelismInfo.getMaxParallelism()));}this.resourceProfile =ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];this.inputs = new ArrayList<>(jobVertex.getInputs().size());// take the sharing groupthis.slotSharingGroup = checkNotNull(jobVertex.getSlotSharingGroup());this.coLocationGroup = jobVertex.getCoLocationGroup();// create the intermediate resultsthis.producedDataSets =new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);this.producedDataSets[i] =new IntermediateResult(result.getId(),this,this.parallelismInfo.getParallelism(),result.getResultType());}// create all task verticesfor (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {ExecutionVertex vertex =new ExecutionVertex(this,i,producedDataSets,timeout,createTimestamp,maxPriorAttemptsHistoryLength,initialAttemptCounts.getAttemptCount(i));this.taskVertices[i] = vertex;}// sanity check for the double referencing between intermediate result partitions and// execution verticesfor (IntermediateResult ir : this.producedDataSets) {if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");}}final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =getJobVertex().getOperatorCoordinators();if (coordinatorProviders.isEmpty()) {this.operatorCoordinators = Collections.emptyList();} else {final ArrayList<OperatorCoordinatorHolder> coordinators =new ArrayList<>(coordinatorProviders.size());try {for (final SerializedValue<OperatorCoordinator.Provider> provider :coordinatorProviders) {coordinators.add(OperatorCoordinatorHolder.create(provider, this, graph.getUserClassLoader()));}} catch (Exception | LinkageError e) {IOUtils.closeAllQuietly(coordinators);throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);}this.operatorCoordinators = Collections.unmodifiableList(coordinators);}// set up the input splits, if the vertex has anytry {@SuppressWarnings("unchecked")InputSplitSource<InputSplit> splitSource =(InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();if (splitSource != null) {Thread currentThread = Thread.currentThread();ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();currentThread.setContextClassLoader(graph.getUserClassLoader());try {inputSplits =splitSource.createInputSplits(this.parallelismInfo.getParallelism());if (inputSplits != null) {splitAssigner = splitSource.getInputSplitAssigner(inputSplits);}} finally {currentThread.setContextClassLoader(oldContextClassLoader);}} else {inputSplits = null;}} catch (Throwable t) {throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);}
}
在上面这段代码中,主要实现了ExecutionVertex
的创建和IntermediateResult
对象的创建:
- 遍历当前JobVertex的输出
IntermediateDataSet
列表,并根据IntermediateDataSet
来创建相应的IntermediateResult
对象。每个IntermediateDataSet
都会对应一个IntermediateResult
。 - 根据当前JobVertex的并发度,来创建相同数量的
ExecutionVertex
对象,每个ExecutionVertex
对象代表一个并行计算任务,在实际执行时就是一个Task任务。
创建ExecutionVertex对象
进一步地,我们观察创建ExecutionVertex
对象的实现逻辑如下所示:
public ExecutionVertex(ExecutionJobVertex jobVertex,int subTaskIndex,IntermediateResult[] producedDataSets,Time timeout,long createTimestamp,int maxPriorExecutionHistoryLength,int initialAttemptCount) {this.jobVertex = jobVertex;this.subTaskIndex = subTaskIndex;this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);this.taskNameWithSubtask =String.format("%s (%d/%d)",jobVertex.getJobVertex().getName(),subTaskIndex + 1,jobVertex.getParallelism());this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);// 根据IntermediateResult创建当前subTaskIndex分区下的IntermediateResultPartitonfor (IntermediateResult result : producedDataSets) {IntermediateResultPartition irp =new IntermediateResultPartition(result,this,subTaskIndex,getExecutionGraphAccessor().getEdgeManager());// 记录当前分区的irp到ir中result.setPartition(subTaskIndex, irp);// 记录分区ip与irp的对应关系resultPartitions.put(irp.getPartitionId(), irp);}this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);// 创建对应的Execution对象,初始化时initialAttempCount为0,如果后面重新调度这个task,它会自增加1this.currentExecution =new Execution(getExecutionGraphAccessor().getFutureExecutor(),this,initialAttemptCount,createTimestamp,timeout);getExecutionGraphAccessor().registerExecution(currentExecution);this.timeout = timeout;this.inputSplits = new ArrayList<>();
}
上述创建ExecutionVertex
的过程主要实现了以下步骤:
- 生成中间结果分区IntermediateResultPartition
中间结果分区代表一个并行任务产生的输出,同一并行任务可能会有多个输出(对应多个后续任务),也就是多个中间结果分区。
- 基于 result,在相应的索引 subTaskIndex 上创建一个 IntermediateResultPartition 并给它赋值。IntermediateResultPartition 提供了并行任务的输出数据,对应于某个特定执行顶点 ExecutionVertex 的并行子任务。
- 在创建过程中,需要使用 getExecutionGraphAccessor().getEdgeManager() 获取边管理器,边管理器是用于维护这个分区与其它 ExecutionVertex 之间的连接关系。
- 记录这个 IntermediateResultPartition 到 result 中的相应索引位置,并在 resultPartitions 映射表中保存 IntermediateResultPartition。
- 创建执行(Execution)对象:
这一过程是基于 Execution 的构造函数引发的。它用于代表该 ExecutionVertex 在某一特定点时间的一次尝试执行。创建 Execution 实例后,会将其注册到执行图(ExecutionGraph)中,以便于后续调度和执行任务。
通过以上流程,生成了中间结果分区,映射了每一个分区和其对应的任务关系,并且创建了 Execution 对象用于管理并跟踪任务的执行状态。
在创建好ExecutionVertex和IntermediateResultPartition后,根据上面的流程图,就是考虑如何将它们进行连接生成ExecutionGraph了。
这部分的实现逻辑就在attachJobGraph
方法的ejv.connectToPredecessors(this.intermediateResults);
方法中实现的。
生成ExecutionGraph
同样地,我们进入源码来深入观察一下实现逻辑:
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)throws JobException {List<JobEdge> inputs = jobVertex.getInputs();if (LOG.isDebugEnabled()) {LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.",jobVertex.getID(), jobVertex.getName(), inputs.size()));}for (int num = 0; num < inputs.size(); num++) {JobEdge edge = inputs.get(num);if (LOG.isDebugEnabled()) {if (edge.getSource() == null) {LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",num,jobVertex.getID(),jobVertex.getName(),edge.getSourceId()));} else {LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",num,jobVertex.getID(),jobVertex.getName(),edge.getSource().getProducer().getID(),edge.getSource().getProducer().getName()));}}// fetch the intermediate result via ID. if it does not exist, then it either has not// been created, or the order// in which this method is called for the job vertices is not a topological orderIntermediateResult ires = intermediateDataSets.get(edge.getSourceId());if (ires == null) {throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "+ edge.getSourceId());}this.inputs.add(ires);EdgeManagerBuildUtil.connectVertexToResult(this, ires, edge.getDistributionPattern());}
}
这段代码主要完成了将当前的ExecutionJobVertex与其前置任务(predecessors)连接的流程。传入的参数intermediateDatasets包含了JobGraph中所有的中间计算结果,这些结果是由上游前置任务产生的。
需要注意的是,该过程要求连接操作的执行顺序应遵循任务的拓扑顺序。Flink的计算任务通常由多个阶段组成,每个阶段的输出是下一个阶段的输入,每个阶段(JobVertex)都处理一种类型的计算,例如map或reduce。
流程大致如下:
- 获取输入: 首先获取jobVertex的输入,输入是JobEdge列表,每一条JobEdge都代表一个上游产生的中间数据集和连接上下游的方式(例如HASH, BROADCAST)。
- 循环处理每个输入: 然后遍历这些inputs,对于每一条JobEdge:
- 基于edge.getSourceId()从intermediateDatasets获取IntermediateResult,这是一个中间计算结果。
- 检查该中间结果是否存在,如果不存在,则表示这不是一个拓扑排序,因为预期的情况是当你尝试访问一个中间结果时,它应该已经被创建了。如果找不到,抛出一个异常。
- 如果存在(没有异常被抛出),将找到的IntermediateResult添加到ExecutionJobVertex的inputs(List类型)中,这样当前任务就知道它的输入来自哪些中间结果。
- 调用EdgeManagerBuildUtil.connectVertexToResult方法来建立当前ExecutionJobVertex与找到的IntermediateResult之间的连接。 EdgeManager是Flink中负责管理输入输出边的组件,它显示地记录了发送端的分区和接收端的分区对应关系。
这个流程重要的是建立了Job中每个任务的执行依赖关系,并明确了数据传输的方式,让任务在执行时清楚自己的输入来自哪里,当任务执行完成后,它产生的输出会通过何种方式被发送到哪些任务。
具体的连接方式,我们需要继续进入到EdgeManagerBuildUtil.connectVertexToResult
方法中。其源码如下所示:
/*** Calculate the connections between {@link ExecutionJobVertex} and {@link IntermediateResult} ** based on the {@link DistributionPattern}.** @param vertex the downstream consumer {@link ExecutionJobVertex}* @param intermediateResult the upstream consumed {@link IntermediateResult}* @param distributionPattern the {@link DistributionPattern} of the edge that connects the* upstream {@link IntermediateResult} and the downstream {@link IntermediateResult}*/
static void connectVertexToResult(ExecutionJobVertex vertex,IntermediateResult intermediateResult,DistributionPattern distributionPattern) {switch (distributionPattern) {// 点对点的连接方式case POINTWISE:connectPointwise(vertex.getTaskVertices(), intermediateResult);break;// 全连接的方式case ALL_TO_ALL:connectAllToAll(vertex.getTaskVertices(), intermediateResult);break;default:throw new IllegalArgumentException("Unrecognized distribution pattern.");}
}
会根据DistributionPattern
选择不同的连接方式,这里主要分两种情况,DistributionPattern
是跟Partitioner
的配置有关。
这里以POINTWISE
的连接方式来举例,看一下其是如何在构造ExecutionGraph时连接上下游节点的。
private static void connectPointwise(ExecutionVertex[] taskVertices, IntermediateResult intermediateResult) {final int sourceCount = intermediateResult.getPartitions().length;final int targetCount = taskVertices.length;if (sourceCount == targetCount) {for (int i = 0; i < sourceCount; i++) {ExecutionVertex executionVertex = taskVertices[i];IntermediateResultPartition partition = intermediateResult.getPartitions()[i];ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());partition.addConsumers(consumerVertexGroup);ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(partition.getPartitionId(), intermediateResult);executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);}} else if (sourceCount > targetCount) {for (int index = 0; index < targetCount; index++) {ExecutionVertex executionVertex = taskVertices[index];ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());int start = index * sourceCount / targetCount;int end = (index + 1) * sourceCount / targetCount;List<IntermediateResultPartitionID> consumedPartitions =new ArrayList<>(end - start);for (int i = start; i < end; i++) {IntermediateResultPartition partition = intermediateResult.getPartitions()[i];partition.addConsumers(consumerVertexGroup);consumedPartitions.add(partition.getPartitionId());}ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(consumedPartitions, intermediateResult);executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);}} else {for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {IntermediateResultPartition partition =intermediateResult.getPartitions()[partitionNum];ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(partition.getPartitionId(), intermediateResult);int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;List<ExecutionVertexID> consumers = new ArrayList<>(end - start);for (int i = start; i < end; i++) {ExecutionVertex executionVertex = taskVertices[i];executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);consumers.add(executionVertex.getID());}ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromMultipleVertices(consumers);partition.addConsumers(consumerVertexGroup);}}
}
上面这段代码的目的是通过“点对点”的方式(即每个生产者产生的数据只被一个消费者消费)建立任务节点(ExecutionVertex)与中间结果集(IntermediateResultPartition)之间的连接关系。
这个方法的逻辑主要是根据上游任务产生的IntermediateResultPartition的数量(源)和下游ExecutionVertex节点数量(目标)的比例关系,做不同的操作:
- 源和目标数量相等:方法会将每个源中间结果分区与对应的下游ExecutionVertex节点连接。这种情况下,每个任务都完全独立,只会消费一个特定的上游中间结果分区
- 源数量大于目标数量:源中间结果分区会被尽可能平均地分配给下游ExecutionVertex节点,即每个ExecutionVertex可能会消费多个源中间结果分区数据。
- 源数量小于目标数量:每个源中间结果分区可能会被分配给多个下游ExecutionVertex节点消费,即多个ExecutionVertex节点可能消费同一个源中间结果分区数据。
⠀在执行连接的过程中,会创建ConsumerVertexGroup和ConsumedPartitionGroup:
- ConsumerVertexGroup包含一组接收同一个中间结果分区(IntermediateResultPartition)的顶点集合。
- ConsumedPartitionGroup包含顶点要消费的一组中间结果分区。
⠀注意,当源数量小于目标数量时,会有多个任务消费同一个源数据,所以需要使用ConsumerVertexGroup.fromMultipleVertices(consumers)来创建ConsumerVertexGroup。
几种连接情况的示例图如下所示:
到这里,这个作业的 ExecutionGraph 就创建完成了,有了 ExecutionGraph,JobManager 才能对这个作业做相应的调度。
总结
本文详细介绍了JobGraph生成ExecutionGraph的流程,介绍了ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition相关概念的原理和生成过程。最后我们介绍了Flink在生成ExecutionGraph时是如何实现IntermediateResultPartition和ExecutionVertex的连接的。
到这里,StreamGraph、JobGraph和Execution的生成过程,在最近的三篇文章中都已经详细讲解完成了,当然除了我们介绍的内容外,还有更多的实现细节没有介绍,有兴趣的读者可以参考文本来阅读源码,以此加深自己的理解和对更多实现细节的挖掘。
最后,再对StreamGraph、JobGraph和ExecutionGraph做一个总结:
- StreamGraph. StreamGraph 是表示 Flink 流计算的图模型,它是用户定义的计算逻辑的内部表示形式,是最原始的用户逻辑,一个没有做任何优化的DataFlow;
- JobGraph. JobGraph 由一个或多个 JobVertex 对象和它们之间的 JobEdge 对象组成,包含并行任务的信息。在JobGraph中对StreamGraph进行了优化,将能够合并在同个算子链中的操作符进行合并,以此减少任务执行时的上下文切换,提任务执行性能。
- ExecutionGraph. ExecutionGraph是 JobGraph 的并发执行版本,由 ExecutionVertex 和 IntermediateResultPartition 组成。每个 JobVertex 会被转换为一个或多个 ExecutionVertex,ExecutorGraph 包含了每个任务的全部实例,包含它们的状态、位置、输入输出结果。ExecutionGraph 是 Flink 中最核心的部分,它用于任务的调度、失败恢复等。
参考:
https://matt33.com/2019/12/20/flink-execution-graph-4/
相关文章:

Flink源码解析之:如何根据JobGraph生成ExecutionGraph
Flink源码解析之:如何根据JobGraph生成ExecutionGraph 在上一篇Flink源码解析中,我们介绍了Flink如何根据StreamGraph生成JobGraph的流程,并着重分析了其算子链的合并过程和JobGraph的构造流程。 对于StreamGraph和JobGraph的生成来说&…...
UE(虚幻)学习(三) UnrealSharp插件中调用非托管DLL
上一篇文章中我使用UnrealSharp成功使用了我的一个C#控制台程序中的网络模块,这个程序是基于KCP网络了,其中调用了Cmake 编译的一个C的DLL,在虚幻中DLL需要放在Binaries目录中才可以。Unity中只要放在任意Plugins目录中就可以。 但是Binaries…...
leetcode 3219. 切蛋糕的最小总开销 II
题目:3219. 切蛋糕的最小总开销 II - 力扣(LeetCode) 排序贪心。 开销越大的越早切。 注意m或n为1的情况。 class Solution { public:long long minimumCost(int m, int n, vector<int>& horizontalCut, vector<int>&…...
vant 地址记录
vant ui 的官网地址记录 vant 4 Vant 4 - A lightweight, customizable Vue UI library for mobile web apps. vant2 https://vant-ui.github.io/vant/v2/ vant3 Vant 3 - Lightweight Mobile UI Components built on Vue...

Lua语言入门 - Lua常量
在Lua中,虽然没有直接的常量关键字(如C中的const),但你可以通过一些编程技巧和约定来实现类似常量的行为。以下是几种常见的方法: 1. 使用全局变量并命名规范 你可以定义一个全局变量,并通过命名约定来表示…...
在Microsoft Windows上安装MySQL
MySQL仅适用于Microsoft Windows 64位操作系统,在Microsoft Windows上安装MySQL有不同的方法:MSI、包含您解压缩的所有必要文件的标准二进制版本(打包为压缩文件)以及自己编译MySQL源文件。 注意:MySQL8.4服务器需要在…...

windows下vscode使用msvc编译器出现中文乱码
文章目录 [toc]1、概述2、修改已创建文件编码3、修改vscode默认编码 更多精彩内容👉内容导航 👈👉C 👈👉开发工具 👈 1、概述 在使用MSVC编译器时,出现中文报错的问题可能与编码格式有关。UTF-…...
Git 解决 everything up-to-date
首先使用git log查看历史提交,找到最新一次提交,比如: PS D:\Unity Projects\CoffeeHouse\CoffeeHouse_BurstDebugInformation_DoNotShip> git log commit a1b54c309ade7c07c3981d3ed748b0ffac2759a3 (HEAD -> master, origin/master)…...

Windows配置cuda,并安装配置Pytorch-GPU版本
文章目录 1. CUDA Toolkit安装2. 安装cuDNN3. 添加环境变量配置Pytorch GPU版本 博主的电脑是Windows11,在安装cuda之前,请先查看pytorch支持的版本,cuda可以向下兼容,但是pytorch不行,请先进入:https://py…...
Neo4j 图数据库安装与操作指南(以mac为例)
目录 一、安装前提条件 1.1 Java环境 1.2 Homebrew(可选) 二、下载并安装Neo4j 2.1 从官方网站下载 2.1.1 访问Neo4j的官方网站 2.1.2 使用Homebrew安装 三、配置Neo4j 3.1 设置环境变量(可选) 3.2 打开配置文件(bash_profile) 3.2.1 打开终端…...
2024年12月个人工作生活总结
本文为 2024年12月工作生活总结。 研发编码 Golang语言byte数组赋值 假定有如下变量: var strCode string var bCode [9]byte现需将string类型转换成byte类型,如下: bCode []byte(strCode)无法转换,提示: cannot…...

PHP:IntelliJ IDEA 配置 PHP 开发环境及导入PHP项目
在创建PHP项目之前我们需要安装PHP插件,安装步骤如下:Windows:IntelliJ IDEA Ultimate 安装 PHP 插件-CSDN博客 1、导入已有PHP项目,导入之后选择,File > Setting 选择对应CLL Interpreter,如果没有操作…...

【嵌入式C语言】指针数组结构体
指针与数组 指针与数组指针数组数组指针 多维数组数组名的保存 结构体定义结构体定义结构体变量使用typedef简化结构体声明访问结构体成员结构体内存分配字节对齐位域定义位域位域的限制示例 指针与数组 指针数组和数组指针是两个不同的概念,它们涉及到指针和数组的…...

国产数据库TiDB从入门到放弃教程
国家层面战略,安全的角度,硬件、软件国产化是趋势,鸿蒙电脑操作系统、鸿蒙手机操作系统…数据库也会慢慢国产化,国产数据库TiDB用起来比OceanBase丝滑,本身没有那么重。 从入门到放弃 1. 介绍1.1 TiDB 的主要特点1.2 T…...
深入解析 Spring 属性:spring.codec.max-in-memory-size
在现代 Web 应用开发中,数据传输的大小和效率直接影响到系统的性能和稳定性。Spring WebFlux 作为一种响应式编程框架,提供了强大的数据流处理能力。在使用 WebFlux 时,spring.codec.max-in-memory-size 是一个关键配置,用于定义应…...
在K8S中,如何查看Pod状态的详情?事件显示cpu不足如何处理?
在Kubernetes中,查看Pod状态的详细通常设计使用kubectl命令行工具,这是kubernetes提供的一个强大的管理工具。以下是如何查看Pod状态详情的步骤: 1. 查看Pod状态详情 列出所有Pod: 使用kubectl get pods命令可以查看集群所有Po…...

ArcGIS教程(009):ArcGIS制作校园3D展示图
文章目录 数据下载校园3D展示图制作创建要素类矢量化【楼】要素矢量化【绿地】矢量化【范围】矢量化处理打开ArcScene添加动画数据下载 https://download.csdn.net/download/WwLK123/90189025校园3D展示图制作 创建要素类 添加底图: 新建【文件地理数据库】,并修改名称为【…...

REDIS2.0
string list hash set 无序集合 声明一个key,键里面的值是元素,元素的类型是string 元素的值是唯一的,不能重复 多个集合类型之间可以进行并集,交集,集查的运算 sadd test1 a b c c d :添加5个元素&am…...

算法练习——模拟题
前言:模拟题的特点在于没有什么固定的技巧,完全考验自己的代码能力,因此有助于提升自己的代码水平。如果说一定有什么技巧的话,那就是有的模拟题能够通过找规律来简化算法。 一:替换所有问号 题目要求: 解…...

京东供应链创新与实践:应用数据驱动的库存选品和调拨算法提升履约效率
2024 年度总结系列 2024 年 10 月,京东零售供应链技术团队凭借其在库存选品与调拨技术上的创新与实践,荣获运筹与管理学领域的国际顶级奖项 Daniel H. Wagner Prize。本文为您介绍获奖背后的供应链技术创新和落地应用。 00 摘要 在电商行业中&#x…...

linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...

【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...

centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...

dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
基于matlab策略迭代和值迭代法的动态规划
经典的基于策略迭代和值迭代法的动态规划matlab代码,实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...

数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !
我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...

Sklearn 机器学习 缺失值处理 获取填充失值的统计值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...