Flink DataStream 状态和容错 三:Savepoint 和 Restart

Savepoint

Savepoint 和 Checkpoint 的区别

Savepoint 是命令触发的 Checkpoint,对流式程序做一次完整的快照并将结果写到 State backend,可用于停止、恢复或更新 Flink 程序。整个过程依赖于 Checkpoint 机制。另一个不同之处是,Savepoint 不会自动清除。

分配 Operator IDs

Savepoint 中会以 Operator ID 作为 key 保存每个有状态算子的状态:

Operator ID State
source-id State of StatefulSource
mapper-id State of StatefulMapper

Operator ID 用于确定每个算子的状态,只要ID不变,就可以从 Savepoint 中恢复,Operator ID 如果不显示指定会自动生成,生成的ID取决于程序的结构,并且对程序更改很敏感。因此,建议手动分配这些ID:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

Savepoint 操作

触发 Savepoint 时,会创建一个新的 Savepoint 目录,其中将存储数据和元数据。可以通过配置默认 targetDirectory 或指定自定义 targetDirectory:

state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目录,Savepoint 将失败。

触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

生成 Savepoint(以 jobId 作为唯一ID),并返回创建的 Savepoint 的路径,恢复时需要使用。

在 Yarn 集群触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

要指定 jobId 和 yarnAppId(YARN应用程序ID),并返回创建的 Savepoint 的路径。

取消作业时生成 Savepoint

$ bin/flink cancel -s [:targetDirectory] :jobId

以原子方式触发具有 jobId 的 Savepoint,并取消作业。

恢复 Savepoint

$ bin/flink run -s :savepointPath [:runArgs]

提交作业,并指定要恢复的 Savepoint路径。

允许启动有未恢复 State

$ bin/flink run -s :savepointPath -n [:runArgs]

默认情况下,恢复操作将尝试将 Savepoint 的所有 State 恢复。如果删除了运算符,则可以通过 –allowNonRestoredState(简写为 -n) 选项跳过无法映射到新程序的状态。

删除 Savepoint

$ bin/flink savepoint -d :savepointPath

通过指定路径删除 Savepoint,也可以通过文件系统手动删除 Savepoint 数据,而不会影响其他 Savepoint 或 Checkpoint。

常见问题

应该为所有算子分配ID吗?
根据经验,是的。严格地说,只需要通过该uid()方法将ID分配给作业中的有状态 算子。Savepoint 仅包含这些算子的 State,无状态算子不是保存点的一部分。

如果在作业中新添加一个有状态算子,会发生什么?
新算子将在没有任何状态的情况下进行初始化,类似于无状态算子。

如果在作业删除一个有状态的算子,会发生什么?
如果没有指定允许启动有未恢复 State(–allowNonRestoredState / -n),启动会失败。

如果在作业中重新排列有状态算子,会发生什么?
如果手动这些算子分配了ID,作业将照常恢复。否则,重新排序后,有状态算子的自动生成ID很可能会更改,将导致无法从 Savepoint 恢复。

如果在作业中添加,删除或重新排序没有状态的算子,会发生什么?
如果为有状态算子手动分配了ID,作业将照常恢复,则无状态算子的改变不会影响。否则,重新排序后,有状态算子的自动生成ID很可能会更改,将导致无法从 Savepoint 恢复。

如果作业的并行性发生改变,会发生什么?
如果 Savepoint 的生成是使用 Flink 1.2.0 以及之后的版本,并且没有使用弃用状态API,可以正常恢复作业。

如果 Savepoint 的生成比 Flink 1.2.0 更早的版本,或者使用弃用状态API,则首先必须将作业和 Savepoint 升级到1.2.0以及之后的版本,然后才能更改并行度。请参考官方 升级指南

Restart

Flink 支持多种不同的重启策略,控制着作业失败后如何重启。集群可以设置默认的重启策略,作业提交的时候也可以指定重启策略,覆盖默认的重启策略。

默认的重启策略配置在 conf/flink-conf.yaml,参数 restart-strategy 定义了采用什么策略。如果 checkpoint 未启用,就会采用 "no restart" 策略,如果启用了 checkpoint 机制,但是未指定重启策略的话,就会采用 "fixed-delay" 策略。每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。

以下是支持的三种重启策略的可配置项

重启策略 重启策略值
Fixed delay fixed-delay
Failure rate failure-rate
No restart None

除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在ExecutionEnvironment中调用setRestartStrategy()方法来程序化地调用,这种方式同样适用于StreamExecutionEnvironment。

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))

固定延迟重启策略(Fixed Delay Restart Strategy)

尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。

参数配置 描述 默认值
restart-strategy.fixed-delay.attempts Flink尝试执行的次数 1,如果启用checkpoint的话是Integer.MAX_VALUE
restart-strategy.fixed-delay.delay 两次重启之间等待的时间 akka.ask.timeout,如果启用checkpoint的话是1s

flink-conf.yaml 参数配置:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))

失败率重启策略(Failure Rate Restart Strategy)

Job失败后会重启次数如果超过失败率,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

配置参数 描述 默认值
restart-strategy.failure-rate.max-failures-per-interval Flink尝试执行的次数 1
restart-strategy.failure-rate.failure-rate-interval 计算失败率的时间间隔 1 min
restart-strategy.failure-rate.delay 两次重启之间等待的时间 akka.ask.timeout

flink-conf.yaml 参数配置:

restart-strategy:failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3 
restart-strategy.failure-rate.failure-rate-interval: 5 min 
restart-strategy.failure-rate.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment() 
env.setRestartStrategy(RestartStrategies.failureRateRestart( 
  3, // 每个测量时间间隔最大失败次数 
  Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔 
  Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔 
))

无重启策略(No Restart Strategy)

Job直接失败,不会尝试进行重启

flink-conf.yaml 参数配置:

restart-strategy: none
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/restart_strategies.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容