Example:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
Checkpointing is enabled with the StreamExecutionEnvironment.enableCheckpointing method, either as enableCheckpointing(long interval) or as enableCheckpointing(long interval, CheckpointingMode mode). Both overloads are defined on StreamExecutionEnvironment itself, not on the CheckpointConfig returned by getCheckpointConfig().
interval specifies how often a checkpoint is triggered, in milliseconds. CheckpointingMode defaults to CheckpointingMode.EXACTLY_ONCE and can also be set to CheckpointingMode.AT_LEAST_ONCE. The mode can also be set through StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode. CheckpointingMode.AT_LEAST_ONCE generally suits applications that need very low latency (on the order of a few milliseconds); CheckpointingMode.EXACTLY_ONCE is fine for most other applications.
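- checkpointTimeout sets the maximum time a checkpoint may take, in milliseconds. A checkpoint that has not completed by then is aborted.
- minPauseBetweenCheckpoints sets the minimum time that must pass after the previous checkpoint completes before the next one can be triggered. Setting it implies that maxConcurrentCheckpoints is 1.
- maxConcurrentCheckpoints sets the maximum number of checkpoints that may be in progress at the same time. If minPauseBetweenCheckpoints is set, values greater than 1 have no effect.
- enableExternalizedCheckpoints enables externally persisted checkpoints. They are not cleaned up automatically when the job fails, so the state has to be removed by hand. ExternalizedCheckpointCleanup specifies what happens to the externalized checkpoint when the job is canceled: with DELETE_ON_CANCELLATION the externalized state is deleted automatically on cancellation but kept if the job ends in the FAILED state; with RETAIN_ON_CANCELLATION the externalized checkpoint state is kept on cancellation.
- failOnCheckpointingErrors specifies whether the task should fail when an error occurs during checkpointing. The default is true; if set to false, the task declines the checkpoint and keeps running.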
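The interplay between the checkpoint interval and minPauseBetweenCheckpoints can be illustrated with a small stand-alone model. This is only a sketch and does not use Flink at all: the class MinPauseSketch, the method triggerTimes, and the assumption that every checkpoint takes a fixed durationMs are hypothetical and exist purely for illustration; Flink's real scheduling is more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone model; it does not call any Flink API.
final class MinPauseSketch {

    /**
     * Returns the start times (ms) of the first `count` checkpoints, assuming
     * each checkpoint takes exactly durationMs and at most one runs at a time.
     */
    static List<Long> triggerTimes(long intervalMs, long minPauseMs, long durationMs, int count) {
        List<Long> starts = new ArrayList<>();
        long nextDue = intervalMs;   // the first checkpoint is due after one interval
        long earliestAllowed = 0L;   // no earlier checkpoint, so no pause to honor yet
        for (int i = 0; i < count; i++) {
            // Start once the interval has elapsed AND the minimum pause has passed.
            long start = Math.max(nextDue, earliestAllowed);
            starts.add(start);
            long completed = start + durationMs;
            earliestAllowed = completed + minPauseMs;
            nextDue = start + intervalMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Fast checkpoints (100 ms): the 1000 ms interval dominates.
        System.out.println(triggerTimes(1000, 500, 100, 3)); // [1000, 2000, 3000]
        // Slow checkpoints (800 ms): the 500 ms pause pushes each start back.
        System.out.println(triggerTimes(1000, 500, 800, 3)); // [1000, 2300, 3600]
    }
}
```

In this model the minimum pause only matters once a checkpoint takes long enough that its completion time plus the pause exceeds the next interval tick, which is why the setting is described as guaranteeing progress between checkpoints.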
Related flink-conf.yaml settings:
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
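- state.backend: the backend used to store checkpoint state; the default is none.
- state.backend.async: whether the backend uses asynchronous snapshots (default true). State backends that do not support async, or that support only async, may ignore this option.
- state.backend.fs.memory-threshold: default 1024 (bytes). The minimum size of state data files; state smaller than this value is stored inline in the root checkpoint metadata file.
- state.backend.incremental: default false. Whether to use incremental checkpoints; backends that do not support incremental checkpoints ignore this option.
- state.backend.local-recovery: default false.
- state.checkpoints.dir: default none. The directory in which checkpoint data files and metadata are stored; it must be accessible from all participating TaskManagers and JobManagers.
- state.checkpoints.num-retained: default 1. The number of completed checkpoints to retain.
- state.savepoints.dir: default none. The default directory for savepoints.
- taskmanager.state.local.root-dirs: default none.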
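To make the meaning of state.checkpoints.num-retained concrete, here is a minimal sketch of a "keep the latest N completed checkpoints" policy. It is not Flink code: the class RetainedCheckpointsSketch and its methods are hypothetical names, and checkpoints are represented only by their numeric ids.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a retention policy; it does not call any Flink API.
final class RetainedCheckpointsSketch {
    private final int numRetained;
    private final Deque<Long> completed = new ArrayDeque<>();

    RetainedCheckpointsSketch(int numRetained) {
        if (numRetained < 1) {
            throw new IllegalArgumentException("numRetained must be at least 1");
        }
        this.numRetained = numRetained;
    }

    /** Records a completed checkpoint and returns the ids that drop out of retention. */
    List<Long> addCompleted(long checkpointId) {
        completed.addLast(checkpointId);
        List<Long> discarded = new ArrayList<>();
        while (completed.size() > numRetained) {
            discarded.add(completed.removeFirst()); // the oldest checkpoint goes first
        }
        return discarded;
    }

    /** Returns the currently retained checkpoint ids, oldest first. */
    List<Long> retained() {
        return new ArrayList<>(completed);
    }
}
```

With the default of 1, every newly completed checkpoint replaces the previous one; a larger value keeps older checkpoints around as additional restore points at the cost of more storage.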
Summary:
- Checkpointing is enabled with StreamExecutionEnvironment.enableCheckpointing, either enableCheckpointing(long interval) or enableCheckpointing(long interval, CheckpointingMode mode).
- Among the advanced options, enableExternalizedCheckpoints turns on externally persisted checkpoints. Externalized checkpoint state is not cleaned up automatically when the job fails, but for a canceled job you can choose whether the state is deleted or retained.
- flink-conf.yaml also has checkpoint-related settings, mostly for the state backend, such as state.backend.async, state.backend.incremental, state.checkpoints.dir, and state.savepoints.dir.
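The retention rules for externalized checkpoints described above can be written down as a small decision function. This is a sketch only: the enums below are local stand-ins that mirror the names used in the text, not Flink's own classes, and isRetained is a hypothetical helper.

```java
// Hypothetical decision table; the enums are local stand-ins, not Flink classes.
final class ExternalizedCleanupSketch {
    enum Cleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }
    enum JobOutcome { CANCELED, FAILED }

    /** Returns true if the externalized checkpoint is still present after the job ends. */
    static boolean isRetained(Cleanup cleanup, JobOutcome outcome) {
        if (outcome == JobOutcome.FAILED) {
            return true; // never cleaned up automatically on failure
        }
        // On cancellation the configured cleanup mode decides.
        return cleanup == Cleanup.RETAIN_ON_CANCELLATION;
    }
}
```

The practical consequence is that with RETAIN_ON_CANCELLATION the checkpoint directory only ever grows until someone removes old checkpoints by hand.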
Java configuration example:
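import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    /** Whether the restart strategy and checkpointing are enabled. */
    private static boolean replayFlag = true;
    /** Number of restart attempts (example value; load it from your own configuration). */
    private static Integer replayTimes = 3;
    /** Delay between restart attempts, in seconds (example value). */
    private static Integer replaySeconds = 10;
    /** Checkpoint interval, in milliseconds (example value). */
    private static Long checkPointTime = 1000L;

    public static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        if (replayFlag) {
            // Restart up to replayTimes times, waiting replaySeconds between attempts.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                    replayTimes,
                    Time.of(replaySeconds, TimeUnit.SECONDS)));
            CheckpointConfig config = env.getCheckpointConfig();
            //env.setStateBackend(new FsStateBackend(checkPointDir));
            // Keep checkpoint data when the job is canceled or fails, so the job can be
            // restored from a chosen checkpoint later.
            config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Checkpoint interval: start a checkpoint every checkPointTime ms.
            config.setCheckpointInterval(checkPointTime);
            // Use exactly-once mode.
            config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // Leave at least 500 ms between checkpoints (minimum pause).
            config.setMinPauseBetweenCheckpoints(500);
            // A checkpoint must complete within one minute or it is discarded (timeout).
            config.setCheckpointTimeout(60000);
            // Allow only one checkpoint in progress at a time.
            config.setMaxConcurrentCheckpoints(1);
        }
        return env;
    }
}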