Flink source code analysis series: table of contents
See: Flink 源码分析系列文档目录
Call chain for periodically triggered checkpoints
- JobMaster.triggerSavepoint
- SchedulerBase.startCheckpointScheduler
- CheckpointCoordinator.startCheckpointScheduler
- CheckpointCoordinator.scheduleTriggerWithDelay
- CheckpointCoordinator.triggerCheckpoint
- Execution.triggerCheckpoint
- Execution.triggerCheckpointHelper
- RpcTaskManagerGateway.triggerCheckpoint
- TaskExecutor.triggerCheckpoint
- Task.triggerCheckpointBarrier
- invokable.triggerCheckpointAsync (StreamTask is used as the example here)
- StreamTask.performCheckpoint
- OperatorChain.broadcastCheckpointBarrier and CheckpointingOperation.executeCheckpointing
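Choosing the nodes that trigger a checkpoint
The enableCheckpointing method of ExecutionGraph creates a checkpointCoordinator object. This object runs in the JobManager and coordinates the checkpoint process across the distributed system. It is responsible for the following:
- Periodically triggering checkpoints by instructing the data sources to emit checkpoint barriers.
- Receiving each operator's acknowledgement that a given checkpoint has completed.
- For a given checkpoint, once the acknowledgements from all operators have arrived, notifying every operator that the checkpoint is complete.
- Keeping information about completed and in-progress checkpoints.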
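The bookkeeping behind the second and third responsibilities can be sketched as follows. This is a minimal illustration, not Flink's implementation: the class PendingCheckpointSketch, its methods, and the string task IDs are hypothetical names chosen for this example.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified stand-in for a pending checkpoint; not Flink's PendingCheckpoint. */
final class PendingCheckpointSketch {
    private final long checkpointId;
    // IDs of the tasks that are still expected to acknowledge this checkpoint.
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToWaitFor) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    /** Records one acknowledgement; returns false for duplicates and unknown tasks. */
    boolean acknowledge(String taskId) {
        return notYetAcknowledged.remove(taskId);
    }

    /** True once every expected task has acknowledged. */
    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    long getCheckpointId() {
        return checkpointId;
    }
}
```

A coordinator built on this idea would call acknowledge for each incoming message and, once isFullyAcknowledged returns true, notify the tasks that the checkpoint is complete.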
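Note that the constructor of checkpointCoordinator receives a variable named tasksToTrigger, which holds the nodes on which a checkpoint must be triggered. It is built in the configureCheckpointing method of StreamingJobGraphGenerator. The relevant code is shown below (unrelated parts omitted):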
for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) {
triggerVertices.add(vertex.getID());
}
commitVertices.add(vertex.getID());
ackVertices.add(vertex.getID());
}
The triggerVertices collection contains every vertex that satisfies isInputVertex. The isInputVertex method looks like this:
public boolean isInputVertex() {
return this.inputs.isEmpty();
}
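This makes it clear: only a JobVertex without any input is an input vertex. A checkpoint is therefore triggered only at input vertices. The data sources are the first nodes to start the checkpoint, and the checkpoint then travels downstream with the checkpoint barrier, triggering the checkpoint of each node in turn.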
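As a rough illustration of that rule, the sketch below picks the trigger vertices from a tiny job graph. The class TriggerVertexSketch and the map-based graph representation are assumptions made for this example; Flink itself works on JobVertex objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: select the vertices that have no inputs. */
final class TriggerVertexSketch {

    /**
     * @param inputsByVertex maps each vertex ID to the IDs of its upstream vertices
     * @return the IDs of all vertices without inputs, in the map's iteration order
     */
    static List<String> selectTriggerVertices(Map<String, List<String>> inputsByVertex) {
        List<String> triggerVertices = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inputsByVertex.entrySet()) {
            // Same test as isInputVertex: a vertex with an empty input list is a source.
            if (entry.getValue().isEmpty()) {
                triggerVertices.add(entry.getKey());
            }
        }
        return triggerVertices;
    }
}
```

For a linear graph source -> map -> sink, only source is selected; map and sink take their checkpoints when the barrier reaches them.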
Call chain analysis for periodically triggered checkpoints
JobMaster
JobMaster starts a checkpoint process when it triggers a savepoint.
Let us look at the triggerSavepoint method of JobMaster:
@Override
public CompletableFuture<String> triggerSavepoint(
@Nullable final String targetDirectory,
final boolean cancelJob,
final Time timeout) {
return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
}
Next we look at schedulerNG.triggerSavepoint.
SchedulerBase
The schedulerNG.triggerSavepoint call inside JobMaster.triggerSavepoint resolves to the method in SchedulerBase. The code is as follows:
@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
// Make sure we are running in the main thread
mainThreadExecutor.assertRunningInMainThread();
// Get the checkpointCoordinator from the executionGraph
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
if (checkpointCoordinator == null) {
throw new IllegalStateException(
String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
} else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
// Make sure a default savepoint directory is configured, or a target directory was passed in
log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
throw new IllegalStateException(
"No savepoint directory configured. You can either specify a directory " +
"while cancelling via -s :targetDirectory or configure a cluster-wide " +
"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
}
// If the job is to be cancelled, stop the periodic checkpoint scheduler
if (cancelJob) {
checkpointCoordinator.stopCheckpointScheduler();
}
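// First trigger a savepoint (internally this triggers a checkpoint)
// Then return the path where the savepoint was stored
// Finally: 1. if the job should be cancelled and an earlier step failed, restart the checkpoint scheduler and rethrow the exception
// 2. if the job should be cancelled and no earlier step failed, cancel the job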
return checkpointCoordinator
.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
.thenApply(CompletedCheckpoint::getExternalPointer)
.handleAsync((path, throwable) -> {
if (throwable != null) {
if (cancelJob) {
startCheckpointScheduler(checkpointCoordinator);
}
throw new CompletionException(throwable);
} else if (cancelJob) {
log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
cancel();
}
return path;
}, mainThreadExecutor);
}
CheckpointCoordinator
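CheckpointCoordinator coordinates the distributed snapshots of all operators and their state. It sends messages to the relevant tasks to trigger their snapshots and then collects their acknowledgements of successful snapshots.
The createActivatorDeactivator method of CheckpointCoordinator creates a job status listener. Whenever the job status changes, the listener's jobStatusChanges method is called.
The jobStatusChanges method of CheckpointCoordinatorDeActivator is shown below: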
@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
The code shows that when the job switches to the RUNNING state, the startCheckpointScheduler method of CheckpointCoordinator is called; for any other state, stopCheckpointScheduler is called.
Next, the source of startCheckpointScheduler:
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
// make sure all prior timers are cancelled
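// First stop any previously created scheduler
stopCheckpointScheduler();
// Then enable periodic scheduling again
periodicScheduling = true;
// Start the periodic checkpoint trigger after an initial delay
// The delay is a random value between the minimum pause between checkpoints (inclusive) and the checkpoint interval + 1 (exclusive)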
currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
}
}
The scheduleTriggerWithDelay method starts a timer that triggers checkpoints periodically:
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
return timer.scheduleAtFixedRate(
new ScheduledTrigger(),
initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
This code registers a fixed-rate task whose logic lives in ScheduledTrigger:
private final class ScheduledTrigger implements Runnable {
@Override
public void run() {
try {
triggerCheckpoint(System.currentTimeMillis(), true);
}
catch (Exception e) {
LOG.error("Exception while triggering checkpoint for job {}.", job, e);
}
}
}
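The scheduling pattern above can be reproduced with the JDK alone. The following is a minimal sketch under stated assumptions: PeriodicTriggerSketch, randomInitDelay, and the parameter names are hypothetical, and the delay range follows the comment in startCheckpointScheduler rather than the actual source of getRandomInitDelay. It also suggests why the trigger catches exceptions: scheduleAtFixedRate suppresses all later runs once one run throws.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a periodic trigger with a randomized first run. */
final class PeriodicTriggerSketch {

    /**
     * Returns a random delay in [minPauseMillis, baseIntervalMillis].
     * Assumes 0 <= minPauseMillis <= baseIntervalMillis.
     */
    static long randomInitDelay(long minPauseMillis, long baseIntervalMillis) {
        // The upper bound of nextLong is exclusive, hence the "+ 1".
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    /** Schedules the trigger at a fixed rate after a randomized initial delay. */
    static ScheduledFuture<?> schedule(
            ScheduledExecutorService timer,
            Runnable trigger,
            long minPauseMillis,
            long baseIntervalMillis) {
        Runnable guarded = () -> {
            try {
                trigger.run();
            } catch (RuntimeException e) {
                // Log and continue: an exception escaping here would cancel all future runs.
                System.err.println("Trigger failed: " + e);
            }
        };
        return timer.scheduleAtFixedRate(
                guarded,
                randomInitDelay(minPauseMillis, baseIntervalMillis),
                baseIntervalMillis,
                TimeUnit.MILLISECONDS);
    }
}
```

Randomizing the first run spreads out the checkpoints of jobs that start at the same moment, while the fixed rate keeps later runs evenly spaced.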
The run method contains a single call, triggerCheckpoint. Let us follow it:
// The caller above passes the current system time as timestamp and true as isPeriodic
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, boolean isPeriodic) {
try {
return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
} catch (CheckpointException e) {
long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
// here we can not get the failed pending checkpoint's id,
// so we pass the negative latest generated checkpoint id as a special flag
failureManager.handleJobLevelCheckpointException(e, -1 * latestGeneratedCheckpointId);
return FutureUtils.completedExceptionally(e);
}
}
Next comes the actual checkpoint triggering logic. The method is fairly long:
@VisibleForTesting
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
long timestamp,
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
boolean advanceToEndOfTime) throws CheckpointException {
if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
// make some eager pre-checks
// Precondition checks
synchronized (lock) {
preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
}
// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
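// tasksToTrigger holds the tasks on which the checkpoint is triggered, as analyzed at the start of this article
// Make sure every task in tasksToTrigger is in the RUNNING state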
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee == null) {
LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
} else if (ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
// next, check if all tasks that need to acknowledge the checkpoint are running.
// if not, abort the checkpoint
// Check that every task that must acknowledge the checkpoint is currently being executed
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
ev.getTaskNameWithSubtaskIndex(),
job);
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
// we will actually trigger this checkpoint!
// Start the checkpoint
final CheckpointStorageLocation checkpointStorageLocation;
final long checkpointID;
try {
// this must happen outside the coordinator-wide lock, because it communicates
// with external services (in HA mode) and may block for a while.
// Obtain the checkpoint ID
// Non-HA mode uses StandaloneCheckpointIDCounter
// HA mode uses ZooKeeperCheckpointIDCounter
checkpointID = checkpointIdCounter.getAndIncrement();
// Resolve the storage location for the checkpoint
checkpointStorageLocation = props.isSavepoint() ?
checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
checkpointStorage.initializeLocationForCheckpoint(checkpointID);
}
catch (Throwable t) {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
job,
numUnsuccessful,
t);
throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
}
// Create a pending (in-progress) checkpoint
final PendingCheckpoint checkpoint = new PendingCheckpoint(
job,
checkpointID,
timestamp,
ackTasks,
masterHooks.keySet(),
props,
checkpointStorageLocation,
executor);
if (statsTracker != null) {
// Obtain the stats reporting callback
PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
checkpointID,
timestamp,
props);
// Attach the stats callback
checkpoint.setStatsCallback(callback);
}
// schedule the timer that will clean up the expired checkpoints
// Cleanup logic for expired checkpoints
final Runnable canceller = () -> {
synchronized (lock) {
// only do the work if the checkpoint is not discarded anyways
// note that checkpoint completion discards the pending checkpoint object
// Skip checkpoints that have already been discarded
if (!checkpoint.isDiscarded()) {
LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
// Fail the in-progress checkpoint
failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
// Remove this checkpoint from pendingCheckpoints
pendingCheckpoints.remove(checkpointID);
// Remember this ID among the recent checkpoint IDs
rememberRecentCheckpointId(checkpointID);
// Trigger any queued checkpoint request
// The preCheckBeforeTriggeringCheckpoint call below checks the number of concurrent checkpoints:
// when pendingCheckpoints reaches the maxConcurrentCheckpointAttempts limit,
// triggerRequestQueued is set to true,
// and in that case a checkpoint is triggered immediately here
triggerQueuedRequests();
}
}
};
try {
// re-acquire the coordinator-wide lock
synchronized (lock) {
// Precondition checks
preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
// Add the checkpoint to the pendingCheckpoints map
pendingCheckpoints.put(checkpointID, checkpoint);
// Register a timer task
// that cancels the checkpoint when it times out
// The task itself is the canceller analyzed above
ScheduledFuture<?> cancellerHandle = timer.schedule(
canceller,
checkpointTimeout, TimeUnit.MILLISECONDS);
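// If the canceller handle could not be set (the only possible reason is that the checkpoint was already discarded)
if (!checkpoint.setCancellerHandle(cancellerHandle)) {
// checkpoint is already disposed!
// Cancel the scheduled canceller task, since it is no longer needed
cancellerHandle.cancel(false);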
}
// TODO, asynchronously snapshots master hook without waiting here
// Invoke every master hook in turn
// MasterTriggerRestoreHook notifies external systems before a checkpoint is taken or restored
for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
final MasterState masterState =
MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
.get(checkpointTimeout, TimeUnit.MILLISECONDS);
checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
}
// Verify that no master state is left unacknowledged
Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
}
// end of lock scope
// Build the checkpoint options from the checkpoint type and the storage location
final CheckpointOptions checkpointOptions = new CheckpointOptions(
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference());
// send the messages to the tasks that trigger their checkpoint
// Trigger the checkpoint on every task in tasksToTrigger
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
// Reset the counter of consecutive failed trigger attempts to 0
numUnsuccessfulCheckpointsTriggers.set(0);
return checkpoint.getCompletionFuture();
}
catch (Throwable t) {
// guard the map against concurrent modifications
synchronized (lock) {
pendingCheckpoints.remove(checkpointID);
}
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
checkpointID, job, numUnsuccessful, t);
if (!checkpoint.isDiscarded()) {
failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
}
try {
checkpointStorageLocation.disposeOnFailure();
}
catch (Throwable t2) {
LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
}
// rethrow the CheckpointException directly.
if (t instanceof CheckpointException) {
throw (CheckpointException) t;
}
throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
}
}
This method is long, but most of it checks that the tasks are running, validates preconditions, and invokes the master hooks.
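The timeout handling inside the method (the canceller and its handle) follows a common pattern that can be sketched with the JDK alone. This is an illustration under stated assumptions, not Flink's code: ExpiringCheckpointSketch and its methods are hypothetical, and a CompletableFuture stands in for the pending checkpoint's completion future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a pending checkpoint that expires after a timeout. */
final class ExpiringCheckpointSketch {
    private final long checkpointId;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();
    private final ScheduledFuture<?> cancellerHandle;

    ExpiringCheckpointSketch(long checkpointId, long timeoutMillis, ScheduledExecutorService timer) {
        this.checkpointId = checkpointId;
        // The canceller fails the checkpoint; it has no effect if the future is already complete.
        Runnable canceller = () -> completion.completeExceptionally(
                new TimeoutException("Checkpoint " + checkpointId + " expired before completing."));
        this.cancellerHandle = timer.schedule(canceller, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Completes the checkpoint; returns false if it had already expired or completed. */
    boolean complete() {
        boolean completedNow = completion.complete(checkpointId);
        if (completedNow) {
            // The timeout task is no longer needed once the checkpoint has finished.
            cancellerHandle.cancel(false);
        }
        return completedNow;
    }

    CompletableFuture<Long> getCompletionFuture() {
        return completion;
    }
}
```

Whichever side finishes first wins: a completed checkpoint cancels its timer, and an expired checkpoint rejects late completion attempts.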
Next we look at execution.triggerCheckpoint, the entry point for triggering the checkpoint on a task.
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, false);
}
The triggerCheckpointHelper method:
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
// Get the slot
final LogicalSlot slot = assignedResource;
if (slot != null) {
// Get the TaskManagerGateway
// TaskManagerGateway is used for communication between the JobManager and the TaskManager
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
// Trigger the checkpoint
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
}
}
Here taskManagerGateway is of type RpcTaskManagerGateway. Its triggerCheckpoint method looks like this:
@Override
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
taskExecutorGateway.triggerCheckpoint(
executionAttemptID,
checkpointId,
timestamp,
checkpointOptions,
advanceToEndOfEventTime);
}
Here taskExecutorGateway refers to TaskExecutor. We continue with its triggerCheckpoint method:
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
// Get the task running in the slot
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
// Make the task emit the CheckpointBarrier
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
log.debug(message);
return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
}
}
The triggerCheckpointBarrier method of Task:
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions,
final boolean advanceToEndOfEventTime) {
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
if (executionState == ExecutionState.RUNNING && invokable != null) {
try {
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
}
catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
LOG.debug(
"Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
checkpointID, taskNameWithSubtask, executionId);
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
}
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
}
}
The core of this method is the single call invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime). The invokable here is the concrete task type, which can be SourceStreamTask or StreamTask. SourceStreamTask extends StreamTask, and the triggerCheckpointAsync that is called is also the parent class's method. We now analyze the triggerCheckpointAsync method of StreamTask:
@Override
public Future<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) {
return mailboxProcessor.getMainMailboxExecutor().submit(
() -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
"checkpoint %s with %s",
checkpointMetaData,
checkpointOptions);
}
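Submitting the checkpoint to the mailbox executor means it runs on the same thread that processes records, so the two never interleave. The sketch below imitates this with a single-threaded JDK executor. It is an assumption-laden illustration: MailboxSketch, processRecord, and triggerSnapshotAsync are hypothetical names, and Flink's mailbox is a more elaborate structure than a plain executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: one thread handles both records and checkpoint requests. */
final class MailboxSketch implements AutoCloseable {
    // A single thread stands in for the task's mailbox thread.
    private final ExecutorService mailboxThread = Executors.newSingleThreadExecutor();
    // Only touched from the mailbox thread, so no lock is needed.
    private long recordsProcessed;

    /** Enqueues the processing of one record. */
    Future<?> processRecord() {
        Runnable work = () -> recordsProcessed++;
        return mailboxThread.submit(work);
    }

    /** Enqueues a snapshot request; the result reflects all records enqueued before it. */
    Future<Long> triggerSnapshotAsync() {
        Callable<Long> snapshot = () -> recordsProcessed;
        return mailboxThread.submit(snapshot);
    }

    @Override
    public void close() {
        mailboxThread.shutdown();
    }
}
```

Because the executor processes its queue in submission order, a snapshot requested after three records observes exactly those three records.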
We continue with the triggerCheckpoint method:
private boolean triggerCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) throws Exception {
try {
// No alignment if we inject a checkpoint
// Initialize the checkpoint metrics
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);
// Perform the checkpoint
boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
if (!success) {
// If it did not succeed, decline this checkpoint
declineCheckpoint(checkpointMetaData.getCheckpointId());
}
return success;
} catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
Exception exception = new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
" for operator " + getName() + '.', e);
handleCheckpointException(exception);
throw exception;
} else {
LOG.debug("Could not perform checkpoint {} for operator {} while the " +
"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
return false;
}
}
}
The main logic is in performCheckpoint:
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics,
boolean advanceToEndOfTime) throws Exception {
LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
final long checkpointId = checkpointMetaData.getCheckpointId();
if (isRunning) {
actionExecutor.runThrowing(() -> {
if (checkpointOptions.getCheckpointType().isSynchronous()) {
setSynchronousSavepointId(checkpointId);
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
}
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
// Let the operatorChain run its pre-barrier preparation logic
operatorChain.prepareSnapshotPreBarrier(checkpointId);
// Step (2): Send the checkpoint barrier downstream
// Broadcast the checkpoint barrier downstream
// The operatorChain holds all of the task's outputs
// Iterate over them and send the barrier to each one
operatorChain.broadcastCheckpointBarrier(
checkpointId,
checkpointMetaData.getTimestamp(),
checkpointOptions);
// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
// The task takes its own state snapshot
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
});
return true;
} else {
actionExecutor.runThrowing(() -> {
// we cannot perform our checkpoint - let the downstream operators know that they
// should not wait for any input from this operator
// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
// If the task is not running, send a cancel-checkpoint marker downstream
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
recordWriter.broadcastEvent(message);
});
return false;
}
}
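The broadcastCheckpointBarrier method of OperatorChain sends the checkpoint barrier to all downstream outputs of the chain. A downstream task that receives the barrier forwards it further downstream and takes its own checkpoint, until the barrier reaches the sinks. For a detailed analysis, see: Flink 源码之分布式快照
The code of broadcastCheckpointBarrier is shown below: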
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
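The ordering in performCheckpoint (barrier first, snapshot second) can be illustrated with a toy task. Everything in this sketch is hypothetical: BarrierBroadcastSketch, its Barrier class, and the use of in-memory lists as outputs are stand-ins for this example, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: emit a barrier on every output, then snapshot the state. */
final class BarrierBroadcastSketch {

    /** Hypothetical barrier event; not Flink's CheckpointBarrier. */
    static final class Barrier {
        final long checkpointId;

        Barrier(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final List<List<Object>> outputs = new ArrayList<>();
    private long recordsEmitted; // the only "state" of this toy task

    BarrierBroadcastSketch(int numberOfOutputs) {
        for (int i = 0; i < numberOfOutputs; i++) {
            outputs.add(new ArrayList<>());
        }
    }

    void emit(int outputIndex, Object record) {
        recordsEmitted++;
        outputs.get(outputIndex).add(record);
    }

    /** Broadcasts the barrier to all outputs and returns the snapshotted state. */
    long performCheckpoint(long checkpointId) {
        Barrier barrier = new Barrier(checkpointId);
        for (List<Object> output : outputs) {
            output.add(barrier);
        }
        // The snapshot covers exactly the records that precede the barrier on every output.
        return recordsEmitted;
    }

    List<Object> output(int index) {
        return outputs.get(index);
    }
}
```

On each output the barrier separates the records that belong to the snapshot from those emitted afterwards, which is what lets downstream tasks align their own snapshots.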
The actual checkpoint work happens in the checkpointState method:
private void checkpointState(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
checkpointMetaData.getCheckpointId(),
checkpointOptions.getTargetLocation());
// Create a checkpointing operation
CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
this,
checkpointMetaData,
checkpointOptions,
storage,
checkpointMetrics);
// Execute the checkpoint
checkpointingOperation.executeCheckpointing();
}
We continue with the executeCheckpointing method of CheckpointingOperation:
public void executeCheckpointing() throws Exception {
startSyncPartNano = System.nanoTime();
try {
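// Checkpoint every operator
for (StreamOperator<?> op : allOperators) {
// Calls the snapshotState method of AbstractStreamOperator
// snapshotState contains the actual snapshot logic
// That logic is fairly involved and is not analyzed here; it will be covered in a separate article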
checkpointStreamOperator(op);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
checkpointMetaData.getCheckpointId(), owner.getName());
}
startAsyncPartNano = System.nanoTime();
checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
// Run the asynchronous part that completes the snapshot
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
} catch (Exception ex) {
// Cleanup to release resources
for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
if (null != operatorSnapshotResult) {
try {
operatorSnapshotResult.cancel();
} catch (Exception e) {
LOG.warn("Could not properly cancel an operator snapshot result.", e);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
if (checkpointOptions.getCheckpointType().isSynchronous()) {
// in the case of a synchronous checkpoint, we always rethrow the exception,
// so that the task fails.
// this is because the intention is always to stop the job after this checkpointing
// operation, and without the failure, the task would go back to normal execution.
throw ex;
} else {
owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
}
}
}
The statement owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable) hands the AsyncCheckpointRunnable to a thread pool, which invokes its run method. That method wraps the logic that completes the snapshot:
public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
TaskStateSnapshot localTaskOperatorSubtaskStates =
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
OperatorID operatorID = entry.getKey();
// Get the in-progress snapshot futures of each operator
OperatorSnapshotFutures snapshotInProgress = entry.getValue();
// finalize the async part of all by executing all snapshot runnables
// Create finalizedSnapshots; constructing it
// runs the asynchronous part of the snapshot
OperatorSnapshotFinalizer finalizedSnapshots =
new OperatorSnapshotFinalizer(snapshotInProgress);
// Copy of the operator's snapshot state that is reported to the JobManager
jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getJobManagerOwnedState());
// Copy of the operator's snapshot state kept locally on the task for fast failure recovery
localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getTaskLocalState());
}
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
// Record the duration of the async part for the metrics system
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
// Switch the snapshot state from RUNNING to COMPLETED
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
// Report that the snapshot has completed; this method is analyzed later
reportCompletedSnapshotStates(
jobManagerTaskOperatorSubtaskStates,
localTaskOperatorSubtaskStates,
asyncDurationMillis);
} else {
LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
owner.getName(),
checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} - asynchronous part of checkpoint {} could not be completed.",
owner.getName(),
checkpointMetaData.getCheckpointId(),
e);
}
handleExecutionException(e);
} finally {
owner.cancelables.unregisterCloseable(this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
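The compareAndSet call in run guards against a race between finishing the async part and closing the runnable. A minimal sketch of that guard follows. AsyncStateSketch and its State enum are hypothetical names for this example, and the DISCARDED state is an assumption about what closing would set.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a one-way state switch guarded by compare-and-set. */
final class AsyncStateSketch {

    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    /** Succeeds only if nobody discarded or completed the operation before. */
    boolean tryComplete() {
        return state.compareAndSet(State.RUNNING, State.COMPLETED);
    }

    /** Succeeds only if the operation is still running. */
    boolean tryDiscard() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State current() {
        return state.get();
    }
}
```

Only the caller whose compareAndSet succeeds acts on the result, so a snapshot is either reported or cleaned up, never both.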
Sending the snapshot acknowledgement
Call chain of the whole process
- AsyncCheckpointRunnable.reportCompletedSnapshotStates
- TaskStateManagerImpl.reportTaskStateSnapshots
- RpcCheckpointResponder.acknowledgeCheckpoint
- JobMaster.acknowledgeCheckpoint
- SchedulerBase.acknowledgeCheckpoint
- CheckpointCoordinator.receiveAcknowledgeMessage
- CheckpointCoordinator.completePendingCheckpoint
- PendingCheckpoint.finalizeCheckpoint
AsyncCheckpointRunnable.reportCompletedSnapshotStates
We start with the reportCompletedSnapshotStates method of AsyncCheckpointRunnable. It reports that the snapshot completed successfully:
private void reportCompletedSnapshotStates(
TaskStateSnapshot acknowledgedTaskStateSnapshot,
TaskStateSnapshot localTaskStateSnapshot,
long asyncDurationMillis) {
// Get the task state manager
TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
boolean hasLocalState = localTaskStateSnapshot.hasState();
Preconditions.checkState(hasAckState || !hasLocalState,
"Found cached state but no corresponding primary state is reported to the job " +
"manager. This indicates a problem.");
// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
// to stateless tasks on restore. This enables simple job modifications that only concern
// stateless without the need to assign them uids to match their (always empty) states.
// Report the task's snapshot state
taskStateManager.reportTaskStateSnapshots(
checkpointMetaData,
checkpointMetrics,
hasAckState ? acknowledgedTaskStateSnapshot : null,
hasLocalState ? localTaskStateSnapshot : null);
LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
}
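Two small rules are packed into this method: local state may only exist alongside primary state, and empty state is reported as null. The sketch below restates them in isolation. SnapshotReportSketch, checkConsistent, and nullIfEmpty are hypothetical names, and plain maps stand in for TaskStateSnapshot.

```java
import java.util.Map;

/** Hypothetical sketch of the two reporting rules; maps stand in for state snapshots. */
final class SnapshotReportSketch {

    /** Rejects the one invalid combination: local state without primary state. */
    static void checkConsistent(boolean hasAckState, boolean hasLocalState) {
        if (!(hasAckState || !hasLocalState)) {
            throw new IllegalStateException(
                    "Found cached state but no corresponding primary state.");
        }
    }

    /** Stateless tasks are signalled by null rather than by an empty snapshot. */
    static <K, V> Map<K, V> nullIfEmpty(Map<K, V> snapshot) {
        return snapshot.isEmpty() ? null : snapshot;
    }
}
```

Of the four combinations of hasAckState and hasLocalState, only (false, true) is rejected.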
TaskStateManagerImpl.reportTaskStateSnapshots
In the previous snippet, taskStateManager is implemented by TaskStateManagerImpl. Next, its reportTaskStateSnapshots method:
@Override
public void reportTaskStateSnapshots(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nonnull CheckpointMetrics checkpointMetrics,
@Nullable TaskStateSnapshot acknowledgedState,
@Nullable TaskStateSnapshot localState) {
long checkpointId = checkpointMetaData.getCheckpointId();
// Store the local snapshot state
localStateStore.storeLocalState(checkpointId, localState);
// Send the acknowledgement that the snapshot succeeded
checkpointResponder.acknowledgeCheckpoint(
jobId,
executionAttemptID,
checkpointId,
checkpointMetrics,
acknowledgedState);
}
RpcCheckpointResponder.acknowledgeCheckpoint
The checkpointResponder above is of type RpcCheckpointResponder. Its acknowledgeCheckpoint method:
@Override
public void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState) {
checkpointCoordinatorGateway.acknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
subtaskState);
}
This method calls the method of the same name on checkpointCoordinatorGateway, which here refers to the JobMaster.
RpcCheckpointResponder is created when the connection to the JobManager is established, after the JobManager has been elected leader.
JobMaster.acknowledgeCheckpoint
@Override
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
SchedulerBase.acknowledgeCheckpoint
Here is the source of this method. After many layers of calls, this is where the interaction with CheckpointCoordinator finally happens:
@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
// Make sure we are running in the main thread
mainThreadExecutor.assertRunningInMainThread();
// Get the checkpointCoordinator object
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
// Create a checkpoint acknowledgement message
final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState);
// Look up the location of the TaskManager
final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);
if (checkpointCoordinator != null) {
ioExecutor.execute(() -> {
try {
// Hand the acknowledgement to the CheckpointCoordinator
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
} catch (Throwable t) {
log.warn("Error while processing checkpoint acknowledgement message", t);
}
});
} else {
String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
if (executionGraph.getState() == JobStatus.RUNNING) {
log.error(errorMessage, jobGraph.getJobID());
} else {
log.debug(errorMessage, jobGraph.getJobID());
}
}
}
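The hand-off above (check for a coordinator, then process the acknowledgement on a separate executor and swallow any failure) can be sketched in isolation. This is a minimal illustration, not Flink's code: AckForwarder, its nested Coordinator interface and the failures counter are hypothetical names, and a plain java.util.concurrent.Executor stands in for ioExecutor.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the scheduler side of the acknowledgement path.
class AckForwarder {
    // Hypothetical stand-in for CheckpointCoordinator.receiveAcknowledgeMessage.
    interface Coordinator {
        void receiveAcknowledgeMessage(long checkpointId) throws Exception;
    }

    private final Coordinator coordinator; // null when checkpointing is not enabled
    private final Executor ioExecutor;
    final AtomicInteger failures = new AtomicInteger();

    AckForwarder(Coordinator coordinator, Executor ioExecutor) {
        this.coordinator = coordinator;
        this.ioExecutor = ioExecutor;
    }

    // Returns true if the message was handed to a coordinator, false if there is none.
    boolean acknowledgeCheckpoint(long checkpointId) {
        if (coordinator == null) {
            return false;
        }
        ioExecutor.execute(() -> {
            try {
                coordinator.receiveAcknowledgeMessage(checkpointId);
            } catch (Throwable t) {
                // The original logs a warning here; this sketch only counts the failure.
                failures.incrementAndGet();
            }
        });
        return true;
    }
}
```

Passing Runnable::run as the executor runs the work synchronously, which is convenient for trying the sketch out. A real I/O executor keeps potentially slow acknowledgement processing off the main thread.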
CheckpointCoordinator.receiveAcknowledgeMessage
Now let's go back to CheckpointCoordinator and analyze its receiveAcknowledgeMessage method.
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
if (shutdown || message == null) {
return false;
}
// Check that the JobID in the message matches the job of this coordinator
if (!job.equals(message.getJob())) {
LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, taskManagerLocationInfo, message);
return false;
}
// Get the checkpoint ID
final long checkpointId = message.getCheckpointId();
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
// Make sure the coordinator has not been shut down
if (shutdown) {
return false;
}
// Look up the pending checkpoint
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
// If the acknowledgement succeeded and every task has now acknowledged,
// run the logic that completes this PendingCheckpoint
if (checkpoint.areTasksFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE:
LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
break;
case UNKNOWN:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
"because the task's execution attempt id was unknown. Discarding " +
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
break;
case DISCARDED:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
"because the pending checkpoint had been discarded. Discarding the " +
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
}
else {
boolean wasPendingCheckpoint;
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt {} from task " +
"{} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
}
else {
LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.",
checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
wasPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
return wasPendingCheckpoint;
}
}
}
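The last branch above tells a late acknowledgement (the checkpoint was pending recently) apart from one for a checkpoint that was never known, by looking the ID up in recentPendingCheckpoints. A minimal sketch of such a bounded memory of recent IDs follows. It assumes a fixed-capacity FIFO; the class name RecentCheckpointIds, its methods and the capacity used in the test are illustrative and not taken from Flink.

```java
import java.util.ArrayDeque;

// Hypothetical helper: remembers the most recent checkpoint IDs in a bounded FIFO.
class RecentCheckpointIds {
    private final int capacity;
    private final ArrayDeque<Long> recent;

    RecentCheckpointIds(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
        this.recent = new ArrayDeque<>(capacity);
    }

    // Remember an ID, evicting the oldest one once the buffer is full.
    void remember(long checkpointId) {
        if (recent.size() >= capacity) {
            recent.removeFirst();
        }
        recent.addLast(checkpointId);
    }

    // true: the message is merely late; false: the checkpoint is unknown.
    boolean wasPending(long checkpointId) {
        return recent.contains(checkpointId);
    }
}
```

Either way the reported state is discarded. The distinction only decides the log level and the return value.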
Before analyzing completePendingCheckpoint, the method that completes a checkpoint, let's first look at how PendingCheckpoint handles an acknowledgement. Its acknowledgeTask method is shown below:
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
TaskStateSnapshot operatorSubtaskStates,
CheckpointMetrics metrics) {
synchronized (lock) {
if (discarded) {
// The checkpoint has already been discarded: return DISCARDED
return TaskAcknowledgeResult.DISCARDED;
}
// Remove the acknowledging task from notYetAcknowledgedTasks,
// which holds all tasks that have not acknowledged yet
final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
if (vertex == null) {
// The task is not in notYetAcknowledgedTasks.
// If it is in acknowledgedTasks (the tasks that already acknowledged),
// this is a duplicate acknowledgement: return DUPLICATE
if (acknowledgedTasks.contains(executionAttemptId)) {
return TaskAcknowledgeResult.DUPLICATE;
} else {
// Otherwise the task is unknown: return UNKNOWN
return TaskAcknowledgeResult.UNKNOWN;
}
} else {
// Add it to the set of acknowledged tasks
acknowledgedTasks.add(executionAttemptId);
}
List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
int subtaskIndex = vertex.getParallelSubtaskIndex();
long ackTimestamp = System.currentTimeMillis();
long stateSize = 0L;
// Store the snapshot state reported for each operator of this task
if (operatorSubtaskStates != null) {
for (OperatorID operatorID : operatorIDs) {
OperatorSubtaskState operatorSubtaskState =
operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);
// if no real operatorSubtaskState was reported, we insert an empty state
if (operatorSubtaskState == null) {
operatorSubtaskState = new OperatorSubtaskState();
}
OperatorState operatorState = operatorStates.get(operatorID);
if (operatorState == null) {
operatorState = new OperatorState(
operatorID,
vertex.getTotalNumberOfParallelSubtasks(),
vertex.getMaxParallelism());
operatorStates.put(operatorID, operatorState);
}
operatorState.putState(subtaskIndex, operatorSubtaskState);
stateSize += operatorSubtaskState.getStateSize();
}
}
++numAcknowledgedTasks;
// publish the checkpoint statistics
// to prevent null-pointers from concurrent modification, copy reference onto stack
// Report the checkpoint statistics of this subtask
final PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Do this in millis because the web frontend works with them
long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
subtaskIndex,
ackTimestamp,
stateSize,
metrics.getSyncDurationMillis(),
metrics.getAsyncDurationMillis(),
metrics.getBytesBufferedInAlignment(),
alignmentDurationMillis);
statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
}
// Finally, report success
return TaskAcknowledgeResult.SUCCESS;
}
}
Next, when is completePendingCheckpoint called to complete the checkpoint? It is called when checkpoint.areTasksFullyAcknowledged() returns true. The logic of that method is:
public boolean areTasksFullyAcknowledged() {
return notYetAcknowledgedTasks.isEmpty() && !discarded;
}
As the code shows, completePendingCheckpoint is only called when the pending checkpoint has not been discarded and notYetAcknowledgedTasks is empty, that is, when every task has acknowledged.
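The bookkeeping described above boils down to a small state machine. The sketch below models it with plain collections. It is a simplification under stated assumptions: PendingAcks and its method names are hypothetical, String replaces ExecutionAttemptID and ExecutionVertex, and the state and statistics handling is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, stripped-down model of the acknowledgement bookkeeping.
class PendingAcks {
    enum Result { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    // attempt ID -> task name for every task that still has to acknowledge
    private final Map<String, String> notYetAcknowledged = new HashMap<>();
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    PendingAcks(Map<String, String> tasksToAcknowledge) {
        notYetAcknowledged.putAll(tasksToAcknowledge);
    }

    synchronized Result acknowledge(String attemptId) {
        if (discarded) {
            return Result.DISCARDED;
        }
        if (notYetAcknowledged.remove(attemptId) == null) {
            // Not pending: either it acknowledged before, or it was never expected.
            return acknowledged.contains(attemptId) ? Result.DUPLICATE : Result.UNKNOWN;
        }
        acknowledged.add(attemptId);
        return Result.SUCCESS;
    }

    // Mirrors areTasksFullyAcknowledged: nothing left to wait for and not discarded.
    synchronized boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty() && !discarded;
    }

    synchronized void discard() {
        discarded = true;
    }
}
```

With two expected tasks, the first acknowledgement returns SUCCESS, a repeat from the same task returns DUPLICATE, and isFullyAcknowledged() only becomes true after the second task has acknowledged as well.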
CheckpointCoordinator.completePendingCheckpoint
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
final long checkpointId = pendingCheckpoint.getCheckpointId();
final CompletedCheckpoint completedCheckpoint;
// As a first step to complete the checkpoint, we register its state with the registry
// Register the state of all operators with the sharedStateRegistry
Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
sharedStateRegistry.registerAll(operatorStates.values());
try {
try {
// Finalize the pendingCheckpoint; the details are analyzed later
completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
// Reset the counter of failed checkpoints
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
}
catch (Exception e1) {
// abort the current pending checkpoint if we fails to finalize the pending checkpoint.
if (!pendingCheckpoint.isDiscarded()) {
failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
}
throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
}
// the pending checkpoint must be discarded after the finalization
// Sanity check: after finalizeCheckpoint the pendingCheckpoint must be in the discarded state
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
try {
// Store the completed checkpoint
completedCheckpointStore.addCheckpoint(completedCheckpoint);
} catch (Exception exception) {
// we failed to store the completed checkpoint. Let's clean up
executor.execute(new Runnable() {
@Override
public void run() {
try {
completedCheckpoint.discardOnFailedStoring();
} catch (Throwable t) {
LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
}
}
});
throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
}
} finally {
// Remove this checkpoint from the set of pending checkpoints
pendingCheckpoints.remove(checkpointId);
triggerQueuedRequests();
}
// Remember the ID of this recent checkpoint
rememberRecentCheckpointId(checkpointId);
// drop those pending checkpoints that are at prior to the completed one
// Drop all pending checkpoints whose ID is smaller than checkpointId (forced checkpoints are not dropped)
dropSubsumedCheckpoints(checkpointId);
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
// Record the completion time of this checkpoint
lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Checkpoint state: ");
for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
builder.append(state);
builder.append(", ");
}
// Remove last two chars ", "
builder.setLength(builder.length() - 2);
LOG.debug(builder.toString());
}
// send the "notify complete" call to all vertices
final long timestamp = completedCheckpoint.getTimestamp();
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
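// Send the checkpoint-complete notification to every task; this call is important.
// The notification travels down to each Task, then StreamTask, then every operator and its userFunction.
// Finally, if a userFunction implements the CheckpointListener interface,
// its notifyCheckpointComplete method is called.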
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
}
}
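One step in the method above deserves a closer look: once a checkpoint completes, older pending checkpoints are subsumed by it and dropped, unless they are forced. The following sketch illustrates that rule only. PendingCheckpoints, its methods and the boolean forced flag are hypothetical simplifications and do not reproduce the real dropSubsumedCheckpoints.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: checkpoint ID -> forced flag (true means it must not be subsumed).
class PendingCheckpoints {
    private final Map<Long, Boolean> pending = new LinkedHashMap<>();

    void add(long checkpointId, boolean forced) {
        pending.put(checkpointId, forced);
    }

    // Drops every non-forced pending checkpoint older than the completed one
    // and returns the dropped IDs in insertion order.
    List<Long> dropSubsumed(long completedCheckpointId) {
        List<Long> dropped = new ArrayList<>();
        Iterator<Map.Entry<Long, Boolean>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Boolean> entry = it.next();
            if (entry.getKey() < completedCheckpointId && !entry.getValue()) {
                dropped.add(entry.getKey());
                it.remove();
            }
        }
        return dropped;
    }

    boolean isPending(long checkpointId) {
        return pending.containsKey(checkpointId);
    }
}
```

Removing through the iterator avoids a ConcurrentModificationException while the map is being traversed.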
Note: the call chain that notifies each operator of the completed checkpoint is:
- Execution.notifyCheckpointComplete
- RpcTaskManagerGateway.notifyCheckpointComplete
- TaskExecutor.confirmCheckpoint
- Task.notifyCheckpointComplete
- StreamTask.notifyCheckpointCompleteAsync
- StreamTask.notifyCheckpointComplete
- AbstractUdfStreamOperator.notifyCheckpointComplete
- CheckpointListener.notifyCheckpointComplete
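The tail of this chain can be illustrated with a small self-contained sketch: the task forwards the notification to each operator, and an operator only calls the user function if that function opted in by implementing the listener interface. All types here are hypothetical stand-ins (CompletionListener mirrors the shape of CheckpointListener, UdfOperator the role of AbstractUdfStreamOperator); none of this is Flink's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local copy of the callback shape.
interface CompletionListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Hypothetical stand-in for an operator that wraps a user function.
class UdfOperator {
    private final Object userFunction;

    UdfOperator(Object userFunction) {
        this.userFunction = userFunction;
    }

    void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Only user functions that implement the listener are called.
        if (userFunction instanceof CompletionListener) {
            ((CompletionListener) userFunction).notifyCheckpointComplete(checkpointId);
        }
    }
}

// Example user function: records which checkpoints it was told are complete.
class CommittingSink implements CompletionListener {
    final List<Long> committed = new ArrayList<>();

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        committed.add(checkpointId);
    }
}

// Hypothetical stand-in for the task: forwards the notification to every operator.
class TaskSketch {
    static void notifyOperators(List<UdfOperator> operators, long checkpointId) {
        for (UdfOperator operator : operators) {
            try {
                operator.notifyCheckpointComplete(checkpointId);
            } catch (Exception e) {
                throw new RuntimeException("Notification failed for checkpoint " + checkpointId, e);
            }
        }
    }
}
```

A user function typically uses this callback to make side effects visible only after the checkpoint that covers them is known to be complete.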
PendingCheckpoint.finalizeCheckpoint
Finally, let's look at how a PendingCheckpoint is finalized into a CompletedCheckpoint object. The code is shown below:
public CompletedCheckpoint finalizeCheckpoint() throws IOException {
synchronized (lock) {
// Make sure all master states have been acknowledged
checkState(areMasterStatesFullyAcknowledged(),
"Pending checkpoint has not been fully acknowledged by master states yet.");
// Make sure all tasks have acknowledged
checkState(areTasksFullyAcknowledged(),
"Pending checkpoint has not been fully acknowledged by tasks yet.");
// make sure we fulfill the promise with an exception if something fails
try {
// write out the metadata
// Create a Savepoint object that holds the checkpoint metadata
final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
final CompletedCheckpointStorageLocation finalizedLocation;
// Write the checkpoint metadata to the target location (for example a file system)
try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
Checkpoints.storeCheckpointMetadata(savepoint, out);
finalizedLocation = out.closeAndFinalizeCheckpoint();
}
CompletedCheckpoint completed = new CompletedCheckpoint(
jobId,
checkpointId,
checkpointTimestamp,
System.currentTimeMillis(),
operatorStates,
masterStates,
props,
finalizedLocation);
// Complete the CompletableFuture with the completedCheckpoint
onCompletionPromise.complete(completed);
// to prevent null-pointers from concurrent modification, copy reference onto stack
// Set the discardCallback of the completedCheckpoint
PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Finalize the statsCallback and give the completed checkpoint a
// callback for discards.
CompletedCheckpointStats.DiscardCallback discardCallback =
statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer());
completed.setDiscardCallback(discardCallback);
}
// mark this pending checkpoint as disposed, but do NOT drop the state
// Mark this pending checkpoint as disposed
dispose(false);
return completed;
}
catch (Throwable t) {
onCompletionPromise.completeExceptionally(t);
ExceptionUtils.rethrowIOException(t);
return null; // silence the compiler
}
}
}
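The try/catch around the body of finalizeCheckpoint follows a common pattern: the completion promise is fulfilled in both outcomes, with the result on success and with the exception on failure, so nobody waiting on it hangs. A minimal sketch of that pattern follows. Finalizer and the writeMetadata supplier are hypothetical, a String stands in for the CompletedCheckpoint, and the sketch only handles RuntimeException whereas the original catches Throwable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of the "fulfil the promise either way" pattern.
class Finalizer {
    final CompletableFuture<String> onCompletionPromise = new CompletableFuture<>();

    // writeMetadata is a hypothetical stand-in for writing the metadata
    // and returning a description of the finalized location.
    String finalizeCheckpoint(Supplier<String> writeMetadata) {
        try {
            String location = writeMetadata.get();
            // Success: hand the result to everyone waiting on the promise.
            onCompletionPromise.complete(location);
            return location;
        } catch (RuntimeException e) {
            // Failure: propagate the error through the promise and to the caller.
            onCompletionPromise.completeExceptionally(e);
            throw e;
        }
    }
}
```

Callers that triggered the checkpoint can attach callbacks to the promise and react to success or failure without polling.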
This blog is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.