本文内容是基于Flink 1.9来讲解。Flink使用checkpoint检查点来保证Exactly-Once语义的,这篇文章会从源码角度分析下checkpoint怎么触发的。首先说下checkpoint原理,可以直接参考数据流容错官方文档。
概述
Flink分布式快照的一个核心概念就是barrier,barrier会和数据记录一起下发到stream中,是一种特殊的消息,非常轻量。一个barrier就能把数据记录分到不同的snapshot中,不同snapshot对应的barrier可能同时在stream中,这也就意味着可以同时制作多个snapshot。
下面这两个图给出了barrier分割snapshot的原理,以及checkpoint制作过程
checkpoint制作
1. 源码入口 ExecutionGraphBuilder#buildGraph
在生成ExecutionGraph的时候,如果设置了开启checkpoint,那在buildGraph方法中会调用executionGraph.enableCheckpointing方法,咱们接着看下这个方法做了什么。
- 构建CheckpointCoordinator对象。CheckpointCoordinator主要负责触发checkpoint制作并接收task的ack信息,收集并维护task发送的ack state全局视图。
- 设置checkpoint状态追踪器
- 创建CheckpointCoordinatorDeActivator对象,该对象会监控JobStatus是activates/deactivates来启动/停止checkpoint scheduler
2. CheckpointCoordinatorDeActivator是怎么触发checkpoint制作的
2.1 看下CheckpointCoordinatorDeActivator#jobStatusChanges方法
@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
如果监控到JobStatus变成RUNNING,就会调用CheckpointCoordinator的startCheckpointScheduler方法,最终会调用CheckpointCoordinator#triggerCheckpoint方法,triggerCheckpoint主要做了以下几件事情
- 制作checkpoint之前,会做一些检查
- 排队checkpoint数是否超过设定的允许最大并行checkpoint数
- 两次checkpoint制作间隔是否满足要求 - check需要触发checkpoint所有source task是不是都是running状态
- check需要对checkpoint进行ack的所有task(包括source+非source)是不是都是running状态
- 对checkpointID加1
- 初始化CheckpointStorageLocation
- 往pendingCheckpoints这个map中新增一个PendingCheckpoint, key是checkpointID。 PendingCheckpoint表示checkpoint制作已经开始,不过部分task还没有进行ack,一旦所有的task都对这次checkpoint进行了ack,PendingCheckpoint就变成了CompletedCheckpoint。
- 注册cancellerHandle,比如checkpoint超时就cancel掉
- 对于所有的source task,调用execution.triggerCheckpoint方法
从全局看了checkpoint怎么触发的,然后再看下具体Task粒度怎么做checkpoint的,入口就是上面提到的execution.triggerCheckpoint,真正调用是Task#triggerCheckpointBarrier方法
2.2 SourceTask怎么触发checkpoint的
Task#triggerCheckpointBarrier方法会再去调用StreamTask#performCheckpoint方法,接着看StreamTask#performCheckpoint方法源码
if (isRunning) {
if (checkpointOptions.getCheckpointType().isSynchronous()) {
syncSavepointLatch.setCheckpointId(checkpointId);
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
}
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(checkpointId);
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(
checkpointId,
checkpointMetaData.getTimestamp(),
checkpointOptions);
// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
}
从源码可以看出,checkpoint的制作主流程是异步来执行的,主要分为
- checkpoint制作前的准备,允许operators可以做一些前置工作,这个前置工作最好是没有,也可以有很小一部分
- 往下游发barrier(向该Source Task的所有输出,以广播的方式发送barrier,CheckpointBarrier有三个对象组成checkpointId+timestamp+CheckpointOptions(CheckpointType和CheckpointStorageLocationReference))
- 开始做checkpoint,这个步骤应该尽可能的异步,以免影响job执行
2.3 SourceTask怎么制作checkpoint的
checkpoint制作跟进去看下StreamTask#executeCheckpointing代码
startSyncPartNano = System.nanoTime();
try {
// 1. 同步执行,会依次调用每一个operator的StreamOperator.snapshotState,返回结果是一个runnable future。
// 根据checkpoint配置成同步模式和异步模式的区别,这个future可能处于完成状态,也可能处于未完成状态
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
checkpointMetaData.getCheckpointId(), owner.getName());
}
startAsyncPartNano = System.nanoTime();
checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
// 2. 异步执行,如果是checkpoint配置成同步执行,这里实际上所有的runnable future都是已经完成的状态
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
}
其实checkpoint制作分为两部分
- 同步checkpoint
依次调用每一个operator的StreamOperator.snapshotState,返回一个OperatorSnapshotFutures,并将其放入operatorSnapshotsInProgress。各operator的snapshotState方法的实现是一样的,可以看下AbstractStreamOperator#snapshotState,这个方法首先创建了一个OperatorSnapshotFutures,并且最终会把OperatorSnapshotFutures return出去,看下OperatorSnapshotFutures的成员变量,来大体了解下snapshotState主要是存什么内容
@Nonnull
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
@Nonnull
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
@Nonnull
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
@Nonnull
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
从成员变量就可以看出,snapshot中主要存储四部分内容,分别为keyed manage state, keyed raw state, operator manage state 和 operator raw state。
然后具体看下DefaultOperatorStateBackendSnapshotStrategy#snapshot中都做了什么?
- 把注册的所有OperatorState(就是ListState) 和 BroadcastState 做深度拷贝
- 将实际的写入操作封装在一个异步的 FutureTask中,如果不启用异步checkpoint模式,那么这个 FutureTask 在同步阶段就会立刻执行。这个 FutureTask 的主要工作包括:
1. 获取注册的operator state 以及 broadcast operator state 信息
2. 把上面的state写入checkpoint中
3. 把状态元信息写入writtenStatesMetaData这个map中
4. 关闭输出流,返回状态句柄OperatorStreamStateHandle
- 异步checkpoint
同步checkpoint完成之后,会将具体的信息写入异步实现,该流程可以看下AsyncCheckpointRunnable#run方法
- 调用new OperatorSnapshotFinalizer(snapshotInProgress);方法等待各future执行完成
- 调用reportCompletedSnapshotStates方法完成checkpoint状态上报JobManager,也就是对checkpoint进行ack
2.4 非SourceTask是怎么触发checkpoint的
对于SourceTask,checkpint的执行是由JM触发,对于非SourceTask,则是依靠上游Task的checkpointBarrier消息触发。上面也提到了,在SourceTask进行checkpoint时,会向下游发送CheckpointBarrier消息,而下游的task正是拿到该消息后,进行checkpoint操作。
我们知道所有的Task都是继承StreamTask,在Task执行的时候,会去调用StreamTask#run方法,下面看下run方法逻辑
/**
* Runs the stream-tasks main processing loop.
*/
private void run() throws Exception {
final ActionContext actionContext = new ActionContext();
while (true) {
if (mailbox.hasMail()) {
Optional<Runnable> maybeLetter;
while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) {
Runnable letter = maybeLetter.get();
if (letter == POISON_LETTER) {
return;
}
letter.run();
}
}
processInput(actionContext);
}
}
processInput是用来处理数据的,StreamInputProcessor是处理数据记录的接口,它有三个实现类,分别为StreamOneInputProcessor,StreamTwoInputProcessor和StreamTwoInputSelectableProcessor。我们以StreamOneInputProcessor#processInput进行分析怎么处理barrier消息的
@Override
public boolean processInput() throws Exception {
initializeNumRecordsIn();
StreamElement recordOrMark = input.pollNextNullable();
if (recordOrMark == null) {
input.isAvailable().get();
return !checkFinished();
}
int channel = input.getLastChannel();
checkState(channel != StreamTaskInput.UNSPECIFIED);
processElement(recordOrMark, channel);
return true;
}
关于barrier的调用链是:input.pollNextNullable() --> StreamTaskNetworkInput#pollNextNullable --> checkpointedInputGate.pollNext() --> barrierHandler.processBarrier,CheckpointBarrierHandler有两个实现类
- CheckpointBarrierAligner,对应EXACTLY_ONCE语义
- CheckpointBarrierTracker,对应AT_LEAST_ONCE语义
接下来看下CheckpointBarrierAligner#processBarrier方法,涉及到以下处理流程
- 如果下游Task的InputChannels为1(消费的partition数为1),直接不需要对齐,return
- 第一个barrier到达Task,调用beginNewAlignment方法开始checkpoint,然后调用onBarrier方法,将numBarriersReceived加1,把对应的inputchannel的状态blockedChannels[channelIndex]置为true,开始barrier对齐操作
- 不断调用onBarrier方法,直到所有的barrier都到齐。
- 等所有的barrier到齐之后,会进行 (1) 调用releaseBlocksAndResetBarriers,该方法把blockedChannels的状态都设置成false,并且把numBarriersReceived设置成0,方便下一次checkpoint制作,barrier对齐结束。 (2) 调用notifyCheckpoint方法,开始该Task 的checkpoint制作,checkpoint制作还是调用StreamTask#performCheckpoint方法,涉及到 i): snapshot前准备工作 ii:) 往下游广播barrier iii:) 做快照
至此,checkpoint的触发以及制作原理基本介绍完了
小结
本篇文章简单介绍了下checkpoint是怎么被调用,以及SourceTask怎么从开始被触发,以及非SourceTask怎么触发和制作checkpoint的。
- barrier是以Task也就是operatorChain为单位下发的
- snapshot中的state是以operator为单位制作的
- 非SourceTask只有把它消费的所有partition的barrier都接收到之后,才会进行checkpoint的制作
推荐文章列表
checkpoint总结不错的文章 Flink CheckPoint详细过程