flink状态后端----flink1.13发布

DataStream编写的程序通常在以下情况保存状态:

  • Windows在触发计算之前,收集数据或聚合

  • Transformation functions可能使用key/value来保存数据

  • Transformation functions可能实现CheckpointedFunction接口保存局部变量(其中,initializeState方法在初始化时触发,snapshotState方法在checkpoint时触发)

    # 集群级别的状态后端配置
    # The backend that will be used to store operator state checkpoints
    # (default none) filesystem or rocksdb.
    state.backend: filesystem
    # Directory for storing checkpoints
    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
    

可用的状态后端(旧版)

  1. MemoryStateBackend

    内部数据作为对象在java堆空间中存储。checkpoint时,状态快照发送给jobmanager。

    //第一个参数是配置最大使用的堆内存大小  第二个是是否启用异步快照(默认true)
    new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
    env.setStateBackend(new MemoryStateBackend());
    

    局限性:1.默认每个单独的状态大小为5MB。2.不能大于akka.frame的大小。3.聚合状态必须适合jobManager的内存大小。

    使用场景:1.开发和调试。2.没有状态的任务。

    note:此状态后端不需要配置托管内存(managed memory),建议设置为0(默认为0)。

  2. FsStateBackend

    文件系统状态后端将正在进行的数据保存在TaskManager内存中。检查点完成后,将状态快照保存到文件系统中。最小的元数据存储在JobManager的内存中(或在高可用性模式下,存储在元数据检查点中)。

    //第一个参数是配置路径(hdfs://namenode:40010/flink/checkpoints或file:///data/flink/checkpoints)  第二个是是否启用异步快照(默认true)
    new FsStateBackend(path, false);
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    

    使用场景:1.Jobs with large state, long windows, large key/value states。2.All high-availability setups.

    note:此状态后端不需要配置托管内存(managed memory),建议设置为0(默认为0)。

  3. RocksDBStateBackend

    RocksDB状态后端将RocksDB数据库中的数据保存在TaskManager目录中。检查点完成后,整个数据库的数据将进入配置的目录。最小的元数据存储在JobManager的内存中(或在高可用性模式下,存储在元数据检查点中)。

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>1.12.0</version>
        <scope>provided</scope>
    </dependency>
    
    //第一个参数是配置路径(hdfs://namenode:40010/flink/checkpoints或file:///data/flink/checkpoints)  第二个是是否增量
    new RocksDBStateBackend(path, true);
    env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    

    使用场景:1.Jobs with large state, long windows, large key/value states。2.All high-availability setups.

    note:此状态后端可以保持非常大的状态,可以实现的最大吞吐量将降低。

可用的状态后端(新版)

2021年五一,flink1.13发布,其中 1.FLINK-21935:Remove state.backend.async option. 2.FLINK-20976:Unify binary format for Keyed State savepoints.

Flink新版本移除状态后端异步选项,让状态后端只能异步快照,并且统一了savepoint的二进制格式。这样做简化了状态后端,因为MemoryStateBackend和FsStateBackend的状态都存储在内存,而RocksDBStateBackend存储在数据库。FsStateBackend和RocksDBStateBackend都要配置存储路径,可以统一设置路径。

  1. HashMapStateBackend

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new HashMapStateBackend());
    //等价于MemoryStateBackend
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerStateBackend());
    //等价于FsStateBackend
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
    
  2. EmbeddedRocksDBStateBackend

    //等价于RocksDBStateBackend,默认全量检查点
    env.setStateBackend(new EmbeddedRocksDBStateBackend());
    //开启增量检查点
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
    

如何选择正确的状态后端

旧版中,默认MemoryStateBackend不用于生产。在FsStateBackendRocksDB之间进行选择时,可以在性能和可伸缩性之间进行选择。 FsStateBackend每个状态访问和更新都对Java堆上的对象进行操作,因此速度非常快;但是,状态大小受群集内可用内存的限制。另一方面,它RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。但是,每个状态访问和更新都需要(反)序列化,并且可能需要从磁盘读取数据,这导致平均性能比内存状态后端慢一个数量级。

flink1.13发布,新版中,在HashMapStateBackendRocksDB之间进行选择时,可以在性能和可伸缩性之间进行选择。

关于RocksDBStateBackend的使用细节

增量检查

  • flink-conf.yaml中配置:state.backend.incremental: true
  • 代码中配置:RocksDBStateBackend backend = new RocksDBStateBackend(checkpointDirURI, true);

内存管理

state.backend.rocksdb.memory.managed: true(default true)。flink不会直接管理RocksDB的内存,默认情况下,开启托管内存,并且预定义的RocksDB将失效,预定义配置有四种(DEFAULT/SPINNING_DISK_OPTIMIZED/SPINNING_DISK_OPTIMIZED_HIGH_MEM/FLASH_SSD_OPTIMIZED),可以通过rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)进行配置。

托管内存是按slot计算的。单个slot上,所有的RocksDB实例共用a shared cache和write buffer manager。其中shared cache主要包含三个组件:1.block cache,2.index and bloom filters,3.MemTables。提供两个参数对写(MemTables)操作和读(index和filters)操作进行配置:

#表示write buffer占比50% (default 0.5)
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
#表示高速缓存占比10% (default 0.1)
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1

state.backend.rocksdb.memory.managed: false。如果要手动管理RocksDB的内存,可以通过指定类RocksDBStateBackend.setRocksDBOptions(new MyOptionsFactory());,该类需要实现ConfigurableRocksDBOptionsFactory接口。

定时器

如果使用RocksDB作为状态后端,flink的定时器也将存储在数据库中。但是如果定时器比较少时,基于堆内存的定时器会获得更好的性能。配置参数为state.backend.rocksdb.timer-service.factoryto: heap (default rocksdb)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容