DataStream编写的程序通常在以下情况保存状态:
Windows在触发计算之前,收集数据或聚合
Transformation functions可能使用key/value来保存数据
-
Transformation functions可能实现CheckpointedFunction接口保存局部变量(其中,initializeState方法在初始化时触发,snapshotState方法在checkpoint时触发)
# 集群级别的状态后端配置 # The backend that will be used to store operator state checkpoints # (default none) filesystem or rocksdb. state.backend: filesystem # Directory for storing checkpoints state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
可用的状态后端(旧版)
-
MemoryStateBackend
内部数据作为对象在java堆空间中存储。checkpoint时,状态快照发送给jobmanager。
//第一个参数是配置最大使用的堆内存大小 第二个是是否启用异步快照(默认true) new MemoryStateBackend(MAX_MEM_STATE_SIZE, false); env.setStateBackend(new MemoryStateBackend());
局限性:1.默认每个单独的状态大小为5MB。2.不能大于akka.frame的大小。3.聚合状态必须适合jobManager的内存大小。
使用场景:1.开发和调试。2.没有状态的任务。
note:此状态后端不需要配置托管内存(managed memory),建议设置为0(默认为0)。
-
FsStateBackend
文件系统状态后端将正在进行的数据保存在TaskManager内存中。检查点完成后,将状态快照保存到文件系统中。最小的元数据存储在JobManager的内存中(或在高可用性模式下,存储在元数据检查点中)。
//第一个参数是配置路径(hdfs://namenode:40010/flink/checkpoints或file:///data/flink/checkpoints) 第二个是是否启用异步快照(默认true) new FsStateBackend(path, false); env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
使用场景:1.Jobs with large state, long windows, large key/value states。2.All high-availability setups.
note:此状态后端不需要配置托管内存(managed memory),建议设置为0(默认为0)。
-
RocksDBStateBackend
RocksDB状态后端将RocksDB数据库中的数据保存在TaskManager目录中。检查点完成后,整个数据库的数据将进入配置的目录。最小的元数据存储在JobManager的内存中(或在高可用性模式下,存储在元数据检查点中)。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.12.0</version> <scope>provided</scope> </dependency>
//第一个参数是配置路径(hdfs://namenode:40010/flink/checkpoints或file:///data/flink/checkpoints) 第二个是是否增量 new RocksDBStateBackend(path, true); env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
使用场景:1.Jobs with large state, long windows, large key/value states。2.All high-availability setups.
note:此状态后端可以保持非常大的状态,可以实现的最大吞吐量将降低。
可用的状态后端(新版)
2021年五一,flink1.13发布,其中 1.FLINK-21935:Remove state.backend.async option. 2.FLINK-20976:Unify binary format for Keyed State savepoints.
Flink新版本移除状态后端异步选项,让状态后端只能异步快照,并且统一了savepoint的二进制格式。这样做简化了状态后端,因为MemoryStateBackend和FsStateBackend的状态都存储在内存,而RocksDBStateBackend存储在数据库。FsStateBackend和RocksDBStateBackend都要配置存储路径,可以统一设置路径。
-
HashMapStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new HashMapStateBackend()); //等价于MemoryStateBackend env.getCheckpointConfig().setCheckpointStorage(new JobManagerStateBackend()); //等价于FsStateBackend env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
-
EmbeddedRocksDBStateBackend
//等价于RocksDBStateBackend,默认全量检查点 env.setStateBackend(new EmbeddedRocksDBStateBackend()); //开启增量检查点 env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
如何选择正确的状态后端
旧版中,默认MemoryStateBackend不用于生产。在FsStateBackend
和RocksDB
之间进行选择时,可以在性能和可伸缩性之间进行选择。 FsStateBackend
每个状态访问和更新都对Java堆上的对象进行操作,因此速度非常快;但是,状态大小受群集内可用内存的限制。另一方面,它RocksDB
可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照
的状态后端。但是,每个状态访问和更新都需要(反)序列化,并且可能需要从磁盘读取数据,这导致平均性能比内存状态后端慢一个数量级。
flink1.13发布,新版中,在HashMapStateBackend
和RocksDB
之间进行选择时,可以在性能和可伸缩性之间进行选择。
关于RocksDBStateBackend的使用细节
增量检查
- flink-conf.yaml中配置:
state.backend.incremental: true
- 代码中配置:RocksDBStateBackend backend = new RocksDBStateBackend(checkpointDirURI, true);
内存管理
state.backend.rocksdb.memory.managed: true(default true)
。flink不会直接管理RocksDB的内存,默认情况下,开启托管内存,并且预定义的RocksDB将失效,预定义配置有四种(DEFAULT/SPINNING_DISK_OPTIMIZED/SPINNING_DISK_OPTIMIZED_HIGH_MEM/FLASH_SSD_OPTIMIZED
),可以通过rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
进行配置。
托管内存是按slot计算的。单个slot上,所有的RocksDB实例共用a shared cache和write buffer manager。其中shared cache主要包含三个组件:1.block cache,2.index and bloom filters,3.MemTables。提供两个参数对写(MemTables)操作和读(index和filters)操作进行配置:
#表示write buffer占比50% (default 0.5)
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
#表示高速缓存占比10% (default 0.1)
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
state.backend.rocksdb.memory.managed: false
。如果要手动管理RocksDB的内存,可以通过指定类RocksDBStateBackend.setRocksDBOptions(new MyOptionsFactory());
,该类需要实现ConfigurableRocksDBOptionsFactory接口。
定时器
如果使用RocksDB作为状态后端,flink的定时器也将存储在数据库中。但是如果定时器比较少时,基于堆内存的定时器会获得更好的性能。配置参数为state.backend.rocksdb.timer-service.factoryto: heap (default rocksdb)