Flink源码阅读(五)--- checkpoint / savepoint恢复

Flink源码阅读(四)--- checkpoint制作这篇文章介绍了checkpoint制作原理,这篇文章在此基础上,介绍下怎么从checkpoint/savepoint恢复。本文内容是基于Flink 1.9来讲解。

1. 概述

作业从状态 checkpoint / savepoint 的情况简单总结主要是两种

  • 作业手动重启,从savepoint恢复
  • 作业运行过程中,某个task执行失败,从checkpoint恢复

savepoint是一种人为主动触发生成的checkpoint,所以checkpoint/savepoint 恢复的原理是一样的。下面以工作中比较常见的某个task失败,作业如何恢复为例进行介绍。

2. 状态分配

首先说明下Task的状态state都有哪些,可以看ExecutionState.java类

    CREATED,
    
    SCHEDULED,
    
    DEPLOYING,
    
    RUNNING,

    /**
     * This state marks "successfully completed". It can only be reached when a
     * program reaches the "end of its input". The "end of input" can be reached
     * when consuming a bounded input (fix set of files, bounded query, etc) or
     * when stopping a program (not cancelling!) which make the input look like
     * it reached its end at a specific point.
     */
    FINISHED,
    
    CANCELING,
    
    CANCELED,
    
    FAILED,

    RECONCILING;

Task各个state的转换关系如下:

 *     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
 *        |            |            |          |
 *        |            |            |   +------+
 *        |            |            V   V
 *        |            |         CANCELLING -----+----> CANCELED
 *        |            |                         |
 *        |            +-------------------------+
 *        |
 *        |                                   ... -> FAILED
 *        V
 *    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED

Task进行state转换,是调用的

Execution#transitionState --> vertex.notifyStateTransition --> getExecutionGraph().notifyExecutionChange

如果task变成FAILED,就会调用

failoverStrategy.onTaskFailure --> AdaptedRestartPipelinedRegionStrategyNG#onTaskFailure --> restartTasks --> resetAndRescheduleTasks --> createResetAndRescheduleTasksCallback

这里restartTasks方法的参数是该Pipeline上所有需要restart的task。

重点看下createResetAndRescheduleTasksCallback方法做了什么,看下源码

LOG.info("Finally restart {} tasks to recover from task failure.", unmodifiedVertices.size());

// reset tasks to CREATED state and reload state
resetTasks(unmodifiedVertices, globalModVersion);

// re-schedule tasks
rescheduleTasks(unmodifiedVertices, globalModVersion);

做了两件事情,重置Tasks (状态分配) 和 重新调度Tasks,下面介绍下重置Tasks方法

2.1 重置Tasks

第一步:为每个节点重置Execution

        for (ExecutionVertex ev : vertices) {
            CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
            if (cgroup != null && !colGroups.contains(cgroup)){
                cgroup.resetConstraints();
                colGroups.add(cgroup);
            }

            ev.resetForNewExecution(restartTimestamp, globalModVersion);
        }

第二步:把pendingCheckpoints这个map中所有正在做的checkpoint fail掉

executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
                new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));

第三步:从最近完成的checkpoint恢复state

executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
                involvedExecutionJobVertices, false, true);

接下来重点看下第三步怎么从checkpoint恢复的?
2.1.1 首先找到最近完成的一个latestCheckpoint
   如果latestCheckpoint==null
      如果 errorIfNoCheckpoint 开关为true,直接拋IllegalStateException
      如果 errorIfNoCheckpoint 开关为false,直接return
2.1.2 给Tasks分配states,stateAssignmentOperation.assignStates(),主要做了下面几件事情:
   1. 对于checkpoint中所有的operatorStates,check在新tasks中是否都有对应的operatorID。如果在新tasks中缺少operatorStates中某一个operatorID,(i) allowNonRestoredState==true, 跳过该operatorID (ii) allowNonRestoredState==false, 拋IllegalStateException异常。

checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);

   2. 遍历所有的Tasks
(1) 对于每个Task的所有operator:(i) 如果在checkpoint中存在对应的state,直接记录在operatorStates list中,(ii) 如果在checkpoint中没有对应的state,就为该operatorID初始化一个OperatorState,并记录在operatorStates list中。(iii) 对于并发度改变的缩扩容情况,对state进行重新分配,具体可以参考 state缩扩容
   最终每个Task分配的状态被封装在 JobManagerTaskRestore 中,然后通过 Execution.setInitialState() 关联到 Execution 中。JobManagerTaskRestore 会作为 TaskDeploymentDescriptor 的一个属性下发到 TaskExecutor 中。 缩扩容state重新分配简单总结如下:
   Operator State:state存储实现ListCheckpointed接口,这种实现的优点是可以对state根据并发方便重新分配。用户也可以重写restore state逻辑。
   Keyed State:Flink引入了Key Group的概念,将Key Group作为Keyed State的基本分配单元,如果并发度改变,就可以重新计算key group分配,然后分到不同的算子中。

   (iiii)补充一点,在对state重新分配的时候,会检查新提交tasks的Parallelism与上次operatorStates的MaxParallelism的关系,源码可参考 StateAssignmentOperation#checkParallelismPreconditions方法
   1. 如果 task的并发度 > checkpoint中operatorState的最大并发度, 就直接抛异常
   2. 如果 task的最大并发度 != operatorState的最大并发度
     2.1 如果 task的最大并发度没有自己配置,那把task的最大并发度就设置为operatorState的最大并发度
     2.2 如果自己配置了最大并发度,就直接抛异常

        if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
            throw new IllegalStateException(
                    "The state for task "
                            + executionJobVertex.getJobVertexId()
                            + " can not be restored. The maximum parallelism ("
                            + operatorState.getMaxParallelism()
                            + ") of the restored state is lower than the configured parallelism ("
                            + executionJobVertex.getParallelism()
                            + "). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism.");
        }

        // check that the number of key groups have not changed or if we need to override it to
        // satisfy the restored state
        if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {

            if (!executionJobVertex.isMaxParallelismConfigured()) {
                // if the max parallelism was not explicitly specified by the user, we derive it
                // from the state

                LOG.debug(
                        "Overriding maximum parallelism for JobVertex {} from {} to {}",
                        executionJobVertex.getJobVertexId(),
                        executionJobVertex.getMaxParallelism(),
                        operatorState.getMaxParallelism());

                executionJobVertex.setMaxParallelism(operatorState.getMaxParallelism());
            } else {
                // if the max parallelism was explicitly specified, we complain on mismatch
                throw new IllegalStateException(
                        "The maximum parallelism ("
                                + operatorState.getMaxParallelism()
                                + ") with which the latest "
                                + "checkpoint of the execution job vertex "
                                + executionJobVertex
                                + " has been taken and the current maximum parallelism ("
                                + executionJobVertex.getMaxParallelism()
                                + ") changed. This "
                                + "is currently not supported.");
            }
        }

至于operatorState的最大并发度怎么计算的,等于存储operator对应的ExecutionJobVertex的最大并发度,ExecutionJobVertex的最大并发度可以参考ExecutionJobVertex类的构造方法
   1. 如果task设置了最大并发度,就按照设置的来
   2. 如果task没有设置最大并发度,就根据算子并发度来计算,可以参考 KeyGroupRangeAssignment#computeDefaultMaxParallelism方法,min(max(parallelism向上取整到2的最近幂, 2^7), 2^15)

    public static int computeDefaultMaxParallelism(int operatorParallelism) {

        checkParallelismPreconditions(operatorParallelism);

        return Math.min(
                Math.max(
                        MathUtils.roundUpToPowerOfTwo(
                                operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

至此,重置Tasks的逻辑大体就介绍完了。

2.2 调度Tasks

入口是AdaptedRestartPipelinedRegionStrategyNG#rescheduleTasks方法,真正开始执行调度的是SchedulingUtils.schedule方法。

关于task调度的内容,可以看下我之前写的一篇文章 Flink作业提交(三)--- Job运行, 调度分为两步,申请slot和deploy task。

在deploy task的时候,首先会调用StreamTask.invoke()方法,在invoke方法中,会对该Task中每个operator调用initializeState()方法,这里看下initializeState#initializeState源码

    private void initializeState() throws Exception {

        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

        for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }

然后会调用AbstractStreamOperator#initializeState方法

    @Override
    public final void initializeState() throws Exception {

        final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

        final StreamTask<?, ?> containingTask =
            Preconditions.checkNotNull(getContainingTask());
        final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());
        final StreamTaskStateInitializer streamTaskStateManager =
            Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

        final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                getOperatorID(),
                getClass().getSimpleName(),
                this,
                keySerializer,
                streamTaskCloseableRegistry,
                metrics);

        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();

        if (keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
        }

        timeServiceManager = context.internalTimerServiceManager();

        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

        try {
            StateInitializationContext initializationContext = new StateInitializationContextImpl(
                context.isRestored(), // information whether we restore or start for the first time
                operatorStateBackend, // access to operator state backend
                keyedStateStore, // access to keyed state backend
                keyedStateInputs, // access to keyed state stream
                operatorStateInputs); // access to operator state stream

            initializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }
    }
  • 上面提到 TaskExecutor 使用 TaskStateManager 来管理当前 Task 的状态,TaskStateManager 对象会基于分配的 JobManagerTaskRestore 和本地状态存储 TaskLocalStateStore 进行创建。

  • 状态初始化的关键方法在于通过 StreamTaskStateInitializer.streamOperatorStateContext() 生成 StreamOperatorStateContext,通过 StreamOperatorStateContext 可以获取 operatorStateBackend,Raw State Streams,operatorStateBackend以及timeServiceManager等,然后就可以进行状态恢复了。

  • 咱们接着看下StreamOperatorStateContext是怎么生成的,具体实现可以看下 StreamTaskStateInitializerImpl#streamOperatorStateContext方法

        TaskInfo taskInfo = environment.getTaskInfo();
        OperatorSubtaskDescriptionText operatorSubtaskDescription =
            new OperatorSubtaskDescriptionText(
                operatorID,
                operatorClassName,
                taskInfo.getIndexOfThisSubtask(),
                taskInfo.getNumberOfParallelSubtasks());

        final String operatorIdentifierText = operatorSubtaskDescription.toString();

        final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
            taskStateManager.prioritizedOperatorState(operatorID);

        AbstractKeyedStateBackend<?> keyedStatedBackend = null;
        OperatorStateBackend operatorStateBackend = null;
        CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
        CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
        InternalTimeServiceManager<?> timeServiceManager;

        try {

            // -------------- Keyed State Backend --------------
            keyedStatedBackend = keyedStatedBackend(
                keySerializer,
                operatorIdentifierText,
                prioritizedOperatorSubtaskStates,
                streamTaskCloseableRegistry,
                metricGroup);

            // -------------- Operator State Backend --------------
            operatorStateBackend = operatorStateBackend(
                operatorIdentifierText,
                prioritizedOperatorSubtaskStates,
                streamTaskCloseableRegistry);

            // -------------- Raw State Streams --------------
            rawKeyedStateInputs = rawKeyedStateInputs(
                prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
            streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

            rawOperatorStateInputs = rawOperatorStateInputs(
                prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
            streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

            // -------------- Internal Timer Service Manager --------------
            timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);

            // -------------- Preparing return value --------------

            return new StreamOperatorStateContextImpl(
                prioritizedOperatorSubtaskStates.isRestored(),
                operatorStateBackend,
                keyedStatedBackend,
                timeServiceManager,
                rawOperatorStateInputs,
                rawKeyedStateInputs);
  • 为了生成 StreamOperatorStateContext
       1. 通过 TaskStateManager.prioritizedOperatorState() 方法获得每个 Operator 需要恢复的状态句柄。
       2. 使用获得的状态句柄创建并还原 state backend 和 timer。这里引入了 PrioritizedOperatorSubtaskState,它封装了多个备选的 OperatorSubtaskState快照,这些快照相互之间是可以(部分)替换的,并按照优先级排序。

小结

本篇文章介绍了当作业某些Task fail之后,Task状态如何分配,以及调度Task怎么使用state进行恢复。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容