使用Data Stream API编写的程序通常以不同的形式持有状态:
- 在窗口中收集或聚合元素,直到触发状态存储
- 转换函数可能使用key/value状态接口来存储元素
- 转换函数可能实现
CheckpointedFunction
接口来使得它们的本地变量容错。
参见流API指南中的状态部分
当checkpoint被激活时,状态会被持久化到checkpoint,以防止数据丢失和无缝恢复。状态在内部如何组织和它们如何以及在哪持久化,依赖于所选的状态后端。
可选的状态后端
Flink内部提供了这些状态后端:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
如果没有其他配置,系统将使用MemoryStateBackend。
MemoryStateBackend
MemoryStateBackend将内部的数据保存在Java堆上。 Key/value状态和窗口操作符持有存储值,触发器等的哈希表。
当进行checkpoint时,这个状态后端会对当前的状态进行快照,并且将其作为checkpoint ACK消息的一部分发送给JobManager(master),该JobManager将其存储在它的堆上。
MemoryStateBackend可以配置使用异步快照的方式。虽然我们强烈鼓励使用异步快照的方式来避免管道阻塞,但是请注意,这个是一个新特性,目前默认情况下不启用。为了启用这个状态,用户可以在初始化 MemoryStateBackend
时将构造函数中相应的布尔标识设为 true
,例如:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
MemoryStateBackend的局限性:
- 单个状态的大小默认情况下最大为5MB。这个值可以通过MemoryStateBackend构造函数进行增加。
- 无论配置的最大状态大小为多少,状态的大小不能超过akka帧大小(见Configuration)
- 聚合的状态必须在JobManager的内存中能存放
MemoryStateBackend适用于:
- 本地开发和调试
- 只有很小状态的作业,例如作业只由record-at-a-time函数组成(Map,FlatMap,Filter,...)。Kafka消费者只需要非常小的状态。
FsStateBackend
FsStateBackend使用文件系统URL(类型,地址,路径),例如“hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”.
FsStateBackend将in-flight数据存放在TaskManager的内存中。当进行checkpoint时,它将状态快照写入到配置的文件系统和目录。最小的元数据存储在JobManager的内存中(或者,在高可用模式下,在元数据checkpoint中)。
FsStateBackend默认使用异步快照以避免在写状态checkpoint时阻塞处理管道。要禁用此特性,用户可以初始化 MemoryStateBackend
时将构造函数中相应的布尔标识设为 false
,例如:
new FsStateBackend(path, false);
FsStateBackend适用于:
- 具有大状态,长窗口,大key/value状态的作业
- 所有的高可用性设置
RocksDBStateBackend
RocksDBStateBackend 使用文件系统URL(类型,地址,路径),例如“hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”.
RocksDBStateBackend将in-flight数据存储在RocksDB数据库中,它(默认)存储在TaskManager的data目录下。当checkpoint时,整个RocksDB数据库将被checkpoint到配置的文件系统和目录下。最小的元数据存储在JobManager的内存中(或者,在高可用模式下,在元数据checkpoint中)。
RocksDBStateBackend总是执行异步快照。
RocksDBStateBackend的限制:
- 作为RocksDB的JNI桥接API是基于byte[]的,每个key和value的最大的支持大小是 2^31字节。重要:在RocksDB中使用合并操作的状态(例如,ListState)能够默默的积累到值的size大于 2^31字节,并且在下次检索时会失败。这是目前 RocksDB JNI的限制。
RocksDBStateBackend适用于:
- 具有大状态,长窗口,大key/value状态的作业
- 所有的高可用性设置
注意:你可以保持的状态的数量只受限于磁盘可用空间的大小。相比于将状态保存到内存的FsStateBackend,这允许保持非常大的状态。然而,这也意味着,可以达到的最大的吞吐量会比状态后端的吞吐量要低。
RocksDBStateBackend是目前唯一提供增量checkpoint的状态后端(请参阅此处) 。
配置状态后端
如果您不做任何指定,默认的状态后端是JobManager。如果你希望为你的集群中的所有作业创建一个非默认的状态后端,你可以通过在flink-conf.yaml中指定一个新的默认后端。默认的状态后端可以在每个作业的基础上进行覆盖,如下所示.
设置一个作业级的状态后端
作业的状态后端通过作业中的 StreamExecutionEnvironment
进行设置,如下述示例所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
设置默认状态后端
默认状态后端可以通过在 flink-conf.yaml
中设置state.backend
值指定。
可能的配置项是jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend),或者实现了状态后端工厂FsStateBackendFactory的类的完全限定类名,例如,为RocksDBStateBackend设置为org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
。
配置文件中的示例部分如下所示:
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints