Flink Source Code Notes: checkpoint
Continuing from the previous post:
Flink-1.10 Source Code Notes: checkpoint - 2
Sending the snapshot acknowledgement message
AsyncCheckpointRunnable.run() calls reportCompletedSnapshotStates to report the snapshot. Inside it, taskEnvironment.getTaskStateManager() returns the TaskStateManager (the concrete type is TaskStateManagerImpl), and reportTaskStateSnapshots is then called on it to report the state.
private void reportCompletedSnapshotStates(
TaskStateSnapshot acknowledgedTaskStateSnapshot,
TaskStateSnapshot localTaskStateSnapshot,
long asyncDurationMillis) { // duration of the asynchronous part of the snapshot, in ms
boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
boolean hasLocalState = localTaskStateSnapshot.hasState();
Preconditions.checkState(hasAckState || !hasLocalState,
"Found cached state but no corresponding primary state is reported to the job " +
"manager. This indicates a problem.");
// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
// to stateless tasks on restore. This enables simple job modifications that only concern
// stateless without the need to assign them uids to match their (always empty) states.
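// Report the task's snapshot state.
// When a checkpoint or savepoint is triggered on a task, the task snapshots all stream operator instances it owns. All operator snapshots from the task are then reported through this interface.
// A typical implementation dispatches and forwards the reported state to the interested parties, such as the checkpoint coordinator or the local state store.
taskEnvironment.getTaskStateManager().reportTaskStateSnapshots( // resolves to the implementation in TaskStateManagerImpl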
checkpointMetaData,
checkpointMetrics,
hasAckState ? acknowledgedTaskStateSnapshot : null,
hasLocalState ? localTaskStateSnapshot : null);
LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
taskName, checkpointMetaData.getCheckpointId(), asyncDurationMillis);
LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
taskName, checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
}
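The two decisions made in reportCompletedSnapshotStates (the consistency check and reporting null for stateless tasks) can be isolated in a small sketch. SnapshotStub and ReportSketch are hypothetical names used only for illustration; they are not Flink classes, and the real method forwards the values to the TaskStateManager instead of returning them.

```java
/** Hypothetical stand-in for TaskStateSnapshot; it only records whether any state is present. */
final class SnapshotStub {
    private final boolean hasState;

    SnapshotStub(boolean hasState) {
        this.hasState = hasState;
    }

    boolean hasState() {
        return hasState;
    }
}

final class ReportSketch {
    /**
     * Mirrors the decision in reportCompletedSnapshotStates: returns
     * {acknowledged, local}, where null stands for "nothing to report".
     */
    static SnapshotStub[] selectStatesToReport(SnapshotStub acknowledged, SnapshotStub local) {
        boolean hasAckState = acknowledged.hasState();
        boolean hasLocalState = local.hasState();
        // Local (cached) state without primary state would be inconsistent.
        if (!hasAckState && hasLocalState) {
            throw new IllegalStateException("Found cached state but no primary state");
        }
        return new SnapshotStub[] {
            hasAckState ? acknowledged : null,
            hasLocalState ? local : null
        };
    }
}
```

Reporting null rather than an empty snapshot is what lets a restore skip state assignment for stateless tasks, as the English comment in the original method explains.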
Now step into TaskStateManagerImpl, which implements the TaskStateManager interface:
public class TaskStateManagerImpl implements TaskStateManager
Next, look at reportTaskStateSnapshots. It first stores the local snapshot state in the local state store, then calls acknowledgeCheckpoint on checkpointResponder to send the message.
checkpointResponder is an RpcCheckpointResponder.
/**
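* @param checkpointMetaData metadata of the checkpoint request.
* @param checkpointMetrics task-level metrics for the checkpoint (used for monitoring).
* @param acknowledgedState the reported state to acknowledge to the job manager.
* @param localState the reported state kept for local recovery.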
*/
@Override
public void reportTaskStateSnapshots(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nonnull CheckpointMetrics checkpointMetrics,
@Nullable TaskStateSnapshot acknowledgedState,
@Nullable TaskStateSnapshot localState) {
long checkpointId = checkpointMetaData.getCheckpointId();
// store the local snapshot state
localStateStore.storeLocalState(checkpointId, localState);
// send the snapshot acknowledgement message
checkpointResponder.acknowledgeCheckpoint( // resolves to RpcCheckpointResponder.acknowledgeCheckpoint
jobId,
executionAttemptID,
checkpointId,
checkpointMetrics,
acknowledgedState);
}
RpcCheckpointResponder.acknowledgeCheckpoint mainly delegates to checkpointCoordinatorGateway.acknowledgeCheckpoint, which ends up in JobMaster.acknowledgeCheckpoint:
@Override
public void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState) {
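// delegates to the method of the same name
// JobMaster --> JobMasterGateway --> JobMasterOperatorEventGateway: JobMaster implements JobMasterGateway, which has JobMasterOperatorEventGateway as a parent interface
// JobMaster is responsible for executing the JobGraph of a single job
// this RpcCheckpointResponder is created when the connection to the JobManager is established, after the JobManager has been elected leader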
checkpointCoordinatorGateway.acknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
subtaskState);
}
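JobMaster extends FencedRpcEndpoint and implements the JobMasterGateway and JobMasterService interfaces.
JobMasterGateway extends JobMasterOperatorEventGateway and, among other interfaces, CheckpointCoordinatorGateway, which is why the call on checkpointCoordinatorGateway lands in JobMaster.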
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService
Step into checkpointCoordinatorGateway.acknowledgeCheckpoint, i.e. the JobMaster implementation:
/**
* @param jobID job ID of the running job
* @param executionAttemptID execution attempt ID of the running task
* @param checkpointId ID of this checkpoint
* @param checkpointMetrics metrics of this checkpoint
* @param checkpointState state handles of the checkpoint
*/
@Override
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
// schedulerNG is the object responsible for scheduling the Flink job
// this resolves to the implementation in SchedulerBase
schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
Step into schedulerNG.acknowledgeCheckpoint.
This method obtains the checkpointCoordinator (the checkpoint coordinator), builds an AcknowledgeCheckpoint message, looks up the TaskManager location, and finally calls checkpointCoordinator.receiveAcknowledgeMessage on the ioExecutor.
@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
// make sure we are running in the main thread
mainThreadExecutor.assertRunningInMainThread();
// get the checkpoint coordinator from the execution graph
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
/*
AcknowledgeCheckpoint:
this message is sent from the {@link org.apache.flink.runtime.taskexecutor} to the {@link org.apache.flink.runtime.jobmaster} to signal that the checkpoint of an individual task has completed.
*/
final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState);
// look up the TaskManager location
final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);
if (checkpointCoordinator != null) {
ioExecutor.execute(() -> {
try {
// receives the AcknowledgeCheckpoint message and returns whether it was associated with a pending checkpoint
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
} catch (Throwable t) {
log.warn("Error while processing checkpoint acknowledgement message", t);
}
});
} else {
String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
if (executionGraph.getState() == JobStatus.RUNNING) {
log.error(errorMessage, jobGraph.getJobID());
} else {
log.debug(errorMessage, jobGraph.getJobID());
}
}
}
Now step into checkpointCoordinator.receiveAcknowledgeMessage:
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, taskManagerLocationInfo, message);
return false;
}
// get the checkpoint id
final long checkpointId = message.getCheckpointId();
// synchronize on the coordinator lock so that only one thread runs this section at a time
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
// i.e. re-check the shutdown flag now that we hold the lock
if (shutdown) {
return false;
}
// look up the pending checkpoint
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
// acknowledgeTask records the acknowledgement for the task with the given execution attempt id and the given subtask state
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
case SUCCESS: // acknowledged successfully
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
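// the acknowledgement succeeded; check whether all tasks have now acknowledged their snapshots
if (checkpoint.areTasksFullyAcknowledged()) {
// all tasks acknowledged: complete the pending checkpoint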
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE: // this task has already acknowledged
LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
break;
case UNKNOWN: // unknown execution attempt id
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
"because the task's execution attempt id was unknown. Discarding " +
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
break;
case DISCARDED: // the pending checkpoint was already discarded
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
"because the pending checkpoint had been discarded. Discarding the " +
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
return true;
} else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
} else {
boolean wasPendingCheckpoint;
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt {} from task " +
"{} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
} else {
LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.",
checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
wasPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
return wasPendingCheckpoint;
}
}
}
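The last branch distinguishes a late message from an unknown one by asking recentPendingCheckpoints whether it still remembers the id. The post does not show how that collection is maintained, so the following is only a sketch of one plausible shape: a bounded FIFO of recently seen ids. The class name RecentCheckpointIds and the capacity parameter are assumptions made for illustration.

```java
import java.util.ArrayDeque;

/** Hypothetical bounded memory of recently pending checkpoint ids. */
final class RecentCheckpointIds {
    private final int capacity; // assumed bound; the value is arbitrary in this sketch
    private final ArrayDeque<Long> recent = new ArrayDeque<>();

    RecentCheckpointIds(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Remembers an id, evicting the oldest one once the bound is reached. */
    void remember(long checkpointId) {
        if (recent.size() >= capacity) {
            recent.removeFirst();
        }
        recent.addLast(checkpointId);
    }

    /** True if the id belongs to a checkpoint that was pending not long ago. */
    boolean wasRecentlyPending(long checkpointId) {
        return recent.contains(checkpointId);
    }
}
```

With such a structure, an acknowledgement for an expired checkpoint can be logged as a late message while anything older or never seen is treated as unknown. In both cases the reported state is discarded so that it does not linger.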
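On entry, the method first checks that the pending checkpoint is not null and has not been discarded; only then does the main logic run.
First look at the method called in the switch, checkpoint.acknowledgeTask.
It returns a result based on the acknowledgement state of the task: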
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
TaskStateSnapshot operatorSubtaskStates,
CheckpointMetrics metrics) {
synchronized (lock) {
// if this pending checkpoint has been discarded, return DISCARDED
if (discarded) {
return TaskAcknowledgeResult.DISCARDED;
}
// remove the acknowledged task from notYetAcknowledgedTasks,
// the map that holds every task that has not acknowledged yet
final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
if (vertex == null) {
// the task is not in notYetAcknowledgedTasks;
// if it is in acknowledgedTasks (the set of already acknowledged tasks),
// this is a duplicate acknowledgement: return DUPLICATE
if (acknowledgedTasks.contains(executionAttemptId)) {
return TaskAcknowledgeResult.DUPLICATE;
} else {
// otherwise return UNKNOWN
return TaskAcknowledgeResult.UNKNOWN;
}
} else {
// vertex is not null: add the task to the acknowledged set
acknowledgedTasks.add(executionAttemptId);
}
List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
int subtaskIndex = vertex.getParallelSubtaskIndex();
long ackTimestamp = System.currentTimeMillis();
long stateSize = 0L;
// the following block stores the snapshot state of each operator
if (operatorSubtaskStates != null) {
for (OperatorID operatorID : operatorIDs) {
// returns the subtask state for the given operator id (or null if not contained)
OperatorSubtaskState operatorSubtaskState =
operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);
// if no real operatorSubtaskState was reported, we insert an empty state
if (operatorSubtaskState == null) {
operatorSubtaskState = new OperatorSubtaskState();
}
// look up the OperatorState of this operator
OperatorState operatorState = operatorStates.get(operatorID);
if (operatorState == null) {
// no OperatorState registered for this operator yet: create one
operatorState = new OperatorState(
operatorID,
vertex.getTotalNumberOfParallelSubtasks(),
vertex.getMaxParallelism());
// register the newly created state
operatorStates.put(operatorID, operatorState);
}
// store this subtask's state under its parallel subtask index
operatorState.putState(subtaskIndex, operatorSubtaskState);
stateSize += operatorSubtaskState.getStateSize();
}
}
++numAcknowledgedTasks; // number of acknowledged tasks
// publish the checkpoint statistics
// to prevent null-pointers from concurrent modification, copy reference onto stack
// the following block reports this subtask's checkpoint statistics
final PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Do this in millis because the web frontend works with them
long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
long checkpointStartDelayMillis = metrics.getCheckpointStartDelayNanos() / 1_000_000;
SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
subtaskIndex,
ackTimestamp,
stateSize,
metrics.getSyncDurationMillis(),
metrics.getAsyncDurationMillis(),
alignmentDurationMillis,
checkpointStartDelayMillis);
statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
}
// finally, return SUCCESS
return TaskAcknowledgeResult.SUCCESS;
}
}
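Stripped of state bookkeeping and statistics, the acknowledgement logic above is a small state machine over two collections. The sketch below reproduces only that part, using plain strings as execution attempt ids; AckTracker and its method names are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the acknowledgement bookkeeping. */
final class AckTracker {
    enum Result { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    // attempt id -> subtask index, for tasks that have not acknowledged yet
    private final Map<String, Integer> notYetAcknowledged = new HashMap<>();
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    AckTracker(Map<String, Integer> tasksToAcknowledge) {
        notYetAcknowledged.putAll(tasksToAcknowledge);
    }

    synchronized Result acknowledge(String attemptId) {
        if (discarded) {
            return Result.DISCARDED;
        }
        Integer subtaskIndex = notYetAcknowledged.remove(attemptId);
        if (subtaskIndex == null) {
            // Either acknowledged before, or never part of this checkpoint.
            return acknowledged.contains(attemptId) ? Result.DUPLICATE : Result.UNKNOWN;
        }
        acknowledged.add(attemptId);
        return Result.SUCCESS;
    }

    /** Same condition as areTasksFullyAcknowledged: nothing outstanding and not discarded. */
    synchronized boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty() && !discarded;
    }

    synchronized void discard() {
        discarded = true;
    }
}
```

Removing from the "not yet acknowledged" map and testing the result for null does the lookup and the state transition in one step, which is why a second message from the same task falls through to the DUPLICATE check.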
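When the switch matches SUCCESS, completePendingCheckpoint may be called, but only after checkpoint.areTasksFullyAcknowledged() returns true. As the code shows, completePendingCheckpoint is entered only when every task has acknowledged and the checkpoint has not been discarded.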
boolean areTasksFullyAcknowledged() {
// no unacknowledged tasks remain and the checkpoint is not discarded
return notYetAcknowledgedTasks.isEmpty() && !discarded;
}
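Now follow the call into completePendingCheckpoint.
It first registers the operators' state with the shared state registry and then runs the logic that finalizes the checkpoint. Once it returns, the checkpoint process is complete.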
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
final long checkpointId = pendingCheckpoint.getCheckpointId();
final CompletedCheckpoint completedCheckpoint;
// As a first step to complete the checkpoint, we register its state with the registry
// i.e. register the state of every operator with the sharedStateRegistry
Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
sharedStateRegistry.registerAll(operatorStates.values());
try {
try {
// run the finalization logic of the pendingCheckpoint
completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
// reset the count of failed checkpoints
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
} catch (Exception e1) {
// abort the current pending checkpoint if we fail to finalize the pending checkpoint.
if (!pendingCheckpoint.isDiscarded()) {
abortPendingCheckpoint(
pendingCheckpoint,
new CheckpointException(
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
}
throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
}
// the pending checkpoint must be discarded after the finalization
// sanity check: after finalizeCheckpoint() the pendingCheckpoint must report discarded
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
try {
// store the completed checkpoint
completedCheckpointStore.addCheckpoint(completedCheckpoint);
} catch (Exception exception) {
// we failed to store the completed checkpoint. Let's clean up
executor.execute(new Runnable() {
@Override
public void run() {
try {
completedCheckpoint.discardOnFailedStoring();
} catch (Throwable t) {
LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
}
}
});
throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
}
} finally {
// remove this checkpoint from the set of pending checkpoints
pendingCheckpoints.remove(checkpointId);
// resume the periodic triggering that was paused
resumePeriodicTriggering();
}
// remember the id of this recently completed checkpoint
rememberRecentCheckpointId(checkpointId);
// drop those pending checkpoints that are prior to the completed one,
// i.e. abort every pending checkpoint whose id is smaller than checkpointId (forced checkpoints are not dropped)
dropSubsumedCheckpoints(checkpointId);
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
// sample of the resulting log line, as shown by Flink's web monitoring:
// Completed checkpoint 2 for job d36ca92c353c5fc9794d42fdff834b5d (9013 bytes in 106 ms).
LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Checkpoint state: ");
for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
builder.append(state);
builder.append(", ");
}
// Remove last two chars ", "
builder.setLength(builder.length() - 2);
LOG.debug(builder.toString());
}
// send the "notify complete" call to all vertices, coordinators, etc.
sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
}
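The dropSubsumedCheckpoints step is not shown in this post. Going only by the comments above (older pending checkpoints are dropped unless they are forced), a minimal sketch could look like the following. PendingCheckpointsSketch is a hypothetical class, and the boolean "forced" flag is an assumption standing in for whatever subsumption check the real coordinator applies.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of the pending-checkpoint map and the subsumption step. */
final class PendingCheckpointsSketch {
    // checkpoint id -> "forced" flag (assumed stand-in for the real subsumption check)
    private final Map<Long, Boolean> pending = new LinkedHashMap<>();

    void addPending(long checkpointId, boolean forced) {
        pending.put(checkpointId, forced);
    }

    boolean isPending(long checkpointId) {
        return pending.containsKey(checkpointId);
    }

    /** Drops every non-forced pending checkpoint older than the completed one; returns how many were dropped. */
    int dropSubsumed(long completedCheckpointId) {
        int dropped = 0;
        Iterator<Map.Entry<Long, Boolean>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Boolean> entry = it.next();
            boolean older = entry.getKey() < completedCheckpointId;
            boolean forced = entry.getValue();
            if (older && !forced) {
                it.remove(); // safe removal while iterating
                dropped++;
            }
        }
        return dropped;
    }
}
```

The idea is that once a newer checkpoint has completed, finishing an older one gains nothing for recovery, so its resources can be released early.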
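Finally, look at how a PendingCheckpoint becomes a CompletedCheckpoint. completePendingCheckpoint calls pendingCheckpoint.finalizeCheckpoint, which persists the checkpoint metadata and builds the CompletedCheckpoint. It then marks the pendingCheckpoint as disposed (without dropping its state) and returns the completed checkpoint.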
public CompletedCheckpoint finalizeCheckpoint() throws IOException {
synchronized (lock) {
checkState(!isDiscarded(), "checkpoint is discarded");
checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet");
// make sure we fulfill the promise with an exception if something fails
try {
// write out the metadata
// create a CheckpointMetadata object, which encapsulates the metadata of a checkpoint or savepoint
final CheckpointMetadata savepoint = new CheckpointMetadata(checkpointId, operatorStates.values(), masterStates);
final CompletedCheckpointStorageLocation finalizedLocation;
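// persist the checkpoint metadata to the checkpoint storage (e.g. a file system)
// createMetadataOutputStream creates the output stream the checkpoint metadata is persisted to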
try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
Checkpoints.storeCheckpointMetadata(savepoint, out);
// after all metadata has been written, close the stream and finalize the checkpoint location
finalizedLocation = out.closeAndFinalizeCheckpoint();
}
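// create a CompletedCheckpoint object
// A CompletedCheckpoint describes a checkpoint after all required tasks have acknowledged it (with their state); it is considered successful.
// It holds all metadata of the checkpoint, i.e. the checkpoint ID, timestamps, and the handles to all state that is part of the checkpoint.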
CompletedCheckpoint completed = new CompletedCheckpoint(
jobId,
checkpointId,
checkpointTimestamp,
System.currentTimeMillis(),
operatorStates,
masterStates,
props,
finalizedLocation);
// complete the CompletableFuture with the completed checkpoint
onCompletionPromise.complete(completed);
// to prevent null-pointers from concurrent modification, copy reference onto stack
// then set the discardCallback of the completed checkpoint
PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Finalize the statsCallback and give the completed checkpoint a
// callback for discards.
CompletedCheckpointStats.DiscardCallback discardCallback =
statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer());
// set the callback used for tracking when this checkpoint is discarded
completed.setDiscardCallback(discardCallback);
}
// mark this pending checkpoint as disposed, but do NOT drop the state
dispose(false);
return completed;
}
catch (Throwable t) {
onCompletionPromise.completeExceptionally(t);
ExceptionUtils.rethrowIOException(t);
return null; // silence the compiler
}
}
}
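The try/catch around the body of finalizeCheckpoint guarantees that onCompletionPromise is always resolved, either with the result or with the failure. That pattern can be shown on its own with a plain CompletableFuture. FinalizeSketch and finalizeWith are hypothetical names, and this sketch catches Exception for simplicity where the method above catches Throwable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

final class FinalizeSketch {
    /**
     * Runs the finalization work and resolves the promise in every case:
     * with the result on success, exceptionally on failure.
     * The failure is also rethrown to the direct caller.
     */
    static <T> T finalizeWith(Callable<T> work, CompletableFuture<T> promise) throws Exception {
        try {
            T result = work.call();
            promise.complete(result);
            return result;
        } catch (Exception e) {
            promise.completeExceptionally(e);
            throw e;
        }
    }
}
```

Anyone waiting on the promise therefore never hangs: it observes either the completed value or the exception that stopped finalization.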
After the pendingCheckpoint has been converted into a completedCheckpoint, failureManager.handleCheckpointSuccess is called.
This method mainly resets the checkpoint failure count:
public void handleCheckpointSuccess(@SuppressWarnings("unused") long checkpointId) {
clearCount();
}
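Only clearCount() is visible here; the counter it resets is not shown in this post. As a rough illustration of why a success resets it, here is a sketch of a consecutive-failure counter with a tolerance threshold. The class FailureCounterSketch, its method names, and the threshold semantics are assumptions, not the actual failure manager.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical counter of consecutive checkpoint failures. */
final class FailureCounterSketch {
    private final int tolerableFailures; // assumed: how many consecutive failures are tolerated
    private final AtomicInteger continuousFailures = new AtomicInteger();

    FailureCounterSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Records a failure; returns true once the tolerated number is exceeded. */
    boolean onCheckpointFailure() {
        return continuousFailures.incrementAndGet() > tolerableFailures;
    }

    /** A successful checkpoint breaks the streak, so the count starts over. */
    void onCheckpointSuccess() {
        continuousFailures.set(0);
    }

    int currentCount() {
        return continuousFailures.get();
    }
}
```

Under these assumptions only an unbroken run of failures can exceed the threshold; failures separated by a success are counted from zero again.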
Earlier posts in the checkpoint series
Flink-1.10 Source Code Notes: checkpoint - 1
Flink-1.10 Source Code Notes: checkpoint - 2
Corrections are welcome if you spot any mistakes!