Flink中的每个函数和操作符都可以是有状态的(详细信息请参阅使用状态)。有状态的函数在单个元素/事件的处理过程中存储数据,使状态成为任何类型的复杂操作的关键部分。
为了使状态容错,Flink需要checkpoint状态。checkpoint允许Flink恢复流中的状态和位置,以使应用获得和无故障运行相同的语义。
文档《Data Streaming Fault Tolerance》描述了Flink的流容错机制背后的技术。
先决条件
Flink的checkpoint机制与流和状态的持久化存储交互。一般来说,它要求:
- 一个可以在一定时间内重放记录的持久化数据源。这些数据源的一些例子是持久化消息队列(例如,Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub)或者文件系统(例如,HDFS, S3, GFS, NFS, Ceph, …)。
- 状态的持久化存储,通常是一个分布式文件系统(例如,HDFS, S3, GFS, NFS, Ceph, …)。
启用和配置checkpoint
默认情况下,checkpoint是禁用的。调用StreamExecutionEnvironment的enableCheckpointing(n)方法开启checkpoint,参数n是相邻checkpoint之间间隔的毫秒数。
checkpoint的其它参数包括:
- exactly-once vs. at-least-once: 可以选择这两种保证级别中的一个模式传递到enableCheckpointing(n)方法中。对于大多数程序来说,Exactly-once更可取。一些超低延迟(几毫秒)的应用可能更关注At-least-once。
- checkpoint timeout: checkpoint完成的超时时间,超过该时间后,checkpoint进程会被终止。
-
minimum time between checkpoints: 为了确保流应用在相邻checkpoint之间完成了一定的业务逻辑,可以定义相邻checkpoint之间必须间隔的时间。如果将此值设置为5000,下一个checkpoint不会在前一个checkpoint完成之后的5秒内启动,不论checkpoint持续时间和checkpoint间隔是多少。注意,这意味着checkpoint间隔永远不会小于这个参数。
通常通过定义“checkpoint之间的时间”而不是checkpoint间隔来配置应用,因为,“checkpoint之间的时间”不容易受到执行checkpoint消费比平时更长的时间(例如,如果目标存储系统突然变慢)的影响。
注意这个值也意味着checkpoint的并发度为1。 -
number of concurrent checkpoints: 默认情况下,在一个checkpoint正在执行的过程中,系统不会触发另一个checkpoint。这确保拓补结构不会在checkpoint上花费太多的时间,以及在处理流方面没有进展。可以允许多个重叠的checkpoint,这比较适合有一定处理延迟(例如,因为函数调用外部服务而需要一些时间响应)的管道,但是它仍然想做非常频繁的checkpoint(100毫秒)来重新处理小概率的失败。
当定义了相邻checkpoint最小间隔时间时,不能使用此选项。 - externalized checkpoints: 可以配置周期性的checkpoint,以便在外部持久化。外部的checkpoint将它们的元数据写入到持久化介质中,并且在作业失败时不会自动的清除。这样,如果你的作业失败时,你就会有一个checkpoint来恢复。更多的细节见Checkpoints。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
相关配置选项
更多的参数和默认值可以通过conf/flink-conf.yaml(请参阅完整配置)设置。
- state.backend: 如果开启了checkpoint,将用于存储操作符状态checkpoint的后端。支持的后端有:
- jobmanager: 内存状态,备份到JobManager/ZooKeeper的内存中。应该只用于最小状态(Kafka偏移)或者测试和本地调试。
- filesystem: 状态在TaskManager的内存中,状态快照存储在文件系统中。支持所有Flink支持的文件系统,例如HDFS, S3, …
- state.backend.fs.checkpointdir: Flink支持的文件系统中存储checkpoint的目录。例如:状态后端必须由JobManager访问,使用file://仅限于本地设置。
- state.backend.rocksdb.checkpointdir: 存储RocksDb文件的本地目录,或由系统目录分隔符分割的目录列表(例如,在Linux/Unix上是":")。默认值是taskmanager.tmp.dirs。
- state.checkpoints.dir: 外部checkpoint的元数据的目标目录。
- state.checkpoints.num-retained: 要保留的完整checkpoint的数量。如果最近的checkpoint已经损坏,那么拥有不止一个允许恢复之前状态的checkpoint。默认值是1。
选择一个状态后端
Flink的checkpoint机制存储计时器和有状态的操作符中的所有状态(包括连接器,窗口和任何用户定义的状态)的一致性快照。checkpoint存储在何处(例如,JobManager memory, file system, database)取决于配置的后端状态。
默认情况下,状态保存在TaskManager的内存中,checkpoint存储在JobManager的内存中。为了适配大状态的持久化,Flink支持在其它后端状态中存储和checkpoint状态。可以通过StreamExecutionEnvironment.setStateBackend(…)方法配置后端状态。
更多关于可用后端状态和作业范围和集群范围的配置选项请参阅后端状态。
迭代作业中的checkpoint状态
Flink当前仅为没有迭代的作业提供处理保证。在迭代作业中启用checkpoint会导致异常。为了在迭代程序上强制checkpoint,当开启checkpoint时用户需要设置一个特殊的标识env.enableCheckpointing(interval, force = true)。
请注意,当失败时,已经通过循环的记录(和跟它们相关的状态修改)会丢失。
重启策略
Flink支持不同的重启策略,该策略控制在失败时如何重新启动作业。更多的信息,见重启策略。