Flink源码阅读(四)--- checkpoint制作

本文内容是基于Flink 1.9来讲解。Flink使用checkpoint检查点来保证Exactly-Once语义的,这篇文章会从源码角度分析下checkpoint怎么触发的。首先说下checkpoint原理,可以直接参考数据流容错官方文档。

概述

Flink分布式快照的一个核心概念就是barrier,barrier会和数据记录一起下发到stream中,是一种特殊的消息,非常轻量。一个barrier就能把数据记录分到不同的snapshot中,不同snapshot对应的barrier可能同时在stream中,这也就意味着可以同时制作多个snapshot。
下面这两个图给出了barrier分割snapshot的原理,以及checkpoint制作过程


barrier.png
checkpoint.png

checkpoint制作

1. 源码入口 ExecutionGraphBuilder#buildGraph

在生成ExecutionGraph的时候,如果设置了开启checkpoint,那在buildGraph方法中会调用executionGraph.enableCheckpointing方法,咱们接着看下这个方法做了什么。

  • 构建CheckpointCoordinator对象。CheckpointCoordinator主要负责触发checkpoint制作并接收task的ack信息,收集并维护task发送的ack state全局视图。
  • 设置checkpoint状态追踪器
  • 创建CheckpointCoordinatorDeActivator对象,该对象会监控JobStatus是activates/deactivates来启动/停止checkpoint scheduler

2. CheckpointCoordinatorDeActivator是怎么触发checkpoint制作的

2.1 看下CheckpointCoordinatorDeActivator#jobStatusChanges方法
    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }

如果监控到JobStatus变成RUNNING,就会调用CheckpointCoordinator的startCheckpointScheduler方法,最终会调用CheckpointCoordinator#triggerCheckpoint方法,triggerCheckpoint主要做了以下几件事情

  1. 制作checkpoint之前,会做一些检查
       - 排队checkpoint数是否超过设定的允许最大并行checkpoint数
       - 两次checkpoint制作间隔是否满足要求
  2. check需要触发checkpoint所有source task是不是都是running状态
  3. check需要对checkpoint进行ack的所有task(包括source+非source)是不是都是running状态
  4. 对checkpointID加1
  5. 初始化CheckpointStorageLocation
  6. 往pendingCheckpoints这个map中新增一个PendingCheckpoint, key是checkpointID。 PendingCheckpoint表示checkpoint制作已经开始,不过部分task还没有进行ack,一旦所有的task都对这次checkpoint进行了ack,PendingCheckpoint就变成了CompletedCheckpoint。
  7. 注册cancellerHandle,比如checkpoint超时就cancel掉
  8. 对于所有的source task,调用execution.triggerCheckpoint方法

从全局看了checkpoint怎么触发的,然后再看下具体Task粒度怎么做checkpoint的,入口就是上面提到的execution.triggerCheckpoint,真正调用是Task#triggerCheckpointBarrier方法

2.2 SourceTask怎么触发checkpoint的

Task#triggerCheckpointBarrier方法会再去调用StreamTask#performCheckpoint方法,接着看StreamTask#performCheckpoint方法源码

            if (isRunning) {

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    syncSavepointLatch.setCheckpointId(checkpointId);

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointId,
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

                return true;
            }

从源码可以看出,checkpoint的制作主流程是异步来执行的,主要分为

  1. checkpoint制作前的准备,允许operators可以做一些前置工作,这个前置工作最好是没有,也可以有很小一部分
  2. 往下游发barrier(向该Source Task的所有输出,以广播的方式发送barrier,CheckpointBarrier有三个对象组成checkpointId+timestamp+CheckpointOptions(CheckpointType和CheckpointStorageLocationReference))
  3. 开始做checkpoint,这个步骤应该尽可能的异步,以免影响job执行
2.3 SourceTask怎么制作checkpoint的

checkpoint制作跟进去看下StreamTask#executeCheckpointing代码

            startSyncPartNano = System.nanoTime();

            try {
                // 1. 同步执行,会依次调用每一个operator的StreamOperator.snapshotState,返回结果是一个runnable future。
                // 根据checkpoint配置成同步模式和异步模式的区别,这个future可能处于完成状态,也可能处于未完成状态
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // 2. 异步执行,如果是checkpoint配置成同步执行,这里实际上所有的runnable future都是已经完成的状态
                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }
            }

其实checkpoint制作分为两部分

  1. 同步checkpoint
    依次调用每一个operator的StreamOperator.snapshotState,返回一个OperatorSnapshotFutures,并将其放入operatorSnapshotsInProgress。各operator的snapshotState方法的实现是一样的,可以看下AbstractStreamOperator#snapshotState,这个方法首先创建了一个OperatorSnapshotFutures,并且最终会把OperatorSnapshotFutures return出去,看下OperatorSnapshotFutures的成员变量,来大体了解下snapshotState主要是存什么内容
    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;

从成员变量就可以看出,snapshot中主要存储四部分内容,分别为keyed manage state, keyed raw state, operator manage state 和 operator raw state。

然后具体看下DefaultOperatorStateBackendSnapshotStrategy#snapshot中都做了什么?

  • 把注册的所有OperatorState(就是ListState) 和 BroadcastState 做深度拷贝
  • 将实际的写入操作封装在一个异步的 FutureTask中,如果不启用异步checkpoint模式,那么这个 FutureTask 在同步阶段就会立刻执行。这个 FutureTask 的主要工作包括:
       1. 获取注册的operator state 以及 broadcast operator state 信息
       2. 把上面的state写入checkpoint中
       3. 把状态元信息写入writtenStatesMetaData这个map中
       4. 关闭输出流,返回状态句柄OperatorStreamStateHandle
  1. 异步checkpoint
    同步checkpoint完成之后,会将具体的信息写入异步实现,该流程可以看下AsyncCheckpointRunnable#run方法
  • 调用new OperatorSnapshotFinalizer(snapshotInProgress);方法等待各future执行完成
  • 调用reportCompletedSnapshotStates方法完成checkpoint状态上报JobManager,也就是对checkpoint进行ack
2.4 非SourceTask是怎么触发checkpoint的

对于SourceTask,checkpint的执行是由JM触发,对于非SourceTask,则是依靠上游Task的checkpointBarrier消息触发。上面也提到了,在SourceTask进行checkpoint时,会向下游发送CheckpointBarrier消息,而下游的task正是拿到该消息后,进行checkpoint操作。

我们知道所有的Task都是继承StreamTask,在Task执行的时候,会去调用StreamTask#run方法,下面看下run方法逻辑

    /**
     * Runs the stream-tasks main processing loop.
     */
    private void run() throws Exception {
        final ActionContext actionContext = new ActionContext();
        while (true) {
            if (mailbox.hasMail()) {
                Optional<Runnable> maybeLetter;
                while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) {
                    Runnable letter = maybeLetter.get();
                    if (letter == POISON_LETTER) {
                        return;
                    }
                    letter.run();
                }
            }

            processInput(actionContext);
        }
    }

processInput是用来处理数据的,StreamInputProcessor是处理数据记录的接口,它有三个实现类,分别为StreamOneInputProcessor,StreamTwoInputProcessor和StreamTwoInputSelectableProcessor。我们以StreamOneInputProcessor#processInput进行分析怎么处理barrier消息的

    @Override
    public boolean processInput() throws Exception {
        initializeNumRecordsIn();

        StreamElement recordOrMark = input.pollNextNullable();
        if (recordOrMark == null) {
            input.isAvailable().get();
            return !checkFinished();
        }
        int channel = input.getLastChannel();
        checkState(channel != StreamTaskInput.UNSPECIFIED);

        processElement(recordOrMark, channel);
        return true;
    }

关于barrier的调用链是:input.pollNextNullable() --> StreamTaskNetworkInput#pollNextNullable --> checkpointedInputGate.pollNext() --> barrierHandler.processBarrier,CheckpointBarrierHandler有两个实现类

  • CheckpointBarrierAligner,对应EXACTLY_ONCE语义
  • CheckpointBarrierTracker,对应AT_LEAST_ONCE语义

接下来看下CheckpointBarrierAligner#processBarrier方法,涉及到以下处理流程

  1. 如果下游Task的InputChannels为1(消费的partition数为1),直接不需要对齐,return
  2. 第一个barrier到达Task,调用beginNewAlignment方法开始checkpoint,然后调用onBarrier方法,将numBarriersReceived加1,把对应的inputchannel的状态blockedChannels[channelIndex]置为true,开始barrier对齐操作
  3. 不断调用onBarrier方法,直到所有的barrier都到齐。
  4. 等所有的barrier到齐之后,会进行 (1) 调用releaseBlocksAndResetBarriers,该方法把blockedChannels的状态都设置成false,并且把numBarriersReceived设置成0,方便下一次checkpoint制作,barrier对齐结束。 (2) 调用notifyCheckpoint方法,开始该Task 的checkpoint制作,checkpoint制作还是调用StreamTask#performCheckpoint方法,涉及到 i): snapshot前准备工作 ii:) 往下游广播barrier iii:) 做快照

至此,checkpoint的触发以及制作原理基本介绍完了

小结

本篇文章简单介绍了下checkpoint是怎么被调用,以及SourceTask怎么从开始被触发,以及非SourceTask怎么触发和制作checkpoint的。

  • barrier是以Task也就是operatorChain为单位下发的
  • snapshot中的state是以operator为单位制作的
  • 非SourceTask只有把它消费的所有partition的barrier都接收到之后,才会进行checkpoint的制作

推荐文章列表

checkpoint总结不错的文章 Flink CheckPoint详细过程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容