Flink-1.10 Source Code Notes: checkpoint -- 2

Flink Source Code Notes --- checkpoint

This post continues from the previous one:

Flink-1.10 Source Code Notes: checkpoint - 1

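We start in the triggerCheckpoint method of TaskExecutorGateway.

It looks up the task running in the slot. If a task is found, it calls that task's triggerCheckpointBarrier method to trigger the checkpoint barrier for the task; otherwise it returns an exceptionally completed future.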

    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) {
        // The Flink web UI shows, e.g.: Triggering checkpoint 1 @ 1590022656734 for job 8691156e98bad1c55fe5e869191de5f8.
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        // get the checkpoint type
        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        // look up the task running in the slot
        final Task task = taskSlotTable.getTask(executionAttemptID);

        // if a task is running for this execution attempt
        if (task != null) {
            // trigger the checkpoint barrier on the task
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
        }
    }
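The dispatch pattern above (look up the task by attempt ID, then return either a completed acknowledgement or an exceptionally completed future) can be sketched with plain JDK types. This is a simplified model, not Flink code: `TriggerDispatchSketch`, the `String` attempt ID, and the `Runnable` stand-in for a task are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class TriggerDispatchSketch {
    // Hypothetical stand-in for the slot table: attempt ID -> running task.
    private final Map<String, Runnable> tasks = new ConcurrentHashMap<>();

    public void register(String attemptId, Runnable task) {
        tasks.put(attemptId, task);
    }

    // Same shape as triggerCheckpoint: acknowledge if the task exists,
    // otherwise complete the returned future exceptionally.
    public CompletableFuture<String> trigger(String attemptId) {
        Runnable task = tasks.get(attemptId);
        if (task != null) {
            task.run(); // stands in for task.triggerCheckpointBarrier(...)
            return CompletableFuture.completedFuture("ACK");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
            new IllegalStateException("checkpoint request for unknown task " + attemptId));
        return failed;
    }

    public static void main(String[] args) {
        TriggerDispatchSketch executor = new TriggerDispatchSketch();
        executor.register("attempt-1", () -> System.out.println("barrier triggered"));
        System.out.println(executor.trigger("attempt-1").join());
        System.out.println(executor.trigger("attempt-2").isCompletedExceptionally());
    }
}
```

Note that the acknowledgement only means the request was handed to the task; it says nothing about whether the checkpoint itself later succeeds.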

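Next, step into triggerCheckpointBarrier. The method does three things:

1. Gets the AbstractInvokable, which is the concrete task. AbstractInvokable is the base class of every task that a TaskManager can run; here it is a SourceStreamTask or another StreamTask.
2. Creates a CheckpointMetaData object, which wraps the checkpoint's metadata (ID and timestamp).
3. Calls the invokable's triggerCheckpointAsync method (the implementation in StreamTask), which triggers the checkpoint asynchronously.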

public void triggerCheckpointBarrier(
            final long checkpointID,
            final long checkpointTimestamp,
            final CheckpointOptions checkpointOptions,
            final boolean advanceToEndOfEventTime) {
        // The concrete task. AbstractInvokable is the base class of every task a TaskManager can run; here it is a SourceStreamTask or another StreamTask.
        // SourceStreamTask --> StreamTask --> AbstractInvokable
        final AbstractInvokable invokable = this.invokable;
        // wraps the checkpoint's metadata
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        // if the task is in the RUNNING state and invokable is not null
        if (executionState == ExecutionState.RUNNING && invokable != null) {
            try {
                // SourceStreamTask extends StreamTask, so the method called is StreamTask's triggerCheckpointAsync
                invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
            }
            catch (RejectedExecutionException ex) {
                // This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
                LOG.debug(
                    "Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
                    checkpointID, taskNameWithSubtask, executionId);
            }
            catch (Throwable t) {
                if (getExecutionState() == ExecutionState.RUNNING) {
                    failExternally(new Exception(
                        "Error while triggering checkpoint " + checkpointID + " for " +
                            taskNameWithSubtask, t));
                } else {
                    LOG.debug("Encountered error while triggering checkpoint {} for " +
                        "{} ({}) while being not in state running.", checkpointID,
                        taskNameWithSubtask, executionId, t);
                }
            }
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
        }
    }

Step into StreamTask's triggerCheckpointAsync method:

    @Override
    public Future<Boolean> triggerCheckpointAsync(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) {

        // submit the trigger to the task's mailbox so that it runs on the task's main thread
        return mailboxProcessor.getMainMailboxExecutor().submit(
                () -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
                "checkpoint %s with %s",
            checkpointMetaData,
            checkpointOptions);
    }
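triggerCheckpointAsync does not run the checkpoint on the calling thread; it hands a callable to the task's mailbox executor and returns a Future. As a rough analogy (an assumption made for illustration, not how Flink's mailbox is implemented), a single-threaded ExecutorService behaves similarly: every submitted action runs one at a time on the same thread, in submission order. `MailboxSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MailboxSketch {
    // One thread plays the role of the task's main (mailbox) thread.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Only touched from the mailbox thread until drain() has finished.
    private final List<String> log = new ArrayList<>();

    public void processRecord(String record) {
        mailbox.execute(() -> log.add("record:" + record));
    }

    // Same shape as triggerCheckpointAsync: enqueue the work, return a Future.
    public Future<Boolean> triggerCheckpointAsync(long checkpointId) {
        return mailbox.submit(() -> {
            log.add("checkpoint:" + checkpointId);
            return true;
        });
    }

    // Stops accepting work, waits for queued actions, then exposes the log.
    public List<String> drain() {
        mailbox.shutdown();
        try {
            if (!mailbox.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("mailbox did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        MailboxSketch task = new MailboxSketch();
        task.processRecord("a");
        task.triggerCheckpointAsync(1L);
        task.processRecord("b");
        System.out.println(task.drain());
    }
}
```

Because the checkpoint action is queued between the two records, it observes exactly the state after record "a" and before record "b", without any locking.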

Next, follow the triggerCheckpoint method. Its main logic lives in performCheckpoint.

    private boolean triggerCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) throws Exception {
        try {
            // No alignment if we inject a checkpoint
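            // initialize the checkpoint metrics; the alignment duration is 0 because the barrier is injected here
            CheckpointMetrics checkpointMetrics = new CheckpointMetrics().setAlignmentDurationNanos(0L);
            // subtaskCheckpointCoordinator coordinates the checkpoint work of this subtask (e.g. a SourceStreamTask or another StreamTask)
            // todo getChannelStateWriter: writes channel state during a checkpoint/savepoint; start: begins writing channel state for the given checkpoint id
            subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointMetaData.getCheckpointId(), checkpointOptions);

            // perform the checkpoint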
            boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
            if (!success) {
                declineCheckpoint(checkpointMetaData.getCheckpointId());
            }
            return success;
        } catch (Exception e) {
            // propagate exceptions only if the task is still in "running" state
            if (isRunning) {
                Exception exception = new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                    " for operator " + getName() + '.', e);
                handleCheckpointException(exception);
                throw exception;
            } else {
                LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                    "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
                return false;
            }
        }
    }

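Step into performCheckpoint. If the task is running, it performs the checkpoint (which includes broadcasting the barrier) and returns true. Otherwise it broadcasts a CancelCheckpointMarker so that downstream tasks know the checkpoint is cancelled and can resume normal processing, and returns false. The main logic is in subtaskCheckpointCoordinator's checkpointState method.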

private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics,
            boolean advanceToEndOfTime) throws Exception {

        LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

        if (isRunning) {
            actionExecutor.runThrowing(() -> {

                if (checkpointOptions.getCheckpointType().isSynchronous()) { // synchronous savepoint
                    setSynchronousSavepointId(checkpointMetaData.getCheckpointId());

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // todo run the actual checkpoint
                subtaskCheckpointCoordinator.checkpointState(
                    checkpointMetaData,
                    checkpointOptions,
                    checkpointMetrics,
                    operatorChain,
                    this::isCanceled);
            });

            return true;
        } else {
            actionExecutor.runThrowing(() -> {
                // we cannot perform our checkpoint - let the downstream operators know that they
                // should not wait for any input from this operator

                // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                // yet be created
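                // todo CancelCheckpointMarker travels through the data stream like a {@link CheckpointBarrier}, but it signals
                //  that a certain checkpoint should be cancelled. Any ongoing alignment for that checkpoint must be cancelled and normal processing resumed.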
                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                recordWriter.broadcastEvent(message);
            });

            return false;
        }
    }
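The two branches of performCheckpoint can be modelled in a few lines. This is a simplified sketch, not Flink code: `PerformCheckpointSketch` is a hypothetical class, and the strings `BARRIER(id)` and `CANCEL(id)` merely stand in for CheckpointBarrier and CancelCheckpointMarker.

```java
import java.util.ArrayList;
import java.util.List;

public class PerformCheckpointSketch {
    // Events this task has emitted to its downstream tasks, in order.
    private final List<String> downstream = new ArrayList<>();
    private final boolean running;

    public PerformCheckpointSketch(boolean running) {
        this.running = running;
    }

    // Running task: emit the barrier and report success.
    // Non-running task: emit a cancel marker so downstream does not wait.
    public boolean performCheckpoint(long checkpointId) {
        if (running) {
            downstream.add("BARRIER(" + checkpointId + ")");
            return true;
        }
        downstream.add("CANCEL(" + checkpointId + ")");
        return false;
    }

    public List<String> downstreamEvents() {
        return downstream;
    }

    public static void main(String[] args) {
        PerformCheckpointSketch active = new PerformCheckpointSketch(true);
        PerformCheckpointSketch stopped = new PerformCheckpointSketch(false);
        System.out.println(active.performCheckpoint(1L) + " " + active.downstreamEvents());
        System.out.println(stopped.performCheckpoint(1L) + " " + stopped.downstreamEvents());
    }
}
```

Either way something is sent downstream for the checkpoint ID, which is what keeps a downstream task from waiting indefinitely on an input that will never deliver the barrier.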

Step into subtaskCheckpointCoordinator's checkpointState method; the code that runs is the implementation in SubtaskCheckpointCoordinatorImpl:

@Override
    public void checkpointState(
            CheckpointMetaData metadata,
            CheckpointOptions options,
            CheckpointMetrics metrics,
            OperatorChain<?, ?> operatorChain,
            Supplier<Boolean> isCanceled) throws Exception {

        checkNotNull(options);
        checkNotNull(metrics);

        // All of the following steps happen as an atomic step from the perspective of barriers and
        // records/watermarks/timers/callbacks.
        // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
        // checkpoint alignments

        // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
        //           The pre-barrier work should be nothing or minimal in the common case.
        // todo let the operator chain run its pre-barrier preparation logic
        operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

        // Step (2): Send the checkpoint barrier downstream
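        // broadcast the checkpoint barrier downstream
        // operatorChain holds all of the task's outputs;
        // iterate over every output and send the barrier to each of them
        operatorChain.broadcastEvent(
                // create a CheckpointBarrier object
            new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
            unalignedCheckpointEnabled);  // passed as isPriorityEvent: true when unaligned checkpoints are enabled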

        // Step (3): Prepare to spill the in-flight buffers for input and output

        if (unalignedCheckpointEnabled) {
            prepareInflightDataSnapshot(metadata.getCheckpointId());
        }

        // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the
        // streaming topology

        Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
        try {
            // snapshot the operators
            takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled);
            finishAndReportAsync(snapshotFutures, metadata, metrics);
        } catch (Exception ex) {
            cleanup(snapshotFutures, metadata, metrics, options, ex);
        }
    }
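The order of the steps in checkpointState matters: as the comments in the code say, the barrier is emitted as early as possible, before any state is snapshotted. The sketch below records only that ordering. It is an illustrative model under stated assumptions: `CheckpointStepsSketch` and the trace strings are hypothetical, and the iteration order of operators inside each step is not modelled.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointStepsSketch {
    // Returns the sequence of steps for one checkpoint as a readable trace.
    public static List<String> checkpointState(long id, List<String> operators, boolean unaligned) {
        List<String> trace = new ArrayList<>();
        // Step 1: pre-barrier work on every operator.
        for (String op : operators) {
            trace.add("prepareSnapshotPreBarrier:" + op);
        }
        // Step 2: barrier goes downstream before any state is written.
        trace.add("broadcastBarrier:" + id);
        // Step 3: only with unaligned checkpoints, capture in-flight buffers.
        if (unaligned) {
            trace.add("prepareInflightDataSnapshot:" + id);
        }
        // Step 4: synchronous part per operator, then hand over to the async part.
        for (String op : operators) {
            trace.add("snapshotSync:" + op);
        }
        trace.add("submitAsyncPart:" + id);
        return trace;
    }

    public static void main(String[] args) {
        List<String> ops = new ArrayList<>();
        ops.add("source");
        ops.add("map");
        for (String step : checkpointState(1L, ops, false)) {
            System.out.println(step);
        }
    }
}
```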
The method consists of four steps.
1. Run the operator chain's pre-barrier preparation logic
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

Step into prepareSnapshotPreBarrier. It walks forward through the operator chain and tells each operator to prepare for the checkpoint.

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // go forward through the operator chain and tell each operator
        // to prepare the checkpoint
        // (walk forward through the operator chain and notify each operator to prepare for the checkpoint)
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
            operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
        }
    }
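2. Broadcast the barrier downstream
    operatorChain.broadcastEvent(
                // create a CheckpointBarrier object
            new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
            unalignedCheckpointEnabled);  // passed as isPriorityEvent: true when unaligned checkpoints are enabled

Step into broadcastEvent, which sends the barrier to every downstream output: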

    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
            streamOutput.broadcastEvent(event, isPriorityEvent);
        }
    }

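3. Prepare to spill the in-flight input and output buffers
    if (unalignedCheckpointEnabled) {
            prepareInflightDataSnapshot(metadata.getCheckpointId());
        }

Step into prepareInflightDataSnapshot, which hands the in-flight buffers of the input channels and result subpartitions to the channelStateWriter: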

    private void prepareInflightDataSnapshot(long checkpointId) throws IOException {
        prepareInputSnapshot.apply(channelStateWriter, checkpointId)
            .thenAccept(unused -> channelStateWriter.finishInput(checkpointId));

        ResultPartitionWriter[] writers = env.getAllWriters();
        for (ResultPartitionWriter writer : writers) {
            for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) {
                ResultSubpartition subpartition = writer.getSubpartition(i);
                channelStateWriter.addOutputData(
                    checkpointId,
                    subpartition.getSubpartitionInfo(),
                    ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                    subpartition.requestInflightBufferSnapshot().toArray(new Buffer[0]));
            }
        }
        channelStateWriter.finishOutput(checkpointId);
    }   

4. Take the state snapshot; the snapshot is completed asynchronously

First step into takeSnapshotSync.

This method performs the synchronous part of the checkpoint.

    private void takeSnapshotSync(
            Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
            CheckpointMetaData checkpointMetaData,
            CheckpointMetrics checkpointMetrics,
            CheckpointOptions checkpointOptions,
            OperatorChain<?, ?> operatorChain,
            Supplier<Boolean> isCanceled) throws Exception {

        long checkpointId = checkpointMetaData.getCheckpointId();
        long started = System.nanoTime();

        ChannelStateWriteResult channelStateWriteResult = checkpointOptions.getCheckpointType() == CHECKPOINT ?
                                channelStateWriter.getWriteResult(checkpointId) :
                                ChannelStateWriteResult.EMPTY;
        // factory for checkpoint output streams, used to persist the checkpoint data
        CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());

        try {
            for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
                // key: operator ID   value: the operator's in-progress snapshot futures
                operatorSnapshotsInProgress.put(
                    operatorWrapper.getStreamOperator().getOperatorID(),
                    buildOperatorSnapshotFutures(
                        checkpointMetaData,
                        checkpointOptions,
                        operatorChain,
                        operatorWrapper.getStreamOperator(),
                        isCanceled,
                        channelStateWriteResult,
                        storage));
            }
        } finally {
            checkpointStorage.clearCacheFor(checkpointId);
        }

        LOG.debug(
            "{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms",
            taskName,
            checkpointId,
            checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
            checkpointMetrics.getSyncDurationMillis());

        checkpointMetrics.setSyncDurationMillis((System.nanoTime() - started) / 1_000_000);
    }
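The split between the synchronous and asynchronous parts can be illustrated with FutureTask: the synchronous phase captures what it needs and returns futures that have not run yet, and a later phase executes them. This is a simplified sketch under stated assumptions, not Flink code: `SyncPartSketch`, the `Map<String, Integer>` operator state, and the `name=value` result format are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SyncPartSketch {
    // Synchronous part: freeze each operator's current value and wrap the
    // (potentially slow) write in a future that is NOT executed yet.
    public static Map<String, RunnableFuture<String>> takeSnapshotSync(Map<String, Integer> operatorState) {
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : operatorState.entrySet()) {
            final String name = entry.getKey();
            final int frozen = entry.getValue(); // copy taken while processing is paused
            inProgress.put(name, new FutureTask<String>(() -> name + "=" + frozen));
        }
        return inProgress;
    }

    // Asynchronous part: run the future and fetch its result.
    public static String runAndGet(RunnableFuture<String> future) {
        future.run();
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new LinkedHashMap<>();
        state.put("counter", 1);
        Map<String, RunnableFuture<String>> futures = takeSnapshotSync(state);
        state.put("counter", 99); // processing continues after the sync part
        System.out.println(runAndGet(futures.get("counter")));
    }
}
```

The value written by the future is the one captured during the synchronous phase, even though the live state has moved on by the time the future runs.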

Now look at finishAndReportAsync. It submits an AsyncCheckpointRunnable to an executor, which then calls the runnable's run method:

private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics) {
        final Future<?> channelWrittenFuture;
        if (unalignedCheckpointEnabled) {
            ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId());
            channelWrittenFuture = CompletableFuture.allOf(
                writeResult.getInputChannelStateHandles(),
                writeResult.getResultSubpartitionStateHandles());
        } else {
            channelWrittenFuture = FutureUtils.completedVoidFuture();
        }
        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        // the executor will call AsyncCheckpointRunnable.run,
        // which contains the logic that completes the snapshot
        executorService.execute(

            new AsyncCheckpointRunnable( // runs the logic needed to complete the snapshot
            snapshotFutures,
            metadata,
            metrics,
            channelWrittenFuture,
            System.nanoTime(),
            taskName,
            closeableRegistry,
            env,
            asyncExceptionHandler));
    }


The main logic is in AsyncCheckpointRunnable's run method, which contains everything needed to complete the snapshot:

@Override
    public void run() {
        FileSystemSafetyNet.initializeSafetyNetForThread();
        try {
            // register this runnable as a Closeable so that it can be closed later
            closeableRegistry.registerCloseable(this);

            // operator subtask states that are reported to the JobManager
            TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = new TaskStateSnapshot(operatorSnapshotsInProgress.size());
            // operator subtask states that are kept locally on the task
            TaskStateSnapshot localTaskOperatorSubtaskStates = new TaskStateSnapshot(operatorSnapshotsInProgress.size());

            for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                // get the operator ID
                OperatorID operatorID = entry.getKey();
                // get the in-progress snapshot futures of this operator
                OperatorSnapshotFutures snapshotInProgress = entry.getValue();

                // finalize the async part of all by executing all snapshot runnables
                // Create the OperatorSnapshotFinalizer; constructing it
                // executes the asynchronous part of the snapshot.
                // todo OperatorSnapshotFinalizer finalizes {@link OperatorSnapshotFutures}.
                // todo Each instance is created from one {@link OperatorSnapshotFutures}, which it
                // todo executes. The instance can then deliver the results as {@link OperatorSubtaskState}.
                OperatorSnapshotFinalizer finalizedSnapshots =
                    new OperatorSnapshotFinalizer(snapshotInProgress);

                // the copy of the operator's snapshot state that is reported to the JobManager
                jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    operatorID,
                    finalizedSnapshots.getJobManagerOwnedState());
                // the copy of the operator's snapshot state that is used for fast task-local recovery
                localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    operatorID,
                    finalizedSnapshots.getTaskLocalState());
            }

            final long asyncEndNanos = System.nanoTime();
            final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
            // record the duration of the asynchronous part in the checkpoint metrics
            checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

            channelWrittenFuture.get();

            // if the state can be switched from RUNNING to COMPLETED
            if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {

                // report the state of the completed snapshot
                reportCompletedSnapshotStates(
                    jobManagerTaskOperatorSubtaskStates,
                    localTaskOperatorSubtaskStates,
                    asyncDurationMillis);

            } else {
                LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                    taskName,
                    checkpointMetaData.getCheckpointId());
            }
        } catch (Exception e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} - asynchronous part of checkpoint {} could not be completed.",
                    taskName,
                    checkpointMetaData.getCheckpointId(),
                    e);
            }
            handleExecutionException(e);
        } finally {
            closeableRegistry.unregisterCloseable(this);
            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
        }
    }
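The compareAndSet at the end of run decides a race between "the async part finished" and "the runnable was closed first": only one of the two transitions out of RUNNING can win. A minimal sketch of that idea follows. It is not Flink code: `AsyncCompletionSketch` is hypothetical, and the `DISCARDED` state is an assumption used here to model the "closed before" branch that the log message mentions.

```java
import java.util.concurrent.atomic.AtomicReference;

public class AsyncCompletionSketch {
    // DISCARDED is an assumed name for the state reached when closed early.
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile boolean reported;

    // Called when the asynchronous work is done; reports only if it wins the race.
    public boolean finish() {
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            reported = true; // stands in for reportCompletedSnapshotStates(...)
            return true;
        }
        return false;
    }

    // Called when the task cancels the checkpoint; returns true if it won the race.
    public boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    public boolean wasReported() {
        return reported;
    }

    public static void main(String[] args) {
        AsyncCompletionSketch completedFirst = new AsyncCompletionSketch();
        System.out.println(completedFirst.finish() + " " + completedFirst.close());

        AsyncCompletionSketch closedFirst = new AsyncCompletionSketch();
        System.out.println(closedFirst.close() + " " + closedFirst.finish());
    }
}
```

Whichever call comes second sees a state other than RUNNING and does nothing, so a checkpoint is never both reported as complete and discarded.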

Previous post

Flink-1.10 Source Code Notes: checkpoint - 1

Next post

Flink-1.10 Source Code Notes: checkpoint - 3

If you spot any mistakes, corrections are welcome!
