Savepoint
Savepoint 和 Checkpoint 的区别
Savepoint 是命令触发的 Checkpoint,对流式程序做一次完整的快照并将结果写到 State backend,可用于停止、恢复或更新 Flink 程序。整个过程依赖于 Checkpoint 机制。另一个不同之处是,Savepoint 不会自动清除。
分配 Operator IDs
Savepoint 中会以 Operator ID 作为 key 保存每个有状态算子的状态:
Operator ID | State |
---|---|
source-id | State of StatefulSource |
mapper-id | State of StatefulMapper |
Operator ID 用于确定每个算子的状态,只要ID不变,就可以从 Savepoint 中恢复,Operator ID 如果不显示指定会自动生成,生成的ID取决于程序的结构,并且对程序更改很敏感。因此,建议手动分配这些ID:
DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
Savepoint 操作
触发 Savepoint 时,会创建一个新的 Savepoint 目录,其中将存储数据和元数据。可以通过配置默认 targetDirectory 或指定自定义 targetDirectory:
state.savepoints.dir: hdfs:///flink/savepoints
如果既未配置缺省值也未指定自定义目录,Savepoint 将失败。
触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory]
生成 Savepoint(以 jobId 作为唯一ID),并返回创建的 Savepoint 的路径,恢复时需要使用。
在 Yarn 集群触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
要指定 jobId 和 yarnAppId(YARN应用程序ID),并返回创建的 Savepoint 的路径。
取消作业时生成 Savepoint
$ bin/flink cancel -s [:targetDirectory] :jobId
以原子方式触发具有 jobId 的 Savepoint,并取消作业。
恢复 Savepoint
$ bin/flink run -s :savepointPath [:runArgs]
提交作业,并指定要恢复的 Savepoint路径。
允许启动有未恢复 State
$ bin/flink run -s :savepointPath -n [:runArgs]
默认情况下,恢复操作将尝试将 Savepoint 的所有 State 恢复。如果删除了运算符,则可以通过 –allowNonRestoredState
(简写为 -n
) 选项跳过无法映射到新程序的状态。
删除 Savepoint
$ bin/flink savepoint -d :savepointPath
通过指定路径删除 Savepoint,也可以通过文件系统手动删除 Savepoint 数据,而不会影响其他 Savepoint 或 Checkpoint。
常见问题
应该为所有算子分配ID吗?
根据经验,是的。严格地说,只需要通过该uid()
方法将ID分配给作业中的有状态 算子。Savepoint 仅包含这些算子的 State,无状态算子不是保存点的一部分。
如果在作业中新添加一个有状态算子,会发生什么?
新算子将在没有任何状态的情况下进行初始化,类似于无状态算子。
如果在作业删除一个有状态的算子,会发生什么?
如果没有指定允许启动有未恢复 State(–allowNonRestoredState
/ -n
),启动会失败。
如果在作业中重新排列有状态算子,会发生什么?
如果手动这些算子分配了ID,作业将照常恢复。否则,重新排序后,有状态算子的自动生成ID很可能会更改,将导致无法从 Savepoint 恢复。
如果在作业中添加,删除或重新排序没有状态的算子,会发生什么?
如果为有状态算子手动分配了ID,作业将照常恢复,则无状态算子的改变不会影响。否则,重新排序后,有状态算子的自动生成ID很可能会更改,将导致无法从 Savepoint 恢复。
如果作业的并行性发生改变,会发生什么?
如果 Savepoint 的生成是使用 Flink 1.2.0 以及之后的版本,并且没有使用弃用状态API,可以正常恢复作业。
如果 Savepoint 的生成比 Flink 1.2.0 更早的版本,或者使用弃用状态API,则首先必须将作业和 Savepoint 升级到1.2.0以及之后的版本,然后才能更改并行度。请参考官方 升级指南。
Restart
Flink 支持多种不同的重启策略,控制着作业失败后如何重启。集群可以设置默认的重启策略,作业提交的时候也可以指定重启策略,覆盖默认的重启策略。
默认的重启策略配置在 conf/flink-conf.yaml
,参数 restart-strategy
定义了采用什么策略。如果 checkpoint 未启用,就会采用 "no restart" 策略,如果启用了 checkpoint 机制,但是未指定重启策略的话,就会采用 "fixed-delay" 策略。每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。
以下是支持的三种重启策略的可配置项
重启策略 | 重启策略值 |
---|---|
Fixed delay | fixed-delay |
Failure rate | failure-rate |
No restart | None |
除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在ExecutionEnvironment中调用setRestartStrategy()方法来程序化地调用,这种方式同样适用于StreamExecutionEnvironment。
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
))
固定延迟重启策略(Fixed Delay Restart Strategy)
尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。
参数配置 | 描述 | 默认值 |
---|---|---|
restart-strategy.fixed-delay.attempts | Flink尝试执行的次数 | 1,如果启用checkpoint的话是Integer.MAX_VALUE |
restart-strategy.fixed-delay.delay | 两次重启之间等待的时间 | akka.ask.timeout,如果启用checkpoint的话是1s |
flink-conf.yaml
参数配置:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
))
失败率重启策略(Failure Rate Restart Strategy)
Job失败后会重启次数如果超过失败率,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。
配置参数 | 描述 | 默认值 |
---|---|---|
restart-strategy.failure-rate.max-failures-per-interval | Flink尝试执行的次数 | 1 |
restart-strategy.failure-rate.failure-rate-interval | 计算失败率的时间间隔 | 1 min |
restart-strategy.failure-rate.delay | 两次重启之间等待的时间 | akka.ask.timeout |
flink-conf.yaml
参数配置:
restart-strategy:failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数
Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔
))
无重启策略(No Restart Strategy)
Job直接失败,不会尝试进行重启
flink-conf.yaml
参数配置:
restart-strategy: none
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/restart_strategies.html