【Flink状态管理(八)】Checkpoint:CheckpointBarrier对齐后Checkpoint的完成、通知与对学习状态管理源码的思考
文章目录
- 一. 调用StreamTask执行Checkpoint操作
- 1. 执行Checkpoint总体代码流程
- 1.1. StreamTask.checkpointState()
- 1.2. executeCheckpointing
- 1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中
- 1.4. 算子状态进行快照
- 1.5. 状态数据快照持久化
- 二. CheckpointCoordinator管理Checkpoint
- 1. Checkpoint执行完毕后的确认过程
- 2. 触发并完成Checkpoint操作
- 3. 通知CheckpointComplete给TaskExecutor
- 三. 状态管理学习小结
上文介绍了CheckpointBarrier的对齐操作,当CheckpointBarrier完成对齐操作后,接下来就是通过notifyCheckpoint()方法触发StreamTask节点的Checkpoint操作。
一. 调用StreamTask执行Checkpoint操作
如下代码,notifyCheckpoint()方法主要包含如下逻辑。
> 1. 判断toNotifyOnCheckpoint不为空。
> 2. 创建CheckpointMetaData和CheckpointMetrics实例,CheckpointMetaData用于存储
> Checkpoint的元信息,CheckpointMetrics用于记录和监控Checkpoint监控指标。
> 3. 触发StreamTask中算子的Checkpoint操作。
protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes, long alignmentDurationNanos) throws Exception {if (toNotifyOnCheckpoint != null) {// 创建CheckpointMetaData对象用于存储Meta信息CheckpointMetaData checkpointMetaData =new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());// 创建CheckpointMetrics对象用于记录监控指标CheckpointMetrics checkpointMetrics = new CheckpointMetrics().setBytesBufferedInAlignment(bufferedBytes).setAlignmentDurationNanos(alignmentDurationNanos);// 调用toNotifyOnCheckpoint.triggerCheckpointOnBarrier()方法触发Checkpoint操作toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData,checkpointBarrier.getCheckpointOptions(),checkpointMetrics);}
}
注意:StreamTask是唯一实现了Checkpoint方法的子类,即只有StreamTask才能触发当前Task实例中的Checkpoint操作。
接下来具体看Checkpoint执行细节
1. 执行Checkpoint总体代码流程
Checkpoint触发过程分为两种情况:一种是CheckpointCoordinator周期性地触发数据源节点中的Checkpoint操作;另一种是下游算子通过对齐CheckpointBarrier事件触发本节点算子的Checkpoint操作。
不管是哪种方式触发Checkpoint,最终都是调用StreamTask.performCheckpoint()方法实现StreamTask实例中状态数据的持久化操作。
在StreamTask.performCheckpoint()方法中,首先判断当前的Task是否运行正常,然后使用actionExecutor线程池执行Checkpoint操作,Checkpoint的实际执行过程如下。
- Checkpoint执行前的准备操作,让OperatorChain中所有的Operator执行Pre-barrier工作。
- 将CheckpointBarrier事件发送到下游的节点中。
- 算子状态数据进行快照
执行checkpointState()方法,对StreamTask中OperatorChain的所有算子进行状态数据的快照操作,该过程为异步非阻塞过程,不影响数据的正常处理进程,执行完成后会返回True到CheckpointInputGate中。
- task挂掉情况处理:
- 如果isRunning的条件为false,表明Task不在运行状态,此时需要给OperatorChain中的所有算子发送CancelCheckpointMarker消息,这里主要借助recordWriter.broadcastEvent(message)方法向下游算子进行事件广播。
- 当且仅当OperatorChain中的算子还没有执行完Checkpoint操作的时候,下游的算子接收到CancelCheckpointMarker消息后会立即取消Checkpoint操作。
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics,boolean advanceToEndOfTime) throws Exception {LOG.debug("Starting checkpoint ({}) {} on task {}",checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());final long checkpointId = checkpointMetaData.getCheckpointId();if (isRunning) {// 使用actionExecutor执行Checkpoint逻辑actionExecutor.runThrowing(() -> {if (checkpointOptions.getCheckpointType().isSynchronous()) {setSynchronousSavepointId(checkpointId);if (advanceToEndOfTime) {advanceToEndOfEventTime();}}//Checkpoint操作的准备工作operatorChain.prepareSnapshotPreBarrier(checkpointId);//将checkpoint barrier发送到下游的stream中operatorChain.broadcastCheckpointBarrier(checkpointId,checkpointMetaData.getTimestamp(),checkpointOptions);//对算子中的状态进行快照操作,此步骤是异步操作,//不影响streaming拓扑中数据的正常处理checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);});return true;} else {// 如果Task处于其他状态,则向下游广播CancelCheckpointMarker消息actionExecutor.runThrowing(() -> {final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());recordWriter.broadcastEvent(message);});return false;}
}
1.1. StreamTask.checkpointState()
接下来我们看StreamTask.checkpointState()方法的具体实现,如下代码。
- 创建CheckpointStateOutputStream实例。主要有如下两种实现类:
- FsCheckpointStateOutputStream:文件类型系统
- MemoryCheckpointOutputStream:内存的数据流输出。
- 创建CheckpointingOperation实例,CheckpointingOperation封装了Checkpoint执行的具体操作流程,以及checkpointMetaData、checkpointOptions、storage和checkpointMetrics等Checkpoint执行过程中需要的环境配置信息。
- 调用CheckpointingOperation.executeCheckpointing()方法执行Checkpoint操作。
private void checkpointState(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics) throws Exception {// 创建CheckpointStreamFactory实例CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointMetaData.getCheckpointId(),checkpointOptions.getTargetLocation());// 创建CheckpointingOperation实例CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this,checkpointMetaData,checkpointOptions,storage,checkpointMetrics);// 执行Checkpoint操作checkpointingOperation.executeCheckpointing();
}
1.2. executeCheckpointing
如代码所示,CheckpointingOperation.executeCheckpointing()方法主要包含如下逻辑。
- 遍历所有StreamOperator算子,然后调用checkpointStreamOperator()方法为每个算子创建OperatorSnapshotFuture对象。这一步将所有算子的快照操作存储在OperatorSnapshotFutures集合中。
- 将OperatorSnapshotFutures存储到operatorSnapshotsInProgress的键值对集合中,其中Key为OperatorID,Value为该算子执行状态快照操作对应的OperatorSnapshotFutures对象
- 创建AsyncCheckpointRunnable线程对象,AsyncCheckpointRunnable实例中包含了创建好的OperatorSnapshotFutures集合。
- 调用StreamTask.asyncOperationsThreadPool线程池运行asyncCheckpointRunnable线程,执行operatorSnapshotsInProgress集合中算子的异步快照操作。
public void executeCheckpointing() throws Exception {//通过算子创建执行快照操作的OperatorSnapshotFutures对象for (StreamOperator<?> op : allOperators) {checkpointStreamOperator(op);}// 此处省略部分代码startAsyncPartNano = System.nanoTime();checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(owner,operatorSnapshotsInProgress,checkpointMetaData,checkpointMetrics,startAsyncPartNano);// 注册Closeable操作owner.cancelables.registerCloseable(asyncCheckpointRunnable);// 执行asyncCheckpointRunnableowner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);}
1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中
如下代码,AbstractStreamOperator.snapshotState()方法将当前算子的状态快照操作封装在OperatorSnapshotFutures对象中,然后通过asyncOperationsThreadPool线程池异步触发所有的OperatorSnapshotFutures操作,方法主要步骤如下。
- 创建OperatorSnapshotFutures对象,封装当前算子对应的状态快照操作。
- 创建snapshotContext上下文对象,存储快照过程需要的上下文信息,并调用snapshotState()方法执行快照操作。
snapshotState()方法由StreamOperator子类实现,例如在AbstractUdfStreamOperator中会调用StreamingFunctionUtils.snapshotFunctionState(context,getOperatorStateBackend(),
userFunction)方法执行函数中的状态快照操作。
- 向snapshotInProgress中指定KeyedStateRawFuture和OperatorStateRawFuture,专门用于处理原生状态数据的快照操作。
- 如果operatorStateBackend不为空,则将operatorStateBackend.snapshot()方法块设定到OperatorStateManagedFuture中,并注册到snapshotInProgress中等待执行。
- 如果keyedStateBackend不为空,则将keyedStateBackend.snapshot()方法块设定到KeyedStateManagedFuture中,并注册到snapshotInProgress中等待执行。
- 返回创建的snapshotInProgress异步Future对象,snapshotInProgress中封装了当前算子需要执行的所有快照操作。
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,CheckpointStreamFactory factory) throws Exception {// 获取KeyGroupRangeKeyGroupRange keyGroupRange = null != keyedStateBackend ?keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;// 创建OperatorSnapshotFutures处理对象OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();// 创建snapshotContext上下文对象StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(checkpointId,timestamp,factory,keyGroupRange,getContainingTask().getCancelables());try {snapshotState(snapshotContext);// 设定KeyedStateRawFuture和OperatorStateRawFuturesnapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());// 如果operatorStateBackend不为空,设定OperatorStateManagedFutureif (null != operatorStateBackend) {snapshotInProgress.setOperatorStateManagedFuture(operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}// 如果keyedStateBackend不为空,设定KeyedStateManagedFutureif (null != keyedStateBackend) {snapshotInProgress.setKeyedStateManagedFuture(keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}} catch (Exception snapshotException) {// 此处省略部分代码}return snapshotInProgress;
}
这里可以看出,原生状态和管理状态的RunnableFuture对象会有所不同
- RawState主要通过从snapshotContext中获取的RawFuture对象 管理状态的快照操作
- ManagedState主要通过operatorStateBackend和keyedStateBackend进行状态的管理,并根据StateBackend的不同实现将状态数据写入内存或外部文件系统中。
1.4. 算子状态进行快照
我们知道所有的状态快照操作都会被封装到OperatorStateManagedFuture对象中,最终通过AsyncCheckpointRunnable线程触发执行。
下面我们看AsyncCheckpointRunnable线程的定义。如代码所示,AsyncCheckpointRunnable.run()方法主要逻辑如下。
- 调用FileSystemSafetyNet.initializeSafetyNetForThread()方法为当前线程初始化文件系统安全网,确保数据能够正常写入。
- 创建TaskStateSnapshot实例:
创建jobManagerTaskOperatorSubtaskStates和localTaskOperatorSubtaskStates对应的TaskStateSnapshot实例,其中jobManagerTaskOperatorSubtaskStates用于存储和记录发送给JobManager的Checkpoint数据,localTaskOperatorSubtaskStates用于存储TaskExecutor本地的状态数据。
- 执行所有状态快照线程操作
遍历operatorSnapshotsInProgress集合,获取OperatorSnapshotFutures并创建OperatorSnapshotFinalizer实例,用于执行所有状态快照线程操作。在OperatorSnapshotFinalizerz中会调用FutureUtils.runIfNotDoneAndGet()方法执行KeyedState和OperatorState的快照操作。
- 从finalizedSnapshots中获取JobManagerOwnedState和TaskLocalState,分别存储在jobManagerTaskOperatorSubtaskStates和localTaskOperatorSubtaskStates集合中。
- 调用checkpointMetrics对象记录Checkpoint执行的时间并汇总到Metric监控系统中。
- 如果AsyncCheckpointState为COMPLETED状态,则调用reportCompletedSnapshotStates()方法向JobManager汇报Checkpoint的执行结果。
- 如果出现其他异常情况,则调用handleExecutionException()方法进行处理。
public void run() {FileSystemSafetyNet.initializeSafetyNetForThread();try {// 创建TaskStateSnapshotTaskStateSnapshot jobManagerTaskOperatorSubtaskStates =new TaskStateSnapshot(operatorSnapshotsInProgress.size());TaskStateSnapshot localTaskOperatorSubtaskStates =new TaskStateSnapshot(operatorSnapshotsInProgress.size());for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {OperatorID operatorID = entry.getKey();OperatorSnapshotFutures snapshotInProgress = entry.getValue();// 创建OperatorSnapshotFinalizer对象OperatorSnapshotFinalizer finalizedSnapshots =new OperatorSnapshotFinalizer(snapshotInProgress);jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,finalizedSnapshots.getJobManagerOwnedState());localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,finalizedSnapshots.getTaskLocalState());}final long asyncEndNanos = System.nanoTime();final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {reportCompletedSnapshotStates(jobManagerTaskOperatorSubtaskStates,localTaskOperatorSubtaskStates,asyncDurationMillis);} else {LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",owner.getName(),checkpointMetaData.getCheckpointId());}} catch (Exception e) {handleExecutionException(e);} finally {owner.cancelables.unregisterCloseable(this);FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();}
}
至此,算子状态数据快照的逻辑基本完成,算子中的托管状态主要借助KeyedStateBackend和OperatorStateBackend管理。
KeyedStateBackend和OperatorStateBackend都实现了SnapshotStrategy接口,提供了状态快照的方法。SnapshotStrategy根据不同类型存储后端,主要有HeapSnapshotStrategy和RocksDBSnapshotStrategy两种类型。
1.5. 状态数据快照持久化
这里我们以HeapSnapshotStrategy为例,介绍在StateBackend中对状态数据进行状态快照持久化操作的步骤。如代码所示,
HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates()方法中定义了对KeyedState以及OperatorState的状态处理逻辑。
- 遍历每个StateSnapshotRestore。
- 调用StateSnapshotRestore.stateSnapshot()方法,此时会创建StateSnapshot对象。
- 将创建的StateSnapshot添加到metaInfoSnapshots和cowStateStableSnapshots集合中,完成堆内存存储类型KvState的快照操作。
private void processSnapshotMetaInfoForAllStates(List metaInfoSnapshots,Map<StateUID, StateSnapshot> cowStateStableSnapshots,Map<StateUID, Integer> stateNamesToId,Map<String, ? extends StateSnapshotRestore> registeredStates,StateMetaInfoSnapshot.BackendStateType stateType) {for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :registeredStates.entrySet()) {final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);stateNamesToId.put(stateUid, stateNamesToId.size());StateSnapshotRestore state = kvState.getValue();if (null != state) {final StateSnapshot stateSnapshot = state.stateSnapshot();metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());cowStateStableSnapshots.put(stateUid, stateSnapshot);}}
}
二. CheckpointCoordinator管理Checkpoint
1. Checkpoint执行完毕后的确认过程
当StreamTask中所有的算子完成状态数据的快照操作后,Task实例会立即将TaskStateSnapshot消息发送到管理节点的CheckpointCoordinator中,并在CheckpointCoordinator中完成后续的操作。如图所示,Checkpoint执行完毕后的确认过程如下。

- 调用StreamTask.reportCompletedSnapshotStates
当StreamTask中的所有算子都完成快照操作后,会调用StreamTask.reportCompletedSnapshotStates()方法将TaskStateSnapshot等Ack消息发送给TaskStateManager。TaskStateManager封装了CheckpointCoordinatorGateway,因此可以直接和CheckpointCoordinator组件进行RPC通信。
- 消息传递
- 将消息传递给CheckpointCoordinatorGateway
TaskStateManager通过CheckpointResponder.acknowledgeCheckpoint()方法将acknowledgedTaskStateSnapshot消息传递给CheckpointCoordinatorGateway接口实现者,实际上就是JobMasterRPC服务。- 消息传递给CheckpointCoordinator
JobMaster接收到RpcCheckpointResponder返回的Ack消息后,会调用SchedulerNG.acknowledgeCheckpoint()方法将消息传递给调度器。调度器会将Ack消息封装成AcknowledgeCheckpoint,传递给CheckpointCoordinator组件继续处理。
- 管理PendingCheckpoint
当CheckpointCoordinator接收到AcknowledgeCheckpoint后,会从pendingCheckpoints集合中获取对应的PendingCheckpoint,然后判断当前Checkpoint中是否收到AcknowledgedTasks集合所有的Task实例发送的Ack确认消息。
如果notYetAcknowledgedTasks为空,则调用completePendingCheckpoint()方法完成当前PendingCheckpoint操作,并从pendingCheckpoints集合中移除当前的PendingCheckpoint。
- 添加CompletedCheckpoint:
紧接着,PendingCheckpoint会转换成CompletedCheckpoint,此时CheckpointCoordinator会在completedCheckpointStore集合中添加CompletedCheckpoint。
- 通知Checkpoint操作结束。
CheckpointCoordinator会遍历tasksToCommitTo集合中的ExecutionVertex节点并获取Execution对象,然后通过Execution向TaskManagerGateway发送CheckpointComplete消息,通知所有的Task实例本次Checkpoint操作结束。
- 通知同步
当TaskExecutor接收到CheckpointComplete消息后,会从TaskSlotTable中获取对应的Task实例,向Task实例中发送CheckpointComplete消息。所有实现CheckpointListener监听器的组件或算子都会获取Checkpoint完成的消息,然后完成各自后续的处理操作。
2. 触发并完成Checkpoint操作
CheckpointCoordinator组件接收到Task实例的Ack消息(快照完成了?)后,会触发并完成Checkpoint操作。如代码PendingCheckpoint.finalizeCheckpoint()方法的具体实现如下。
1)向sharedStateRegistry中注册operatorStates。
2)结束pendingCheckpoint中的Checkpoint操作并生成CompletedCheckpoint。
3)将completedCheckpoint添加到completedCheckpointStore中,
4)从pendingCheckpoint中移除checkpointId对应的PendingCheckpoint,
并触发队列中的Checkpoint请求。
5)向所有的ExecutionVertex节点发送CheckpointComplete消息,
通知Task实例本次Checkpoint操作完成。private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {final long checkpointId = pendingCheckpoint.getCheckpointId();final CompletedCheckpoint completedCheckpoint;// 首先向sharedStateRegistry中注册operatorStatesMap<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();sharedStateRegistry.registerAll(operatorStates.values());// 对pendingCheckpoint中的Checkpoint做结束处理并生成CompletedCheckpointtry {try {completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());}catch (Exception e1) {// 如果出现异常则中止运行并抛出CheckpointExecutionif (!pendingCheckpoint.isDiscarded()) {failPendingCheckpoint(pendingCheckpoint,CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);}throw new CheckpointException("Could not finalize the pending checkpoint " +checkpointId + '.',CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);}// 当完成finalization后,PendingCheckpoint必须被丢弃Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);// 将completedCheckpoint添加到completedCheckpointStore中try {completedCheckpointStore.addCheckpoint(completedCheckpoint);} catch (Exception exception) {// 如果completed checkpoint存储出现异常则进行清理executor.execute(new Runnable() {@Overridepublic void run() {try {completedCheckpoint.discardOnFailedStoring();} catch (Throwable t) {LOG.warn("Could not properly discard completed checkpoint {}.",completedCheckpoint.getCheckpointID(), t);}}});throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);}} finally {// 最后从pendingCheckpoints中移除checkpointId对应的PendingCheckpointpendingCheckpoints.remove(checkpointId);// 触发队列中的Checkpoint请求triggerQueuedRequests();}// 记录checkpointIdrememberRecentCheckpointId(checkpointId);// 清除之前的CheckpointsdropSubsumedCheckpoints(checkpointId);// 计算和前面Checkpoint操作之间的最低延时lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());// 通知所有的ExecutionVertex节点Checkpoint操作完成final long timestamp = completedCheckpoint.getTimestamp();for (ExecutionVertex ev : tasksToCommitTo) {Execution ee = ev.getCurrentExecutionAttempt();if (ee != null) {ee.notifyCheckpointComplete(checkpointId, timestamp);}}
}
3. 通知CheckpointComplete给TaskExecutor
当TaskExecutor接收到来自CheckpointCoordinator的CheckpointComplete消息后,会调用Task.notifyCheckpointComplete()方法将消息传递到指定的Task实例中。Task线程会将CheckpointComplete消息通知给StreamTask中的算子。
如下代码,
/**
将notifyCheckpointComplete()转换成RunnableWithException线程并提交到Mailbox中运行,且在MailboxExecutor线程模型中获取和执行的优先级是最高的。
最终notifyCheckpointComplete()方法会在MailboxProcessor中运行。
**/public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(() -> notifyCheckpointComplete(checkpointId),"checkpoint %d complete", checkpointId);
}
继续具体看StreamTask.notifyCheckpointComplete(),如下代码:
1)获取当前Task中算子链的算子,并发送Checkpoint完成的消息。
2)获取TaskStateManager对象,向其通知Checkpoint完成消息,这里主要调用
TaskLocalStateStore清理本地无用的Checkpoint数据。
3)如果当前Checkpoint是同步的Savepoint操作,直接完成并终止当前Task实例,并调用
resetSynchronousSavepointId()方法将syncSavepointId重置为空。private void notifyCheckpointComplete(long checkpointId) {try {boolean success = actionExecutor.call(() -> {if (isRunning) {LOG.debug("Notification of complete checkpoint for task {}", getName());// 获取当前Task中operatorChain所有的Operator,并通知每个Operator Checkpoint执行成功的消息for (StreamOperator<?> operator : operatorChain.getAllOperators()) {if (operator != null) {operator.notifyCheckpointComplete(checkpointId);}}return true;} else {LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());return true;}});// 获取TaskStateManager,并通知Checkpoint执行完成的消息getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);// 如果是同步的Savepoint操作,则直接完成当前Taskif (success && isSynchronousSavepointId(checkpointId)) {finishTask();// Reset to "notify" the internal synchronous savepoint mailbox loop.resetSynchronousSavepointId();}} catch (Exception e) {handleException(new RuntimeException("Error while confirming checkpoint", e));}
}
算子接收到Checkpoint完成消息后,会根据自身需要进行后续的处理,默认在AbstractStreamOperator基本实现类中会通知keyedStateBackend进行后续操作。
对于AbstractUdfStreamOperator实例,会判断当前userFunction是否实现了CheckpointListener,如果实现了,则向UserFucntion通知Checkpoint执行完成的信息
例如在FlinkKafkaConsumerBase中会通过获取到的Checkpoint完成信息,将Offset提交至Kafka集群,确保消费的数据已经完成处理,详细实现可以参考FlinkKafkaConsumerBase.notifyCheckpointComplete()方法。
public void notifyCheckpointComplete(long checkpointId) throws Exception {super.notifyCheckpointComplete(checkpointId);if (userFunction instanceof CheckpointListener) {((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);}
}
三. 状态管理学习小结
通过学习状态管理的源码,我们可以再来思考下如下几个场景问题,是不是有一点“庖丁解牛”的意思!
flink中状态存在的意义是什么,涉及到哪些场景。
- 实时聚合:比如,计算过去一小时内的平均销售额。这时,你会需要使用到Flink的状态来存储过去一小时内的所有销售数据。
- 窗口操作:Flink SQL支持滚动窗口、滑动窗口、会话窗口等。这些窗口操作都需要Flink的状态来存储在窗口期限内的数据。
- 状态的持久化与任务恢复:实时任务挂掉之后,为了快速从上一个点恢复任务,可以使用savepoint和checkpoint。
- 多流join:Flink至少存储一个流中的数据,以便于在新的记录到来时进行匹配。
其次通过学习Flink状态管理相关源码,可以进一步了解状态管理的细节操作,为解决更加复杂的问题打下理论基础
- 深入理解任务运行过程中,各算子状态的流转机制;
- 快速定位问题:在遇到实际问题时,能够快速反应出是哪块逻辑出现了问题;
- 应对故障:状态管理和Flink容错机制相关,可以了解Flink发生故障时如何保证状态的一致性和可恢复性
- 二次开发:可以自定义状态后端,或者拓展优化已有的例如RocksDB状态后端等;
- 性能优化:了解了Flink是如何有效的处理和管理状态,就可以优化任务性能,减少资源消耗。
参考:《Flink设计与实现:核心原理与源码解析》–张利兵
相关文章:
【Flink状态管理(八)】Checkpoint:CheckpointBarrier对齐后Checkpoint的完成、通知与对学习状态管理源码的思考
文章目录 一. 调用StreamTask执行Checkpoint操作1. 执行Checkpoint总体代码流程1.1. StreamTask.checkpointState()1.2. executeCheckpointing1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中1.4. 算子状态进行快照1.5. 状态数据快照持久化 二. CheckpointCoordin…...
防御保护第八、九、十、十一天笔记
一、内容安全 1、DFI和DPI技术 --- 深度检测技术 DPI是一种基于应用层的流量检测和控制技术,它会对流量进行拆包,分析包头和应用层的内容,从而识别应用程序和应用程序的内容。这种技术增加了对应用层的分析,识别各种应用…...
【TypeScript基础知识点】的讲解
TypeScript基础知识点 TypeScript基础知识点 TypeScript基础知识点 TypeScript 是一种由 Microsoft 开发和维护的开源编程语言,它是 JavaScript 的一个超集,添加了可选的静态类型和基于类的面向对象编程,以下是一些 TypeScript 的基础知识点…...
牛客周赛 Round 34 解题报告 | 珂学家 | 构造思维 + 置换环
前言 整体评价 好绝望的牛客周赛,彻底暴露了CF菜菜的本质,F题没思路,G题用置换环骗了50%, 这大概是唯一的亮点了。 A. 小红的字符串生成 思路: 枚举 a,b两字符在相等情况下比较特殊 a, b input().split() if a b:print (2)print (a)pri…...
LeetCode13 罗马数字转整数
题目 罗马数字包含以下七种字符: I, V, X, L,C,D 和 M。字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 M 1000 例如&…...
【Hudi】Upsert原理
17张图带你彻底理解Hudi Upsert原理 1.开始提交:判断上次任务是否失败,如果失败会触发回滚操作。然后会根据当前时间生成一个事务开始的请求标识元数据。2.构造HoodieRecord Rdd对象:Hudi 会根据元数据信息构造HoodieRecord Rdd 对象…...
信息系统服务:演绎数字时代的征程
信息系统服务作为数字化时代的基石,已经在人类社会的各个领域发挥着重要作用。本文将从信息系统服务的起源、发展和演化过程,通过生动的例子和准确客观的历史事实,探讨信息系统服务对人类社会的影响与变革。 1. 起源:信息处理的初…...
rust连接postgresql数据库
引入crate: postgres "0.19.7" use postgres::{Client, NoTls, error::Error};fn main() -> Result<(), Error> {let mut client Client::connect("hostlocalhost port5432 dbnamexxxxdb userpostgres passwordxxxxxx", NoTls).un…...
[面试] 什么是死锁? 如何解决死锁?
什么是死锁 死锁,简单来说就是两个或者多个的线程在执行的过程中,争夺同一个共享资源造成的相互等待的现象。如果没有外部干预线程会一直阻塞下去. 导致死锁的原因 互斥条件,共享资源 X 和 Y 只能被一个线程占用; 请求和保持条件…...
网络原理 HTTP _ HTTPS
回顾 我们前面介绍了HTTP协议的请求和响应的基本结构 请求报文是由首行请求头空行正文来组成的 响应报文是由首行形影头空行响应正文组成的 我们也介绍了一定的请求头之中的键值对的属性 Host,Content-type,Content-length,User-agent,Referer,Cookie HTTP协议中的状态码 我们先…...
软件实际应用实例,茶楼收银软件管理系统操作流程,茶室计时计费会员管理系统软件试用版教程
软件实际应用实例,茶楼收银软件管理系统操作流程,茶室计时计费会员管理系统软件试用版教程 一、前言 以下软件以 佳易王茶社计时计费管理系统软件V17.9为例说明 软件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 1、计时计费&…...
网络安全“三保一评”深度解析
“没有网络安全就没有国家安全”。近几年,我国法律法规陆续发布实施,为承载我国国计民生的重要网络信息系统的安全提供了法律保障,正在实施的“3保1评”为我国重要网络信息系统的安全构筑了四道防线。 什么是“3保1评”? 等保、分…...
IDA使用-2023CICSN华中赛区pwn题逆向为例
文章目录 相关字节标识导入函数和导出函数找程序入口函数选项设置重命名CISCN2023华中赛区分区赛AWDIDA源码main 构造结构体sub_141B() 打开局部变量类型的视图增加变量类型重新定义变量类型再次设置变量类型并重新定义再次设置变量类型并重新定义再次设置变量类型并重新定义 设…...
安装虚拟机出现的一些问题
1、在重新打开软件之后出现闪退 解决:[WSL] 解决nsenter: cannot open /proc/320/ns/time: No such file or directory 问题 小白向-CSDN博客2、重新启动xrdp服务命令 解决: sudo systemctl restart xrdp3、将端口从3389改为3390,因为此前…...
Git+py+ipynb Usage
0.default config ssh-keygen -t rsa #之后一路回车,当前目录.ssh/下产生公私钥 cat ~/.ssh/id_rsa.pub #复制公钥到账号 git config --global user.email account_email git config --global user.name account_namebug of ipynb TqdmWarning: IProgress not found. Please …...
eBPF实践篇之环境搭建
文章目录 前言实验环境前置知识配置开发环境最后 前言 你好,我是醉墨居士,本次我们学习一下eBPF,我们基于libbpf-bootstrap来进行我们的eBPF程序开发🤗 实验环境 一台Debian12操作系统的计算机,我使用的是Debian12.…...
机器学习科普及学习路线
机器学习是一种让计算机系统通过从数据中学习来改进性能的方法。它的学习方法主要包括监督学习、无监督学习和强化学习。下面我将详细解释机器学习的概念、学习方法和学习路线。 1. 机器学习概念: 机器学习是一种人工智能的分支,旨在使计算机系统能够从…...
如何在本地电脑部署HadSky论坛并发布至公网可远程访问【内网穿透】
文章目录 前言1. 网站搭建1.1 网页下载和安装1.2 网页测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar临时数据隧道2.2 Cpolar稳定隧道(云端设置)2.3 Cpolar稳定隧道(本地设置)2.4 公网访问测试 总结 前言 经过多年的基础…...
Spring Boot 笔记 025 主界面
1.1 路由搭建 1.1.1 安装vue router npm install vue-router4 1.1.2 在src/router/index.js中创建路由器,并导出 import { createRouter, createWebHistory } from vue-router//导入组件 import LoginVue from /views/Login.vue import LayoutVue from /views/La…...
(done) Positive Semidefinite Matrices 什么是半正定矩阵?如何证明一个矩阵是半正定矩阵? 可以使用特征值
参考视频:https://www.bilibili.com/video/BV1Vg41197ew/?vd_source7a1a0bc74158c6993c7355c5490fc600 参考资料(半正定矩阵的定义):https://baike.baidu.com/item/%E5%8D%8A%E6%AD%A3%E5%AE%9A%E7%9F%A9%E9%98%B5/2152711?frge_ala 看看半正定矩阵的…...
接口测试中缓存处理策略
在接口测试中,缓存处理策略是一个关键环节,直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性,避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明: 一、缓存处理的核…...
stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...
【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在 GPU 上对图像执行 均值漂移滤波(Mean Shift Filtering),用于图像分割或平滑处理。 该函数将输入图像中的…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...
【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...
uni-app学习笔记三十五--扩展组件的安装和使用
由于内置组件不能满足日常开发需要,uniapp官方也提供了众多的扩展组件供我们使用。由于不是内置组件,需要安装才能使用。 一、安装扩展插件 安装方法: 1.访问uniapp官方文档组件部分:组件使用的入门教程 | uni-app官网 点击左侧…...
boost::filesystem::path文件路径使用详解和示例
boost::filesystem::path 是 Boost 库中用于跨平台操作文件路径的类,封装了路径的拼接、分割、提取、判断等常用功能。下面是对它的使用详解,包括常用接口与完整示例。 1. 引入头文件与命名空间 #include <boost/filesystem.hpp> namespace fs b…...
