Flink Source Code Notes --- checkpoint
Continued from the previous post:
Flink-1.10 Source Code Notes: checkpoint - 1
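Step into the triggerCheckpoint method of taskExecutorGateway.
It looks up the task running in the slot. If a task is running there, it calls the task's triggerCheckpointBarrier method to trigger the task's checkpoint barrier. Otherwise it returns a future completed exceptionally with a CheckpointException.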
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) {
// Corresponding output shown in the Flink web UI: Triggering checkpoint 1 @ 1590022656734 for job 8691156e98bad1c55fe5e869191de5f8.
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
// Get the checkpoint type
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
// Get the task running in the slot
final Task task = taskSlotTable.getTask(executionAttemptID);
// If such a task exists (is not null)
if (task != null) {
// Trigger the task's checkpoint barrier
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
log.debug(message);
return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
}
}
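The dispatch above comes down to three steps. It validates the options, looks up the task by its execution attempt ID, and then either forwards the trigger or fails the returned future.

The following is a minimal sketch under simplifying assumptions:
- A plain `Map` replaces `taskSlotTable`.
- String IDs replace `ExecutionAttemptID`.
- The names `TriggerDispatchSketch` and `SketchTask` are hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a running Task; it only records the triggered checkpoint ids.
class SketchTask {
    final List<Long> triggered = new ArrayList<>();

    void triggerCheckpointBarrier(long checkpointId) {
        triggered.add(checkpointId);
    }
}

// Hypothetical model of TaskExecutor#triggerCheckpoint (not Flink API).
class TriggerDispatchSketch {
    // Stands in for taskSlotTable: execution attempt id -> running task.
    final Map<String, SketchTask> tasks = new HashMap<>();

    CompletableFuture<String> triggerCheckpoint(
            String executionAttemptId,
            long checkpointId,
            boolean synchronous,
            boolean savepoint,
            boolean advanceToEndOfEventTime) {
        // Same guard as the original: only synchronous savepoints may advance the watermark to MAX.
        if (advanceToEndOfEventTime && !(synchronous && savepoint)) {
            throw new IllegalArgumentException(
                "Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }
        SketchTask task = tasks.get(executionAttemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId);
            // The ack only means "trigger delivered", not "checkpoint completed".
            return CompletableFuture.completedFuture("ACK");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "TaskManager received a checkpoint request for unknown task " + executionAttemptId + '.'));
        return failed;
    }
}
```

The acknowledgement only confirms that the trigger was handed to the task. The outcome of the checkpoint itself is reported separately, later.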
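Now step into the triggerCheckpointBarrier method. It mainly does the following:
1. Gets the AbstractInvokable, which is the concrete task type. AbstractInvokable is the abstract base class of every task a TaskManager can execute; here it can be a SourceStreamTask or a StreamTask.
2. Creates a CheckpointMetaData object, which encapsulates the checkpoint's metadata.
3. Calls the invokable's triggerCheckpointAsync method (the one in StreamTask), which triggers the checkpoint asynchronously. If the task is not RUNNING, it declines the checkpoint instead.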
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions,
final boolean advanceToEndOfEventTime) {
// The concrete task type. AbstractInvokable is the base class of every task a TaskManager can execute; here it can be a SourceStreamTask or a StreamTask
// SourceStreamTask --> StreamTask --> AbstractInvokable
final AbstractInvokable invokable = this.invokable;
// Encapsulates the checkpoint's metadata
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
// If the task is in RUNNING state and invokable is not null
if (executionState == ExecutionState.RUNNING && invokable != null) {
try {
// SourceStreamTask's parent class is StreamTask; the method called here is the parent's triggerCheckpointAsync
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
}
catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
LOG.debug(
"Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
checkpointID, taskNameWithSubtask, executionId);
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
}
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
}
}
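The method above branches on the task state, which can be modeled with two outcomes:
- A RUNNING task hands the checkpoint to its invokable. A RejectedExecutionException from a closed mailbox is ignored.
- Any other task declines the checkpoint.

The sketch below makes the following assumptions:
- `SketchExecutionState` is a trimmed, hypothetical copy of `ExecutionState`.
- The lists `triggered` and `declined` are hypothetical stand-ins for `triggerCheckpointAsync` and `checkpointResponder.declineCheckpoint`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical subset of Flink's ExecutionState.
enum SketchExecutionState { CREATED, RUNNING, CANCELING }

// Hypothetical model of Task#triggerCheckpointBarrier (not Flink API).
class BarrierTriggerSketch {
    SketchExecutionState executionState = SketchExecutionState.CREATED;
    boolean mailboxClosed = false;
    final List<Long> triggered = new ArrayList<>();
    final List<Long> declined = new ArrayList<>();

    // Stands in for invokable.triggerCheckpointAsync: rejects work once the mailbox is closed.
    void triggerCheckpointAsync(long checkpointId) {
        if (mailboxClosed) {
            throw new RejectedExecutionException("mailbox closed");
        }
        triggered.add(checkpointId);
    }

    void triggerCheckpointBarrier(long checkpointId) {
        if (executionState == SketchExecutionState.RUNNING) {
            try {
                triggerCheckpointAsync(checkpointId);
            } catch (RejectedExecutionException ex) {
                // The task is shutting down, so the rejection is ignored, as in the original.
            }
        } else {
            // Stands in for declineCheckpoint(... CHECKPOINT_DECLINED_TASK_NOT_READY).
            declined.add(checkpointId);
        }
    }
}
```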
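Step into StreamTask's triggerCheckpointAsync method. It does not run the checkpoint directly. Instead, it submits triggerCheckpoint as a mail to the task's main mailbox executor, so the checkpoint runs in the task's main thread.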
@Override
public Future<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) {
// Submit triggerCheckpoint to the main mailbox executor so it runs in the task's main thread
return mailboxProcessor.getMainMailboxExecutor().submit(
() -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
"checkpoint %s with %s",
checkpointMetaData,
checkpointOptions);
}
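Submitting the checkpoint to the main mailbox means it runs on the same thread as record processing, in submission order. As a result, it never interleaves with a half-processed record.

Below is a minimal sketch of that idea, assuming a single-thread `ExecutorService` plays the role of the mailbox. `MailboxSketch` and its methods are hypothetical. Flink's real `MailboxProcessor` is considerably more involved.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical single-threaded "mailbox": every mail runs on the same task thread, in FIFO order.
class MailboxSketch {
    final ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> new Thread(r, "task-thread"));
    final List<String> events = new CopyOnWriteArrayList<>();

    void processRecord(String record) {
        mailbox.execute(() -> events.add("record " + record + " on " + Thread.currentThread().getName()));
    }

    // Same shape as triggerCheckpointAsync: returns a Future<Boolean> for the checkpoint mail.
    Future<Boolean> triggerCheckpointAsync(long checkpointId) {
        return mailbox.submit(() -> {
            events.add("checkpoint " + checkpointId + " on " + Thread.currentThread().getName());
            return true;
        });
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

Because there is only one thread, the checkpoint sees the task state exactly between two records. This is why the synchronous part needs no extra locking.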
Continue tracing the triggerCheckpoint method; the main logic lives in the performCheckpoint method.
private boolean triggerCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) throws Exception {
try {
// No alignment if we inject a checkpoint
// Initialize the checkpoint metrics and set the alignment duration to 0
CheckpointMetrics checkpointMetrics = new CheckpointMetrics().setAlignmentDurationNanos(0L);
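// Coordinator for the subtask's checkpointing, e.g. for SourceStreamTask or StreamTask
// getChannelStateWriter: writes channel state during a checkpoint/savepoint; start: begins writing channel state for the given checkpoint id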
subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointMetaData.getCheckpointId(), checkpointOptions);
// Perform the checkpoint
boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
if (!success) {
declineCheckpoint(checkpointMetaData.getCheckpointId());
}
return success;
} catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
Exception exception = new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
" for operator " + getName() + '.', e);
handleCheckpointException(exception);
throw exception;
} else {
LOG.debug("Could not perform checkpoint {} for operator {} while the " +
"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
return false;
}
}
}
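The error handling above follows this policy:
- A `false` result from performCheckpoint leads to a decline.
- An exception is wrapped and rethrown only while the task is still running.
- An exception after shutdown is swallowed.

The sketch below models just that control flow, under these assumptions:
- A `LongPredicate` stands in for `performCheckpoint`.
- The `declined` list stands in for `declineCheckpoint`.
- All names are hypothetical.
- Because a `LongPredicate` cannot throw checked exceptions, only `RuntimeException` is caught here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical model of StreamTask#triggerCheckpoint's control flow (not Flink API).
class TriggerCheckpointSketch {
    boolean isRunning = true;
    final List<Long> declined = new ArrayList<>();
    final LongPredicate performCheckpoint; // stands in for performCheckpoint(...); may throw

    TriggerCheckpointSketch(LongPredicate performCheckpoint) {
        this.performCheckpoint = performCheckpoint;
    }

    boolean triggerCheckpoint(long checkpointId) throws Exception {
        try {
            boolean success = performCheckpoint.test(checkpointId);
            if (!success) {
                declined.add(checkpointId); // stands in for declineCheckpoint(checkpointId)
            }
            return success;
        } catch (RuntimeException e) {
            if (isRunning) {
                // Propagate only while the task is still running.
                throw new Exception("Could not perform checkpoint " + checkpointId + '.', e);
            }
            return false; // the task is shutting down: swallow the error
        }
    }
}
```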
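Step into the performCheckpoint method. If the task is running, it performs the checkpoint (which includes broadcasting the barrier) and returns true. Otherwise it broadcasts a CancelCheckpointMarker, telling downstream operators to cancel that checkpoint and resume normal processing, and returns false. The main logic lives in subtaskCheckpointCoordinator's checkpointState method.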
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics,
boolean advanceToEndOfTime) throws Exception {
LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
if (isRunning) {
actionExecutor.runThrowing(() -> {
if (checkpointOptions.getCheckpointType().isSynchronous()) { // synchronous savepoint
setSynchronousSavepointId(checkpointMetaData.getCheckpointId());
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
}
// Perform the actual checkpoint
subtaskCheckpointCoordinator.checkpointState(
checkpointMetaData,
checkpointOptions,
checkpointMetrics,
operatorChain,
this::isCanceled);
});
return true;
} else {
actionExecutor.runThrowing(() -> {
// we cannot perform our checkpoint - let the downstream operators know that they
// should not wait for any input from this operator
// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
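// A CancelCheckpointMarker travels through the data stream like a {@link CheckpointBarrier}, but it signals
// that a checkpoint should be canceled. Any in-progress alignment for that checkpoint is canceled and normal processing resumes.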
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
recordWriter.broadcastEvent(message);
});
return false;
}
}
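performCheckpoint has exactly two outcomes for downstream channels:
- A running task eventually sends a checkpoint barrier through checkpointState.
- A non-running task sends a CancelCheckpointMarker, so downstream does not wait on this input.

This is a simplified sketch in which each downstream channel is a list of event names. `PerformCheckpointSketch` is hypothetical, and the real barrier path also snapshots operator state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of StreamTask#performCheckpoint's two branches (not Flink API).
class PerformCheckpointSketch {
    boolean isRunning;
    // Stands in for the downstream channels; each list collects the events sent to one channel.
    final List<List<String>> channels = new ArrayList<>();

    PerformCheckpointSketch(int numChannels, boolean isRunning) {
        this.isRunning = isRunning;
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    private void broadcast(String event) {
        for (List<String> channel : channels) {
            channel.add(event);
        }
    }

    boolean performCheckpoint(long checkpointId) {
        if (isRunning) {
            // In Flink this is subtaskCheckpointCoordinator.checkpointState(...), which also snapshots state.
            broadcast("CheckpointBarrier(" + checkpointId + ")");
            return true;
        }
        // Downstream cancels any alignment for this checkpoint instead of waiting for this input.
        broadcast("CancelCheckpointMarker(" + checkpointId + ")");
        return false;
    }
}
```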
Step into subtaskCheckpointCoordinator's checkpointState method; the implementation is the checkpointState method of its implementation class, SubtaskCheckpointCoordinatorImpl.
@Override
public void checkpointState(
CheckpointMetaData metadata,
CheckpointOptions options,
CheckpointMetrics metrics,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isCanceled) throws Exception {
checkNotNull(options);
checkNotNull(metrics);
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
// Run the operator chain's pre-barrier preparation logic
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
// Step (2): Send the checkpoint barrier downstream
// Broadcast the checkpoint barrier downstream
// operatorChain holds all of the chain's outputs
// Iterate over all outputs and send the checkpoint barrier to each of them
operatorChain.broadcastEvent(
// Create a CheckpointBarrier
new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
unalignedCheckpointEnabled); // isPriorityEvent: true when unaligned checkpoints are enabled
// Step (3): Prepare to spill the in-flight buffers for input and output
if (unalignedCheckpointEnabled) {
prepareInflightDataSnapshot(metadata.getCheckpointId());
}
// Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the
// streaming topology
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
try {
// Snapshot the operators (synchronous part), then finish asynchronously
takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled);
finishAndReportAsync(snapshotFutures, metadata, metrics);
} catch (Exception ex) {
cleanup(snapshotFutures, metadata, metrics, options, ex);
}
}
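The ordering of the four steps is the point of this method. The barrier goes out right after the cheap pre-barrier work and before any snapshotting, so downstream alignment is delayed as little as possible.

Below is a hypothetical trace-only sketch of that order, assuming operators are plain names in chain order. `CheckpointStateSketch` is not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the four steps in SubtaskCheckpointCoordinatorImpl#checkpointState (not Flink API).
class CheckpointStateSketch {
    final List<String> operators; // operator names in chain order
    final boolean unalignedCheckpointEnabled;
    final List<String> trace = new ArrayList<>();

    CheckpointStateSketch(List<String> operators, boolean unalignedCheckpointEnabled) {
        this.operators = operators;
        this.unalignedCheckpointEnabled = unalignedCheckpointEnabled;
    }

    void checkpointState(long checkpointId) {
        // Step (1): pre-barrier work for every operator in the chain.
        for (String op : operators) {
            trace.add("prepareSnapshotPreBarrier " + op);
        }
        // Step (2): emit the barrier as early as possible.
        trace.add("broadcast barrier " + checkpointId);
        // Step (3): only unaligned checkpoints persist in-flight buffers.
        if (unalignedCheckpointEnabled) {
            trace.add("prepareInflightDataSnapshot " + checkpointId);
        }
        // Step (4): synchronous snapshot, then hand off the asynchronous part.
        trace.add("takeSnapshotSync");
        trace.add("finishAndReportAsync");
    }
}
```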
The method consists of four steps:
1. Run the operator chain's pre-barrier preparation logic
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
Step into prepareSnapshotPreBarrier. It walks forward through the operator chain and tells each operator to prepare for the checkpoint.
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
// go forward through the operator chain and tell each operator
// to prepare the checkpoint
for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
}
}
2. Broadcast the barrier downstream
operatorChain.broadcastEvent(
// Create a CheckpointBarrier
new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
unalignedCheckpointEnabled); // isPriorityEvent: true when unaligned checkpoints are enabled
Step into broadcastEvent. It broadcasts the barrier through every RecordWriterOutput of the chain, and thus to all downstream channels.
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(event, isPriorityEvent);
}
}
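broadcastEvent fans the event out to every output. The isPriorityEvent flag (here `unalignedCheckpointEnabled`) matters for unaligned checkpoints, where the barrier is meant to overtake data already queued in the output.

The sketch below makes simplifying assumptions:
- Each output is a plain `Deque`.
- "Priority" simply means "insert at the head".
- `BroadcastSketch` is hypothetical.
- Flink's real subpartition queues handle priority events in a more elaborate way.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of OperatorChain#broadcastEvent fanning an event out to every output.
class BroadcastSketch {
    final List<Deque<String>> streamOutputs = new ArrayList<>();

    BroadcastSketch(int numOutputs) {
        for (int i = 0; i < numOutputs; i++) {
            streamOutputs.add(new ArrayDeque<>());
        }
    }

    void broadcastEvent(String event, boolean isPriorityEvent) {
        for (Deque<String> output : streamOutputs) {
            if (isPriorityEvent) {
                output.addFirst(event); // simplified: overtakes buffers that are still queued
            } else {
                output.addLast(event); // aligned case: travels behind the queued data
            }
        }
    }
}
```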
3. Prepare to spill the in-flight input and output buffers (only when unaligned checkpoints are enabled)
if (unalignedCheckpointEnabled) {
prepareInflightDataSnapshot(metadata.getCheckpointId());
}
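Step into prepareInflightDataSnapshot. It hands the in-flight input data to the channelStateWriter and finishes the input once prepareInputSnapshot completes. Then, for every subpartition of every ResultPartitionWriter, it adds a snapshot of the in-flight output buffers, and finally finishes the output.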
private void prepareInflightDataSnapshot(long checkpointId) throws IOException {
prepareInputSnapshot.apply(channelStateWriter, checkpointId)
.thenAccept(unused -> channelStateWriter.finishInput(checkpointId));
ResultPartitionWriter[] writers = env.getAllWriters();
for (ResultPartitionWriter writer : writers) {
for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) {
ResultSubpartition subpartition = writer.getSubpartition(i);
channelStateWriter.addOutputData(
checkpointId,
subpartition.getSubpartitionInfo(),
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
subpartition.requestInflightBufferSnapshot().toArray(new Buffer[0]));
}
}
channelStateWriter.finishOutput(checkpointId);
}
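The output half of the method above iterates over every subpartition of every writer, records that subpartition's queued buffers under the checkpoint ID, and then marks the output as finished.

The sketch below rests on these assumptions:
- Buffers are plain strings.
- `SketchChannelStateWriter` and `InflightSnapshotSketch` are hypothetical stand-ins for `ChannelStateWriter` and the real method.
- The list copy merely stands in for `requestInflightBufferSnapshot()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ChannelStateWriter: collects output data per subpartition.
class SketchChannelStateWriter {
    final Map<String, List<String>> outputData = new LinkedHashMap<>();
    boolean outputFinished = false;

    void addOutputData(long checkpointId, String subpartitionInfo, List<String> buffers) {
        outputData.computeIfAbsent(checkpointId + ":" + subpartitionInfo, k -> new ArrayList<>()).addAll(buffers);
    }

    void finishOutput(long checkpointId) {
        outputFinished = true;
    }
}

// Hypothetical model of the output half of prepareInflightDataSnapshot (not Flink API).
class InflightSnapshotSketch {
    // writers.get(w).get(s) = buffers currently queued in subpartition s of writer w
    static void snapshotOutputs(long checkpointId,
                                List<List<List<String>>> writers,
                                SketchChannelStateWriter channelStateWriter) {
        for (int w = 0; w < writers.size(); w++) {
            List<List<String>> subpartitions = writers.get(w);
            for (int s = 0; s < subpartitions.size(); s++) {
                // Copy the queued buffers (stand-in for requestInflightBufferSnapshot()).
                channelStateWriter.addOutputData(
                    checkpointId, "writer" + w + "/sub" + s, new ArrayList<>(subpartitions.get(s)));
            }
        }
        channelStateWriter.finishOutput(checkpointId);
    }
}
```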
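4. Take the state snapshot: a synchronous part first, then the snapshot is completed asynchronously
First, step into the takeSnapshotSync method.
This method performs the synchronous part of the checkpoint. For every operator in the chain, it builds an OperatorSnapshotFutures via buildOperatorSnapshotFutures and stores it under the operator's ID.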
private void takeSnapshotSync(
Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
CheckpointMetaData checkpointMetaData,
CheckpointMetrics checkpointMetrics,
CheckpointOptions checkpointOptions,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isCanceled) throws Exception {
long checkpointId = checkpointMetaData.getCheckpointId();
long started = System.nanoTime();
ChannelStateWriteResult channelStateWriteResult = checkpointOptions.getCheckpointType() == CHECKPOINT ?
channelStateWriter.getWriteResult(checkpointId) :
ChannelStateWriteResult.EMPTY;
// Factory for checkpoint output streams, used to persist the checkpoint data.
CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());
try {
for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
// key: operator ID, value: the operator's in-progress snapshot futures
operatorSnapshotsInProgress.put(
operatorWrapper.getStreamOperator().getOperatorID(),
buildOperatorSnapshotFutures(
checkpointMetaData,
checkpointOptions,
operatorChain,
operatorWrapper.getStreamOperator(),
isCanceled,
channelStateWriteResult,
storage));
}
} finally {
checkpointStorage.clearCacheFor(checkpointId);
}
LOG.debug(
"{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms",
taskName,
checkpointId,
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
checkpointMetrics.setSyncDurationMillis((System.nanoTime() - started) / 1_000_000);
}
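The split between the synchronous and asynchronous parts can be illustrated in a few lines. The synchronous part captures a consistent view of the state while the task thread is not processing records, and returns not-yet-run futures keyed by operator ID. The expensive writing happens later, when those futures are run.

The sketch below makes these assumptions:
- State is a `Map<String, Integer>`.
- The "capture" is a plain copy.
- Flink's state backends use their own snapshot strategies.
- `SyncSnapshotSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical model of the synchronous part of a checkpoint (not Flink API).
class SyncSnapshotSketch {
    // Returns operatorId -> not-yet-run snapshot future, like operatorSnapshotsInProgress.
    static Map<String, RunnableFuture<String>> takeSnapshotSync(Map<String, Map<String, Integer>> operatorStates) {
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Integer>> e : operatorStates.entrySet()) {
            // Synchronous part: capture a consistent copy while no records are being processed.
            Map<String, Integer> copy = new HashMap<>(e.getValue());
            // Asynchronous part (deferred): "write" the copy; here it just renders a string.
            inProgress.put(e.getKey(), new FutureTask<>(() -> "snapshot of " + copy));
        }
        return inProgress;
    }
}
```

Processing can resume as soon as the map is returned. Later state changes do not leak into the snapshot because the futures work on the captured copy.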
The key part is the finishAndReportAsync method. It submits an AsyncCheckpointRunnable to executorService, whose run method then executes on another thread.
private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics) {
final Future<?> channelWrittenFuture;
if (unalignedCheckpointEnabled) {
ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId());
channelWrittenFuture = CompletableFuture.allOf(
writeResult.getInputChannelStateHandles(),
writeResult.getResultSubpartitionStateHandles());
} else {
channelWrittenFuture = FutureUtils.completedVoidFuture();
}
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
// executorService runs AsyncCheckpointRunnable.run() on another thread;
// run() encapsulates the logic that completes the snapshot
executorService.execute(
new AsyncCheckpointRunnable( // completes the snapshot asynchronously
snapshotFutures,
metadata,
metrics,
channelWrittenFuture,
System.nanoTime(),
taskName,
closeableRegistry,
env,
asyncExceptionHandler));
}
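Two ideas from the method above are worth isolating:
- With unaligned checkpoints, the async part must also wait until both the input- and output-channel state are written, which is why the two handle futures are combined with `CompletableFuture.allOf`.
- The task thread only submits a runnable and returns immediately.

Below is a sketch with hypothetical names (`FinishAndReportSketch`) and plain futures standing in for `ChannelStateWriteResult`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical model of finishAndReportAsync's hand-off (not Flink API).
class FinishAndReportSketch {
    static Future<?> channelWrittenFuture(boolean unaligned,
                                          CompletableFuture<?> inputHandles,
                                          CompletableFuture<?> outputHandles) {
        // Unaligned: wait for both input and output channel state; aligned: nothing to wait for.
        return unaligned
            ? CompletableFuture.allOf(inputHandles, outputHandles)
            : CompletableFuture.<Void>completedFuture(null);
    }

    static void finishAndReportAsync(ExecutorService executor, Runnable asyncCheckpointRunnable) {
        // The task thread returns immediately; the runnable finishes the snapshot on another thread.
        executor.execute(asyncCheckpointRunnable);
    }
}
```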
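The core logic is in AsyncCheckpointRunnable's run method, which encapsulates finishing the snapshot. It performs these steps in order:
1. Runs each operator's snapshot through an OperatorSnapshotFinalizer.
2. Collects a JobManager-reported copy and a task-local copy of the states.
3. Records the async duration and waits for the channel state to be written.
4. Reports the completed snapshot, but only if the state can move from RUNNING to COMPLETED.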
@Override
public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
// Register this runnable with the closeable registry so it can be closed on cancellation
closeableRegistry.registerCloseable(this);
// Operator subtask states to be reported to the JobManager
TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = new TaskStateSnapshot(operatorSnapshotsInProgress.size());
// Operator subtask states kept locally by the task
TaskStateSnapshot localTaskOperatorSubtaskStates = new TaskStateSnapshot(operatorSnapshotsInProgress.size());
for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
// Get the operator ID
OperatorID operatorID = entry.getKey();
// Get the operator's in-progress snapshot futures
OperatorSnapshotFutures snapshotInProgress = entry.getValue();
// finalize the async part of all by executing all snapshot runnables
// Creating finalizedSnapshots executes the asynchronous
// parts of the snapshot
// OperatorSnapshotFinalizer completes an {@link OperatorSnapshotFutures}.
// Each instance is created with an {@link OperatorSnapshotFutures} that is to be
// executed. The object then delivers the results as {@link OperatorSubtaskState}.
OperatorSnapshotFinalizer finalizedSnapshots =
new OperatorSnapshotFinalizer(snapshotInProgress);
// One copy of the operator's snapshot state, reported to the JobManager
jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getJobManagerOwnedState());
// Another copy of the operator's snapshot state, kept for fast task-local recovery
localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getTaskLocalState());
}
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
// Record the asynchronous snapshot duration for the metrics system
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
channelWrittenFuture.get();
// If the state transitions from RUNNING to COMPLETED
if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
// Report the completed snapshot states
reportCompletedSnapshotStates(
jobManagerTaskOperatorSubtaskStates,
localTaskOperatorSubtaskStates,
asyncDurationMillis);
} else {
LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
taskName,
checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} - asynchronous part of checkpoint {} could not be completed.",
taskName,
checkpointMetaData.getCheckpointId(),
e);
}
handleExecutionException(e);
} finally {
closeableRegistry.unregisterCloseable(this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
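The `compareAndSet(RUNNING, COMPLETED)` at the end of run() resolves a race with close() (for example on task cancellation). Only one side can move the state away from RUNNING, so the snapshot is reported at most once, and never after being closed.

A minimal sketch of that pattern follows. It assumes close() moves the state to DISCARDED. `AsyncCompletionSketch` is hypothetical, and the counter stands in for `reportCompletedSnapshotStates`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of AsyncCheckpointRunnable's completion race (not Flink API).
class AsyncCompletionSketch {
    enum State { RUNNING, COMPLETED, DISCARDED }

    final AtomicReference<State> asyncCheckpointState = new AtomicReference<>(State.RUNNING);
    final AtomicInteger reports = new AtomicInteger();

    // End of run(): report only if nobody closed this runnable in the meantime.
    void finish() {
        if (asyncCheckpointState.compareAndSet(State.RUNNING, State.COMPLETED)) {
            reports.incrementAndGet(); // stands in for reportCompletedSnapshotStates(...)
        }
    }

    // close(), e.g. on task cancellation: wins the race only while still RUNNING.
    void close() {
        asyncCheckpointState.compareAndSet(State.RUNNING, State.DISCARDED);
    }
}
```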
Previous post
Flink-1.10 Source Code Notes: checkpoint - 1
Next post
Flink-1.10 Source Code Notes: checkpoint - 3
If you spot any mistakes, corrections are welcome!