Flink源码阅读(四)--- checkpoint制作这篇文章介绍了checkpoint制作原理,这篇文章在此基础上,介绍下怎么从checkpoint/savepoint恢复。本文内容是基于Flink 1.9来讲解。
1. 概述
作业从状态 checkpoint / savepoint 的情况简单总结主要是两种
- 作业手动重启,从savepoint恢复
- 作业运行过程中,某个task执行失败,从checkpoint恢复
savepoint是一种人为主动触发生成的checkpoint,所以checkpoint/savepoint 恢复的原理是一样的。下面以工作中比较常见的某个task失败,作业如何恢复为例进行介绍。
2. 状态分配
首先说明下Task的状态state都有哪些,可以看ExecutionState.java类
CREATED,
SCHEDULED,
DEPLOYING,
RUNNING,
/**
* This state marks "successfully completed". It can only be reached when a
* program reaches the "end of its input". The "end of input" can be reached
* when consuming a bounded input (fix set of files, bounded query, etc) or
* when stopping a program (not cancelling!) which make the input look like
* it reached its end at a specific point.
*/
FINISHED,
CANCELING,
CANCELED,
FAILED,
RECONCILING;
Task各个state的转换关系如下:
* CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
* | | | |
* | | | +------+
* | | V V
* | | CANCELLING -----+----> CANCELED
* | | |
* | +-------------------------+
* |
* | ... -> FAILED
* V
* RECONCILING -> RUNNING | FINISHED | CANCELED | FAILED
Task进行state转换,是调用的
Execution#transitionState --> vertex.notifyStateTransition --> getExecutionGraph().notifyExecutionChange
如果task变成FAILED,就会调用
failoverStrategy.onTaskFailure --> AdaptedRestartPipelinedRegionStrategyNG#onTaskFailure --> restartTasks --> resetAndRescheduleTasks --> createResetAndRescheduleTasksCallback
这里restartTasks方法的参数是该Pipeline上所有需要restart的task。
重点看下createResetAndRescheduleTasksCallback方法做了什么,看下源码
LOG.info("Finally restart {} tasks to recover from task failure.", unmodifiedVertices.size());
// reset tasks to CREATED state and reload state
resetTasks(unmodifiedVertices, globalModVersion);
// re-schedule tasks
rescheduleTasks(unmodifiedVertices, globalModVersion);
做了两件事情,重置Tasks (状态分配) 和 重新调度Tasks,下面介绍下重置Tasks方法
2.1 重置Tasks
第一步:为每个节点重置Execution
for (ExecutionVertex ev : vertices) {
CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
if (cgroup != null && !colGroups.contains(cgroup)){
cgroup.resetConstraints();
colGroups.add(cgroup);
}
ev.resetForNewExecution(restartTimestamp, globalModVersion);
}
第二步:把pendingCheckpoints这个map中所有正在做的checkpoint fail掉
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
第三步:从最近完成的checkpoint恢复state
executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
involvedExecutionJobVertices, false, true);
接下来重点看下第三步怎么从checkpoint恢复的?
2.1.1 首先找到最近完成的一个latestCheckpoint
如果latestCheckpoint==null
如果 errorIfNoCheckpoint 开关为true,直接拋IllegalStateException
如果 errorIfNoCheckpoint 开关为false,直接return
2.1.2 给Tasks分配states,stateAssignmentOperation.assignStates(),主要做了下面几件事情:
1. 对于checkpoint中所有的operatorStates,check在新tasks中是否都有对应的operatorID。如果在新tasks中缺少operatorStates中某一个operatorID,(i) allowNonRestoredState==true, 跳过该operatorID (ii) allowNonRestoredState==false, 拋IllegalStateException异常。
checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);
2. 遍历所有的Tasks
(1) 对于每个Task的所有operator:(i) 如果在checkpoint中存在对应的state,直接记录在operatorStates list中,(ii) 如果在checkpoint中没有对应的state,就为该operatorID初始化一个OperatorState,并记录在operatorStates list中。(iii) 对于并发度改变的缩扩容情况,对state进行重新分配,具体可以参考 state缩扩容。
最终每个Task分配的状态被封装在 JobManagerTaskRestore 中,然后通过 Execution.setInitialState() 关联到 Execution 中。JobManagerTaskRestore 会作为 TaskDeploymentDescriptor 的一个属性下发到 TaskExecutor 中。 缩扩容state重新分配简单总结如下:
Operator State:state存储实现ListCheckpointed接口,这种实现的优点是可以对state根据并发方便重新分配。用户也可以重写restore state逻辑。
Keyed State:Flink引入了Key Group的概念,将Key Group作为Keyed State的基本分配单元,如果并发度改变,就可以重新计算key group分配,然后分到不同的算子中。
(iiii)补充一点,在对state重新分配的时候,会检查新提交tasks的Parallelism与上次operatorStates的MaxParallelism的关系,源码可参考 StateAssignmentOperation#checkParallelismPreconditions方法
1. 如果 task的并发度 > checkpoint中operatorState的最大并发度, 就直接抛异常
2. 如果 task的最大并发度 != operatorState的最大并发度
2.1 如果 task的最大并发度没有自己配置,那把task的最大并发度就设置为operatorState的最大并发度
2.2 如果自己配置了最大并发度,就直接抛异常
if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
throw new IllegalStateException(
"The state for task "
+ executionJobVertex.getJobVertexId()
+ " can not be restored. The maximum parallelism ("
+ operatorState.getMaxParallelism()
+ ") of the restored state is lower than the configured parallelism ("
+ executionJobVertex.getParallelism()
+ "). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism.");
}
// check that the number of key groups have not changed or if we need to override it to
// satisfy the restored state
if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
if (!executionJobVertex.isMaxParallelismConfigured()) {
// if the max parallelism was not explicitly specified by the user, we derive it
// from the state
LOG.debug(
"Overriding maximum parallelism for JobVertex {} from {} to {}",
executionJobVertex.getJobVertexId(),
executionJobVertex.getMaxParallelism(),
operatorState.getMaxParallelism());
executionJobVertex.setMaxParallelism(operatorState.getMaxParallelism());
} else {
// if the max parallelism was explicitly specified, we complain on mismatch
throw new IllegalStateException(
"The maximum parallelism ("
+ operatorState.getMaxParallelism()
+ ") with which the latest "
+ "checkpoint of the execution job vertex "
+ executionJobVertex
+ " has been taken and the current maximum parallelism ("
+ executionJobVertex.getMaxParallelism()
+ ") changed. This "
+ "is currently not supported.");
}
}
至于operatorState的最大并发度怎么计算的,等于存储operator对应的ExecutionJobVertex的最大并发度,ExecutionJobVertex的最大并发度可以参考ExecutionJobVertex类的构造方法
1. 如果task设置了最大并发度,就按照设置的来
2. 如果task没有设置最大并发度,就根据算子并发度来计算,可以参考 KeyGroupRangeAssignment#computeDefaultMaxParallelism方法,min(max(parallelism向上取整到2的最近幂, 2^7), 2^15)
public static int computeDefaultMaxParallelism(int operatorParallelism) {
checkParallelismPreconditions(operatorParallelism);
return Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo(
operatorParallelism + (operatorParallelism / 2)),
DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
UPPER_BOUND_MAX_PARALLELISM);
}
至此,重置Tasks的逻辑大体就介绍完了。
2.2 调度Tasks
入口是AdaptedRestartPipelinedRegionStrategyNG#rescheduleTasks方法,真正开始执行调度的是SchedulingUtils.schedule方法。
关于task调度的内容,可以看下我之前写的一篇文章 Flink作业提交(三)--- Job运行, 调度分为两步,申请slot和deploy task。
在deploy task的时候,首先会调用StreamTask.invoke()方法,在invoke方法中,会对该Task中每个operator调用initializeState()方法,这里看下initializeState#initializeState源码
private void initializeState() throws Exception {
StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
for (StreamOperator<?> operator : allOperators) {
if (null != operator) {
operator.initializeState();
}
}
}
然后会调用AbstractStreamOperator#initializeState方法
@Override
public final void initializeState() throws Exception {
final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
final StreamTask<?, ?> containingTask =
Preconditions.checkNotNull(getContainingTask());
final CloseableRegistry streamTaskCloseableRegistry =
Preconditions.checkNotNull(containingTask.getCancelables());
final StreamTaskStateInitializer streamTaskStateManager =
Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
this,
keySerializer,
streamTaskCloseableRegistry,
metrics);
this.operatorStateBackend = context.operatorStateBackend();
this.keyedStateBackend = context.keyedStateBackend();
if (keyedStateBackend != null) {
this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
}
timeServiceManager = context.internalTimerServiceManager();
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
try {
StateInitializationContext initializationContext = new StateInitializationContextImpl(
context.isRestored(), // information whether we restore or start for the first time
operatorStateBackend, // access to operator state backend
keyedStateStore, // access to keyed state backend
keyedStateInputs, // access to keyed state stream
operatorStateInputs); // access to operator state stream
initializeState(initializationContext);
} finally {
closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
}
}
上面提到 TaskExecutor 使用 TaskStateManager 来管理当前 Task 的状态,TaskStateManager 对象会基于分配的 JobManagerTaskRestore 和本地状态存储 TaskLocalStateStore 进行创建。
状态初始化的关键方法在于通过 StreamTaskStateInitializer.streamOperatorStateContext() 生成 StreamOperatorStateContext,通过 StreamOperatorStateContext 可以获取 operatorStateBackend,Raw State Streams,operatorStateBackend以及timeServiceManager等,然后就可以进行状态恢复了。
咱们接着看下StreamOperatorStateContext是怎么生成的,具体实现可以看下 StreamTaskStateInitializerImpl#streamOperatorStateContext方法
TaskInfo taskInfo = environment.getTaskInfo();
OperatorSubtaskDescriptionText operatorSubtaskDescription =
new OperatorSubtaskDescriptionText(
operatorID,
operatorClassName,
taskInfo.getIndexOfThisSubtask(),
taskInfo.getNumberOfParallelSubtasks());
final String operatorIdentifierText = operatorSubtaskDescription.toString();
final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
taskStateManager.prioritizedOperatorState(operatorID);
AbstractKeyedStateBackend<?> keyedStatedBackend = null;
OperatorStateBackend operatorStateBackend = null;
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
InternalTimeServiceManager<?> timeServiceManager;
try {
// -------------- Keyed State Backend --------------
keyedStatedBackend = keyedStatedBackend(
keySerializer,
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry,
metricGroup);
// -------------- Operator State Backend --------------
operatorStateBackend = operatorStateBackend(
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry);
// -------------- Raw State Streams --------------
rawKeyedStateInputs = rawKeyedStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
rawOperatorStateInputs = rawOperatorStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
// -------------- Internal Timer Service Manager --------------
timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
// -------------- Preparing return value --------------
return new StreamOperatorStateContextImpl(
prioritizedOperatorSubtaskStates.isRestored(),
operatorStateBackend,
keyedStatedBackend,
timeServiceManager,
rawOperatorStateInputs,
rawKeyedStateInputs);
- 为了生成 StreamOperatorStateContext
1. 通过 TaskStateManager.prioritizedOperatorState() 方法获得每个 Operator 需要恢复的状态句柄。
2. 使用获得的状态句柄创建并还原 state backend 和 timer。这里引入了 PrioritizedOperatorSubtaskState,它封装了多个备选的 OperatorSubtaskState快照,这些快照相互之间是可以(部分)替换的,并按照优先级排序。
小结
本篇文章介绍了当作业某些Task fail之后,Task状态如何分配,以及调度Task怎么使用state进行恢复。