Flink Checkpoint配置

范例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
  • 使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint
    (具体可以使用StreamExecutionEnvironment.getCheckpointConfig.enableCheckpointing(long interval)),
    或者StreamExecutionEnvironment.getCheckpointConfig.enableCheckpointing(long interval, CheckpointingMode mode)
    interval用于指定checkpoint的触发间隔(单位milliseconds)
    CheckpointingMode默认是CheckpointingMode.EXACTLY_ONCE
    也可以指定为CheckpointingMode.AT_LEAST_ONCE

  • 也可以通过StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode
    一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以
    checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉。

  • minPauseBetweenCheckpoints用于指定checkpoint距上一个checkpoint完成之后最少等多久可以出发另一个checkpoint
    当指定这个参数时,maxConcurrentCheckpoints的值为1

  • maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个;
    如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)

  • enableExternalizedCheckpoints用于开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理stateExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state

  • failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该failtask,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行

flink-conf.yaml相关配置:

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false
  • state.backend用于指定checkpoint state存储的backend,默认为none
  • state.backend.async用于指定backend是否使用异步snapshot(默认为true),有些不支持async或者只支持asyncstate backend可能会忽略这个参数
  • state.backend.fs.memory-threshold,默认为1024,用于指定存储于filesstate大小阈[yù]值,如果小于该值则会存储在root checkpoint metadata file
  • state.backend.incremental,默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpointbackend会忽略该配置
  • state.backend.local-recovery,默认为false
  • state.checkpoints.dir,默认为none,用于指定checkpointdata filesmeta data存储的目录,该目录必须对所有参与的TaskManagersJobManagers可见
  • state.checkpoints.num-retained,默认为1,用于指定保留的已完成的checkpoints个数
  • state.savepoints.dir,默认为none,用于指定savepoints的默认目录
  • taskmanager.state.local.root-dirs,默认为none

小结:

  • 可以通过使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode)

  • checkpoint的高级配置可以配置enableExternalizedCheckpoints(用于开启checkpoints的外部持久化,在job failed的时候externalized checkpoint state无法自动清理,但是在job canceled的时候可以配置是删除还是保留state)

  • flink-conf.yaml里头也有checkpoint的相关配置,主要是state backend的配置,比如state.backend.asyncstate.backend.incrementalstate.checkpoints.dirstate.savepoints.dir

Java 配置实例:

/**
 * 是否重启标识flag
 */
private static boolean replayFlag = true;

/**
 * 重启次数
 */
private static Integer replayTimes;

/**
 * 重启时间间隔
 */
private static Integer replaySeconds;

private static Long checkPointTime;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

if (replayFlag) {
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            replayTimes,
            Time.of(replaySeconds, TimeUnit.SECONDS)
    ));
    CheckpointConfig config = env.getCheckpointConfig();

    //env.setStateBackend(new FsStateBackend(checkPointDir));
    // 任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    // 设置checkpoint的周期, 每隔1000 ms进行启动一个检查点
    config.setCheckpointInterval(checkPointTime);
    
    // 设置模式为exactly-once
    config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
    config.setMinPauseBetweenCheckpoints(500);
    
    // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
    config.setCheckpointTimeout(checkPointTime);
    
    // 同一时间只允许进行一个检查点
    config.setMaxConcurrentCheckpoints(1);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容